chore: add user threads db table setup

This commit is contained in:
Christopher Gondek 2025-10-08 11:37:36 +02:00
parent 8f96c3ef30
commit 30c3f9f7f1
3 changed files with 80 additions and 3 deletions

41
app.py
View file

@ -1,11 +1,14 @@
import os import os
import sys import sys
import asyncio import asyncio
from urllib.parse import quote_plus
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from modules.features.chatBot.database import init_models as init_chatbot_models
os.environ["NUMEXPR_MAX_THREADS"] = "12" os.environ["NUMEXPR_MAX_THREADS"] = "12"
# Fix for Windows asyncio compatibility with psycopg # Fix for Windows asyncio compatibility with psycopg
if sys.platform == 'win32': if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
from fastapi import FastAPI, HTTPException, Depends, Body, status, Response from fastapi import FastAPI, HTTPException, Depends, Body, status, Response
@ -238,6 +241,15 @@ def initLogging():
) )
def make_sqlalchemy_db_url() -> str:
host = APP_CONFIG.get("SQLALCHEMY_DB_HOST", "localhost")
port = APP_CONFIG.get("SQLALCHEMY_DB_PORT", "5432")
db = APP_CONFIG.get("SQLALCHEMY_DB_DATABASE", "project_gateway")
user = APP_CONFIG.get("SQLALCHEMY_DB_USER", "postgres")
pwd = quote_plus(APP_CONFIG.get("SQLALCHEMY_DB_PASSWORD_SECRET", ""))
return f"postgresql+psycopg://{user}:{pwd}@{host}:{port}/{db}"
# Initialize logging # Initialize logging
initLogging() initLogging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -249,7 +261,22 @@ instanceLabel = APP_CONFIG.get("APP_ENV_LABEL")
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
logger.info("Application is starting up") logger.info("Application is starting up")
# Initialize LangGraph checkpointer # --- Init SQLAlchemy ---
engine = create_async_engine(
make_sqlalchemy_db_url(), pool_pre_ping=True, echo=False
)
SessionLocal = async_sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
app.state.checkpoint_engine = engine
app.state.checkpoint_sessionmaker = SessionLocal
# NOTE: Might need Alembic migrations in the future
await init_chatbot_models(engine)
# --- Initialize LangGraph checkpointer ---
from modules.features.chatBot.utils.checkpointer import ( from modules.features.chatBot.utils.checkpointer import (
initialize_checkpointer, initialize_checkpointer,
close_checkpointer, close_checkpointer,
@ -262,12 +289,20 @@ async def lifespan(app: FastAPI):
logger.error(f"Failed to initialize LangGraph checkpointer: {str(e)}") logger.error(f"Failed to initialize LangGraph checkpointer: {str(e)}")
# Continue startup even if checkpointer fails to initialize # Continue startup even if checkpointer fails to initialize
# --- Init Event Manager ---
eventManager.start() eventManager.start()
yield yield
# Cleanup # --- Cleanup Event Manager ---
eventManager.stop() eventManager.stop()
# --- Cleanup LangGraph checkpointer ---
await close_checkpointer() await close_checkpointer()
# --- Cleanup SQLAlchemy ---
await engine.dispose()
logger.info("Application has been shut down") logger.info("Application has been shut down")

View file

@ -0,0 +1,40 @@
from typing import AsyncIterator
from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String
class Base(DeclarativeBase):
pass
# User Thread Mapping Table
class UserThreadMapping(Base):
"""Mapping of users to their chat threads.
Used to keep track of which user owns which chat thread.
Also stores meta data like thread name.
"""
__tablename__ = "userThreads"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
userId: Mapped[int] = mapped_column(nullable=False)
threadId: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
threadName: Mapped[str] = mapped_column(String(255), nullable=False)
# Dependency that pulls the sessionmaker off app.state
# This is set in app.py on startup in @asynccontextmanager
async def get_session(request: Request) -> AsyncIterator[AsyncSession]:
SessionLocal: async_sessionmaker[AsyncSession] = (
request.app.state.checkpoint_sessionmaker
)
async with SessionLocal() as session:
yield session
# Optional helper to init tables at startup (demo only)
async def init_models(engine) -> None:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

View file

@ -117,3 +117,5 @@ langchain-anthropic==0.3.1 # For Claude models
psycopg[binary]==3.2.1 # For PostgreSQL async support (LangGraph checkpointer) psycopg[binary]==3.2.1 # For PostgreSQL async support (LangGraph checkpointer)
psycopg-pool==3.2.1 # Connection pooling for PostgreSQL psycopg-pool==3.2.1 # Connection pooling for PostgreSQL
langgraph-checkpoint-postgres==2.0.24 langgraph-checkpoint-postgres==2.0.24
greenlet==3.2.4