From 30c3f9f7f1263d8e2c552c03774ca7ac4dd08283 Mon Sep 17 00:00:00 2001 From: Christopher Gondek Date: Wed, 8 Oct 2025 11:37:36 +0200 Subject: [PATCH] chore: add user threads db table setup --- app.py | 41 ++++++++++++++++++++++++++-- modules/features/chatBot/database.py | 40 +++++++++++++++++++++++++++ requirements.txt | 2 ++ 3 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 modules/features/chatBot/database.py diff --git a/app.py b/app.py index 7d92601f..b75b59e8 100644 --- a/app.py +++ b/app.py @@ -1,11 +1,14 @@ import os import sys import asyncio +from urllib.parse import quote_plus +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession +from modules.features.chatBot.database import init_models as init_chatbot_models os.environ["NUMEXPR_MAX_THREADS"] = "12" # Fix for Windows asyncio compatibility with psycopg -if sys.platform == 'win32': +if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) from fastapi import FastAPI, HTTPException, Depends, Body, status, Response @@ -238,6 +241,15 @@ def initLogging(): ) +def make_sqlalchemy_db_url() -> str: + host = APP_CONFIG.get("SQLALCHEMY_DB_HOST", "localhost") + port = APP_CONFIG.get("SQLALCHEMY_DB_PORT", "5432") + db = APP_CONFIG.get("SQLALCHEMY_DB_DATABASE", "project_gateway") + user = APP_CONFIG.get("SQLALCHEMY_DB_USER", "postgres") + pwd = quote_plus(APP_CONFIG.get("SQLALCHEMY_DB_PASSWORD_SECRET", "")) + return f"postgresql+psycopg://{user}:{pwd}@{host}:{port}/{db}" + + # Initialize logging initLogging() logger = logging.getLogger(__name__) @@ -249,7 +261,22 @@ instanceLabel = APP_CONFIG.get("APP_ENV_LABEL") async def lifespan(app: FastAPI): logger.info("Application is starting up") - # Initialize LangGraph checkpointer + # --- Init SQLAlchemy --- + + engine = create_async_engine( + make_sqlalchemy_db_url(), pool_pre_ping=True, echo=False + ) + SessionLocal = async_sessionmaker( + engine, expire_on_commit=False, class_=AsyncSession + ) + app.state.checkpoint_engine = engine + app.state.checkpoint_sessionmaker = SessionLocal + + # NOTE: Might need Alembic migrations in the future + await init_chatbot_models(engine) + + # --- Initialize LangGraph checkpointer --- + from modules.features.chatBot.utils.checkpointer import ( initialize_checkpointer, close_checkpointer, @@ -262,12 +289,20 @@ async def lifespan(app: FastAPI): logger.error(f"Failed to initialize LangGraph checkpointer: {str(e)}") # Continue startup even if checkpointer fails to initialize + # --- Init Event Manager --- eventManager.start() + yield - # Cleanup + # --- Cleanup Event Manager --- eventManager.stop() + + # --- Cleanup LangGraph checkpointer --- await close_checkpointer() + + # --- Cleanup SQLAlchemy --- + await engine.dispose() + logger.info("Application has been shut down") diff --git a/modules/features/chatBot/database.py b/modules/features/chatBot/database.py new file mode 100644 index 00000000..fc190205 --- /dev/null +++ b/modules/features/chatBot/database.py @@ -0,0 +1,40 @@ +from typing import AsyncIterator +from fastapi import Request +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column +from sqlalchemy import String + + +class Base(DeclarativeBase): + pass + + +# User Thread Mapping Table +class UserThreadMapping(Base): + """Mapping of users to their chat threads. + + Used to keep track of which user owns which chat thread. + Also stores meta data like thread name. + """ + + __tablename__ = "userThreads" + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + userId: Mapped[int] = mapped_column(nullable=False) + threadId: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) + threadName: Mapped[str] = mapped_column(String(255), nullable=False) + + +# Dependency that pulls the sessionmaker off app.state +# This is set in app.py on startup in @asynccontextmanager +async def get_session(request: Request) -> AsyncIterator[AsyncSession]: + SessionLocal: async_sessionmaker[AsyncSession] = ( + request.app.state.checkpoint_sessionmaker + ) + async with SessionLocal() as session: + yield session + + +# Optional helper to init tables at startup (demo only) +async def init_models(engine) -> None: + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) diff --git a/requirements.txt b/requirements.txt index 4a089c4b..6e8f2399 100644 --- a/requirements.txt +++ b/requirements.txt @@ -117,3 +117,5 @@ langchain-anthropic==0.3.1 # For Claude models psycopg[binary]==3.2.1 # For PostgreSQL async support (LangGraph checkpointer) psycopg-pool==3.2.1 # Connection pooling for PostgreSQL langgraph-checkpoint-postgres==2.0.24 + +greenlet==3.2.4 \ No newline at end of file