diff --git a/modules/story-summary/vector/pipeline/state-integration.js b/modules/story-summary/vector/pipeline/state-integration.js index f5d1ae6..93682f9 100644 --- a/modules/story-summary/vector/pipeline/state-integration.js +++ b/modules/story-summary/vector/pipeline/state-integration.js @@ -159,6 +159,21 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { let peakActive = 0; const tStart = performance.now(); + const atomBuffer = []; + const FLOOR_FLUSH_THRESHOLD = 10; + let bufferedFloors = 0; + const flushAtomBuffer = () => { + if (!atomBuffer.length) return; + saveStateAtoms(atomBuffer.splice(0, atomBuffer.length)); + bufferedFloors = 0; + }; + const markFloorBuffered = () => { + bufferedFloors++; + if (bufferedFloors >= FLOOR_FLUSH_THRESHOLD) { + flushAtomBuffer(); + } + }; + // ★ Phase 1: 收集所有新提取的 atoms(不向量化) const allNewAtoms = []; @@ -203,7 +218,8 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { setL0FloorStatus(floor, { status: 'empty', reason: 'llm_empty', atoms: 0 }); } else { atoms.forEach(a => a.chatId = chatId); - saveStateAtoms(atoms); + atomBuffer.push(...atoms); + markFloorBuffered(); // Phase 1: 只收集,不向量化 allNewAtoms.push(...atoms); @@ -234,6 +250,7 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { }; await Promise.all(Array.from({ length: poolSize }, (_, i) => runWorker(i))); + flushAtomBuffer(); if (DEBUG_CONCURRENCY) { const elapsed = Math.max(1, Math.round(performance.now() - tStart)); xbLog.info(MODULE_ID, `L0 pool done completed=${completed}/${total} failed=${failed} peakActive=${peakActive} elapsedMs=${elapsed}`);