54 lines
1.6 KiB
Python
54 lines
1.6 KiB
Python
from collections.abc import Generator
|
|
|
|
from sqlalchemy import create_engine, inspect, text
|
|
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
|
|
|
|
from app.config import get_settings
|
|
|
|
|
|
class Base(DeclarativeBase):
    """Declarative base whose ``metadata`` is used by ``init_db`` to create tables."""
|
|
|
|
|
|
def _connect_args(database_url: str) -> dict[str, object]:
|
|
if database_url.startswith("sqlite"):
|
|
return {"check_same_thread": False}
|
|
return {}
|
|
|
|
|
|
# Module-level database handles, built once when this module is imported.
settings = get_settings()

# The engine takes both its URL and its driver connect arguments from settings.
engine = create_engine(
    settings.database_url,
    connect_args=_connect_args(settings.database_url),
)

# Session factory bound to the engine; autoflush and autocommit are both off.
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
)
|
|
|
|
|
|
def init_db() -> None:
    """Create the tables known to ``Base.metadata`` and apply runtime schema fixes.

    Runs ``create_all`` against the module-level engine, then delegates to
    ``_ensure_runtime_schema`` for the remaining adjustments.
    """
    # Imported only for its side effect (the name is unused, hence the noqa);
    # presumably this registers the model tables on Base.metadata -- confirm
    # against app.models.
    import app.models  # noqa: F401

    metadata = Base.metadata
    metadata.create_all(bind=engine)

    _ensure_runtime_schema()
|
|
|
|
|
|
def _ensure_runtime_schema() -> None:
    """Bring an existing database up to the schema this module expects.

    Two adjustments are made against the module-level engine:

    * If an ``employees`` table exists but ``employee_publications`` does
      not, the ``EmployeePublication`` table is created.
    * If a ``crawl_runs`` table exists without a ``skipped_count`` column,
      that column is added with ``ALTER TABLE``.
    """
    import app.models as models

    schema = inspect(engine)
    existing_tables = set(schema.get_table_names())

    publications_missing = "employee_publications" not in existing_tables
    if publications_missing and "employees" in existing_tables:
        models.EmployeePublication.__table__.create(bind=engine, checkfirst=True)
        # Take a fresh inspector so the table list reflects the table just created.
        schema = inspect(engine)
        existing_tables = set(schema.get_table_names())

    if "crawl_runs" in existing_tables:
        present_columns = {col["name"] for col in schema.get_columns("crawl_runs")}
        if "skipped_count" not in present_columns:
            # engine.begin() wraps the ALTER in a transaction that commits on exit.
            with engine.begin() as conn:
                conn.execute(text("ALTER TABLE crawl_runs ADD COLUMN skipped_count INTEGER NOT NULL DEFAULT 0"))
|
|
|
|
|
|
def get_db() -> Generator[Session, None, None]:
    """Yield a new session from ``SessionLocal`` and close it afterwards.

    Yields:
        A ``Session`` created by the module-level ``SessionLocal`` factory.
        The session is closed when the generator is resumed, closed, or
        an exception is thrown into it.
    """
    # Session's context manager closes the session on exit, including on error.
    with SessionLocal() as session:
        yield session
|