Update story-summary modules
This commit is contained in:
@@ -181,14 +181,83 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
|
||||
// ★ Phase 1: 收集所有新提取的 atoms(不向量化)
|
||||
const allNewAtoms = [];
|
||||
|
||||
// ★ 30 并发批次处理
|
||||
// 并发池处理(保持固定并发度)
|
||||
// ★ 限流检测:连续失败 N 次后暂停并降速
|
||||
let consecutiveFailures = 0;
|
||||
let rateLimited = false;
|
||||
const RATE_LIMIT_THRESHOLD = 3; // 连续失败多少次触发限流保护
|
||||
const RATE_LIMIT_WAIT_MS = 60000; // 限流后等待时间(60 秒)
|
||||
const RETRY_INTERVAL_MS = 1000; // 降速模式下每次请求间隔(1 秒)
|
||||
const RETRY_CONCURRENCY = 1; // ★ 降速模式下的并发数(默认1,建议不要超过5)
|
||||
|
||||
// ★ 通用处理单个 pair 的逻辑(复用于正常模式和降速模式)
|
||||
const processPair = async (pair, idx, workerId) => {
|
||||
const floor = pair.aiFloor;
|
||||
const prev = getL0FloorStatus(floor);
|
||||
|
||||
active++;
|
||||
if (active > peakActive) peakActive = active;
|
||||
if (DEBUG_CONCURRENCY && (idx % 10 === 0)) {
|
||||
xbLog.info(MODULE_ID, `L0 pool start idx=${idx} active=${active} peak=${peakActive} worker=${workerId}`);
|
||||
}
|
||||
|
||||
try {
|
||||
const atoms = await extractAtomsForRound(pair.userMsg, pair.aiMsg, floor, { timeout: 20000 });
|
||||
|
||||
if (extractionCancelled) return;
|
||||
|
||||
if (atoms == null) {
|
||||
throw new Error('llm_failed');
|
||||
}
|
||||
|
||||
// ★ 成功:重置连续失败计数
|
||||
consecutiveFailures = 0;
|
||||
|
||||
if (!atoms.length) {
|
||||
setL0FloorStatus(floor, { status: 'empty', reason: 'llm_empty', atoms: 0 });
|
||||
} else {
|
||||
atoms.forEach(a => a.chatId = chatId);
|
||||
saveStateAtoms(atoms);
|
||||
allNewAtoms.push(...atoms);
|
||||
|
||||
setL0FloorStatus(floor, { status: 'ok', atoms: atoms.length });
|
||||
builtAtoms += atoms.length;
|
||||
}
|
||||
} catch (e) {
|
||||
if (extractionCancelled) return;
|
||||
|
||||
setL0FloorStatus(floor, {
|
||||
status: 'fail',
|
||||
attempts: (prev?.attempts || 0) + 1,
|
||||
reason: String(e?.message || e).replace(/\s+/g, ' ').slice(0, 120),
|
||||
});
|
||||
failed++;
|
||||
|
||||
// ★ 限流检测:连续失败累加
|
||||
consecutiveFailures++;
|
||||
if (consecutiveFailures >= RATE_LIMIT_THRESHOLD && !rateLimited) {
|
||||
rateLimited = true;
|
||||
xbLog.warn(MODULE_ID, `连续失败 ${consecutiveFailures} 次,疑似触发 API 限流,将暂停所有并发`);
|
||||
}
|
||||
} finally {
|
||||
active--;
|
||||
if (!extractionCancelled) {
|
||||
completed++;
|
||||
onProgress?.(`提取: ${completed}/${total}`, completed, total);
|
||||
}
|
||||
if (DEBUG_CONCURRENCY && (completed % 25 === 0 || completed === total)) {
|
||||
const elapsed = Math.max(1, Math.round(performance.now() - tStart));
|
||||
xbLog.info(MODULE_ID, `L0 pool progress=${completed}/${total} active=${active} peak=${peakActive} elapsedMs=${elapsed}`);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// ★ 并发池处理(保持固定并发度)
|
||||
const poolSize = Math.min(CONCURRENCY, pendingPairs.length);
|
||||
let nextIndex = 0;
|
||||
let started = 0;
|
||||
const runWorker = async (workerId) => {
|
||||
while (true) {
|
||||
if (extractionCancelled) return;
|
||||
if (extractionCancelled || rateLimited) return;
|
||||
const idx = nextIndex++;
|
||||
if (idx >= pendingPairs.length) return;
|
||||
|
||||
@@ -198,57 +267,9 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
|
||||
await new Promise(r => setTimeout(r, stagger * STAGGER_DELAY));
|
||||
}
|
||||
|
||||
if (extractionCancelled) return;
|
||||
if (extractionCancelled || rateLimited) return;
|
||||
|
||||
const floor = pair.aiFloor;
|
||||
const prev = getL0FloorStatus(floor);
|
||||
|
||||
active++;
|
||||
if (active > peakActive) peakActive = active;
|
||||
if (DEBUG_CONCURRENCY && (idx % 10 === 0)) {
|
||||
xbLog.info(MODULE_ID, `L0 pool start idx=${idx} active=${active} peak=${peakActive} worker=${workerId}`);
|
||||
}
|
||||
|
||||
try {
|
||||
const atoms = await extractAtomsForRound(pair.userMsg, pair.aiMsg, floor, { timeout: 20000 });
|
||||
|
||||
if (extractionCancelled) return;
|
||||
|
||||
if (atoms == null) {
|
||||
throw new Error('llm_failed');
|
||||
}
|
||||
|
||||
if (!atoms.length) {
|
||||
setL0FloorStatus(floor, { status: 'empty', reason: 'llm_empty', atoms: 0 });
|
||||
} else {
|
||||
atoms.forEach(a => a.chatId = chatId);
|
||||
saveStateAtoms(atoms);
|
||||
// Phase 1: 只收集,不向量化
|
||||
allNewAtoms.push(...atoms);
|
||||
|
||||
setL0FloorStatus(floor, { status: 'ok', atoms: atoms.length });
|
||||
builtAtoms += atoms.length;
|
||||
}
|
||||
} catch (e) {
|
||||
if (extractionCancelled) return;
|
||||
|
||||
setL0FloorStatus(floor, {
|
||||
status: 'fail',
|
||||
attempts: (prev?.attempts || 0) + 1,
|
||||
reason: String(e?.message || e).replace(/\s+/g, ' ').slice(0, 120),
|
||||
});
|
||||
failed++;
|
||||
} finally {
|
||||
active--;
|
||||
if (!extractionCancelled) {
|
||||
completed++;
|
||||
onProgress?.(`提取: ${completed}/${total}`, completed, total);
|
||||
}
|
||||
if (DEBUG_CONCURRENCY && (completed % 25 === 0 || completed === total)) {
|
||||
const elapsed = Math.max(1, Math.round(performance.now() - tStart));
|
||||
xbLog.info(MODULE_ID, `L0 pool progress=${completed}/${total} active=${active} peak=${peakActive} elapsedMs=${elapsed}`);
|
||||
}
|
||||
}
|
||||
await processPair(pair, idx, workerId);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -258,6 +279,61 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
|
||||
xbLog.info(MODULE_ID, `L0 pool done completed=${completed}/${total} failed=${failed} peakActive=${peakActive} elapsedMs=${elapsed}`);
|
||||
}
|
||||
|
||||
// ═════════════════════════════════════════════════════════════════════
|
||||
// ★ 限流恢复:重置进度,从头开始以限速模式慢慢跑
|
||||
// ═════════════════════════════════════════════════════════════════════
|
||||
if (rateLimited && !extractionCancelled) {
|
||||
const waitSec = RATE_LIMIT_WAIT_MS / 1000;
|
||||
xbLog.info(MODULE_ID, `限流保护:将重置进度并从头开始降速重来(并发=${RETRY_CONCURRENCY}, 间隔=${RETRY_INTERVAL_MS}ms)`);
|
||||
onProgress?.(`疑似限流,${waitSec}s 后降速重头开始...`, completed, total);
|
||||
|
||||
await new Promise(r => setTimeout(r, RATE_LIMIT_WAIT_MS));
|
||||
|
||||
if (!extractionCancelled) {
|
||||
// ★ 核心逻辑:重置计数器,让 UI 从 0 开始跑,给用户“重头开始”的反馈
|
||||
rateLimited = false;
|
||||
consecutiveFailures = 0;
|
||||
completed = 0;
|
||||
failed = 0;
|
||||
|
||||
let retryNextIdx = 0;
|
||||
|
||||
xbLog.info(MODULE_ID, `限流恢复:开始降速模式扫描 ${pendingPairs.length} 个楼层`);
|
||||
|
||||
const retryWorkers = Math.min(RETRY_CONCURRENCY, pendingPairs.length);
|
||||
const runRetryWorker = async (wid) => {
|
||||
while (true) {
|
||||
if (extractionCancelled) return;
|
||||
const idx = retryNextIdx++;
|
||||
if (idx >= pendingPairs.length) return;
|
||||
|
||||
const pair = pendingPairs[idx];
|
||||
const floor = pair.aiFloor;
|
||||
|
||||
// ★ 检查该楼层状态
|
||||
const st = getL0FloorStatus(floor);
|
||||
if (st?.status === 'ok' || st?.status === 'empty') {
|
||||
// 刚才已经成功了,直接跳过(仅增加进度计数)
|
||||
completed++;
|
||||
onProgress?.(`提取: ${completed}/${total} (跳过已完成)`, completed, total);
|
||||
continue;
|
||||
}
|
||||
|
||||
// ★ 没做过的,用 slow 模式处理
|
||||
await processPair(pair, idx, `retry-${wid}`);
|
||||
|
||||
// 每个请求后休息,避免再次触发限流
|
||||
if (idx < pendingPairs.length - 1 && RETRY_INTERVAL_MS > 0) {
|
||||
await new Promise(r => setTimeout(r, RETRY_INTERVAL_MS));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
await Promise.all(Array.from({ length: retryWorkers }, (_, i) => runRetryWorker(i)));
|
||||
xbLog.info(MODULE_ID, `降速重头开始阶段结束`);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
saveMetadataDebounced?.();
|
||||
} catch { }
|
||||
|
||||
Reference in New Issue
Block a user