diff --git a/app.py b/app.py index 8dce20cb..a2b6f338 100644 --- a/app.py +++ b/app.py @@ -20,8 +20,6 @@ from datetime import datetime from modules.shared.configuration import APP_CONFIG from modules.shared.eventManagement import eventManager -from modules.features.graphicalEditor.emailPoller import start as startGraphicalEditorEmailPoller -from modules.features.graphicalEditor.emailPoller import stop as stopGraphicalEditorEmailPoller from modules.interfaces.interfaceDbApp import getRootInterface from modules.system.registry import loadFeatureMainModules @@ -348,44 +346,22 @@ async def lifespan(app: FastAPI): # --- Init Managers --- import asyncio - from modules.workflows.automation2 import subAutomation2Schedule try: main_loop = asyncio.get_running_loop() eventManager.set_event_loop(main_loop) - subAutomation2Schedule.set_main_loop(main_loop) + from modules.workflows.scheduler.mainScheduler import setMainLoop as setSchedulerMainLoop + setSchedulerMainLoop(main_loop) except RuntimeError: pass - subAutomation2Schedule.start(eventUser) eventManager.start() # Register audit log cleanup scheduler from modules.shared.auditLogger import registerAuditLogCleanupScheduler registerAuditLogCleanupScheduler() - # Ensure billing settings and accounts exist for all mandates - try: - from modules.interfaces.interfaceDbBilling import _getRootInterface as getBillingRootInterface - - billingInterface = getBillingRootInterface() - - # Step 1: Ensure all mandates have billing settings (creates defaults if missing) - settingsCreated = billingInterface.ensureAllMandateSettingsExist() - if settingsCreated > 0: - logger.info(f"Billing startup: Created {settingsCreated} missing mandate billing settings") - - # Step 2: Ensure all users have billing audit accounts - accountsCreated = billingInterface.ensureAllUserAccountsExist() - if accountsCreated > 0: - logger.info(f"Billing startup: Created {accountsCreated} missing user accounts") - - except Exception as e: - logger.warning(f"Failed to ensure billing settings/accounts (non-critical): {e}") - yield # --- Stop Managers --- - stopGraphicalEditorEmailPoller(eventUser) - subAutomation2Schedule.stop(eventUser) eventManager.stop() # --- Stop Feature Containers (Plug&Play) --- diff --git a/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py b/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py index 79ff146e..4dd679bb 100644 --- a/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py +++ b/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py @@ -327,7 +327,7 @@ class GraphicalEditorObjects: return [dict(r) for r in records] if records else [] def getRecentCompletedRuns(self, limit: int = 20) -> List[Dict[str, Any]]: - """Get recently completed runs for workflows in this instance.""" + """Get recent runs (all statuses) for workflows in this instance.""" if not self.db._ensureTableExists(Automation2WorkflowRun): return [] workflows = self.getWorkflows() @@ -336,7 +336,7 @@ class GraphicalEditorObjects: return [] records = self.db.getRecordset( Automation2WorkflowRun, - recordFilter={"status": "completed"}, + recordFilter={}, ) if not records: return [] @@ -634,13 +634,13 @@ class GraphicalEditorObjects: return dict(created) def shareTemplate(self, templateId: str, scope: str) -> Optional[Dict[str, Any]]: - """Share a template by changing its scope and setting sharedReadOnly.""" + """Change a template's scope. Sets sharedReadOnly=True for shared scopes, False for user scope.""" template = self.getWorkflow(templateId) if not template or not template.get("isTemplate"): return None updated = self.db.recordModify(AutoWorkflow, templateId, { "templateScope": scope, - "sharedReadOnly": True, + "sharedReadOnly": scope != "user", }) return dict(updated) diff --git a/modules/features/graphicalEditor/mainGraphicalEditor.py b/modules/features/graphicalEditor/mainGraphicalEditor.py index e2cb5188..ab437a54 100644 --- a/modules/features/graphicalEditor/mainGraphicalEditor.py +++ b/modules/features/graphicalEditor/mainGraphicalEditor.py @@ -25,11 +25,6 @@ FEATURE_LABEL = {"en": "Graphical Editor", "de": "Grafischer Editor", "fr": "Éd FEATURE_ICON = "mdi-sitemap" UI_OBJECTS = [ - { - "objectKey": "ui.feature.graphicalEditor.dashboard", - "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"}, - "meta": {"area": "dashboard"} - }, { "objectKey": "ui.feature.graphicalEditor.editor", "label": {"en": "Editor", "de": "Editor", "fr": "Éditeur"}, @@ -40,15 +35,20 @@ UI_OBJECTS = [ "label": {"en": "Workflows", "de": "Workflows", "fr": "Workflows"}, "meta": {"area": "workflows"} }, + { + "objectKey": "ui.feature.graphicalEditor.templates", + "label": {"en": "Templates", "de": "Vorlagen", "fr": "Modèles"}, + "meta": {"area": "templates"} + }, { "objectKey": "ui.feature.graphicalEditor.workflows-tasks", "label": {"en": "Tasks", "de": "Tasks", "fr": "Tâches"}, "meta": {"area": "tasks"} }, { - "objectKey": "ui.feature.graphicalEditor.templates", - "label": {"en": "Templates", "de": "Vorlagen", "fr": "Modèles"}, - "meta": {"area": "templates"} + "objectKey": "ui.feature.graphicalEditor.dashboard", + "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"}, + "meta": {"area": "dashboard"} }, ] @@ -213,7 +213,9 @@ async def onStart(eventUser) -> None: async def onStop(eventUser) -> None: - """Feature shutdown - remove email poller if running.""" + """Feature shutdown - stop scheduler and email poller.""" + from modules.workflows.scheduler.mainScheduler import stop as stopScheduler + stopScheduler() from modules.features.graphicalEditor.emailPoller import stop as stopEmailPoller stopEmailPoller(eventUser) diff --git a/modules/features/graphicalEditor/nodeDefinitions/trustee.py b/modules/features/graphicalEditor/nodeDefinitions/trustee.py index abc1fa79..4d7082ae 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/trustee.py +++ b/modules/features/graphicalEditor/nodeDefinitions/trustee.py @@ -3,6 +3,28 @@ # Pipeline: extractFromFiles -> processDocuments -> syncToAccounting. TRUSTEE_NODES = [ + { + "id": "trustee.refreshAccountingData", + "category": "trustee", + "label": {"en": "Refresh Accounting Data", "de": "Buchhaltungsdaten aktualisieren", "fr": "Actualiser données comptables"}, + "description": { + "en": "Import/refresh accounting data from external system (e.g. Abacus). Skips import if data is fresh unless forceRefresh is set.", + "de": "Buchhaltungsdaten aus externem System importieren/aktualisieren (z.B. Abacus). Überspringt Import wenn Daten frisch sind, ausser forceRefresh ist gesetzt.", + "fr": "Importer/actualiser les données comptables depuis le système externe (ex. Abacus).", + }, + "parameters": [ + {"name": "featureInstanceId", "type": "string", "required": True, "description": {"en": "Trustee feature instance ID", "de": "Trustee Feature-Instanz-ID", "fr": "ID instance Trustee"}}, + {"name": "forceRefresh", "type": "boolean", "required": False, "description": {"en": "Force re-import even if data is fresh (default: false)", "de": "Import erzwingen auch wenn Daten frisch sind", "fr": "Forcer la réimportation"}, "default": False}, + {"name": "dateFrom", "type": "string", "required": False, "description": {"en": "Start date filter (YYYY-MM-DD)", "de": "Startdatum-Filter (JJJJ-MM-TT)", "fr": "Date début (AAAA-MM-JJ)"}, "default": ""}, + {"name": "dateTo", "type": "string", "required": False, "description": {"en": "End date filter (YYYY-MM-DD)", "de": "Enddatum-Filter (JJJJ-MM-TT)", "fr": "Date fin (AAAA-MM-JJ)"}, "default": ""}, + ], + "inputs": 1, + "outputs": 1, + "meta": {"icon": "mdi-database-refresh", "color": "#4CAF50"}, + "_method": "trustee", + "_action": "refreshAccountingData", + "_paramMap": {"featureInstanceId": "featureInstanceId", "forceRefresh": "forceRefresh", "dateFrom": "dateFrom", "dateTo": "dateTo"}, + }, { "id": "trustee.extractFromFiles", "category": "trustee", diff --git a/modules/features/graphicalEditor/nodeRegistry.py b/modules/features/graphicalEditor/nodeRegistry.py index 524b96af..928840a4 100644 --- a/modules/features/graphicalEditor/nodeRegistry.py +++ b/modules/features/graphicalEditor/nodeRegistry.py @@ -70,6 +70,7 @@ def getNodeTypesForApi( {"id": "email", "label": {"en": "Email", "de": "E-Mail", "fr": "Email"}}, {"id": "sharepoint", "label": {"en": "SharePoint", "de": "SharePoint", "fr": "SharePoint"}}, {"id": "clickup", "label": {"en": "ClickUp", "de": "ClickUp", "fr": "ClickUp"}}, + {"id": "trustee", "label": {"en": "Trustee", "de": "Treuhand", "fr": "Fiduciaire"}}, ] return {"nodeTypes": localized, "categories": categories} diff --git a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py index caf89c10..3c1f4649 100644 --- a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py +++ b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py @@ -137,15 +137,10 @@ def post_schedule_sync( ) -> dict: """Manually trigger schedule sync (re-register cron jobs for all schedule workflows).""" _validateInstanceAccess(instanceId, context) - from modules.interfaces.interfaceDbApp import getRootInterface - from modules.workflows.automation2.subAutomation2Schedule import sync_automation2_schedule_events + from modules.workflows.scheduler.mainScheduler import syncNow - root = getRootInterface() - event_user = root.getUserByUsername("event") - if not event_user: - return {"success": False, "error": "Event user not available", "synced": 0} - result = sync_automation2_schedule_events(event_user) - return {"success": True, **result} + result = syncNow() + return {"success": True, **(result or {})} @router.get("/{instanceId}/node-types") @@ -249,6 +244,56 @@ async def post_execute( return result +# ------------------------------------------------------------------------- +# Run Tracing SSE Stream +# ------------------------------------------------------------------------- + + +@router.get("/{instanceId}/runs/{runId}/stream") +async def get_run_stream( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + runId: str = Path(..., description="Run ID"), + context: RequestContext = Depends(getRequestContext), +): + """SSE stream for live step-log updates during a workflow run.""" + _validateInstanceAccess(instanceId, context) + + from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager + sseEventManager = get_event_manager() + queueId = f"run-trace-{runId}" + sseEventManager.create_queue(queueId) + + async def _sseGenerator(): + queue = sseEventManager.get_queue(queueId) + if not queue: + return + while True: + try: + event = await asyncio.wait_for(queue.get(), timeout=30) + except asyncio.TimeoutError: + yield "data: {\"type\": \"keepalive\"}\n\n" + continue + if event is None: + break + payload = event.get("data", event) if isinstance(event, dict) else event + yield f"data: {json.dumps(payload, default=str)}\n\n" + eventType = payload.get("type", "") if isinstance(payload, dict) else "" + if eventType in ("run_complete", "run_failed"): + break + await sseEventManager.cleanup(queueId, delay=10) + + return StreamingResponse( + _sseGenerator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + # ------------------------------------------------------------------------- # Versions (AutoVersion Lifecycle) # ------------------------------------------------------------------------- diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 762b20ea..0f424438 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -155,6 +155,31 @@ def initBootstrap(db: DatabaseConnector) -> None: # Bootstrap system workflow templates for graphical editor _bootstrapSystemTemplates(db) + # Ensure billing settings and accounts exist for all mandates + _bootstrapBilling() + + +def _bootstrapBilling() -> None: + """ + Ensure billing settings and accounts exist for all mandates. + Idempotent: only creates missing settings/accounts. + """ + try: + from modules.interfaces.interfaceDbBilling import _getRootInterface as getBillingRootInterface + + billingInterface = getBillingRootInterface() + + settingsCreated = billingInterface.ensureAllMandateSettingsExist() + if settingsCreated > 0: + logger.info(f"Billing bootstrap: Created {settingsCreated} missing mandate billing settings") + + accountsCreated = billingInterface.ensureAllUserAccountsExist() + if accountsCreated > 0: + logger.info(f"Billing bootstrap: Created {accountsCreated} missing user accounts") + + except Exception as e: + logger.warning(f"Billing bootstrap failed (non-critical): {e}") + def _bootstrapSystemTemplates(db: DatabaseConnector) -> None: """ diff --git a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py index fa76141d..998816d9 100644 --- a/modules/serviceCenter/services/serviceAgent/agentLoop.py +++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py @@ -333,6 +333,17 @@ async def runAgentLoop( content=sideEvt.get("content"), ) + # Check if requestToolbox was called -- refresh tool definitions for next round + _toolboxEscalated = False + for result in results: + if result.toolName == "requestToolbox" and result.success: + _toolboxEscalated = True + if _toolboxEscalated: + tools = toolRegistry.getTools(toolSet=activeToolSet) + toolDefinitions = toolRegistry.formatToolsForFunctionCalling(toolSet=activeToolSet) + toolsText = "" if toolDefinitions else toolRegistry.formatToolsForPrompt(toolSet=activeToolSet) + logger.info("Toolbox escalation: refreshed tool definitions (%d tools)", len(tools)) + # Add tool results to conversation toolResultMessages = [ {"toolCallId": r.toolCallId, "toolName": r.toolName, diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py index f8ee357d..756079ad 100644 --- a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py +++ b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py @@ -2,8 +2,10 @@ # All rights reserved. """Feature Data Sub-Agent tool (queryFeatureInstance).""" +import hashlib import logging -from typing import Any, Dict, List, Optional +import time +from typing import Any, Dict, List, Optional, Tuple from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult from modules.serviceCenter.services.serviceAgent.toolRegistry import ToolRegistry @@ -17,6 +19,35 @@ from modules.serviceCenter.services.serviceAgent.coreTools._helpers import ( logger = logging.getLogger(__name__) +_featureDbConnPool: Dict[str, Any] = {} +_featureQueryCache: Dict[str, Tuple[float, str]] = {} +_CACHE_TTL_SECONDS = 300 + + +def _getOrCreateFeatureDbConnector(featureDbName: str, userId: str): + """Reuse a pooled DB connector for the given feature database.""" + if featureDbName in _featureDbConnPool: + conn = _featureDbConnPool[featureDbName] + try: + if conn.connection and not conn.connection.closed: + return conn + except Exception: + pass + _featureDbConnPool.pop(featureDbName, None) + + from modules.connectors.connectorDbPostgre import DatabaseConnector + from modules.shared.configuration import APP_CONFIG + conn = DatabaseConnector( + dbHost=APP_CONFIG.get("DB_HOST", "localhost"), + dbDatabase=featureDbName, + dbUser=APP_CONFIG.get("DB_USER"), + dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"), + dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), + userId=userId, + ) + _featureDbConnPool[featureDbName] = conn + return conn + def _registerFeatureSubAgentTools(registry: ToolRegistry, services): """Auto-extracted from registerCoreTools.""" @@ -86,17 +117,17 @@ def _registerFeatureSubAgentTools(registry: ToolRegistry, services): success=False, error=f"No data tables available for feature '{featureCode}'", ) - from modules.connectors.connectorDbPostgre import DatabaseConnector - from modules.shared.configuration import APP_CONFIG + cacheKey = f"{featureInstanceId}:{hashlib.md5(question.encode()).hexdigest()}" + if cacheKey in _featureQueryCache: + cachedAt, cachedResult = _featureQueryCache[cacheKey] + if time.time() - cachedAt < _CACHE_TTL_SECONDS: + return ToolResult( + toolCallId="", toolName="queryFeatureInstance", + success=True, data=cachedResult, + ) + featureDbName = f"poweron_{featureCode.lower()}" - featureDbConn = DatabaseConnector( - dbHost=APP_CONFIG.get("DB_HOST", "localhost"), - dbDatabase=featureDbName, - dbUser=APP_CONFIG.get("DB_USER"), - dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"), - dbPort=int(APP_CONFIG.get("DB_PORT", 5432)), - userId=userId or "agent", - ) + featureDbConn = _getOrCreateFeatureDbConnector(featureDbName, userId or "agent") aiService = services.ai if hasattr(services, "ai") else None if aiService is None: @@ -110,24 +141,20 @@ def _registerFeatureSubAgentTools(registry: ToolRegistry, services): req.requireNeutralization = True return await aiService.callAi(req) - try: - answer = await runFeatureDataAgent( - question=question, - featureInstanceId=featureInstanceId, - featureCode=featureCode, - selectedTables=selectedTables, - mandateId=mandateId, - userId=userId, - aiCallFn=_subAgentAiCall, - dbConnector=featureDbConn, - instanceLabel=instanceLabel, - tableFilters=tableFilters, - ) - finally: - try: - featureDbConn.close() - except Exception: - pass + answer = await runFeatureDataAgent( + question=question, + featureInstanceId=featureInstanceId, + featureCode=featureCode, + selectedTables=selectedTables, + mandateId=mandateId, + userId=userId, + aiCallFn=_subAgentAiCall, + dbConnector=featureDbConn, + instanceLabel=instanceLabel, + tableFilters=tableFilters, + ) + + _featureQueryCache[cacheKey] = (time.time(), answer) return ToolResult( toolCallId="", toolName="queryFeatureInstance", diff --git a/modules/serviceCenter/services/serviceAgent/featureDataAgent.py b/modules/serviceCenter/services/serviceAgent/featureDataAgent.py index 8ef0bfcc..e74f4b55 100644 --- a/modules/serviceCenter/services/serviceAgent/featureDataAgent.py +++ b/modules/serviceCenter/services/serviceAgent/featureDataAgent.py @@ -168,6 +168,53 @@ def _buildSubAgentTools( error=result.get("error"), ) + async def _aggregateTable(args: Dict[str, Any], context: Dict[str, Any]): + tableName = args.get("tableName", "") + aggregate = args.get("aggregate", "") + field = args.get("field", "") + groupBy = args.get("groupBy") + if not tableName: + return ToolResult(toolCallId="", toolName="aggregateTable", success=False, error="tableName required") + if not aggregate: + return ToolResult(toolCallId="", toolName="aggregateTable", success=False, error="aggregate required (SUM, COUNT, AVG, MIN, MAX)") + if not field: + return ToolResult(toolCallId="", toolName="aggregateTable", success=False, error="field required") + result = provider.aggregateTable( + tableName=tableName, + featureInstanceId=featureInstanceId, + mandateId=mandateId, + aggregate=aggregate, + field=field, + groupBy=groupBy, + extraFilters=_recordFilterToList(tableName), + ) + return ToolResult( + toolCallId="", toolName="aggregateTable", + success="error" not in result, + data=json.dumps(result, default=str, ensure_ascii=False)[:30000], + error=result.get("error"), + ) + + registry.register( + "aggregateTable", _aggregateTable, + description=( + "Run an aggregate query on a feature data table. " + "Supports SUM, COUNT, AVG, MIN, MAX with optional GROUP BY. " + "Example: aggregateTable(tableName='TrusteeDataJournalLine', aggregate='SUM', field='debitAmount', groupBy='costCenter')" + ), + parameters={ + "type": "object", + "properties": { + "tableName": {"type": "string", "description": "Name of the table to aggregate"}, + "aggregate": {"type": "string", "enum": ["SUM", "COUNT", "AVG", "MIN", "MAX"], "description": "Aggregate function"}, + "field": {"type": "string", "description": "Field to aggregate (e.g. debitAmount, creditAmount)"}, + "groupBy": {"type": "string", "description": "Optional field to group by (e.g. costCenter, accountNumber)"}, + }, + "required": ["tableName", "aggregate", "field"], + }, + readOnly=True, + ) + registry.register( "browseTable", _browseTable, description="List rows from a feature data table with pagination.", @@ -244,7 +291,8 @@ def _buildSchemaContext( parts[0] += "." parts.append( "You have access to the following data tables. " - "Use browseTable to list rows and queryTable to filter/search." + "Use browseTable to list rows, queryTable to filter/search, " + "and aggregateTable for SUM/COUNT/AVG/MIN/MAX with optional GROUP BY." ) parts.append("") diff --git a/modules/serviceCenter/services/serviceAgent/featureDataProvider.py b/modules/serviceCenter/services/serviceAgent/featureDataProvider.py index 25a0ff95..5b68d3ee 100644 --- a/modules/serviceCenter/services/serviceAgent/featureDataProvider.py +++ b/modules/serviceCenter/services/serviceAgent/featureDataProvider.py @@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) _ALLOWED_OPERATORS = {"=", "!=", ">", "<", ">=", "<=", "LIKE", "ILIKE", "IS NULL", "IS NOT NULL"} +_ALLOWED_AGGREGATES = {"SUM", "COUNT", "AVG", "MIN", "MAX"} class FeatureDataProvider: @@ -106,6 +107,65 @@ class FeatureDataProvider: logger.error(f"browseTable({tableName}) failed: {e}") return {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)} + def aggregateTable( + self, + tableName: str, + featureInstanceId: str, + mandateId: str, + aggregate: str, + field: str, + groupBy: str = None, + extraFilters: Optional[List[Dict[str, Any]]] = None, + ) -> Dict[str, Any]: + """Run an aggregate query (SUM, COUNT, AVG, MIN, MAX) on a feature table. + + Returns ``{"rows": [{"groupValue": ..., "result": ...}], "aggregate": ..., "field": ..., "groupBy": ...}``. + """ + _validateTableName(tableName) + aggregate = aggregate.upper() + if aggregate not in _ALLOWED_AGGREGATES: + return {"rows": [], "error": f"Unsupported aggregate: {aggregate}. Allowed: {', '.join(sorted(_ALLOWED_AGGREGATES))}"} + if not _isValidIdentifier(field): + return {"rows": [], "error": f"Invalid field name: {field}"} + if groupBy and not _isValidIdentifier(groupBy): + return {"rows": [], "error": f"Invalid groupBy field: {groupBy}"} + + conn = self._db.connection + scopeFilter = _buildScopeFilter(tableName, featureInstanceId, mandateId, dbConnection=conn) + extraWhere, extraParams = _buildFilterClauses(extraFilters) + + fullWhere = scopeFilter["where"] + allParams = list(scopeFilter["params"]) + if extraWhere: + fullWhere += " AND " + extraWhere + allParams.extend(extraParams) + + try: + with conn.cursor() as cur: + if groupBy: + sql = ( + f'SELECT "{groupBy}" AS "groupValue", {aggregate}("{field}") AS "result" ' + f'FROM "{tableName}" WHERE {fullWhere} ' + f'GROUP BY "{groupBy}" ORDER BY "result" DESC' + ) + else: + sql = ( + f'SELECT {aggregate}("{field}") AS "result" ' + f'FROM "{tableName}" WHERE {fullWhere}' + ) + cur.execute(sql, allParams) + rows = [_serializeRow(dict(r)) for r in cur.fetchall()] + + return { + "rows": rows, + "aggregate": aggregate, + "field": field, + "groupBy": groupBy, + } + except Exception as e: + logger.error(f"aggregateTable({tableName}, {aggregate}({field})) failed: {e}") + return {"rows": [], "error": str(e), "aggregate": aggregate, "field": field, "groupBy": groupBy} + def queryTable( self, tableName: str, diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py index 2d5a5dfb..b371e01e 100644 --- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py +++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py @@ -275,11 +275,18 @@ class AgentService: logger.warning(f"Could not register action tools: {e}") self._activateToolboxes(registry, config) + self._registerRequestToolbox(registry) return registry def _activateToolboxes(self, registry: ToolRegistry, config: AgentConfig) -> None: - """Activate toolboxes dynamically based on user connections and config.""" + """Activate toolboxes dynamically based on user connections and config. + + For each active toolbox, marks already-registered tools as belonging to + that toolbox (via toolSet tag) so the agent loop can filter them. + The 'workflow' toolbox is special: its tools are registered from + workflowTools module because they have dedicated handlers. + """ try: from modules.serviceCenter.services.serviceAgent.toolboxRegistry import getToolboxRegistry tbRegistry = getToolboxRegistry() @@ -297,18 +304,84 @@ class AgentService: activatedIds = [tb.id for tb in activeToolboxes] logger.info("Toolbox activation: connections=%s -> active toolboxes=%s", userConnections, activatedIds) + activeToolNames: set = set() + for tb in activeToolboxes: + activeToolNames.update(tb.tools) + for tb in activeToolboxes: if tb.id == "workflow": try: from modules.serviceCenter.services.serviceAgent.workflowTools import getWorkflowToolDefinitions - for toolDef in getWorkflowToolDefinitions(): - registry.register(toolDef) - logger.info("Registered %d workflow tools from toolbox", len(getWorkflowToolDefinitions())) + wfDefs = getWorkflowToolDefinitions() + for toolDef in wfDefs: + registry.registerFromDefinition(toolDef, toolDef._handler if hasattr(toolDef, "_handler") else None) + logger.info("Registered %d workflow tools from toolbox", len(wfDefs)) except Exception as e: logger.warning("Could not register workflow tools: %s", e) + + inactiveToolNames = set() + for tb in tbRegistry.getAllToolboxes(): + if tb.id not in activatedIds: + inactiveToolNames.update(tb.tools) + inactiveToolNames -= activeToolNames + + for toolName in inactiveToolNames: + registry.unregister(toolName) + + logger.debug("Toolbox activation: %d active tools, %d inactive tools removed", len(activeToolNames), len(inactiveToolNames)) except Exception as e: logger.warning("Toolbox activation failed: %s", e) + def _registerRequestToolbox(self, registry: ToolRegistry) -> None: + """Register the requestToolbox meta-tool that lets the agent dynamically activate toolboxes.""" + try: + from modules.serviceCenter.services.serviceAgent.toolboxRegistry import ( + getToolboxRegistry, buildRequestToolboxDefinition, REQUEST_TOOLBOX_TOOL_NAME, + ) + from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult + + tbRegistry = getToolboxRegistry() + allIds = [tb.id for tb in tbRegistry.getAllToolboxes()] + registeredNames = set(registry.getToolNames()) + inactiveIds = [tbId for tbId in allIds if not any( + t in registeredNames for t in (tbRegistry.getToolbox(tbId).tools if tbRegistry.getToolbox(tbId) else []) + )] + + if not inactiveIds: + return + + toolDef = buildRequestToolboxDefinition(inactiveIds) + + async def _handler(args: Dict[str, Any], context: Dict[str, Any] = None) -> ToolResult: + toolboxId = args.get("toolboxId", "") + reason = args.get("reason", "") + tb = tbRegistry.getToolbox(toolboxId) + if not tb: + return ToolResult( + toolCallId="", toolName=REQUEST_TOOLBOX_TOOL_NAME, + success=False, error=f"Unknown toolbox: {toolboxId}", + ) + for toolName in tb.tools: + if not registry.isValidTool(toolName): + logger.info("requestToolbox: tool '%s' from toolbox '%s' not yet registered, skipping", toolName, toolboxId) + continue + logger.info("requestToolbox: activated toolbox '%s' (%d tools). Reason: %s", toolboxId, len(tb.tools), reason) + return ToolResult( + toolCallId="", toolName=REQUEST_TOOLBOX_TOOL_NAME, + success=True, + data=f"Toolbox '{tb.label}' activated with {len(tb.tools)} tools. They are now available.", + ) + + registry.register( + name=REQUEST_TOOLBOX_TOOL_NAME, + handler=_handler, + description=toolDef["description"], + parameters=toolDef["parameters"], + ) + logger.info("Registered requestToolbox meta-tool (inactive toolboxes: %s)", inactiveIds) + except Exception as e: + logger.warning("Could not register requestToolbox meta-tool: %s", e) + async def _persistTrace(self, workflowId: str, summaryData: Dict[str, Any]): """Persist the agent trace and workflow artifacts in the knowledge store.""" try: diff --git a/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py b/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py index 459c4cab..7646da11 100644 --- a/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py +++ b/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py @@ -115,21 +115,35 @@ def _registerDefaultToolboxes() -> None: label="Core Tools", description="Basic agent tools: search, read, write, web", isDefault=True, - tools=[], + tools=[ + "readFile", "listFiles", "searchInFileContent", "listFolders", + "webSearch", "readUrl", "writeFile", "deleteFile", "renameFile", + "copyFile", "createFolder", "deleteFolder", "moveFile", "moveFolder", + "renameFolder", "tagFile", "replaceInFile", "translateText", + "detectLanguage", "queryFeatureInstance", + ], ), ToolboxDefinition( id="ai", label="AI Tools", description="AI-powered analysis and generation", isDefault=True, - tools=[], + tools=[ + "summarizeContent", "describeImage", "generateImage", + "textToSpeech", "speechToText", "renderDocument", + "createChart", "executeCode", "neutralizeData", + ], ), ToolboxDefinition( id="datasources", label="Data Sources", description="Access external data sources and databases", - isDefault=False, - tools=[], + isDefault=True, + tools=[ + "listConnections", "browseDataSource", "searchDataSource", + "downloadFromDataSource", "uploadToExternal", + "browseContainer", "readContentObjects", "extractContainerItem", + ], ), ToolboxDefinition( id="email", @@ -137,7 +151,11 @@ def _registerDefaultToolboxes() -> None: description="Read and send emails via Outlook/Gmail", requiresConnection="microsoft", isDefault=False, - tools=[], + tools=[ + "sendMail", + "outlook_readEmails", "outlook_searchEmails", + "outlook_composeAndDraftReply", "outlook_sendDraft", + ], ), ToolboxDefinition( id="sharepoint", @@ -145,7 +163,10 @@ def _registerDefaultToolboxes() -> None: description="Access SharePoint sites, lists, and files", requiresConnection="microsoft", isDefault=False, - tools=[], + tools=[ + "sharepoint_findDocuments", "sharepoint_readDocuments", + "sharepoint_upload", + ], ), ToolboxDefinition( id="clickup", @@ -153,7 +174,9 @@ def _registerDefaultToolboxes() -> None: description="Manage ClickUp tasks and projects", requiresConnection="clickup", isDefault=False, - tools=[], + tools=[ + "clickup_searchTasks", "clickup_createTask", "clickup_updateTask", + ], ), ToolboxDefinition( id="jira", @@ -161,7 +184,9 @@ def _registerDefaultToolboxes() -> None: description="Manage Jira issues and projects", requiresConnection="jira", isDefault=False, - tools=[], + tools=[ + "jira_connect", "jira_exportTickets", "jira_importTickets", + ], ), ToolboxDefinition( id="workflow", @@ -175,9 +200,49 @@ def _registerDefaultToolboxes() -> None: "listWorkflowHistory", "readWorkflowMessages", ], ), + ToolboxDefinition( + id="trustee", + label="Trustee / Accounting", + description="Trustee accounting tools: refresh data from external system (e.g. Abacus), query positions and journal entries", + featureCode="trustee", + isDefault=False, + tools=[ + "trustee_refreshAccountingData", + ], + ), ] for tb in defaults: _toolboxRegistry.registerToolbox(tb) _registerDefaultToolboxes() + + +REQUEST_TOOLBOX_TOOL_NAME = "requestToolbox" + + +def buildRequestToolboxDefinition(availableToolboxIds: List[str]) -> dict: + """Build the tool definition dict for the requestToolbox meta-tool.""" + return { + "name": REQUEST_TOOLBOX_TOOL_NAME, + "description": ( + "Request additional specialized tools for the current task. " + "Call this when you need tools from a specific toolbox that is not yet active. " + "After calling, the requested tools will be available in the next round." + ), + "parameters": { + "type": "object", + "properties": { + "toolboxId": { + "type": "string", + "enum": availableToolboxIds, + "description": "ID of the toolbox to activate", + }, + "reason": { + "type": "string", + "description": "Brief reason why this toolbox is needed", + }, + }, + "required": ["toolboxId"], + }, + } diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index 67344206..40055b11 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -85,6 +85,25 @@ def _getExecutor( return None +_stepMeta: Dict[str, Dict[str, str]] = {} + + +def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None: + """Emit a step-log SSE event to any listening client for this run.""" + try: + from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager + em = get_event_manager() + queueId = f"run-trace-{runId}" + if not em.has_queue(queueId): + return + import asyncio + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(em.emit_event(queueId, "step", stepData, event_category="tracing")) + except Exception: + pass + + def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str = "running", inputSnapshot: Dict = None) -> Optional[str]: """Create an AutoStepLog entry. Returns the step log ID or None if interface unavailable.""" if not iface or not runId: @@ -92,6 +111,7 @@ def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str = try: from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog stepId = str(uuid.uuid4()) + startedAt = time.time() iface.db.recordCreate(AutoStepLog, { "id": stepId, "runId": runId, @@ -99,7 +119,12 @@ def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str = "nodeType": nodeType, "status": status, "inputSnapshot": inputSnapshot or {}, - "startedAt": time.time(), + "startedAt": startedAt, + }) + _stepMeta[stepId] = {"runId": runId, "nodeId": nodeId, "nodeType": nodeType} + _emitStepEvent(runId, { + "id": stepId, "runId": runId, "nodeId": nodeId, "nodeType": nodeType, + "status": status, "startedAt": startedAt, }) return stepId except Exception as e: @@ -114,9 +139,10 @@ def _updateStepLog(iface, stepId: str, status: str, output: Dict = None, error: return try: from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog + completedAt = time.time() updates: Dict[str, Any] = { "status": status, - "completedAt": time.time(), + "completedAt": completedAt, } if output is not None: updates["output"] = output @@ -129,6 +155,14 @@ def _updateStepLog(iface, stepId: str, status: str, output: Dict = None, error: if retryCount: updates["retryCount"] = retryCount iface.db.recordModify(AutoStepLog, stepId, updates) + meta = _stepMeta.pop(stepId, None) + if meta: + _emitStepEvent(meta["runId"], { + "id": stepId, "runId": meta["runId"], "nodeId": meta["nodeId"], + "nodeType": meta["nodeType"], "status": status, + "completedAt": completedAt, "durationMs": durationMs, + "error": error, "tokensUsed": tokensUsed, "retryCount": retryCount, + }) except Exception as e: logger.debug("Could not update AutoStepLog %s: %s", stepId, e) @@ -282,7 +316,11 @@ async def executeGraph( nodeOutputs[bnid] = None continue _rStepStart = time.time() - _rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running") + _rInputSnap = {"_loopItem": items[next_index], "_loopIndex": next_index} + for _rSrc, _, _ in connectionMap.get(bnid, []): + if _rSrc in nodeOutputs: + _rInputSnap[_rSrc] = nodeOutputs[_rSrc] + _rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _rInputSnap) try: result, _rRetry = await _executeWithRetry(executor, body_node, context) nodeOutputs[bnid] = result @@ -310,7 +348,7 @@ async def executeGraph( nodeOutputs[bnid] = {"error": str(ex), "success": False} if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs) - return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid} + return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid, "runId": runId} next_index += 1 if loop_node_id: nodeOutputs[loop_node_id] = {"items": items, "count": len(items)} @@ -330,7 +368,11 @@ async def executeGraph( nodeType = node.get("type", "") if not _is_node_on_active_path(nodeId, connectionMap, nodeOutputs): logger.info("executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)", i + 1, len(ordered), nodeId) - _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped") + _skipInputSnap = {"_skipReason": "inactive_branch"} + for _sSrc, _, _ in connectionMap.get(nodeId, []): + if _sSrc in nodeOutputs: + _skipInputSnap[_sSrc] = nodeOutputs[_sSrc] + _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped", inputSnapshot=_skipInputSnap) if _skipStepId: _updateStepLog(automation2_interface, _skipStepId, "skipped") continue @@ -351,7 +393,11 @@ async def executeGraph( _stepId = None try: if nodeType == "flow.loop": - _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running") + _loopInputSnap = {} + for _lSrc, _, _ in connectionMap.get(nodeId, []): + if _lSrc in nodeOutputs: + _loopInputSnap[_lSrc] = nodeOutputs[_lSrc] + _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _loopInputSnap) result = await executor.execute(node, context) items = result.get("items") or [] body_ids = getLoopBodyNodeIds(nodeId, connectionMap) @@ -372,7 +418,11 @@ async def executeGraph( nodeOutputs[bnid] = None continue _bStepStart = time.time() - _bStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running") + _bInputSnap = {"_loopItem": item, "_loopIndex": idx} + for _bSrc, _, _ in connectionMap.get(bnid, []): + if _bSrc in nodeOutputs: + _bInputSnap[_bSrc] = nodeOutputs[_bSrc] + _bStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _bInputSnap) try: bres, _bRetry = await _executeWithRetry(bexec, body_node, context) nodeOutputs[bnid] = bres @@ -401,10 +451,11 @@ async def executeGraph( nodeOutputs[bnid] = {"error": str(ex), "success": False} if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs) - return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid} + return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid, "runId": runId} nodeOutputs[nodeId] = {"items": items, "count": len(items)} _updateStepLog(automation2_interface, _stepId, "completed", - output={"items": len(items)}, durationMs=int((time.time() - _stepStartMs) * 1000)) + output={"iterationCount": len(items), "items": len(items)}, + durationMs=int((time.time() - _stepStartMs) * 1000)) logger.info("executeGraph flow.loop done: %d iterations", len(items)) else: _stepStartMs = time.time() @@ -489,6 +540,8 @@ async def executeGraph( _updateStepLog(automation2_interface, _stepId, "failed", error=str(e), durationMs=_durMs) if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs) + if runId: + _emitStepEvent(runId, {"type": "run_failed", "runId": runId, "status": "failed", "error": str(e), "failedNode": nodeId}) try: _wfObj = automation2_interface.getWorkflow(workflowId) if automation2_interface and workflowId else None _wfDict = _wfObj if isinstance(_wfObj, dict) else ( @@ -509,10 +562,13 @@ async def executeGraph( "error": str(e), "nodeOutputs": nodeOutputs, "failedNode": nodeId, + "runId": runId, } if runId and automation2_interface: automation2_interface.updateRun(runId, status="completed", nodeOutputs=nodeOutputs) + if runId: + _emitStepEvent(runId, {"type": "run_complete", "runId": runId, "status": "completed"}) logger.info( "executeGraph complete: success=True nodeOutputs_keys=%s stopped=%s", list(nodeOutputs.keys()), @@ -522,4 +578,5 @@ async def executeGraph( "success": True, "nodeOutputs": nodeOutputs, "stopped": context.get("_stopped", False), + "runId": runId, } diff --git a/modules/workflows/methods/methodBase.py b/modules/workflows/methods/methodBase.py index 1a81c3eb..6a9f2956 100644 --- a/modules/workflows/methods/methodBase.py +++ b/modules/workflows/methods/methodBase.py @@ -243,8 +243,10 @@ class MethodBase: # Handle List[str], List[int], etc. if expectedType.startswith('List['): - if not isinstance(value, list): - raise ValueError(f"Expected list for type '{expectedType}', got {type(value).__name__}") + if isinstance(value, str): + value = [v.strip() for v in value.split(',') if v.strip()] if ',' in value else [value] + elif not isinstance(value, list): + value = [value] # Extract inner type innerType = expectedType[5:-1].strip() # Remove "List[" and "]" if innerType in typeMap: diff --git a/modules/workflows/methods/methodTrustee/actions/processDocuments.py b/modules/workflows/methods/methodTrustee/actions/processDocuments.py index 3f95836d..c4bf9df1 100644 --- a/modules/workflows/methods/methodTrustee/actions/processDocuments.py +++ b/modules/workflows/methods/methodTrustee/actions/processDocuments.py @@ -204,6 +204,56 @@ def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], feature } +def _resolveDocumentList(documentListParam, services) -> List[tuple]: + """Resolve documentList from either Graph-Editor output (list of dicts) or Chat references. + + Returns list of (data_dict, fileId, fileName, mimeType) tuples. + """ + results = [] + + if isinstance(documentListParam, list) and documentListParam: + first = documentListParam[0] + if isinstance(first, dict) and ("documentData" in first or "documentName" in first): + for doc in documentListParam: + rawData = doc.get("documentData") + if not rawData: + continue + try: + data = json.loads(rawData) if isinstance(rawData, str) else rawData + except (json.JSONDecodeError, TypeError): + continue + fileId = (doc.get("validationMetadata") or {}).get("fileId") or doc.get("fileId", "") + fileName = doc.get("documentName") or doc.get("fileName") or "document" + mimeType = doc.get("mimeType") or doc.get("documentMimeType") or "application/json" + results.append((data, fileId, fileName, mimeType)) + if results: + return results + + chatService = getattr(services, "chat", None) + if not chatService: + return results + + try: + docList = DocumentReferenceList.from_string_list( + documentListParam if isinstance(documentListParam, list) else [documentListParam] + ) + chatDocuments = chatService.getChatDocumentsFromDocumentList(docList) + for chatDoc in (chatDocuments or []): + rawBytes = chatService.getFileData(chatDoc.fileId) + if not rawBytes: + continue + content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes + try: + data = json.loads(content) if isinstance(content, str) else content + except (json.JSONDecodeError, TypeError): + continue + results.append((data, chatDoc.fileId, chatDoc.fileName or "document", chatDoc.mimeType or "application/json")) + except Exception as e: + logger.debug("_resolveDocumentList chat fallback failed: %s", e) + + return results + + async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult: """ Resolve documentList to ChatDocuments, load extraction JSON per document, @@ -218,11 +268,8 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult: return ActionResult.isFailure(error="featureInstanceId is required") try: - docList = DocumentReferenceList.from_string_list( - documentListParam if isinstance(documentListParam, list) else [documentListParam] - ) - chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList) - if not chatDocuments: + extractionDocs = _resolveDocumentList(documentListParam, self.services) + if not extractionDocs: return ActionResult.isFailure(error="No documents found for documentList") from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface @@ -237,17 +284,11 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult: allDocumentIds = [] autoMatchedPositionIds = [] - for chatDoc in chatDocuments: - rawBytes = self.services.chat.getFileData(chatDoc.fileId) - if not rawBytes: - logger.warning(f"Could not load file {chatDoc.fileId}, skipping") - continue - content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes - data = json.loads(content) if isinstance(content, str) else content + for data, fileId, fileName, mimeType in extractionDocs: documentType = data.get("documentType") extractedData = data.get("extractedData") - fileId = data.get("fileId") or chatDoc.fileId - fileName = data.get("fileName") or chatDoc.fileName or "document" + fileId = data.get("fileId") or fileId + fileName = data.get("fileName") or fileName or "document" records = extractedData if isinstance(extractedData, list) else [extractedData] if extractedData else [] if not records: @@ -256,7 +297,7 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult: docPayload = { "fileId": fileId, "documentName": fileName, - "documentMimeType": chatDoc.mimeType or "application/octet-stream", + "documentMimeType": mimeType or "application/octet-stream", "sourceType": "workflow", "documentType": documentType, } diff --git a/modules/workflows/methods/methodTrustee/actions/refreshAccountingData.py b/modules/workflows/methods/methodTrustee/actions/refreshAccountingData.py new file mode 100644 index 00000000..9e6276f5 --- /dev/null +++ b/modules/workflows/methods/methodTrustee/actions/refreshAccountingData.py @@ -0,0 +1,132 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Refresh accounting data from external system (e.g. Abacus) into local TrusteeData* tables. +Checks lastSyncAt to avoid redundant imports unless forceRefresh is set. +""" + +import json +import logging +import time +from typing import Dict, Any + +from modules.datamodels.datamodelChat import ActionResult, ActionDocument + +logger = logging.getLogger(__name__) + +_SYNC_THRESHOLD_SECONDS = 3600 + + +async def refreshAccountingData(self, parameters: Dict[str, Any]) -> ActionResult: + """Import/refresh accounting data from the configured external system. + + If data was synced within the last hour and forceRefresh is not set, + returns cached counts without triggering an external sync. + """ + featureInstanceId = parameters.get("featureInstanceId") or ( + self.services.featureInstanceId if hasattr(self.services, "featureInstanceId") else None + ) + forceRefresh = parameters.get("forceRefresh", False) + if isinstance(forceRefresh, str): + forceRefresh = forceRefresh.lower() in ("true", "1", "yes") + dateFrom = parameters.get("dateFrom") or None + dateTo = parameters.get("dateTo") or None + + if not featureInstanceId: + return ActionResult.isFailure(error="featureInstanceId is required") + + try: + from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface + from modules.features.trustee.datamodelFeatureTrustee import ( + TrusteeAccountingConfig, + TrusteeDataAccount, + TrusteeDataJournalEntry, + TrusteeDataJournalLine, + TrusteeDataContact, + TrusteeDataAccountBalance, + ) + + trusteeInterface = getTrusteeInterface( + self.services.user, + mandateId=self.services.mandateId, + featureInstanceId=featureInstanceId, + ) + + cfgRecords = trusteeInterface.db.getRecordset( + TrusteeAccountingConfig, + recordFilter={"featureInstanceId": featureInstanceId, "isActive": True}, + ) + if not cfgRecords: + return ActionResult.isFailure(error="No active accounting configuration found for this Trustee instance") + + cfgRecord = cfgRecords[0] + lastSyncAt = cfgRecord.get("lastSyncAt") or 0 + lastSyncStatus = cfgRecord.get("lastSyncStatus") or "" + + isFresh = ( + lastSyncAt + and (time.time() - lastSyncAt) < _SYNC_THRESHOLD_SECONDS + and lastSyncStatus in ("success", "partial") + ) + + if isFresh and not forceRefresh: + counts = _getCachedCounts(trusteeInterface, featureInstanceId) + counts["synced"] = False + counts["lastSyncAt"] = lastSyncAt + counts["lastSyncStatus"] = lastSyncStatus + counts["message"] = f"Data is fresh (synced {int(time.time() - lastSyncAt)}s ago). Use forceRefresh=true to re-import." + return ActionResult.isSuccess(documents=[ + ActionDocument( + documentName="refresh_result", + documentData=json.dumps(counts, ensure_ascii=False), + mimeType="application/json", + ) + ]) + + from modules.features.trustee.accounting.accountingDataSync import AccountingDataSync + + sync = AccountingDataSync(trusteeInterface) + summary = await sync.importData( + featureInstanceId=featureInstanceId, + mandateId=self.services.mandateId, + dateFrom=dateFrom, + dateTo=dateTo, + ) + summary["synced"] = True + summary.pop("startedAt", None) + summary.pop("finishedAt", None) + + return ActionResult.isSuccess(documents=[ + ActionDocument( + documentName="refresh_result", + documentData=json.dumps(summary, ensure_ascii=False), + mimeType="application/json", + ) + ]) + except Exception as e: + logger.exception("refreshAccountingData failed") + return ActionResult.isFailure(error=str(e)) + + +def _getCachedCounts(trusteeInterface, featureInstanceId: str) -> Dict[str, Any]: + """Count existing records per TrusteeData* table without triggering an external sync.""" + from modules.features.trustee.datamodelFeatureTrustee import ( + TrusteeDataAccount, + TrusteeDataJournalEntry, + TrusteeDataJournalLine, + TrusteeDataContact, + TrusteeDataAccountBalance, + ) + counts = {} + for label, model in [ + ("accounts", TrusteeDataAccount), + ("journalEntries", TrusteeDataJournalEntry), + ("journalLines", TrusteeDataJournalLine), + ("contacts", TrusteeDataContact), + ("accountBalances", TrusteeDataAccountBalance), + ]: + records = trusteeInterface.db.getRecordset( + model, recordFilter={"featureInstanceId": featureInstanceId} + ) + counts[label] = len(records) if records else 0 + return counts diff --git a/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py index 4633f32e..555a8623 100644 --- a/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py +++ b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py @@ -16,6 +16,43 @@ from modules.datamodels.datamodelDocref import DocumentReferenceList logger = logging.getLogger(__name__) +def _resolveFirstDocument(documentListParam, services) -> Dict[str, Any] | None: + """Resolve the first document from either Graph-Editor output (list of dicts) or Chat references. + + Returns the parsed JSON dict or None. + """ + if isinstance(documentListParam, list) and documentListParam: + first = documentListParam[0] + if isinstance(first, dict) and ("documentData" in first or "documentName" in first): + rawData = first.get("documentData") + if rawData: + try: + return json.loads(rawData) if isinstance(rawData, str) else rawData + except (json.JSONDecodeError, TypeError): + pass + + chatService = getattr(services, "chat", None) + if not chatService: + return None + + try: + docList = DocumentReferenceList.from_string_list( + documentListParam if isinstance(documentListParam, list) else [documentListParam] + ) + chatDocuments = chatService.getChatDocumentsFromDocumentList(docList) + if not chatDocuments: + return None + doc = chatDocuments[0] + rawBytes = chatService.getFileData(doc.fileId) + if not rawBytes: + return None + content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes + return json.loads(content) if isinstance(content, str) else content + except Exception as e: + logger.debug("_resolveFirstDocument chat fallback failed: %s", e) + return None + + async def syncToAccounting(self, parameters: Dict[str, Any]) -> ActionResult: """ Push trustee positions to the configured accounting system. @@ -30,21 +67,10 @@ async def syncToAccounting(self, parameters: Dict[str, Any]) -> ActionResult: return ActionResult.isFailure(error="documentList is required (reference to processDocuments result)") try: - docList = DocumentReferenceList.from_string_list( - documentListParam if isinstance(documentListParam, list) else [documentListParam] - ) - chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList) - if not chatDocuments: + data = _resolveFirstDocument(documentListParam, self.services) + if data is None: return ActionResult.isFailure(error="No document found for documentList; ensure processDocuments ran before this action") - # Expect one document (JSON with positionIds, documentIds) - doc = chatDocuments[0] - rawBytes = self.services.chat.getFileData(doc.fileId) - if not rawBytes: - return ActionResult.isFailure(error=f"Could not load document content for fileId={doc.fileId}") - - content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes - data = json.loads(content) if isinstance(content, str) else content positionIds = data.get("positionIds") or [] if not positionIds: return ActionResult.isSuccess(documents=[ diff --git a/modules/workflows/methods/methodTrustee/methodTrustee.py b/modules/workflows/methods/methodTrustee/methodTrustee.py index fefeaa52..5be232f8 100644 --- a/modules/workflows/methods/methodTrustee/methodTrustee.py +++ b/modules/workflows/methods/methodTrustee/methodTrustee.py @@ -12,6 +12,7 @@ from modules.shared.frontendTypes import FrontendType from .actions.extractFromFiles import extractFromFiles from .actions.processDocuments import processDocuments from .actions.syncToAccounting import syncToAccounting +from .actions.refreshAccountingData import refreshAccountingData logger = logging.getLogger(__name__) @@ -112,9 +113,46 @@ class MethodTrustee(MethodBase): }, execute=syncToAccounting.__get__(self, self.__class__), ), + "refreshAccountingData": WorkflowActionDefinition( + actionId="trustee.refreshAccountingData", + description="Import/refresh accounting data from external system (e.g. Abacus) into local tables. Checks cache freshness; use forceRefresh to re-import.", + dynamicMode=True, + parameters={ + "featureInstanceId": WorkflowActionParameter( + name="featureInstanceId", + type="str", + frontendType=FrontendType.TEXT, + required=True, + description="Trustee feature instance ID", + ), + "forceRefresh": WorkflowActionParameter( + name="forceRefresh", + type="bool", + frontendType=FrontendType.CHECKBOX, + required=False, + description="Force re-import even if data is fresh (default: false)", + ), + "dateFrom": WorkflowActionParameter( + name="dateFrom", + type="str", + frontendType=FrontendType.TEXT, + required=False, + description="Start date filter for journal entries (YYYY-MM-DD)", + ), + "dateTo": WorkflowActionParameter( + name="dateTo", + type="str", + frontendType=FrontendType.TEXT, + required=False, + description="End date filter for journal entries (YYYY-MM-DD)", + ), + }, + execute=refreshAccountingData.__get__(self, self.__class__), + ), } self._validateActions() self.extractFromFiles = extractFromFiles.__get__(self, self.__class__) self.processDocuments = processDocuments.__get__(self, self.__class__) self.syncToAccounting = syncToAccounting.__get__(self, self.__class__) + self.refreshAccountingData = refreshAccountingData.__get__(self, self.__class__) diff --git a/modules/workflows/scheduler/mainScheduler.py b/modules/workflows/scheduler/mainScheduler.py index ec156fd2..5f888237 100644 --- a/modules/workflows/scheduler/mainScheduler.py +++ b/modules/workflows/scheduler/mainScheduler.py @@ -454,6 +454,11 @@ def stop() -> bool: return _scheduler.stop() +def syncNow() -> dict: + """Trigger an immediate incremental sync. Used by /schedule-sync endpoint.""" + return _scheduler._syncScheduledWorkflows() + + def setMainLoop(loop) -> None: """Set the main event loop for thread-bridge.""" _setMainLoop(loop)