Revert SSE streaming for PDF import, use synchronous flow

SSE streaming added unnecessary complexity and latency due to
buffering issues across Node.js event loop, Nginx proxy, and
Docker layers. Reverted to a simple synchronous request/response
for PDF conversion. Kept extractLlmErrorMessage for user-friendly
LLM errors, lazy-loaded pdf-parse, and extended Nginx timeout.
This commit is contained in:
vakabunga
2026-03-14 20:12:27 +03:00
parent ea234ea007
commit 8b57dd987e
10 changed files with 73 additions and 695 deletions

View File

@@ -3,7 +3,7 @@ import multer from 'multer';
import { asyncHandler } from '../utils';
import { importStatement, isValidationError } from '../services/import';
import {
convertPdfToStatementStreaming,
convertPdfToStatement,
isPdfConversionError,
} from '../services/pdfToStatement';
@@ -28,10 +28,6 @@ function isJsonFile(file: { mimetype: string; originalname: string }): boolean {
);
}
function sseWrite(res: import('express').Response, data: Record<string, unknown>) {
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
const router = Router();
router.post(
@@ -55,68 +51,28 @@ router.post(
return;
}
if (isPdfFile(file)) {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
res.socket?.setNoDelay(true);
res.flushHeaders();
try {
const converted = await convertPdfToStatementStreaming(
file.buffer,
(stage, progress, message) => {
sseWrite(res, { stage, progress, message });
},
);
if (isPdfConversionError(converted)) {
sseWrite(res, {
stage: 'error',
message: converted.message,
});
res.end();
return;
}
const result = await importStatement(converted);
if (isValidationError(result)) {
sseWrite(res, {
stage: 'error',
message: (result as { message: string }).message,
});
res.end();
return;
}
sseWrite(res, {
stage: 'done',
progress: 100,
result,
});
} catch (err) {
console.error('SSE import error:', err);
sseWrite(res, {
stage: 'error',
message: 'Внутренняя ошибка сервера',
});
}
res.end();
return;
}
// JSON files — synchronous response as before
let body: unknown;
try {
body = JSON.parse(file.buffer.toString('utf-8'));
} catch {
res.status(400).json({
error: 'BAD_REQUEST',
message: 'Некорректный JSON-файл',
});
return;
if (isPdfFile(file)) {
const converted = await convertPdfToStatement(file.buffer);
if (isPdfConversionError(converted)) {
res.status(converted.status).json({
error: converted.error,
message: converted.message,
});
return;
}
body = converted;
} else {
try {
body = JSON.parse(file.buffer.toString('utf-8'));
} catch {
res.status(400).json({
error: 'BAD_REQUEST',
message: 'Некорректный JSON-файл',
});
return;
}
}
const result = await importStatement(body);

View File

@@ -131,128 +131,31 @@ export async function convertPdfToStatement(
};
}
return parseConversionResult(content);
} catch (err) {
console.error('LLM conversion error:', err);
return {
status: 502,
error: 'BAD_GATEWAY',
message: extractLlmErrorMessage(err),
};
}
}
export type ProgressStage = 'pdf' | 'llm' | 'import';
export type OnProgress = (stage: ProgressStage, progress: number, message: string) => void;
const LLM_PROGRESS_MIN = 10;
const LLM_PROGRESS_MAX = 98;
const LLM_PROGRESS_RANGE = LLM_PROGRESS_MAX - LLM_PROGRESS_MIN;
const THROTTLE_MS = 300;
function yieldToEventLoop(): Promise<void> {
return new Promise(resolve => setImmediate(resolve));
}
export async function convertPdfToStatementStreaming(
buffer: Buffer,
onProgress: OnProgress,
): Promise<StatementFile | PdfConversionError> {
if (!config.llmApiKey || config.llmApiKey.trim() === '') {
return {
status: 503,
error: 'SERVICE_UNAVAILABLE',
message: 'Конвертация PDF недоступна: не задан LLM_API_KEY',
};
}
onProgress('pdf', 2, 'Извлечение текста из PDF...');
await yieldToEventLoop();
let text: string;
try {
const result = await getPdfParse()(buffer);
text = result.text || '';
} catch (err) {
console.error('PDF extraction error:', err);
return {
status: 400,
error: 'BAD_REQUEST',
message: 'Не удалось обработать PDF-файл',
};
}
if (!text || text.trim().length === 0) {
return {
status: 400,
error: 'BAD_REQUEST',
message: 'Не удалось извлечь текст из PDF',
};
}
onProgress('pdf', 8, 'Текст извлечён, отправка в LLM...');
await yieldToEventLoop();
const openai = new OpenAI({
apiKey: config.llmApiKey,
...(config.llmApiBaseUrl && { baseURL: config.llmApiBaseUrl }),
timeout: 5 * 60 * 1000,
});
try {
const stream = await openai.chat.completions.create({
model: config.llmModel,
messages: [
{ role: 'system', content: PDF2JSON_PROMPT },
{ role: 'user', content: `Текст выписки:\n\n${text}` },
],
temperature: 0,
max_tokens: 32768,
stream: true,
});
const expectedChars = Math.max(2_000, Math.min(text.length * 2, 30_000));
let accumulated = '';
let charsReceived = 0;
let lastEmitTime = 0;
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content;
if (delta) {
accumulated += delta;
charsReceived += delta.length;
const now = Date.now();
if (now - lastEmitTime >= THROTTLE_MS) {
const ratio = Math.min(1, charsReceived / expectedChars);
const llmProgress = Math.min(
LLM_PROGRESS_MAX,
Math.round(ratio * LLM_PROGRESS_RANGE + LLM_PROGRESS_MIN),
);
onProgress('llm', llmProgress, 'Конвертация через LLM...');
lastEmitTime = now;
// Let the event loop flush socket writes to the network
await yieldToEventLoop();
}
}
}
onProgress('llm', LLM_PROGRESS_MAX, 'LLM завершил, обработка результата...');
await yieldToEventLoop();
const content = accumulated.trim();
if (!content) {
const jsonMatch = content.match(/\{[\s\S]*\}/);
const jsonStr = jsonMatch ? jsonMatch[0] : content;
let parsed: unknown;
try {
parsed = JSON.parse(jsonStr);
} catch {
return {
status: 422,
error: 'VALIDATION_ERROR',
message: 'Результат конвертации пуст',
message: 'Результат конвертации не является валидным JSON',
};
}
return parseConversionResult(content);
const data = parsed as Record<string, unknown>;
if (data.schemaVersion !== '1.0') {
return {
status: 422,
error: 'VALIDATION_ERROR',
message: 'Результат конвертации не соответствует схеме 1.0',
};
}
return parsed as StatementFile;
} catch (err) {
console.error('LLM streaming error:', err);
console.error('LLM conversion error:', err);
return {
status: 502,
error: 'BAD_GATEWAY',
@@ -274,29 +177,3 @@ function extractLlmErrorMessage(err: unknown): string {
}
return 'Временная ошибка конвертации';
}
function parseConversionResult(content: string): StatementFile | PdfConversionError {
const jsonMatch = content.match(/\{[\s\S]*\}/);
const jsonStr = jsonMatch ? jsonMatch[0] : content;
let parsed: unknown;
try {
parsed = JSON.parse(jsonStr);
} catch {
return {
status: 422,
error: 'VALIDATION_ERROR',
message: 'Результат конвертации не является валидным JSON',
};
}
const data = parsed as Record<string, unknown>;
if (data.schemaVersion !== '1.0') {
return {
status: 422,
error: 'VALIDATION_ERROR',
message: 'Результат конвертации не соответствует схеме 1.0',
};
}
return parsed as StatementFile;
}