perf: switch L0 extraction to worker pool

This commit is contained in:
2026-02-08 18:37:04 +08:00
parent 8226c48624
commit d8d645a129

View File

@@ -29,7 +29,7 @@ import { filterText } from '../utils/text-filter.js';
const MODULE_ID = 'state-integration'; const MODULE_ID = 'state-integration';
// ★ 并发配置 // ★ 并发配置
const CONCURRENCY = 90; const CONCURRENCY = 50;
const STAGGER_DELAY = 15; const STAGGER_DELAY = 15;
let initialized = false; let initialized = false;
@@ -159,22 +159,22 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) {
const allNewAtoms = []; const allNewAtoms = [];
// ★ 30 并发批次处理 // ★ 30 并发批次处理
for (let i = 0; i < pendingPairs.length; i += CONCURRENCY) { // 并发池处理(保持固定并发度)
// ★ 检查取消 const poolSize = Math.min(CONCURRENCY, pendingPairs.length);
if (extractionCancelled) { let nextIndex = 0;
xbLog.info(MODULE_ID, `用户取消,已完成 ${completed}/${total}`); let started = 0;
break; const runWorker = async () => {
while (true) {
if (extractionCancelled) return;
const idx = nextIndex++;
if (idx >= pendingPairs.length) return;
const pair = pendingPairs[idx];
const stagger = started++;
if (STAGGER_DELAY > 0) {
await new Promise(r => setTimeout(r, stagger * STAGGER_DELAY));
} }
const batch = pendingPairs.slice(i, i + CONCURRENCY);
const promises = batch.map((pair, idx) => (async () => {
// 首批错开启动,避免瞬间打满
if (i === 0) {
await new Promise(r => setTimeout(r, idx * STAGGER_DELAY));
}
// 再次检查取消
if (extractionCancelled) return; if (extractionCancelled) return;
const floor = pair.aiFloor; const floor = pair.aiFloor;
@@ -194,7 +194,7 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) {
} else { } else {
atoms.forEach(a => a.chatId = chatId); atoms.forEach(a => a.chatId = chatId);
saveStateAtoms(atoms); saveStateAtoms(atoms);
// Phase 1: 只收集,不向量化 // Phase 1: 只收集,不向量化
allNewAtoms.push(...atoms); allNewAtoms.push(...atoms);
setL0FloorStatus(floor, { status: 'ok', atoms: atoms.length }); setL0FloorStatus(floor, { status: 'ok', atoms: atoms.length });
@@ -215,17 +215,11 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) {
onProgress?.(`提取: ${completed}/${total}`, completed, total); onProgress?.(`提取: ${completed}/${total}`, completed, total);
} }
} }
})());
await Promise.all(promises);
// 批次间短暂间隔
if (i + CONCURRENCY < pendingPairs.length && !extractionCancelled) {
await new Promise(r => setTimeout(r, 30));
}
} }
};
await Promise.all(Array.from({ length: poolSize }, runWorker));
// ★ 立即保存文本,不要等防抖
try { try {
saveMetadataDebounced?.(); saveMetadataDebounced?.();
} catch { } } catch { }