The for-await loop over OpenAI stream chunks runs synchronously when data is buffered, causing res.write() calls to queue without flushing. Add setImmediate yield after each progress event so the event loop reaches its I/O phase and pushes data to the network immediately.
303 lines
10 KiB
TypeScript
303 lines
10 KiB
TypeScript
import OpenAI from 'openai';
|
||
import { config } from '../config';
|
||
import type { StatementFile } from '@family-budget/shared';
|
||
|
||
const PDF2JSON_PROMPT = `Ты — конвертер банковских выписок. Твоя задача: извлечь данные из текста банковской выписки ниже и вернуть строго один валидный JSON-объект в формате ниже. Никакого текста до или после JSON, только сам объект.
|
||
|
||
## Структура выходного JSON
|
||
|
||
{
|
||
"schemaVersion": "1.0",
|
||
"bank": "<название банка из выписки>",
|
||
"statement": {
|
||
"accountNumber": "<номер счёта, только цифры, без пробелов>",
|
||
"currency": "RUB",
|
||
"openingBalance": <число в копейках, целое>,
|
||
"closingBalance": <число в копейках, целое>,
|
||
"exportedAt": "<дата экспорта в формате ISO 8601 с offset, например 2026-02-27T13:23:00+03:00>"
|
||
},
|
||
"transactions": [
|
||
{
|
||
"operationAt": "<дата и время операции в формате ISO 8601 с offset>",
|
||
"amountSigned": <число: положительное для прихода, отрицательное для расхода; в копейках>,
|
||
"commission": <число, целое, >= 0, в копейках>,
|
||
"description": "<полное описание операции из выписки>"
|
||
}
|
||
]
|
||
}
|
||
|
||
## Правила конвертации
|
||
|
||
1. Суммы — всегда в копейках (рубли × 100). Пример: 500,00 ₽ → 50000, -1234,56 ₽ → -123456.
|
||
2. amountSigned: приход — положительное, расход — отрицательное.
|
||
3. operationAt — дата и время, если не указано — 00:00:00, offset +03:00 для МСК.
|
||
4. commission — если не указана — 0.
|
||
5. description — полный текст операции как в выписке.
|
||
6. accountNumber — только цифры, без пробелов и дефисов.
|
||
7. openingBalance / closingBalance — в копейках.
|
||
8. bank — краткое название (VTB, Sberbank, Тинькофф).
|
||
9. exportedAt — дата формирования выписки.
|
||
10. transactions — хронологический порядок.
|
||
|
||
## Требования
|
||
|
||
- transactions не должен быть пустым.
|
||
- Все числа — целые.
|
||
- Даты — ISO 8601 с offset.
|
||
- currency всегда "RUB".
|
||
- schemaVersion всегда "1.0".`;
|
||
|
||
export interface PdfConversionError {
|
||
status: number;
|
||
error: string;
|
||
message: string;
|
||
}
|
||
|
||
export function isPdfConversionError(r: unknown): r is PdfConversionError {
|
||
return (
|
||
typeof r === 'object' &&
|
||
r !== null &&
|
||
'status' in r &&
|
||
'error' in r &&
|
||
'message' in r
|
||
);
|
||
}
|
||
|
||
// Lazy-loaded so the app starts even if pdf-parse is missing from node_modules.
|
||
let _pdfParse: ((buf: Buffer) => Promise<{ text: string }>) | undefined;
|
||
function getPdfParse() {
|
||
if (!_pdfParse) {
|
||
// eslint-disable-next-line @typescript-eslint/no-require-imports
|
||
_pdfParse = require('pdf-parse') as (buf: Buffer) => Promise<{ text: string }>;
|
||
}
|
||
return _pdfParse;
|
||
}
|
||
|
||
export async function convertPdfToStatement(
|
||
buffer: Buffer,
|
||
): Promise<StatementFile | PdfConversionError> {
|
||
if (!config.llmApiKey || config.llmApiKey.trim() === '') {
|
||
return {
|
||
status: 503,
|
||
error: 'SERVICE_UNAVAILABLE',
|
||
message: 'Конвертация PDF недоступна: не задан LLM_API_KEY',
|
||
};
|
||
}
|
||
|
||
let text: string;
|
||
try {
|
||
const result = await getPdfParse()(buffer);
|
||
text = result.text || '';
|
||
} catch (err) {
|
||
console.error('PDF extraction error:', err);
|
||
return {
|
||
status: 400,
|
||
error: 'BAD_REQUEST',
|
||
message: 'Не удалось обработать PDF-файл',
|
||
};
|
||
}
|
||
|
||
if (!text || text.trim().length === 0) {
|
||
return {
|
||
status: 400,
|
||
error: 'BAD_REQUEST',
|
||
message: 'Не удалось извлечь текст из PDF',
|
||
};
|
||
}
|
||
|
||
const openai = new OpenAI({
|
||
apiKey: config.llmApiKey,
|
||
...(config.llmApiBaseUrl && { baseURL: config.llmApiBaseUrl }),
|
||
timeout: 5 * 60 * 1000,
|
||
});
|
||
|
||
try {
|
||
const completion = await openai.chat.completions.create({
|
||
model: config.llmModel,
|
||
messages: [
|
||
{ role: 'system', content: PDF2JSON_PROMPT },
|
||
{ role: 'user', content: `Текст выписки:\n\n${text}` },
|
||
],
|
||
temperature: 0,
|
||
max_tokens: 32768,
|
||
});
|
||
|
||
const content = completion.choices[0]?.message?.content?.trim();
|
||
if (!content) {
|
||
return {
|
||
status: 422,
|
||
error: 'VALIDATION_ERROR',
|
||
message: 'Результат конвертации пуст',
|
||
};
|
||
}
|
||
|
||
return parseConversionResult(content);
|
||
} catch (err) {
|
||
console.error('LLM conversion error:', err);
|
||
return {
|
||
status: 502,
|
||
error: 'BAD_GATEWAY',
|
||
message: extractLlmErrorMessage(err),
|
||
};
|
||
}
|
||
}
|
||
|
||
export type ProgressStage = 'pdf' | 'llm' | 'import';
|
||
export type OnProgress = (stage: ProgressStage, progress: number, message: string) => void;
|
||
|
||
const LLM_PROGRESS_MIN = 10;
|
||
const LLM_PROGRESS_MAX = 98;
|
||
const LLM_PROGRESS_RANGE = LLM_PROGRESS_MAX - LLM_PROGRESS_MIN;
|
||
const THROTTLE_MS = 300;
|
||
|
||
function yieldToEventLoop(): Promise<void> {
|
||
return new Promise(resolve => setImmediate(resolve));
|
||
}
|
||
|
||
export async function convertPdfToStatementStreaming(
|
||
buffer: Buffer,
|
||
onProgress: OnProgress,
|
||
): Promise<StatementFile | PdfConversionError> {
|
||
if (!config.llmApiKey || config.llmApiKey.trim() === '') {
|
||
return {
|
||
status: 503,
|
||
error: 'SERVICE_UNAVAILABLE',
|
||
message: 'Конвертация PDF недоступна: не задан LLM_API_KEY',
|
||
};
|
||
}
|
||
|
||
onProgress('pdf', 2, 'Извлечение текста из PDF...');
|
||
await yieldToEventLoop();
|
||
|
||
let text: string;
|
||
try {
|
||
const result = await getPdfParse()(buffer);
|
||
text = result.text || '';
|
||
} catch (err) {
|
||
console.error('PDF extraction error:', err);
|
||
return {
|
||
status: 400,
|
||
error: 'BAD_REQUEST',
|
||
message: 'Не удалось обработать PDF-файл',
|
||
};
|
||
}
|
||
|
||
if (!text || text.trim().length === 0) {
|
||
return {
|
||
status: 400,
|
||
error: 'BAD_REQUEST',
|
||
message: 'Не удалось извлечь текст из PDF',
|
||
};
|
||
}
|
||
|
||
onProgress('pdf', 8, 'Текст извлечён, отправка в LLM...');
|
||
await yieldToEventLoop();
|
||
|
||
const openai = new OpenAI({
|
||
apiKey: config.llmApiKey,
|
||
...(config.llmApiBaseUrl && { baseURL: config.llmApiBaseUrl }),
|
||
timeout: 5 * 60 * 1000,
|
||
});
|
||
|
||
try {
|
||
const stream = await openai.chat.completions.create({
|
||
model: config.llmModel,
|
||
messages: [
|
||
{ role: 'system', content: PDF2JSON_PROMPT },
|
||
{ role: 'user', content: `Текст выписки:\n\n${text}` },
|
||
],
|
||
temperature: 0,
|
||
max_tokens: 32768,
|
||
stream: true,
|
||
});
|
||
|
||
const expectedChars = Math.max(2_000, Math.min(text.length * 2, 30_000));
|
||
|
||
let accumulated = '';
|
||
let charsReceived = 0;
|
||
let lastEmitTime = 0;
|
||
|
||
for await (const chunk of stream) {
|
||
const delta = chunk.choices[0]?.delta?.content;
|
||
if (delta) {
|
||
accumulated += delta;
|
||
charsReceived += delta.length;
|
||
|
||
const now = Date.now();
|
||
if (now - lastEmitTime >= THROTTLE_MS) {
|
||
const ratio = Math.min(1, charsReceived / expectedChars);
|
||
const llmProgress = Math.min(
|
||
LLM_PROGRESS_MAX,
|
||
Math.round(ratio * LLM_PROGRESS_RANGE + LLM_PROGRESS_MIN),
|
||
);
|
||
onProgress('llm', llmProgress, 'Конвертация через LLM...');
|
||
lastEmitTime = now;
|
||
// Let the event loop flush socket writes to the network
|
||
await yieldToEventLoop();
|
||
}
|
||
}
|
||
}
|
||
|
||
onProgress('llm', LLM_PROGRESS_MAX, 'LLM завершил, обработка результата...');
|
||
await yieldToEventLoop();
|
||
|
||
const content = accumulated.trim();
|
||
if (!content) {
|
||
return {
|
||
status: 422,
|
||
error: 'VALIDATION_ERROR',
|
||
message: 'Результат конвертации пуст',
|
||
};
|
||
}
|
||
|
||
return parseConversionResult(content);
|
||
} catch (err) {
|
||
console.error('LLM streaming error:', err);
|
||
return {
|
||
status: 502,
|
||
error: 'BAD_GATEWAY',
|
||
message: extractLlmErrorMessage(err),
|
||
};
|
||
}
|
||
}
|
||
|
||
function extractLlmErrorMessage(err: unknown): string {
|
||
const raw = String(
|
||
(err as Record<string, unknown>)?.message ??
|
||
(err as Record<string, Record<string, unknown>>)?.error?.message ?? '',
|
||
);
|
||
if (/context.length|n_ctx|too.many.tokens|maximum.context/i.test(raw)) {
|
||
return 'PDF-файл слишком большой для обработки. Попробуйте файл с меньшим количеством операций или используйте модель с большим контекстным окном.';
|
||
}
|
||
if (/timeout|timed?\s*out|ETIMEDOUT|ECONNREFUSED/i.test(raw)) {
|
||
return 'LLM-сервер не отвечает. Проверьте, что сервер запущен и доступен.';
|
||
}
|
||
return 'Временная ошибка конвертации';
|
||
}
|
||
|
||
function parseConversionResult(content: string): StatementFile | PdfConversionError {
|
||
const jsonMatch = content.match(/\{[\s\S]*\}/);
|
||
const jsonStr = jsonMatch ? jsonMatch[0] : content;
|
||
let parsed: unknown;
|
||
try {
|
||
parsed = JSON.parse(jsonStr);
|
||
} catch {
|
||
return {
|
||
status: 422,
|
||
error: 'VALIDATION_ERROR',
|
||
message: 'Результат конвертации не является валидным JSON',
|
||
};
|
||
}
|
||
|
||
const data = parsed as Record<string, unknown>;
|
||
if (data.schemaVersion !== '1.0') {
|
||
return {
|
||
status: 422,
|
||
error: 'VALIDATION_ERROR',
|
||
message: 'Результат конвертации не соответствует схеме 1.0',
|
||
};
|
||
}
|
||
|
||
return parsed as StatementFile;
|
||
}
|