# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
System Routes - Navigation and system-level API endpoints.

Navigation API concept:
- Single source of truth for navigation in the gateway
- The UI renders only what it receives (no permission logic in the UI)
- No icons in the API response - the UI maps them itself via uiComponent
- Blocks instead of sections, with ``order`` on all levels
"""

import importlib
import logging
import time
from collections import Counter
from typing import Dict, Iterator, List, Any, Optional, Set, Tuple

from fastapi import APIRouter, Depends, Request
from slowapi import Limiter
from slowapi.util import get_remote_address

from modules.auth.authentication import getRequestContext, RequestContext
from modules.system.mainSystem import NAVIGATION_SECTIONS, _objectKeyToUiComponent
from modules.shared.i18nRegistry import resolveText, t
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole, FeatureAccess, FeatureAccessRole

logger = logging.getLogger(__name__)
limiter = Limiter(key_func=get_remote_address)

# Main system router (for other system endpoints if needed)
router = APIRouter(prefix="/api/system", tags=["System"])

# Navigation router at /api/navigation (per the Navigation API concept)
navigationRouter = APIRouter(prefix="/api", tags=["Navigation"])


def _getUserRoleIds(userId: str) -> List[str]:
    """
    Get all role IDs for a user across all their enabled mandates.

    Args:
        userId: ID of the user whose mandates are inspected.

    Returns:
        De-duplicated role IDs in first-seen order.
    """
    rootInterface = getRootInterface()

    # A dict keeps insertion order and gives O(1) de-duplication.
    roleIds: Dict[str, None] = {}
    for um in rootInterface.getUserMandates(userId):
        if not um.enabled:
            continue
        for rid in rootInterface.getRoleIdsForUserMandate(str(um.id)):
            roleIds.setdefault(rid, None)
    return list(roleIds)


def _uiRuleMatches(ruleItem: Optional[str], objectKey: str) -> bool:
    """
    Tell whether a UI access rule item covers the given object key.

    - ``None`` is a global rule and covers everything.
    - An exact string match covers that object.
    - ``prefix.*`` covers ``prefix`` itself and everything below ``prefix.``.
      The dot boundary is enforced so that ``ui.admin.*`` does not
      accidentally cover ``ui.administrator``.
    """
    if ruleItem is None or ruleItem == objectKey:
        return True
    if ruleItem.endswith(".*"):
        prefix = ruleItem[:-2]
        if not prefix:
            # A bare ".*" has always matched every key; keep that.
            return True
        return objectKey == prefix or objectKey.startswith(prefix + ".")
    return False


def _checkUiPermission(roleIds: List[str], objectKey: str) -> bool:
    """
    Check if any of the given roles has view permission for the UI object.

    Args:
        roleIds: Role IDs to check; an empty list never grants access.
        objectKey: Full UI object key (e.g. ``ui.admin.mandates``).

    Returns:
        True as soon as one viewable rule of one role covers ``objectKey``.
    """
    if not roleIds:
        return False

    rootInterface = getRootInterface()
    for roleId in roleIds:
        # UI rules for this role (AccessRule models per the original comment)
        rules = rootInterface.getAccessRules(roleId=roleId, context=AccessRuleContext.UI)
        if any(rule.view and _uiRuleMatches(rule.item, objectKey) for rule in rules):
            return True
    return False


# =============================================================================
# Navigation API (per the Navigation API concept)
# Endpoint: GET /api/navigation
# =============================================================================

# Feature code -> main module that exposes UI_OBJECTS.
# Note: the "realestate" code maps to the camel-cased "realEstate" package.
_FEATURE_MAIN_MODULES: Dict[str, str] = {
    "trustee": "modules.features.trustee.mainTrustee",
    "realestate": "modules.features.realEstate.mainRealEstate",
    "graphicalEditor": "modules.features.graphicalEditor.mainGraphicalEditor",
    "teamsbot": "modules.features.teamsbot.mainTeamsbot",
    "neutralization": "modules.features.neutralization.mainNeutralization",
    "chatbot": "modules.features.chatbot.mainChatbot",
    "commcoach": "modules.features.commcoach.mainCommcoach",
    "workspace": "modules.features.workspace.mainWorkspace",
    "redmine": "modules.features.redmine.mainRedmine",
}


def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]:
    """
    Get UI objects for a feature from its main module.

    The module is imported lazily (only when a user actually has an instance
    of that feature).

    Args:
        featureCode: Feature code as stored on the feature instance.

    Returns:
        The module's ``UI_OBJECTS`` list (entries are read with the keys
        ``objectKey``, ``label`` and ``meta`` by the caller), or an empty
        list for unknown codes or when the module cannot be imported.
    """
    modulePath = _FEATURE_MAIN_MODULES.get(featureCode)
    if modulePath is None:
        logger.debug(f"Skipping removed feature code: {featureCode}")
        return []

    try:
        return importlib.import_module(modulePath).UI_OBJECTS
    except (ImportError, AttributeError) as e:
        # AttributeError covers a module that imports but lacks UI_OBJECTS,
        # which "from x import UI_OBJECTS" used to report as ImportError.
        logger.error(f"Failed to import UI_OBJECTS for feature {featureCode}: {e}")
        return []


def _resolveFeatureLabel(feature: Any, featureCode: str) -> Any:
    """
    Resolve a feature's label for the current request language.

    Accepts a model with ``model_dump()``, a plain string, a dict, or a
    legacy object with ``de``/``en`` attributes. Falls back to the feature
    code when the feature is missing or has no ``label`` attribute.
    """
    if feature and hasattr(feature, 'label'):
        featureLabel = feature.label
        if hasattr(featureLabel, 'model_dump'):
            featureLabel = featureLabel.model_dump()
        elif not isinstance(featureLabel, (str, dict)):
            # Legacy object: read the language attributes individually.
            featureLabel = {
                "de": getattr(featureLabel, 'de', featureCode),
                "en": getattr(featureLabel, 'en', featureCode),
            }
    else:
        featureLabel = {"de": featureCode, "en": featureCode}
    return resolveText(featureLabel)


def _buildInstanceViews(
    instance: Any,
    mandateId: str,
    permissions: Dict[str, Any],
    isSysAdmin: bool
) -> List[Dict[str, Any]]:
    """
    Build the permitted view entries for one feature instance.

    A view is included when the user is a sysadmin, has the ``_all``
    permission, or has a permission entry for the view's full object key.
    Views flagged ``admin_only`` in their meta additionally require sysadmin
    or instance-admin status.
    """
    views: List[Dict[str, Any]] = []
    viewOrder = 10

    for uiObj in _getFeatureUiObjects(instance.featureCode):
        objectKey = uiObj.get("objectKey", "")
        # Last segment of the object key is the view name used in the path
        viewName = objectKey.split(".")[-1] if objectKey else ""

        # Permission check uses the full objectKey (per the Navigation API concept)
        if not isSysAdmin and not permissions.get("_all") and not permissions.get(objectKey, False):
            continue

        # Skip admin-only views for non-admins
        meta = uiObj.get("meta") or {}
        if meta.get("admin_only") and not isSysAdmin and not permissions.get("isAdmin", False):
            continue

        rawViewLabel = uiObj.get("label")
        views.append({
            "uiComponent": f"page.feature.{instance.featureCode}.{viewName}",
            "uiLabel": resolveText(rawViewLabel) if rawViewLabel not in (None, "") else "",
            "uiPath": f"/mandates/{mandateId}/{instance.featureCode}/{instance.id}/{viewName}",
            "order": viewOrder,
            "objectKey": objectKey
        })
        viewOrder += 10

    views.sort(key=lambda v: v["order"])
    return views


def _buildDynamicBlock(
    userId: str,
    isSysAdmin: bool
) -> Optional[Dict[str, Any]]:
    """
    Build the dynamic features block with mandates, features, and instances.

    View and feature labels use resolveText() for the current request language
    (same contract as static navigation items in _formatBlockItem).
    Instance and mandate display names are user-defined and passed through as-is.

    Args:
        userId: ID of the requesting user.
        isSysAdmin: Whether the user is a platform admin (sees all views).

    Returns:
        The block dict (``type="dynamic"``), or None if the user has no
        usable feature instances or if building the block fails.
    """
    try:
        rootInterface = getRootInterface()
        featureInterface = getFeatureInterface(rootInterface.db)

        featureAccesses = rootInterface.getFeatureAccessesForUser(userId)
        if not featureAccesses:
            return None

        # Hierarchical structure: mandate -> feature -> instances
        mandatesMap: Dict[str, Dict[str, Any]] = {}
        featuresMap: Dict[str, Dict[str, Any]] = {}  # key: mandateId_featureCode
        # Mandates already found missing/disabled; avoids re-fetching them
        # for every further access that points at the same mandate.
        skippedMandateIds: Set[str] = set()
        mandateOrder = 10

        for access in featureAccesses:
            if not access.enabled:
                continue

            instance = featureInterface.getFeatureInstance(str(access.featureInstanceId))
            if not instance or not instance.enabled:
                continue

            mandateId = str(instance.mandateId)
            if mandateId in skippedMandateIds:
                continue
            if mandateId not in mandatesMap:
                mandate = rootInterface.getMandate(mandateId)
                if not mandate or not getattr(mandate, "enabled", True):
                    skippedMandateIds.add(mandateId)
                    continue
                mandatesMap[mandateId] = {
                    "id": mandateId,
                    "uiLabel": mandate.label or mandate.name,
                    "order": mandateOrder,
                    "features": []
                }
                mandateOrder += 10

            featureKey = f"{mandateId}_{instance.featureCode}"
            if featureKey not in featuresMap:
                feature = featureInterface.getFeature(instance.featureCode)
                featuresMap[featureKey] = {
                    "uiComponent": f"feature.{instance.featureCode}",
                    "uiLabel": _resolveFeatureLabel(feature, instance.featureCode),
                    "order": 10,
                    "instances": [],
                    # Internal bookkeeping keys, removed before returning
                    "_mandateId": mandateId,
                    "_featureCode": instance.featureCode
                }

            # The user's permissions for this instance decide which views show up
            permissions = _getInstanceViewPermissions(
                rootInterface, userId, str(instance.id), isSysAdmin
            )
            views = _buildInstanceViews(instance, mandateId, permissions, isSysAdmin)

            featuresMap[featureKey]["instances"].append({
                "id": str(instance.id),
                "uiLabel": instance.label,
                "featureCode": instance.featureCode,
                "order": 10,
                "views": views,
                "isAdmin": permissions.get("isAdmin", False),
            })

        # Attach features to their mandates and drop the bookkeeping keys
        for featureData in featuresMap.values():
            mandateId = featureData.pop("_mandateId")
            featureData.pop("_featureCode")
            mandatesMap[mandateId]["features"].append(featureData)

        for mandate in mandatesMap.values():
            mandate["features"].sort(key=lambda f: f["order"])

        mandatesList = sorted(mandatesMap.values(), key=lambda m: m["order"])
        if not mandatesList:
            return None

        return {
            "type": "dynamic",
            "id": "features",
            "title": "MEINE FEATURES",
            "order": 15,  # Between system (10) and workflows (20)
            "mandates": mandatesList
        }

    except Exception as e:
        # Navigation must still render without the dynamic block.
        logger.exception(f"Error building dynamic block: {e}")
        return None


def _getInstanceViewPermissions(
    rootInterface,
    userId: str,
    instanceId: str,
    isSysAdmin: bool
) -> Dict[str, Any]:
    """
    Get view permissions for a user in a feature instance.

    Args:
        rootInterface: Root DB interface used for all lookups.
        userId: ID of the user.
        instanceId: ID of the feature instance.
        isSysAdmin: Platform admins short-circuit to full access.

    Returns:
        Dict with ``"_all"`` (global view access), ``"isAdmin"`` (one of the
        user's roles has "admin" in its label) and one ``True`` entry per
        full object key the user may view. On any error a fresh dict with no
        permissions is returned (fail-safe).
    """
    if isSysAdmin:
        return {"_all": True, "isAdmin": True}

    permissions: Dict[str, Any] = {"_all": False, "isAdmin": False}

    try:
        featureAccess = rootInterface.getFeatureAccess(userId, instanceId)
        if not featureAccess:
            return permissions

        roleIds = rootInterface.getRoleIdsForFeatureAccess(str(featureAccess.id))
        if not roleIds:
            return permissions

        # Check if the user has an admin role
        for roleId in roleIds:
            role = rootInterface.getRole(roleId)
            # Guard against a missing label so one odd role cannot wipe all
            # permissions via the except path below.
            roleLabel = (getattr(role, "roleLabel", None) or "") if role else ""
            if "admin" in roleLabel.lower():
                permissions["isAdmin"] = True
                break

        # Collect UI permissions from the access rules of every role
        for roleId in roleIds:
            accessRules = rootInterface.getAccessRules(roleId=roleId, context=AccessRuleContext.UI)
            logger.debug(f"_getInstanceViewPermissions: roleId={roleId}, UI rules count={len(accessRules)}")
            for rule in accessRules:
                if not rule.view:
                    continue
                logger.debug(f"_getInstanceViewPermissions: rule item={rule.item}, view={rule.view}")
                if rule.item is None:
                    # item=None means all views
                    permissions["_all"] = True
                else:
                    # Store the full objectKey (per the Navigation API concept)
                    permissions[rule.item] = True

        logger.debug(f"_getInstanceViewPermissions: final permissions={permissions}")
        return permissions

    except Exception as e:
        logger.warning(f"Error getting instance view permissions: {e}", exc_info=True)
        # Fail-safe: return a fresh dict so permissions collected before the
        # failure are not handed out.
        return {"_all": False, "isAdmin": False}


def _filterItems(
    items: List[Dict[str, Any]],
    isSysAdmin: bool,
    roleIds: List[str],
    hasGlobalPermission: bool
) -> List[Dict[str, Any]]:
    """
    Filter and format navigation items based on permissions.

    Rules for non-sysadmins (sysadmins see every item):
    - ``sysAdminOnly`` items are always hidden.
    - ``public`` items that are not ``adminOnly`` are always shown.
    - Everything else requires the global UI permission or a UI rule
      covering the item's ``objectKey``.

    Returns:
        Formatted items sorted by ``order``.
    """
    filteredItems: List[Dict[str, Any]] = []

    for item in items:
        if not isSysAdmin:
            if item.get("sysAdminOnly"):
                continue
            needsPermission = item.get("adminOnly") or not item.get("public")
            # Short-circuit keeps the rule lookup to at most one per item.
            if needsPermission and not (
                hasGlobalPermission or _checkUiPermission(roleIds, item["objectKey"])
            ):
                continue
        filteredItems.append(_formatBlockItem(item))

    filteredItems.sort(key=lambda i: i["order"])
    return filteredItems


def _buildStaticBlocks(
    isSysAdmin: bool,
    roleIds: List[str],
    hasGlobalPermission: bool
) -> List[Dict[str, Any]]:
    """
    Build static navigation blocks from NAVIGATION_SECTIONS.

    Keys are registered at import time via t() in mainSystem.py.
    At request time, resolveText() translates them to the current language.

    Sections flagged ``adminOnly`` are dropped for non-sysadmins; sections
    (and subgroups) that end up without any visible item are omitted.
    """
    blocks: List[Dict[str, Any]] = []

    for section in NAVIGATION_SECTIONS:
        if section.get("adminOnly") and not isSysAdmin:
            continue

        if "subgroups" not in section:
            filteredItems = _filterItems(
                section.get("items", []), isSysAdmin, roleIds, hasGlobalPermission
            )
            if filteredItems:
                blocks.append({
                    "type": "static",
                    "id": section["id"],
                    "title": resolveText(section["title"]),
                    "order": section.get("order", 50),
                    "items": filteredItems,
                })
            continue

        filteredSubgroups = []
        for subgroup in section["subgroups"]:
            subItems = _filterItems(
                subgroup.get("items", []), isSysAdmin, roleIds, hasGlobalPermission
            )
            if subItems:
                filteredSubgroups.append({
                    "id": subgroup["id"],
                    "title": resolveText(subgroup["title"]),
                    "order": subgroup.get("order", 50),
                    "items": subItems,
                })
        filteredSubgroups.sort(key=lambda s: s["order"])

        topLevelItems: List[Dict[str, Any]] = []
        if section.get("items"):
            topLevelItems = _filterItems(
                section["items"], isSysAdmin, roleIds, hasGlobalPermission
            )

        if filteredSubgroups or topLevelItems:
            blocks.append({
                "type": "static",
                "id": section["id"],
                "title": resolveText(section["title"]),
                "order": section.get("order", 50),
                "items": topLevelItems,
                "subgroups": filteredSubgroups,
            })

    return blocks


def _formatBlockItem(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Format a navigation item for the API response.

    Reads ``objectKey``, ``label`` and ``path`` (required) and ``order``
    (default 50) from the item definition.
    """
    objectKey = item["objectKey"]
    return {
        "uiComponent": _objectKeyToUiComponent(objectKey),
        "uiLabel": resolveText(item["label"]),
        "uiPath": item["path"],
        "order": item.get("order", 50),
        "objectKey": objectKey,
    }


@navigationRouter.get("/navigation")
@limiter.limit("60/minute")
def get_navigation(
    request: Request,
    reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Get unified navigation structure with blocks.

    Static items and dynamic feature/view labels are resolved with resolveText()
    for the current request language. User-defined instance/mandate names are raw.

    Endpoint: GET /api/navigation

    Returns:
        ``{"blocks": [...]}`` sorted by ``order``. On failure an empty block
        list plus an ``error`` message is returned instead of raising.
    """
    try:
        isSysAdmin = reqContext.isPlatformAdmin
        userId = str(reqContext.user.id) if reqContext.user else None

        # Role IDs are only needed for non-sysadmins
        roleIds: List[str] = []
        if userId and not isSysAdmin:
            roleIds = _getUserRoleIds(userId)

        # "_global_check" is a probe key: it is presumably not a real object
        # key, so in practice only a global rule (item=None) grants it.
        hasGlobalPermission = isSysAdmin
        if not hasGlobalPermission and roleIds:
            hasGlobalPermission = _checkUiPermission(roleIds, "_global_check")

        blocks = _buildStaticBlocks(isSysAdmin, roleIds, hasGlobalPermission)

        # Dynamic block (features) only if the user has feature instances
        if userId:
            dynamicBlock = _buildDynamicBlock(userId, isSysAdmin)
            if dynamicBlock:
                blocks.append(dynamicBlock)

        blocks.sort(key=lambda b: b["order"])

        return {
            "blocks": blocks,
        }

    except Exception as e:
        logger.exception(f"Error getting navigation: {e}")
        return {
            "blocks": [],
            "error": str(e),
        }


# =============================================================================
# AI models (integrations overview)
# =============================================================================

def _extractorClassRow(extractor: Any) -> Optional[Dict[str, Any]]:
    """Return ``{className, extensions}`` for an extractor, or None if it reports no extensions."""
    if not hasattr(extractor, "getSupportedExtensions"):
        return None
    rawExts = extractor.getSupportedExtensions()
    if not rawExts:
        return None
    # Normalise to lower-case without the leading dot, de-duplicated and sorted
    norm = sorted({str(x).lstrip(".").lower() for x in rawExts if x})
    if not norm:
        return None
    return {"className": extractor.__class__.__name__, "extensions": norm}


def _iterUnseenRows(
    db: Any,
    model: Any,
    recordFilter: Dict[str, Any],
    seen: Set[str]
) -> Iterator[Tuple[str, Any]]:
    """Yield ``(id, row)`` for rows matching the filter whose id was not yielded before; updates ``seen``."""
    for row in db.getRecordset(model, recordFilter=recordFilter) or []:
        rid = str(row.get("id", ""))
        if not rid or rid in seen:
            continue
        seen.add(rid)
        yield rid, row


def _dataSourceItem(rid: str, row: Any) -> Dict[str, Any]:
    """Format a DataSource row as a data-layer item."""
    return {
        "kind": "dataSource",
        "id": rid,
        "label": row.get("label") or row.get("displayPath") or rid,
        "sourceType": row.get("sourceType") or "",
        "featureInstanceId": row.get("featureInstanceId"),
        "mandateId": row.get("mandateId"),
        "connectionId": row.get("connectionId"),
    }


def _featureDataSourceItem(rid: str, row: Any, instanceId: Any, instanceLabel: str) -> Dict[str, Any]:
    """Format a FeatureDataSource row as a data-layer item."""
    return {
        "kind": "featureDataSource",
        "id": rid,
        "label": row.get("label") or rid,
        "featureCode": row.get("featureCode") or "",
        "tableName": row.get("tableName") or "",
        "featureInstanceId": instanceId,
        "mandateId": row.get("mandateId"),
        "instanceLabel": instanceLabel,
    }


def _enabledMandateIds(root: Any, userId: str) -> List[str]:
    """Return the mandate IDs (as strings) of the user's enabled mandates."""
    mandateIds: List[str] = []
    for um in root.getUserMandates(userId):
        mid = getattr(um, "mandateId", None)
        if mid and getattr(um, "enabled", True):
            mandateIds.append(str(mid))
    return mandateIds


def _totalItems(result: Any) -> Any:
    """Read ``totalItems`` from a paginated result that is either a dict or an object."""
    if isinstance(result, dict):
        return result.get("totalItems", 0)
    return result.totalItems


def _buildIntegrationsOverviewPayload(userId: str, user=None) -> Dict[str, Any]:
    """
    Single payload for the Integrations architecture page: real UserConnections,
    DataSource / FeatureDataSource rows, trustee accounting bindings, AICore
    connector modules (not individual models), extractor extensions and renderer
    formats from registries, platform infra tools, and live KPI stats.

    Every section is best-effort: a failing section is logged, most failures
    are also recorded in ``errors``, and the remaining sections are still
    filled.

    Args:
        userId: ID of the requesting user.
        user: User object handed to the billing interface; when None the
            billing statistics are skipped.

    Returns:
        Dict with the keys ``aicoreModules``, ``infraTools``,
        ``extractorExtensions``, ``extractorClasses``, ``rendererFormats``,
        ``rendererClasses``, ``dataLayerItems``, ``liveStats`` and ``errors``.
    """
    root = getRootInterface()
    out: Dict[str, Any] = {
        "aicoreModules": [],
        "infraTools": [],
        "extractorExtensions": [],
        "extractorClasses": [],
        "rendererFormats": [],
        "rendererClasses": [],
        "dataLayerItems": [],
        "liveStats": {},
        "errors": [],
    }

    _PROVIDER_LABELS = {
        "anthropic": "Anthropic (Claude)",
        "openai": "OpenAI (GPT)",
        "mistral": "Mistral (Le Chat)",
        "perplexity": "Perplexity",
        "tavily": "Tavily (Websuche)",
        "privatellm": "Private LLM",
        "internal": "Intern",
    }

    # --- AICore: one entry per connector module + model counts ---
    try:
        from modules.aicore.aicoreModelRegistry import modelRegistry

        modelRegistry.ensureConnectorsRegistered()
        modelRegistry.refreshModels(force=False)

        # Count available models per connector type
        counts: Counter = Counter(
            str(getattr(m, "connectorType", "") or "")
            for m in modelRegistry.getModels()
            if getattr(m, "isAvailable", True)
        )

        modules: List[Dict[str, Any]] = []
        for conn in modelRegistry.discoverConnectors():
            ct = conn.getConnectorType()
            modules.append(
                {
                    "connectorType": ct,
                    "label": _PROVIDER_LABELS.get(ct, ct),
                    "modelCount": int(counts.get(ct, 0)),
                }
            )
        out["aicoreModules"] = modules
    except Exception as e:
        logger.error(f"integrations-overview aicore: {e}")
        out["errors"].append(f"aicore: {e}")

    # --- Extractors (registered extensions, unique + per-class rows) ---
    try:
        from modules.serviceCenter.services.serviceExtraction.mainServiceExtraction import ExtractionService
        from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry

        # Create the shared registry on first use
        if ExtractionService._sharedExtractorRegistry is None:
            ExtractionService._sharedExtractorRegistry = ExtractorRegistry()
        reg = ExtractionService._sharedExtractorRegistry

        ext_map = reg.getExtensionToMimeMap()
        out["extractorExtensions"] = sorted(
            {str(k).upper() for k in ext_map.keys() if k and "." not in str(k)}
        )

        # Several map entries may share one extractor object; report each once.
        seen_ext: Set[int] = set()
        class_rows: List[Dict[str, Any]] = []
        for extractor in reg._map.values():
            eid = id(extractor)
            if eid in seen_ext:
                continue
            seen_ext.add(eid)
            classRow = _extractorClassRow(extractor)
            if classRow:
                class_rows.append(classRow)
        class_rows.sort(key=lambda r: r["className"])
        out["extractorClasses"] = class_rows

        # The fallback extractor is reported too unless already listed
        fb = getattr(reg, "_fallback", None)
        if fb and id(fb) not in seen_ext:
            classRow = _extractorClassRow(fb)
            if classRow:
                out["extractorClasses"].append(classRow)
                out["extractorClasses"].sort(key=lambda r: r["className"])
    except Exception as e:
        logger.error(f"integrations-overview extractors: {e}")
        out["errors"].append(f"extractors: {e}")

    # --- Renderers (registered output formats + per-class rows) ---
    try:
        from modules.serviceCenter.services.serviceGeneration.renderers.registry import getSupportedFormats, getRendererInfo

        out["rendererFormats"] = sorted(getSupportedFormats())

        formatsByClass: Dict[str, Set[str]] = {}
        for composite_key, meta in getRendererInfo().items():
            cn = meta.get("class_name") or ""
            if not cn:
                continue
            # Key is "format" or "format:<suffix>"; only the format part is reported
            fmt = composite_key.split(":")[0]
            formatsByClass.setdefault(cn, set()).add(fmt)

        out["rendererClasses"] = [
            {"className": cn, "formats": sorted(formatsByClass[cn])}
            for cn in sorted(formatsByClass)
        ]
    except Exception as e:
        logger.error(f"integrations-overview renderers: {e}")
        out["errors"].append(f"renderers: {e}")

    # --- Platform infra tools (only routes that exist in this deployment) ---
    out["infraTools"] = [
        {"id": "voice", "label": t("Voice / STT")},
    ]

    # Insertion-ordered (dict keys) so the resulting item order is deterministic
    accessible_instance_ids: Dict[str, None] = {}
    try:
        for access in root.getFeatureAccessesForUser(userId):
            if not getattr(access, "enabled", True):
                continue
            accessible_instance_ids.setdefault(str(access.featureInstanceId), None)
    except Exception as e:
        logger.debug(f"integrations-overview feature accesses: {e}")

    # --- UserConnection (active only) ---
    try:
        from modules.datamodels.datamodelUam import ConnectionStatus

        for c in root.getUserConnections(userId):
            st = c.status
            # Status may be an enum member or a plain value
            st_val = st.value if hasattr(st, "value") else str(st)
            if st_val != ConnectionStatus.ACTIVE.value:
                continue
            dumped = c.model_dump(mode="json")
            dumped["kind"] = "userConnection"
            out["dataLayerItems"].append(dumped)
    except Exception as e:
        logger.error(f"integrations-overview connections: {e}")
        out["errors"].append(f"connections: {e}")

    # --- instance label lookup (used by the FeatureDataSource rows) ---
    _instLabelCache: Dict[str, str] = {}

    def _getInstanceLabel(iid: str) -> str:
        """Return the instance's label (cached per request); empty string when unavailable."""
        if iid in _instLabelCache:
            return _instLabelCache[iid]
        try:
            _fi = getFeatureInterface(root.db)
            inst = _fi.getFeatureInstance(iid)
            lbl = getattr(inst, "label", None) or getattr(inst, "uiLabel", None) or ""
            _instLabelCache[iid] = lbl
        except Exception:
            # Best-effort: a missing label must not break the payload
            _instLabelCache[iid] = ""
        return _instLabelCache[iid]

    # --- DataSource & FeatureDataSource ---
    try:
        from modules.datamodels.datamodelDataSource import DataSource
        from modules.datamodels.datamodelFeatureDataSource import FeatureDataSource

        # Rows owned by the user first, then rows of every accessible
        # instance; a row reachable both ways is listed once.
        seen_ds: Set[str] = set()
        for rid, row in _iterUnseenRows(root.db, DataSource, {"userId": userId}, seen_ds):
            out["dataLayerItems"].append(_dataSourceItem(rid, row))
        for iid in accessible_instance_ids:
            for rid, row in _iterUnseenRows(root.db, DataSource, {"featureInstanceId": iid}, seen_ds):
                out["dataLayerItems"].append(_dataSourceItem(rid, row))

        seen_fds: Set[str] = set()
        for rid, row in _iterUnseenRows(root.db, FeatureDataSource, {"userId": userId}, seen_fds):
            fds_iid = row.get("featureInstanceId") or ""
            out["dataLayerItems"].append(
                _featureDataSourceItem(
                    rid, row, fds_iid, _getInstanceLabel(fds_iid) if fds_iid else ""
                )
            )
        for iid in accessible_instance_ids:
            for rid, row in _iterUnseenRows(root.db, FeatureDataSource, {"featureInstanceId": iid}, seen_fds):
                out["dataLayerItems"].append(
                    _featureDataSourceItem(rid, row, iid, _getInstanceLabel(iid))
                )
    except Exception as e:
        logger.error(f"integrations-overview datasources: {e}")
        out["errors"].append(f"datasources: {e}")

    # --- Trustee accounting systems (configured integrations per instance) ---
    try:
        from modules.features.trustee.datamodelFeatureTrustee import TrusteeAccountingConfig

        fi = getFeatureInterface(root.db)
        seen_acc: Set[str] = set()
        for iid in accessible_instance_ids:
            inst = fi.getFeatureInstance(iid)
            if not inst or inst.featureCode != "trustee":
                continue
            activeConfigFilter = {"featureInstanceId": iid, "isActive": True}
            for rid, row in _iterUnseenRows(root.db, TrusteeAccountingConfig, activeConfigFilter, seen_acc):
                out["dataLayerItems"].append(
                    {
                        "kind": "trusteeAccounting",
                        "id": rid,
                        "featureInstanceId": iid,
                        "instanceLabel": getattr(inst, "label", None) or "",
                        "mandateId": str(getattr(inst, "mandateId", "") or ""),
                        "connectorType": row.get("connectorType") or "",
                        "displayLabel": row.get("displayLabel") or row.get("connectorType") or rid,
                    }
                )
    except Exception as e:
        logger.error(f"integrations-overview trustee accounting: {e}")
        out["errors"].append(f"trusteeAccounting: {e}")

    # --- Live stats (billing AI calls + workflow metrics) ---
    aiCallPeriodDays = 30
    liveStats: Dict[str, Any] = {
        "aiCallCount": 0,
        "aiCallPeriodDays": aiCallPeriodDays,
        "totalWorkflows": 0,
        "activeWorkflows": 0,
        "totalRuns": 0,
        "totalTokens": 0,
    }

    # Enabled mandates are needed by both stats sections; look them up once.
    # None means the lookup failed, in which case both sections keep zeros.
    mandateIds: Optional[List[str]]
    try:
        mandateIds = _enabledMandateIds(root, userId)
    except Exception as e:
        logger.debug(f"integrations-overview user mandates: {e}")
        mandateIds = None

    # Billing: count AI transactions in the reporting period
    if user is not None and mandateIds:
        try:
            from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface

            bi = getBillingInterface(user, mandateIds[0])
            now = time.time()
            stats = bi.getTransactionStatisticsAggregated(
                mandateIds=mandateIds,
                scope="all",
                userId=userId,
                startTs=now - aiCallPeriodDays * 86400,
                endTs=now,
                bucketSize="month",
            )
            liveStats["aiCallCount"] = stats.get("transactionCount", 0)
        except Exception as e:
            logger.debug(f"integrations-overview billing stats: {e}")

    # Workflow metrics (same logic as routeWorkflowDashboard.get_workflow_metrics)
    if mandateIds is not None:
        try:
            from modules.shared.configuration import APP_CONFIG
            from modules.connectors.connectorDbPostgre import DatabaseConnector
            from modules.datamodels.datamodelPagination import PaginationParams
            from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
                AutoWorkflow,
                AutoRun,
            )

            # NOTE(review): the connector is never explicitly closed here -
            # confirm whether DatabaseConnector needs/offers explicit cleanup.
            wfDb = DatabaseConnector(
                dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
                dbDatabase="poweron_graphicaleditor",
                dbUser=APP_CONFIG.get("DB_USER"),
                dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
                dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
                userId=None,
            )
            # Only the total count is needed, so request a single-row page.
            countOnly = PaginationParams(page=1, pageSize=1)

            if wfDb._ensureTableExists(AutoWorkflow):
                wfFilter: dict = {
                    "isTemplate": False,
                    # A user without mandates must not match any workflow.
                    "mandateId": mandateIds if mandateIds else "__impossible__",
                }
                liveStats["totalWorkflows"] = _totalItems(
                    wfDb.getRecordsetPaginated(
                        AutoWorkflow, pagination=countOnly, recordFilter=wfFilter
                    )
                )

                activeFilter = dict(wfFilter)
                activeFilter["active"] = True
                liveStats["activeWorkflows"] = _totalItems(
                    wfDb.getRecordsetPaginated(
                        AutoWorkflow, pagination=countOnly, recordFilter=activeFilter
                    )
                )

            if wfDb._ensureTableExists(AutoRun):
                # Without mandates, fall back to runs owned by the user
                runFilter: dict = (
                    {"mandateId": mandateIds} if mandateIds else {"ownerId": userId}
                )
                totalRuns = _totalItems(
                    wfDb.getRecordsetPaginated(
                        AutoRun, pagination=countOnly, recordFilter=runFilter
                    )
                )
                liveStats["totalRuns"] = totalRuns

                # Token totals are summed in Python, so skip them beyond
                # 10000 runs to keep the request cheap.
                totalTokens = 0
                if 0 < totalRuns <= 10000:
                    allRuns = wfDb.getRecordset(
                        AutoRun,
                        recordFilter=runFilter,
                        fieldFilter=["costTokens"],
                    ) or []
                    totalTokens = sum(r.get("costTokens", 0) or 0 for r in allRuns)
                liveStats["totalTokens"] = totalTokens
        except Exception as e:
            logger.debug(f"integrations-overview workflow stats: {e}")

    out["liveStats"] = liveStats
    return out


@router.get("/integrations-overview")
@limiter.limit("30/minute")
def get_integrations_overview(
    request: Request,
    reqContext: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """
    Aggregated, non-fictitious data for the PORTA integrations diagram.

    Endpoint: GET /api/system/integrations-overview
    """
    user_id = str(reqContext.user.id)
    return _buildIntegrationsOverviewPayload(user_id, user=reqContext.user)


@router.get("/ai-models")
@limiter.limit("60/minute")
def get_ai_models_for_integrations(
    request: Request,
    reqContext: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
    """
    Registered AI models for the Integrations architecture page.

    Returns unique displayName entries with connector metadata (no callables).

    Endpoint: GET /api/system/ai-models

    Returns:
        ``{"models": [...]}`` with one dumped model per available
        (displayName, connectorType) pair; on failure an empty list plus an
        ``error`` message.
    """
    try:
        from modules.aicore.aicoreModelRegistry import modelRegistry

        modelRegistry.ensureConnectorsRegistered()
        modelRegistry.refreshModels(force=False)

        out: List[Dict[str, Any]] = []
        seen: set = set()
        for m in modelRegistry.getModels():
            if not getattr(m, "isAvailable", True):
                continue
            key = (m.displayName, m.connectorType)
            if key in seen:
                continue
            seen.add(key)
            # Callables are excluded because they cannot be serialised
            out.append(
                m.model_dump(
                    exclude={"functionCall", "functionCallStream", "calculatepriceCHF"},
                    mode="json",
                )
            )
        return {"models": out}
    except Exception as e:
        logger.exception(f"Error listing AI models: {e}")
        return {"models": [], "error": str(e)}