diff --git a/modules/story-summary/vector/pipeline/state-integration.js b/modules/story-summary/vector/pipeline/state-integration.js index 1554efc..26982f0 100644 --- a/modules/story-summary/vector/pipeline/state-integration.js +++ b/modules/story-summary/vector/pipeline/state-integration.js @@ -29,7 +29,7 @@ import { filterText } from '../utils/text-filter.js'; const MODULE_ID = 'state-integration'; // ★ 并发配置 -const CONCURRENCY = 90; +const CONCURRENCY = 50; const STAGGER_DELAY = 15; let initialized = false; @@ -159,22 +159,22 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { const allNewAtoms = []; // ★ 30 并发批次处理 - for (let i = 0; i < pendingPairs.length; i += CONCURRENCY) { - // ★ 检查取消 - if (extractionCancelled) { - xbLog.info(MODULE_ID, `用户取消,已完成 ${completed}/${total}`); - break; - } + // 并发池处理(保持固定并发度) + const poolSize = Math.min(CONCURRENCY, pendingPairs.length); + let nextIndex = 0; + let started = 0; + const runWorker = async () => { + while (true) { + if (extractionCancelled) return; + const idx = nextIndex++; + if (idx >= pendingPairs.length) return; - const batch = pendingPairs.slice(i, i + CONCURRENCY); - - const promises = batch.map((pair, idx) => (async () => { - // 首批错开启动,避免瞬间打满 - if (i === 0) { - await new Promise(r => setTimeout(r, idx * STAGGER_DELAY)); + const pair = pendingPairs[idx]; + const stagger = started++; + if (STAGGER_DELAY > 0) { + await new Promise(r => setTimeout(r, stagger * STAGGER_DELAY)); } - // 再次检查取消 if (extractionCancelled) return; const floor = pair.aiFloor; @@ -194,7 +194,7 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { } else { atoms.forEach(a => a.chatId = chatId); saveStateAtoms(atoms); - // ★ Phase 1: 只收集,不向量化 + // Phase 1: 只收集,不向量化 allNewAtoms.push(...atoms); setL0FloorStatus(floor, { status: 'ok', atoms: atoms.length }); @@ -215,17 +215,11 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) { onProgress?.(`提取: ${completed}/${total}`, completed, total); } } - })()); - - await Promise.all(promises); - - // 批次间短暂间隔 - if (i + CONCURRENCY < pendingPairs.length && !extractionCancelled) { - await new Promise(r => setTimeout(r, 30)); } - } + }; + + await Promise.all(Array.from({ length: poolSize }, runWorker)); - // ★ 立即保存文本,不要等防抖 try { saveMetadataDebounced?.(); } catch { }