ValueOn AG 2026-04-09 21:33:56 +02:00
parent 722f49e832
commit 259fd25d9b
11 changed files with 187 additions and 47 deletions

View file

@@ -71,6 +71,7 @@ class AiAnthropic(BaseConnectorAi):
(OperationTypeEnum.DATA_GENERATE, 9),
(OperationTypeEnum.DATA_EXTRACT, 8),
(OperationTypeEnum.AGENT, 9),
(OperationTypeEnum.DATA_QUERY, 9),
),
version="claude-sonnet-4-5-20250929",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.003 + (bytesReceived / 4 / 1000) * 0.015
@@ -97,6 +98,7 @@ class AiAnthropic(BaseConnectorAi):
(OperationTypeEnum.DATA_GENERATE, 8),
(OperationTypeEnum.DATA_EXTRACT, 7),
(OperationTypeEnum.AGENT, 7),
(OperationTypeEnum.DATA_QUERY, 10),
),
version="claude-haiku-4-5-20251001",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.001 + (bytesReceived / 4 / 1000) * 0.005
@@ -123,6 +125,7 @@ class AiAnthropic(BaseConnectorAi):
(OperationTypeEnum.DATA_GENERATE, 10),
(OperationTypeEnum.DATA_EXTRACT, 9),
(OperationTypeEnum.AGENT, 10),
(OperationTypeEnum.DATA_QUERY, 3),
),
version="claude-opus-4-6",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.005 + (bytesReceived / 4 / 1000) * 0.025
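
All three connector files use the same pricing shape: bytes are divided by 4 to approximate tokens (the ~4-bytes-per-token ratio is an assumption implied by the division by 4), then by 1000 to get per-1k-token units, and multiplied by the model's CHF rate. A standalone sanity check of the Sonnet lambda above, not part of this commit:

calc = lambda processingTime, bytesSent, bytesReceived: (
    (bytesSent / 4 / 1000) * 0.003 + (bytesReceived / 4 / 1000) * 0.015
)

# 40 kB sent (~10k tokens) and 8 kB received (~2k tokens):
# 10 * 0.003 + 2 * 0.015 = 0.03 + 0.03 = 0.06 CHF
assert abs(calc(0.0, 40_000, 8_000) - 0.06) < 1e-9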

View file

@@ -67,6 +67,7 @@ class AiMistral(BaseConnectorAi):
(OperationTypeEnum.DATA_GENERATE, 9),
(OperationTypeEnum.DATA_EXTRACT, 8),
(OperationTypeEnum.AGENT, 8),
(OperationTypeEnum.DATA_QUERY, 7),
),
version="mistral-large-latest",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0005 + (bytesReceived / 4 / 1000) * 0.0015
@@ -93,6 +94,7 @@ class AiMistral(BaseConnectorAi):
(OperationTypeEnum.DATA_GENERATE, 8),
(OperationTypeEnum.DATA_EXTRACT, 7),
(OperationTypeEnum.AGENT, 6),
(OperationTypeEnum.DATA_QUERY, 9),
),
version="mistral-small-latest",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.00006 + (bytesReceived / 4 / 1000) * 0.00018

View file

@@ -68,6 +68,7 @@ class AiOpenai(BaseConnectorAi):
(OperationTypeEnum.DATA_GENERATE, 10),
(OperationTypeEnum.DATA_EXTRACT, 7),
(OperationTypeEnum.AGENT, 9),
(OperationTypeEnum.DATA_QUERY, 8),
),
version="gpt-4o",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0025 + (bytesReceived / 4 / 1000) * 0.01
@@ -95,6 +96,7 @@ class AiOpenai(BaseConnectorAi):
(OperationTypeEnum.DATA_GENERATE, 9),
(OperationTypeEnum.DATA_EXTRACT, 7),
(OperationTypeEnum.AGENT, 8),
(OperationTypeEnum.DATA_QUERY, 10),
),
version="gpt-4o-mini",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.00015 + (bytesReceived / 4 / 1000) * 0.0006

View file

@@ -32,6 +32,7 @@ class OperationTypeEnum(str, Enum):
# Agent Operations
AGENT = "agent" # Agent loop: reasoning + tool use
DATA_QUERY = "dataQuery" # Data query sub-agent: fast model, schema-aware
# Embedding Operations
EMBEDDING = "embedding" # Text → vector conversion for semantic search
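
The integer paired with each operation reads as an affinity score for model routing; this commit points DATA_QUERY at the cheap, fast models (claude-haiku: 10, gpt-4o-mini: 10, mistral-small: 9) and away from claude-opus (3). A minimal, hypothetical sketch of a selector consuming such scores -- the real BaseConnectorAi selection logic is not part of this diff:

from enum import Enum

class Op(str, Enum):  # stand-in for OperationTypeEnum
    AGENT = "agent"
    DATA_QUERY = "dataQuery"

# Affinity scores copied from the hunks in this commit.
models = {
    "claude-haiku-4-5-20251001": {Op.AGENT: 7, Op.DATA_QUERY: 10},
    "claude-opus-4-6": {Op.AGENT: 10, Op.DATA_QUERY: 3},
}

def pickModel(op: Op) -> str:
    # Highest affinity for the requested operation wins.
    return max(models, key=lambda name: models[name].get(op, 0))

assert pickModel(Op.DATA_QUERY) == "claude-haiku-4-5-20251001"
assert pickModel(Op.AGENT) == "claude-opus-4-6"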

View file

@@ -5,10 +5,13 @@
Flow: load config → resolve connector → fetch data → clear old records → write new records → compute balances.
"""
import json as _json
import logging
import os
import time
from collections import defaultdict
from typing import Dict, Any, Optional
from pathlib import Path
from typing import Dict, Any, List, Optional
from .accountingConnectorBase import BaseAccountingConnector
from .accountingRegistry import _getAccountingRegistry
@@ -16,6 +19,45 @@ from .accountingRegistry import _getAccountingRegistry
logger = logging.getLogger(__name__)
_DEBUG_SYNC_DIR = Path("D:/Athi/Local/Web/poweron/local/debug/sync")
def _debugSyncDir() -> Path:
_DEBUG_SYNC_DIR.mkdir(parents=True, exist_ok=True)
return _DEBUG_SYNC_DIR
def _isDebugEnabled() -> bool:
try:
from modules.shared.configuration import APP_CONFIG
return APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", False) is True or str(APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", "")).lower() == "true"
except Exception:
return False
def _dumpSyncData(tag: str, rows: list):
"""Write raw connector data to a timestamped JSON file in local/debug/sync/."""
if not _isDebugEnabled():
return
try:
d = _debugSyncDir()
ts = time.strftime("%Y%m%d-%H%M%S")
path = d / f"{ts}_{tag}.json"
serializable = []
for r in rows:
if isinstance(r, dict):
serializable.append(r)
elif hasattr(r, "__dict__"):
serializable.append({k: v for k, v in r.__dict__.items() if not k.startswith("_")})
else:
serializable.append(str(r))
with open(path, "w", encoding="utf-8") as f:
_json.dump({"count": len(serializable), "rows": serializable}, f, ensure_ascii=False, indent=2, default=str)
logger.info(f"Debug sync dump: {path.name} ({len(serializable)} rows)")
except Exception as e:
logger.warning(f"Failed to write debug sync dump for {tag}: {e}")
class AccountingDataSync:
"""Imports accounting data (read-only) from an external system into local TrusteeData* tables."""
@@ -86,6 +128,7 @@ class AccountingDataSync:
# 1) Chart of accounts
try:
charts = await connector.getChartOfAccounts(connConfig)
_dumpSyncData("accounts", charts)
fetchedAccountNumbers = [acc.accountNumber for acc in charts if acc.accountNumber]
self._clearTable(TrusteeDataAccount, featureInstanceId)
for acc in charts:
@@ -105,6 +148,7 @@ class AccountingDataSync:
# 2) Journal entries + lines (pass already-fetched chart to avoid redundant API call)
try:
rawEntries = await connector.getJournalEntries(connConfig, dateFrom=dateFrom, dateTo=dateTo, accountNumbers=fetchedAccountNumbers or None)
_dumpSyncData("journalEntries", rawEntries)
self._clearTable(TrusteeDataJournalEntry, featureInstanceId)
self._clearTable(TrusteeDataJournalLine, featureInstanceId)
lineCount = 0
@@ -146,11 +190,13 @@ class AccountingDataSync:
contactCount = 0
customers = await connector.getCustomers(connConfig)
_dumpSyncData("customers", customers)
for c in customers:
self._if.db.recordCreate(TrusteeDataContact, self._mapContact(c, "customer", scope))
contactCount += 1
vendors = await connector.getVendors(connConfig)
_dumpSyncData("vendors", vendors)
for v in vendors:
self._if.db.recordCreate(TrusteeDataContact, self._mapContact(v, "vendor", scope))
contactCount += 1

View file

@@ -235,6 +235,8 @@ def _buildFeatureDataSourceContext(featureDataSourceIds: List[str]) -> str:
parts = [
"The user has attached data from the following feature instances.",
"Use queryFeatureInstance(featureInstanceId, question) to query this data.",
"IMPORTANT: Formulate ONE comprehensive question per call that covers everything you need.",
"The sub-agent can browse, filter, and aggregate -- ask precisely and avoid repeated calls.",
"",
]
found = False
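
An illustrative call shaped by this guidance; the question text is invented, the parameter names match the prompt above:

# Stub standing in for the registered tool (the real one delegates to the
# DATA_QUERY sub-agent; see the tool registration later in this commit).
def queryFeatureInstance(featureInstanceId: str, question: str) -> str: ...

# One comprehensive question instead of four narrow ones:
queryFeatureInstance(
    featureInstanceId="<attached-instance-id>",
    question=(
        "For fiscal year 2025: total revenue and total expenses per quarter, "
        "plus the five largest expense accounts by amount."
    ),
)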

View file

@@ -169,7 +169,7 @@ async def runAgentLoop(
aiRequest = AiCallRequest(
prompt="",
options=AiCallOptions(
operationType=OperationTypeEnum.AGENT,
operationType=config.operationType or OperationTypeEnum.AGENT,
temperature=config.temperature
),
messages=conversation.messages,

View file

@@ -185,8 +185,12 @@ def _registerFeatureSubAgentTools(registry: ToolRegistry, services):
description=(
"Query data from a feature instance (e.g. Trustee, CommCoach). "
"Delegates to a specialized sub-agent that knows the feature's data schema "
"and can browse/query its tables. Use this when the user has attached "
"feature data sources or asks about feature-specific data."
"and can browse, filter, and aggregate its tables. Use this when the user "
"has attached feature data sources or asks about feature-specific data.\n\n"
"GUIDELINES:\n"
"- Ask a precise, self-contained question (include all context the sub-agent needs).\n"
"- Combine related data needs into ONE call instead of multiple small ones.\n"
"- Avoid calling this tool repeatedly with slight variations of the same question."
),
parameters={
"type": "object",

View file

@@ -6,6 +6,7 @@ from typing import List, Dict, Any, Optional
from enum import Enum
from pydantic import BaseModel, Field
from modules.shared.timeUtils import getUtcTimestamp
from modules.datamodels.datamodelAi import OperationTypeEnum
import uuid
@@ -91,6 +92,7 @@ class AgentConfig(BaseModel):
initialToolboxes: List[str] = Field(default_factory=lambda: ["core"])
availableToolboxes: List[str] = Field(default_factory=list)
temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0)
operationType: Optional[OperationTypeEnum] = Field(default=None, description="Override the default AGENT operationType for model selection")
class AgentState(BaseModel):
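
The new field feeds the fallback added to runAgentLoop above (config.operationType or OperationTypeEnum.AGENT), so plain agents keep the AGENT default while sub-agents can opt in. A minimal usage sketch, mirroring the featureDataAgent change below (imports as in this file):

config = AgentConfig(
    maxRounds=8,
    maxCostCHF=0.15,
    operationType=OperationTypeEnum.DATA_QUERY,
)
assert (config.operationType or OperationTypeEnum.AGENT) == OperationTypeEnum.DATA_QUERY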

View file

@@ -24,8 +24,8 @@ from modules.serviceCenter.services.serviceAgent.featureDataProvider import Feat
logger = logging.getLogger(__name__)
_MAX_ROUNDS = 5
_MAX_COST_CHF = 0.10
_MAX_ROUNDS = 8
_MAX_COST_CHF = 0.15
async def runFeatureDataAgent(
@@ -69,25 +69,37 @@ async def runFeatureDataAgent(
if realCols:
meta["fields"] = realCols
schemaContext = _buildSchemaContext(featureCode, instanceLabel, selectedTables)
prompt = f"{schemaContext}\n\nUser question:\n{question}"
systemPrompt = _buildSchemaContext(featureCode, instanceLabel, selectedTables)
config = AgentConfig(maxRounds=_MAX_ROUNDS, maxCostCHF=_MAX_COST_CHF)
config = AgentConfig(
maxRounds=_MAX_ROUNDS,
maxCostCHF=_MAX_COST_CHF,
operationType=OperationTypeEnum.DATA_QUERY,
)
costAccumulator = 0.0
async def _trackingAiCallFn(req):
nonlocal costAccumulator
resp = await aiCallFn(req)
costAccumulator += resp.priceCHF
return resp
async def _getWorkflowCost() -> float:
return 0.0
return costAccumulator
result = ""
async for event in runAgentLoop(
prompt=prompt,
prompt=question,
toolRegistry=registry,
config=config,
aiCallFn=aiCallFn,
aiCallFn=_trackingAiCallFn,
getWorkflowCostFn=_getWorkflowCost,
workflowId=f"fda-{featureInstanceId[:8]}",
userId=userId,
featureInstanceId=featureInstanceId,
mandateId=mandateId,
systemPromptOverride=systemPrompt,
):
if event.type == AgentEventTypeEnum.FINAL and event.content:
result = event.content
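
Note the behavioural fix here: _getWorkflowCost previously returned a constant 0.0, so the cost cap was never tripped; with the tracking wrapper the sub-agent's real spend now counts against the 0.15 CHF budget (assuming runAgentLoop compares getWorkflowCostFn against config.maxCostCHF, which this diff does not show). A rough headroom check at the Haiku rates from this commit, with illustrative token counts:

perRoundCHF = 2 * 0.001 + 1 * 0.005  # ~2k tokens in, ~1k out per round = 0.007 CHF
assert 8 * perRoundCHF <= 0.15       # 0.056 CHF for a full 8-round run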
@@ -282,7 +294,7 @@ def _buildSchemaContext(
instanceLabel: str,
selectedTables: List[Dict[str, Any]],
) -> str:
"""Build a system-level context block describing available tables."""
"""Build a system prompt describing available tables and query strategy."""
tableNames = []
tableBlocks = []
@@ -298,34 +310,28 @@
block += f"\n Fields: {', '.join(fields)}"
tableBlocks.append(block)
parts = [
f"You are a data query assistant for the '{featureCode}' feature",
]
header = f"You are a data query assistant for the '{featureCode}' feature"
if instanceLabel:
parts[0] += f' (instance: "{instanceLabel}")'
parts[0] += "."
header += f' (instance: "{instanceLabel}")'
header += "."
parts.append("")
parts.append("AVAILABLE TABLES (use EXACTLY these names as tableName parameter):")
parts.extend(tableBlocks)
parts.append("")
parts.append(
"IMPORTANT RULES:\n"
f"- The ONLY valid tableName values are: {tableNames}\n"
"- Do NOT invent table names, do NOT use UUIDs or IDs as table names.\n"
"- Field names are plain column names (e.g. 'accountNumber', 'periodYear').\n"
" Do NOT prefix field names with UUIDs, table names, or dots.\n"
"- If unsure about column names, call browseTable with only tableName (no fields)\n"
" to see actual columns first."
)
parts.append("")
parts.append(
"Tools: browseTable (list rows), queryTable (filter/search), "
"aggregateTable (SUM/COUNT/AVG/MIN/MAX with optional GROUP BY)."
)
parts.append("")
parts.append(
"Answer the user's question using the data from these tables. "
"Be precise, cite row counts, and format data clearly."
)
parts = [
header,
"",
"AVAILABLE TABLES (use EXACTLY these names as tableName parameter):",
*tableBlocks,
"",
f"Valid tableName values: {tableNames}",
"Field names are plain column names (e.g. 'accountNumber', 'periodYear').",
"",
"QUERY STRATEGY:",
"1. If unsure about columns, call browseTable(tableName) first to inspect the schema.",
"2. Use queryTable with filters for targeted lookups.",
"3. Use aggregateTable for SUM/COUNT/AVG/MIN/MAX with optional GROUP BY.",
"4. Combine what you need into as few tool calls as possible.",
"",
"RULES:",
"- Do NOT invent table or field names. Do NOT prefix fields with UUIDs or dots.",
"- Answer concisely. Cite row counts and key values.",
]
return "\n".join(parts)

View file

@@ -10,10 +10,50 @@ and mandateId so data isolation is guaranteed.
import logging
import json
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
_DEBUG_DIR = Path("D:/Athi/Local/Web/poweron/local/debug")
def _isDebugEnabled() -> bool:
try:
from modules.shared.configuration import APP_CONFIG
val = APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", False)
return val is True or str(val).lower() == "true"
except Exception:
return False
def _debugQueryLog(method: str, tableName: str, params: dict, result: dict, elapsed: float):
"""Append query + result to local/debug/debug_queryTable.log."""
if not _isDebugEnabled():
return
debugDir = _DEBUG_DIR
try:
debugDir.mkdir(parents=True, exist_ok=True)
logPath = debugDir / "debug_queryTable.log"
ts = time.strftime("%Y-%m-%d %H:%M:%S")
rows = result.get("rows", [])
total = result.get("total", len(rows))
err = result.get("error")
header = f"[{ts}] {method}({tableName}) — {len(rows)} rows returned, total={total}, elapsed={elapsed:.0f}ms"
if err:
header += f", ERROR={err}"
lines = [header]
lines.append(f" params: {json.dumps(params, ensure_ascii=False, default=str)}")
for i, row in enumerate(rows):
lines.append(f" [{i}] {json.dumps(row, ensure_ascii=False, default=str)}")
lines.append("")
with open(logPath, "a", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
except Exception:
pass
_ALLOWED_OPERATORS = {"=", "!=", ">", "<", ">=", "<=", "LIKE", "ILIKE", "IS NULL", "IS NOT NULL"}
_ALLOWED_AGGREGATES = {"SUM", "COUNT", "AVG", "MIN", "MAX"}
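
An appended log entry follows the f-string format above; the values here are invented:

[2026-04-09 21:34:02] queryTable(TrusteeDataAccount) — 3 rows returned, total=3, elapsed=12ms
  params: {"filters": [{"field": "periodYear", "op": "=", "value": 2025}], "fields": null, "orderBy": null, "limit": 50, "offset": 0}
  [0] {"accountNumber": "1000", "balance": "12500.00"}
  [1] {"accountNumber": "1020", "balance": "8300.50"}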
@@ -96,6 +136,7 @@ class FeatureDataProvider:
fullWhere += " AND " + extraWhere
allParams.extend(extraParams)
t0 = time.time()
try:
with conn.cursor() as cur:
countSql = f'SELECT COUNT(*) FROM "{tableName}" WHERE {fullWhere}'
@@ -111,14 +152,23 @@ class FeatureDataProvider:
cur.execute(dataSql, allParams + [limit, offset])
rows = [_serializeRow(dict(r)) for r in cur.fetchall()]
return {"rows": rows, "total": total, "limit": limit, "offset": offset}
result = {"rows": rows, "total": total, "limit": limit, "offset": offset}
_debugQueryLog("browseTable", tableName, {
"fields": fields, "limit": limit, "offset": offset,
}, result, (time.time() - t0) * 1000)
return result
except Exception as e:
logger.error(f"browseTable({tableName}) failed: {e}")
elapsed = (time.time() - t0) * 1000
errResult = {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)}
_debugQueryLog("browseTable", tableName, {
"fields": fields, "limit": limit, "offset": offset,
}, errResult, elapsed)
try:
conn.rollback()
except Exception:
pass
return {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)}
return errResult
def aggregateTable(
self,
@@ -153,6 +203,7 @@ class FeatureDataProvider:
fullWhere += " AND " + extraWhere
allParams.extend(extraParams)
t0 = time.time()
try:
with conn.cursor() as cur:
if groupBy:
@@ -169,19 +220,28 @@ class FeatureDataProvider:
cur.execute(sql, allParams)
rows = [_serializeRow(dict(r)) for r in cur.fetchall()]
return {
result = {
"rows": rows,
"aggregate": aggregate,
"field": field,
"groupBy": groupBy,
}
_debugQueryLog("aggregateTable", tableName, {
"aggregate": aggregate, "field": field, "groupBy": groupBy,
}, result, (time.time() - t0) * 1000)
return result
except Exception as e:
logger.error(f"aggregateTable({tableName}, {aggregate}({field})) failed: {e}")
elapsed = (time.time() - t0) * 1000
errResult = {"rows": [], "error": str(e), "aggregate": aggregate, "field": field, "groupBy": groupBy}
_debugQueryLog("aggregateTable", tableName, {
"aggregate": aggregate, "field": field, "groupBy": groupBy,
}, errResult, elapsed)
try:
conn.rollback()
except Exception:
pass
return {"rows": [], "error": str(e), "aggregate": aggregate, "field": field, "groupBy": groupBy}
return errResult
def queryTable(
self,
@@ -222,6 +282,7 @@ class FeatureDataProvider:
fullWhere += " AND " + extraWhere
allParams.extend(extraParams)
t0 = time.time()
try:
with conn.cursor() as cur:
countSql = f'SELECT COUNT(*) FROM "{tableName}" WHERE {fullWhere}'
@@ -237,14 +298,25 @@ class FeatureDataProvider:
cur.execute(dataSql, allParams + [limit, offset])
rows = [_serializeRow(dict(r)) for r in cur.fetchall()]
return {"rows": rows, "total": total, "limit": limit, "offset": offset}
result = {"rows": rows, "total": total, "limit": limit, "offset": offset}
_debugQueryLog("queryTable", tableName, {
"filters": filters, "fields": fields, "orderBy": orderBy,
"limit": limit, "offset": offset,
}, result, (time.time() - t0) * 1000)
return result
except Exception as e:
logger.error(f"queryTable({tableName}) failed: {e}")
elapsed = (time.time() - t0) * 1000
errResult = {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)}
_debugQueryLog("queryTable", tableName, {
"filters": filters, "fields": fields, "orderBy": orderBy,
"limit": limit, "offset": offset,
}, errResult, elapsed)
try:
conn.rollback()
except Exception:
pass
return {"rows": [], "total": 0, "limit": limit, "offset": offset, "error": str(e)}
return errResult
# ------------------------------------------------------------------