"""Database engine, session factory, and schema bootstrap helpers."""

from collections.abc import Generator

from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker

from app.config import get_settings


class Base(DeclarativeBase):
    """Declarative base whose metadata ``init_db`` uses to create tables."""


def _connect_args(database_url: str) -> dict[str, object]:
    """Return driver-specific ``connect_args`` for ``create_engine``.

    Args:
        database_url: The database URL the engine will be created with.

    Returns:
        ``{"check_same_thread": False}`` when the URL starts with ``"sqlite"``
        (this turns off the sqlite3 driver's same-thread check), otherwise an
        empty dict.
    """
    if database_url.startswith("sqlite"):
        return {"check_same_thread": False}
    return {}


# Module-level singletons: created once at import time from the app settings.
settings = get_settings()
engine = create_engine(
    settings.database_url,
    connect_args=_connect_args(settings.database_url),
)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)


def init_db() -> None:
    """Create all tables known to ``Base.metadata`` and patch older schemas."""
    # Imported for its side effect only; presumably this registers the model
    # classes on ``Base.metadata`` before ``create_all`` runs.
    import app.models  # noqa: F401

    Base.metadata.create_all(bind=engine)
    _ensure_runtime_schema()


def _existing_table_names() -> set[str]:
    """Return the table names currently present, using a fresh inspector."""
    # A new inspector is built on every call so that tables created since the
    # previous call are visible.
    return set(inspect(engine).get_table_names())


def _ensure_runtime_schema() -> None:
    """Bring an already-existing database up to the current schema.

    Performs, in order:

    1. If ``employees`` exists but ``employee_publications`` does not, create
       the latter from ``models.EmployeePublication``.
    2. The same for ``employee_news_links`` / ``models.EmployeeNewsLink``.
    3. If ``crawl_runs`` exists but has no ``skipped_count`` column, add it via
       ``ALTER TABLE``.
    """
    import app.models as models

    # (table name, model attribute) pairs that hang off the ``employees`` table.
    employee_child_tables = (
        ("employee_publications", "EmployeePublication"),
        ("employee_news_links", "EmployeeNewsLink"),
    )
    for table_name, model_attr in employee_child_tables:
        table_names = _existing_table_names()
        if "employees" in table_names and table_name not in table_names:
            # Resolve the model lazily so the attribute is only touched when
            # the table actually has to be created.
            model = getattr(models, model_attr)
            model.__table__.create(bind=engine, checkfirst=True)

    inspector = inspect(engine)
    if "crawl_runs" not in set(inspector.get_table_names()):
        return

    crawl_run_columns = {
        column["name"] for column in inspector.get_columns("crawl_runs")
    }
    if "skipped_count" in crawl_run_columns:
        return

    # ``engine.begin()`` runs the statement in a transaction that is committed
    # on successful exit of the block.
    with engine.begin() as connection:
        connection.execute(
            text(
                "ALTER TABLE crawl_runs "
                "ADD COLUMN skipped_count INTEGER NOT NULL DEFAULT 0"
            )
        )


def get_db() -> Generator[Session, None, None]:
    """Yield a new ``SessionLocal`` session and always close it afterwards.

    Yields:
        A session bound to the module-level ``engine``. It is closed in a
        ``finally`` clause, so cleanup also runs if the consumer raises.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()