diff --git a/modules/story-summary/vector/pipeline/state-integration.js b/modules/story-summary/vector/pipeline/state-integration.js index 26982f0..f5d1ae6 100644 --- a/modules/story-summary/vector/pipeline/state-integration.js +++ b/modules/story-summary/vector/pipeline/state-integration.js @@ -31,6 +31,7 @@ const MODULE_ID = 'state-integration'; // ★ 并发配置 const CONCURRENCY = 50; const STAGGER_DELAY = 15; +const DEBUG_CONCURRENCY = true; let initialized = false; let extractionCancelled = false; @@ -154,6 +155,9 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { let failed = 0; const total = pendingPairs.length; let builtAtoms = 0; + let active = 0; + let peakActive = 0; + const tStart = performance.now(); // ★ Phase 1: 收集所有新提取的 atoms(不向量化) const allNewAtoms = []; @@ -163,7 +167,7 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { const poolSize = Math.min(CONCURRENCY, pendingPairs.length); let nextIndex = 0; let started = 0; - const runWorker = async () => { + const runWorker = async (workerId) => { while (true) { if (extractionCancelled) return; const idx = nextIndex++; @@ -180,6 +184,12 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { const floor = pair.aiFloor; const prev = getL0FloorStatus(floor); + active++; + if (active > peakActive) peakActive = active; + if (DEBUG_CONCURRENCY && (idx % 10 === 0)) { + xbLog.info(MODULE_ID, `L0 pool start idx=${idx} active=${active} peak=${peakActive} worker=${workerId}`); + } + try { const atoms = await extractAtomsForRound(pair.userMsg, pair.aiMsg, floor, { timeout: 20000 }); @@ -210,15 +220,24 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { }); failed++; } finally { + active--; if (!extractionCancelled) { completed++; onProgress?.(`提取: ${completed}/${total}`, completed, total); } + if (DEBUG_CONCURRENCY && (completed % 25 === 0 || completed === total)) { + const elapsed = Math.max(1, Math.round(performance.now() - tStart)); + xbLog.info(MODULE_ID, `L0 pool progress=${completed}/${total} active=${active} peak=${peakActive} elapsedMs=${elapsed}`); + } } } }; - await Promise.all(Array.from({ length: poolSize }, runWorker)); + await Promise.all(Array.from({ length: poolSize }, (_, i) => runWorker(i))); + if (DEBUG_CONCURRENCY) { + const elapsed = Math.max(1, Math.round(performance.now() - tStart)); + xbLog.info(MODULE_ID, `L0 pool done completed=${completed}/${total} failed=${failed} peakActive=${peakActive} elapsedMs=${elapsed}`); + } try { saveMetadataDebounced?.();