From 8cf31077a62100f2cac3595282c225e9a6258fe4 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Sun, 22 Mar 2026 01:20:44 +0100 Subject: [PATCH] fixed global RAG and admin consent msft --- modules/datamodels/datamodelKnowledge.py | 45 ++++ .../workspace/routeFeatureWorkspace.py | 252 ++++++++++++++++-- modules/interfaces/interfaceDbChat.py | 20 +- modules/interfaces/interfaceDbKnowledge.py | 54 +++- modules/routes/routeSecurityMsft.py | 112 ++++++-- .../services/serviceAgent/agentLoop.py | 103 ++++++- .../serviceAgent/conversationManager.py | 101 +++++-- .../services/serviceAgent/mainServiceAgent.py | 241 +++++++++++++++++ .../serviceBilling/billingExhaustedNotify.py | 4 +- .../serviceKnowledge/mainServiceKnowledge.py | 94 ++++++- 10 files changed, 950 insertions(+), 76 deletions(-) diff --git a/modules/datamodels/datamodelKnowledge.py b/modules/datamodels/datamodelKnowledge.py index 4bc43500..d03e9d5a 100644 --- a/modules/datamodels/datamodelKnowledge.py +++ b/modules/datamodels/datamodelKnowledge.py @@ -96,6 +96,51 @@ registerModelLabels( ) +class RoundMemory(BaseModel): + """Persistent per-round memory for agent tool results, file refs, and decisions. + + Stored after each agent round so that RAG can retrieve relevant context + even after the ConversationManager summarises older messages away. + """ + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") + workflowId: str = Field(description="FK to the workflow") + roundNumber: int = Field(default=0, description="Agent round that produced this memory") + memoryType: str = Field( + description="Category: file_ref, tool_result, decision, data_source_ref" + ) + key: str = Field(description="Dedup key, e.g. 
'readFile:' or 'plan'") + summary: str = Field(default="", description="Compact summary (max ~2000 chars)") + fullData: Optional[str] = Field( + default=None, + description="Full tool output when small enough (max ~8000 chars)", + ) + fileIds: List[str] = Field(default_factory=list, description="Referenced file IDs") + embedding: Optional[List[float]] = Field( + default=None, + description="Embedding of summary for semantic retrieval", + json_schema_extra={"db_type": "vector(1536)"}, + ) + createdAt: float = Field(default_factory=getUtcTimestamp, description="Creation timestamp") + + +registerModelLabels( + "RoundMemory", + {"en": "Round Memory", "fr": "Mémoire de tour"}, + { + "id": {"en": "ID", "fr": "ID"}, + "workflowId": {"en": "Workflow ID", "fr": "ID du workflow"}, + "roundNumber": {"en": "Round Number", "fr": "Numéro de tour"}, + "memoryType": {"en": "Memory Type", "fr": "Type de mémoire"}, + "key": {"en": "Key", "fr": "Clé"}, + "summary": {"en": "Summary", "fr": "Résumé"}, + "fullData": {"en": "Full Data", "fr": "Données complètes"}, + "fileIds": {"en": "File IDs", "fr": "IDs de fichier"}, + "embedding": {"en": "Embedding", "fr": "Vecteur d'embedding"}, + "createdAt": {"en": "Created At", "fr": "Créé le"}, + }, +) + + class WorkflowMemory(BaseModel): """Workflow-scoped key-value cache for entities and facts. Extracted during agent rounds, persisted for cross-round and cross-workflow reuse.""" diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py index 6f317e2b..cf8efc04 100644 --- a/modules/features/workspace/routeFeatureWorkspace.py +++ b/modules/features/workspace/routeFeatureWorkspace.py @@ -8,6 +8,7 @@ SSE-based endpoints for the agent-driven AI Workspace. 
import logging import json import asyncio +import uuid from typing import Any, Dict, Optional, List from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, UploadFile, File @@ -22,6 +23,7 @@ from modules.interfaces import interfaceDbChat, interfaceDbManagement from modules.interfaces.interfaceAiObjects import AiObjects from modules.serviceCenter.core.serviceStreaming import get_event_manager from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentEventTypeEnum, PendingFileEdit +from modules.shared.timeUtils import parseTimestamp logger = logging.getLogger(__name__) @@ -245,8 +247,120 @@ def _buildFeatureDataSourceContext(featureDataSourceIds: List[str]) -> str: return "\n".join(parts) if found else "" +def _workspaceFilesToChatDocuments(dbMgmt, fileIds: List[str]) -> List[Dict[str, Any]]: + """Build ChatDocument payloads for workspace files referenced on the user message.""" + documents: List[Dict[str, Any]] = [] + for fid in fileIds or []: + try: + fr = dbMgmt.getFile(fid) + if not fr: + logger.warning(f"Workspace user message: file {fid} not found, skipping attachment record") + continue + fd = fr if isinstance(fr, dict) else fr.model_dump() + documents.append({ + "id": str(uuid.uuid4()), + "fileId": fd.get("id") or fid, + "fileName": fd.get("fileName") or "file", + "fileSize": int(fd.get("fileSize") or 0), + "mimeType": fd.get("mimeType") or "application/octet-stream", + "roundNumber": 0, + "taskNumber": 0, + "actionNumber": 0, + }) + except Exception as e: + logger.warning(f"Workspace user message: could not load file {fid}: {e}") + return documents + + +def _buildWorkspaceAttachmentLabel(chatService: Any, dataSourceIds: List[str], featureDataSourceIds: List[str]) -> str: + """Short human-readable line for non-file attachments (data sources) on the user message.""" + parts: List[str] = [] + dsLabels: List[str] = [] + for dsId in dataSourceIds or []: + try: + ds = chatService.getDataSource(dsId) if chatService and 
hasattr(chatService, "getDataSource") else None + if ds: + label = ds.get("label") or ds.get("path") or dsId[:8] + dsLabels.append(str(label)) + except Exception as e: + logger.debug(f"Label for data source {dsId}: {e}") + if dsLabels: + parts.append("Datenquellen: " + ", ".join(dsLabels)) + + fdsLabels: List[str] = [] + for fdsId in featureDataSourceIds or []: + try: + from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource + from modules.interfaces.interfaceDbApp import getRootInterface + rootIf = getRootInterface() + records = rootIf.db.getRecordset(FeatureDataSource, recordFilter={"id": fdsId}) + if records: + fds = records[0] + tbl = fds.get("tableName") or "" + lbl = fds.get("label") or tbl + fdsLabels.append(f"{tbl} ({lbl})".strip() if tbl else str(lbl)) + except Exception as e: + logger.debug(f"Label for feature data source {fdsId}: {e}") + if fdsLabels: + parts.append("Feature-Daten: " + ", ".join(fdsLabels)) + + return " | ".join(parts) + + +def _workspaceMessageToClientDict(msg: Any) -> Dict[str, Any]: + """Serialize ChatMessage (or dict) for workspace GET /messages including documents.""" + if isinstance(msg, dict): + raw = dict(msg) + elif hasattr(msg, "model_dump"): + raw = msg.model_dump() + elif hasattr(msg, "dict"): + raw = msg.dict() + else: + raw = { + "id": getattr(msg, "id", None), + "workflowId": getattr(msg, "workflowId", None), + "role": getattr(msg, "role", ""), + "message": getattr(msg, "message", None) or getattr(msg, "content", None), + "publishedAt": getattr(msg, "publishedAt", None), + "sequenceNr": getattr(msg, "sequenceNr", None), + "documentsLabel": getattr(msg, "documentsLabel", None), + "documents": getattr(msg, "documents", None) or [], + } + if raw.get("message") is not None and raw.get("content") is None: + raw["content"] = raw["message"] + docs = raw.get("documents") or [] + serialized_docs: List[Dict[str, Any]] = [] + for doc in docs: + if isinstance(doc, dict): + serialized_docs.append(doc) + elif 
hasattr(doc, "model_dump"): + serialized_docs.append(doc.model_dump()) + elif hasattr(doc, "dict"): + serialized_docs.append(doc.dict()) + else: + serialized_docs.append({ + "id": getattr(doc, "id", ""), + "messageId": getattr(doc, "messageId", ""), + "fileId": getattr(doc, "fileId", ""), + "fileName": getattr(doc, "fileName", ""), + "fileSize": getattr(doc, "fileSize", 0), + "mimeType": getattr(doc, "mimeType", ""), + "roundNumber": getattr(doc, "roundNumber", None), + "taskNumber": getattr(doc, "taskNumber", None), + "actionNumber": getattr(doc, "actionNumber", None), + "actionId": getattr(doc, "actionId", None), + }) + raw["documents"] = serialized_docs + return raw + + def _loadConversationHistory(chatInterface, workflowId: str, currentPrompt: str) -> List[Dict[str, str]]: - """Load prior messages from DB for follow-up context, excluding the current prompt.""" + """Load prior messages from DB for follow-up context, excluding the current prompt. + + File documents attached to user messages are serialized as a short + ``[Attached files: …]`` block appended to the message content so the + agent sees which files a previous prompt referred to. 
+ """ try: rawMessages = chatInterface.getMessages(workflowId) or [] except Exception as e: @@ -258,17 +372,55 @@ def _loadConversationHistory(chatInterface, workflowId: str, currentPrompt: str) if isinstance(msg, dict): role = msg.get("role", "") content = msg.get("message", "") or msg.get("content", "") + docs = msg.get("documents") or [] + docsLabel = msg.get("documentsLabel") or "" else: role = getattr(msg, "role", "") content = getattr(msg, "message", "") or getattr(msg, "content", "") - if role in ("user", "assistant") and content: - history.append({"role": role, "content": content}) + docs = getattr(msg, "documents", None) or [] + docsLabel = getattr(msg, "documentsLabel", "") or "" + if role not in ("user", "assistant"): + continue + if not content and not docs: + continue + + enriched = content or "" + + if role == "user" and docs: + fileParts = [] + for doc in docs: + if isinstance(doc, dict): + fName = doc.get("fileName", "") + fId = doc.get("fileId", "") + fMime = doc.get("mimeType", "") + fSize = doc.get("fileSize", 0) + elif hasattr(doc, "fileName"): + fName = getattr(doc, "fileName", "") + fId = getattr(doc, "fileId", "") + fMime = getattr(doc, "mimeType", "") + fSize = getattr(doc, "fileSize", 0) + else: + continue + if fId or fName: + fileParts.append(f" - {fName} (id: {fId}, type: {fMime}, size: {fSize} bytes)") + if fileParts: + enriched += "\n\n[Attached files]\n" + "\n".join(fileParts) + + if role == "user" and docsLabel: + enriched += f"\n[Attachments: {docsLabel}]" + + if enriched.strip(): + history.append({"role": role, "content": enriched}) if not history: return [] # Drop the last user message if it matches the current prompt (already added by the agent loop) - if history[-1]["role"] == "user" and history[-1]["content"].strip() == currentPrompt.strip(): + lastContent = history[-1].get("content", "").strip() + currentStripped = currentPrompt.strip() + if history[-1]["role"] == "user" and ( + lastContent == currentStripped or 
lastContent.startswith(currentStripped) + ): history = history[:-1] if history: @@ -276,6 +428,36 @@ def _loadConversationHistory(chatInterface, workflowId: str, currentPrompt: str) return history +def _collectPriorFileIds(chatInterface, workflowId: str) -> List[str]: + """Collect fileIds from all prior user messages in the workflow. + + Returns a deduplicated list of file IDs so follow-up prompts + can reference files that were attached to earlier messages. + """ + try: + rawMessages = chatInterface.getMessages(workflowId) or [] + except Exception: + return [] + + seen: set = set() + result: List[str] = [] + for msg in rawMessages: + if isinstance(msg, dict): + role = msg.get("role", "") + docs = msg.get("documents") or [] + else: + role = getattr(msg, "role", "") + docs = getattr(msg, "documents", None) or [] + if role != "user": + continue + for doc in docs: + fid = doc.get("fileId", "") if isinstance(doc, dict) else getattr(doc, "fileId", "") + if fid and fid not in seen: + seen.add(fid) + result.append(fid) + return result + + async def _deriveWorkflowName(prompt: str, aiService) -> str: """Use AI to generate a concise workflow title from the user prompt.""" from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum @@ -344,11 +526,36 @@ async def streamWorkspaceStart( queueId = f"workspace-{workflowId}" eventManager.create_queue(queueId) - chatInterface.createMessage({ + dbMgmt = _getDbManagement(context, featureInstanceId=instanceId) + userDocuments = _workspaceFilesToChatDocuments(dbMgmt, userInput.fileIds or []) + + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + + svcCtx = ServiceCenterContext( + user=context.user, + mandate_id=mandateId or "", + feature_instance_id=instanceId, + workflow_id=workflowId, + ) + chatSvc = getService("chat", svcCtx) + attachmentLabel = _buildWorkspaceAttachmentLabel( + chatSvc, + userInput.dataSourceIds or [], + 
userInput.featureDataSourceIds or [], + ) + + userMessageData: Dict[str, Any] = { "workflowId": workflowId, "role": "user", "message": userInput.prompt, - }) + } + if userDocuments: + userMessageData["documents"] = userDocuments + if attachmentLabel: + userMessageData["documentsLabel"] = attachmentLabel + + chatInterface.createMessage(userMessageData) agentTask = asyncio.ensure_future( _runWorkspaceAgent( @@ -472,6 +679,18 @@ async def _runWorkspaceAgent( conversationHistory = _loadConversationHistory(chatInterface, workflowId, prompt) + priorFileIds = _collectPriorFileIds(chatInterface, workflowId) + currentFileIdSet = set(fileIds or []) + mergedFileIds = list(fileIds or []) + for pf in priorFileIds: + if pf not in currentFileIdSet: + mergedFileIds.append(pf) + if len(mergedFileIds) > len(fileIds or []): + logger.info( + f"Merged {len(mergedFileIds) - len(fileIds or [])} prior file(s) into agent context " + f"(total: {len(mergedFileIds)}) for workflow {workflowId}" + ) + accumulatedText = "" messagePersisted = False @@ -483,7 +702,7 @@ async def _runWorkspaceAgent( async for event in agentService.runAgent( prompt=enrichedPrompt, - fileIds=fileIds, + fileIds=mergedFileIds, workflowId=workflowId, userLanguage=userLanguage, conversationHistory=conversationHistory, @@ -755,17 +974,14 @@ async def getWorkspaceMessages( _validateInstanceAccess(instanceId, context) chatInterface = _getChatInterface(context, featureInstanceId=instanceId) messages = chatInterface.getMessages(workflowId) or [] - items = [] - for msg in messages: - if isinstance(msg, dict): - items.append(msg) - else: - items.append({ - "id": getattr(msg, "id", None), - "role": getattr(msg, "role", ""), - "content": getattr(msg, "message", "") or getattr(msg, "content", ""), - "createdAt": getattr(msg, "publishedAt", None) or getattr(msg, "createdAt", None), - }) + items = [_workspaceMessageToClientDict(m) for m in messages] + items.sort( + key=lambda m: ( + parseTimestamp(m.get("publishedAt"), default=0) or 
0, + m.get("sequenceNr") or 0, + str(m.get("id") or ""), + ) + ) return JSONResponse({"messages": items}) diff --git a/modules/interfaces/interfaceDbChat.py b/modules/interfaces/interfaceDbChat.py index 9ad072ad..f2a3d4c6 100644 --- a/modules/interfaces/interfaceDbChat.py +++ b/modules/interfaces/interfaceDbChat.py @@ -885,7 +885,7 @@ class ChatObjects: "role": msg.get("role", "assistant"), "status": msg.get("status", "step"), "sequenceNr": msg.get("sequenceNr", 0), - "publishedAt": msg.get("publishedAt", msg.get("timestamp", getUtcTimestamp())), + "publishedAt": msg.get("publishedAt") or msg.get("_createdAt") or msg.get("timestamp") or 0, "success": msg.get("success"), "actionId": msg.get("actionId"), "actionMethod": msg.get("actionMethod"), @@ -899,8 +899,15 @@ class ChatObjects: # Apply default sorting by publishedAt if no sort specified. # Use parseTimestamp to tolerate mixed DB types (float/string) on INT. + # Tie-break with sequenceNr then id so order matches conversation flow. if pagination is None or not pagination.sort: - messageDicts.sort(key=lambda x: parseTimestamp(x.get("publishedAt"), default=0)) + messageDicts.sort( + key=lambda x: ( + parseTimestamp(x.get("publishedAt"), default=0) or 0, + x.get("sequenceNr") or 0, + str(x.get("id") or ""), + ) + ) # Apply filtering (if filters provided) if pagination and pagination.filters: @@ -1039,6 +1046,15 @@ class ChatObjects: if "actionNumber" not in messageData: messageData["actionNumber"] = workflow.currentAction + + if not messageData.get("publishedAt"): + messageData["publishedAt"] = getUtcTimestamp() + + if not messageData.get("sequenceNr"): + existing = self._getRecordset( + ChatMessage, recordFilter={"workflowId": workflowId} + ) + messageData["sequenceNr"] = len(existing) + 1 # Note: Chat data is user-owned, no mandate/featureInstance context stored # mandateId/featureInstanceId removed from ChatMessage model diff --git a/modules/interfaces/interfaceDbKnowledge.py 
b/modules/interfaces/interfaceDbKnowledge.py index fa83f1c8..c8a597df 100644 --- a/modules/interfaces/interfaceDbKnowledge.py +++ b/modules/interfaces/interfaceDbKnowledge.py @@ -10,7 +10,7 @@ import logging from typing import Dict, Any, List, Optional from modules.connectors.connectorDbPostgre import _get_cached_connector -from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk, WorkflowMemory +from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk, RoundMemory, WorkflowMemory from modules.datamodels.datamodelUam import User from modules.shared.configuration import APP_CONFIG from modules.shared.timeUtils import getUtcTimestamp @@ -125,6 +125,58 @@ class KnowledgeObjects: count += 1 return count + # ========================================================================= + # RoundMemory CRUD + # ========================================================================= + + def storeRoundMemory(self, memory: RoundMemory) -> Dict[str, Any]: + """Create or update a RoundMemory entry (upsert by id).""" + data = memory.model_dump() + existing = self.db._loadRecord(RoundMemory, memory.id) + if existing: + return self.db.recordModify(RoundMemory, memory.id, data) + return self.db.recordCreate(RoundMemory, data) + + def getRoundMemories(self, workflowId: str) -> List[Dict[str, Any]]: + """Get all RoundMemory entries for a workflow, sorted by roundNumber.""" + records = self.db.getRecordset(RoundMemory, recordFilter={"workflowId": workflowId}) + records.sort(key=lambda r: r.get("roundNumber", 0)) + return records + + def getRoundMemoriesByType( + self, workflowId: str, memoryType: str + ) -> List[Dict[str, Any]]: + """Get RoundMemory entries filtered by type (e.g. 
'file_ref').""" + return self.db.getRecordset( + RoundMemory, recordFilter={"workflowId": workflowId, "memoryType": memoryType} + ) + + def semanticSearchRoundMemory( + self, + queryVector: List[float], + workflowId: str, + limit: int = 10, + minScore: float = None, + ) -> List[Dict[str, Any]]: + """Semantic search across RoundMemory entries for a workflow.""" + return self.db.semanticSearch( + modelClass=RoundMemory, + vectorColumn="embedding", + queryVector=queryVector, + limit=limit, + recordFilter={"workflowId": workflowId}, + minScore=minScore, + ) + + def deleteRoundMemories(self, workflowId: str) -> int: + """Delete all RoundMemory entries for a workflow. Returns count.""" + entries = self.db.getRecordset(RoundMemory, recordFilter={"workflowId": workflowId}) + count = 0 + for entry in entries: + if self.db.recordDelete(RoundMemory, entry["id"]): + count += 1 + return count + # ========================================================================= # WorkflowMemory CRUD # ========================================================================= diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py index 00c9c415..d7fac372 100644 --- a/modules/routes/routeSecurityMsft.py +++ b/modules/routes/routeSecurityMsft.py @@ -100,6 +100,11 @@ def _admin_consent_redirect_uri() -> str: return "" +def _msft_data_admin_consent_scope_param() -> str: + """Space-separated delegated Graph scopes (not .default) for v2.0/adminconsent.""" + return " ".join(f"https://graph.microsoft.com/{s}" for s in msftDataScopes) + + @router.get("/auth/login") @limiter.limit("5/minute") def auth_login(request: Request) -> RedirectResponse: @@ -449,26 +454,46 @@ async def auth_connect_callback( @router.get("/adminconsent") @limiter.limit("5/minute") def adminconsent(request: Request) -> RedirectResponse: + """Tenant admin grants delegated Graph permissions for the Data app (msftDataScopes only). 
+ + Uses the v2.0 admin consent endpoint (not /oauth2/v2.0/authorize with prompt=admin_consent, + which returns AADSTS901001). The ``scope`` parameter limits consent to the listed delegated + permissions instead of every API permission on the app registration. + """ _require_msft_data_config() - admin_consent_redirect = _admin_consent_redirect_uri() - admin_consent_url = ( - f"{AUTHORITY}/adminconsent" - f"?client_id={DATA_CLIENT_ID}" - f"&redirect_uri={quote(admin_consent_redirect, safe='')}" + redirect_uri = _admin_consent_redirect_uri() + if not redirect_uri: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Could not derive admin consent redirect URI from Service_MSFT_DATA_REDIRECT_URI", + ) + state_jwt = _issue_oauth_state({"flow": "admin_consent"}) + scope_param = _msft_data_admin_consent_scope_param() + # /v2.0/adminconsent rejects /common and /consumers — use the concrete tenant + # or fall back to /organizations (lets Microsoft resolve to the admin's tenant). 
+ consent_tenant = TENANT_ID if TENANT_ID not in ("common", "consumers") else "organizations" + consent_authority = f"https://login.microsoftonline.com/{consent_tenant}" + admin_url = ( + f"{consent_authority}/v2.0/adminconsent" + f"?client_id={quote(DATA_CLIENT_ID, safe='')}" + f"&redirect_uri={quote(redirect_uri, safe='')}" + f"&scope={quote(scope_param, safe='')}" + f"&state={quote(state_jwt, safe='')}" ) - logger.info(f"Redirecting to admin consent URL for tenant: {TENANT_ID}") - return RedirectResponse(admin_consent_url) + logger.info(f"Redirecting to v2.0 admin consent for tenant: {TENANT_ID}") + return RedirectResponse(admin_url) @router.get("/adminconsent/callback") def adminconsent_callback( + request: Request, + state: Optional[str] = Query(None, description="OAuth state JWT returned by Microsoft"), admin_consent: Optional[str] = Query(None), tenant: Optional[str] = Query(None), error: Optional[str] = Query(None), error_description: Optional[str] = Query(None), - request: Request = None, ) -> HTMLResponse: - """Handle Microsoft Admin Consent callback""" + """Handle v2.0/adminconsent redirect (admin_consent=True, tenant=..., state=...).""" try: if error: logger.error(f"Admin consent error: {error} - {error_description}") @@ -485,29 +510,72 @@ def adminconsent_callback( """, status_code=400, ) - if admin_consent == "True" and tenant: - logger.info(f"Admin consent granted for tenant: {tenant}") + + if not state: + logger.error("Admin consent success callback missing state") return HTMLResponse( - content=f""" + content=""" - Admin Consent Successful + Admin Consent Failed -

Admin Consent Successful

-

Tenant: {tenant}

- +

Admin Consent Failed

+

Parameter „state“ fehlt.

- """ + """, + status_code=400, ) + + state_data = _parse_oauth_state(state) + if state_data.get("flow") != "admin_consent": + raise HTTPException(status_code=400, detail="Invalid OAuth flow for this callback") + + granted = str(admin_consent or "").strip().lower() in ("true", "1", "yes") + if not granted: + logger.error("Admin consent callback missing admin_consent=true") + return HTMLResponse( + content=""" + + Admin Consent Failed + +

Admin Consent Failed

+

Die Administratorzustimmung wurde nicht bestätigt (admin_consent fehlt oder ist falsch).

+ + + """, + status_code=400, + ) + if not tenant: + logger.error("Admin consent callback missing tenant id") + return HTMLResponse( + content=""" + + Admin Consent Failed + +

Admin Consent Failed

+

Keine Tenant-ID in der Antwort (tenant fehlt).

+ + + """, + status_code=400, + ) + + logger.info(f"Admin consent granted for tenant: {tenant}") return HTMLResponse( content=f""" - -

Admin Consent Status

-

Admin Consent: {admin_consent or 'Not provided'}

-

Tenant: {tenant or 'Not provided'}

- + + Admin Consent Successful + +

Admin Consent Successful

+

Die Berechtigungen wurden für den Tenant erteilt.

+

Tenant: {tenant}

+ + + """ ) + except HTTPException: + raise except Exception as e: logger.error(f"Error in admin consent callback: {str(e)}", exc_info=True) return HTMLResponse( diff --git a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py index 3569d330..69fe31b2 100644 --- a/modules/serviceCenter/services/serviceAgent/agentLoop.py +++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py @@ -43,6 +43,8 @@ async def runAgentLoop( aiCallStreamFn: Callable = None, userLanguage: str = "", conversationHistory: List[Dict[str, Any]] = None, + persistRoundMemoryFn: Callable[..., Awaitable[None]] = None, + getExternalMemoryKeysFn: Callable[[], List[str]] = None, ) -> AsyncGenerator[AgentEvent, None]: """Run the agent loop. Yields AgentEvent for each step (SSE-ready). @@ -59,6 +61,9 @@ async def runAgentLoop( mandateId: Mandate ID for RAG scoping userLanguage: ISO 639-1 language code for agent responses conversationHistory: Prior messages [{role, content/message}] for follow-up context + persistRoundMemoryFn: Optional callback to persist round memories after tool execution + getExternalMemoryKeysFn: Optional callback that returns RoundMemory keys for + this workflow, used by summarization to de-duplicate persisted facts """ state = AgentState(workflowId=workflowId, maxRounds=config.maxRounds) trace = AgentTrace( @@ -79,7 +84,7 @@ async def runAgentLoop( conversation = ConversationManager(systemPrompt) if conversationHistory: conversation.loadHistory(conversationHistory) - conversation.addUserMessage(prompt) + conversation.addUserMessage(prompt, isCurrentPrompt=True) while state.status == AgentStatusEnum.RUNNING and state.currentRound < state.maxRounds: await asyncio.sleep(0) @@ -142,7 +147,15 @@ async def runAgentLoop( state.totalAiCalls += 1 return resp.content - await conversation.summarize(state.currentRound, _summarizeCall) + memKeys: List[str] = [] + if getExternalMemoryKeysFn: + try: + memKeys = 
getExternalMemoryKeysFn() + except Exception: + pass + await conversation.summarize( + state.currentRound, _summarizeCall, externalMemoryKeys=memKeys or None + ) # AI call aiRequest = AiCallRequest( @@ -307,6 +320,18 @@ async def runAgentLoop( ] conversation.addToolResults(toolResultMessages) + # Persist round memories (file refs, tool results, decisions) + if persistRoundMemoryFn: + try: + await persistRoundMemoryFn( + toolCalls=toolCalls, + results=results, + textContent=textContent, + roundNumber=state.currentRound, + ) + except Exception as memErr: + logger.warning(f"RoundMemory persist failed (non-blocking): {memErr}") + roundLog.durationMs = int((time.time() - roundStartTime) * 1000) trace.rounds.append(roundLog) @@ -501,6 +526,80 @@ def _buildProgressSummary(state: AgentState, reason: str) -> str: ) +_FILE_REF_TOOLS = {"readFile", "readContentObjects", "describeImage", "listFiles"} +_DATA_SOURCE_TOOLS = {"browseDataSource", "searchDataSource", "downloadFromDataSource"} +_DECISION_TOOLS = {"writeFile", "replaceInFile"} + + +def _classifyToolResult( + tc: ToolCallRequest, result: ToolResult +) -> Optional[Dict[str, Any]]: + """Classify a successful tool result into a RoundMemory dict. + + Returns a dict with keys {memoryType, key, summary, fullData, fileIds} + or None if the result is not worth persisting. 
+ """ + name = tc.name + data = result.data or "" + + if len(data) < 50: + return None + + truncSummary = data[:2000] + fullData = data if len(data) < 8000 else None + fileId = tc.args.get("fileId", "") + fileIds = [fileId] if fileId else [] + + if name in _FILE_REF_TOOLS: + return { + "memoryType": "file_ref", + "key": f"{name}:{fileId}" if fileId else name, + "summary": truncSummary, + "fullData": fullData, + "fileIds": fileIds, + } + + if name in _DATA_SOURCE_TOOLS: + dsId = tc.args.get("dataSourceId", "") or tc.args.get("featureDataSourceId", "") + path = tc.args.get("path", "") + return { + "memoryType": "data_source_ref", + "key": f"{name}:{dsId}:{path}" if dsId else name, + "summary": truncSummary, + "fullData": fullData, + "fileIds": fileIds, + } + + if name in _DECISION_TOOLS: + return { + "memoryType": "decision", + "key": f"{name}:{fileId}" if fileId else name, + "summary": truncSummary, + "fullData": None, + "fileIds": fileIds, + } + + if name == "queryFeatureInstance": + return { + "memoryType": "tool_result", + "key": f"queryFeatureInstance:{tc.args.get('query', '')[:60]}", + "summary": truncSummary, + "fullData": fullData, + "fileIds": [], + } + + if len(data) > 500: + return { + "memoryType": "tool_result", + "key": f"{name}:{tc.id}", + "summary": truncSummary, + "fullData": fullData, + "fileIds": fileIds, + } + + return None + + _ARTIFACT_TOOLS = {"writeFile", "replaceInFile", "deleteFile", "renameFile", "copyFile", "createFolder", "deleteFolder", "renderDocument", "generateImage"} diff --git a/modules/serviceCenter/services/serviceAgent/conversationManager.py b/modules/serviceCenter/services/serviceAgent/conversationManager.py index 79570c03..fe53a921 100644 --- a/modules/serviceCenter/services/serviceAgent/conversationManager.py +++ b/modules/serviceCenter/services/serviceAgent/conversationManager.py @@ -10,9 +10,9 @@ from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolDefin logger = logging.getLogger(__name__) 
-FIRST_SUMMARY_ROUND = 4 -META_SUMMARY_ROUND = 7 -KEEP_RECENT_MESSAGES = 4 +FIRST_SUMMARY_ROUND = 6 +META_SUMMARY_ROUND = 10 +KEEP_RECENT_MESSAGES = 6 MAX_ESTIMATED_TOKENS = 60000 _MAX_HISTORY_MESSAGES = 40 _MAX_HISTORY_MSG_CHARS = 12000 @@ -22,9 +22,12 @@ class ConversationManager: """Manages the conversation history and context window for agent runs. Progressive summarization strategy: - - Rounds 1-3: full conversation retained - - Round 4+: older messages compressed into a running summary - - Round 7+: meta-summary replaces prior summaries + - Rounds 1-5: full conversation retained + - Round 6+: older messages compressed into a running summary + - Round 10+: meta-summary replaces prior summaries + Long-term facts (file refs, tool results, decisions) are persisted + externally in RoundMemory and retrieved via RAG, so the summary + can focus on reasoning and relationships. Supports RAG context injection before each round via injectRagContext.""" def __init__(self, systemPrompt: str): @@ -69,9 +72,19 @@ class ConversationManager: for msg in self._messages ] - def addUserMessage(self, content: str): - """Add a user message.""" - self._messages.append({"role": "user", "content": content}) + def addUserMessage(self, content: str, isCurrentPrompt: bool = False): + """Add a user message. + + Args: + content: Message text. + isCurrentPrompt: If True, this message is the user's current + task prompt and will never be removed by progressive + summarization. + """ + msg: Dict[str, Any] = {"role": "user", "content": content} + if isCurrentPrompt: + msg["_isCurrentPrompt"] = True + self._messages.append(msg) def addAssistantMessage(self, content: str, toolCalls: List[Dict[str, Any]] = None): """Add an assistant message, optionally with tool calls.""" @@ -135,8 +148,8 @@ class ConversationManager: """Check if progressive summarization should be triggered. 
Triggers: - - At round FIRST_SUMMARY_ROUND (4) if not yet summarized - - At round META_SUMMARY_ROUND (7) for meta-summary + - At round FIRST_SUMMARY_ROUND (6) if not yet summarized + - At round META_SUMMARY_ROUND (10) for meta-summary - Every 5 rounds after that - When estimated token count exceeds MAX_ESTIMATED_TOKENS """ @@ -149,12 +162,23 @@ class ConversationManager: return True return False - async def summarize(self, currentRound: int, aiCallFn) -> Optional[str]: + async def summarize( + self, + currentRound: int, + aiCallFn, + externalMemoryKeys: List[str] = None, + ) -> Optional[str]: """Perform progressive summarization of older messages. - Rounds 1-3: full history retained, no summarization. - Round 4+: compress older messages into a running summary. - Round 7+: meta-summary that consolidates prior summaries. + Rounds 1-5: full history retained, no summarization. + Round 6+: compress older messages into a running summary. + Round 10+: meta-summary that consolidates prior summaries. + + Args: + currentRound: Current agent round number. + aiCallFn: Async function that takes a prompt string and returns summary text. + externalMemoryKeys: Keys of RoundMemory entries for this workflow, + so the summary prompt can de-duplicate already-persisted facts. """ if currentRound < FIRST_SUMMARY_ROUND and self.estimateTokenCount() <= MAX_ESTIMATED_TOKENS: return None @@ -184,11 +208,25 @@ class ConversationManager: messagesToSummarize = nonSystemMessages[:splitIdx] recentMessages = nonSystemMessages[splitIdx:] + # Protect the current user prompt: it must NEVER be summarized away. 
+ promptInRecent = any(m.get("_isCurrentPrompt") for m in recentMessages) + if not promptInRecent: + for i, m in enumerate(messagesToSummarize): + if m.get("_isCurrentPrompt"): + recentMessages = messagesToSummarize[i:] + recentMessages + messagesToSummarize = messagesToSummarize[:i] + break + if not messagesToSummarize: + return None + summaryInput = _formatMessagesForSummary(messagesToSummarize) previousSummary = self._summaries[-1]["content"] if self._summaries else "" isMetaSummary = currentRound >= META_SUMMARY_ROUND and len(self._summaries) >= 2 - summaryPrompt = _buildSummaryPrompt(summaryInput, previousSummary, isMetaSummary) + summaryPrompt = _buildSummaryPrompt( + summaryInput, previousSummary, isMetaSummary, + externalMemoryKeys=externalMemoryKeys, + ) try: summaryText = await aiCallFn(summaryPrompt) @@ -241,8 +279,30 @@ def _formatMessagesForSummary(messages: List[Dict[str, Any]]) -> str: return "\n\n".join(parts) -def _buildSummaryPrompt(messagesText: str, previousSummary: str, isMetaSummary: bool = False) -> str: - """Build the prompt for progressive summarization.""" +def _buildSummaryPrompt( + messagesText: str, + previousSummary: str, + isMetaSummary: bool = False, + externalMemoryKeys: List[str] = None, +) -> str: + """Build the prompt for progressive summarization. + + When externalMemoryKeys is provided, the summary prompt tells the AI + that those facts are preserved in external memory and need not be + repeated verbatim — the summary can focus on reasoning, decisions, + and relationships instead. + """ + externalHint = "" + if externalMemoryKeys: + keyList = ", ".join(externalMemoryKeys[:20]) + externalHint = ( + "NOTE: The following facts are preserved in external persistent memory " + "and do NOT need to be repeated in detail in the summary: " + f"[{keyList}]. 
" + "Focus on reasoning, decisions, relationships, and anything that is " + "NOT captured by those external memory entries.\n\n" + ) + if isMetaSummary: prompt = ( "Create a comprehensive meta-summary consolidating the previous summary " @@ -251,10 +311,11 @@ def _buildSummaryPrompt(messagesText: str, previousSummary: str, isMetaSummary: ) else: prompt = ( - "Summarize the following conversation concisely. Preserve all key facts, " - "decisions, entities (names, numbers, dates), and tool results. " + "Summarize the following conversation concisely. Preserve key decisions, " + "reasoning chains, entities (names, numbers, dates), and action outcomes. " "Do not lose any important information.\n\n" ) + prompt += externalHint if previousSummary: prompt += f"Previous Summary:\n{previousSummary}\n\n" prompt += f"New Messages to Summarize:\n{messagesText}\n\nProvide a concise, factual summary:" diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py index 905621b9..cec64813 100644 --- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py +++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py @@ -142,6 +142,8 @@ class AgentService: aiCallStreamFn = self._createAiCallStreamFn() getWorkflowCostFn = self._createGetWorkflowCostFn(workflowId) buildRagContextFn = self._createBuildRagContextFn() + persistRoundMemoryFn = self._createPersistRoundMemoryFn(workflowId) + getExternalMemoryKeysFn = self._createGetExternalMemoryKeysFn(workflowId) async for event in runAgentLoop( prompt=enrichedPrompt, @@ -157,6 +159,8 @@ class AgentService: aiCallStreamFn=aiCallStreamFn, userLanguage=resolvedLanguage, conversationHistory=conversationHistory, + persistRoundMemoryFn=persistRoundMemoryFn, + getExternalMemoryKeysFn=getExternalMemoryKeysFn, ): if event.type == AgentEventTypeEnum.AGENT_SUMMARY: await self._persistTrace(workflowId, event.data or {}) @@ -347,18 +351,120 @@ class 
AgentService: ) -> str: try: knowledgeService = self.services.getService("knowledge") + workflowHintItems = _buildWorkflowHintItems( + self.services, workflowId + ) return await knowledgeService.buildAgentContext( currentPrompt=currentPrompt, workflowId=workflowId, userId=userId, featureInstanceId=featureInstanceId, mandateId=mandateId, + workflowHintItems=workflowHintItems, ) except Exception as e: logger.debug(f"RAG context not available: {e}") return "" return _buildRagContext + def _createPersistRoundMemoryFn(self, workflowId: str): + """Create callback that persists RoundMemory entries after tool execution.""" + from modules.serviceCenter.services.serviceAgent.agentLoop import _classifyToolResult + from modules.datamodels.datamodelKnowledge import RoundMemory + + async def _persistRoundMemory( + toolCalls, results, textContent: str, roundNumber: int + ): + try: + knowledgeService = self.services.getService("knowledge") + except Exception: + return + knowledgeDb = knowledgeService._knowledgeDb + + for tc, result in zip(toolCalls, results): + if not result.success: + continue + classified = _classifyToolResult(tc, result) + if not classified: + continue + + summary = classified["summary"] + embedding = await knowledgeService._embedSingle(summary[:500]) if summary else [] + + mem = RoundMemory( + workflowId=workflowId, + roundNumber=roundNumber, + memoryType=classified["memoryType"], + key=classified["key"], + summary=summary, + fullData=classified.get("fullData"), + fileIds=classified.get("fileIds", []), + embedding=embedding if embedding else None, + ) + knowledgeDb.storeRoundMemory(mem) + + return _persistRoundMemory + + def _createGetExternalMemoryKeysFn(self, workflowId: str): + """Create callback that returns RoundMemory keys for summarization hints.""" + def _getKeys() -> List[str]: + try: + knowledgeService = self.services.getService("knowledge") + memories = knowledgeService._knowledgeDb.getRoundMemories(workflowId) + return [m.get("key", "") for m in 
memories if m.get("key")] + except Exception: + return [] + return _getKeys + + +def _buildWorkflowHintItems( + services, currentWorkflowId: str +) -> List[Dict[str, Any]]: + """Build a compact list of other workflows for the RAG cross-workflow hint. + + Returns key-value items like: + key="Pendenzenliste Excel (3 msgs)" value="last: 2h ago" + Limited to 10 most recent other workflows to keep the hint small. + """ + try: + chatInterface = services.chat.interfaceDbChat + allWorkflows = chatInterface.getWorkflows() or [] + except Exception: + return [] + + others = [w for w in allWorkflows if w.get("id") != currentWorkflowId] + if not others: + return [] + + import time as _time + now = _time.time() + others.sort(key=lambda w: w.get("_createdAt") or w.get("startedAt") or 0, reverse=True) + others = others[:10] + + items = [] + for wf in others: + name = wf.get("name") or "(unnamed)" + createdAt = wf.get("_createdAt") or wf.get("startedAt") or 0 + ageSec = now - createdAt if createdAt else 0 + if ageSec < 3600: + ageStr = f"{int(ageSec / 60)}m ago" + elif ageSec < 86400: + ageStr = f"{int(ageSec / 3600)}h ago" + else: + ageStr = f"{int(ageSec / 86400)}d ago" + + wfId = wf.get("id", "") + items.append({ + "key": f"{name} (id: {wfId})", + "value": ageStr, + }) + + countLabel = f"{len(allWorkflows) - 1} other conversation(s)" + if len(allWorkflows) - 1 > 10: + countLabel += f" (showing 10 newest)" + items.insert(0, {"key": countLabel, "value": "use listWorkflowHistory to browse"}) + return items + def _getOrCreateTempFolder(chatService) -> Optional[str]: """Return the ID of the root-level 'Temp' folder, creating it if it doesn't exist.""" @@ -2952,3 +3058,138 @@ def _registerCoreTools(registry: ToolRegistry, services): }, readOnly=True ) + + # ---- Cross-workflow tools ---- + + async def _listWorkflowHistory(args: Dict[str, Any], context: Dict[str, Any]) -> ToolResult: + """List all chat workflows in this workspace with metadata.""" + import json as _json + try: + 
chatService = services.chat + chatInterface = chatService.interfaceDbChat + allWorkflows = chatInterface.getWorkflows() or [] + + allWorkflows.sort( + key=lambda w: w.get("_createdAt") or w.get("startedAt") or 0, + reverse=True, + ) + allWorkflows = allWorkflows[:50] + + items = [] + for wf in allWorkflows: + wfId = wf.get("id", "") + name = wf.get("name") or "(unnamed)" + createdAt = wf.get("_createdAt") or wf.get("startedAt") or 0 + lastActivity = wf.get("lastActivity") or createdAt + + msgs = chatInterface.getMessages(wfId) or [] + messageCount = len(msgs) + lastPreview = "" + if msgs: + lastMsg = msgs[-1] if isinstance(msgs[-1], dict) else ( + msgs[-1].model_dump() if hasattr(msgs[-1], "model_dump") else {} + ) + content = lastMsg.get("message") or lastMsg.get("content") or "" + lastPreview = content[:150] + + items.append({ + "id": wfId, + "name": name, + "createdAt": createdAt, + "lastActivity": lastActivity, + "messageCount": messageCount, + "lastMessagePreview": lastPreview, + }) + + return ToolResult( + toolCallId="", toolName="listWorkflowHistory", + success=True, data=_json.dumps(items, ensure_ascii=False), + ) + except Exception as e: + return ToolResult( + toolCallId="", toolName="listWorkflowHistory", + success=False, error=str(e), + ) + + registry.register( + "listWorkflowHistory", _listWorkflowHistory, + description=( + "List all chat conversations/workflows in this workspace. " + "Returns id, name, createdAt, lastActivity, messageCount, and a preview " + "of the last message for each workflow. Use this to discover previous " + "conversations when the user asks about past chats or wants a summary " + "across conversations." 
+ ), + parameters={ + "type": "object", + "properties": {}, + }, + readOnly=True, + ) + + async def _readWorkflowMessages(args: Dict[str, Any], context: Dict[str, Any]) -> ToolResult: + """Read messages from a specific workflow.""" + import json as _json + targetWorkflowId = args.get("workflowId", "") + limit = int(args.get("limit", 20)) + offset = int(args.get("offset", 0)) + + if not targetWorkflowId: + return ToolResult( + toolCallId="", toolName="readWorkflowMessages", + success=False, error="workflowId is required", + ) + + try: + chatService = services.chat + chatInterface = chatService.interfaceDbChat + allMsgs = chatInterface.getMessages(targetWorkflowId) or [] + + sliced = allMsgs[offset:offset + limit] + items = [] + for msg in sliced: + raw = msg if isinstance(msg, dict) else ( + msg.model_dump() if hasattr(msg, "model_dump") else {} + ) + content = raw.get("message") or raw.get("content") or "" + if len(content) > 2000: + content = content[:2000] + "..." + items.append({ + "role": raw.get("role", ""), + "message": content, + "publishedAt": raw.get("publishedAt") or raw.get("_createdAt") or 0, + }) + + header = f"Workflow {targetWorkflowId}: {len(allMsgs)} total messages" + if offset > 0 or len(allMsgs) > offset + limit: + header += f" (showing {offset + 1}-{offset + len(sliced)})" + + return ToolResult( + toolCallId="", toolName="readWorkflowMessages", + success=True, + data=header + "\n" + _json.dumps(items, ensure_ascii=False), + ) + except Exception as e: + return ToolResult( + toolCallId="", toolName="readWorkflowMessages", + success=False, error=str(e), + ) + + registry.register( + "readWorkflowMessages", _readWorkflowMessages, + description=( + "Read messages from a specific chat workflow/conversation. " + "Use this after listWorkflowHistory to read the content of a " + "specific past conversation. Supports pagination via offset/limit." 
+ ), + parameters={ + "type": "object", + "properties": { + "workflowId": {"type": "string", "description": "ID of the workflow to read messages from"}, + "limit": {"type": "integer", "description": "Max messages to return (default 20)"}, + "offset": {"type": "integer", "description": "Skip first N messages (default 0)"}, + }, + "required": ["workflowId"], + }, + readOnly=True, + ) diff --git a/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py b/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py index 6c9cb74e..aba08b89 100644 --- a/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py +++ b/modules/serviceCenter/services/serviceBilling/billingExhaustedNotify.py @@ -104,10 +104,12 @@ def maybeEmailMandatePoolExhausted( f"damit Benutzer wieder AI-Funktionen nutzen können.\n" ) escaped = html.escape(body) + # Cannot use '\\n' inside f-string {…} expression (SyntaxError); build replacement outside. + brWithNl = "
<br>" + "\n"
     htmlMessage = f"""
-{escaped.replace(chr(10), '
\n')} +{escaped.replace(chr(10), brWithNl)} """ messaging = getMessagingInterface() diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py index 91e85da4..d6943c58 100644 --- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py +++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) CHARS_PER_TOKEN = 4 DEFAULT_CHUNK_TOKENS = 400 -DEFAULT_CONTEXT_BUDGET = 8000 +DEFAULT_CONTEXT_BUDGET = 12000 class KnowledgeService: @@ -170,8 +170,18 @@ class KnowledgeService: featureInstanceId: str = "", mandateId: str = "", contextBudget: int = DEFAULT_CONTEXT_BUDGET, + workflowHintItems: List[Dict[str, Any]] = None, ) -> str: - """Build RAG context for an agent round by searching all 3 layers. + """Build RAG context for an agent round by searching all layers. + + Layer priority: + 0 - File refs from RoundMemory (always included so the agent knows + which files exist in this workflow) + 1 - Instance documents (user's own indexed files) + 1.5 - Semantically relevant RoundMemory entries + 2 - Workflow entities + 3 - Shared knowledge + 4 - Cross-workflow hint (other conversations in this workspace) Args: currentPrompt: The current user prompt to find relevant context for. @@ -180,6 +190,8 @@ class KnowledgeService: featureInstanceId: Feature instance scope. mandateId: Mandate scope. contextBudget: Maximum characters for the context string. + workflowHintItems: Optional pre-built list of other workflow summaries + for the cross-workflow hint layer. Returns: Formatted context string for injection into the agent's system prompt. 
@@ -190,6 +202,21 @@ class KnowledgeService: builder = _ContextBuilder(budget=contextBudget) + # Layer 0: File references from RoundMemory (always included) + fileRefMemories = self._knowledgeDb.getRoundMemoriesByType(workflowId, "file_ref") + if fileRefMemories: + refItems = [ + {"key": m.get("key", ""), "value": m.get("summary", "")[:300]} + for m in fileRefMemories + ] + builder.add( + priority=0, + label="Known Files", + items=refItems, + isKeyValue=True, + maxChars=2000, + ) + # Layer 1: Instance Layer (user's own documents, highest priority) instanceChunks = self._knowledgeDb.semanticSearch( queryVector=queryVector, @@ -199,12 +226,43 @@ class KnowledgeService: minScore=0.65, ) if instanceChunks: - builder.add(priority=1, label="Relevant Documents", items=instanceChunks) + builder.add(priority=1, label="Relevant Documents", items=instanceChunks, maxChars=4000) + + # Layer 1.5: Semantically relevant RoundMemory entries + roundMemories = self._knowledgeDb.semanticSearchRoundMemory( + queryVector=queryVector, + workflowId=workflowId, + limit=10, + minScore=0.55, + ) + if roundMemories: + memItems = [] + for m in roundMemories: + data = m.get("fullData") or m.get("summary", "") + memItems.append({ + "data": data, + "contextRef": { + "type": m.get("memoryType", ""), + "key": m.get("key", ""), + "round": m.get("roundNumber", 0), + }, + }) + seen = {m.get("key") for m in fileRefMemories} if fileRefMemories else set() + memItems = [ + mi for mi in memItems if mi["contextRef"].get("key") not in seen + ] + if memItems: + builder.add( + priority=2, + label="Previous Round Context", + items=memItems, + maxChars=4000, + ) # Layer 2: Workflow Layer (current workflow entities & memory) entities = self._knowledgeDb.getWorkflowEntities(workflowId) if entities: - builder.add(priority=2, label="Workflow Context", items=entities, isKeyValue=True) + builder.add(priority=3, label="Workflow Context", items=entities, isKeyValue=True, maxChars=2000) # Layer 3: Shared Layer 
(mandate-wide shared documents) sharedChunks = self._knowledgeDb.semanticSearch( @@ -215,7 +273,17 @@ class KnowledgeService: minScore=0.7, ) if sharedChunks: - builder.add(priority=3, label="Shared Knowledge", items=sharedChunks) + builder.add(priority=4, label="Shared Knowledge", items=sharedChunks, maxChars=2000) + + # Layer 4: Cross-workflow hint (other conversations in this workspace) + if workflowHintItems: + builder.add( + priority=5, + label="Other Conversations", + items=workflowHintItems, + isKeyValue=True, + maxChars=500, + ) return builder.build() @@ -520,12 +588,14 @@ class _ContextBuilder: label: str, items: List[Dict[str, Any]], isKeyValue: bool = False, + maxChars: int = 0, ): self._sections.append({ "priority": priority, "label": label, "items": items, "isKeyValue": isKeyValue, + "maxChars": maxChars, }) def build(self) -> str: @@ -537,12 +607,15 @@ class _ContextBuilder: if remaining <= 0: break + sectionCap = section.get("maxChars") or remaining + sectionRemaining = min(sectionCap, remaining) + header = f"### {section['label']}\n" sectionText = header - remaining -= len(header) + sectionRemaining -= len(header) for item in section["items"]: - if remaining <= 0: + if sectionRemaining <= 0: break if section["isKeyValue"]: @@ -550,14 +623,15 @@ class _ContextBuilder: else: data = item.get("data", "") ref = item.get("contextRef", {}) - score = item.get("_score", "") refStr = f" [{ref}]" if ref else "" line = f"{data}{refStr}\n" - if len(line) <= remaining: + if len(line) <= sectionRemaining: sectionText += line - remaining -= len(line) + sectionRemaining -= len(line) + consumed = min(sectionCap, remaining) - sectionRemaining + remaining -= consumed parts.append(sectionText) return "\n".join(parts).strip()