chore: use minimal langgraph app to read and delete checkpointer history
This commit is contained in:
parent
bfc07ee0b1
commit
9bf454823b
1 changed files with 51 additions and 69 deletions
|
|
@ -22,6 +22,7 @@ from modules.datamodels.datamodelChatbot import (
|
|||
from modules.datamodels.datamodelUam import User
|
||||
|
||||
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
|
||||
from langgraph.graph import StateGraph, MessagesState, START, END
|
||||
from modules.shared.configuration import APP_CONFIG
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -492,22 +493,65 @@ async def post_message_stream(
|
|||
)
|
||||
|
||||
|
||||
# Module-level singleton for minimal app used to read thread state
|
||||
_MINIMAL_APP = None
|
||||
|
||||
|
||||
def _build_minimal_app(*, checkpointer):
|
||||
"""Build a minimal LangGraph app for reading thread state.
|
||||
|
||||
This creates a valid graph with a no-op node that we never actually run.
|
||||
LangGraph requires a valid graph structure (with edges from START) to compile,
|
||||
even though we only use it to call aget_state() to read from the checkpointer.
|
||||
|
||||
Args:
|
||||
checkpointer: The checkpointer to attach to the graph.
|
||||
|
||||
Returns:
|
||||
A compiled StateGraph that can be used to read thread state.
|
||||
"""
|
||||
graph = StateGraph(MessagesState)
|
||||
|
||||
# No-op node that returns the state unchanged
|
||||
def noop(state: dict) -> dict:
|
||||
return state
|
||||
|
||||
graph.add_node("noop", noop)
|
||||
graph.add_edge(START, "noop")
|
||||
graph.add_edge("noop", END)
|
||||
|
||||
return graph.compile(checkpointer=checkpointer)
|
||||
|
||||
|
||||
def _get_minimal_app():
|
||||
"""Get the module-level singleton minimal app.
|
||||
|
||||
Returns:
|
||||
The cached minimal app, building it on first access.
|
||||
"""
|
||||
global _MINIMAL_APP
|
||||
if _MINIMAL_APP is None:
|
||||
_MINIMAL_APP = _build_minimal_app(checkpointer=get_checkpointer())
|
||||
return _MINIMAL_APP
|
||||
|
||||
|
||||
async def get_thread_messages_from_langgraph(
|
||||
*,
|
||||
thread_id: str,
|
||||
app,
|
||||
) -> List[dict]:
|
||||
"""Retrieve and format messages from LangGraph checkpointer.
|
||||
|
||||
Args:
|
||||
thread_id: The unique identifier for the chat thread.
|
||||
app: The compiled LangGraph app with checkpointer.
|
||||
|
||||
Returns:
|
||||
List of message dicts with role, content, and timestamp.
|
||||
List of message dicts with role and content.
|
||||
"""
|
||||
ROLE_MAP = {"human": "user", "ai": "assistant"}
|
||||
|
||||
# Get the minimal app (singleton, built once)
|
||||
app = _get_minimal_app()
|
||||
|
||||
cfg = {"configurable": {"thread_id": thread_id}}
|
||||
state = await app.aget_state(cfg)
|
||||
|
||||
|
|
@ -562,41 +606,8 @@ async def get_thread_detail_for_user(
|
|||
result = await session.execute(stmt)
|
||||
thread_mapping = result.scalar_one()
|
||||
|
||||
# Build the chatbot app to access LangGraph state
|
||||
# Use same approach as post_message for consistency
|
||||
tool_ids = permissions.get_chatbot_tools(user_id=user.id)
|
||||
if not tool_ids:
|
||||
raise ValueError("User does not have permission to use any chatbot tools")
|
||||
|
||||
model_name = permissions.get_chatbot_model(user_id=user.id)
|
||||
system_prompt = permissions.get_system_prompt(user_id=user.id)
|
||||
|
||||
# Get tools from registry
|
||||
registry = get_registry()
|
||||
tools = registry.get_tool_instances(tool_ids=tool_ids)
|
||||
|
||||
# Get model and checkpointer
|
||||
model = get_langchain_model(model_name=model_name)
|
||||
checkpointer = get_checkpointer()
|
||||
|
||||
# Get context window size from config
|
||||
context_window_size = int(
|
||||
APP_CONFIG.get("CHATBOT_CONTEXT_WINDOW_TOKEN_SIZE", 100000)
|
||||
)
|
||||
|
||||
# Create chatbot instance
|
||||
chatbot = await Chatbot.create(
|
||||
model=model,
|
||||
memory=checkpointer,
|
||||
system_prompt=system_prompt,
|
||||
tools=tools,
|
||||
context_window_size=context_window_size,
|
||||
)
|
||||
|
||||
# Get messages from LangGraph checkpointer
|
||||
message_dicts = await get_thread_messages_from_langgraph(
|
||||
thread_id=thread_id, app=chatbot.app
|
||||
)
|
||||
# Get messages from LangGraph checkpointer (optimized - no full chatbot needed)
|
||||
message_dicts = await get_thread_messages_from_langgraph(thread_id=thread_id)
|
||||
|
||||
# Convert to MessageItem objects
|
||||
messages = [MessageItem(**m) for m in message_dicts]
|
||||
|
|
@ -638,39 +649,10 @@ async def delete_thread_for_user(
|
|||
thread_id=thread_id, user=user, session=session
|
||||
)
|
||||
|
||||
# Build the chatbot app to access the checkpointer
|
||||
tool_ids = permissions.get_chatbot_tools(user_id=user.id)
|
||||
if not tool_ids:
|
||||
raise ValueError("User does not have permission to use any chatbot tools")
|
||||
|
||||
model_name = permissions.get_chatbot_model(user_id=user.id)
|
||||
system_prompt = permissions.get_system_prompt(user_id=user.id)
|
||||
|
||||
# Get tools from registry
|
||||
registry = get_registry()
|
||||
tools = registry.get_tool_instances(tool_ids=tool_ids)
|
||||
|
||||
# Get model and checkpointer
|
||||
model = get_langchain_model(model_name=model_name)
|
||||
# Delete from LangGraph checkpointer (optimized - no app/tools/model needed)
|
||||
checkpointer = get_checkpointer()
|
||||
|
||||
# Get context window size from config
|
||||
context_window_size = int(
|
||||
APP_CONFIG.get("CHATBOT_CONTEXT_WINDOW_TOKEN_SIZE", 100000)
|
||||
)
|
||||
|
||||
# Create chatbot instance
|
||||
chatbot = await Chatbot.create(
|
||||
model=model,
|
||||
memory=checkpointer,
|
||||
system_prompt=system_prompt,
|
||||
tools=tools,
|
||||
context_window_size=context_window_size,
|
||||
)
|
||||
|
||||
# Delete from LangGraph checkpointer
|
||||
try:
|
||||
await chatbot.app.checkpointer.adelete_thread(thread_id)
|
||||
await checkpointer.adelete_thread(thread_id)
|
||||
logger.info(f"Deleted thread {thread_id} from LangGraph checkpointer")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
|
|
|
|||
Loading…
Reference in a new issue