feat(story-summary): make vector APIs configurable

This commit is contained in:
2026-04-03 15:31:13 +08:00
parent 5424dae2d6
commit af7e0f689d
9 changed files with 468 additions and 187 deletions

View File

@@ -29,7 +29,7 @@ import { filterText } from '../utils/text-filter.js';
const MODULE_ID = 'state-integration';
// ★ 并发配置
const CONCURRENCY = 10;
const DEFAULT_CONCURRENCY = 10;
const STAGGER_DELAY = 15;
const DEBUG_CONCURRENCY = true;
const R_AGG_MAX_CHARS = 256;
@@ -168,7 +168,9 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
return { built: 0 };
}
xbLog.info(MODULE_ID, `增量 L0 提取pending=${pendingPairs.length}, concurrency=${CONCURRENCY}`);
const concurrency = Math.max(1, Math.min(50, Number(vectorCfg?.l0Concurrency) || DEFAULT_CONCURRENCY));
xbLog.info(MODULE_ID, `增量 L0 提取pending=${pendingPairs.length}, concurrency=${concurrency}`);
let completed = 0;
let failed = 0;
@@ -181,14 +183,6 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
// ★ Phase 1: 收集所有新提取的 atoms不向量化
const allNewAtoms = [];
// ★ 限流检测:连续失败 N 次后暂停并降速
let consecutiveFailures = 0;
let rateLimited = false;
const RATE_LIMIT_THRESHOLD = 6; // 连续失败多少次触发限流保护
const RATE_LIMIT_WAIT_MS = 60000; // 限流后等待时间60 秒)
const RETRY_INTERVAL_MS = 1000; // 降速模式下每次请求间隔1 秒)
const RETRY_CONCURRENCY = 1; // ★ 降速模式下的并发数默认1建议不要超过5
// ★ 通用处理单个 pair 的逻辑(复用于正常模式和降速模式)
const processPair = async (pair, idx, workerId) => {
const floor = pair.aiFloor;
@@ -209,9 +203,6 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
throw new Error('llm_failed');
}
// ★ 成功:重置连续失败计数
consecutiveFailures = 0;
if (!atoms.length) {
setL0FloorStatus(floor, { status: 'empty', reason: 'llm_empty', atoms: 0 });
} else {
@@ -231,13 +222,6 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
reason: String(e?.message || e).replace(/\s+/g, ' ').slice(0, 120),
});
failed++;
// ★ 限流检测:连续失败累加
consecutiveFailures++;
if (consecutiveFailures >= RATE_LIMIT_THRESHOLD && !rateLimited) {
rateLimited = true;
xbLog.warn(MODULE_ID, `连续失败 ${consecutiveFailures} 次,疑似触发 API 限流,将暂停所有并发`);
}
} finally {
active--;
if (!extractionCancelled) {
@@ -252,12 +236,12 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
};
// ★ 并发池处理(保持固定并发度)
const poolSize = Math.min(CONCURRENCY, pendingPairs.length);
const poolSize = Math.min(concurrency, pendingPairs.length);
let nextIndex = 0;
let started = 0;
const runWorker = async (workerId) => {
while (true) {
if (extractionCancelled || rateLimited) return;
if (extractionCancelled) return;
const idx = nextIndex++;
if (idx >= pendingPairs.length) return;
@@ -267,7 +251,7 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
await new Promise(r => setTimeout(r, stagger * STAGGER_DELAY));
}
if (extractionCancelled || rateLimited) return;
if (extractionCancelled) return;
await processPair(pair, idx, workerId);
}
@@ -279,61 +263,6 @@ export async function incrementalExtractAtoms(chatId, chat, onProgress, options
xbLog.info(MODULE_ID, `L0 pool done completed=${completed}/${total} failed=${failed} peakActive=${peakActive} elapsedMs=${elapsed}`);
}
// ═════════════════════════════════════════════════════════════════════
// ★ 限流恢复:重置进度,从头开始以限速模式慢慢跑
// ═════════════════════════════════════════════════════════════════════
if (rateLimited && !extractionCancelled) {
const waitSec = RATE_LIMIT_WAIT_MS / 1000;
xbLog.info(MODULE_ID, `限流保护:将重置进度并从头开始降速重来(并发=${RETRY_CONCURRENCY}, 间隔=${RETRY_INTERVAL_MS}ms`);
onProgress?.(`疑似限流,${waitSec}s 后降速重头开始...`, completed, total);
await new Promise(r => setTimeout(r, RATE_LIMIT_WAIT_MS));
if (!extractionCancelled) {
// ★ 核心逻辑:重置计数器,让 UI 从 0 开始跑,给用户“重头开始”的反馈
rateLimited = false;
consecutiveFailures = 0;
completed = 0;
failed = 0;
let retryNextIdx = 0;
xbLog.info(MODULE_ID, `限流恢复:开始降速模式扫描 ${pendingPairs.length} 个楼层`);
const retryWorkers = Math.min(RETRY_CONCURRENCY, pendingPairs.length);
const runRetryWorker = async (wid) => {
while (true) {
if (extractionCancelled) return;
const idx = retryNextIdx++;
if (idx >= pendingPairs.length) return;
const pair = pendingPairs[idx];
const floor = pair.aiFloor;
// ★ 检查该楼层状态
const st = getL0FloorStatus(floor);
if (st?.status === 'ok' || st?.status === 'empty') {
// 刚才已经成功了,直接跳过(仅增加进度计数)
completed++;
onProgress?.(`提取: ${completed}/${total} (跳过已完成)`, completed, total);
continue;
}
// ★ 没做过的,用 slow 模式处理
await processPair(pair, idx, `retry-${wid}`);
// 每个请求后休息,避免再次触发限流
if (idx < pendingPairs.length - 1 && RETRY_INTERVAL_MS > 0) {
await new Promise(r => setTimeout(r, RETRY_INTERVAL_MS));
}
}
};
await Promise.all(Array.from({ length: retryWorkers }, (_, i) => runRetryWorker(i)));
xbLog.info(MODULE_ID, `降速重头开始阶段结束`);
}
}
try {
saveMetadataDebounced?.();
} catch { }