chore: add L0 concurrency diagnostics

This commit is contained in:
2026-02-08 18:59:35 +08:00
parent d8d645a129
commit 1e7fcb8fdb

View File

@@ -31,6 +31,7 @@ const MODULE_ID = 'state-integration';
// ★ 并发配置 // ★ 并发配置
const CONCURRENCY = 50; const CONCURRENCY = 50;
const STAGGER_DELAY = 15; const STAGGER_DELAY = 15;
const DEBUG_CONCURRENCY = true;
let initialized = false; let initialized = false;
let extractionCancelled = false; let extractionCancelled = false;
@@ -154,6 +155,9 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) {
let failed = 0; let failed = 0;
const total = pendingPairs.length; const total = pendingPairs.length;
let builtAtoms = 0; let builtAtoms = 0;
let active = 0;
let peakActive = 0;
const tStart = performance.now();
// ★ Phase 1: 收集所有新提取的 atoms不向量化 // ★ Phase 1: 收集所有新提取的 atoms不向量化
const allNewAtoms = []; const allNewAtoms = [];
@@ -163,7 +167,7 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) {
const poolSize = Math.min(CONCURRENCY, pendingPairs.length); const poolSize = Math.min(CONCURRENCY, pendingPairs.length);
let nextIndex = 0; let nextIndex = 0;
let started = 0; let started = 0;
const runWorker = async () => { const runWorker = async (workerId) => {
while (true) { while (true) {
if (extractionCancelled) return; if (extractionCancelled) return;
const idx = nextIndex++; const idx = nextIndex++;
@@ -180,6 +184,12 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) {
const floor = pair.aiFloor; const floor = pair.aiFloor;
const prev = getL0FloorStatus(floor); const prev = getL0FloorStatus(floor);
active++;
if (active > peakActive) peakActive = active;
if (DEBUG_CONCURRENCY && (idx % 10 === 0)) {
xbLog.info(MODULE_ID, `L0 pool start idx=${idx} active=${active} peak=${peakActive} worker=${workerId}`);
}
try { try {
const atoms = await extractAtomsForRound(pair.userMsg, pair.aiMsg, floor, { timeout: 20000 }); const atoms = await extractAtomsForRound(pair.userMsg, pair.aiMsg, floor, { timeout: 20000 });
@@ -210,15 +220,24 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress) {
}); });
failed++; failed++;
} finally { } finally {
active--;
if (!extractionCancelled) { if (!extractionCancelled) {
completed++; completed++;
onProgress?.(`提取: ${completed}/${total}`, completed, total); onProgress?.(`提取: ${completed}/${total}`, completed, total);
} }
if (DEBUG_CONCURRENCY && (completed % 25 === 0 || completed === total)) {
const elapsed = Math.max(1, Math.round(performance.now() - tStart));
xbLog.info(MODULE_ID, `L0 pool progress=${completed}/${total} active=${active} peak=${peakActive} elapsedMs=${elapsed}`);
}
} }
} }
}; };
await Promise.all(Array.from({ length: poolSize }, runWorker)); await Promise.all(Array.from({ length: poolSize }, (_, i) => runWorker(i)));
if (DEBUG_CONCURRENCY) {
const elapsed = Math.max(1, Math.round(performance.now() - tStart));
xbLog.info(MODULE_ID, `L0 pool done completed=${completed}/${total} failed=${failed} peakActive=${peakActive} elapsedMs=${elapsed}`);
}
try { try {
saveMetadataDebounced?.(); saveMetadataDebounced?.();