diff --git a/app.py b/app.py index f76f7083..31085e6b 100644 --- a/app.py +++ b/app.py @@ -318,9 +318,23 @@ async def lifespan(app: FastAPI): onMandateDelete as _waOnMandateDelete, onInstanceCreate as _waOnInstanceCreate, ) + from modules.interfaces.interfaceDbBilling import ( + onMandateDelete as _billingOnMandateDelete, + onMandateProvision as _billingOnMandateProvision, + onStorageChanged as _billingOnStorageChanged, + onUserMandateCreate as _billingOnUserMandateCreate, + onUserMandateDelete as _billingOnUserMandateDelete, + onUserBudgetAdjust as _billingOnUserBudgetAdjust, + ) registerLifecycleHook("onBootstrap", _waOnBootstrap) registerLifecycleHook("onMandateDelete", _waOnMandateDelete) + registerLifecycleHook("onMandateDelete", _billingOnMandateDelete) + registerLifecycleHook("onMandateProvision", _billingOnMandateProvision) + registerLifecycleHook("onStorageChanged", _billingOnStorageChanged) registerLifecycleHook("onInstanceCreate", _waOnInstanceCreate) + registerLifecycleHook("onUserMandateCreate", _billingOnUserMandateCreate) + registerLifecycleHook("onUserMandateDelete", _billingOnUserMandateDelete) + registerLifecycleHook("onUserBudgetAdjust", _billingOnUserBudgetAdjust) # Bootstrap database if needed (creates initial users, mandates, roles, etc.) # This must happen before getting root interface diff --git a/modules/aicore/aicoreModelRegistry.py b/modules/aicore/aicoreModelRegistry.py index 164f71f9..8c57e0c4 100644 --- a/modules/aicore/aicoreModelRegistry.py +++ b/modules/aicore/aicoreModelRegistry.py @@ -10,16 +10,13 @@ import importlib import os import time import threading -from typing import Dict, List, Optional, Any, Tuple, TYPE_CHECKING +from typing import Dict, List, Optional, Any, Tuple from modules.datamodels.datamodelAi import AiModel -from modules.datamodels.datamodelRbac import AccessRuleContext +from modules.datamodels.datamodelRbac import AccessRuleContext, RbacProtocol from .aicoreBase import BaseConnectorAi from modules.datamodels.datamodelUam import User from modules.connectors.connectorDbPostgre import DatabaseConnector -if TYPE_CHECKING: - from modules.security.rbac import RbacClass - logger = logging.getLogger(__name__) # TODO TESTING: Override maxTokens for all models during testing @@ -188,7 +185,7 @@ class ModelRegistry: def getAvailableModels( self, currentUser: Optional[User] = None, - rbacInstance: Optional["RbacClass"] = None, + rbacInstance: Optional[RbacProtocol] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None ) -> List[AiModel]: @@ -239,7 +236,7 @@ class ModelRegistry: self, models: List[AiModel], currentUser: User, - rbacInstance: "RbacClass", + rbacInstance: RbacProtocol, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None ) -> List[AiModel]: @@ -264,7 +261,7 @@ class ModelRegistry: logger.debug(f"User {currentUser.username} does not have access to model {model.displayName} (connector: {model.connectorType})") return filteredModels - def getModel(self, displayName: str, currentUser: Optional[User] = None, rbacInstance: Optional["RbacClass"] = None) -> Optional[AiModel]: + def getModel(self, displayName: str, currentUser: Optional[User] = None, rbacInstance: Optional[RbacProtocol] = None) -> Optional[AiModel]: """Get a specific model by displayName, optionally checking RBAC permissions. Args: diff --git a/modules/datamodels/datamodelRbac.py b/modules/datamodels/datamodelRbac.py index 83a4525d..7ea9d710 100644 --- a/modules/datamodels/datamodelRbac.py +++ b/modules/datamodels/datamodelRbac.py @@ -10,7 +10,7 @@ Multi-Tenant Design: """ import uuid -from typing import Optional +from typing import Optional, Dict, List, Protocol, runtime_checkable from enum import Enum from pydantic import BaseModel, Field from modules.datamodels.datamodelBase import PowerOnModel @@ -174,6 +174,20 @@ class AccessRule(PowerOnModel): ) +@runtime_checkable +class RbacProtocol(Protocol): + """Structural type for RBAC checkers — allows aicore (L3) to reference + the RBAC contract without importing from security (L4).""" + + def checkResourceAccessBulk( + self, + user: "User", + resourcePaths: List[str], + mandateId: Optional[str] = None, + featureInstanceId: Optional[str] = None, + ) -> Dict[str, bool]: ... + + # IMMUTABLE Fields Definition - für Enforcement auf Application-Level IMMUTABLE_FIELDS = { "Role": ["mandateId", "featureInstanceId", "featureCode"], diff --git a/modules/demoConfigs/investorDemo2026.py b/modules/demoConfigs/investorDemo2026.py index 84fc5e01..e88ce6c7 100644 --- a/modules/demoConfigs/investorDemo2026.py +++ b/modules/demoConfigs/investorDemo2026.py @@ -169,7 +169,7 @@ class InvestorDemo2026(BaseDemoConfig): def _ensureMandate(self, db, mandateDef: Dict, summary: Dict) -> Optional[str]: from modules.datamodels.datamodelUam import Mandate - from modules.interfaces.interfaceBootstrap import copySystemRolesToMandate + from modules.interfaces.interfaceRbac import copySystemRolesToMandate existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]}) if existing: diff --git a/modules/demoConfigs/pwgDemo2026.py b/modules/demoConfigs/pwgDemo2026.py index 2d301f7a..90e3c3e4 100644 --- a/modules/demoConfigs/pwgDemo2026.py +++ b/modules/demoConfigs/pwgDemo2026.py @@ -154,7 +154,7 @@ class PwgDemo2026(BaseDemoConfig): def _ensureMandate(self, db, mandateDef: Dict, summary: Dict) -> Optional[str]: from modules.datamodels.datamodelUam import Mandate - from modules.interfaces.interfaceBootstrap import copySystemRolesToMandate + from modules.interfaces.interfaceRbac import copySystemRolesToMandate existing = db.getRecordset(Mandate, recordFilter={"name": mandateDef["name"]}) if existing: diff --git a/modules/features/commcoach/serviceCommcoach.py b/modules/features/commcoach/serviceCommcoach.py index d7a79d1f..3aa0c1a0 100644 --- a/modules/features/commcoach/serviceCommcoach.py +++ b/modules/features/commcoach/serviceCommcoach.py @@ -597,8 +597,8 @@ def _createCommcoachRagFn( from modules.serviceCenter.context import ServiceCenterContext serviceContext = ServiceCenterContext( user=currentUser, - mandate_id=mandateId, - feature_instance_id=featureInstanceId, + mandateId=mandateId, + featureInstanceId=featureInstanceId, ) knowledgeService = getService("knowledge", serviceContext) ragContext = await knowledgeService.buildAgentContext( @@ -902,8 +902,8 @@ class CommcoachService: serviceContext = ServiceCenterContext( user=self.currentUser, - mandate_id=self.mandateId, - feature_instance_id=self.instanceId, + mandateId=self.mandateId, + featureInstanceId=self.instanceId, ) agentService = getService("agent", serviceContext) @@ -1240,8 +1240,8 @@ class CommcoachService: serviceContext = ServiceCenterContext( user=self.currentUser, - mandate_id=self.mandateId, - feature_instance_id=self.instanceId, + mandateId=self.mandateId, + featureInstanceId=self.instanceId, ) knowledgeService = getService("knowledge", serviceContext) parsedGoals = aiPrompts._parseJsonField(context.get("goals") if context else None, []) @@ -1535,8 +1535,8 @@ class CommcoachService: serviceContext = ServiceCenterContext( user=self.currentUser, - mandate_id=self.mandateId, - feature_instance_id=self.instanceId, + mandateId=self.mandateId, + featureInstanceId=self.instanceId, ) aiService = getService("ai", serviceContext) await aiService.ensureAiObjectsInitialized() @@ -1561,8 +1561,8 @@ class CommcoachService: serviceContext = ServiceCenterContext( user=self.currentUser, - mandate_id=self.mandateId, - feature_instance_id=self.instanceId, + mandateId=self.mandateId, + featureInstanceId=self.instanceId, ) aiService = getService("ai", serviceContext) await aiService.ensureAiObjectsInitialized() diff --git a/modules/features/neutralization/interfaceFeatureNeutralizer.py b/modules/features/neutralization/interfaceFeatureNeutralizer.py index 3d5c9129..97a466ff 100644 --- a/modules/features/neutralization/interfaceFeatureNeutralizer.py +++ b/modules/features/neutralization/interfaceFeatureNeutralizer.py @@ -309,8 +309,8 @@ class InterfaceFeatureNeutralizer: ) -> Optional[DataNeutralizerAttributes]: """Create a neutralization attribute for placeholder resolution.""" try: - mandate_id = self.mandateId or "" - feature_instance_id = self.featureInstanceId or "" + mandateId = self.mandateId or "" + featureInstanceId = self.featureInstanceId or "" if not self.userId: logger.warning("Cannot create attribute: missing userId") return None diff --git a/modules/features/neutralization/neutralizePlayground.py b/modules/features/neutralization/neutralizePlayground.py index e855ad22..1a46cd25 100644 --- a/modules/features/neutralization/neutralizePlayground.py +++ b/modules/features/neutralization/neutralizePlayground.py @@ -22,7 +22,7 @@ class NeutralizationPlayground: self.currentUser = currentUser self.mandateId = mandateId self.featureInstanceId = featureInstanceId - self._ctx = ServiceCenterContext(user=currentUser, mandate_id=mandateId, feature_instance_id=featureInstanceId) + self._ctx = ServiceCenterContext(user=currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) def _getService(self, name: str): return getService(name, self._ctx) @@ -258,7 +258,7 @@ class SharepointProcessor: self._sharepoint = getService("sharepoint", ctx) self._neutralization = getService("neutralization", ctx) from modules.interfaces.interfaceDbApp import getInterface as _getAppInterface - self._interfaceDbApp = _getAppInterface(currentUser, mandateId=ctx.mandate_id) + self._interfaceDbApp = _getAppInterface(currentUser, mandateId=ctx.mandateId) async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]: try: diff --git a/modules/features/neutralization/routeFeatureNeutralizer.py b/modules/features/neutralization/routeFeatureNeutralizer.py index bf396e3b..488ef352 100644 --- a/modules/features/neutralization/routeFeatureNeutralizer.py +++ b/modules/features/neutralization/routeFeatureNeutralizer.py @@ -58,18 +58,17 @@ def get_neutralization_config( ) -> DataNeutraliserConfig: """Get data neutralization configuration""" try: - mandate_id = str(context.mandateId) if context.mandateId else "" - feature_instance_id = str(context.featureInstanceId) if context.featureInstanceId else "" + mandateId = str(context.mandateId) if context.mandateId else "" + featureInstanceId = str(context.featureInstanceId) if context.featureInstanceId else "" service = NeutralizationPlayground( - context.user, mandate_id, featureInstanceId=feature_instance_id or None + context.user, mandateId, featureInstanceId=featureInstanceId or None ) config = service.getConfig() if not config: - # Return default config instead of 404 (requires mandateId and featureInstanceId for instance-scoped config) return DataNeutraliserConfig( - mandateId=mandate_id, - featureInstanceId=feature_instance_id, + mandateId=mandateId, + featureInstanceId=featureInstanceId, userId=context.user.id, enabled=True, namesToParse="", diff --git a/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py b/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py index 4cfec864..0388dbba 100644 --- a/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py +++ b/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py @@ -64,8 +64,8 @@ class NeutralizationService: elif serviceCenter and getattr(serviceCenter, "user", None): self.interfaceNeutralizer = getNeutralizerInterface( currentUser=serviceCenter.user, - mandateId=getattr(serviceCenter, 'mandateId', None) or getattr(serviceCenter, 'mandate_id', None), - featureInstanceId=getattr(serviceCenter, 'featureInstanceId', None) or getattr(serviceCenter, 'feature_instance_id', None), + mandateId=getattr(serviceCenter, 'mandateId', None), + featureInstanceId=getattr(serviceCenter, 'featureInstanceId', None), ) namesList = NamesToParse if isinstance(NamesToParse, list) else [] diff --git a/modules/features/realEstate/serviceAiIntent.py b/modules/features/realEstate/serviceAiIntent.py index ca53c98e..d790d7c8 100644 --- a/modules/features/realEstate/serviceAiIntent.py +++ b/modules/features/realEstate/serviceAiIntent.py @@ -232,7 +232,7 @@ async def processNaturalLanguageCommand( logger.info(f"Processing natural language command for user {currentUser.id} (mandate: {mandateId})") logger.debug(f"User input: {userInput}") - ctx = ServiceCenterContext(user=currentUser, mandate_id=mandateId) + ctx = ServiceCenterContext(user=currentUser, mandateId=mandateId) aiService = getService("ai", ctx) intentAnalysis = await analyzeUserIntent(aiService, userInput) diff --git a/modules/features/realEstate/serviceBzo.py b/modules/features/realEstate/serviceBzo.py index 178c8021..f4ec90bd 100644 --- a/modules/features/realEstate/serviceBzo.py +++ b/modules/features/realEstate/serviceBzo.py @@ -234,7 +234,7 @@ async def extract_bzo_information( bzo_params_result = None try: - ctx = ServiceCenterContext(user=currentUser, mandate_id=_mandateId, feature_instance_id=featureInstanceId) + ctx = ServiceCenterContext(user=currentUser, mandateId=_mandateId, featureInstanceId=featureInstanceId) ai_service = getService("ai", ctx) bzo_params_result = await run_bzo_params_extraction( extracted_content=all_extracted_content, @@ -520,7 +520,7 @@ async def generate_bauzone_ai_summary( AI-generated summary string """ try: - ctx = ServiceCenterContext(user=currentUser, mandate_id=mandateId, feature_instance_id=featureInstanceId) + ctx = ServiceCenterContext(user=currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) aiService = getService("ai", ctx) context_parts = [] diff --git a/modules/features/redmine/interfaceFeatureRedmine.py b/modules/features/redmine/interfaceFeatureRedmine.py index 88855501..225b5a31 100644 --- a/modules/features/redmine/interfaceFeatureRedmine.py +++ b/modules/features/redmine/interfaceFeatureRedmine.py @@ -13,7 +13,7 @@ from __future__ import annotations import logging import time -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional, Tuple from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.connectors.connectorTicketsRedmine import ConnectorTicketsRedmine @@ -21,6 +21,9 @@ from modules.datamodels.datamodelUam import User from modules.features.redmine.datamodelRedmine import ( RedmineConfigDto, RedmineConfigUpdateRequest, + RedmineCustomFieldSchemaDto, + RedmineFieldChoiceDto, + RedmineFieldSchemaDto, RedmineInstanceConfig, RedmineRelationMirror, RedmineTicketMirror, @@ -447,3 +450,135 @@ def getInterface( featureInstanceId=effectiveFeatureInstanceId, ) return _redmineInterfaces[contextKey] + + +# --------------------------------------------------------------------------- +# Project meta -- with TTL cache stored on the config record +# --------------------------------------------------------------------------- + +class RedmineNotConfiguredError(RuntimeError): + """The given feature instance has no usable Redmine config.""" + + +def _resolveRootTrackerId( + rootTrackerName: str, trackers: List[Dict[str, Any]] +) -> Optional[int]: + """Resolve the configured root tracker name to a tracker id. + + Strict: case-insensitive exact match. Returns ``None`` if not found + (the UI must surface this as a config error). + """ + target = (rootTrackerName or "").strip().lower() + if not target: + return None + for t in trackers: + if str(t.get("name") or "").strip().lower() == target: + tid = t.get("id") + return int(tid) if tid is not None else None + return None + + +def _schemaFromCache( + projectId: str, cache: Optional[Dict[str, Any]], rootTrackerName: str +) -> Optional[RedmineFieldSchemaDto]: + if not cache: + return None + trackers = cache.get("trackers") or [] + return RedmineFieldSchemaDto( + projectId=projectId, + projectName=str(cache.get("projectName") or ""), + trackers=[RedmineFieldChoiceDto(**t) for t in trackers], + statuses=[RedmineFieldChoiceDto(**s) for s in cache.get("statuses") or []], + priorities=[RedmineFieldChoiceDto(**p) for p in cache.get("priorities") or []], + users=[RedmineFieldChoiceDto(**u) for u in cache.get("users") or []], + categories=[RedmineFieldChoiceDto(**c) for c in cache.get("categories") or []], + customFields=[ + RedmineCustomFieldSchemaDto( + id=cf.get("id"), + name=cf.get("name", ""), + fieldFormat=cf.get("fieldFormat", "string"), + isRequired=bool(cf.get("isRequired")), + possibleValues=list(cf.get("possibleValues") or []), + multiple=bool(cf.get("multiple")), + defaultValue=cf.get("defaultValue"), + ) + for cf in cache.get("customFields") or [] + if cf.get("id") is not None + ], + rootTrackerName=rootTrackerName, + rootTrackerId=_resolveRootTrackerId(rootTrackerName, trackers), + ) + + +async def getProjectMeta( + currentUser: User, + mandateId: Optional[str], + featureInstanceId: str, + *, + forceRefresh: bool = False, +) -> RedmineFieldSchemaDto: + """Fetch (or return cached) project metadata: trackers, statuses, priorities, etc.""" + iface = getInterface(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) + connector = iface.resolveConnector(featureInstanceId) + if not connector: + raise RedmineNotConfiguredError( + f"Redmine instance {featureInstanceId} is not configured or inactive" + ) + cfg = iface.getConfig(featureInstanceId) + if cfg is None: + raise RedmineNotConfiguredError("Config row vanished after connector resolve") + + ttl = cfg.schemaCacheTtlSeconds if cfg.schemaCacheTtlSeconds is not None else 24 * 60 * 60 + fresh_enough = ( + cfg.schemaCache + and cfg.schemaCachedAt + and (time.time() - cfg.schemaCachedAt) < ttl + ) + if fresh_enough and not forceRefresh: + schema = _schemaFromCache(cfg.projectId, cfg.schemaCache, cfg.rootTrackerName) + if schema is not None: + return schema + + project_info = await connector.getProjectInfo() + trackers_raw = await connector.getTrackers() + statuses_raw = await connector.getStatuses() + priorities_raw = await connector.getPriorities() + custom_fields_raw = await connector.getCustomFields() + users_raw = await connector.getProjectUsers() + categories_raw = await connector.getIssueCategories() + + schema_cache: Dict[str, Any] = { + "projectName": project_info.get("name", ""), + "trackers": [{"id": t.get("id"), "name": t.get("name")} for t in trackers_raw], + "statuses": [ + { + "id": s.get("id"), + "name": s.get("name"), + "isClosed": bool(s.get("is_closed")), + } + for s in statuses_raw + ], + "priorities": [{"id": p.get("id"), "name": p.get("name")} for p in priorities_raw], + "users": [{"id": u.get("id"), "name": u.get("name")} for u in users_raw], + "categories": [{"id": c.get("id"), "name": c.get("name")} for c in categories_raw if c.get("id") is not None], + "customFields": [ + { + "id": cf.get("id"), + "name": cf.get("name"), + "fieldFormat": cf.get("field_format", "string"), + "isRequired": bool(cf.get("is_required")), + "possibleValues": [pv.get("value") for pv in (cf.get("possible_values") or []) if pv.get("value") is not None], + "multiple": bool(cf.get("multiple")), + "defaultValue": cf.get("default_value"), + } + for cf in custom_fields_raw + ], + } + iface.updateSchemaCache(featureInstanceId, schema_cache) + iface.markConfigConnected(featureInstanceId) + + return _schemaFromCache(cfg.projectId, schema_cache, cfg.rootTrackerName) or RedmineFieldSchemaDto( + projectId=cfg.projectId, + projectName=schema_cache["projectName"], + rootTrackerName=cfg.rootTrackerName, + ) diff --git a/modules/features/redmine/routeFeatureRedmine.py b/modules/features/redmine/routeFeatureRedmine.py index d973e690..bdc8797b 100644 --- a/modules/features/redmine/routeFeatureRedmine.py +++ b/modules/features/redmine/routeFeatureRedmine.py @@ -32,7 +32,7 @@ from modules.features.redmine.datamodelRedmine import ( RedmineTicketDto, RedmineTicketUpdateRequest, ) -from modules.features.redmine.serviceRedmine import RedmineNotConfiguredError +from modules.features.redmine.interfaceFeatureRedmine import RedmineNotConfiguredError from modules.connectors.connectorTicketsRedmine import RedmineApiError from modules.interfaces.interfaceDbApp import getRootInterface from modules.interfaces.interfaceFeatures import getFeatureInterface diff --git a/modules/features/redmine/serviceRedmine.py b/modules/features/redmine/serviceRedmine.py index b4a3d137..d772478b 100644 --- a/modules/features/redmine/serviceRedmine.py +++ b/modules/features/redmine/serviceRedmine.py @@ -25,7 +25,6 @@ workflow engine without context-magic. from __future__ import annotations import logging -import time from datetime import datetime from typing import Any, Dict, List, Optional, Tuple @@ -35,9 +34,7 @@ from modules.connectors.connectorTicketsRedmine import ( ) from modules.datamodels.datamodelUam import User from modules.features.redmine.datamodelRedmine import ( - RedmineCustomFieldSchemaDto, RedmineCustomFieldValueDto, - RedmineFieldChoiceDto, RedmineFieldSchemaDto, RedmineRelationCreateRequest, RedmineRelationDto, @@ -46,8 +43,10 @@ from modules.features.redmine.datamodelRedmine import ( RedmineTicketUpdateRequest, ) from modules.features.redmine.interfaceFeatureRedmine import ( + RedmineNotConfiguredError, RedmineObjects, getInterface, + getProjectMeta, ) from modules.features.redmine.serviceRedmineStatsCache import getStatsCache @@ -58,9 +57,6 @@ logger = logging.getLogger(__name__) # Resolution helpers # --------------------------------------------------------------------------- -class RedmineNotConfiguredError(RuntimeError): - """The given feature instance has no usable Redmine config.""" - def _resolveContext( currentUser: User, mandateId: Optional[str], featureInstanceId: str @@ -74,127 +70,6 @@ def _resolveContext( return iface, connector -# --------------------------------------------------------------------------- -# Project meta -- with TTL cache stored on the config record -# --------------------------------------------------------------------------- - -async def getProjectMeta( - currentUser: User, - mandateId: Optional[str], - featureInstanceId: str, - *, - forceRefresh: bool = False, -) -> RedmineFieldSchemaDto: - iface, connector = _resolveContext(currentUser, mandateId, featureInstanceId) - cfg = iface.getConfig(featureInstanceId) - if cfg is None: - raise RedmineNotConfiguredError("Config row vanished after connector resolve") - - ttl = cfg.schemaCacheTtlSeconds if cfg.schemaCacheTtlSeconds is not None else 24 * 60 * 60 - fresh_enough = ( - cfg.schemaCache - and cfg.schemaCachedAt - and (time.time() - cfg.schemaCachedAt) < ttl - ) - if fresh_enough and not forceRefresh: - schema = _schemaFromCache(cfg.projectId, cfg.schemaCache, cfg.rootTrackerName) - if schema is not None: - return schema - - project_info = await connector.getProjectInfo() - trackers_raw = await connector.getTrackers() - statuses_raw = await connector.getStatuses() - priorities_raw = await connector.getPriorities() - custom_fields_raw = await connector.getCustomFields() - users_raw = await connector.getProjectUsers() - categories_raw = await connector.getIssueCategories() - - schema_cache: Dict[str, Any] = { - "projectName": project_info.get("name", ""), - "trackers": [{"id": t.get("id"), "name": t.get("name")} for t in trackers_raw], - "statuses": [ - { - "id": s.get("id"), - "name": s.get("name"), - "isClosed": bool(s.get("is_closed")), - } - for s in statuses_raw - ], - "priorities": [{"id": p.get("id"), "name": p.get("name")} for p in priorities_raw], - "users": [{"id": u.get("id"), "name": u.get("name")} for u in users_raw], - "categories": [{"id": c.get("id"), "name": c.get("name")} for c in categories_raw if c.get("id") is not None], - "customFields": [ - { - "id": cf.get("id"), - "name": cf.get("name"), - "fieldFormat": cf.get("field_format", "string"), - "isRequired": bool(cf.get("is_required")), - "possibleValues": [pv.get("value") for pv in (cf.get("possible_values") or []) if pv.get("value") is not None], - "multiple": bool(cf.get("multiple")), - "defaultValue": cf.get("default_value"), - } - for cf in custom_fields_raw - ], - } - iface.updateSchemaCache(featureInstanceId, schema_cache) - iface.markConfigConnected(featureInstanceId) - - return _schemaFromCache(cfg.projectId, schema_cache, cfg.rootTrackerName) or RedmineFieldSchemaDto( - projectId=cfg.projectId, - projectName=schema_cache["projectName"], - rootTrackerName=cfg.rootTrackerName, - ) - - -def _resolveRootTrackerId( - rootTrackerName: str, trackers: List[Dict[str, Any]] -) -> Optional[int]: - """Resolve the configured root tracker name to a tracker id. - - Strict: case-insensitive exact match. Returns ``None`` if not found - (the UI must surface this as a config error). - """ - target = (rootTrackerName or "").strip().lower() - if not target: - return None - for t in trackers: - if str(t.get("name") or "").strip().lower() == target: - tid = t.get("id") - return int(tid) if tid is not None else None - return None - - -def _schemaFromCache( - projectId: str, cache: Optional[Dict[str, Any]], rootTrackerName: str -) -> Optional[RedmineFieldSchemaDto]: - if not cache: - return None - trackers = cache.get("trackers") or [] - return RedmineFieldSchemaDto( - projectId=projectId, - projectName=str(cache.get("projectName") or ""), - trackers=[RedmineFieldChoiceDto(**t) for t in trackers], - statuses=[RedmineFieldChoiceDto(**s) for s in cache.get("statuses") or []], - priorities=[RedmineFieldChoiceDto(**p) for p in cache.get("priorities") or []], - users=[RedmineFieldChoiceDto(**u) for u in cache.get("users") or []], - categories=[RedmineFieldChoiceDto(**c) for c in cache.get("categories") or []], - customFields=[ - RedmineCustomFieldSchemaDto( - id=cf.get("id"), - name=cf.get("name", ""), - fieldFormat=cf.get("fieldFormat", "string"), - isRequired=bool(cf.get("isRequired")), - possibleValues=list(cf.get("possibleValues") or []), - multiple=bool(cf.get("multiple")), - defaultValue=cf.get("defaultValue"), - ) - for cf in cache.get("customFields") or [] - if cf.get("id") is not None - ], - rootTrackerName=rootTrackerName, - rootTrackerId=_resolveRootTrackerId(rootTrackerName, trackers), - ) - # --------------------------------------------------------------------------- # Mirror -> RedmineTicketDto diff --git a/modules/features/redmine/serviceRedmineStats.py b/modules/features/redmine/serviceRedmineStats.py index 1c289181..8566db16 100644 --- a/modules/features/redmine/serviceRedmineStats.py +++ b/modules/features/redmine/serviceRedmineStats.py @@ -83,10 +83,8 @@ async def getStats( # Lazy import: keeps the pure aggregation helpers below importable # without dragging in aiohttp / DB connector at module load. - from modules.features.redmine.serviceRedmine import ( - getProjectMeta, - listTickets, - ) + from modules.features.redmine.interfaceFeatureRedmine import getProjectMeta + from modules.features.redmine.serviceRedmine import listTickets schema = await getProjectMeta(currentUser, mandateId, featureInstanceId) root_tracker_id = schema.rootTrackerId diff --git a/modules/features/redmine/serviceRedmineSync.py b/modules/features/redmine/serviceRedmineSync.py index 37507973..a56198f1 100644 --- a/modules/features/redmine/serviceRedmineSync.py +++ b/modules/features/redmine/serviceRedmineSync.py @@ -38,7 +38,7 @@ from modules.features.redmine.datamodelRedmine import ( RedmineSyncStatusDto, RedmineTicketMirror, ) -from modules.features.redmine.interfaceFeatureRedmine import getInterface +from modules.features.redmine.interfaceFeatureRedmine import getInterface, getProjectMeta from modules.features.redmine.serviceRedmineStatsCache import getStatsCache logger = logging.getLogger(__name__) @@ -281,8 +281,6 @@ async def _ensureSchemaWarm( statuses = (cfg.schemaCache or {}).get("statuses") or [] if statuses: return - # Lazy import to avoid a circular dependency at module load. - from modules.features.redmine.serviceRedmine import getProjectMeta try: await getProjectMeta(currentUser, mandateId, featureInstanceId, forceRefresh=True) except Exception as e: diff --git a/modules/features/teamsbot/service.py b/modules/features/teamsbot/service.py index 2bafd0e2..2487ad81 100644 --- a/modules/features/teamsbot/service.py +++ b/modules/features/teamsbot/service.py @@ -405,9 +405,9 @@ def createAiService(user, mandateId, featureInstanceId=None): """Create a properly wired AiService via the service center.""" ctx = ServiceCenterContext( user=user, - mandate_id=mandateId, - feature_instance_id=featureInstanceId, - feature_code="teamsbot", + mandateId=mandateId, + featureInstanceId=featureInstanceId, + featureCode="teamsbot", ) return _getServiceCenterService("ai", ctx) @@ -1320,9 +1320,9 @@ class TeamsbotService: ctx = ServiceCenterContext( user=self.currentUser, - mandate_id=self.mandateId, - feature_instance_id=self.instanceId, - feature_code="teamsbot", + mandateId=self.mandateId, + featureInstanceId=self.instanceId, + featureCode="teamsbot", ) agentService = _getServiceCenterService("agent", ctx) diff --git a/modules/features/teamsbot/serviceCommands.py b/modules/features/teamsbot/serviceCommands.py index 55f16bf0..a8ce763f 100644 --- a/modules/features/teamsbot/serviceCommands.py +++ b/modules/features/teamsbot/serviceCommands.py @@ -247,8 +247,8 @@ async def _cmdSendMail(service, sessionId: str, params: dict): from modules.serviceCenter import ServiceCenterContext, getService ctx = ServiceCenterContext( user=service.currentUser, - mandate_id=service.mandateId, - feature_instance_id=service.instanceId, + mandateId=service.mandateId, + featureInstanceId=service.instanceId, ) messaging = getService("messaging", ctx) success = messaging.sendEmailDirect( @@ -280,8 +280,8 @@ async def _cmdStoreDocument(service, sessionId: str, params: dict): from modules.serviceCenter import ServiceCenterContext, getService ctx = ServiceCenterContext( user=service.currentUser, - mandate_id=service.mandateId, - feature_instance_id=service.instanceId, + mandateId=service.mandateId, + featureInstanceId=service.instanceId, ) sharepoint = getService("sharepoint", ctx) if not sharepoint.setAccessTokenFromConnection(service.currentUser): diff --git a/modules/features/workspace/routeFeatureWorkspace.py b/modules/features/workspace/routeFeatureWorkspace.py index 6d83e234..fedda841 100644 --- a/modules/features/workspace/routeFeatureWorkspace.py +++ b/modules/features/workspace/routeFeatureWorkspace.py @@ -566,10 +566,10 @@ async def streamWorkspaceStart( wsBillingFeatureCode = _workspaceBillingFeatureCode(context.user, mandateId or "", instanceId) svcCtx = ServiceCenterContext( user=context.user, - mandate_id=mandateId or "", - feature_instance_id=instanceId, - workflow_id=workflowId, - feature_code=wsBillingFeatureCode, + mandateId=mandateId or "", + featureInstanceId=instanceId, + workflowId=workflowId, + featureCode=wsBillingFeatureCode, ) chatSvc = getService("chat", svcCtx) attachmentLabel = _buildWorkspaceAttachmentLabel( @@ -687,10 +687,10 @@ async def _runWorkspaceAgent( from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=user, - mandate_id=mandateId, - feature_instance_id=instanceId, - workflow_id=workflowId, - feature_code=billingFeatureCode, + mandateId=mandateId, + featureInstanceId=instanceId, + workflowId=workflowId, + featureCode=billingFeatureCode, ) agentService = getService("agent", ctx) chatService = getService("chat", ctx) @@ -1299,7 +1299,7 @@ async def listWorkspaceDataSources( try: from modules.datamodels.datamodelDataSource import DataSource from modules.interfaces.interfaceDbApp import getRootInterface - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import buildEffectiveByConnection + from modules.serviceCenter.core.flagResolution import buildEffectiveByConnection rootIf = getRootInterface() recordFilter: dict = {"featureInstanceId": instanceId} if wsMandateId: @@ -1352,8 +1352,8 @@ async def createWorkspaceDataSource( from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, - mandate_id=_mandateId or "", - feature_instance_id=instanceId, + mandateId=_mandateId or "", + featureInstanceId=instanceId, ) chatService = getService("chat", ctx) dataSource = chatService.createDataSource( @@ -1381,8 +1381,8 @@ async def deleteWorkspaceDataSource( from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, - mandate_id=_mandateId or "", - feature_instance_id=instanceId, + mandateId=_mandateId or "", + featureInstanceId=instanceId, ) chatService = getService("chat", ctx) chatService.deleteDataSource(dataSourceId) @@ -1464,7 +1464,7 @@ async def listFeatureDataSources( wsMandateId, _ = _validateInstanceAccess(instanceId, context) from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelFeatures import FeatureDataSource - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import buildEffectiveByWorkspaceFds + from modules.serviceCenter.core.flagResolution import buildEffectiveByWorkspaceFds rootIf = getRootInterface() recordFilter: dict = {} diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 4764cd4a..19ff4e26 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -11,7 +11,6 @@ Multi-Tenant Design: """ import logging -import uuid from typing import Optional, Dict from passlib.context import CryptContext from modules.connectors.connectorDbPostgre import DatabaseConnector @@ -521,6 +520,8 @@ def _ensureAllMandatesHaveSystemRoles(db: DatabaseConnector) -> None: Ensure all existing mandates have system-instance roles. Serves as both initial setup and migration for existing mandates. """ + from modules.interfaces.interfaceRbac import copySystemRolesToMandate + allMandates = db.getRecordset(Mandate) if not allMandates: logger.info("No mandates found, skipping system role copy") @@ -534,94 +535,6 @@ def _ensureAllMandatesHaveSystemRoles(db: DatabaseConnector) -> None: logger.info(f"Copied {copiedCount} system roles to mandate {mandateId}") -def copySystemRolesToMandate(db: DatabaseConnector, mandateId: str) -> int: - """ - Copy system template roles (mandateId=None, isSystemRole=True) to a mandate - as mandate-instance roles. Also copies all AccessRules for each role. - - This is analogous to how feature template roles are copied to feature instances. - Each mandate gets its own instances of admin/user/viewer with their AccessRules. - - Args: - db: Database connector instance - mandateId: Target mandate ID - - Returns: - Number of roles copied - """ - # Find system template roles (global: mandateId=NULL, isSystemRole=True) - templateRoles = db.getRecordset( - Role, - recordFilter={"isSystemRole": True, "mandateId": None} - ) - - if not templateRoles: - logger.warning(f"No system template roles found (mandateId IS NULL, isSystemRole=True)") - return 0 - - # Check which mandate-level roles already exist for this mandate - existingMandateRoles = db.getRecordset( - Role, - recordFilter={"mandateId": mandateId, "featureInstanceId": None} - ) - existingLabels = {r.get("roleLabel") for r in existingMandateRoles} - logger.info(f"copySystemRolesToMandate: mandate={mandateId}, templates={len(templateRoles)}, existing={len(existingMandateRoles)}, labels={existingLabels}") - - # Load all AccessRules for template roles - templateRoleIds = [r.get("id") for r in templateRoles] - rulesByRoleId = {} - for roleId in templateRoleIds: - rules = db.getRecordset(AccessRule, recordFilter={"roleId": roleId}) - rulesByRoleId[roleId] = rules - - copiedCount = 0 - for templateRole in templateRoles: - roleLabel = templateRole.get("roleLabel") - - # Skip if mandate already has this role - if roleLabel in existingLabels: - logger.debug(f"Mandate {mandateId} already has role '{roleLabel}', skipping") - continue - - newRoleId = str(uuid.uuid4()) - - # Create mandate-instance role - newRole = Role( - id=newRoleId, - roleLabel=roleLabel, - description=coerce_text_multilingual(templateRole.get("description", {})), - mandateId=mandateId, - featureInstanceId=None, - featureCode=None, - isSystemRole=False # Mandate-level role, not a system template - ) - db.recordCreate(Role, newRole.model_dump()) - - # Copy AccessRules - templateRules = rulesByRoleId.get(templateRole.get("id"), []) - for rule in templateRules: - newRule = AccessRule( - id=str(uuid.uuid4()), - roleId=newRoleId, - context=rule.get("context"), - item=rule.get("item"), - view=rule.get("view", False), - read=rule.get("read"), - create=rule.get("create"), - update=rule.get("update"), - delete=rule.get("delete") - ) - db.recordCreate(AccessRule, newRule.model_dump()) - - copiedCount += 1 - logger.info(f"Copied system role '{roleLabel}' to mandate {mandateId} with {len(templateRules)} AccessRules") - - if copiedCount > 0: - logger.info(f"Copied {copiedCount} system roles to mandate {mandateId}") - - return copiedCount - - def _getRoleId(db: DatabaseConnector, roleLabel: str) -> Optional[str]: """ Get role ID by label, using cache or database lookup. diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py index d5fb2e49..13c7ead6 100644 --- a/modules/interfaces/interfaceDbApp.py +++ b/modules/interfaces/interfaceDbApp.py @@ -1560,7 +1560,7 @@ class AppObjects: # Copy system template roles to new mandate (admin, user, viewer + AccessRules) try: - from modules.interfaces.interfaceBootstrap import copySystemRolesToMandate + from modules.interfaces.interfaceRbac import copySystemRolesToMandate copiedCount = copySystemRolesToMandate(self.db, mandateId) logger.info(f"Copied {copiedCount} system roles to new mandate {mandateId}") except Exception as e: @@ -1577,7 +1577,7 @@ class AppObjects: """ from modules.datamodels.datamodelSubscription import MandateSubscription, SubscriptionStatusEnum, BUILTIN_PLANS from modules.datamodels.datamodelFeatures import FeatureInstance - from modules.interfaces.interfaceBootstrap import copySystemRolesToMandate + from modules.interfaces.interfaceRbac import copySystemRolesToMandate from modules.interfaces.interfaceFeatures import getFeatureInterface from modules.shared.featureDiscovery import loadFeatureMainModules plan = BUILTIN_PLANS.get(planKey) @@ -1615,7 +1615,7 @@ class AppObjects: raise ValueError(f"No admin role found for mandate {mandateId} — cannot assign user without role") from modules.interfaces.interfaceDbSubscription import getRootInterface as _getSubRoot - from modules.interfaces.interfaceDbBilling import getRootInterface as _getBillingRoot + from modules.shared.systemComponentRegistry import getLifecycleHooks as _getHooks now = datetime.now(timezone.utc) nowTs = now.timestamp() @@ -1635,17 +1635,11 @@ class AppObjects: subInterface = _getSubRoot() subInterface.createSubscription(subscription) - try: - billingRoot = _getBillingRoot() - billingRoot.getOrCreateSettings(mandateId) - billingRoot.ensureActivationBudget(mandateId, planKey) - except Exception as billingEx: - logger.error( - "Initial billing setup failed for mandate %s (plan=%s): %s", - mandateId, - planKey, - billingEx, - ) + for _hook in _getHooks("onMandateProvision"): + try: + _hook(mandateId, planKey) + except Exception as _hookErr: + logger.error("onMandateProvision hook failed: %s", _hookErr) self.createUserMandate(userId, mandateId, roleIds=[adminRoleId], skipCapacityCheck=True) @@ -1865,7 +1859,6 @@ class AppObjects: from modules.datamodels.datamodelDataSource import DataSource from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk from modules.datamodels.datamodelFeatures import FeatureDataSource - from modules.datamodels.datamodelBilling import BillingSettings, BillingAccount, BillingTransaction from modules.datamodels.datamodelFeatures import DataNeutralizerAttributes instances = self.db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) @@ -1987,20 +1980,7 @@ class AppObjects: subInterface.db.recordDelete(MandateSubscription, subId) logger.info(f"Cascade: deleted {len(subs)} subscriptions for mandate {mandateId}") - # 3b. Delete Billing data (poweron_billing) - from modules.interfaces.interfaceDbBilling import getRootInterface as _getBillingRoot - billingDb = _getBillingRoot().db - billingAccounts = billingDb.getRecordset(BillingAccount, recordFilter={"mandateId": mandateId}) - for acc in billingAccounts: - accTxs = billingDb.getRecordset(BillingTransaction, recordFilter={"accountId": acc.get("id")}) - for tx in accTxs: - billingDb.recordDelete(BillingTransaction, tx.get("id")) - billingDb.recordDelete(BillingAccount, acc.get("id")) - billingSettings = billingDb.getRecordset(BillingSettings, recordFilter={"mandateId": mandateId}) - for bs in billingSettings: - billingDb.recordDelete(BillingSettings, bs.get("id")) - if billingAccounts or billingSettings: - logger.info(f"Cascade: deleted billing data for mandate {mandateId}") + # 3b. Billing data cascade handled by onMandateDelete lifecycle hook (interfaceDbBilling) # 3c. Delete Invitations for this mandate from modules.datamodels.datamodelInvitation import Invitation @@ -2155,10 +2135,20 @@ class AppObjects: ) self.db.recordCreate(UserMandateRole, userMandateRole.model_dump()) - self._ensureUserBillingAccount(userId, mandateId) + from modules.shared.systemComponentRegistry import getLifecycleHooks + for _hook in getLifecycleHooks("onUserMandateCreate"): + try: + _hook(userId, mandateId) + except Exception as _hookErr: + logger.warning("onUserMandateCreate hook failed: %s", _hookErr) + self._syncSubscriptionQuantity(mandateId) if not skipCapacityCheck: - self._adjustAiBudgetForUserChange(mandateId, delta=+1) + for _hook in getLifecycleHooks("onUserBudgetAdjust"): + try: + _hook(mandateId, +1) + except Exception as _hookErr: + logger.warning("onUserBudgetAdjust hook failed: %s", _hookErr) cleanedRecord = dict(createdRecord) return UserMandate(**cleanedRecord) @@ -2167,26 +2157,6 @@ class AppObjects: raise logger.error(f"Error creating UserMandate: {e}") raise ValueError(f"Failed to create UserMandate: {e}") from e - - def _ensureUserBillingAccount(self, userId: str, mandateId: str) -> None: - """ - Ensure a user has a billing audit account for the mandate. - Balance is always on the mandate pool (PREPAY_MANDATE). User accounts are for audit trail only. - """ - try: - from modules.interfaces.interfaceDbBilling import getRootInterface as getBillingRootInterface - - billingInterface = getBillingRootInterface() - settings = billingInterface.getSettings(mandateId) - - if not settings: - return - - billingInterface.getOrCreateUserAccount(mandateId, userId, initialBalance=0.0) - logger.info(f"Ensured billing audit account for user {userId} in mandate {mandateId}") - - except Exception as e: - logger.warning(f"Failed to create billing account for user {userId} (non-critical): {e}") def _checkSubscriptionCapacity(self, mandateId: str, resourceType: str, delta: int = 1) -> None: """Check subscription capacity before creating a resource. Raises on cap violation.""" @@ -2222,23 +2192,6 @@ class AppObjects: raise logger.debug(f"Subscription quantity sync skipped: {e}") - def _adjustAiBudgetForUserChange(self, mandateId: str, delta: int) -> None: - """Pro-rata AI budget credit/debit when a user is added or removed mid-cycle.""" - try: - from modules.interfaces.interfaceDbSubscription import getInterface as getSubInterface - from modules.interfaces.interfaceDbBilling import getInterface as getBillingInterface - from modules.security.rootAccess import getRootUser - rootUser = getRootUser() - subIf = getSubInterface(rootUser, mandateId) - operative = subIf.getOperativeForMandate(mandateId) - if not operative: - return - planKey = operative.get("planKey", "") - billingIf = getBillingInterface(rootUser) - billingIf.adjustAiBudgetForUserChange(mandateId, planKey, delta) - except Exception as e: - logger.debug(f"AI budget adjustment skipped: {e}") - def deleteUserMandate(self, userId: str, mandateId: str) -> bool: """ Delete a UserMandate record (remove user from mandate). @@ -2278,7 +2231,14 @@ class AppObjects: result = self.db.recordDelete(UserMandate, existing.id) self._syncSubscriptionQuantity(mandateId) - self._adjustAiBudgetForUserChange(mandateId, delta=-1) + + from modules.shared.systemComponentRegistry import getLifecycleHooks + for _hook in getLifecycleHooks("onUserMandateDelete"): + try: + _hook(userId, mandateId) + except Exception as _hookErr: + logger.warning("onUserMandateDelete hook failed: %s", _hookErr) + return result except Exception as e: logger.error(f"Error deleting UserMandate: {e}") diff --git a/modules/interfaces/interfaceDbBilling.py b/modules/interfaces/interfaceDbBilling.py index d51813d8..84cc748e 100644 --- a/modules/interfaces/interfaceDbBilling.py +++ b/modules/interfaces/interfaceDbBilling.py @@ -2144,3 +2144,83 @@ class BillingObjects: # Sort by creation date descending and limit _sortBillingTransactionsBySysCreatedAtDesc(allTransactions, "getUserTransactionsForMandates") return allTransactions[:limit] + + def deleteMandateData(self, mandateId: str) -> None: + """Delete all billing data for a mandate (accounts, transactions, settings). + + Used as cascade during mandate hard-delete via the onMandateDelete lifecycle hook. + """ + billingAccounts = self.db.getRecordset(BillingAccount, recordFilter={"mandateId": mandateId}) + for acc in billingAccounts: + accTxs = self.db.getRecordset(BillingTransaction, recordFilter={"accountId": acc.get("id")}) + for tx in accTxs: + self.db.recordDelete(BillingTransaction, tx.get("id")) + self.db.recordDelete(BillingAccount, acc.get("id")) + billingSettings = self.db.getRecordset(BillingSettings, recordFilter={"mandateId": mandateId}) + for bs in billingSettings: + self.db.recordDelete(BillingSettings, bs.get("id")) + if billingAccounts or billingSettings: + logger.info("deleteMandateData: deleted billing data for mandate %s", mandateId) + + +def onMandateDelete(mandateId: str, instances: list) -> None: + """Lifecycle hook: cascade-delete billing data when a mandate is hard-deleted.""" + getRootInterface().deleteMandateData(mandateId) + + +def onUserMandateCreate(userId: str, mandateId: str) -> None: + """Lifecycle hook: ensure user has a billing audit account when added to a mandate.""" + try: + billingInterface = getRootInterface() + settings = billingInterface.getSettings(mandateId) + if not settings: + return + billingInterface.getOrCreateUserAccount(mandateId, userId, initialBalance=0.0) + logger.info("Ensured billing audit account for user %s in mandate %s", userId, mandateId) + except Exception as e: + logger.warning("Failed to create billing account for user %s (non-critical): %s", userId, e) + + +def onUserMandateDelete(userId: str, mandateId: str) -> None: + """Lifecycle hook: pro-rata AI budget debit when user is removed from a mandate.""" + _adjustAiBudgetForUserChange(mandateId, delta=-1) + + +def onUserBudgetAdjust(mandateId: str, delta: int) -> None: + """Lifecycle hook: pro-rata AI budget credit/debit for user membership changes.""" + _adjustAiBudgetForUserChange(mandateId, delta) + + +def onMandateProvision(mandateId: str, planKey: str) -> None: + """Lifecycle hook: create billing settings and activation budget for a new mandate.""" + try: + billingRoot = getRootInterface() + billingRoot.getOrCreateSettings(mandateId) + billingRoot.ensureActivationBudget(mandateId, planKey) + except Exception as e: + logger.error("Initial billing setup failed for mandate %s (plan=%s): %s", mandateId, planKey, e) + + +def onStorageChanged(mandateId: str) -> None: + """Lifecycle hook: reconcile storage billing after knowledge content changes.""" + try: + getRootInterface().reconcileMandateStorageBilling(mandateId) + except Exception as e: + logger.warning("reconcileMandateStorageBilling failed for mandate %s: %s", mandateId, e) + + +def _adjustAiBudgetForUserChange(mandateId: str, delta: int) -> None: + """Pro-rata AI budget credit/debit when a user is added or removed mid-cycle.""" + try: + from modules.interfaces.interfaceDbSubscription import getInterface as getSubInterface + from modules.security.rootAccess import getRootUser + rootUser = getRootUser() + subIf = getSubInterface(rootUser, mandateId) + operative = subIf.getOperativeForMandate(mandateId) + if not operative: + return + planKey = operative.get("planKey", "") + billingIf = getInterface(rootUser) + billingIf.adjustAiBudgetForUserChange(mandateId, planKey, delta) + except Exception as e: + logger.debug("AI budget adjustment skipped: %s", e) diff --git a/modules/interfaces/interfaceDbKnowledge.py b/modules/interfaces/interfaceDbKnowledge.py index e979bbd3..20d66dc2 100644 --- a/modules/interfaces/interfaceDbKnowledge.py +++ b/modules/interfaces/interfaceDbKnowledge.py @@ -123,13 +123,13 @@ class KnowledgeObjects: if mid: mandateIds.add(str(mid)) + from modules.shared.systemComponentRegistry import getLifecycleHooks for mid in mandateIds: - try: - from modules.interfaces.interfaceDbBilling import getRootInterface as getBillingRoot - - getBillingRoot().reconcileMandateStorageBilling(mid) - except Exception as ex: - logger.warning("reconcileMandateStorageBilling after connection purge failed: %s", ex) + for _hook in getLifecycleHooks("onStorageChanged"): + try: + _hook(mid) + except Exception as ex: + logger.warning("onStorageChanged hook after connection purge failed: %s", ex) return {"indexRows": indexCount, "chunks": chunkCount} @@ -166,12 +166,13 @@ class KnowledgeObjects: if mid: mandateIds.add(str(mid)) + from modules.shared.systemComponentRegistry import getLifecycleHooks for mid in mandateIds: - try: - from modules.interfaces.interfaceDbBilling import getRootInterface as getBillingRoot - getBillingRoot().reconcileMandateStorageBilling(mid) - except Exception as ex: - logger.warning("reconcileMandateStorageBilling after datasource purge failed: %s", ex) + for _hook in getLifecycleHooks("onStorageChanged"): + try: + _hook(mid) + except Exception as ex: + logger.warning("onStorageChanged hook after datasource purge failed: %s", ex) return {"indexRows": indexCount, "chunks": chunkCount} @@ -196,12 +197,12 @@ class KnowledgeObjects: self.db.recordDelete(ContentChunk, chunk["id"]) ok = self.db.recordDelete(FileContentIndex, fileId) if ok and mandateId: - try: - from modules.interfaces.interfaceDbBilling import getRootInterface - - getRootInterface().reconcileMandateStorageBilling(str(mandateId)) - except Exception as ex: - logger.warning("reconcileMandateStorageBilling after delete failed: %s", ex) + from modules.shared.systemComponentRegistry import getLifecycleHooks + for _hook in getLifecycleHooks("onStorageChanged"): + try: + _hook(str(mandateId)) + except Exception as ex: + logger.warning("onStorageChanged hook after delete failed: %s", ex) return ok # ========================================================================= diff --git a/modules/interfaces/interfaceRbac.py b/modules/interfaces/interfaceRbac.py index 16429acb..5c09942a 100644 --- a/modules/interfaces/interfaceRbac.py +++ b/modules/interfaces/interfaceRbac.py @@ -27,12 +27,15 @@ import json import math import re import copy +import uuid from datetime import datetime, timezone from typing import List, Dict, Any, Optional, Type, Union from pydantic import BaseModel -from modules.datamodels.datamodelRbac import AccessRuleContext +from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext, Role from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult +from modules.datamodels.datamodelUtils import coerce_text_multilingual +from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.security.rbac import RbacClass from modules.security.rootAccess import getRootDbAppConnector @@ -1123,3 +1126,96 @@ def _checkRowPermission( # Unknown level - deny by default return False + + +# ============================================================================= +# System Role Provisioning +# ============================================================================= + + +def copySystemRolesToMandate(db: DatabaseConnector, mandateId: str) -> int: + """ + Copy system template roles (mandateId=None, isSystemRole=True) to a mandate + as mandate-instance roles. Also copies all AccessRules for each role. + + This is analogous to how feature template roles are copied to feature instances. + Each mandate gets its own instances of admin/user/viewer with their AccessRules. + + Args: + db: Database connector instance + mandateId: Target mandate ID + + Returns: + Number of roles copied + """ + templateRoles = db.getRecordset( + Role, + recordFilter={"isSystemRole": True, "mandateId": None} + ) + + if not templateRoles: + logger.warning("No system template roles found (mandateId IS NULL, isSystemRole=True)") + return 0 + + existingMandateRoles = db.getRecordset( + Role, + recordFilter={"mandateId": mandateId, "featureInstanceId": None} + ) + existingLabels = {r.get("roleLabel") for r in existingMandateRoles} + logger.info( + "copySystemRolesToMandate: mandate=%s, templates=%s, existing=%s, labels=%s", + mandateId, len(templateRoles), len(existingMandateRoles), existingLabels, + ) + + templateRoleIds = [r.get("id") for r in templateRoles] + rulesByRoleId = {} + for roleId in templateRoleIds: + rules = db.getRecordset(AccessRule, recordFilter={"roleId": roleId}) + rulesByRoleId[roleId] = rules + + copiedCount = 0 + for templateRole in templateRoles: + roleLabel = templateRole.get("roleLabel") + + if roleLabel in existingLabels: + logger.debug("Mandate %s already has role '%s', skipping", mandateId, roleLabel) + continue + + newRoleId = str(uuid.uuid4()) + + newRole = Role( + id=newRoleId, + roleLabel=roleLabel, + description=coerce_text_multilingual(templateRole.get("description", {})), + mandateId=mandateId, + featureInstanceId=None, + featureCode=None, + isSystemRole=False, + ) + db.recordCreate(Role, newRole.model_dump()) + + templateRules = rulesByRoleId.get(templateRole.get("id"), []) + for rule in templateRules: + newRule = AccessRule( + id=str(uuid.uuid4()), + roleId=newRoleId, + context=rule.get("context"), + item=rule.get("item"), + view=rule.get("view", False), + read=rule.get("read"), + create=rule.get("create"), + update=rule.get("update"), + delete=rule.get("delete"), + ) + db.recordCreate(AccessRule, newRule.model_dump()) + + copiedCount += 1 + logger.info( + "Copied system role '%s' to mandate %s with %s AccessRules", + roleLabel, mandateId, len(templateRules), + ) + + if copiedCount > 0: + logger.info("Copied %s system roles to mandate %s", copiedCount, mandateId) + + return copiedCount diff --git a/modules/routes/routeDataConnections.py b/modules/routes/routeDataConnections.py index 5de77a9b..7a327d16 100644 --- a/modules/routes/routeDataConnections.py +++ b/modules/routes/routeDataConnections.py @@ -798,7 +798,7 @@ async def _updateKnowledgeConsent( cancelled = cancelJobsByConnection(connectionId) else: from modules.datamodels.datamodelDataSource import DataSource - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag + from modules.serviceCenter.core.flagResolution import getEffectiveFlag allConnDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) dataSources = [ ds for ds in (allConnDs or []) diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py index 52a4b98a..a7a4e34b 100644 --- a/modules/routes/routeDataFiles.py +++ b/modules/routes/routeDataFiles.py @@ -98,17 +98,17 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user, *, man return file_meta = mgmtInterface.getFile(fileId) - feature_instance_id = "" - mandate_id = "" + featureInstanceId = "" + mandateId = "" file_scope = "personal" if file_meta: if isinstance(file_meta, dict): - feature_instance_id = file_meta.get("featureInstanceId") or "" - mandate_id = file_meta.get("mandateId") or "" + featureInstanceId = file_meta.get("featureInstanceId") or "" + mandateId = file_meta.get("mandateId") or "" file_scope = file_meta.get("scope") or "personal" else: - feature_instance_id = getattr(file_meta, "featureInstanceId", None) or "" - mandate_id = getattr(file_meta, "mandateId", None) or "" + featureInstanceId = getattr(file_meta, "featureInstanceId", None) or "" + mandateId = getattr(file_meta, "mandateId", None) or "" file_scope = getattr(file_meta, "scope", None) or "personal" logger.info(f"Auto-index starting for {fileName} ({len(rawBytes)} bytes, {mimeType})") @@ -121,8 +121,8 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user, *, man fileId=fileId, fileName=fileName, userId=userId, - featureInstanceId=str(feature_instance_id) if feature_instance_id else "", - mandateId=str(mandate_id) if mandate_id else "", + featureInstanceId=str(featureInstanceId) if featureInstanceId else "", + mandateId=str(mandateId) if mandateId else "", scope=file_scope, ) logger.info( @@ -208,8 +208,8 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user, *, man ctx = ServiceCenterContext( user=user, - mandate_id=str(mandate_id) if mandate_id else "", - feature_instance_id=str(feature_instance_id) if feature_instance_id else "", + mandateId=str(mandateId) if mandateId else "", + featureInstanceId=str(featureInstanceId) if featureInstanceId else "", ) knowledgeService = getService("knowledge", ctx) @@ -222,8 +222,8 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user, *, man fileName=fileName, mimeType=mimeType, userId=userId, - featureInstanceId=str(feature_instance_id) if feature_instance_id else "", - mandateId=str(mandate_id) if mandate_id else "", + featureInstanceId=str(featureInstanceId) if featureInstanceId else "", + mandateId=str(mandateId) if mandateId else "", contentObjects=contentObjects, structure=contentIndex.structure, provenance={"lane": "upload", "route": "routeDataFiles._autoIndexFile"}, diff --git a/modules/routes/routeRagInventory.py b/modules/routes/routeRagInventory.py index f7219c60..419ddec1 100644 --- a/modules/routes/routeRagInventory.py +++ b/modules/routes/routeRagInventory.py @@ -86,7 +86,7 @@ def _buildConnectionInventory(connections, rootIf, knowledgeIf, jobService) -> L """ from modules.datamodels.datamodelDataSource import DataSource from modules.datamodels.datamodelKnowledge import FileContentIndex - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag + from modules.serviceCenter.core.flagResolution import getEffectiveFlag out = [] for conn in connections: @@ -236,7 +236,7 @@ def _buildFeatureInstanceInventory(featureInstanceIds, rootIf, knowledgeIf) -> L from modules.datamodels.datamodelKnowledge import FileContentIndex from modules.datamodels.datamodelFeatures import FeatureDataSource from modules.interfaces.interfaceFeatures import getFeatureInterface - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlagFds + from modules.serviceCenter.core.flagResolution import getEffectiveFlagFds from modules.serviceCenter.services.serviceBackgroundJobs import mainBackgroundJobService as jobService from modules.serviceCenter.services.serviceKnowledge.subFeatureBootstrap import FEATURE_BOOTSTRAP_JOB_TYPE @@ -548,7 +548,7 @@ async def _reindexConnection( if str(conn.userId) != str(currentUser.id): raise HTTPException(status_code=403, detail="Not your connection") - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag + from modules.serviceCenter.core.flagResolution import getEffectiveFlag dataSources = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) ragDs = [ds for ds in dataSources if getEffectiveFlag(ds, "ragIndexEnabled", dataSources, mode="walk") is True] if not ragDs: diff --git a/modules/routes/routeVoiceUser.py b/modules/routes/routeVoiceUser.py index 7ddfbed4..ce14afe0 100644 --- a/modules/routes/routeVoiceUser.py +++ b/modules/routes/routeVoiceUser.py @@ -251,7 +251,7 @@ async def _generateTtsSampleTextForLocale( from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException mandateId = _resolveMandateIdForVoiceTestAi(request, currentUser) - ctx = ServiceCenterContext(user=currentUser, mandate_id=mandateId, feature_instance_id=None) + ctx = ServiceCenterContext(user=currentUser, mandateId=mandateId, featureInstanceId=None) aiService = getService("ai", ctx) systemPrompt = ( diff --git a/modules/routes/routeWorkflowAutomation.py b/modules/routes/routeWorkflowAutomation.py index 8a9dd587..afd4aaa0 100644 --- a/modules/routes/routeWorkflowAutomation.py +++ b/modules/routes/routeWorkflowAutomation.py @@ -596,8 +596,8 @@ def _buildServiceCenterContext(context: RequestContext, mandateId: str, instance from modules.serviceCenter.context import ServiceCenterContext return ServiceCenterContext( user=context.user, - mandate_id=str(context.mandateId) if context.mandateId else mandateId, - feature_instance_id=instanceId, + mandateId=str(context.mandateId) if context.mandateId else mandateId, + featureInstanceId=instanceId, ) @@ -1366,6 +1366,21 @@ def _buildExecuteRunEnvelope( return env +def _startEmailPollerIfNeeded(result: dict) -> None: + """Start the background email poller when a run pauses for email wait.""" + if not isinstance(result, dict) or result.get("waitReason") != "email": + return + try: + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.workflowAutomation.scheduler.emailPoller import ensureRunning + root = getRootInterface() + eventUser = root.getUserByUsername("event") if root else None + if eventUser: + ensureRunning(eventUser) + except Exception as pollErr: + logger.warning("Could not start email poller: %s", pollErr) + + @router.post("/workflows/{workflowId}/execute") @limiter.limit("30/minute") async def _executeWorkflow( @@ -1446,6 +1461,7 @@ async def _executeWorkflow( "workflowAutomation execute result: success=%s error=%s paused=%s", result.get("success"), result.get("error"), result.get("paused"), ) + _startEmailPollerIfNeeded(result) return result @@ -1778,7 +1794,7 @@ async def _completeTask( graph = wfForGraph["graph"] services = _getWorkflowAutomationServices(context.user, mandateId=mandateId, featureInstanceId=instanceId) - return await executeGraph( + result = await executeGraph( graph=graph, services=services, workflowId=workflowId, @@ -1790,6 +1806,8 @@ async def _completeTask( startAfterNodeId=taskNodeId, runId=runId, ) + _startEmailPollerIfNeeded(result) + return result @router.post("/tasks/{taskId}/cancel") diff --git a/modules/serviceCenter/__init__.py b/modules/serviceCenter/__init__.py index a2590fc6..968b8acf 100644 --- a/modules/serviceCenter/__init__.py +++ b/modules/serviceCenter/__init__.py @@ -33,7 +33,7 @@ def getService( Args: key: Service key (e.g., "web", "extraction", "utils") - context: ServiceCenterContext with user, mandate_id, feature_instance_id, workflow + context: ServiceCenterContext with user, mandateId, featureInstanceId, workflow Returns: Service instance diff --git a/modules/serviceCenter/context.py b/modules/serviceCenter/context.py index 24868fca..2738f8b3 100644 --- a/modules/serviceCenter/context.py +++ b/modules/serviceCenter/context.py @@ -16,20 +16,10 @@ class ServiceCenterContext: """Context for service resolution: user, mandate, feature instance, optional workflow.""" user: User - mandate_id: Optional[str] = None - feature_instance_id: Optional[str] = None - workflow_id: Optional[str] = None + mandateId: Optional[str] = None + featureInstanceId: Optional[str] = None + workflowId: Optional[str] = None workflow: Any = None requireNeutralization: Optional[bool] = None # When workflow is absent (e.g. workspace agent), billing/UI still need feature code for transactions. - feature_code: Optional[str] = None - - @property - def mandateId(self) -> Optional[str]: - """Alias for mandate_id (backward compatibility).""" - return self.mandate_id - - @property - def featureInstanceId(self) -> Optional[str]: - """Alias for feature_instance_id (backward compatibility).""" - return self.feature_instance_id + featureCode: Optional[str] = None diff --git a/modules/serviceCenter/services/serviceKnowledge/_inheritFlags.py b/modules/serviceCenter/core/flagResolution.py similarity index 100% rename from modules/serviceCenter/services/serviceKnowledge/_inheritFlags.py rename to modules/serviceCenter/core/flagResolution.py diff --git a/modules/serviceCenter/core/serviceSecurity/mainServiceSecurity.py b/modules/serviceCenter/core/serviceSecurity/mainServiceSecurity.py index 4591c36e..b5a9a84b 100644 --- a/modules/serviceCenter/core/serviceSecurity/mainServiceSecurity.py +++ b/modules/serviceCenter/core/serviceSecurity/mainServiceSecurity.py @@ -20,12 +20,12 @@ class SecurityService: def __init__(self, context: Any, get_service: Callable[[str], Any]): """Initialize with service center context and resolver.""" self._context = context - self._get_service = get_service + self._getService = get_service self._tokenManager = TokenManager() from modules.interfaces.interfaceDbApp import getInterface as getAppInterface self._interfaceDbApp = getAppInterface( context.user, - mandateId=context.mandate_id, + mandateId=context.mandateId, ) def getFreshToken(self, connectionId: str, secondsBeforeExpiry: int = 30 * 60) -> Optional[Token]: diff --git a/modules/serviceCenter/core/serviceStreaming/mainServiceStreaming.py b/modules/serviceCenter/core/serviceStreaming/mainServiceStreaming.py index c6c7ddf7..76369553 100644 --- a/modules/serviceCenter/core/serviceStreaming/mainServiceStreaming.py +++ b/modules/serviceCenter/core/serviceStreaming/mainServiceStreaming.py @@ -19,7 +19,7 @@ class StreamingService: def __init__(self, context: Any, get_service: Callable[[str], Any]): """Initialize with service center context and resolver.""" self._context = context - self._get_service = get_service + self._getService = get_service def getEventManager(self) -> EventManager: """Get the global event manager instance for SSE streaming.""" diff --git a/modules/serviceCenter/core/serviceUtils/mainServiceUtils.py b/modules/serviceCenter/core/serviceUtils/mainServiceUtils.py index d22eab1b..856514bf 100644 --- a/modules/serviceCenter/core/serviceUtils/mainServiceUtils.py +++ b/modules/serviceCenter/core/serviceUtils/mainServiceUtils.py @@ -22,7 +22,7 @@ class UtilsService: def __init__(self, context, get_service: Callable[[str], Any]): """Initialize with service center context and resolver.""" self._context = context - self._get_service = get_service + self._getService = get_service # ===== Event handling ===== diff --git a/modules/serviceCenter/core/types.py b/modules/serviceCenter/core/types.py new file mode 100644 index 00000000..19c15081 --- /dev/null +++ b/modules/serviceCenter/core/types.py @@ -0,0 +1,90 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Neutral protocol types used across serviceCenter services. + +Protocols defined here break import cycles by providing structural typing +contracts that services can depend on without importing concrete classes +from sibling services. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Protocol, runtime_checkable + + +# --------------------------------------------------------------------------- +# FeatureDataProviderProtocol (used by serviceKnowledge, implemented in serviceAgent) +# --------------------------------------------------------------------------- + +@runtime_checkable +class FeatureDataProviderProtocol(Protocol): + """Structural contract for the RBAC-scoped feature-data read layer. + + serviceKnowledge depends on this Protocol for RAG indexing; + serviceAgent supplies the concrete FeatureDataProvider implementation. + """ + + def browseTable( + self, + tableName: str, + featureInstanceId: str, + mandateId: str, + fields: Optional[List[str]] = None, + limit: int = 50, + offset: int = 0, + extraFilters: Optional[List[Dict[str, Any]]] = None, + ) -> Dict[str, Any]: ... + + async def finalizeRowsAsync( + self, + tableName: str, + rows: List[Dict[str, Any]], + ) -> List[Dict[str, Any]]: ... + + +# --------------------------------------------------------------------------- +# FeatureDataProvider factory registry +# --------------------------------------------------------------------------- + +_featureDataProviderFactory = None + + +def registerFeatureDataProviderFactory(factory) -> None: + """Register the concrete FeatureDataProvider class (called at composition time).""" + global _featureDataProviderFactory + _featureDataProviderFactory = factory + + +def createFeatureDataProvider( + dbConnector, + neutralizeFields: Optional[Dict[str, List[str]]] = None, + neutralizePolicy: Optional[Dict[str, Dict[str, Any]]] = None, + neutralizationService: Optional[Any] = None, +) -> FeatureDataProviderProtocol: + """Instantiate a FeatureDataProvider without importing serviceAgent.""" + if _featureDataProviderFactory is None: + raise RuntimeError( + "FeatureDataProvider factory not registered. " + "Ensure serviceAgent is initialized before serviceKnowledge bootstrap runs." + ) + return _featureDataProviderFactory( + dbConnector, + neutralizeFields=neutralizeFields, + neutralizePolicy=neutralizePolicy, + neutralizationService=neutralizationService, + ) + + +# --------------------------------------------------------------------------- +# RendererProtocol (used by serviceExtraction, implemented in serviceGeneration) +# --------------------------------------------------------------------------- + +@runtime_checkable +class RendererProtocol(Protocol): + """Structural contract for document renderers. + + serviceExtraction depends on this Protocol for type hints; + serviceGeneration supplies BaseRenderer and its subclasses. + """ + + def getExtractionGuidelines(self) -> str: ... diff --git a/modules/serviceCenter/resolver.py b/modules/serviceCenter/resolver.py index 316ce052..729adb69 100644 --- a/modules/serviceCenter/resolver.py +++ b/modules/serviceCenter/resolver.py @@ -19,7 +19,7 @@ GetServiceFunc = Callable[[str], Any] def _make_context_id(ctx: ServiceCenterContext) -> str: """Create a stable cache key from context.""" - return f"{id(ctx.user)}_{ctx.mandate_id or ''}_{ctx.feature_instance_id or ''}" + return f"{id(ctx.user)}_{ctx.mandateId or ''}_{ctx.featureInstanceId or ''}" def _load_service_class(module_path: str, class_name: str): diff --git a/modules/serviceCenter/services/serviceAgent/__init__.py b/modules/serviceCenter/services/serviceAgent/__init__.py index 05d5452b..8878ece1 100644 --- a/modules/serviceCenter/services/serviceAgent/__init__.py +++ b/modules/serviceCenter/services/serviceAgent/__init__.py @@ -1,3 +1,8 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. """serviceAgent: AI Agent with ReAct loop and native function calling.""" + +from modules.serviceCenter.core.types import registerFeatureDataProviderFactory +from modules.serviceCenter.services.serviceAgent.featureDataProvider import FeatureDataProvider + +registerFeatureDataProviderFactory(FeatureDataProvider) diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_dataSourceTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_dataSourceTools.py index 76fd0bae..f1e49368 100644 --- a/modules/serviceCenter/services/serviceAgent/coreTools/_dataSourceTools.py +++ b/modules/serviceCenter/services/serviceAgent/coreTools/_dataSourceTools.py @@ -151,7 +151,7 @@ def registerDataSourceTools(registry: ToolRegistry, services): sourceType = ds.get("sourceType", "") path = ds.get("path", "/") label = ds.get("label", "") - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag + from modules.serviceCenter.core.flagResolution import getEffectiveFlag from modules.datamodels.datamodelDataSource import DataSource from modules.interfaces.interfaceDbApp import getRootInterface rootIf = getRootInterface() diff --git a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py index 2dfc4686..f522a62a 100644 --- a/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py +++ b/modules/serviceCenter/services/serviceAgent/coreTools/_featureSubAgentTools.py @@ -109,7 +109,7 @@ def registerFeatureSubAgentTools(registry: ToolRegistry, services): recordFilter={"featureInstanceId": featureInstanceId}, ) - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlagFds + from modules.serviceCenter.core.flagResolution import getEffectiveFlagFds _fdsAll = featureDataSources or [] _anySourceNeutralize = any( getEffectiveFlagFds(ds, "neutralize", _fdsAll, mode="walk") is True @@ -160,7 +160,7 @@ def registerFeatureSubAgentTools(registry: ToolRegistry, services): # A2: build the per-table type/inheritance-aware neutralization policy. # tableActive = effective (own or inherited) table-level neutralize flag; # explicitFields = fields whose neutralize flag is set explicitly. - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import resolveEffectiveForFds + from modules.serviceCenter.core.flagResolution import resolveEffectiveForFds neutralizePolicy: Dict[str, Dict[str, Any]] = {} for tblObj in selectedTables: tn = tblObj.get("meta", {}).get("table", "") if isinstance(tblObj, dict) else "" diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py index e977f596..390d062c 100644 --- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py +++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py @@ -68,8 +68,8 @@ class ServicesBag: self._context = context self._getService = getService self.user = context.user - self.mandateId = context.mandate_id - self.featureInstanceId = context.feature_instance_id + self.mandateId = context.mandateId + self.featureInstanceId = context.featureInstanceId @property def workflow(self): diff --git a/modules/serviceCenter/services/serviceAi/mainServiceAi.py b/modules/serviceCenter/services/serviceAi/mainServiceAi.py index 0fd10678..98489dc8 100644 --- a/modules/serviceCenter/services/serviceAi/mainServiceAi.py +++ b/modules/serviceCenter/services/serviceAi/mainServiceAi.py @@ -42,10 +42,10 @@ class _ServicesAdapter: Workflow is read from context dynamically so propagation updates are visible.""" def __init__(self, context, get_service: Callable[[str], Any]): self._context = context - self._get_service = get_service + self._getService = get_service self.user = context.user - self.mandateId = context.mandate_id - self.featureInstanceId = context.feature_instance_id + self.mandateId = context.mandateId + self.featureInstanceId = context.featureInstanceId @property def workflow(self): @@ -57,31 +57,31 @@ class _ServicesAdapter: @property def chat(self): - return self._get_service("chat") + return self._getService("chat") @property def extraction(self): - return self._get_service("extraction") + return self._getService("extraction") @property def utils(self): - return self._get_service("utils") + return self._getService("utils") @property def ai(self): - return self._get_service("ai") + return self._getService("ai") @property def interfaceDbChat(self): - return self._get_service("chat").interfaceDbChat + return self._getService("chat").interfaceDbChat @property def interfaceDbComponent(self): - return self._get_service("chat").interfaceDbComponent + return self._getService("chat").interfaceDbComponent @property def featureCode(self) -> Optional[str]: - fc = getattr(self._context, "feature_code", None) + fc = getattr(self._context, "featureCode", None) if fc and str(fc).strip(): return str(fc).strip() w = self.workflow @@ -102,11 +102,11 @@ class AiService: """Initialize with ServiceCenterContext and service resolver. Args: - context: ServiceCenterContext with user, mandate_id, feature_instance_id, workflow + context: ServiceCenterContext with user, mandateId, featureInstanceId, workflow get_service: Callable to resolve dependency services by key """ self.services = _ServicesAdapter(context, get_service) - self._get_service = get_service + self._getService = get_service self.aiObjects = None self.extractionService = None @@ -117,7 +117,7 @@ class AiService: if self.extractionService is None: logger.info("Initializing ExtractionService via service center...") - self.extractionService = self._get_service("extraction") + self.extractionService = self._getService("extraction") # Initialize new submodules from .subResponseParsing import ResponseParser @@ -673,7 +673,7 @@ detectedIntent-Werte: _sources = [] # Source 1: Feature-Instance config - _neutralSvc = self._get_service("neutralization") + _neutralSvc = self._getService("neutralization") if _neutralSvc and hasattr(_neutralSvc, 'getConfig'): _config = _neutralSvc.getConfig() if _config and getattr(_config, 'enabled', False): @@ -721,7 +721,7 @@ detectedIntent-Werte: _hardMode = request.requireNeutralization is True excludedDocs: List[str] = [] - neutralSvc = self._get_service("neutralization") + neutralSvc = self._getService("neutralization") if not neutralSvc or not hasattr(neutralSvc, 'processTextAsync'): if _hardMode: raise RuntimeError("Neutralization explicitly required but service unavailable — AI call BLOCKED") @@ -1193,7 +1193,7 @@ detectedIntent-Werte: contentOut = getattr(response, 'content', None) contentOutput = str(contentOut) if contentOut else None - neutralSvc = self._get_service("neutralization") if wasNeutralized else None + neutralSvc = self._getService("neutralization") if wasNeutralized else None mappingsCount = None if neutralSvc and hasattr(neutralSvc, 'getActiveMappingsCount'): try: @@ -1324,8 +1324,8 @@ detectedIntent-Werte: from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=servicesHub.user, - mandate_id=servicesHub.mandateId, - feature_instance_id=servicesHub.featureInstanceId, + mandateId=servicesHub.mandateId, + featureInstanceId=servicesHub.featureInstanceId, workflow=getattr(servicesHub, "workflow", None), ) return getService("ai", ctx) @@ -1721,7 +1721,7 @@ Respond with ONLY a JSON object in this exact format: ) try: - generationService = self._get_service("generation") + generationService = self._getService("generation") # renderReport verarbeitet jetzt jedes Dokument einzeln # und gibt Liste von (documentData, mimeType, filename) zurück diff --git a/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py b/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py index f2418b4b..2158506f 100644 --- a/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py +++ b/modules/serviceCenter/services/serviceBilling/mainServiceBilling.py @@ -56,9 +56,9 @@ def getService(currentUser: User, mandateId: str, featureInstanceId: str = None, return _billingServices[cacheKey] -def _get_feature_code_from_context(context) -> Optional[str]: +def _getFeatureCodeFromContext(context) -> Optional[str]: """Extract featureCode from ServiceCenterContext.""" - explicit = getattr(context, "feature_code", None) + explicit = getattr(context, "featureCode", None) if explicit and str(explicit).strip(): return str(explicit).strip() if context.workflow and hasattr(context.workflow, "feature") and context.workflow.feature: @@ -91,15 +91,15 @@ class BillingService: ctx = context_or_user get_service = mandateId self.currentUser = ctx.user - self.mandateId = ctx.mandate_id or "" - self.featureInstanceId = ctx.feature_instance_id - self.featureCode = _get_feature_code_from_context(ctx) + self.mandateId = ctx.mandateId or "" + self.featureInstanceId = ctx.featureInstanceId + self.featureCode = _getFeatureCodeFromContext(ctx) elif get_service is not None and hasattr(context_or_user, "user"): ctx = context_or_user self.currentUser = ctx.user - self.mandateId = ctx.mandate_id or "" - self.featureInstanceId = ctx.feature_instance_id - self.featureCode = _get_feature_code_from_context(ctx) + self.mandateId = ctx.mandateId or "" + self.featureInstanceId = ctx.featureInstanceId + self.featureCode = _getFeatureCodeFromContext(ctx) else: self.currentUser = context_or_user self.mandateId = mandateId or "" diff --git a/modules/serviceCenter/services/serviceChat/mainServiceChat.py b/modules/serviceCenter/services/serviceChat/mainServiceChat.py index 77856a7d..3382f75e 100644 --- a/modules/serviceCenter/services/serviceChat/mainServiceChat.py +++ b/modules/serviceCenter/services/serviceChat/mainServiceChat.py @@ -18,17 +18,17 @@ class ChatService: def __init__(self, context, get_service: Callable[[str], Any]): """Initialize with ServiceCenterContext and service resolver.""" self._context = context - self._get_service = get_service + self._getService = get_service self.user = context.user from modules.interfaces.interfaceDbApp import getInterface as getAppInterface from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface from modules.interfaces.interfaceDbChat import getInterface as getChatInterface - self.interfaceDbApp = getAppInterface(context.user, mandateId=context.mandate_id) - self.interfaceDbComponent = getComponentInterface(context.user, mandateId=context.mandate_id, featureInstanceId=context.feature_instance_id) + self.interfaceDbApp = getAppInterface(context.user, mandateId=context.mandateId) + self.interfaceDbComponent = getComponentInterface(context.user, mandateId=context.mandateId, featureInstanceId=context.featureInstanceId) self.interfaceDbChat = getChatInterface( context.user, - mandateId=context.mandate_id, - featureInstanceId=context.feature_instance_id, + mandateId=context.mandateId, + featureInstanceId=context.featureInstanceId, ) self._progressLogger = None @@ -374,10 +374,10 @@ class ChatService: try: # Get a fresh token via security service logger.debug(f"Getting fresh token for connection {connection.id}") - token = self._get_service("security").getFreshToken(connection.id) + token = self._getService("security").getFreshToken(connection.id) if token: if hasattr(token, 'expiresAt') and token.expiresAt: - current_time = self._get_service("utils").timestampGetUtc() + current_time = self._getService("utils").timestampGetUtc() if current_time > token.expiresAt: token_status = "expired" else: @@ -462,7 +462,7 @@ class ChatService: Token object or None if not found/expired """ try: - return self._get_service("security").getFreshToken(connectionId) + return self._getService("security").getFreshToken(connectionId) except Exception as e: logger.error(f"Error getting fresh token for connection {connectionId}: {str(e)}") return None @@ -575,8 +575,8 @@ class ChatService: path=path, label=label, displayPath=displayPath, - featureInstanceId=featureInstanceId or self._context.feature_instance_id or "", - mandateId=self._context.mandate_id or "", + featureInstanceId=featureInstanceId or self._context.featureInstanceId or "", + mandateId=self._context.mandateId or "", userId=self.user.id if self.user else "", ) return self.interfaceDbApp.db.recordCreate(DataSource, ds) diff --git a/modules/serviceCenter/services/serviceClickup/mainServiceClickup.py b/modules/serviceCenter/services/serviceClickup/mainServiceClickup.py index d1ef51b3..74a7e809 100644 --- a/modules/serviceCenter/services/serviceClickup/mainServiceClickup.py +++ b/modules/serviceCenter/services/serviceClickup/mainServiceClickup.py @@ -30,7 +30,7 @@ class ClickupService(ClickupApiClient): def __init__(self, context, get_service: Callable[[str], Any]): super().__init__(accessToken="") self._context = context - self._get_service = get_service + self._getService = get_service def setAccessTokenFromConnection(self, userConnection) -> bool: """Load OAuth/personal token from SecurityService for this UserConnection.""" @@ -45,7 +45,7 @@ class ClickupService(ClickupApiClient): if not connection_id: logger.error("UserConnection must have an 'id' field") return False - security = self._get_service("security") + security = self._getService("security") if not security: logger.error("Security service not available for token access") return False diff --git a/modules/serviceCenter/services/serviceExtraction/mainServiceExtraction.py b/modules/serviceCenter/services/serviceExtraction/mainServiceExtraction.py index a3fb0baf..125281e7 100644 --- a/modules/serviceCenter/services/serviceExtraction/mainServiceExtraction.py +++ b/modules/serviceCenter/services/serviceExtraction/mainServiceExtraction.py @@ -28,12 +28,12 @@ class ExtractionService: def __init__(self, context, get_service: Callable[[str], Any]): """Initialize with ServiceCenterContext and service resolver.""" self._context = context - self._get_service = get_service + self._getService = get_service from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface self._interfaceDbComponent = getComponentInterface( context.user, - mandateId=context.mandate_id, - featureInstanceId=context.feature_instance_id, + mandateId=context.mandateId, + featureInstanceId=context.featureInstanceId, ) self._extractorRegistry = getExtractorRegistry() if ExtractionService._sharedChunkerRegistry is None: @@ -117,7 +117,7 @@ class ExtractionService: docOperationId = f"{operationId}_doc_{i}" # Use parentOperationId if provided, otherwise use operationId as parent parentId = parentOperationId if parentOperationId else operationId - self._get_service("chat").progressLogStart( + self._getService("chat").progressLogStart( docOperationId, "Extracting Document", f"Document {i + 1}/{totalDocs}", @@ -130,17 +130,17 @@ class ExtractionService: try: if docOperationId: - self._get_service("chat").progressLogUpdate(docOperationId, 0.1, "Loading document data") + self._getService("chat").progressLogUpdate(docOperationId, 0.1, "Loading document data") # Resolve raw bytes for this document using interface documentBytes = dbInterface.getFileData(doc.fileId) if not documentBytes: if docOperationId: - self._get_service("chat").progressLogFinish(docOperationId, False) + self._getService("chat").progressLogFinish(docOperationId, False) raise ValueError(f"No file data found for fileId={doc.fileId}") if docOperationId: - self._get_service("chat").progressLogUpdate(docOperationId, 0.2, "Running extraction pipeline") + self._getService("chat").progressLogUpdate(docOperationId, 0.2, "Running extraction pipeline") # Convert ChatDocument to the format expected by runExtraction documentData = { @@ -160,7 +160,7 @@ class ExtractionService: ) if docOperationId: - self._get_service("chat").progressLogUpdate(docOperationId, 0.7, f"Extracted {len(ec.parts)} parts") + self._getService("chat").progressLogUpdate(docOperationId, 0.7, f"Extracted {len(ec.parts)} parts") # Log content parts metadata logger.debug(f"Content parts: {len(ec.parts)}") @@ -223,7 +223,7 @@ class ExtractionService: # Use document name and part index for filename doc_name_safe = documentData["fileName"].replace(" ", "_").replace("/", "_").replace("\\", "_")[:50] debug_filename = f"extraction_text_part_{j+1}_{doc_name_safe}.txt" - self._get_service("utils").writeDebugFile(debug_json, debug_filename) + self._getService("utils").writeDebugFile(debug_json, debug_filename) logger.info(f"Wrote debug file for extracted text part {j+1}/{len(ec.parts)}: {debug_filename}") except Exception as e: logger.warning(f"Failed to write debug file for text part {j+1}: {str(e)}") @@ -240,7 +240,7 @@ class ExtractionService: logger.debug(f"No chunking needed - {len(ec.parts)} parts fit within size limits") if docOperationId: - self._get_service("chat").progressLogUpdate(docOperationId, 0.9, f"Processing complete: {len(ec.parts)} parts extracted") + self._getService("chat").progressLogUpdate(docOperationId, 0.9, f"Processing complete: {len(ec.parts)} parts extracted") # Calculate timing and emit stats endTime = time.time() @@ -256,7 +256,7 @@ class ExtractionService: # Hard fail if model is missing; caller must ensure connectors are registered if model is None or model.calculatepriceCHF is None: if docOperationId: - self._get_service("chat").progressLogFinish(docOperationId, False) + self._getService("chat").progressLogFinish(docOperationId, False) raise RuntimeError(f"Pricing model not available: {modelDisplayName}") priceCHF = model.calculatepriceCHF(processingTime, bytesSent, bytesReceived) @@ -309,13 +309,13 @@ class ExtractionService: # Finish document operation successfully if docOperationId: - self._get_service("chat").progressLogFinish(docOperationId, True) + self._getService("chat").progressLogFinish(docOperationId, True) except Exception as e: logger.error(f"Error extracting content from document {i + 1}/{totalDocs} ({doc.fileName}): {str(e)}") if docOperationId: try: - self._get_service("chat").progressLogFinish(docOperationId, False) + self._getService("chat").progressLogFinish(docOperationId, False) except: pass # Don't fail on progress logging errors # Continue with next document instead of failing completely @@ -355,7 +355,7 @@ class ExtractionService: if not operationId: workflowId = self._context.workflow.id if self._context.workflow else f"no-workflow-{int(time.time())}" operationId = f"ai_text_extract_{workflowId}_{int(time.time())}" - self._get_service("chat").progressLogStart( + self._getService("chat").progressLogStart( operationId, "AI Text Extract", "Document Processing", @@ -383,19 +383,19 @@ class ExtractionService: # Extract content WITHOUT chunking if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents") + self._getService("chat").progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents") # Pass operationId as parentOperationId for hierarchical logging # Correct hierarchy: parentOperationId -> operationId -> docOperationId extractionResult = self.extractContent(documents, extractionOptions, operationId=operationId, parentOperationId=operationId) if not isinstance(extractionResult, list): if operationId: - self._get_service("chat").progressLogFinish(operationId, False) + self._getService("chat").progressLogFinish(operationId, False) return "[Error: No extraction results]" # Process parts (not chunks) with model-aware AI calls if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts") + self._getService("chat").progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts") # Use operationId as parentOperationId for child operations # Correct hierarchy: parentOperationId -> operationId -> partOperationId processParentOperationId = operationId @@ -403,20 +403,20 @@ class ExtractionService: # Merge results using existing merging system if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results") + self._getService("chat").progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results") mergedContent = self.mergePartResults(partResults, options) # Save merged extraction content to debug - self._get_service("utils").writeDebugFile(mergedContent or '', "extraction_merged_text") + self._getService("utils").writeDebugFile(mergedContent or '', "extraction_merged_text") if operationId: - self._get_service("chat").progressLogFinish(operationId, True) + self._getService("chat").progressLogFinish(operationId, True) return mergedContent except Exception as e: logger.error(f"Error in processDocumentsPerChunk: {str(e)}") if operationId: - self._get_service("chat").progressLogFinish(operationId, False) + self._getService("chat").progressLogFinish(operationId, False) raise async def _processPartsWithMapping( @@ -468,7 +468,7 @@ class ExtractionService: if operationId: workflowId = self._context.workflow.id if self._context.workflow else f"no-workflow-{int(time.time())}" partOperationId = f"{operationId}_part_{part_index}" - self._get_service("chat").progressLogStart( + self._getService("chat").progressLogStart( partOperationId, "Content Processing", f"Part {part_index + 1}", @@ -487,15 +487,15 @@ class ExtractionService: # Update progress - initiating if partOperationId: - self._get_service("chat").progressLogUpdate(partOperationId, 0.3, "Initiating") + self._getService("chat").progressLogUpdate(partOperationId, 0.3, "Initiating") # Call AI with model-aware chunking (no progress callback - handled by parent operation) response = await aiObjects.call(request) # Update progress - completed if partOperationId: - self._get_service("chat").progressLogUpdate(partOperationId, 0.9, "Completed") - self._get_service("chat").progressLogFinish(partOperationId, True) + self._getService("chat").progressLogUpdate(partOperationId, 0.9, "Completed") + self._getService("chat").progressLogFinish(partOperationId, True) processing_time = time.time() - start_time @@ -1133,7 +1133,7 @@ class ExtractionService: "perPartExtractedData": per_part_extracted_data } debug_json = json.dumps(debug_content, indent=2, ensure_ascii=False) - self._get_service("utils").writeDebugFile(debug_json, "content_extraction_per_part") + self._getService("utils").writeDebugFile(debug_json, "content_extraction_per_part") logger.info(f"Wrote per-part extracted data to debug file: {len(per_part_extracted_data)} blocks from {len(content_parts)} content parts") except Exception as e: logger.warning(f"Failed to write per-part extracted data to debug file: {str(e)}") @@ -1172,7 +1172,7 @@ class ExtractionService: extraction_result_format["parts"].append(formatted_part) result_json = json.dumps(extraction_result_format, indent=2, ensure_ascii=False) - self._get_service("utils").writeDebugFile(result_json, "content_extraction_original_parts") + self._getService("utils").writeDebugFile(result_json, "content_extraction_original_parts") logger.info(f"Wrote original parts extracted data to debug file: {len(original_parts_extracted_data)} original parts") except Exception as e: logger.warning(f"Failed to write original parts extracted data to debug file: {str(e)}") @@ -1764,11 +1764,11 @@ class ExtractionService: debugPrefix = f"generation_contentPart_{partId}_{partLabelSafe}" # Write prompt - self._get_service("utils").writeDebugFile(prompt, f"{debugPrefix}_prompt") + self._getService("utils").writeDebugFile(prompt, f"{debugPrefix}_prompt") # Write response responseContent = partResult.content if partResult.content else "" - self._get_service("utils").writeDebugFile(responseContent, f"{debugPrefix}_response") + self._getService("utils").writeDebugFile(responseContent, f"{debugPrefix}_response") logger.debug(f"Wrote debug files for contentPart {partId} (generation): {debugPrefix}_prompt, {debugPrefix}_response") except Exception as debugError: diff --git a/modules/serviceCenter/services/serviceExtraction/subPromptBuilderExtraction.py b/modules/serviceCenter/services/serviceExtraction/subPromptBuilderExtraction.py index fe342002..0f9cbf45 100644 --- a/modules/serviceCenter/services/serviceExtraction/subPromptBuilderExtraction.py +++ b/modules/serviceCenter/services/serviceExtraction/subPromptBuilderExtraction.py @@ -10,13 +10,7 @@ import logging from typing import Dict, Any, Optional from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum -# Type hint for renderer parameter -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from modules.serviceCenter.services.serviceGeneration.renderers.documentRendererBaseTemplate import BaseRenderer - _RendererLike = BaseRenderer -else: - _RendererLike = Any +from modules.serviceCenter.core.types import RendererProtocol logger = logging.getLogger(__name__) @@ -27,7 +21,7 @@ async def buildExtractionPrompt( title: str, aiService=None, services=None, - renderer: _RendererLike = None + renderer: Optional[RendererProtocol] = None ) -> str: """ Build unified extraction prompt for extracting content from documents. diff --git a/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py b/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py index 5dbf16de..1137b7d6 100644 --- a/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py +++ b/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py @@ -26,10 +26,10 @@ class _ServicesAdapter: Workflow is read from context dynamically so propagation updates are visible.""" def __init__(self, context, get_service: Callable[[str], Any]): self._context = context - self._get_service = get_service + self._getService = get_service self.user = context.user - self.mandateId = context.mandate_id - self.featureInstanceId = context.feature_instance_id + self.mandateId = context.mandateId + self.featureInstanceId = context.featureInstanceId chat = get_service("chat") self.interfaceDbChat = chat.interfaceDbChat @@ -39,22 +39,22 @@ class _ServicesAdapter: @property def chat(self): - return self._get_service("chat") + return self._getService("chat") @property def utils(self): - return self._get_service("utils") + return self._getService("utils") @property def ai(self): - return self._get_service("ai") + return self._getService("ai") class GenerationService: def __init__(self, context, get_service: Callable[[str], Any]): """Initialize with ServiceCenterContext and service resolver.""" self.services = _ServicesAdapter(context, get_service) - self._get_service = get_service + self._getService = get_service self.interfaceDbChat = self.services.interfaceDbChat def processActionResultDocuments(self, actionResult, action) -> List[Dict[str, Any]]: diff --git a/modules/serviceCenter/services/serviceKnowledge/_buildTree.py b/modules/serviceCenter/services/serviceKnowledge/_buildTree.py index 87021f9d..cf32a925 100644 --- a/modules/serviceCenter/services/serviceKnowledge/_buildTree.py +++ b/modules/serviceCenter/services/serviceKnowledge/_buildTree.py @@ -112,7 +112,7 @@ def _findDsRecord( sourceType: str, path: str, ) -> Optional[Dict[str, Any]]: - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import normalisePath + from modules.serviceCenter.core.flagResolution import normalisePath norm = normalisePath(path) for ds in allDs: if ( @@ -191,8 +191,8 @@ def _personalRootChildrenNodes( mandateId = getattr(context, "mandateId", "") or "" ctx = ServiceCenterContext( user=context.user, - mandate_id=mandateId, - feature_instance_id="", + mandateId=mandateId, + featureInstanceId="", ) chatService = getService("chat", ctx) connections = chatService.getUserConnections() or [] @@ -295,8 +295,8 @@ async def _connectionServiceNodes( mandateId = getattr(context, "mandateId", "") or "" ctx = ServiceCenterContext( user=context.user, - mandate_id=mandateId, - feature_instance_id=instanceId, + mandateId=mandateId, + featureInstanceId=instanceId, ) chatService = getService("chat", ctx) securityService = getService("security", ctx) @@ -347,8 +347,8 @@ async def _browseChildNodes( mandateId = getattr(context, "mandateId", "") or "" ctx = ServiceCenterContext( user=context.user, - mandate_id=mandateId, - feature_instance_id=instanceId, + mandateId=mandateId, + featureInstanceId=instanceId, ) chatService = getService("chat", ctx) securityService = getService("security", ctx) @@ -683,9 +683,9 @@ def _callerInstanceId(context: Any) -> str: """The UDB is feature-agnostic, but `_browseChildNodes` and `_connectionServiceNodes` need a feature instance id for the ServiceCenterContext (the underlying connector resolver wants one). - Use the caller's current feature_instance_id (workspace) when + Use the caller's current featureInstanceId (workspace) when available, else an empty string. The id is NOT used for FDS scoping.""" - fid = getattr(context, "feature_instance_id", None) or getattr(context, "featureInstanceId", None) + fid = getattr(context, "featureInstanceId", None) return str(fid) if fid else "" diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py index 291dd9a6..095e97cc 100644 --- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py +++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py @@ -926,7 +926,7 @@ class KnowledgeService: contentObjectId=f"page-{pageIdx}", fileId=fileId, userId=self._context.user.id if self._context.user else "", - featureInstanceId=self._context.feature_instance_id or "", + featureInstanceId=self._context.featureInstanceId or "", contentType="text", data=text, contextRef={ diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py index df80898b..5fec915e 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorIngestConsumer.py @@ -172,7 +172,7 @@ def _loadRagEnabledDataSources(connectionId: str, dataSourceIds: Optional[list] """ from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelDataSource import DataSource - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlag + from modules.serviceCenter.core.flagResolution import getEffectiveFlag rootIf = getRootInterface() allDs = rootIf.db.getRecordset(DataSource, recordFilter={"connectionId": connectionId}) diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py index ac886099..edddb2c1 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncClickup.py @@ -314,7 +314,7 @@ async def _resolveDependencies(connectionId: str): rootUser = getRootUser() ctx = ServiceCenterContext( user=rootUser, - mandate_id=str(getattr(connection, "mandateId", "") or ""), + mandateId=str(getattr(connection, "mandateId", "") or ""), ) knowledgeService = getService("knowledge", ctx) return adapter, connection, knowledgeService diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py index 9857bfb7..7c485a82 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGdrive.py @@ -244,7 +244,7 @@ async def _resolveDependencies(connectionId: str): rootUser = getRootUser() ctx = ServiceCenterContext( user=rootUser, - mandate_id=str(getattr(connection, "mandateId", "") or ""), + mandateId=str(getattr(connection, "mandateId", "") or ""), ) knowledgeService = getService("knowledge", ctx) return adapter, connection, knowledgeService diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py index 150fe839..b07f83c3 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncGmail.py @@ -297,7 +297,7 @@ async def _resolveDependencies(connectionId: str): rootUser = getRootUser() ctx = ServiceCenterContext( user=rootUser, - mandate_id=str(getattr(connection, "mandateId", "") or ""), + mandateId=str(getattr(connection, "mandateId", "") or ""), ) knowledgeService = getService("knowledge", ctx) return adapter, connection, knowledgeService diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py index 1c50070e..5dd3174c 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncKdrive.py @@ -211,7 +211,7 @@ async def _resolveDependencies(connectionId: str): rootUser = getRootUser() ctx = ServiceCenterContext( user=rootUser, - mandate_id=str(getattr(connection, "mandateId", "") or ""), + mandateId=str(getattr(connection, "mandateId", "") or ""), ) knowledgeService = getService("knowledge", ctx) return adapter, connection, knowledgeService diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py index c27a5039..eb131350 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncOutlook.py @@ -256,7 +256,7 @@ async def _resolveDependencies(connectionId: str): rootUser = getRootUser() ctx = ServiceCenterContext( user=rootUser, - mandate_id=str(getattr(connection, "mandateId", "") or ""), + mandateId=str(getattr(connection, "mandateId", "") or ""), ) knowledgeService = getService("knowledge", ctx) return adapter, connection, knowledgeService diff --git a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py index 86d61f60..adb4b841 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py +++ b/modules/serviceCenter/services/serviceKnowledge/subConnectorSyncSharepoint.py @@ -245,7 +245,7 @@ async def _resolveDependencies(connectionId: str): rootUser = getRootUser() ctx = ServiceCenterContext( user=rootUser, - mandate_id=str(getattr(connection, "mandateId", "") or ""), + mandateId=str(getattr(connection, "mandateId", "") or ""), ) knowledgeService = getService("knowledge", ctx) return adapter, connection, knowledgeService diff --git a/modules/serviceCenter/services/serviceKnowledge/subFeatureBootstrap.py b/modules/serviceCenter/services/serviceKnowledge/subFeatureBootstrap.py index 4d58933c..f1cd3887 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subFeatureBootstrap.py +++ b/modules/serviceCenter/services/serviceKnowledge/subFeatureBootstrap.py @@ -30,7 +30,7 @@ def _loadRagEnabledFds(featureInstanceId: str, featureDataSourceIds: Optional[Li """ from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelFeatures import FeatureDataSource - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import getEffectiveFlagFds + from modules.serviceCenter.core.flagResolution import getEffectiveFlagFds rootIf = getRootInterface() allFds = rootIf.db.getRecordset( @@ -118,7 +118,7 @@ async def _featureBootstrapHandler( ) return {"featureInstanceId": featureInstanceId, "skipped": True, "reason": "no_rag_enabled_fds"} - from modules.serviceCenter.services.serviceAgent.featureDataProvider import FeatureDataProvider + from modules.serviceCenter.core.types import createFeatureDataProvider from modules.serviceCenter.services.serviceKnowledge.mainServiceKnowledge import IngestionJob from modules.serviceCenter.context import ServiceCenterContext from modules.serviceCenter import getService @@ -156,8 +156,8 @@ async def _featureBootstrapHandler( rootUser = getRootUser() ctx = ServiceCenterContext( user=rootUser, - mandate_id=mandateId, - feature_instance_id=fdsFeatureInstanceId, + mandateId=mandateId, + featureInstanceId=fdsFeatureInstanceId, ) knowledgeService = getService("knowledge", ctx) @@ -171,7 +171,7 @@ async def _featureBootstrapHandler( "explicitFields": set(neutralizeFields), } } - provider = FeatureDataProvider( + provider = createFeatureDataProvider( dbConnector, neutralizePolicy=neutralizePolicy, neutralizationService=neutralizationService, diff --git a/modules/serviceCenter/services/serviceKnowledge/udbNodes.py b/modules/serviceCenter/services/serviceKnowledge/udbNodes.py index d46292ce..879983dd 100644 --- a/modules/serviceCenter/services/serviceKnowledge/udbNodes.py +++ b/modules/serviceCenter/services/serviceKnowledge/udbNodes.py @@ -251,7 +251,7 @@ class _DataSourceFamilyNode(UdbNode): def getEffectiveFlag(self, flag, allDs, allFds, mode="aggregate") -> Any: if not self.supportsFlag(flag): return False - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( + from modules.serviceCenter.core.flagResolution import ( resolveEffectiveForPath, ) out = resolveEffectiveForPath(self.connectionId, self.sourceType, self.path, allDs, mode=mode) @@ -260,7 +260,7 @@ class _DataSourceFamilyNode(UdbNode): def setFlag(self, flag, value, rootIf) -> List[str]: from modules.datamodels.datamodelDataSource import DataSource - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( + from modules.serviceCenter.core.flagResolution import ( cascadeResetDescendants, ) if not self.rec: @@ -416,7 +416,7 @@ class _FdsFamilyNode(UdbNode): def getEffectiveFlag(self, flag, allDs, allFds, mode="aggregate") -> Any: if not self.supportsFlag(flag): return None - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( + from modules.serviceCenter.core.flagResolution import ( resolveEffectiveForFds, ) out = resolveEffectiveForFds(self.featureInstanceId, self.tableName, @@ -428,7 +428,7 @@ class _FdsFamilyNode(UdbNode): if not self.supportsFlag(flag): raise ValueError(f"FDS does not support flag {flag!r}") from modules.datamodels.datamodelFeatures import FeatureDataSource - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( + from modules.serviceCenter.core.flagResolution import ( cascadeResetDescendantsFds, ) if not self.rec: @@ -669,7 +669,7 @@ class FdsFieldNode(UdbNode): # Not explicitly overridden -> inherit from the table's effective # neutralize. Use walk mode so the inherited value is concrete # (never 'mixed'); a single field cannot itself be ambiguous. - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import ( + from modules.serviceCenter.core.flagResolution import ( resolveEffectiveForFds, ) out = resolveEffectiveForFds( @@ -753,7 +753,7 @@ def _findOrCreateDs(rootIf: Any, connectionId: str, sourceType: str, """ from modules.datamodels.datamodelDataSource import DataSource from modules.datamodels.datamodelUam import UserConnection - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import normalisePath + from modules.serviceCenter.core.flagResolution import normalisePath normPath = normalisePath(path) @@ -1007,7 +1007,7 @@ def buildNodeForKey(key: str, context: Any, rootIf: Any) -> Optional[UdbNode]: def _findDsByCoord(rootIf: Any, connectionId: str, sourceType: Optional[str], path: str) -> Optional[Dict[str, Any]]: from modules.datamodels.datamodelDataSource import DataSource - from modules.serviceCenter.services.serviceKnowledge._inheritFlags import normalisePath + from modules.serviceCenter.core.flagResolution import normalisePath rf = {"connectionId": connectionId} if sourceType is not None: rf["sourceType"] = sourceType diff --git a/modules/serviceCenter/services/serviceMessaging/mainServiceMessaging.py b/modules/serviceCenter/services/serviceMessaging/mainServiceMessaging.py index 77e77695..cc43ca0c 100644 --- a/modules/serviceCenter/services/serviceMessaging/mainServiceMessaging.py +++ b/modules/serviceCenter/services/serviceMessaging/mainServiceMessaging.py @@ -33,7 +33,7 @@ class _ServicesAdapter: from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface self.interfaceDbComponent = getComponentInterface( context.user, - mandateId=context.mandate_id + mandateId=context.mandateId ) diff --git a/modules/serviceCenter/services/serviceSharepoint/mainServiceSharepoint.py b/modules/serviceCenter/services/serviceSharepoint/mainServiceSharepoint.py index 8456dc52..e6cbc8e4 100644 --- a/modules/serviceCenter/services/serviceSharepoint/mainServiceSharepoint.py +++ b/modules/serviceCenter/services/serviceSharepoint/mainServiceSharepoint.py @@ -24,13 +24,13 @@ class SharepointService: """Initialize SharePoint service without access token. Args: - context: ServiceCenterContext with user, mandate_id, etc. + context: ServiceCenterContext with user, mandateId, etc. get_service: Service resolver for dependency injection (e.g. security) Use setAccessTokenFromConnection() method to configure the access token before making API calls. """ self._context = context - self._get_service = get_service + self._getService = get_service self.accessToken = None self.baseUrl = "https://graph.microsoft.com/v1.0" @@ -59,7 +59,7 @@ class SharepointService: return False # Get a fresh token for this specific connection via security service - security = self._get_service("security") + security = self._getService("security") if not security: logger.error("Security service not available for token access") return False diff --git a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py index 71dc4526..e5924aaf 100644 --- a/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py +++ b/modules/serviceCenter/services/serviceSubscription/mainServiceSubscription.py @@ -55,11 +55,11 @@ class SubscriptionService: if mandateId is not None and callable(mandateId): ctx = contextOrUser self.currentUser = ctx.user - self.mandateId = ctx.mandate_id or "" + self.mandateId = ctx.mandateId or "" elif get_service is not None and hasattr(contextOrUser, "user"): ctx = contextOrUser self.currentUser = ctx.user - self.mandateId = ctx.mandate_id or "" + self.mandateId = ctx.mandateId or "" else: self.currentUser = contextOrUser self.mandateId = mandateId or "" diff --git a/modules/serviceCenter/services/serviceTicket/mainServiceTicket.py b/modules/serviceCenter/services/serviceTicket/mainServiceTicket.py index 10ad1ba6..ea229940 100644 --- a/modules/serviceCenter/services/serviceTicket/mainServiceTicket.py +++ b/modules/serviceCenter/services/serviceTicket/mainServiceTicket.py @@ -15,7 +15,7 @@ class TicketService: def __init__(self, context, get_service: Callable[[str], Any]): """Initialize with context and service resolver.""" self._context = context - self._get_service = get_service + self._getService = get_service async def connectTicket( self, diff --git a/modules/serviceCenter/services/serviceWeb/mainServiceWeb.py b/modules/serviceCenter/services/serviceWeb/mainServiceWeb.py index 3445839e..c6403c8d 100644 --- a/modules/serviceCenter/services/serviceWeb/mainServiceWeb.py +++ b/modules/serviceCenter/services/serviceWeb/mainServiceWeb.py @@ -22,14 +22,14 @@ class WebService: def __init__(self, context, get_service): """Initialize webcrawl service with context and service resolver.""" self._context = context - self._get_service = get_service + self._getService = get_service def _workflow_id(self): """Get workflow ID for operation IDs.""" if self._context.workflow: return self._context.workflow.id - if self._context.workflow_id: - return self._context.workflow_id + if self._context.workflowId: + return self._context.workflowId return f"no-workflow-{int(time.time())}" async def performWebResearch( @@ -61,7 +61,7 @@ class WebService: """ # Start progress tracking if operationId provided if operationId: - self._get_service("chat").progressLogStart( + self._getService("chat").progressLogStart( operationId, "Web Research", "Research", @@ -71,7 +71,7 @@ class WebService: try: # Step 1: AI intention analysis - extract URLs and parameters from prompt if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.1, "Analyzing research intent") + self._getService("chat").progressLogUpdate(operationId, 0.1, "Analyzing research intent") analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth) @@ -99,7 +99,7 @@ class WebService: searchResultsWithContent = [] if needsSearch and (not allUrls or len(allUrls) < maxNumberPages): if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.3, "Searching for URLs and content") + self._getService("chat").progressLogUpdate(operationId, 0.3, "Searching for URLs and content") try: searchUrls, searchResultsWithContent = await self._performWebSearch( @@ -121,7 +121,7 @@ class WebService: logger.warning("Tavily search returned no URLs, using AI-extracted URLs only") if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") + self._getService("chat").progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") # If we have search results (even without content), use them directly instead of crawling # Tavily search results are more relevant than generic AI-extracted URLs @@ -179,7 +179,7 @@ class WebService: "total_urls": len(searchUrls), "urls_with_content": urlsWithContent, "total_content_length": totalContentLength, - "search_date": self._get_service("utils").timestampGetUtc() + "search_date": self._getService("utils").timestampGetUtc() }, "sections": sections, "statistics": { @@ -201,8 +201,8 @@ class WebService: result["metadata"]["suggested_filename"] = suggestedFilename if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.9, "Completed") - self._get_service("chat").progressLogFinish(operationId, True) + self._getService("chat").progressLogUpdate(operationId, 0.9, "Completed") + self._getService("chat").progressLogFinish(operationId, True) return result @@ -231,8 +231,8 @@ class WebService: # Step 5: Crawl all URLs with hierarchical logging if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.4, "Initiating") - self._get_service("chat").progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs") + self._getService("chat").progressLogUpdate(operationId, 0.4, "Initiating") + self._getService("chat").progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs") # Use parent operation ID directly (parentId should be operationId, not log entry ID) parentOperationId = operationId # Use the parent's operationId directly @@ -246,9 +246,9 @@ class WebService: ) if operationId: - self._get_service("chat").progressLogUpdate(operationId, 0.9, "Consolidating results") - self._get_service("chat").progressLogUpdate(operationId, 0.95, "Completed") - self._get_service("chat").progressLogFinish(operationId, True) + self._getService("chat").progressLogUpdate(operationId, 0.9, "Consolidating results") + self._getService("chat").progressLogUpdate(operationId, 0.95, "Completed") + self._getService("chat").progressLogFinish(operationId, True) # Calculate statistics about crawl results totalResults = len(crawlResult) if isinstance(crawlResult, list) else 1 @@ -317,7 +317,7 @@ class WebService: "total_urls": len(validatedUrls), "urls_with_content": urlsWithContent, "total_content_length": totalContentLength, - "crawl_date": self._get_service("utils").timestampGetUtc() + "crawl_date": self._getService("utils").timestampGetUtc() }, "sections": sections, "statistics": { @@ -345,7 +345,7 @@ class WebService: except Exception as e: logger.error(f"Error in web research: {str(e)}") if operationId: - self._get_service("chat").progressLogFinish(operationId, False) + self._getService("chat").progressLogFinish(operationId, False) raise async def _analyzeResearchIntent( @@ -397,13 +397,13 @@ Return ONLY valid JSON, no additional text: try: # Call AI planning to analyze intent - analysisJson = await self._get_service("ai").callAiPlanning( + analysisJson = await self._getService("ai").callAiPlanning( analysisPrompt, debugType="webresearchintent" ) # Extract JSON from response (handles markdown code blocks) - extractedJson = self._get_service("utils").jsonExtractString(analysisJson) + extractedJson = self._getService("utils").jsonExtractString(analysisJson) if not extractedJson: raise ValueError("No JSON found in AI response") @@ -454,7 +454,7 @@ Return ONLY valid JSON, no additional text: searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2) # Debug: persist search prompt - self._get_service("utils").writeDebugFile(searchPrompt, "websearch_prompt") + self._getService("utils").writeDebugFile(searchPrompt, "websearch_prompt") # Call AI with WEB_SEARCH_DATA operation searchOptions = AiCallOptions( @@ -463,7 +463,7 @@ Return ONLY valid JSON, no additional text: ) # Use unified callAiContent method - searchResponse = await self._get_service("ai").callAiContent( + searchResponse = await self._getService("ai").callAiContent( prompt=searchPrompt, options=searchOptions, outputFormat="json" @@ -518,16 +518,16 @@ Return ONLY valid JSON, no additional text: # Debug: persist search response if isinstance(searchResult, str): - self._get_service("utils").writeDebugFile(searchResult, "websearch_response") + self._getService("utils").writeDebugFile(searchResult, "websearch_response") logger.debug(f"Search response (first 500 chars): {searchResult[:500]}") else: - self._get_service("utils").writeDebugFile(json.dumps(searchResult, indent=2), "websearch_response") + self._getService("utils").writeDebugFile(json.dumps(searchResult, indent=2), "websearch_response") logger.debug(f"Search response type: {type(searchResult)}, keys: {list(searchResult.keys()) if isinstance(searchResult, dict) else 'N/A'}") # Parse and extract URLs and content if isinstance(searchResult, str): # Extract JSON from response (handles markdown code blocks) - extractedJson = self._get_service("utils").jsonExtractString(searchResult) + extractedJson = self._getService("utils").jsonExtractString(searchResult) if extractedJson: try: searchData = json.loads(extractedJson) @@ -800,7 +800,7 @@ Return ONLY valid JSON, no additional text: if parentOperationId: workflowId = self._workflow_id() urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}" - self._get_service("chat").progressLogStart( + self._getService("chat").progressLogStart( urlOperationId, "Web Crawl", f"URL {urlIndex + 1}/{totalUrls}", @@ -813,8 +813,8 @@ Return ONLY valid JSON, no additional text: if urlOperationId: displayUrl = url[:50] + "..." if len(url) > 50 else url - self._get_service("chat").progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}") - self._get_service("chat").progressLogUpdate(urlOperationId, 0.3, "Initiating crawl") + self._getService("chat").progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}") + self._getService("chat").progressLogUpdate(urlOperationId, 0.3, "Initiating crawl") # Build crawl prompt model for single URL # maxWidth is passed from performWebResearch based on researchDepth @@ -829,7 +829,7 @@ Return ONLY valid JSON, no additional text: # Debug: persist crawl prompt (with URL identifier in content for clarity) debugPrompt = f"URL: {url}\n\n{crawlPrompt}" - self._get_service("utils").writeDebugFile(debugPrompt, "webcrawl_prompt") + self._getService("utils").writeDebugFile(debugPrompt, "webcrawl_prompt") # Call AI with WEB_CRAWL operation crawlOptions = AiCallOptions( @@ -838,10 +838,10 @@ Return ONLY valid JSON, no additional text: ) if urlOperationId: - self._get_service("chat").progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector") + self._getService("chat").progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector") # Use unified callAiContent method with parentOperationId for hierarchical logging - crawlResponse = await self._get_service("ai").callAiContent( + crawlResponse = await self._getService("ai").callAiContent( prompt=crawlPrompt, options=crawlOptions, outputFormat="json", @@ -849,22 +849,22 @@ Return ONLY valid JSON, no additional text: ) if urlOperationId: - self._get_service("chat").progressLogUpdate(urlOperationId, 0.7, "Processing crawl results") + self._getService("chat").progressLogUpdate(urlOperationId, 0.7, "Processing crawl results") # Extract content from AiResponse crawlResult = crawlResponse.content # Debug: persist crawl response if isinstance(crawlResult, str): - self._get_service("utils").writeDebugFile(crawlResult, "webcrawl_response") + self._getService("utils").writeDebugFile(crawlResult, "webcrawl_response") else: - self._get_service("utils").writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response") + self._getService("utils").writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response") # Parse crawl result if isinstance(crawlResult, str): try: # Extract JSON from response (handles markdown code blocks) - extractedJson = self._get_service("utils").jsonExtractString(crawlResult) + extractedJson = self._getService("utils").jsonExtractString(crawlResult) crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult) except: crawlData = {"url": url, "content": crawlResult} @@ -873,7 +873,7 @@ Return ONLY valid JSON, no additional text: # Process crawl results and create hierarchical progress logging for sub-URLs if urlOperationId: - self._get_service("chat").progressLogUpdate(urlOperationId, 0.8, "Processing crawl results") + self._getService("chat").progressLogUpdate(urlOperationId, 0.8, "Processing crawl results") # Recursively process crawl results to find nested URLs and create child operations processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0) @@ -891,17 +891,17 @@ Return ONLY valid JSON, no additional text: if urlOperationId: if totalUrlsCrawled > 1: - self._get_service("chat").progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)") + self._getService("chat").progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)") else: - self._get_service("chat").progressLogUpdate(urlOperationId, 0.9, "Crawl completed") - self._get_service("chat").progressLogFinish(urlOperationId, True) + self._getService("chat").progressLogUpdate(urlOperationId, 0.9, "Crawl completed") + self._getService("chat").progressLogFinish(urlOperationId, True) return results except Exception as e: logger.error(f"Error crawling URL {url}: {str(e)}") if urlOperationId: - self._get_service("chat").progressLogFinish(urlOperationId, False) + self._getService("chat").progressLogFinish(urlOperationId, False) return [{"url": url, "error": str(e)}] def _processCrawlResultsWithHierarchy( @@ -943,7 +943,7 @@ Return ONLY valid JSON, no additional text: # This is a sub-URL - create child operation workflowId = self._workflow_id() subUrlOperationId = f"{parentOperationId}_sub_{idx}_{int(time.time())}" - self._get_service("chat").progressLogStart( + self._getService("chat").progressLogStart( subUrlOperationId, "Crawling Sub-URL", f"Depth {currentDepth + 1}", @@ -969,12 +969,12 @@ Return ONLY valid JSON, no additional text: ) item["subUrls"] = nestedResults - self._get_service("chat").progressLogUpdate(subUrlOperationId, 0.9, "Completed") - self._get_service("chat").progressLogFinish(subUrlOperationId, True) + self._getService("chat").progressLogUpdate(subUrlOperationId, 0.9, "Completed") + self._getService("chat").progressLogFinish(subUrlOperationId, True) except Exception as e: logger.error(f"Error processing sub-URL {itemUrl}: {str(e)}") if subUrlOperationId: - self._get_service("chat").progressLogFinish(subUrlOperationId, False) + self._getService("chat").progressLogFinish(subUrlOperationId, False) results.append(item) else: diff --git a/modules/shared/systemComponentRegistry.py b/modules/shared/systemComponentRegistry.py index e4733a68..70cd485d 100644 --- a/modules/shared/systemComponentRegistry.py +++ b/modules/shared/systemComponentRegistry.py @@ -6,7 +6,9 @@ Higher-layer system components (e.g. workflowAutomation) register their lifecycle hooks here at boot time via ``app.py`` (Composition Root, L7). Interface modules read the registry generically — no upward imports needed. -Supported events: ``onBootstrap``, ``onMandateDelete``, ``onInstanceCreate``. +Supported events: ``onBootstrap``, ``onMandateDelete``, ``onMandateProvision``, +``onInstanceCreate``, ``onUserMandateCreate``, ``onUserMandateDelete``, +``onUserBudgetAdjust``, ``onStorageChanged``. This is the same inversion pattern used by ``serviceAgent/externalToolRegistry.py`` for agent tools. diff --git a/modules/workflowAutomation/editor/_valueKindResolver.py b/modules/workflowAutomation/editor/_valueKindResolver.py new file mode 100644 index 00000000..63dd849d --- /dev/null +++ b/modules/workflowAutomation/editor/_valueKindResolver.py @@ -0,0 +1,102 @@ +# Copyright (c) 2025 Patrick Motsch +"""Shared value-kind resolution helpers. + +Extracted from conditionOperators so that upstreamPathsService can resolve +value kinds without importing conditionOperators (breaking the bidirectional +import cycle). +""" +from __future__ import annotations + +from typing import Any, Dict, List + + +def catalogTypeToValueKind(catalogType: str) -> str: + """Map port-catalog / dataPickOptions type strings to condition valueKind.""" + ct = (catalogType or "").strip() + if not ct or ct == "Any": + return "unknown" + low = ct.lower() + if low in ("str", "string", "email", "url"): + return "string" + if low in ("int", "float", "number"): + return "number" + if low == "bool": + return "boolean" + if low in ("date", "datetime", "timestamp"): + return "datetime" + if low.startswith("list[") or low == "list": + return "array" + if low.startswith("dict") or low == "dict": + return "object" + if low in ("file", "actiondocument", "fileref"): + return "file" + return "unknown" + + +def _isContextProducer(nodeType: str) -> bool: + return nodeType in ("context.extractContent", "context.mergeContext", "context.setContext") + + +def _pathSuggestsContext(path: List[Any], producerType: str) -> bool: + if not path: + return _isContextProducer(producerType) + last = str(path[-1]) + if last in ("data", "files", "merged", "presentation"): + return True + if "files" in [str(p) for p in path]: + return True + if _isContextProducer(producerType) and path[0] in ("data", "response", "merged"): + return True + return False + + +def _pathSuggestsFile(path: List[Any], producerType: str) -> bool: + pathStr = [str(p) for p in path] + if producerType == "input.upload": + return True + if "file" in pathStr or "documents" in pathStr or "mimeType" in pathStr or "fileName" in pathStr: + return True + if producerType.startswith("sharepoint.") and "file" in pathStr: + return True + return False + + +def _pathsEqual(a: List[Any], b: List[Any]) -> bool: + if len(a) != len(b): + return False + return all(str(x) == str(y) for x, y in zip(a, b)) + + +def resolveValueKindFromRef(graph: Dict[str, Any], ref: Dict[str, Any]) -> str: + """Resolve condition valueKind using graph-local heuristics only. + + Unlike ``conditionOperators.resolve_value_kind`` this does NOT call + ``compute_upstream_paths``, so it is safe to import from + upstreamPathsService without creating a cycle. + """ + if not isinstance(ref, dict): + return "unknown" + producerId = ref.get("nodeId") + path = ref.get("path") or [] + if not isinstance(path, list): + path = [] + if not producerId: + return "unknown" + + nodes = graph.get("nodes") or [] + nodeById = {n.get("id"): n for n in nodes if n.get("id")} + producer = nodeById.get(producerId) or {} + producerType = str(producer.get("type") or "") + + if _pathSuggestsContext(path, producerType): + return "context" + if _pathSuggestsFile(path, producerType): + tail = str(path[-1]) if path else "" + if tail in ("mimeType", "fileName"): + return "string" + return "file" + + if producerType in ("trigger.form", "input.form") and path and str(path[0]) == "payload": + return "string" + + return "unknown" diff --git a/modules/workflowAutomation/editor/conditionOperators.py b/modules/workflowAutomation/editor/conditionOperators.py index e99defc1..5b5d611a 100644 --- a/modules/workflowAutomation/editor/conditionOperators.py +++ b/modules/workflowAutomation/editor/conditionOperators.py @@ -10,6 +10,12 @@ from typing import Any, Dict, List, Optional, Tuple from modules.nodeCatalog.nodeDefinitions import STATIC_NODE_TYPES from modules.shared.i18nRegistry import resolveText, t +from modules.workflowAutomation.editor._valueKindResolver import ( + catalogTypeToValueKind as catalog_type_to_value_kind, + _pathSuggestsContext as _path_suggests_context, + _pathSuggestsFile as _path_suggests_file, + _pathsEqual as _paths_equal, +) logger = logging.getLogger(__name__) @@ -200,64 +206,7 @@ def localize_operator_catalog(lang: str = "de") -> Dict[str, List[Dict[str, Any] return out -def catalog_type_to_value_kind(catalog_type: str) -> str: - """Map port-catalog / dataPickOptions type strings to condition valueKind.""" - ct = (catalog_type or "").strip() - if not ct or ct == "Any": - return "unknown" - low = ct.lower() - if low in ("str", "string", "email", "url"): - return "string" - if low in ("int", "float", "number"): - return "number" - if low == "bool": - return "boolean" - if low in ("date", "datetime", "timestamp"): - return "datetime" - if low.startswith("list[") or low == "list": - return "array" - if low.startswith("dict") or low == "dict": - return "object" - if low in ("file", "actiondocument", "fileref"): - return "file" - return "unknown" - - -def _paths_equal(a: List[Any], b: List[Any]) -> bool: - if len(a) != len(b): - return False - return all(str(x) == str(y) for x, y in zip(a, b)) - - -def _is_context_producer(node_type: str) -> bool: - return node_type in ("context.extractContent", "context.mergeContext", "context.setContext") - - -def _path_suggests_context(path: List[Any], producer_type: str) -> bool: - if not path: - return _is_context_producer(producer_type) - last = str(path[-1]) - if last in ("data", "files", "merged", "presentation"): - return True - if "files" in [str(p) for p in path]: - return True - if _is_context_producer(producer_type) and path[0] in ("data", "response", "merged"): - return True - return False - - -def _path_suggests_file(path: List[Any], producer_type: str) -> bool: - path_str = [str(p) for p in path] - if producer_type == "input.upload": - return True - if "file" in path_str or "documents" in path_str or "mimeType" in path_str or "fileName" in path_str: - return True - if producer_type.startswith("sharepoint.") and "file" in path_str: - return True - return False - - -def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any], *, _skip_upstream: bool = False) -> str: +def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any]) -> str: """Resolve condition valueKind for a DataRef against the workflow graph.""" if not isinstance(ref, dict): return "unknown" @@ -281,32 +230,31 @@ def resolve_value_kind(graph: Dict[str, Any], ref: Dict[str, Any], *, _skip_upst return "string" return "file" - if not _skip_upstream: - from modules.workflowAutomation.editor.upstreamPathsService import compute_upstream_paths + from modules.workflowAutomation.editor.upstreamPathsService import compute_upstream_paths - target_id = graph.get("targetNodeId") or producer_id - matched_type: Optional[str] = None + target_id = graph.get("targetNodeId") or producer_id + matched_type: Optional[str] = None + for entry in compute_upstream_paths(graph, target_id): + if entry.get("producerNodeId") != producer_id: + continue + entry_path = entry.get("path") or [] + if _paths_equal(list(entry_path), list(path)): + matched_type = str(entry.get("type") or "Any") + break + + if matched_type is None and path: + parent_path = list(path[:-1]) for entry in compute_upstream_paths(graph, target_id): if entry.get("producerNodeId") != producer_id: continue - entry_path = entry.get("path") or [] - if _paths_equal(list(entry_path), list(path)): + if _paths_equal(list(entry.get("path") or []), parent_path): matched_type = str(entry.get("type") or "Any") break - if matched_type is None and path: - parent_path = list(path[:-1]) - for entry in compute_upstream_paths(graph, target_id): - if entry.get("producerNodeId") != producer_id: - continue - if _paths_equal(list(entry.get("path") or []), parent_path): - matched_type = str(entry.get("type") or "Any") - break - - if matched_type: - vk = catalog_type_to_value_kind(matched_type) - if vk != "unknown": - return vk + if matched_type: + vk = catalog_type_to_value_kind(matched_type) + if vk != "unknown": + return vk if producer_type in ("trigger.form", "input.form") and path and str(path[0]) == "payload": return "string" diff --git a/modules/workflowAutomation/editor/upstreamPathsService.py b/modules/workflowAutomation/editor/upstreamPathsService.py index 3639b7b7..a98be149 100644 --- a/modules/workflowAutomation/editor/upstreamPathsService.py +++ b/modules/workflowAutomation/editor/upstreamPathsService.py @@ -4,7 +4,10 @@ from __future__ import annotations from typing import Any, Dict, List, Set -from modules.workflowAutomation.editor.conditionOperators import catalog_type_to_value_kind, resolve_value_kind +from modules.workflowAutomation.editor._valueKindResolver import ( + catalogTypeToValueKind, + resolveValueKindFromRef, +) from modules.nodeCatalog.nodeDefinitions import STATIC_NODE_TYPES from modules.nodeCatalog.portTypes import PORT_TYPE_CATALOG, PortSchema, parse_graph_defined_output_schema from modules.workflowAutomation.engine.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds @@ -170,14 +173,14 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D for entry in paths: ct = str(entry.get("type") or "Any") - vk = catalog_type_to_value_kind(ct) + vk = catalogTypeToValueKind(ct) if vk == "unknown": ref = { "nodeId": entry.get("producerNodeId"), "path": entry.get("path") or [], } graph_with_target = {**graph, "targetNodeId": target_node_id} - vk = resolve_value_kind(graph_with_target, ref, _skip_upstream=True) + vk = resolveValueKindFromRef(graph_with_target, ref) entry["valueKind"] = vk return paths diff --git a/modules/workflowAutomation/engine/_runNotifications.py b/modules/workflowAutomation/engine/_runNotifications.py new file mode 100644 index 00000000..c8d7786d --- /dev/null +++ b/modules/workflowAutomation/engine/_runNotifications.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025 Patrick Motsch +"""Run failure notification helpers. + +Extracted from scheduler/mainScheduler to break the bidirectional import +cycle between executionEngine and mainScheduler. The engine calls +``notifyRunFailed`` directly (same subfolder, no cycle). +""" + +import logging +from typing import Optional + +from modules.shared.eventManagement import eventManager + +logger = logging.getLogger(__name__) + + +def notifyRunFailed( + workflowId: str, + runId: str, + error: str, + mandateId: str = None, + workflowLabel: str = None, +) -> None: + """Notify on workflow run failure: emit event, create in-app notification, trigger email subscription.""" + try: + eventManager.emit("workflowAutomation.run.failed", { + "workflowId": workflowId, + "runId": runId, + "error": error, + "mandateId": mandateId, + }) + logger.info("Emitted run.failed event for run %s (workflow %s)", runId, workflowId) + except Exception as e: + logger.warning("Failed to emit run.failed event: %s", e) + + _createRunFailedNotification(workflowId, runId, error, mandateId, workflowLabel) + _triggerRunFailedSubscription(workflowId, runId, error, mandateId, workflowLabel) + + +def _createRunFailedNotification( + workflowId: str, + runId: str, + error: str, + mandateId: str = None, + workflowLabel: str = None, +) -> None: + """Create in-app notification for the workflow creator.""" + try: + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.datamodels.datamodelNotification import UserNotification, NotificationType, NotificationStatus + + rootInterface = getRootInterface() + if not rootInterface: + return + + from modules.interfaces.interfaceWorkflowAutomation import _getWorkflowAutomationInterface + eventUser = rootInterface.getUserByUsername("event") + if not eventUser: + return + + iface = _getWorkflowAutomationInterface(eventUser, mandateId or "", "") + wf = iface.getWorkflow(workflowId) + if not wf: + return + + creatorId = wf.get("sysCreatedBy") if isinstance(wf, dict) else getattr(wf, "sysCreatedBy", None) + if not creatorId: + return + + label = workflowLabel or (wf.get("label") if isinstance(wf, dict) else getattr(wf, "label", "")) + notification = UserNotification( + userId=creatorId, + type=NotificationType.SYSTEM, + status=NotificationStatus.UNREAD, + title="Workflow fehlgeschlagen", + message=f"Workflow '{label or workflowId}' ist fehlgeschlagen: {error[:200]}", + referenceType="AutoRun", + referenceId=runId, + icon="alert-triangle", + ) + rootInterface.db.recordCreate( + model_class=UserNotification, + record=notification.model_dump(), + ) + logger.info("Created in-app notification for user %s (run %s)", creatorId, runId) + except Exception as e: + logger.warning("Failed to create in-app run.failed notification: %s", e) + + +_onRunFailedCallback = None + + +def setOnRunFailedCallback(callback) -> None: + """Set the callback for run failure notifications (injected by app.py).""" + global _onRunFailedCallback + _onRunFailedCallback = callback + + +def _triggerRunFailedSubscription( + workflowId: str, + runId: str, + error: str, + mandateId: str = None, + workflowLabel: str = None, +) -> None: + """Trigger the messaging subscription for run failures via injected callback.""" + if _onRunFailedCallback is None: + return + try: + _onRunFailedCallback( + workflowId=workflowId, + runId=runId, + error=error, + mandateId=mandateId, + workflowLabel=workflowLabel, + ) + except Exception as e: + logger.warning("Failed to trigger run.failed subscription: %s", e) diff --git a/modules/workflowAutomation/engine/executionEngine.py b/modules/workflowAutomation/engine/executionEngine.py index 99f7c2ed..b1c877c2 100644 --- a/modules/workflowAutomation/engine/executionEngine.py +++ b/modules/workflowAutomation/engine/executionEngine.py @@ -1540,15 +1540,6 @@ async def executeGraph( duration_ms=_emailPauseMs, ) logger.info("executeGraph paused for email wait (run %s, node %s)", e.runId, e.nodeId) - try: - from modules.interfaces.interfaceDbApp import getRootInterface - from modules.workflowAutomation.scheduler.emailPoller import ensureRunning - root = getRootInterface() - event_user = root.getUserByUsername("event") if root else None - if event_user: - ensureRunning(event_user) - except Exception as poll_err: - logger.warning("Could not start email poller: %s", poll_err) paused_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") run_ctx = { "connectionMap": context.get("connectionMap"), @@ -1612,7 +1603,7 @@ async def executeGraph( ) if _wfObj else {} _shouldNotify = _wfDict.get("notifyOnFailure", True) if _wfDict else True if _shouldNotify: - from modules.workflowAutomation.scheduler.mainScheduler import notifyRunFailed + from modules.workflowAutomation.engine._runNotifications import notifyRunFailed notifyRunFailed( workflowId or "", runId or "", str(e), mandateId=mandateId, diff --git a/modules/workflowAutomation/engine/graphUtils.py b/modules/workflowAutomation/engine/graphUtils.py index c6a4b5cd..68368b48 100644 --- a/modules/workflowAutomation/engine/graphUtils.py +++ b/modules/workflowAutomation/engine/graphUtils.py @@ -383,6 +383,21 @@ def _pathContainsWildcard(path: List[Any]) -> bool: # (``featureInstanceRefMigration.materializeFeatureInstanceRefs``) writes the # envelope, the resolver unwraps it on its way to the action. +_STALE_FILE_CREATE_CONTEXT_PATHS = frozenset({ + ("responseData",), + ("response",), + ("merged",), + ("documents", 0, "documentData"), +}) + + +def remap_stale_presentation_ref_path(path: List[Any]) -> List[Any]: + """Map legacy text-handover paths to unified presentation ``data``.""" + if tuple(path) in _STALE_FILE_CREATE_CONTEXT_PATHS: + return ["data"] + return list(path) + + _TYPED_REF_PRIMARY_FIELD = { "FeatureInstanceRef": "id", "ConnectionRef": "id", @@ -450,9 +465,6 @@ def resolveParameterReferences( plist = list(path) resolved = _get_by_path(data, plist) if resolved is None: - from modules.workflowAutomation.engine.pickNotPushMigration import ( - remap_stale_presentation_ref_path, - ) alt_path = remap_stale_presentation_ref_path(plist) if alt_path != plist: resolved = _get_by_path(data, alt_path) diff --git a/modules/workflowAutomation/engine/pickNotPushMigration.py b/modules/workflowAutomation/engine/pickNotPushMigration.py index 78bd63c4..14b91eae 100644 --- a/modules/workflowAutomation/engine/pickNotPushMigration.py +++ b/modules/workflowAutomation/engine/pickNotPushMigration.py @@ -21,7 +21,11 @@ from modules.nodeCatalog.portTypes import ( PRIMARY_TEXT_HANDOVER_REF_PATH, resolve_output_schema_name, ) -from modules.workflowAutomation.engine.graphUtils import buildConnectionMap, getInputSources +from modules.workflowAutomation.engine.graphUtils import ( + buildConnectionMap, + getInputSources, + remap_stale_presentation_ref_path, +) logger = logging.getLogger(__name__) @@ -243,20 +247,6 @@ def materializeRecommendedDataPickRef(graph: Dict[str, Any]) -> Dict[str, Any]: return g -_STALE_FILE_CREATE_CONTEXT_PATHS = frozenset({ - ("responseData",), - ("response",), - ("merged",), - ("documents", 0, "documentData"), -}) - - -def remap_stale_presentation_ref_path(path: List[Any]) -> List[Any]: - """Map legacy text-handover paths to unified presentation ``data``.""" - if tuple(path) in _STALE_FILE_CREATE_CONTEXT_PATHS: - return ["data"] - return list(path) - def _normalize_presentation_refs_in_value(val: Any) -> Any: """Rewrite stale ref paths inside ``contextBuilder`` lists or bare refs.""" diff --git a/modules/workflowAutomation/mainWorkflowAutomation.py b/modules/workflowAutomation/mainWorkflowAutomation.py index e3a38d84..a05064c9 100644 --- a/modules/workflowAutomation/mainWorkflowAutomation.py +++ b/modules/workflowAutomation/mainWorkflowAutomation.py @@ -57,8 +57,8 @@ def _getWorkflowAutomationServices( ctx = ServiceCenterContext( user=user, - mandate_id=mandateId, - feature_instance_id=featureInstanceId, + mandateId=mandateId, + featureInstanceId=featureInstanceId, workflow=_workflow, ) return ServicesBag(ctx, lambda key: getService(key, ctx)) diff --git a/modules/workflowAutomation/scheduler/__init__.py b/modules/workflowAutomation/scheduler/__init__.py index d5178091..ab966ca5 100644 --- a/modules/workflowAutomation/scheduler/__init__.py +++ b/modules/workflowAutomation/scheduler/__init__.py @@ -6,6 +6,8 @@ from modules.workflowAutomation.scheduler.mainScheduler import ( stop, syncNow, setMainLoop, +) +from modules.workflowAutomation.engine._runNotifications import ( notifyRunFailed, setOnRunFailedCallback, ) diff --git a/modules/workflowAutomation/scheduler/mainScheduler.py b/modules/workflowAutomation/scheduler/mainScheduler.py index ec368480..a0ced9cc 100644 --- a/modules/workflowAutomation/scheduler/mainScheduler.py +++ b/modules/workflowAutomation/scheduler/mainScheduler.py @@ -263,6 +263,12 @@ class WorkflowScheduler: "WorkflowScheduler: executed workflow %s success=%s paused=%s", workflowId, result.get("success"), result.get("paused"), ) + if result.get("waitReason") == "email": + try: + from modules.workflowAutomation.scheduler.emailPoller import ensureRunning + ensureRunning(eventUser) + except Exception as pollErr: + logger.warning("WorkflowScheduler: could not start email poller: %s", pollErr) except Exception as e: logger.exception("WorkflowScheduler: failed to execute workflow %s: %s", workflowId, e) @@ -333,94 +339,10 @@ def _cronToIntervalSeconds(cron: str): return None -def notifyRunFailed(workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None) -> None: - """Notify on workflow run failure: emit event, create in-app notification, trigger email subscription.""" - try: - eventManager.emit("workflowAutomation.run.failed", { - "workflowId": workflowId, - "runId": runId, - "error": error, - "mandateId": mandateId, - }) - logger.info("Emitted run.failed event for run %s (workflow %s)", runId, workflowId) - except Exception as e: - logger.warning("Failed to emit run.failed event: %s", e) - - _createRunFailedNotification(workflowId, runId, error, mandateId, workflowLabel) - _triggerRunFailedSubscription(workflowId, runId, error, mandateId, workflowLabel) - - -def _createRunFailedNotification( - workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None -) -> None: - """Create in-app notification for the workflow creator.""" - try: - from modules.interfaces.interfaceDbApp import getRootInterface - from modules.datamodels.datamodelNotification import UserNotification, NotificationType, NotificationStatus - - rootInterface = getRootInterface() - if not rootInterface: - return - - from modules.interfaces.interfaceWorkflowAutomation import _getWorkflowAutomationInterface - eventUser = rootInterface.getUserByUsername("event") - if not eventUser: - return - - iface = _getWorkflowAutomationInterface(eventUser, mandateId or "", "") - wf = iface.getWorkflow(workflowId) - if not wf: - return - - creatorId = wf.get("sysCreatedBy") if isinstance(wf, dict) else getattr(wf, "sysCreatedBy", None) - if not creatorId: - return - - label = workflowLabel or (wf.get("label") if isinstance(wf, dict) else getattr(wf, "label", "")) - notification = UserNotification( - userId=creatorId, - type=NotificationType.SYSTEM, - status=NotificationStatus.UNREAD, - title="Workflow fehlgeschlagen", - message=f"Workflow '{label or workflowId}' ist fehlgeschlagen: {error[:200]}", - referenceType="AutoRun", - referenceId=runId, - icon="alert-triangle", - ) - rootInterface.db.recordCreate( - model_class=UserNotification, - record=notification.model_dump(), - ) - logger.info("Created in-app notification for user %s (run %s)", creatorId, runId) - except Exception as e: - logger.warning("Failed to create in-app run.failed notification: %s", e) - - -_onRunFailedCallback = None - - -def setOnRunFailedCallback(callback) -> None: - """Set the callback for run failure notifications (injected by app.py).""" - global _onRunFailedCallback - _onRunFailedCallback = callback - - -def _triggerRunFailedSubscription( - workflowId: str, runId: str, error: str, mandateId: str = None, workflowLabel: str = None -) -> None: - """Trigger the messaging subscription for run failures via injected callback.""" - if _onRunFailedCallback is None: - return - try: - _onRunFailedCallback( - workflowId=workflowId, - runId=runId, - error=error, - mandateId=mandateId, - workflowLabel=workflowLabel, - ) - except Exception as e: - logger.warning("Failed to trigger run.failed subscription: %s", e) +from modules.workflowAutomation.engine._runNotifications import ( # noqa: E402 — re-export + notifyRunFailed, + setOnRunFailedCallback, +) # Module-level singleton diff --git a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py index dee1cc1f..7415df93 100644 --- a/modules/workflows/processing/shared/promptGenerationActionsDynamic.py +++ b/modules/workflows/processing/shared/promptGenerationActionsDynamic.py @@ -41,7 +41,7 @@ def generateDynamicPlanSelectionPrompt(services, context: Any, learningEngine=No # Add adaptive learning context if available adaptiveContext = {} if learningEngine: - workflowId = getattr(context, 'workflow_id', 'unknown') + workflowId = getattr(context, 'workflowId', 'unknown') userPrompt = extractUserPrompt(context) adaptiveContext = learningEngine.getAdaptiveContextForActionSelection(workflowId, userPrompt) @@ -226,7 +226,7 @@ Excludes documents/connections/history entirely. # Add adaptive learning context if available adaptiveContext = {} if learningEngine: - workflowId = getattr(context, 'workflow_id', 'unknown') + workflowId = getattr(context, 'workflowId', 'unknown') adaptiveContext = learningEngine.getAdaptiveContextForParameters(workflowId, compoundActionName, parametersContext or "") if adaptiveContext: diff --git a/tests/integration/mandates/test_createMandate.py b/tests/integration/mandates/test_createMandate.py index f58f9021..1ad24b75 100644 --- a/tests/integration/mandates/test_createMandate.py +++ b/tests/integration/mandates/test_createMandate.py @@ -78,7 +78,7 @@ def _buildInterface(db: _FakeDb) -> AppObjects: def _stubCopySystemRoles(): """Avoid touching the bootstrap module (which would need a real DB).""" with patch( - "modules.interfaces.interfaceBootstrap.copySystemRolesToMandate", + "modules.interfaces.interfaceRbac.copySystemRolesToMandate", return_value=0, ): yield