From c3954953009b1eb6e03643bd08ece65ce43fae39 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 7 Apr 2026 22:32:05 +0200
Subject: [PATCH] integrated and initial teste unified automation
---
app.py | 28 +---
.../interfaceFeatureGraphicalEditor.py | 8 +-
.../graphicalEditor/mainGraphicalEditor.py | 20 +--
.../nodeDefinitions/trustee.py | 22 +++
.../features/graphicalEditor/nodeRegistry.py | 1 +
.../routeFeatureGraphicalEditor.py | 61 ++++++--
modules/interfaces/interfaceBootstrap.py | 25 ++++
.../services/serviceAgent/agentLoop.py | 11 ++
.../coreTools/_featureSubAgentTools.py | 85 +++++++----
.../services/serviceAgent/featureDataAgent.py | 50 ++++++-
.../serviceAgent/featureDataProvider.py | 60 ++++++++
.../services/serviceAgent/mainServiceAgent.py | 81 ++++++++++-
.../services/serviceAgent/toolboxRegistry.py | 81 +++++++++--
.../workflows/automation2/executionEngine.py | 75 ++++++++--
modules/workflows/methods/methodBase.py | 6 +-
.../methodTrustee/actions/processDocuments.py | 71 ++++++++--
.../actions/refreshAccountingData.py | 132 ++++++++++++++++++
.../methodTrustee/actions/syncToAccounting.py | 52 +++++--
.../methods/methodTrustee/methodTrustee.py | 38 +++++
modules/workflows/scheduler/mainScheduler.py | 5 +
20 files changed, 784 insertions(+), 128 deletions(-)
create mode 100644 modules/workflows/methods/methodTrustee/actions/refreshAccountingData.py
diff --git a/app.py b/app.py
index 8dce20cb..a2b6f338 100644
--- a/app.py
+++ b/app.py
@@ -20,8 +20,6 @@ from datetime import datetime
from modules.shared.configuration import APP_CONFIG
from modules.shared.eventManagement import eventManager
-from modules.features.graphicalEditor.emailPoller import start as startGraphicalEditorEmailPoller
-from modules.features.graphicalEditor.emailPoller import stop as stopGraphicalEditorEmailPoller
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.system.registry import loadFeatureMainModules
@@ -348,44 +346,22 @@ async def lifespan(app: FastAPI):
# --- Init Managers ---
import asyncio
- from modules.workflows.automation2 import subAutomation2Schedule
try:
main_loop = asyncio.get_running_loop()
eventManager.set_event_loop(main_loop)
- subAutomation2Schedule.set_main_loop(main_loop)
+ from modules.workflows.scheduler.mainScheduler import setMainLoop as setSchedulerMainLoop
+ setSchedulerMainLoop(main_loop)
except RuntimeError:
pass
- subAutomation2Schedule.start(eventUser)
eventManager.start()
# Register audit log cleanup scheduler
from modules.shared.auditLogger import registerAuditLogCleanupScheduler
registerAuditLogCleanupScheduler()
- # Ensure billing settings and accounts exist for all mandates
- try:
- from modules.interfaces.interfaceDbBilling import _getRootInterface as getBillingRootInterface
-
- billingInterface = getBillingRootInterface()
-
- # Step 1: Ensure all mandates have billing settings (creates defaults if missing)
- settingsCreated = billingInterface.ensureAllMandateSettingsExist()
- if settingsCreated > 0:
- logger.info(f"Billing startup: Created {settingsCreated} missing mandate billing settings")
-
- # Step 2: Ensure all users have billing audit accounts
- accountsCreated = billingInterface.ensureAllUserAccountsExist()
- if accountsCreated > 0:
- logger.info(f"Billing startup: Created {accountsCreated} missing user accounts")
-
- except Exception as e:
- logger.warning(f"Failed to ensure billing settings/accounts (non-critical): {e}")
-
yield
# --- Stop Managers ---
- stopGraphicalEditorEmailPoller(eventUser)
- subAutomation2Schedule.stop(eventUser)
eventManager.stop()
# --- Stop Feature Containers (Plug&Play) ---
diff --git a/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py b/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py
index 79ff146e..4dd679bb 100644
--- a/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py
+++ b/modules/features/graphicalEditor/interfaceFeatureGraphicalEditor.py
@@ -327,7 +327,7 @@ class GraphicalEditorObjects:
return [dict(r) for r in records] if records else []
def getRecentCompletedRuns(self, limit: int = 20) -> List[Dict[str, Any]]:
- """Get recently completed runs for workflows in this instance."""
+ """Get recent runs (all statuses) for workflows in this instance."""
if not self.db._ensureTableExists(Automation2WorkflowRun):
return []
workflows = self.getWorkflows()
@@ -336,7 +336,7 @@ class GraphicalEditorObjects:
return []
records = self.db.getRecordset(
Automation2WorkflowRun,
- recordFilter={"status": "completed"},
+ recordFilter={},
)
if not records:
return []
@@ -634,13 +634,13 @@ class GraphicalEditorObjects:
return dict(created)
def shareTemplate(self, templateId: str, scope: str) -> Optional[Dict[str, Any]]:
- """Share a template by changing its scope and setting sharedReadOnly."""
+ """Change a template's scope. Sets sharedReadOnly=True for shared scopes, False for user scope."""
template = self.getWorkflow(templateId)
if not template or not template.get("isTemplate"):
return None
updated = self.db.recordModify(AutoWorkflow, templateId, {
"templateScope": scope,
- "sharedReadOnly": True,
+ "sharedReadOnly": scope != "user",
})
return dict(updated)
diff --git a/modules/features/graphicalEditor/mainGraphicalEditor.py b/modules/features/graphicalEditor/mainGraphicalEditor.py
index e2cb5188..ab437a54 100644
--- a/modules/features/graphicalEditor/mainGraphicalEditor.py
+++ b/modules/features/graphicalEditor/mainGraphicalEditor.py
@@ -25,11 +25,6 @@ FEATURE_LABEL = {"en": "Graphical Editor", "de": "Grafischer Editor", "fr": "Éd
FEATURE_ICON = "mdi-sitemap"
UI_OBJECTS = [
- {
- "objectKey": "ui.feature.graphicalEditor.dashboard",
- "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"},
- "meta": {"area": "dashboard"}
- },
{
"objectKey": "ui.feature.graphicalEditor.editor",
"label": {"en": "Editor", "de": "Editor", "fr": "Éditeur"},
@@ -40,15 +35,20 @@ UI_OBJECTS = [
"label": {"en": "Workflows", "de": "Workflows", "fr": "Workflows"},
"meta": {"area": "workflows"}
},
+ {
+ "objectKey": "ui.feature.graphicalEditor.templates",
+ "label": {"en": "Templates", "de": "Vorlagen", "fr": "Modèles"},
+ "meta": {"area": "templates"}
+ },
{
"objectKey": "ui.feature.graphicalEditor.workflows-tasks",
"label": {"en": "Tasks", "de": "Tasks", "fr": "Tâches"},
"meta": {"area": "tasks"}
},
{
- "objectKey": "ui.feature.graphicalEditor.templates",
- "label": {"en": "Templates", "de": "Vorlagen", "fr": "Modèles"},
- "meta": {"area": "templates"}
+ "objectKey": "ui.feature.graphicalEditor.dashboard",
+ "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"},
+ "meta": {"area": "dashboard"}
},
]
@@ -213,7 +213,9 @@ async def onStart(eventUser) -> None:
async def onStop(eventUser) -> None:
- """Feature shutdown - remove email poller if running."""
+ """Feature shutdown - stop scheduler and email poller."""
+ from modules.workflows.scheduler.mainScheduler import stop as stopScheduler
+ stopScheduler()
from modules.features.graphicalEditor.emailPoller import stop as stopEmailPoller
stopEmailPoller(eventUser)
diff --git a/modules/features/graphicalEditor/nodeDefinitions/trustee.py b/modules/features/graphicalEditor/nodeDefinitions/trustee.py
index abc1fa79..4d7082ae 100644
--- a/modules/features/graphicalEditor/nodeDefinitions/trustee.py
+++ b/modules/features/graphicalEditor/nodeDefinitions/trustee.py
@@ -3,6 +3,28 @@
# Pipeline: extractFromFiles -> processDocuments -> syncToAccounting.
TRUSTEE_NODES = [
+ {
+ "id": "trustee.refreshAccountingData",
+ "category": "trustee",
+ "label": {"en": "Refresh Accounting Data", "de": "Buchhaltungsdaten aktualisieren", "fr": "Actualiser données comptables"},
+ "description": {
+ "en": "Import/refresh accounting data from external system (e.g. Abacus). Skips import if data is fresh unless forceRefresh is set.",
+ "de": "Buchhaltungsdaten aus externem System importieren/aktualisieren (z.B. Abacus). Überspringt Import wenn Daten frisch sind, ausser forceRefresh ist gesetzt.",
+ "fr": "Importer/actualiser les données comptables depuis le système externe (ex. Abacus).",
+ },
+ "parameters": [
+ {"name": "featureInstanceId", "type": "string", "required": True, "description": {"en": "Trustee feature instance ID", "de": "Trustee Feature-Instanz-ID", "fr": "ID instance Trustee"}},
+ {"name": "forceRefresh", "type": "boolean", "required": False, "description": {"en": "Force re-import even if data is fresh (default: false)", "de": "Import erzwingen auch wenn Daten frisch sind", "fr": "Forcer la réimportation"}, "default": False},
+ {"name": "dateFrom", "type": "string", "required": False, "description": {"en": "Start date filter (YYYY-MM-DD)", "de": "Startdatum-Filter (JJJJ-MM-TT)", "fr": "Date début (AAAA-MM-JJ)"}, "default": ""},
+ {"name": "dateTo", "type": "string", "required": False, "description": {"en": "End date filter (YYYY-MM-DD)", "de": "Enddatum-Filter (JJJJ-MM-TT)", "fr": "Date fin (AAAA-MM-JJ)"}, "default": ""},
+ ],
+ "inputs": 1,
+ "outputs": 1,
+ "meta": {"icon": "mdi-database-refresh", "color": "#4CAF50"},
+ "_method": "trustee",
+ "_action": "refreshAccountingData",
+ "_paramMap": {"featureInstanceId": "featureInstanceId", "forceRefresh": "forceRefresh", "dateFrom": "dateFrom", "dateTo": "dateTo"},
+ },
{
"id": "trustee.extractFromFiles",
"category": "trustee",
diff --git a/modules/features/graphicalEditor/nodeRegistry.py b/modules/features/graphicalEditor/nodeRegistry.py
index 524b96af..928840a4 100644
--- a/modules/features/graphicalEditor/nodeRegistry.py
+++ b/modules/features/graphicalEditor/nodeRegistry.py
@@ -70,6 +70,7 @@ def getNodeTypesForApi(
{"id": "email", "label": {"en": "Email", "de": "E-Mail", "fr": "Email"}},
{"id": "sharepoint", "label": {"en": "SharePoint", "de": "SharePoint", "fr": "SharePoint"}},
{"id": "clickup", "label": {"en": "ClickUp", "de": "ClickUp", "fr": "ClickUp"}},
+ {"id": "trustee", "label": {"en": "Trustee", "de": "Treuhand", "fr": "Fiduciaire"}},
]
return {"nodeTypes": localized, "categories": categories}
diff --git a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py
index caf89c10..3c1f4649 100644
--- a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py
+++ b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py
@@ -137,15 +137,10 @@ def post_schedule_sync(
) -> dict:
"""Manually trigger schedule sync (re-register cron jobs for all schedule workflows)."""
_validateInstanceAccess(instanceId, context)
- from modules.interfaces.interfaceDbApp import getRootInterface
- from modules.workflows.automation2.subAutomation2Schedule import sync_automation2_schedule_events
+ from modules.workflows.scheduler.mainScheduler import syncNow
- root = getRootInterface()
- event_user = root.getUserByUsername("event")
- if not event_user:
- return {"success": False, "error": "Event user not available", "synced": 0}
- result = sync_automation2_schedule_events(event_user)
- return {"success": True, **result}
+ result = syncNow()
+ return {"success": True, **(result or {})}
@router.get("/{instanceId}/node-types")
@@ -249,6 +244,56 @@ async def post_execute(
return result
+# -------------------------------------------------------------------------
+# Run Tracing SSE Stream
+# -------------------------------------------------------------------------
+
+
+@router.get("/{instanceId}/runs/{runId}/stream")
+async def get_run_stream(
+ request: Request,
+ instanceId: str = Path(..., description="Feature instance ID"),
+ runId: str = Path(..., description="Run ID"),
+ context: RequestContext = Depends(getRequestContext),
+):
+ """SSE stream for live step-log updates during a workflow run."""
+ _validateInstanceAccess(instanceId, context)
+
+ from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager
+ sseEventManager = get_event_manager()
+ queueId = f"run-trace-{runId}"
+ sseEventManager.create_queue(queueId)
+
+ async def _sseGenerator():
+ queue = sseEventManager.get_queue(queueId)
+ if not queue:
+ return
+ while True:
+ try:
+ event = await asyncio.wait_for(queue.get(), timeout=30)
+ except asyncio.TimeoutError:
+ yield "data: {\"type\": \"keepalive\"}\n\n"
+ continue
+ if event is None:
+ break
+ payload = event.get("data", event) if isinstance(event, dict) else event
+ yield f"data: {json.dumps(payload, default=str)}\n\n"
+ eventType = payload.get("type", "") if isinstance(payload, dict) else ""
+ if eventType in ("run_complete", "run_failed"):
+ break
+ await sseEventManager.cleanup(queueId, delay=10)
+
+ return StreamingResponse(
+ _sseGenerator(),
+ media_type="text/event-stream",
+ headers={
+ "Cache-Control": "no-cache",
+ "Connection": "keep-alive",
+ "X-Accel-Buffering": "no",
+ },
+ )
+
+
# -------------------------------------------------------------------------
# Versions (AutoVersion Lifecycle)
# -------------------------------------------------------------------------
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index 762b20ea..0f424438 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -155,6 +155,31 @@ def initBootstrap(db: DatabaseConnector) -> None:
# Bootstrap system workflow templates for graphical editor
_bootstrapSystemTemplates(db)
+ # Ensure billing settings and accounts exist for all mandates
+ _bootstrapBilling()
+
+
+def _bootstrapBilling() -> None:
+ """
+ Ensure billing settings and accounts exist for all mandates.
+ Idempotent: only creates missing settings/accounts.
+ """
+ try:
+ from modules.interfaces.interfaceDbBilling import _getRootInterface as getBillingRootInterface
+
+ billingInterface = getBillingRootInterface()
+
+ settingsCreated = billingInterface.ensureAllMandateSettingsExist()
+ if settingsCreated > 0:
+ logger.info(f"Billing bootstrap: Created {settingsCreated} missing mandate billing settings")
+
+ accountsCreated = billingInterface.ensureAllUserAccountsExist()
+ if accountsCreated > 0:
+ logger.info(f"Billing bootstrap: Created {accountsCreated} missing user accounts")
+
+ except Exception as e:
+ logger.warning(f"Billing bootstrap failed (non-critical): {e}")
+
def _bootstrapSystemTemplates(db: DatabaseConnector) -> None:
"""
diff --git a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py
index fa76141d..998816d9 100644
--- a/modules/serviceCenter/services/serviceAgent/agentLoop.py
+++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py
@@ -333,6 +333,17 @@ async def runAgentLoop(
content=sideEvt.get("content"),
)
+ # Check if requestToolbox was called -- refresh tool definitions for next round
+ _toolboxEscalated = False
+ for result in results:
+ if result.toolName == "requestToolbox" and result.success:
+ _toolboxEscalated = True
+ if _toolboxEscalated:
+ tools = toolRegistry.getTools(toolSet=activeToolSet)
+ toolDefinitions = toolRegistry.formatToolsForFunctionCalling(toolSet=activeToolSet)
+ toolsText = "" if toolDefinitions else toolRegistry.formatToolsForPrompt(toolSet=activeToolSet)
+ logger.info("Toolbox escalation: refreshed tool definitions (%d tools)", len(tools))
+
# Add tool results to conversation
toolResultMessages = [
{"toolCallId": r.toolCallId, "toolName": r.toolName,
diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py
index f8ee357d..756079ad 100644
--- a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py
+++ b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py
@@ -2,8 +2,10 @@
# All rights reserved.
"""Feature Data Sub-Agent tool (queryFeatureInstance)."""
+import hashlib
import logging
-from typing import Any, Dict, List, Optional
+import time
+from typing import Any, Dict, List, Optional, Tuple
from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult
from modules.serviceCenter.services.serviceAgent.toolRegistry import ToolRegistry
@@ -17,6 +19,35 @@ from modules.serviceCenter.services.serviceAgent.coreTools._helpers import (
logger = logging.getLogger(__name__)
+_featureDbConnPool: Dict[str, Any] = {}
+_featureQueryCache: Dict[str, Tuple[float, str]] = {}
+_CACHE_TTL_SECONDS = 300
+
+
+def _getOrCreateFeatureDbConnector(featureDbName: str, userId: str):
+ """Reuse a pooled DB connector for the given feature database."""
+ if featureDbName in _featureDbConnPool:
+ conn = _featureDbConnPool[featureDbName]
+ try:
+ if conn.connection and not conn.connection.closed:
+ return conn
+ except Exception:
+ pass
+ _featureDbConnPool.pop(featureDbName, None)
+
+ from modules.connectors.connectorDbPostgre import DatabaseConnector
+ from modules.shared.configuration import APP_CONFIG
+ conn = DatabaseConnector(
+ dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
+ dbDatabase=featureDbName,
+ dbUser=APP_CONFIG.get("DB_USER"),
+ dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
+ dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
+ userId=userId,
+ )
+ _featureDbConnPool[featureDbName] = conn
+ return conn
+
def _registerFeatureSubAgentTools(registry: ToolRegistry, services):
"""Auto-extracted from registerCoreTools."""
@@ -86,17 +117,17 @@ def _registerFeatureSubAgentTools(registry: ToolRegistry, services):
success=False, error=f"No data tables available for feature '{featureCode}'",
)
- from modules.connectors.connectorDbPostgre import DatabaseConnector
- from modules.shared.configuration import APP_CONFIG
+ cacheKey = f"{featureInstanceId}:{hashlib.md5(question.encode()).hexdigest()}"
+ if cacheKey in _featureQueryCache:
+ cachedAt, cachedResult = _featureQueryCache[cacheKey]
+ if time.time() - cachedAt < _CACHE_TTL_SECONDS:
+ return ToolResult(
+ toolCallId="", toolName="queryFeatureInstance",
+ success=True, data=cachedResult,
+ )
+
featureDbName = f"poweron_{featureCode.lower()}"
- featureDbConn = DatabaseConnector(
- dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
- dbDatabase=featureDbName,
- dbUser=APP_CONFIG.get("DB_USER"),
- dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET"),
- dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
- userId=userId or "agent",
- )
+ featureDbConn = _getOrCreateFeatureDbConnector(featureDbName, userId or "agent")
aiService = services.ai if hasattr(services, "ai") else None
if aiService is None:
@@ -110,24 +141,20 @@ def _registerFeatureSubAgentTools(registry: ToolRegistry, services):
req.requireNeutralization = True
return await aiService.callAi(req)
- try:
- answer = await runFeatureDataAgent(
- question=question,
- featureInstanceId=featureInstanceId,
- featureCode=featureCode,
- selectedTables=selectedTables,
- mandateId=mandateId,
- userId=userId,
- aiCallFn=_subAgentAiCall,
- dbConnector=featureDbConn,
- instanceLabel=instanceLabel,
- tableFilters=tableFilters,
- )
- finally:
- try:
- featureDbConn.close()
- except Exception:
- pass
+ answer = await runFeatureDataAgent(
+ question=question,
+ featureInstanceId=featureInstanceId,
+ featureCode=featureCode,
+ selectedTables=selectedTables,
+ mandateId=mandateId,
+ userId=userId,
+ aiCallFn=_subAgentAiCall,
+ dbConnector=featureDbConn,
+ instanceLabel=instanceLabel,
+ tableFilters=tableFilters,
+ )
+
+ _featureQueryCache[cacheKey] = (time.time(), answer)
return ToolResult(
toolCallId="", toolName="queryFeatureInstance",
diff --git a/modules/serviceCenter/services/serviceAgent/featureDataAgent.py b/modules/serviceCenter/services/serviceAgent/featureDataAgent.py
index 8ef0bfcc..e74f4b55 100644
--- a/modules/serviceCenter/services/serviceAgent/featureDataAgent.py
+++ b/modules/serviceCenter/services/serviceAgent/featureDataAgent.py
@@ -168,6 +168,53 @@ def _buildSubAgentTools(
error=result.get("error"),
)
+ async def _aggregateTable(args: Dict[str, Any], context: Dict[str, Any]):
+ tableName = args.get("tableName", "")
+ aggregate = args.get("aggregate", "")
+ field = args.get("field", "")
+ groupBy = args.get("groupBy")
+ if not tableName:
+ return ToolResult(toolCallId="", toolName="aggregateTable", success=False, error="tableName required")
+ if not aggregate:
+ return ToolResult(toolCallId="", toolName="aggregateTable", success=False, error="aggregate required (SUM, COUNT, AVG, MIN, MAX)")
+ if not field:
+ return ToolResult(toolCallId="", toolName="aggregateTable", success=False, error="field required")
+ result = provider.aggregateTable(
+ tableName=tableName,
+ featureInstanceId=featureInstanceId,
+ mandateId=mandateId,
+ aggregate=aggregate,
+ field=field,
+ groupBy=groupBy,
+ extraFilters=_recordFilterToList(tableName),
+ )
+ return ToolResult(
+ toolCallId="", toolName="aggregateTable",
+ success="error" not in result,
+ data=json.dumps(result, default=str, ensure_ascii=False)[:30000],
+ error=result.get("error"),
+ )
+
+ registry.register(
+ "aggregateTable", _aggregateTable,
+ description=(
+ "Run an aggregate query on a feature data table. "
+ "Supports SUM, COUNT, AVG, MIN, MAX with optional GROUP BY. "
+ "Example: aggregateTable(tableName='TrusteeDataJournalLine', aggregate='SUM', field='debitAmount', groupBy='costCenter')"
+ ),
+ parameters={
+ "type": "object",
+ "properties": {
+ "tableName": {"type": "string", "description": "Name of the table to aggregate"},
+ "aggregate": {"type": "string", "enum": ["SUM", "COUNT", "AVG", "MIN", "MAX"], "description": "Aggregate function"},
+ "field": {"type": "string", "description": "Field to aggregate (e.g. debitAmount, creditAmount)"},
+ "groupBy": {"type": "string", "description": "Optional field to group by (e.g. costCenter, accountNumber)"},
+ },
+ "required": ["tableName", "aggregate", "field"],
+ },
+ readOnly=True,
+ )
+
registry.register(
"browseTable", _browseTable,
description="List rows from a feature data table with pagination.",
@@ -244,7 +291,8 @@ def _buildSchemaContext(
parts[0] += "."
parts.append(
"You have access to the following data tables. "
- "Use browseTable to list rows and queryTable to filter/search."
+ "Use browseTable to list rows, queryTable to filter/search, "
+ "and aggregateTable for SUM/COUNT/AVG/MIN/MAX with optional GROUP BY."
)
parts.append("")
diff --git a/modules/serviceCenter/services/serviceAgent/featureDataProvider.py b/modules/serviceCenter/services/serviceAgent/featureDataProvider.py
index 25a0ff95..5b68d3ee 100644
--- a/modules/serviceCenter/services/serviceAgent/featureDataProvider.py
+++ b/modules/serviceCenter/services/serviceAgent/featureDataProvider.py
@@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
_ALLOWED_OPERATORS = {"=", "!=", ">", "<", ">=", "<=", "LIKE", "ILIKE", "IS NULL", "IS NOT NULL"}
+_ALLOWED_AGGREGATES = {"SUM", "COUNT", "AVG", "MIN", "MAX"}
class FeatureDataProvider:
@@ -106,6 +107,65 @@ class FeatureDataProvider:
logger.error(f"browseTable({tableName}) failed: {e}")
return {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)}
+ def aggregateTable(
+ self,
+ tableName: str,
+ featureInstanceId: str,
+ mandateId: str,
+ aggregate: str,
+ field: str,
+ groupBy: str = None,
+ extraFilters: Optional[List[Dict[str, Any]]] = None,
+ ) -> Dict[str, Any]:
+ """Run an aggregate query (SUM, COUNT, AVG, MIN, MAX) on a feature table.
+
+ Returns ``{"rows": [{"groupValue": ..., "result": ...}], "aggregate": ..., "field": ..., "groupBy": ...}``.
+ """
+ _validateTableName(tableName)
+ aggregate = aggregate.upper()
+ if aggregate not in _ALLOWED_AGGREGATES:
+ return {"rows": [], "error": f"Unsupported aggregate: {aggregate}. Allowed: {', '.join(sorted(_ALLOWED_AGGREGATES))}"}
+ if not _isValidIdentifier(field):
+ return {"rows": [], "error": f"Invalid field name: {field}"}
+ if groupBy and not _isValidIdentifier(groupBy):
+ return {"rows": [], "error": f"Invalid groupBy field: {groupBy}"}
+
+ conn = self._db.connection
+ scopeFilter = _buildScopeFilter(tableName, featureInstanceId, mandateId, dbConnection=conn)
+ extraWhere, extraParams = _buildFilterClauses(extraFilters)
+
+ fullWhere = scopeFilter["where"]
+ allParams = list(scopeFilter["params"])
+ if extraWhere:
+ fullWhere += " AND " + extraWhere
+ allParams.extend(extraParams)
+
+ try:
+ with conn.cursor() as cur:
+ if groupBy:
+ sql = (
+ f'SELECT "{groupBy}" AS "groupValue", {aggregate}("{field}") AS "result" '
+ f'FROM "{tableName}" WHERE {fullWhere} '
+ f'GROUP BY "{groupBy}" ORDER BY "result" DESC'
+ )
+ else:
+ sql = (
+ f'SELECT {aggregate}("{field}") AS "result" '
+ f'FROM "{tableName}" WHERE {fullWhere}'
+ )
+ cur.execute(sql, allParams)
+ rows = [_serializeRow(dict(r)) for r in cur.fetchall()]
+
+ return {
+ "rows": rows,
+ "aggregate": aggregate,
+ "field": field,
+ "groupBy": groupBy,
+ }
+ except Exception as e:
+ logger.error(f"aggregateTable({tableName}, {aggregate}({field})) failed: {e}")
+ return {"rows": [], "error": str(e), "aggregate": aggregate, "field": field, "groupBy": groupBy}
+
def queryTable(
self,
tableName: str,
diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
index 2d5a5dfb..b371e01e 100644
--- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
+++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
@@ -275,11 +275,18 @@ class AgentService:
logger.warning(f"Could not register action tools: {e}")
self._activateToolboxes(registry, config)
+ self._registerRequestToolbox(registry)
return registry
def _activateToolboxes(self, registry: ToolRegistry, config: AgentConfig) -> None:
- """Activate toolboxes dynamically based on user connections and config."""
+ """Activate toolboxes dynamically based on user connections and config.
+
+ For each active toolbox, marks already-registered tools as belonging to
+ that toolbox (via toolSet tag) so the agent loop can filter them.
+ The 'workflow' toolbox is special: its tools are registered from
+ workflowTools module because they have dedicated handlers.
+ """
try:
from modules.serviceCenter.services.serviceAgent.toolboxRegistry import getToolboxRegistry
tbRegistry = getToolboxRegistry()
@@ -297,18 +304,84 @@ class AgentService:
activatedIds = [tb.id for tb in activeToolboxes]
logger.info("Toolbox activation: connections=%s -> active toolboxes=%s", userConnections, activatedIds)
+ activeToolNames: set = set()
+ for tb in activeToolboxes:
+ activeToolNames.update(tb.tools)
+
for tb in activeToolboxes:
if tb.id == "workflow":
try:
from modules.serviceCenter.services.serviceAgent.workflowTools import getWorkflowToolDefinitions
- for toolDef in getWorkflowToolDefinitions():
- registry.register(toolDef)
- logger.info("Registered %d workflow tools from toolbox", len(getWorkflowToolDefinitions()))
+ wfDefs = getWorkflowToolDefinitions()
+ for toolDef in wfDefs:
+ registry.registerFromDefinition(toolDef, toolDef._handler if hasattr(toolDef, "_handler") else None)
+ logger.info("Registered %d workflow tools from toolbox", len(wfDefs))
except Exception as e:
logger.warning("Could not register workflow tools: %s", e)
+
+ inactiveToolNames = set()
+ for tb in tbRegistry.getAllToolboxes():
+ if tb.id not in activatedIds:
+ inactiveToolNames.update(tb.tools)
+ inactiveToolNames -= activeToolNames
+
+ for toolName in inactiveToolNames:
+ registry.unregister(toolName)
+
+ logger.debug("Toolbox activation: %d active tools, %d inactive tools removed", len(activeToolNames), len(inactiveToolNames))
except Exception as e:
logger.warning("Toolbox activation failed: %s", e)
+ def _registerRequestToolbox(self, registry: ToolRegistry) -> None:
+ """Register the requestToolbox meta-tool that lets the agent dynamically activate toolboxes."""
+ try:
+ from modules.serviceCenter.services.serviceAgent.toolboxRegistry import (
+ getToolboxRegistry, buildRequestToolboxDefinition, REQUEST_TOOLBOX_TOOL_NAME,
+ )
+ from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult
+
+ tbRegistry = getToolboxRegistry()
+ allIds = [tb.id for tb in tbRegistry.getAllToolboxes()]
+ registeredNames = set(registry.getToolNames())
+ inactiveIds = [tbId for tbId in allIds if not any(
+ t in registeredNames for t in (tbRegistry.getToolbox(tbId).tools if tbRegistry.getToolbox(tbId) else [])
+ )]
+
+ if not inactiveIds:
+ return
+
+ toolDef = buildRequestToolboxDefinition(inactiveIds)
+
+ async def _handler(args: Dict[str, Any], context: Dict[str, Any] = None) -> ToolResult:
+ toolboxId = args.get("toolboxId", "")
+ reason = args.get("reason", "")
+ tb = tbRegistry.getToolbox(toolboxId)
+ if not tb:
+ return ToolResult(
+ toolCallId="", toolName=REQUEST_TOOLBOX_TOOL_NAME,
+ success=False, error=f"Unknown toolbox: {toolboxId}",
+ )
+ for toolName in tb.tools:
+ if not registry.isValidTool(toolName):
+ logger.info("requestToolbox: tool '%s' from toolbox '%s' not yet registered, skipping", toolName, toolboxId)
+ continue
+ logger.info("requestToolbox: activated toolbox '%s' (%d tools). Reason: %s", toolboxId, len(tb.tools), reason)
+ return ToolResult(
+ toolCallId="", toolName=REQUEST_TOOLBOX_TOOL_NAME,
+ success=True,
+ data=f"Toolbox '{tb.label}' activated with {len(tb.tools)} tools. They are now available.",
+ )
+
+ registry.register(
+ name=REQUEST_TOOLBOX_TOOL_NAME,
+ handler=_handler,
+ description=toolDef["description"],
+ parameters=toolDef["parameters"],
+ )
+ logger.info("Registered requestToolbox meta-tool (inactive toolboxes: %s)", inactiveIds)
+ except Exception as e:
+ logger.warning("Could not register requestToolbox meta-tool: %s", e)
+
async def _persistTrace(self, workflowId: str, summaryData: Dict[str, Any]):
"""Persist the agent trace and workflow artifacts in the knowledge store."""
try:
diff --git a/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py b/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py
index 459c4cab..7646da11 100644
--- a/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py
+++ b/modules/serviceCenter/services/serviceAgent/toolboxRegistry.py
@@ -115,21 +115,35 @@ def _registerDefaultToolboxes() -> None:
label="Core Tools",
description="Basic agent tools: search, read, write, web",
isDefault=True,
- tools=[],
+ tools=[
+ "readFile", "listFiles", "searchInFileContent", "listFolders",
+ "webSearch", "readUrl", "writeFile", "deleteFile", "renameFile",
+ "copyFile", "createFolder", "deleteFolder", "moveFile", "moveFolder",
+ "renameFolder", "tagFile", "replaceInFile", "translateText",
+ "detectLanguage", "queryFeatureInstance",
+ ],
),
ToolboxDefinition(
id="ai",
label="AI Tools",
description="AI-powered analysis and generation",
isDefault=True,
- tools=[],
+ tools=[
+ "summarizeContent", "describeImage", "generateImage",
+ "textToSpeech", "speechToText", "renderDocument",
+ "createChart", "executeCode", "neutralizeData",
+ ],
),
ToolboxDefinition(
id="datasources",
label="Data Sources",
description="Access external data sources and databases",
- isDefault=False,
- tools=[],
+ isDefault=True,
+ tools=[
+ "listConnections", "browseDataSource", "searchDataSource",
+ "downloadFromDataSource", "uploadToExternal",
+ "browseContainer", "readContentObjects", "extractContainerItem",
+ ],
),
ToolboxDefinition(
id="email",
@@ -137,7 +151,11 @@ def _registerDefaultToolboxes() -> None:
description="Read and send emails via Outlook/Gmail",
requiresConnection="microsoft",
isDefault=False,
- tools=[],
+ tools=[
+ "sendMail",
+ "outlook_readEmails", "outlook_searchEmails",
+ "outlook_composeAndDraftReply", "outlook_sendDraft",
+ ],
),
ToolboxDefinition(
id="sharepoint",
@@ -145,7 +163,10 @@ def _registerDefaultToolboxes() -> None:
description="Access SharePoint sites, lists, and files",
requiresConnection="microsoft",
isDefault=False,
- tools=[],
+ tools=[
+ "sharepoint_findDocuments", "sharepoint_readDocuments",
+ "sharepoint_upload",
+ ],
),
ToolboxDefinition(
id="clickup",
@@ -153,7 +174,9 @@ def _registerDefaultToolboxes() -> None:
description="Manage ClickUp tasks and projects",
requiresConnection="clickup",
isDefault=False,
- tools=[],
+ tools=[
+ "clickup_searchTasks", "clickup_createTask", "clickup_updateTask",
+ ],
),
ToolboxDefinition(
id="jira",
@@ -161,7 +184,9 @@ def _registerDefaultToolboxes() -> None:
description="Manage Jira issues and projects",
requiresConnection="jira",
isDefault=False,
- tools=[],
+ tools=[
+ "jira_connect", "jira_exportTickets", "jira_importTickets",
+ ],
),
ToolboxDefinition(
id="workflow",
@@ -175,9 +200,49 @@ def _registerDefaultToolboxes() -> None:
"listWorkflowHistory", "readWorkflowMessages",
],
),
+ ToolboxDefinition(
+ id="trustee",
+ label="Trustee / Accounting",
+ description="Trustee accounting tools: refresh data from external system (e.g. Abacus), query positions and journal entries",
+ featureCode="trustee",
+ isDefault=False,
+ tools=[
+ "trustee_refreshAccountingData",
+ ],
+ ),
]
for tb in defaults:
_toolboxRegistry.registerToolbox(tb)
_registerDefaultToolboxes()
+
+
+REQUEST_TOOLBOX_TOOL_NAME = "requestToolbox"
+
+
+def buildRequestToolboxDefinition(availableToolboxIds: List[str]) -> dict:
+ """Build the tool definition dict for the requestToolbox meta-tool."""
+ return {
+ "name": REQUEST_TOOLBOX_TOOL_NAME,
+ "description": (
+ "Request additional specialized tools for the current task. "
+ "Call this when you need tools from a specific toolbox that is not yet active. "
+ "After calling, the requested tools will be available in the next round."
+ ),
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "toolboxId": {
+ "type": "string",
+ "enum": availableToolboxIds,
+ "description": "ID of the toolbox to activate",
+ },
+ "reason": {
+ "type": "string",
+ "description": "Brief reason why this toolbox is needed",
+ },
+ },
+ "required": ["toolboxId"],
+ },
+ }
diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py
index 67344206..40055b11 100644
--- a/modules/workflows/automation2/executionEngine.py
+++ b/modules/workflows/automation2/executionEngine.py
@@ -85,6 +85,25 @@ def _getExecutor(
return None
+_stepMeta: Dict[str, Dict[str, str]] = {}
+
+
+def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None:
+ """Emit a step-log SSE event to any listening client for this run."""
+ try:
+ from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager
+ em = get_event_manager()
+ queueId = f"run-trace-{runId}"
+ if not em.has_queue(queueId):
+ return
+ import asyncio
+ loop = asyncio.get_event_loop()
+ if loop.is_running():
+ asyncio.ensure_future(em.emit_event(queueId, "step", stepData, event_category="tracing"))
+ except Exception:
+ pass
+
+
def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str = "running", inputSnapshot: Dict = None) -> Optional[str]:
"""Create an AutoStepLog entry. Returns the step log ID or None if interface unavailable."""
if not iface or not runId:
@@ -92,6 +111,7 @@ def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str =
try:
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog
stepId = str(uuid.uuid4())
+ startedAt = time.time()
iface.db.recordCreate(AutoStepLog, {
"id": stepId,
"runId": runId,
@@ -99,7 +119,12 @@ def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str =
"nodeType": nodeType,
"status": status,
"inputSnapshot": inputSnapshot or {},
- "startedAt": time.time(),
+ "startedAt": startedAt,
+ })
+ _stepMeta[stepId] = {"runId": runId, "nodeId": nodeId, "nodeType": nodeType}
+ _emitStepEvent(runId, {
+ "id": stepId, "runId": runId, "nodeId": nodeId, "nodeType": nodeType,
+ "status": status, "startedAt": startedAt,
})
return stepId
except Exception as e:
@@ -114,9 +139,10 @@ def _updateStepLog(iface, stepId: str, status: str, output: Dict = None, error:
return
try:
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog
+ completedAt = time.time()
updates: Dict[str, Any] = {
"status": status,
- "completedAt": time.time(),
+ "completedAt": completedAt,
}
if output is not None:
updates["output"] = output
@@ -129,6 +155,14 @@ def _updateStepLog(iface, stepId: str, status: str, output: Dict = None, error:
if retryCount:
updates["retryCount"] = retryCount
iface.db.recordModify(AutoStepLog, stepId, updates)
+ meta = _stepMeta.pop(stepId, None)
+ if meta:
+ _emitStepEvent(meta["runId"], {
+ "id": stepId, "runId": meta["runId"], "nodeId": meta["nodeId"],
+ "nodeType": meta["nodeType"], "status": status,
+ "completedAt": completedAt, "durationMs": durationMs,
+ "error": error, "tokensUsed": tokensUsed, "retryCount": retryCount,
+ })
except Exception as e:
logger.debug("Could not update AutoStepLog %s: %s", stepId, e)
@@ -282,7 +316,11 @@ async def executeGraph(
nodeOutputs[bnid] = None
continue
_rStepStart = time.time()
- _rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running")
+ _rInputSnap = {"_loopItem": items[next_index], "_loopIndex": next_index}
+ for _rSrc, _, _ in connectionMap.get(bnid, []):
+ if _rSrc in nodeOutputs:
+ _rInputSnap[_rSrc] = nodeOutputs[_rSrc]
+ _rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _rInputSnap)
try:
result, _rRetry = await _executeWithRetry(executor, body_node, context)
nodeOutputs[bnid] = result
@@ -310,7 +348,7 @@ async def executeGraph(
nodeOutputs[bnid] = {"error": str(ex), "success": False}
if runId and automation2_interface:
automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs)
- return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid}
+ return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid, "runId": runId}
next_index += 1
if loop_node_id:
nodeOutputs[loop_node_id] = {"items": items, "count": len(items)}
@@ -330,7 +368,11 @@ async def executeGraph(
nodeType = node.get("type", "")
if not _is_node_on_active_path(nodeId, connectionMap, nodeOutputs):
logger.info("executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)", i + 1, len(ordered), nodeId)
- _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped")
+ _skipInputSnap = {"_skipReason": "inactive_branch"}
+ for _sSrc, _, _ in connectionMap.get(nodeId, []):
+ if _sSrc in nodeOutputs:
+ _skipInputSnap[_sSrc] = nodeOutputs[_sSrc]
+ _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped", inputSnapshot=_skipInputSnap)
if _skipStepId:
_updateStepLog(automation2_interface, _skipStepId, "skipped")
continue
@@ -351,7 +393,11 @@ async def executeGraph(
_stepId = None
try:
if nodeType == "flow.loop":
- _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running")
+ _loopInputSnap = {}
+ for _lSrc, _, _ in connectionMap.get(nodeId, []):
+ if _lSrc in nodeOutputs:
+ _loopInputSnap[_lSrc] = nodeOutputs[_lSrc]
+ _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _loopInputSnap)
result = await executor.execute(node, context)
items = result.get("items") or []
body_ids = getLoopBodyNodeIds(nodeId, connectionMap)
@@ -372,7 +418,11 @@ async def executeGraph(
nodeOutputs[bnid] = None
continue
_bStepStart = time.time()
- _bStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running")
+ _bInputSnap = {"_loopItem": item, "_loopIndex": idx}
+ for _bSrc, _, _ in connectionMap.get(bnid, []):
+ if _bSrc in nodeOutputs:
+ _bInputSnap[_bSrc] = nodeOutputs[_bSrc]
+ _bStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _bInputSnap)
try:
bres, _bRetry = await _executeWithRetry(bexec, body_node, context)
nodeOutputs[bnid] = bres
@@ -401,10 +451,11 @@ async def executeGraph(
nodeOutputs[bnid] = {"error": str(ex), "success": False}
if runId and automation2_interface:
automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs)
- return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid}
+ return {"success": False, "error": str(ex), "nodeOutputs": nodeOutputs, "failedNode": bnid, "runId": runId}
nodeOutputs[nodeId] = {"items": items, "count": len(items)}
_updateStepLog(automation2_interface, _stepId, "completed",
- output={"items": len(items)}, durationMs=int((time.time() - _stepStartMs) * 1000))
+ output={"iterationCount": len(items), "items": len(items)},
+ durationMs=int((time.time() - _stepStartMs) * 1000))
logger.info("executeGraph flow.loop done: %d iterations", len(items))
else:
_stepStartMs = time.time()
@@ -489,6 +540,8 @@ async def executeGraph(
_updateStepLog(automation2_interface, _stepId, "failed", error=str(e), durationMs=_durMs)
if runId and automation2_interface:
automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs)
+ if runId:
+ _emitStepEvent(runId, {"type": "run_failed", "runId": runId, "status": "failed", "error": str(e), "failedNode": nodeId})
try:
_wfObj = automation2_interface.getWorkflow(workflowId) if automation2_interface and workflowId else None
_wfDict = _wfObj if isinstance(_wfObj, dict) else (
@@ -509,10 +562,13 @@ async def executeGraph(
"error": str(e),
"nodeOutputs": nodeOutputs,
"failedNode": nodeId,
+ "runId": runId,
}
if runId and automation2_interface:
automation2_interface.updateRun(runId, status="completed", nodeOutputs=nodeOutputs)
+ if runId:
+ _emitStepEvent(runId, {"type": "run_complete", "runId": runId, "status": "completed"})
logger.info(
"executeGraph complete: success=True nodeOutputs_keys=%s stopped=%s",
list(nodeOutputs.keys()),
@@ -522,4 +578,5 @@ async def executeGraph(
"success": True,
"nodeOutputs": nodeOutputs,
"stopped": context.get("_stopped", False),
+ "runId": runId,
}
diff --git a/modules/workflows/methods/methodBase.py b/modules/workflows/methods/methodBase.py
index 1a81c3eb..6a9f2956 100644
--- a/modules/workflows/methods/methodBase.py
+++ b/modules/workflows/methods/methodBase.py
@@ -243,8 +243,10 @@ class MethodBase:
# Handle List[str], List[int], etc.
if expectedType.startswith('List['):
- if not isinstance(value, list):
- raise ValueError(f"Expected list for type '{expectedType}', got {type(value).__name__}")
+ if isinstance(value, str):
+ value = [v.strip() for v in value.split(',') if v.strip()] if ',' in value else [value]
+ elif not isinstance(value, list):
+ value = [value]
# Extract inner type
innerType = expectedType[5:-1].strip() # Remove "List[" and "]"
if innerType in typeMap:
diff --git a/modules/workflows/methods/methodTrustee/actions/processDocuments.py b/modules/workflows/methods/methodTrustee/actions/processDocuments.py
index 3f95836d..c4bf9df1 100644
--- a/modules/workflows/methods/methodTrustee/actions/processDocuments.py
+++ b/modules/workflows/methods/methodTrustee/actions/processDocuments.py
@@ -204,6 +204,56 @@ def _recordToPosition(record: Dict[str, Any], documentId: Optional[str], feature
}
+def _resolveDocumentList(documentListParam, services) -> List[tuple]:
+ """Resolve documentList from either Graph-Editor output (list of dicts) or Chat references.
+
+ Returns list of (data_dict, fileId, fileName, mimeType) tuples.
+ """
+ results = []
+
+ if isinstance(documentListParam, list) and documentListParam:
+ first = documentListParam[0]
+ if isinstance(first, dict) and ("documentData" in first or "documentName" in first):
+ for doc in documentListParam:
+ rawData = doc.get("documentData")
+ if not rawData:
+ continue
+ try:
+ data = json.loads(rawData) if isinstance(rawData, str) else rawData
+ except (json.JSONDecodeError, TypeError):
+ continue
+ fileId = (doc.get("validationMetadata") or {}).get("fileId") or doc.get("fileId", "")
+ fileName = doc.get("documentName") or doc.get("fileName") or "document"
+ mimeType = doc.get("mimeType") or doc.get("documentMimeType") or "application/json"
+ results.append((data, fileId, fileName, mimeType))
+ if results:
+ return results
+
+ chatService = getattr(services, "chat", None)
+ if not chatService:
+ return results
+
+ try:
+ docList = DocumentReferenceList.from_string_list(
+ documentListParam if isinstance(documentListParam, list) else [documentListParam]
+ )
+ chatDocuments = chatService.getChatDocumentsFromDocumentList(docList)
+ for chatDoc in (chatDocuments or []):
+ rawBytes = chatService.getFileData(chatDoc.fileId)
+ if not rawBytes:
+ continue
+ content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes
+ try:
+ data = json.loads(content) if isinstance(content, str) else content
+ except (json.JSONDecodeError, TypeError):
+ continue
+ results.append((data, chatDoc.fileId, chatDoc.fileName or "document", chatDoc.mimeType or "application/json"))
+ except Exception as e:
+ logger.debug("_resolveDocumentList chat fallback failed: %s", e)
+
+ return results
+
+
async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Resolve documentList to ChatDocuments, load extraction JSON per document,
@@ -218,11 +268,8 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
return ActionResult.isFailure(error="featureInstanceId is required")
try:
- docList = DocumentReferenceList.from_string_list(
- documentListParam if isinstance(documentListParam, list) else [documentListParam]
- )
- chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
- if not chatDocuments:
+ extractionDocs = _resolveDocumentList(documentListParam, self.services)
+ if not extractionDocs:
return ActionResult.isFailure(error="No documents found for documentList")
from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
@@ -237,17 +284,11 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
allDocumentIds = []
autoMatchedPositionIds = []
- for chatDoc in chatDocuments:
- rawBytes = self.services.chat.getFileData(chatDoc.fileId)
- if not rawBytes:
- logger.warning(f"Could not load file {chatDoc.fileId}, skipping")
- continue
- content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes
- data = json.loads(content) if isinstance(content, str) else content
+ for data, fileId, fileName, mimeType in extractionDocs:
documentType = data.get("documentType")
extractedData = data.get("extractedData")
- fileId = data.get("fileId") or chatDoc.fileId
- fileName = data.get("fileName") or chatDoc.fileName or "document"
+ fileId = data.get("fileId") or fileId
+ fileName = data.get("fileName") or fileName or "document"
records = extractedData if isinstance(extractedData, list) else [extractedData] if extractedData else []
if not records:
@@ -256,7 +297,7 @@ async def processDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
docPayload = {
"fileId": fileId,
"documentName": fileName,
- "documentMimeType": chatDoc.mimeType or "application/octet-stream",
+ "documentMimeType": mimeType or "application/octet-stream",
"sourceType": "workflow",
"documentType": documentType,
}
diff --git a/modules/workflows/methods/methodTrustee/actions/refreshAccountingData.py b/modules/workflows/methods/methodTrustee/actions/refreshAccountingData.py
new file mode 100644
index 00000000..9e6276f5
--- /dev/null
+++ b/modules/workflows/methods/methodTrustee/actions/refreshAccountingData.py
@@ -0,0 +1,132 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Refresh accounting data from external system (e.g. Abacus) into local TrusteeData* tables.
+Checks lastSyncAt to avoid redundant imports unless forceRefresh is set.
+"""
+
+import json
+import logging
+import time
+from typing import Dict, Any
+
+from modules.datamodels.datamodelChat import ActionResult, ActionDocument
+
+logger = logging.getLogger(__name__)
+
+_SYNC_THRESHOLD_SECONDS = 3600
+
+
+async def refreshAccountingData(self, parameters: Dict[str, Any]) -> ActionResult:
+ """Import/refresh accounting data from the configured external system.
+
+ If data was synced within the last hour and forceRefresh is not set,
+ returns cached counts without triggering an external sync.
+ """
+ featureInstanceId = parameters.get("featureInstanceId") or (
+ self.services.featureInstanceId if hasattr(self.services, "featureInstanceId") else None
+ )
+ forceRefresh = parameters.get("forceRefresh", False)
+ if isinstance(forceRefresh, str):
+ forceRefresh = forceRefresh.lower() in ("true", "1", "yes")
+ dateFrom = parameters.get("dateFrom") or None
+ dateTo = parameters.get("dateTo") or None
+
+ if not featureInstanceId:
+ return ActionResult.isFailure(error="featureInstanceId is required")
+
+ try:
+ from modules.features.trustee.interfaceFeatureTrustee import getInterface as getTrusteeInterface
+ from modules.features.trustee.datamodelFeatureTrustee import (
+ TrusteeAccountingConfig,
+ TrusteeDataAccount,
+ TrusteeDataJournalEntry,
+ TrusteeDataJournalLine,
+ TrusteeDataContact,
+ TrusteeDataAccountBalance,
+ )
+
+ trusteeInterface = getTrusteeInterface(
+ self.services.user,
+ mandateId=self.services.mandateId,
+ featureInstanceId=featureInstanceId,
+ )
+
+ cfgRecords = trusteeInterface.db.getRecordset(
+ TrusteeAccountingConfig,
+ recordFilter={"featureInstanceId": featureInstanceId, "isActive": True},
+ )
+ if not cfgRecords:
+ return ActionResult.isFailure(error="No active accounting configuration found for this Trustee instance")
+
+ cfgRecord = cfgRecords[0]
+ lastSyncAt = cfgRecord.get("lastSyncAt") or 0
+ lastSyncStatus = cfgRecord.get("lastSyncStatus") or ""
+
+ isFresh = (
+ lastSyncAt
+ and (time.time() - lastSyncAt) < _SYNC_THRESHOLD_SECONDS
+ and lastSyncStatus in ("success", "partial")
+ )
+
+ if isFresh and not forceRefresh:
+ counts = _getCachedCounts(trusteeInterface, featureInstanceId)
+ counts["synced"] = False
+ counts["lastSyncAt"] = lastSyncAt
+ counts["lastSyncStatus"] = lastSyncStatus
+ counts["message"] = f"Data is fresh (synced {int(time.time() - lastSyncAt)}s ago). Use forceRefresh=true to re-import."
+ return ActionResult.isSuccess(documents=[
+ ActionDocument(
+ documentName="refresh_result",
+ documentData=json.dumps(counts, ensure_ascii=False),
+ mimeType="application/json",
+ )
+ ])
+
+ from modules.features.trustee.accounting.accountingDataSync import AccountingDataSync
+
+ sync = AccountingDataSync(trusteeInterface)
+ summary = await sync.importData(
+ featureInstanceId=featureInstanceId,
+ mandateId=self.services.mandateId,
+ dateFrom=dateFrom,
+ dateTo=dateTo,
+ )
+ summary["synced"] = True
+ summary.pop("startedAt", None)
+ summary.pop("finishedAt", None)
+
+ return ActionResult.isSuccess(documents=[
+ ActionDocument(
+ documentName="refresh_result",
+ documentData=json.dumps(summary, ensure_ascii=False),
+ mimeType="application/json",
+ )
+ ])
+ except Exception as e:
+ logger.exception("refreshAccountingData failed")
+ return ActionResult.isFailure(error=str(e))
+
+
+def _getCachedCounts(trusteeInterface, featureInstanceId: str) -> Dict[str, Any]:
+ """Count existing records per TrusteeData* table without triggering an external sync."""
+ from modules.features.trustee.datamodelFeatureTrustee import (
+ TrusteeDataAccount,
+ TrusteeDataJournalEntry,
+ TrusteeDataJournalLine,
+ TrusteeDataContact,
+ TrusteeDataAccountBalance,
+ )
+ counts = {}
+ for label, model in [
+ ("accounts", TrusteeDataAccount),
+ ("journalEntries", TrusteeDataJournalEntry),
+ ("journalLines", TrusteeDataJournalLine),
+ ("contacts", TrusteeDataContact),
+ ("accountBalances", TrusteeDataAccountBalance),
+ ]:
+ records = trusteeInterface.db.getRecordset(
+ model, recordFilter={"featureInstanceId": featureInstanceId}
+ )
+ counts[label] = len(records) if records else 0
+ return counts
diff --git a/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py
index 4633f32e..555a8623 100644
--- a/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py
+++ b/modules/workflows/methods/methodTrustee/actions/syncToAccounting.py
@@ -16,6 +16,43 @@ from modules.datamodels.datamodelDocref import DocumentReferenceList
logger = logging.getLogger(__name__)
+def _resolveFirstDocument(documentListParam, services) -> Dict[str, Any] | None:
+ """Resolve the first document from either Graph-Editor output (list of dicts) or Chat references.
+
+ Returns the parsed JSON dict or None.
+ """
+ if isinstance(documentListParam, list) and documentListParam:
+ first = documentListParam[0]
+ if isinstance(first, dict) and ("documentData" in first or "documentName" in first):
+ rawData = first.get("documentData")
+ if rawData:
+ try:
+ return json.loads(rawData) if isinstance(rawData, str) else rawData
+ except (json.JSONDecodeError, TypeError):
+ pass
+
+ chatService = getattr(services, "chat", None)
+ if not chatService:
+ return None
+
+ try:
+ docList = DocumentReferenceList.from_string_list(
+ documentListParam if isinstance(documentListParam, list) else [documentListParam]
+ )
+ chatDocuments = chatService.getChatDocumentsFromDocumentList(docList)
+ if not chatDocuments:
+ return None
+ doc = chatDocuments[0]
+ rawBytes = chatService.getFileData(doc.fileId)
+ if not rawBytes:
+ return None
+ content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes
+ return json.loads(content) if isinstance(content, str) else content
+ except Exception as e:
+ logger.debug("_resolveFirstDocument chat fallback failed: %s", e)
+ return None
+
+
async def syncToAccounting(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Push trustee positions to the configured accounting system.
@@ -30,21 +67,10 @@ async def syncToAccounting(self, parameters: Dict[str, Any]) -> ActionResult:
return ActionResult.isFailure(error="documentList is required (reference to processDocuments result)")
try:
- docList = DocumentReferenceList.from_string_list(
- documentListParam if isinstance(documentListParam, list) else [documentListParam]
- )
- chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(docList)
- if not chatDocuments:
+ data = _resolveFirstDocument(documentListParam, self.services)
+ if data is None:
return ActionResult.isFailure(error="No document found for documentList; ensure processDocuments ran before this action")
- # Expect one document (JSON with positionIds, documentIds)
- doc = chatDocuments[0]
- rawBytes = self.services.chat.getFileData(doc.fileId)
- if not rawBytes:
- return ActionResult.isFailure(error=f"Could not load document content for fileId={doc.fileId}")
-
- content = rawBytes.decode("utf-8") if isinstance(rawBytes, bytes) else rawBytes
- data = json.loads(content) if isinstance(content, str) else content
positionIds = data.get("positionIds") or []
if not positionIds:
return ActionResult.isSuccess(documents=[
diff --git a/modules/workflows/methods/methodTrustee/methodTrustee.py b/modules/workflows/methods/methodTrustee/methodTrustee.py
index fefeaa52..5be232f8 100644
--- a/modules/workflows/methods/methodTrustee/methodTrustee.py
+++ b/modules/workflows/methods/methodTrustee/methodTrustee.py
@@ -12,6 +12,7 @@ from modules.shared.frontendTypes import FrontendType
from .actions.extractFromFiles import extractFromFiles
from .actions.processDocuments import processDocuments
from .actions.syncToAccounting import syncToAccounting
+from .actions.refreshAccountingData import refreshAccountingData
logger = logging.getLogger(__name__)
@@ -112,9 +113,46 @@ class MethodTrustee(MethodBase):
},
execute=syncToAccounting.__get__(self, self.__class__),
),
+ "refreshAccountingData": WorkflowActionDefinition(
+ actionId="trustee.refreshAccountingData",
+ description="Import/refresh accounting data from external system (e.g. Abacus) into local tables. Checks cache freshness; use forceRefresh to re-import.",
+ dynamicMode=True,
+ parameters={
+ "featureInstanceId": WorkflowActionParameter(
+ name="featureInstanceId",
+ type="str",
+ frontendType=FrontendType.TEXT,
+ required=True,
+ description="Trustee feature instance ID",
+ ),
+ "forceRefresh": WorkflowActionParameter(
+ name="forceRefresh",
+ type="bool",
+ frontendType=FrontendType.CHECKBOX,
+ required=False,
+ description="Force re-import even if data is fresh (default: false)",
+ ),
+ "dateFrom": WorkflowActionParameter(
+ name="dateFrom",
+ type="str",
+ frontendType=FrontendType.TEXT,
+ required=False,
+ description="Start date filter for journal entries (YYYY-MM-DD)",
+ ),
+ "dateTo": WorkflowActionParameter(
+ name="dateTo",
+ type="str",
+ frontendType=FrontendType.TEXT,
+ required=False,
+ description="End date filter for journal entries (YYYY-MM-DD)",
+ ),
+ },
+ execute=refreshAccountingData.__get__(self, self.__class__),
+ ),
}
self._validateActions()
self.extractFromFiles = extractFromFiles.__get__(self, self.__class__)
self.processDocuments = processDocuments.__get__(self, self.__class__)
self.syncToAccounting = syncToAccounting.__get__(self, self.__class__)
+ self.refreshAccountingData = refreshAccountingData.__get__(self, self.__class__)
diff --git a/modules/workflows/scheduler/mainScheduler.py b/modules/workflows/scheduler/mainScheduler.py
index ec156fd2..5f888237 100644
--- a/modules/workflows/scheduler/mainScheduler.py
+++ b/modules/workflows/scheduler/mainScheduler.py
@@ -454,6 +454,11 @@ def stop() -> bool:
return _scheduler.stop()
+def syncNow() -> dict:
+ """Trigger an immediate incremental sync. Used by /schedule-sync endpoint."""
+ return _scheduler._syncScheduledWorkflows()
+
+
def setMainLoop(loop) -> None:
"""Set the main event loop for thread-bridge."""
_setMainLoop(loop)