diff --git a/modules/aicore/aicorePluginAnthropic.py b/modules/aicore/aicorePluginAnthropic.py index 81a2175e..12cfcbe7 100644 --- a/modules/aicore/aicorePluginAnthropic.py +++ b/modules/aicore/aicorePluginAnthropic.py @@ -71,6 +71,7 @@ class AiAnthropic(BaseConnectorAi): (OperationTypeEnum.DATA_GENERATE, 9), (OperationTypeEnum.DATA_EXTRACT, 8), (OperationTypeEnum.AGENT, 9), + (OperationTypeEnum.DATA_QUERY, 9), ), version="claude-sonnet-4-5-20250929", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.003 + (bytesReceived / 4 / 1000) * 0.015 @@ -97,6 +98,7 @@ class AiAnthropic(BaseConnectorAi): (OperationTypeEnum.DATA_GENERATE, 8), (OperationTypeEnum.DATA_EXTRACT, 7), (OperationTypeEnum.AGENT, 7), + (OperationTypeEnum.DATA_QUERY, 10), ), version="claude-haiku-4-5-20251001", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.001 + (bytesReceived / 4 / 1000) * 0.005 @@ -123,6 +125,7 @@ class AiAnthropic(BaseConnectorAi): (OperationTypeEnum.DATA_GENERATE, 10), (OperationTypeEnum.DATA_EXTRACT, 9), (OperationTypeEnum.AGENT, 10), + (OperationTypeEnum.DATA_QUERY, 3), ), version="claude-opus-4-6", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.005 + (bytesReceived / 4 / 1000) * 0.025 diff --git a/modules/aicore/aicorePluginMistral.py b/modules/aicore/aicorePluginMistral.py index 885addcf..d2ad0694 100644 --- a/modules/aicore/aicorePluginMistral.py +++ b/modules/aicore/aicorePluginMistral.py @@ -67,6 +67,7 @@ class AiMistral(BaseConnectorAi): (OperationTypeEnum.DATA_GENERATE, 9), (OperationTypeEnum.DATA_EXTRACT, 8), (OperationTypeEnum.AGENT, 8), + (OperationTypeEnum.DATA_QUERY, 7), ), version="mistral-large-latest", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0005 + (bytesReceived / 4 / 1000) * 0.0015 @@ -93,6 +94,7 @@ class AiMistral(BaseConnectorAi): (OperationTypeEnum.DATA_GENERATE, 8), 
(OperationTypeEnum.DATA_EXTRACT, 7), (OperationTypeEnum.AGENT, 6), + (OperationTypeEnum.DATA_QUERY, 9), ), version="mistral-small-latest", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.00006 + (bytesReceived / 4 / 1000) * 0.00018 diff --git a/modules/aicore/aicorePluginOpenai.py b/modules/aicore/aicorePluginOpenai.py index 3b9f2c5f..ae5a02b3 100644 --- a/modules/aicore/aicorePluginOpenai.py +++ b/modules/aicore/aicorePluginOpenai.py @@ -68,6 +68,7 @@ class AiOpenai(BaseConnectorAi): (OperationTypeEnum.DATA_GENERATE, 10), (OperationTypeEnum.DATA_EXTRACT, 7), (OperationTypeEnum.AGENT, 9), + (OperationTypeEnum.DATA_QUERY, 8), ), version="gpt-4o", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0025 + (bytesReceived / 4 / 1000) * 0.01 @@ -95,6 +96,7 @@ class AiOpenai(BaseConnectorAi): (OperationTypeEnum.DATA_GENERATE, 9), (OperationTypeEnum.DATA_EXTRACT, 7), (OperationTypeEnum.AGENT, 8), + (OperationTypeEnum.DATA_QUERY, 10), ), version="gpt-4o-mini", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.00015 + (bytesReceived / 4 / 1000) * 0.0006 diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 662eded2..a581a7e8 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -32,6 +32,7 @@ class OperationTypeEnum(str, Enum): # Agent Operations AGENT = "agent" # Agent loop: reasoning + tool use + DATA_QUERY = "dataQuery" # Data query sub-agent: fast model, schema-aware # Embedding Operations EMBEDDING = "embedding" # Text → vector conversion for semantic search diff --git a/modules/features/trustee/accounting/accountingDataSync.py b/modules/features/trustee/accounting/accountingDataSync.py index d393de76..e0584a02 100644 --- a/modules/features/trustee/accounting/accountingDataSync.py +++ b/modules/features/trustee/accounting/accountingDataSync.py @@ -5,10 +5,13 @@ Flow: 
import json as _json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Directory that receives raw connector dumps.
# The location is taken from the POWERON_DEBUG_DIR environment variable when set,
# otherwise it falls back to "local/debug" relative to the working directory.
# A machine-specific absolute path must not be baked into the module: on any
# other host it silently creates a stray directory tree.
_DEBUG_SYNC_DIR = Path(os.environ.get("POWERON_DEBUG_DIR") or Path("local") / "debug") / "sync"


def _debugSyncDir() -> Path:
    """Return the sync dump directory, creating it (and parents) if missing."""
    _DEBUG_SYNC_DIR.mkdir(parents=True, exist_ok=True)
    return _DEBUG_SYNC_DIR


def _isDebugEnabled() -> bool:
    """Return True when APP_LOGGING_FILE_ENABLED is True or the string "true".

    The configuration module is imported lazily; any failure while loading or
    reading it is treated as "debug disabled" so callers never fail because of
    the debug facility.
    """
    try:
        from modules.shared.configuration import APP_CONFIG
        # Read the flag once; it may be a real bool or a string such as "True".
        flag = APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", False)
        return flag is True or str(flag).lower() == "true"
    except Exception:
        return False


def _toSerializableRow(row: Any) -> Any:
    """Convert one connector record into something json.dump can write.

    Dicts pass through unchanged, objects with a __dict__ are reduced to their
    public (non-underscore) attributes, anything else is stringified.
    """
    if isinstance(row, dict):
        return row
    if hasattr(row, "__dict__"):
        return {k: v for k, v in vars(row).items() if not k.startswith("_")}
    return str(row)


def _dumpSyncData(tag: str, rows: list):
    """Write raw connector data to a timestamped JSON file in the sync debug dir.

    Does nothing unless debug is enabled. `rows` may be None or empty, in which
    case a file with count 0 is written. Failures are logged as warnings and
    never propagate: the dump is best-effort and must not break the sync.

    NOTE(review): the dump contains unredacted accounting records and is gated
    only by the file-logging flag; confirm that is acceptable outside local
    development.
    """
    if not _isDebugEnabled():
        return
    try:
        serializable = [_toSerializableRow(r) for r in (rows or [])]
        ts = time.strftime("%Y%m%d-%H%M%S")
        path = _debugSyncDir() / f"{ts}_{tag}.json"
        with open(path, "w", encoding="utf-8") as f:
            # default=str keeps the dump going for dates, Decimals, enums etc.
            _json.dump(
                {"count": len(serializable), "rows": serializable},
                f,
                ensure_ascii=False,
                indent=2,
                default=str,
            )
        logger.info(f"Debug sync dump: {path.name} ({len(serializable)} rows)")
    except Exception as e:
        logger.warning(f"Failed to write debug sync dump for {tag}: {e}")
AccountingDataSync: # 1) Chart of accounts try: charts = await connector.getChartOfAccounts(connConfig) + _dumpSyncData("accounts", charts) fetchedAccountNumbers = [acc.accountNumber for acc in charts if acc.accountNumber] self._clearTable(TrusteeDataAccount, featureInstanceId) for acc in charts: @@ -105,6 +148,7 @@ class AccountingDataSync: # 2) Journal entries + lines (pass already-fetched chart to avoid redundant API call) try: rawEntries = await connector.getJournalEntries(connConfig, dateFrom=dateFrom, dateTo=dateTo, accountNumbers=fetchedAccountNumbers or None) + _dumpSyncData("journalEntries", rawEntries) self._clearTable(TrusteeDataJournalEntry, featureInstanceId) self._clearTable(TrusteeDataJournalLine, featureInstanceId) lineCount = 0 @@ -146,11 +190,13 @@ class AccountingDataSync: contactCount = 0 customers = await connector.getCustomers(connConfig) + _dumpSyncData("customers", customers) for c in customers: self._if.db.recordCreate(TrusteeDataContact, self._mapContact(c, "customer", scope)) contactCount += 1 vendors = await connector.getVendors(connConfig) + _dumpSyncData("vendors", vendors) for v in vendors: self._if.db.recordCreate(TrusteeDataContact, self._mapContact(v, "vendor", scope)) contactCount += 1 diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py index ae51f0cd..85188c52 100644 --- a/modules/features/workspace/routeFeatureWorkspace.py +++ b/modules/features/workspace/routeFeatureWorkspace.py @@ -235,6 +235,8 @@ def _buildFeatureDataSourceContext(featureDataSourceIds: List[str]) -> str: parts = [ "The user has attached data from the following feature instances.", "Use queryFeatureInstance(featureInstanceId, question) to query this data.", + "IMPORTANT: Formulate ONE comprehensive question per call that covers everything you need.", + "The sub-agent can browse, filter, and aggregate -- ask precisely and avoid repeated calls.", "", ] found = False diff --git 
a/modules/serviceCenter/services/serviceAgent/agentLoop.py b/modules/serviceCenter/services/serviceAgent/agentLoop.py index 998816d9..0ceb2a40 100644 --- a/modules/serviceCenter/services/serviceAgent/agentLoop.py +++ b/modules/serviceCenter/services/serviceAgent/agentLoop.py @@ -169,7 +169,7 @@ async def runAgentLoop( aiRequest = AiCallRequest( prompt="", options=AiCallOptions( - operationType=OperationTypeEnum.AGENT, + operationType=config.operationType or OperationTypeEnum.AGENT, temperature=config.temperature ), messages=conversation.messages, diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py index 04e32ad8..a699ab4a 100644 --- a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py +++ b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py @@ -185,8 +185,12 @@ def _registerFeatureSubAgentTools(registry: ToolRegistry, services): description=( "Query data from a feature instance (e.g. Trustee, CommCoach). " "Delegates to a specialized sub-agent that knows the feature's data schema " - "and can browse/query its tables. Use this when the user has attached " - "feature data sources or asks about feature-specific data." + "and can browse, filter, and aggregate its tables. Use this when the user " + "has attached feature data sources or asks about feature-specific data.\n\n" + "GUIDELINES:\n" + "- Ask a precise, self-contained question (include all context the sub-agent needs).\n" + "- Combine related data needs into ONE call instead of multiple small ones.\n" + "- Avoid calling this tool repeatedly with slight variations of the same question." 
), parameters={ "type": "object", diff --git a/modules/serviceCenter/services/serviceAgent/datamodelAgent.py b/modules/serviceCenter/services/serviceAgent/datamodelAgent.py index e09e16b8..053569b0 100644 --- a/modules/serviceCenter/services/serviceAgent/datamodelAgent.py +++ b/modules/serviceCenter/services/serviceAgent/datamodelAgent.py @@ -6,6 +6,7 @@ from typing import List, Dict, Any, Optional from enum import Enum from pydantic import BaseModel, Field from modules.shared.timeUtils import getUtcTimestamp +from modules.datamodels.datamodelAi import OperationTypeEnum import uuid @@ -91,6 +92,7 @@ class AgentConfig(BaseModel): initialToolboxes: List[str] = Field(default_factory=lambda: ["core"]) availableToolboxes: List[str] = Field(default_factory=list) temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0) + operationType: Optional[OperationTypeEnum] = Field(default=None, description="Override the default AGENT operationType for model selection") class AgentState(BaseModel): diff --git a/modules/serviceCenter/services/serviceAgent/featureDataAgent.py b/modules/serviceCenter/services/serviceAgent/featureDataAgent.py index 9db820ca..7ecc41e1 100644 --- a/modules/serviceCenter/services/serviceAgent/featureDataAgent.py +++ b/modules/serviceCenter/services/serviceAgent/featureDataAgent.py @@ -24,8 +24,8 @@ from modules.serviceCenter.services.serviceAgent.featureDataProvider import Feat logger = logging.getLogger(__name__) -_MAX_ROUNDS = 5 -_MAX_COST_CHF = 0.10 +_MAX_ROUNDS = 8 +_MAX_COST_CHF = 0.15 async def runFeatureDataAgent( @@ -69,25 +69,37 @@ async def runFeatureDataAgent( if realCols: meta["fields"] = realCols - schemaContext = _buildSchemaContext(featureCode, instanceLabel, selectedTables) - prompt = f"{schemaContext}\n\nUser question:\n{question}" + systemPrompt = _buildSchemaContext(featureCode, instanceLabel, selectedTables) - config = AgentConfig(maxRounds=_MAX_ROUNDS, maxCostCHF=_MAX_COST_CHF) + config = AgentConfig( + maxRounds=_MAX_ROUNDS, 
+ maxCostCHF=_MAX_COST_CHF, + operationType=OperationTypeEnum.DATA_QUERY, + ) + + costAccumulator = 0.0 + + async def _trackingAiCallFn(req): + nonlocal costAccumulator + resp = await aiCallFn(req) + costAccumulator += resp.priceCHF + return resp async def _getWorkflowCost() -> float: - return 0.0 + return costAccumulator result = "" async for event in runAgentLoop( - prompt=prompt, + prompt=question, toolRegistry=registry, config=config, - aiCallFn=aiCallFn, + aiCallFn=_trackingAiCallFn, getWorkflowCostFn=_getWorkflowCost, workflowId=f"fda-{featureInstanceId[:8]}", userId=userId, featureInstanceId=featureInstanceId, mandateId=mandateId, + systemPromptOverride=systemPrompt, ): if event.type == AgentEventTypeEnum.FINAL and event.content: result = event.content @@ -282,7 +294,7 @@ def _buildSchemaContext( instanceLabel: str, selectedTables: List[Dict[str, Any]], ) -> str: - """Build a system-level context block describing available tables.""" + """Build a system prompt describing available tables and query strategy.""" tableNames = [] tableBlocks = [] @@ -298,34 +310,28 @@ def _buildSchemaContext( block += f"\n Fields: {', '.join(fields)}" tableBlocks.append(block) - parts = [ - f"You are a data query assistant for the '{featureCode}' feature", - ] + header = f"You are a data query assistant for the '{featureCode}' feature" if instanceLabel: - parts[0] += f' (instance: "{instanceLabel}")' - parts[0] += "." + header += f' (instance: "{instanceLabel}")' + header += "." - parts.append("") - parts.append("AVAILABLE TABLES (use EXACTLY these names as tableName parameter):") - parts.extend(tableBlocks) - parts.append("") - parts.append( - "IMPORTANT RULES:\n" - f"- The ONLY valid tableName values are: {tableNames}\n" - "- Do NOT invent table names, do NOT use UUIDs or IDs as table names.\n" - "- Field names are plain column names (e.g. 
'accountNumber', 'periodYear').\n" - " Do NOT prefix field names with UUIDs, table names, or dots.\n" - "- If unsure about column names, call browseTable with only tableName (no fields)\n" - " to see actual columns first." - ) - parts.append("") - parts.append( - "Tools: browseTable (list rows), queryTable (filter/search), " - "aggregateTable (SUM/COUNT/AVG/MIN/MAX with optional GROUP BY)." - ) - parts.append("") - parts.append( - "Answer the user's question using the data from these tables. " - "Be precise, cite row counts, and format data clearly." - ) + parts = [ + header, + "", + "AVAILABLE TABLES (use EXACTLY these names as tableName parameter):", + *tableBlocks, + "", + f"Valid tableName values: {tableNames}", + "Field names are plain column names (e.g. 'accountNumber', 'periodYear').", + "", + "QUERY STRATEGY:", + "1. If unsure about columns, call browseTable(tableName) first to inspect the schema.", + "2. Use queryTable with filters for targeted lookups.", + "3. Use aggregateTable for SUM/COUNT/AVG/MIN/MAX with optional GROUP BY.", + "4. Combine what you need into as few tool calls as possible.", + "", + "RULES:", + "- Do NOT invent table or field names. Do NOT prefix fields with UUIDs or dots.", + "- Answer concisely. Cite row counts and key values.", + ] return "\n".join(parts) diff --git a/modules/serviceCenter/services/serviceAgent/featureDataProvider.py b/modules/serviceCenter/services/serviceAgent/featureDataProvider.py index a44dcd07..872e47eb 100644 --- a/modules/serviceCenter/services/serviceAgent/featureDataProvider.py +++ b/modules/serviceCenter/services/serviceAgent/featureDataProvider.py @@ -10,10 +10,50 @@ and mandateId so data isolation is guaranteed. 
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Directory that receives the query debug log.
# The location is taken from the POWERON_DEBUG_DIR environment variable when set,
# otherwise it falls back to "local/debug" relative to the working directory,
# instead of an absolute path that only exists on one developer machine.
_DEBUG_DIR = Path(os.environ.get("POWERON_DEBUG_DIR") or Path("local") / "debug")


def _isDebugEnabled() -> bool:
    """Return True when APP_LOGGING_FILE_ENABLED is True or the string "true".

    Any failure while importing or reading the configuration counts as
    "debug disabled".
    """
    try:
        from modules.shared.configuration import APP_CONFIG
        val = APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", False)
        return val is True or str(val).lower() == "true"
    except Exception:
        return False


def _formatQueryLogEntry(ts: str, method: str, tableName: str, params: dict, result: dict, elapsed: float) -> str:
    """Render one log entry: header line, params line, one line per row, blank line.

    `elapsed` is printed as whole milliseconds. A missing or None "rows" value
    is treated as an empty list; "total" defaults to the number of rows.
    """
    rows = result.get("rows") or []
    total = result.get("total", len(rows))
    err = result.get("error")
    header = f"[{ts}] {method}({tableName}) — {len(rows)} rows returned, total={total}, elapsed={elapsed:.0f}ms"
    if err:
        header += f", ERROR={err}"
    lines = [header, f"  params: {json.dumps(params, ensure_ascii=False, default=str)}"]
    lines.extend(
        f"  [{i}] {json.dumps(row, ensure_ascii=False, default=str)}"
        for i, row in enumerate(rows)
    )
    lines.append("")
    return "\n".join(lines) + "\n"


def _debugQueryLog(method: str, tableName: str, params: dict, result: dict, elapsed: float):
    """Append query + result to debug_queryTable.log in the debug directory.

    Does nothing unless debug is enabled. The write is best-effort: a failure
    is reported at debug level and never propagates into the query path.

    NOTE(review): every returned row is written in full and the file is only
    ever appended to, so it grows without bound and may hold tenant data.
    """
    if not _isDebugEnabled():
        return
    try:
        _DEBUG_DIR.mkdir(parents=True, exist_ok=True)
        logPath = _DEBUG_DIR / "debug_queryTable.log"
        ts = time.strftime("%Y-%m-%d %H:%M:%S")
        entry = _formatQueryLogEntry(ts, method, tableName, params, result, elapsed)
        with open(logPath, "a", encoding="utf-8") as f:
            f.write(entry)
    except Exception as e:
        # Previously swallowed silently; keep it non-fatal but leave a trace.
        logger.debug("Failed to write debug query log for %s(%s): %s", method, tableName, e)


_ALLOWED_OPERATORS = {"=", "!=", ">", "<", ">=", "<=", "LIKE", "ILIKE", "IS NULL", "IS NOT NULL"}
_ALLOWED_AGGREGATES = {"SUM", "COUNT", "AVG", "MIN", "MAX"}
total, "limit": limit, "offset": offset} + result = {"rows": rows, "total": total, "limit": limit, "offset": offset} + _debugQueryLog("browseTable", tableName, { + "fields": fields, "limit": limit, "offset": offset, + }, result, (time.time() - t0) * 1000) + return result except Exception as e: logger.error(f"browseTable({tableName}) failed: {e}") + elapsed = (time.time() - t0) * 1000 + errResult = {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)} + _debugQueryLog("browseTable", tableName, { + "fields": fields, "limit": limit, "offset": offset, + }, errResult, elapsed) try: conn.rollback() except Exception: pass - return {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)} + return errResult def aggregateTable( self, @@ -153,6 +203,7 @@ class FeatureDataProvider: fullWhere += " AND " + extraWhere allParams.extend(extraParams) + t0 = time.time() try: with conn.cursor() as cur: if groupBy: @@ -169,19 +220,28 @@ class FeatureDataProvider: cur.execute(sql, allParams) rows = [_serializeRow(dict(r)) for r in cur.fetchall()] - return { + result = { "rows": rows, "aggregate": aggregate, "field": field, "groupBy": groupBy, } + _debugQueryLog("aggregateTable", tableName, { + "aggregate": aggregate, "field": field, "groupBy": groupBy, + }, result, (time.time() - t0) * 1000) + return result except Exception as e: logger.error(f"aggregateTable({tableName}, {aggregate}({field})) failed: {e}") + elapsed = (time.time() - t0) * 1000 + errResult = {"rows": [], "error": str(e), "aggregate": aggregate, "field": field, "groupBy": groupBy} + _debugQueryLog("aggregateTable", tableName, { + "aggregate": aggregate, "field": field, "groupBy": groupBy, + }, errResult, elapsed) try: conn.rollback() except Exception: pass - return {"rows": [], "error": str(e), "aggregate": aggregate, "field": field, "groupBy": groupBy} + return errResult def queryTable( self, @@ -222,6 +282,7 @@ class FeatureDataProvider: fullWhere += " AND " + extraWhere 
allParams.extend(extraParams) + t0 = time.time() try: with conn.cursor() as cur: countSql = f'SELECT COUNT(*) FROM "{tableName}" WHERE {fullWhere}' @@ -237,14 +298,25 @@ class FeatureDataProvider: cur.execute(dataSql, allParams + [limit, offset]) rows = [_serializeRow(dict(r)) for r in cur.fetchall()] - return {"rows": rows, "total": total, "limit": limit, "offset": offset} + result = {"rows": rows, "total": total, "limit": limit, "offset": offset} + _debugQueryLog("queryTable", tableName, { + "filters": filters, "fields": fields, "orderBy": orderBy, + "limit": limit, "offset": offset, + }, result, (time.time() - t0) * 1000) + return result except Exception as e: logger.error(f"queryTable({tableName}) failed: {e}") + elapsed = (time.time() - t0) * 1000 + errResult = {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)} + _debugQueryLog("queryTable", tableName, { + "filters": filters, "fields": fields, "orderBy": orderBy, + "limit": limit, "offset": offset, + }, errResult, elapsed) try: conn.rollback() except Exception: pass - return {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)} + return errResult # ------------------------------------------------------------------