Merge pull request 'Revert SSE streaming for PDF import, use synchronous flow' (#11) from revert/remove-sse-streaming into main

Reviewed-on: #11
This commit was merged in pull request #11.
This commit is contained in:
2026-03-14 17:13:00 +00:00
10 changed files with 73 additions and 695 deletions

View File

@@ -3,7 +3,7 @@ import multer from 'multer';
import { asyncHandler } from '../utils';
import { importStatement, isValidationError } from '../services/import';
import {
convertPdfToStatementStreaming,
convertPdfToStatement,
isPdfConversionError,
} from '../services/pdfToStatement';
@@ -28,10 +28,6 @@ function isJsonFile(file: { mimetype: string; originalname: string }): boolean {
);
}
function sseWrite(res: import('express').Response, data: Record<string, unknown>) {
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
const router = Router();
router.post(
@@ -55,60 +51,19 @@ router.post(
return;
}
let body: unknown;
if (isPdfFile(file)) {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
res.socket?.setNoDelay(true);
res.flushHeaders();
try {
const converted = await convertPdfToStatementStreaming(
file.buffer,
(stage, progress, message) => {
sseWrite(res, { stage, progress, message });
},
);
const converted = await convertPdfToStatement(file.buffer);
if (isPdfConversionError(converted)) {
sseWrite(res, {
stage: 'error',
res.status(converted.status).json({
error: converted.error,
message: converted.message,
});
res.end();
return;
}
const result = await importStatement(converted);
if (isValidationError(result)) {
sseWrite(res, {
stage: 'error',
message: (result as { message: string }).message,
});
res.end();
return;
}
sseWrite(res, {
stage: 'done',
progress: 100,
result,
});
} catch (err) {
console.error('SSE import error:', err);
sseWrite(res, {
stage: 'error',
message: 'Внутренняя ошибка сервера',
});
}
res.end();
return;
}
// JSON files — synchronous response as before
let body: unknown;
body = converted;
} else {
try {
body = JSON.parse(file.buffer.toString('utf-8'));
} catch {
@@ -118,6 +73,7 @@ router.post(
});
return;
}
}
const result = await importStatement(body);
if (isValidationError(result)) {

View File

@@ -131,151 +131,6 @@ export async function convertPdfToStatement(
};
}
return parseConversionResult(content);
} catch (err) {
console.error('LLM conversion error:', err);
return {
status: 502,
error: 'BAD_GATEWAY',
message: extractLlmErrorMessage(err),
};
}
}
export type ProgressStage = 'pdf' | 'llm' | 'import';
export type OnProgress = (stage: ProgressStage, progress: number, message: string) => void;
const LLM_PROGRESS_MIN = 10;
const LLM_PROGRESS_MAX = 98;
const LLM_PROGRESS_RANGE = LLM_PROGRESS_MAX - LLM_PROGRESS_MIN;
const THROTTLE_MS = 300;
function yieldToEventLoop(): Promise<void> {
return new Promise(resolve => setImmediate(resolve));
}
export async function convertPdfToStatementStreaming(
buffer: Buffer,
onProgress: OnProgress,
): Promise<StatementFile | PdfConversionError> {
if (!config.llmApiKey || config.llmApiKey.trim() === '') {
return {
status: 503,
error: 'SERVICE_UNAVAILABLE',
message: 'Конвертация PDF недоступна: не задан LLM_API_KEY',
};
}
onProgress('pdf', 2, 'Извлечение текста из PDF...');
await yieldToEventLoop();
let text: string;
try {
const result = await getPdfParse()(buffer);
text = result.text || '';
} catch (err) {
console.error('PDF extraction error:', err);
return {
status: 400,
error: 'BAD_REQUEST',
message: 'Не удалось обработать PDF-файл',
};
}
if (!text || text.trim().length === 0) {
return {
status: 400,
error: 'BAD_REQUEST',
message: 'Не удалось извлечь текст из PDF',
};
}
onProgress('pdf', 8, 'Текст извлечён, отправка в LLM...');
await yieldToEventLoop();
const openai = new OpenAI({
apiKey: config.llmApiKey,
...(config.llmApiBaseUrl && { baseURL: config.llmApiBaseUrl }),
timeout: 5 * 60 * 1000,
});
try {
const stream = await openai.chat.completions.create({
model: config.llmModel,
messages: [
{ role: 'system', content: PDF2JSON_PROMPT },
{ role: 'user', content: `Текст выписки:\n\n${text}` },
],
temperature: 0,
max_tokens: 32768,
stream: true,
});
const expectedChars = Math.max(2_000, Math.min(text.length * 2, 30_000));
let accumulated = '';
let charsReceived = 0;
let lastEmitTime = 0;
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content;
if (delta) {
accumulated += delta;
charsReceived += delta.length;
const now = Date.now();
if (now - lastEmitTime >= THROTTLE_MS) {
const ratio = Math.min(1, charsReceived / expectedChars);
const llmProgress = Math.min(
LLM_PROGRESS_MAX,
Math.round(ratio * LLM_PROGRESS_RANGE + LLM_PROGRESS_MIN),
);
onProgress('llm', llmProgress, 'Конвертация через LLM...');
lastEmitTime = now;
// Let the event loop flush socket writes to the network
await yieldToEventLoop();
}
}
}
onProgress('llm', LLM_PROGRESS_MAX, 'LLM завершил, обработка результата...');
await yieldToEventLoop();
const content = accumulated.trim();
if (!content) {
return {
status: 422,
error: 'VALIDATION_ERROR',
message: 'Результат конвертации пуст',
};
}
return parseConversionResult(content);
} catch (err) {
console.error('LLM streaming error:', err);
return {
status: 502,
error: 'BAD_GATEWAY',
message: extractLlmErrorMessage(err),
};
}
}
function extractLlmErrorMessage(err: unknown): string {
const raw = String(
(err as Record<string, unknown>)?.message ??
(err as Record<string, Record<string, unknown>>)?.error?.message ?? '',
);
if (/context.length|n_ctx|too.many.tokens|maximum.context/i.test(raw)) {
return 'PDF-файл слишком большой для обработки. Попробуйте файл с меньшим количеством операций или используйте модель с большим контекстным окном.';
}
if (/timeout|timed?\s*out|ETIMEDOUT|ECONNREFUSED/i.test(raw)) {
return 'LLM-сервер не отвечает. Проверьте, что сервер запущен и доступен.';
}
return 'Временная ошибка конвертации';
}
function parseConversionResult(content: string): StatementFile | PdfConversionError {
const jsonMatch = content.match(/\{[\s\S]*\}/);
const jsonStr = jsonMatch ? jsonMatch[0] : content;
let parsed: unknown;
@@ -299,4 +154,26 @@ function parseConversionResult(content: string): StatementFile | PdfConversionEr
}
return parsed as StatementFile;
} catch (err) {
console.error('LLM conversion error:', err);
return {
status: 502,
error: 'BAD_GATEWAY',
message: extractLlmErrorMessage(err),
};
}
}
function extractLlmErrorMessage(err: unknown): string {
const raw = String(
(err as Record<string, unknown>)?.message ??
(err as Record<string, Record<string, unknown>>)?.error?.message ?? '',
);
if (/context.length|n_ctx|too.many.tokens|maximum.context/i.test(raw)) {
return 'PDF-файл слишком большой для обработки. Попробуйте файл с меньшим количеством операций или используйте модель с большим контекстным окном.';
}
if (/timeout|timed?\s*out|ETIMEDOUT|ECONNREFUSED/i.test(raw)) {
return 'LLM-сервер не отвечает. Проверьте, что сервер запущен и доступен.';
}
return 'Временная ошибка конвертации';
}

View File

@@ -3,7 +3,7 @@ server {
root /usr/share/nginx/html;
index index.html;
# Import endpoint — SSE streaming, long timeout, no buffering
# Import endpoint — long timeout for LLM processing
location /api/import {
proxy_pass http://family-budget-backend:3000;
proxy_http_version 1.1;
@@ -14,8 +14,6 @@ server {
proxy_cookie_path / /;
proxy_connect_timeout 5s;
proxy_read_timeout 600s;
proxy_buffering off;
gzip off;
client_max_body_size 15m;
}

View File

@@ -1,6 +1,5 @@
import { Routes, Route, Navigate } from 'react-router-dom';
import { useAuth } from './context/AuthContext';
import { ImportProvider } from './context/ImportContext';
import { Layout } from './components/Layout';
import { LoginPage } from './pages/LoginPage';
import { HistoryPage } from './pages/HistoryPage';
@@ -19,7 +18,6 @@ export function App() {
}
return (
<ImportProvider>
<Layout>
<Routes>
<Route path="/" element={<Navigate to="/history" replace />} />
@@ -29,6 +27,5 @@ export function App() {
<Route path="*" element={<Navigate to="/history" replace />} />
</Routes>
</Layout>
</ImportProvider>
);
}

View File

@@ -11,74 +11,3 @@ export async function importStatement(
formData,
);
}
export interface SseProgressEvent {
stage: 'pdf' | 'llm' | 'import';
progress: number;
message: string;
}
export interface SseDoneEvent {
stage: 'done';
progress: 100;
result: ImportStatementResponse;
}
export interface SseErrorEvent {
stage: 'error';
message: string;
}
export type SseEvent = SseProgressEvent | SseDoneEvent | SseErrorEvent;
export async function importStatementStream(
file: File,
onEvent: (event: SseEvent) => void,
): Promise<void> {
const formData = new FormData();
formData.append('file', file);
const res = await fetch('/api/import/statement', {
method: 'POST',
body: formData,
credentials: 'include',
});
if (!res.ok) {
let msg = 'Ошибка импорта';
try {
const body = await res.json();
if (body.message) msg = body.message;
} catch { /* use default */ }
onEvent({ stage: 'error', message: msg });
return;
}
const reader = res.body?.getReader();
if (!reader) {
onEvent({ stage: 'error', message: 'Streaming не поддерживается' });
return;
}
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed.startsWith('data: ')) continue;
try {
const parsed = JSON.parse(trimmed.slice(6)) as SseEvent;
onEvent(parsed);
} catch { /* skip malformed lines */ }
}
}
}

View File

@@ -2,7 +2,6 @@ import { useState, useRef } from 'react';
import type { ImportStatementResponse } from '@family-budget/shared';
import { importStatement } from '../api/import';
import { updateAccount } from '../api/accounts';
import { useImport } from '../context/ImportContext';
interface Props {
onClose: () => void;
@@ -10,19 +9,13 @@ interface Props {
}
export function ImportModal({ onClose, onDone }: Props) {
const { importState, startImport, clearImport } = useImport();
const [jsonResult, setJsonResult] = useState<ImportStatementResponse | null>(null);
const [jsonError, setJsonError] = useState('');
const [jsonLoading, setJsonLoading] = useState(false);
const [result, setResult] = useState<ImportStatementResponse | null>(null);
const [error, setError] = useState('');
const [loading, setLoading] = useState(false);
const [alias, setAlias] = useState('');
const [aliasSaved, setAliasSaved] = useState(false);
const fileRef = useRef<HTMLInputElement>(null);
const result = importState.result ?? jsonResult;
const error = importState.error || jsonError;
const loading = importState.active || jsonLoading;
const handleFileChange = async (
e: React.ChangeEvent<HTMLInputElement>,
) => {
@@ -35,45 +28,26 @@ export function ImportModal({ onClose, onDone }: Props) {
const isJson = type === 'application/json' || name.endsWith('.json');
if (!isPdf && !isJson) {
setJsonError('Допустимы только файлы PDF или JSON');
setError('Допустимы только файлы PDF или JSON');
return;
}
setJsonError('');
setJsonResult(null);
setLoading(true);
setError('');
setResult(null);
if (isPdf) {
startImport(file);
return;
}
setJsonLoading(true);
try {
const resp = await importStatement(file);
setJsonResult(resp);
setResult(resp);
} catch (err: unknown) {
const msg =
err instanceof Error ? err.message : 'Ошибка импорта';
setJsonError(msg);
setError(msg);
} finally {
setJsonLoading(false);
setLoading(false);
}
};
const handleClose = () => {
if (importState.active) {
if (window.confirm('Импорт продолжится в фоне. Закрыть окно?')) {
onClose();
}
} else {
onClose();
}
};
const handleDone = () => {
onDone();
};
const handleSaveAlias = async () => {
if (!result || !alias.trim()) return;
try {
@@ -84,21 +58,12 @@ export function ImportModal({ onClose, onDone }: Props) {
}
};
const stageLabel = (stage: string) => {
switch (stage) {
case 'pdf': return 'Извлечение текста...';
case 'llm': return 'Конвертация через LLM...';
case 'import': return 'Сохранение в базу...';
default: return 'Обработка...';
}
};
return (
<div className="modal-overlay" onClick={handleClose}>
<div className="modal-overlay" onClick={onClose}>
<div className="modal" onClick={(e) => e.stopPropagation()}>
<div className="modal-header">
<h2>Импорт выписки</h2>
<button className="btn-close" onClick={handleClose}>
<button className="btn-close" onClick={onClose}>
&times;
</button>
</div>
@@ -106,7 +71,7 @@ export function ImportModal({ onClose, onDone }: Props) {
<div className="modal-body">
{error && <div className="alert alert-error">{error}</div>}
{!result && !importState.active && (
{!result && (
<div className="import-upload">
<p>Выберите файл выписки (PDF или JSON, формат 1.0)</p>
<input
@@ -115,30 +80,13 @@ export function ImportModal({ onClose, onDone }: Props) {
accept=".pdf,.json,application/pdf,application/json"
onChange={handleFileChange}
className="file-input"
disabled={loading}
/>
{jsonLoading && (
{loading && (
<div className="import-loading">Импорт...</div>
)}
</div>
)}
{importState.active && (
<div className="import-upload">
<div className="import-progress-modal">
<div className="import-progress-modal-bar">
<div
className="import-progress-modal-fill"
style={{ width: `${importState.progress}%` }}
/>
</div>
<p className="import-progress-modal-label">
{stageLabel(importState.stage)} {importState.progress}%
</p>
</div>
</div>
)}
{result && (
<div className="import-result">
<div className="import-result-icon" aria-hidden="true"></div>
@@ -201,15 +149,16 @@ export function ImportModal({ onClose, onDone }: Props) {
<div className="modal-footer">
{result ? (
<button className="btn btn-primary" onClick={handleDone}>
<button className="btn btn-primary" onClick={onDone}>
Готово
</button>
) : (
<button
className="btn btn-secondary"
onClick={handleClose}
onClick={onClose}
disabled={loading}
>
{importState.active ? 'Свернуть' : 'Отмена'}
Отмена
</button>
)}
</div>

View File

@@ -1,70 +1,6 @@
import { useState, useEffect, useRef, useCallback, type ReactNode } from 'react';
import { useState, type ReactNode } from 'react';
import { NavLink } from 'react-router-dom';
import { useAuth } from '../context/AuthContext';
import { useImport } from '../context/ImportContext';
function ImportProgressBar() {
const { importState, clearImport, openModal } = useImport();
const [visible, setVisible] = useState(false);
const hideTimerRef = useRef<ReturnType<typeof setTimeout> | undefined>(undefined);
const isActive = importState.active;
const isDone = importState.stage === 'done';
const isError = importState.stage === 'error';
const showBar = isActive || isDone || isError;
useEffect(() => {
if (showBar) {
setVisible(true);
if (hideTimerRef.current) clearTimeout(hideTimerRef.current);
}
if (isDone || isError) {
hideTimerRef.current = setTimeout(() => {
setVisible(false);
clearImport();
}, 10_000);
}
return () => {
if (hideTimerRef.current) clearTimeout(hideTimerRef.current);
};
}, [showBar, isDone, isError, clearImport]);
const handleClick = useCallback(() => {
if (hideTimerRef.current) clearTimeout(hideTimerRef.current);
openModal();
setVisible(false);
}, [openModal]);
if (!visible) return null;
const barClass = isError
? 'import-progress-bar import-progress-bar--error'
: isDone
? 'import-progress-bar import-progress-bar--done'
: 'import-progress-bar';
const labelText = isError
? `Ошибка импорта: ${importState.message}`
: isDone && importState.result
? `Импорт завершён — ${importState.result.imported} операций`
: `${importState.message} ${importState.progress}%`;
return (
<div className={barClass}>
<div
className="import-progress-bar__fill"
style={{ width: `${isDone ? 100 : importState.progress}%` }}
/>
<button
type="button"
className={`import-progress-label ${isDone || isError ? 'import-progress-label--clickable' : ''}`}
onClick={isDone || isError ? handleClick : undefined}
>
{labelText}
</button>
</div>
);
}
export function Layout({ children }: { children: ReactNode }) {
const { user, logout } = useAuth();
@@ -74,8 +10,6 @@ export function Layout({ children }: { children: ReactNode }) {
return (
<div className="layout">
<ImportProgressBar />
<button
type="button"
className="burger-btn"

View File

@@ -1,114 +0,0 @@
import {
createContext,
useContext,
useState,
useCallback,
useRef,
type ReactNode,
} from 'react';
import type { ImportStatementResponse } from '@family-budget/shared';
import { importStatementStream, type SseEvent } from '../api/import';
export interface ImportProgress {
active: boolean;
stage: string;
progress: number;
message: string;
result?: ImportStatementResponse;
error?: string;
}
interface ImportContextValue {
importState: ImportProgress;
showModal: boolean;
openModal: () => void;
closeModal: () => void;
startImport: (file: File) => void;
clearImport: () => void;
}
const INITIAL: ImportProgress = {
active: false,
stage: '',
progress: 0,
message: '',
};
const ImportContext = createContext<ImportContextValue | null>(null);
export function ImportProvider({ children }: { children: ReactNode }) {
const [importState, setImportState] = useState<ImportProgress>(INITIAL);
const [showModal, setShowModal] = useState(false);
const runningRef = useRef(false);
const openModal = useCallback(() => setShowModal(true), []);
const closeModal = useCallback(() => setShowModal(false), []);
const startImport = useCallback((file: File) => {
if (runningRef.current) return;
runningRef.current = true;
setImportState({
active: true,
stage: 'pdf',
progress: 0,
message: 'Загрузка файла...',
});
importStatementStream(file, (event: SseEvent) => {
if (event.stage === 'done') {
setImportState({
active: false,
stage: 'done',
progress: 100,
message: 'Импорт завершён',
result: event.result,
});
runningRef.current = false;
} else if (event.stage === 'error') {
setImportState({
active: false,
stage: 'error',
progress: 0,
message: event.message,
error: event.message,
});
runningRef.current = false;
} else {
setImportState({
active: true,
stage: event.stage,
progress: event.progress,
message: event.message,
});
}
}).catch((err) => {
setImportState({
active: false,
stage: 'error',
progress: 0,
message: err instanceof Error ? err.message : 'Ошибка импорта',
error: err instanceof Error ? err.message : 'Ошибка импорта',
});
runningRef.current = false;
});
}, []);
const clearImport = useCallback(() => {
setImportState(INITIAL);
}, []);
return (
<ImportContext.Provider value={{
importState, showModal, openModal, closeModal, startImport, clearImport,
}}>
{children}
</ImportContext.Provider>
);
}
export function useImport(): ImportContextValue {
const ctx = useContext(ImportContext);
if (!ctx) throw new Error('useImport must be used within ImportProvider');
return ctx;
}

View File

@@ -18,7 +18,6 @@ import { TransactionTable } from '../components/TransactionTable';
import { Pagination } from '../components/Pagination';
import { EditTransactionModal } from '../components/EditTransactionModal';
import { ImportModal } from '../components/ImportModal';
import { useImport } from '../context/ImportContext';
import { toISODate } from '../utils/format';
const PARAM_KEYS = [
@@ -126,7 +125,7 @@ export function HistoryPage() {
const [accounts, setAccounts] = useState<Account[]>([]);
const [categories, setCategories] = useState<Category[]>([]);
const [editingTx, setEditingTx] = useState<Transaction | null>(null);
const { showModal: showImport, openModal: openImport, closeModal: closeImport, clearImport } = useImport();
const [showImport, setShowImport] = useState(false);
useEffect(() => {
getAccounts().then(setAccounts).catch(() => {});
@@ -198,8 +197,7 @@ export function HistoryPage() {
};
const handleImportDone = () => {
closeImport();
clearImport();
setShowImport(false);
fetchData();
getAccounts().then(setAccounts).catch(() => {});
};
@@ -210,7 +208,7 @@ export function HistoryPage() {
<h1>История операций</h1>
<button
className="btn btn-primary"
onClick={openImport}
onClick={() => setShowImport(true)}
>
Импорт выписки
</button>
@@ -264,7 +262,7 @@ export function HistoryPage() {
{showImport && (
<ImportModal
onClose={closeImport}
onClose={() => setShowImport(false)}
onDone={handleImportDone}
/>
)}

View File

@@ -1058,152 +1058,6 @@ textarea {
font-weight: 500;
}
/* Import progress bar in modal */
.import-progress-modal {
padding: 20px 0;
}
.import-progress-modal-bar {
height: 8px;
background: var(--color-border);
border-radius: 4px;
overflow: hidden;
margin-bottom: 10px;
}
.import-progress-modal-fill {
height: 100%;
background: linear-gradient(90deg, var(--color-primary), #6366f1);
border-radius: 4px;
transition: width 0.3s ease;
position: relative;
}
.import-progress-modal-fill::after {
content: '';
position: absolute;
inset: 0;
background: linear-gradient(
90deg,
transparent 0%,
rgba(255, 255, 255, 0.3) 50%,
transparent 100%
);
animation: shimmer 1.5s infinite;
}
.import-progress-modal-label {
text-align: center;
color: var(--color-text-secondary);
font-size: 14px;
}
/* ================================================================
Import progress bar (fixed top, Layout)
================================================================ */
.import-progress-bar {
position: fixed;
top: 0;
left: 0;
right: 0;
z-index: 300;
height: 4px;
background: var(--color-border);
}
.import-progress-bar--done {
background: var(--color-success-light);
}
.import-progress-bar--error {
background: var(--color-danger-light);
}
.import-progress-bar__fill {
height: 100%;
border-radius: 0 2px 2px 0;
transition: width 0.3s ease;
position: relative;
background: linear-gradient(90deg, var(--color-primary), #6366f1);
}
.import-progress-bar__fill::after {
content: '';
position: absolute;
inset: 0;
background: linear-gradient(
90deg,
transparent 0%,
rgba(255, 255, 255, 0.4) 50%,
transparent 100%
);
animation: shimmer 1.5s infinite;
}
.import-progress-bar--done .import-progress-bar__fill {
background: var(--color-success);
}
.import-progress-bar--done .import-progress-bar__fill::after {
animation: none;
}
.import-progress-bar--error .import-progress-bar__fill {
background: var(--color-danger);
width: 100% !important;
}
.import-progress-bar--error .import-progress-bar__fill::after {
animation: none;
}
@keyframes shimmer {
0% {
transform: translateX(-100%);
}
100% {
transform: translateX(100%);
}
}
.import-progress-label {
position: fixed;
top: 6px;
right: 16px;
z-index: 301;
background: var(--color-surface);
border: 1px solid var(--color-border);
border-radius: 16px;
padding: 4px 14px;
font-size: 12px;
font-weight: 500;
color: var(--color-text-secondary);
box-shadow: var(--shadow-sm);
white-space: nowrap;
cursor: default;
font-family: inherit;
}
.import-progress-label--clickable {
cursor: pointer;
}
.import-progress-label--clickable:hover {
background: var(--color-bg);
border-color: var(--color-border-hover);
}
.import-progress-bar--done .import-progress-label {
color: var(--color-success);
border-color: var(--color-success);
}
.import-progress-bar--error .import-progress-label {
color: var(--color-danger);
border-color: var(--color-danger);
}
/* ================================================================
Tabs
================================================================ */