From 5380e30f0d2e473c4b63b53e7917ceeddf5fa176 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Sun, 11 Jan 2026 15:42:33 +0100 Subject: [PATCH] fix: cleaned interfaces to work all with rbac --- env_int.env | 7 + env_prod.env | 7 + modules/connectors/connectorDbPostgre.py | 77 +- .../connectors/connectorSwissTopoMapServer.py | 2 +- modules/interfaces/interfaceBootstrap.py | 97 +- modules/interfaces/interfaceDbAppObjects.py | 41 + modules/interfaces/interfaceDbChatObjects.py | 7 +- .../interfaces/interfaceDbComponentObjects.py | 41 + .../interfaces/interfaceDbRealEstateAccess.py | 90 -- .../interfaceDbRealEstateObjects.py | 255 +-- modules/interfaces/interfaceRbac.py | 22 +- modules/workflows/workflowManager copy.py | 1385 ----------------- 12 files changed, 415 insertions(+), 1616 deletions(-) delete mode 100644 modules/interfaces/interfaceDbRealEstateAccess.py delete mode 100644 modules/workflows/workflowManager copy.py diff --git a/env_int.env b/env_int.env index a84e69f0..1ed0bed1 100644 --- a/env_int.env +++ b/env_int.env @@ -29,6 +29,13 @@ DB_MANAGEMENT_USER=heeshkdlby DB_MANAGEMENT_PASSWORD_SECRET = INT_ENC:Z0FBQUFBQm8xSVRjTnJKNlJMNmEwQ0Y5dVNrR3pkZk9SQXVvLTRTNW9lQ1g3TTE5cFhBNTd5UENqWW9qdWd3NWNseWhnUHJveDJyd1Z3X1czS3VuZnAwZHBXYVNQWlZsRy12ME42NndEVlR5X3ZPdFBNNmhLYm89 DB_MANAGEMENT_PORT=5432 +# PostgreSQL Storage (new) +DB_REALESTATE_HOST=localhost +DB_REALESTATE_DATABASE=poweron_realestate +DB_REALESTATE_USER=poweron_dev +DB_REALESTATE_PASSWORD_SECRET = INT_ENC:Z0FBQUFBQm8xSVRjTnJKNlJMNmEwQ0Y5dVNrR3pkZk9SQXVvLTRTNW9lQ1g3TTE5cFhBNTd5UENqWW9qdWd3NWNseWhnUHJveDJyd1Z3X1czS3VuZnAwZHBXYVNQWlZsRy12ME42NndEVlR5X3ZPdFBNNmhLYm89 +DB_REALESTATE_PORT=5432 + # Security Configuration APP_JWT_KEY_SECRET = INT_ENC:Z0FBQUFBQm8xSVRjNUctb2RwU25iR3ZnanBOdHZhWUtIajZ1RnZzTEp4aDR0MktWRjNoeVBrY1Npd1R0VE9YVHp3M2w1cXRzbUxNaU82QUJvaDNFeVQyN05KblRWblBvbWtoT0VXbkNBbDQ5OHhwSUFnaDZGRG10Vmgtdm1YUkRsYUhFMzRVZURmSFlDTFIzVWg4MXNueDZyMGc5aVpFdWRxY3dkTExGM093ZTVUZVl5LUhGWnlRPQ== APP_TOKEN_EXPIRY=300 diff --git a/env_prod.env b/env_prod.env index 0daaff02..ebd1ece4 100644 --- a/env_prod.env +++ b/env_prod.env @@ -29,6 +29,13 @@ DB_MANAGEMENT_USER=gzxxmcrdhn DB_MANAGEMENT_PASSWORD_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z3ZWpySThqdlVmWWd5dGxmWE91RVBsenZrQmNhSzVxbktmYzZ1RlM3cXhTMUdXRV9wX1lfLTJXLTFzeUo0R3pWLXlmUWdrZ2x6QkFlZVRXaEF6aUdRbDlzb1FfcWtub0dxSGp3OVVQWGg3enM9 DB_MANAGEMENT_PORT=5432 +# PostgreSQL Storage (new) +DB_REALESTATE_HOST=localhost +DB_REALESTATE_DATABASE=poweron_realestate +DB_REALESTATE_USER=poweron_dev +DB_REALESTATE_PASSWORD_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z3ZWpySThqdlVmWWd5dGxmWE91RVBsenZrQmNhSzVxbktmYzZ1RlM3cXhTMUdXRV9wX1lfLTJXLTFzeUo0R3pWLXlmUWdrZ2x6QkFlZVRXaEF6aUdRbDlzb1FfcWtub0dxSGp3OVVQWGg3enM9 +DB_REALESTATE_PORT=5432 + # Security Configuration APP_JWT_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z3elhfV0Rnd2pQRjlMdkVwX1FnSmRhSzNZUlV5SVpaWXBNX1hpa2xPZGdMSWpnN2ZINHQxeGZnNHJweU5pZjlyYlY5Qm9zOUZEbl9wUEgtZHZXd1NhR19JSG9kbFU4MnFGQnllbFhRQVphRGQyNHlFVWR5VHQyUUpqN0stUmRuY2QyTi1oalczRHpLTEJqWURjZWs4YjZvT2U5YnFqcXEwdEpxV05fX05QMmtrPQ== APP_TOKEN_EXPIRY=300 diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py index 65ded736..7de3bf17 100644 --- a/modules/connectors/connectorDbPostgre.py +++ b/modules/connectors/connectorDbPostgre.py @@ -51,6 +51,7 @@ def _get_model_fields(model_class) -> Dict[str, str]: field_type = field_info.annotation # Check for JSONB fields (Dict, List, or complex types) + # Purely type-based detection - no hardcoded field names if ( field_type == dict or field_type == list @@ -58,23 +59,7 @@ def _get_model_fields(model_class) -> Dict[str, str]: hasattr(field_type, "__origin__") and field_type.__origin__ in (dict, list) ) - or field_name - in [ - "execParameters", - "expectedDocumentFormats", - "resultDocuments", - "logs", - "messages", - "stats", - "tasks", - "perimeter", # GeoPolylinie objects - "baulinie", # GeoPolylinie objects - "kontextInformationen", # List of Kontext objects - "parzellenNachbarschaft", # List of dictionaries - "dokumente", # List of Dokument objects - "parzellen", # List of Parzelle objects (in Projekt) - ] - # Check if field type is a Pydantic BaseModel (for nested models like GeoPolylinie) + # Check if field type is a Pydantic BaseModel (for nested models) or (hasattr(field_type, "__origin__") and get_origin(field_type) is Union and any(hasattr(arg, "__bases__") and BaseModel in getattr(arg, "__bases__", ()) for arg in get_args(field_type))) @@ -691,21 +676,28 @@ class DatabaseConnector: # Handle JSONB fields for all records fields = _get_model_fields(model_class) + model_fields = model_class.model_fields # Get Pydantic model fields for record in records: for field_name, field_type in fields.items(): if field_type == "JSONB" and field_name in record: if record[field_name] is None: - # Convert None to appropriate default based on field name - if field_name in [ - "logs", - "messages", - "tasks", - "expectedDocumentFormats", - "resultDocuments", - ]: - record[field_name] = [] - elif field_name in ["execParameters", "stats"]: - record[field_name] = {} + # Generic type-based default: List types -> [], Dict types -> {} + # Interfaces handle domain-specific defaults + field_info = model_fields.get(field_name) + if field_info: + field_annotation = field_info.annotation + # Check if it's a List type + if (field_annotation == list or + (hasattr(field_annotation, "__origin__") and + field_annotation.__origin__ is list)): + record[field_name] = [] + # Check if it's a Dict type + elif (field_annotation == dict or + (hasattr(field_annotation, "__origin__") and + field_annotation.__origin__ is dict)): + record[field_name] = {} + else: + record[field_name] = None else: record[field_name] = None else: @@ -878,6 +870,7 @@ class DatabaseConnector: # Handle JSONB fields and ensure numeric types are correct fields = _get_model_fields(model_class) + model_fields = model_class.model_fields # Get Pydantic model fields for record in records: for field_name, field_type in fields.items(): # Ensure numeric fields (float/int) are properly typed @@ -897,17 +890,23 @@ class DatabaseConnector: ) elif field_type == "JSONB" and field_name in record: if record[field_name] is None: - # Convert None to appropriate default based on field name - if field_name in [ - "logs", - "messages", - "tasks", - "expectedDocumentFormats", - "resultDocuments", - ]: - record[field_name] = [] - elif field_name in ["execParameters", "stats"]: - record[field_name] = {} + # Generic type-based default: List types -> [], Dict types -> {} + # Interfaces handle domain-specific defaults + field_info = model_fields.get(field_name) + if field_info: + field_annotation = field_info.annotation + # Check if it's a List type + if (field_annotation == list or + (hasattr(field_annotation, "__origin__") and + field_annotation.__origin__ is list)): + record[field_name] = [] + # Check if it's a Dict type + elif (field_annotation == dict or + (hasattr(field_annotation, "__origin__") and + field_annotation.__origin__ is dict)): + record[field_name] = {} + else: + record[field_name] = None else: record[field_name] = None else: diff --git a/modules/connectors/connectorSwissTopoMapServer.py b/modules/connectors/connectorSwissTopoMapServer.py index 97838507..f4ebb8ff 100644 --- a/modules/connectors/connectorSwissTopoMapServer.py +++ b/modules/connectors/connectorSwissTopoMapServer.py @@ -878,7 +878,7 @@ class SwissTopoMapServerConnector: return None x, y = coords - logger.info(f"Parzelle gefunden: {parcel.get('label', 'Unknown')}, Zentrum: E={x}, N={y}") + logger.info(f"Parzelle gefunden: {search_result.get('label', 'Unknown')}, Zentrum: E={x}, N={y}") # Schritt 2: Polygon-Geometrie abrufen identify_params = { diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 71bdd1e7..3e7b043f 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -645,6 +645,54 @@ def createTableSpecificRules(db: DatabaseConnector) -> None: delete=AccessLevel.NONE, )) + # Real Estate tables - Projekt, Parzelle, Dokument, Gemeinde, Kanton, Land + realEstateTables = ["Projekt", "Parzelle", "Dokument", "Gemeinde", "Kanton", "Land"] + for table in realEstateTables: + # Sysadmin - full access + tableRules.append(AccessRule( + roleLabel="sysadmin", + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.ALL, + create=AccessLevel.ALL, + update=AccessLevel.ALL, + delete=AccessLevel.ALL, + )) + # Admin - group access + tableRules.append(AccessRule( + roleLabel="admin", + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.GROUP, + create=AccessLevel.GROUP, + update=AccessLevel.GROUP, + delete=AccessLevel.GROUP, + )) + # User - my records only + tableRules.append(AccessRule( + roleLabel="user", + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.MY, + create=AccessLevel.MY, + update=AccessLevel.MY, + delete=AccessLevel.MY, + )) + # Viewer - read-only my records + tableRules.append(AccessRule( + roleLabel="viewer", + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.MY, + create=AccessLevel.NONE, + update=AccessLevel.NONE, + delete=AccessLevel.NONE, + )) + # Create all table-specific rules for rule in tableRules: db.recordCreate(AccessRule, rule) @@ -903,7 +951,7 @@ def _addMissingTableRules(db: DatabaseConnector, existingRules: List[Dict[str, A existingRoles = {rule.get("roleLabel") for rule in existingRules} # Tables that need rules - requiredTables = ["ChatWorkflow", "Prompt"] + requiredTables = ["ChatWorkflow", "Prompt", "Projekt", "Parzelle", "Dokument", "Gemeinde", "Kanton", "Land"] requiredRoles = ["sysadmin", "admin", "user", "viewer"] newRules = [] @@ -1005,6 +1053,53 @@ def _addMissingTableRules(db: DatabaseConnector, existingRules: List[Dict[str, A update=AccessLevel.NONE, delete=AccessLevel.NONE, )) + # Real Estate tables rules (Projekt, Parzelle, Dokument, Gemeinde, Kanton, Land) + elif table in ["Projekt", "Parzelle", "Dokument", "Gemeinde", "Kanton", "Land"]: + for roleLabel in requiredRoles: + if roleLabel == "sysadmin": + newRules.append(AccessRule( + roleLabel=roleLabel, + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.ALL, + create=AccessLevel.ALL, + update=AccessLevel.ALL, + delete=AccessLevel.ALL, + )) + elif roleLabel == "admin": + newRules.append(AccessRule( + roleLabel=roleLabel, + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.GROUP, + create=AccessLevel.GROUP, + update=AccessLevel.GROUP, + delete=AccessLevel.GROUP, + )) + elif roleLabel == "user": + newRules.append(AccessRule( + roleLabel=roleLabel, + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.MY, + create=AccessLevel.MY, + update=AccessLevel.MY, + delete=AccessLevel.MY, + )) + elif roleLabel == "viewer": + newRules.append(AccessRule( + roleLabel=roleLabel, + context=AccessRuleContext.DATA, + item=table, + view=True, + read=AccessLevel.MY, + create=AccessLevel.NONE, + update=AccessLevel.NONE, + delete=AccessLevel.NONE, + )) # Create missing rules if newRules: diff --git a/modules/interfaces/interfaceDbAppObjects.py b/modules/interfaces/interfaceDbAppObjects.py index f8397477..dd9aef3d 100644 --- a/modules/interfaces/interfaceDbAppObjects.py +++ b/modules/interfaces/interfaceDbAppObjects.py @@ -134,6 +134,47 @@ class AppObjects: logger.error(f"Failed to initialize database: {str(e)}") raise + def _separateObjectFields(self, model_class, data: Dict[str, Any]) -> tuple[Dict[str, Any], Dict[str, Any]]: + """Separate simple fields from object fields based on Pydantic model structure.""" + simpleFields = {} + objectFields = {} + + # Get field information from the Pydantic model + modelFields = model_class.model_fields + + for fieldName, value in data.items(): + # Check if this field should be stored as JSONB in the database + if fieldName in modelFields: + fieldInfo = modelFields[fieldName] + # Pydantic v2 only + fieldType = fieldInfo.annotation + + # Check if this is a JSONB field (Dict, List, or complex types) + # Purely type-based detection - no hardcoded field names + if (fieldType == dict or + fieldType == list or + (hasattr(fieldType, '__origin__') and fieldType.__origin__ in (dict, list))): + # Store as JSONB - include in simple_fields for database storage + simpleFields[fieldName] = value + elif isinstance(value, (str, int, float, bool, type(None))): + # Simple scalar types + simpleFields[fieldName] = value + else: + # Complex objects that should be filtered out + objectFields[fieldName] = value + else: + # Field not in model - treat as scalar if simple, otherwise filter out + # BUT: always include metadata fields (_createdBy, _createdAt, etc.) as they're handled by connector + if fieldName.startswith("_"): + # Metadata fields should be passed through to connector + simpleFields[fieldName] = value + elif isinstance(value, (str, int, float, bool, type(None))): + simpleFields[fieldName] = value + else: + objectFields[fieldName] = value + + return simpleFields, objectFields + def _initRecords(self): """Initialize standard records if they don't exist.""" initBootstrap(self.db) diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index a3b9bc06..86fa7c30 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -223,15 +223,16 @@ class ChatObjects: fieldType = fieldInfo.annotation # Always route relational/object fields to object_fields for separate handling - if fieldName in ['documents', 'stats']: + # These fields are stored in separate normalized tables, not as JSONB + if fieldName in ['documents', 'stats', 'logs', 'messages']: objectFields[fieldName] = value continue # Check if this is a JSONB field (Dict, List, or complex types) + # Purely type-based detection - no hardcoded field names if (fieldType == dict or fieldType == list or - (hasattr(fieldType, '__origin__') and fieldType.__origin__ in (dict, list)) or - fieldName in ['execParameters', 'expectedDocumentFormats', 'resultDocuments']): + (hasattr(fieldType, '__origin__') and fieldType.__origin__ in (dict, list))): # Store as JSONB - include in simple_fields for database storage simpleFields[fieldName] = value elif isinstance(value, (str, int, float, bool, type(None))): diff --git a/modules/interfaces/interfaceDbComponentObjects.py b/modules/interfaces/interfaceDbComponentObjects.py index 9f6b80bb..fee78fef 100644 --- a/modules/interfaces/interfaceDbComponentObjects.py +++ b/modules/interfaces/interfaceDbComponentObjects.py @@ -140,6 +140,47 @@ class ComponentObjects: logger.error(f"Failed to initialize database: {str(e)}") raise + def _separateObjectFields(self, model_class, data: Dict[str, Any]) -> tuple[Dict[str, Any], Dict[str, Any]]: + """Separate simple fields from object fields based on Pydantic model structure.""" + simpleFields = {} + objectFields = {} + + # Get field information from the Pydantic model + modelFields = model_class.model_fields + + for fieldName, value in data.items(): + # Check if this field should be stored as JSONB in the database + if fieldName in modelFields: + fieldInfo = modelFields[fieldName] + # Pydantic v2 only + fieldType = fieldInfo.annotation + + # Check if this is a JSONB field (Dict, List, or complex types) + # Purely type-based detection - no hardcoded field names + if (fieldType == dict or + fieldType == list or + (hasattr(fieldType, '__origin__') and fieldType.__origin__ in (dict, list))): + # Store as JSONB - include in simple_fields for database storage + simpleFields[fieldName] = value + elif isinstance(value, (str, int, float, bool, type(None))): + # Simple scalar types + simpleFields[fieldName] = value + else: + # Complex objects that should be filtered out + objectFields[fieldName] = value + else: + # Field not in model - treat as scalar if simple, otherwise filter out + # BUT: always include metadata fields (_createdBy, _createdAt, etc.) as they're handled by connector + if fieldName.startswith("_"): + # Metadata fields should be passed through to connector + simpleFields[fieldName] = value + elif isinstance(value, (str, int, float, bool, type(None))): + simpleFields[fieldName] = value + else: + objectFields[fieldName] = value + + return simpleFields, objectFields + def _initRecords(self): """Initializes standard records in the database if they don't exist.""" try: diff --git a/modules/interfaces/interfaceDbRealEstateAccess.py b/modules/interfaces/interfaceDbRealEstateAccess.py deleted file mode 100644 index b06b89f5..00000000 --- a/modules/interfaces/interfaceDbRealEstateAccess.py +++ /dev/null @@ -1,90 +0,0 @@ -""" -Access control for Real Estate interface. -Handles user access management and permission checks. -""" - -import logging -from typing import Dict, Any, List, Optional -from modules.datamodels.datamodelUam import User - -logger = logging.getLogger(__name__) - - -class RealEstateAccess: - """ - Access control class for Real Estate interface. - Handles user access management and permission checks. - """ - - def __init__(self, currentUser: User, db): - """Initialize with user context.""" - self.currentUser = currentUser - self.mandateId = currentUser.mandateId - self.userId = currentUser.id - self.roleLabels = currentUser.roleLabels or [] - - if not self.mandateId or not self.userId: - raise ValueError("Invalid user context: mandateId and userId are required") - - self.db = db - - def uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """ - Unified user access management function that filters data based on user privileges. - - Args: - model_class: Pydantic model class for the table - recordset: Recordset to filter based on access rules - - Returns: - Filtered recordset with access control attributes - """ - filtered_records = [] - - # System admins see all records - if "sysadmin" in self.roleLabels: - filtered_records = recordset - # Admins see records in their mandate - elif "admin" in self.roleLabels: - filtered_records = [r for r in recordset if r.get("mandateId", "-") == self.mandateId] - # Regular users see only their records - else: - filtered_records = [ - r for r in recordset - if r.get("mandateId", "-") == self.mandateId and r.get("_createdBy") == self.userId - ] - - # Add access control attributes - for record in filtered_records: - record["_hideView"] = False - record["_hideEdit"] = not self.canModify(model_class, record.get("id")) - record["_hideDelete"] = not self.canModify(model_class, record.get("id")) - - return filtered_records - - def canModify(self, model_class: type, recordId: Optional[str] = None) -> bool: - """Checks if the current user can modify records.""" - # System admins can modify all records - if "sysadmin" in self.roleLabels: - return True - - if recordId is not None: - records = self.db.getRecordset(model_class, recordFilter={"id": recordId}) - if not records: - return False - - record = records[0] - - # Admins can modify records in their mandate - if "admin" in self.roleLabels and record.get("mandateId", "-") == self.mandateId: - return True - - # Regular users can modify their own records - if (record.get("mandateId", "-") == self.mandateId and - record.get("_createdBy") == self.userId): - return True - - return False - else: - return True # Regular users can create records - diff --git a/modules/interfaces/interfaceDbRealEstateObjects.py b/modules/interfaces/interfaceDbRealEstateObjects.py index 48084d11..df0ac295 100644 --- a/modules/interfaces/interfaceDbRealEstateObjects.py +++ b/modules/interfaces/interfaceDbRealEstateObjects.py @@ -21,8 +21,10 @@ from modules.datamodels.datamodelRealEstate import ( from modules.datamodels.datamodelUam import User from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.configuration import APP_CONFIG -# Import Access-Klasse aus separater Datei -from modules.interfaces.interfaceDbRealEstateAccess import RealEstateAccess +from modules.security.rbac import RbacClass +from modules.datamodels.datamodelRbac import AccessRuleContext +from modules.datamodels.datamodelUam import AccessLevel +from modules.interfaces.interfaceRbac import getRecordsetWithRBAC logger = logging.getLogger(__name__) @@ -42,7 +44,7 @@ class RealEstateObjects: self.currentUser = currentUser self.userId = currentUser.id if currentUser else None self.mandateId = currentUser.mandateId if currentUser else None - self.access = None + self.rbac = None # RBAC interface # Initialize database self._initializeDatabase() @@ -108,8 +110,13 @@ class RealEstateObjects: if not self.userId or not self.mandateId: raise ValueError("Invalid user context: id and mandateId are required") - # Initialize access control - self.access = RealEstateAccess(self.currentUser, self.db) + # Initialize RBAC interface + if not self.currentUser: + raise ValueError("User context is required for RBAC") + # Get DbApp connection for RBAC AccessRule queries + from modules.security.rootAccess import getRootDbAppConnector + dbApp = getRootDbAppConnector() + self.rbac = RbacClass(self.db, dbApp=dbApp) # Update database context self.db.updateContext(self.userId) @@ -118,13 +125,14 @@ class RealEstateObjects: def createProjekt(self, projekt: Projekt) -> Projekt: """Create a new project.""" + # Check RBAC permission + if not self.checkRbacPermission(Projekt, "create"): + raise PermissionError(f"User {self.userId} cannot create projects") + # Ensure mandateId is set if not projekt.mandateId: projekt.mandateId = self.mandateId - # Apply access control - self.access.uam(Projekt, []) - # Save to database - use mode='json' to ensure nested Pydantic models are serialized self.db.recordCreate(Projekt, projekt.model_dump(mode='json')) @@ -132,30 +140,28 @@ class RealEstateObjects: def getProjekt(self, projektId: str) -> Optional[Projekt]: """Get a project by ID.""" - records = self.db.getRecordset( + records = getRecordsetWithRBAC( + self.db, Projekt, + self.currentUser, recordFilter={"id": projektId} ) if not records: return None - # Apply access control - filtered = self.access.uam(Projekt, records) - - if not filtered: - return None - - return Projekt(**filtered[0]) + return Projekt(**records[0]) def getProjekte(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Projekt]: """Get all projects matching the filter.""" - records = self.db.getRecordset(Projekt, recordFilter=recordFilter or {}) + records = getRecordsetWithRBAC( + self.db, + Projekt, + self.currentUser, + recordFilter=recordFilter or {} + ) - # Apply access control - filtered = self.access.uam(Projekt, records) - - return [Projekt(**r) for r in filtered] + return [Projekt(**r) for r in records] def updateProjekt(self, projektId_or_projekt: Union[str, Projekt], updateData: Optional[Dict[str, Any]] = None) -> Optional[Projekt]: """Update a project. @@ -180,8 +186,8 @@ class RealEstateObjects: if hasattr(projekt, key): setattr(projekt, key, value) - # Check if user can modify - if not self.access.canModify(Projekt, projektId): + # Check RBAC permission + if not self.checkRbacPermission(Projekt, "update", projektId): raise PermissionError(f"User {self.userId} cannot modify project {projektId}") # Save to database @@ -195,8 +201,8 @@ class RealEstateObjects: if not projekt: return False - # Check if user can modify - if not self.access.canModify(Projekt, projektId): + # Check RBAC permission + if not self.checkRbacPermission(Projekt, "delete", projektId): raise PermissionError(f"User {self.userId} cannot delete project {projektId}") return self.db.recordDelete(Projekt, projektId) @@ -205,10 +211,13 @@ class RealEstateObjects: def createParzelle(self, parzelle: Parzelle) -> Parzelle: """Create a new plot.""" + # Check RBAC permission + if not self.checkRbacPermission(Parzelle, "create"): + raise PermissionError(f"User {self.userId} cannot create plots") + if not parzelle.mandateId: parzelle.mandateId = self.mandateId - self.access.uam(Parzelle, []) # Use mode='json' to ensure nested Pydantic models (like GeoPolylinie) are serialized self.db.recordCreate(Parzelle, parzelle.model_dump(mode='json')) @@ -216,20 +225,17 @@ class RealEstateObjects: def getParzelle(self, parzelleId: str) -> Optional[Parzelle]: """Get a plot by ID.""" - records = self.db.getRecordset( + records = getRecordsetWithRBAC( + self.db, Parzelle, + self.currentUser, recordFilter={"id": parzelleId} ) if not records: return None - filtered = self.access.uam(Parzelle, records) - - if not filtered: - return None - - return Parzelle(**filtered[0]) + return Parzelle(**records[0]) def getParzellen(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Parzelle]: """Get all plots matching the filter.""" @@ -243,7 +249,12 @@ class RealEstateObjects: recordFilter = self._resolveLocationFilters(recordFilter) - records = self.db.getRecordset(Parzelle, recordFilter=recordFilter or {}) + records = getRecordsetWithRBAC( + self.db, + Parzelle, + self.currentUser, + recordFilter=recordFilter or {} + ) # Fallback: If no records found and we resolved a Gemeinde name, # try searching with the original name for backwards compatibility @@ -253,14 +264,16 @@ class RealEstateObjects: logger.info(f"No results with resolved UUID, trying with original name '{original_gemeinde_value}'") fallback_filter = recordFilter.copy() fallback_filter["kontextGemeinde"] = original_gemeinde_value - records = self.db.getRecordset(Parzelle, recordFilter=fallback_filter) + records = getRecordsetWithRBAC( + self.db, + Parzelle, + self.currentUser, + recordFilter=fallback_filter + ) if records: logger.info(f"Found {len(records)} records using original name (legacy data format)") - # Apply access control - filtered = self.access.uam(Parzelle, records) - - return [Parzelle(**r) for r in filtered] + return [Parzelle(**r) for r in records] def _resolveLocationFilters(self, recordFilter: Dict[str, Any]) -> Dict[str, Any]: """ @@ -402,7 +415,7 @@ class RealEstateObjects: if not parzelle: return None - if not self.access.canModify(Parzelle, parzelleId): + if not self.checkRbacPermission(Parzelle, "update", parzelleId): raise PermissionError(f"User {self.userId} cannot modify plot {parzelleId}") for key, value in updateData.items(): @@ -419,7 +432,7 @@ class RealEstateObjects: if not parzelle: return False - if not self.access.canModify(Parzelle, parzelleId): + if not self.checkRbacPermission(Parzelle, "delete", parzelleId): raise PermissionError(f"User {self.userId} cannot delete plot {parzelleId}") return self.db.recordDelete(Parzelle, parzelleId) @@ -428,36 +441,40 @@ class RealEstateObjects: def createDokument(self, dokument: Dokument) -> Dokument: """Create a new document.""" + # Check RBAC permission + if not self.checkRbacPermission(Dokument, "create"): + raise PermissionError(f"User {self.userId} cannot create documents") + if not dokument.mandateId: dokument.mandateId = self.mandateId - self.access.uam(Dokument, []) self.db.recordCreate(Dokument, dokument.model_dump()) return dokument def getDokument(self, dokumentId: str) -> Optional[Dokument]: """Get a document by ID.""" - records = self.db.getRecordset( + records = getRecordsetWithRBAC( + self.db, Dokument, + self.currentUser, recordFilter={"id": dokumentId} ) if not records: return None - filtered = self.access.uam(Dokument, records) - - if not filtered: - return None - - return Dokument(**filtered[0]) + return Dokument(**records[0]) def getDokumente(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Dokument]: """Get all documents matching the filter.""" - records = self.db.getRecordset(Dokument, recordFilter=recordFilter or {}) - filtered = self.access.uam(Dokument, records) - return [Dokument(**r) for r in filtered] + records = getRecordsetWithRBAC( + self.db, + Dokument, + self.currentUser, + recordFilter=recordFilter or {} + ) + return [Dokument(**r) for r in records] def updateDokument(self, dokumentId: str, updateData: Dict[str, Any]) -> Optional[Dokument]: """Update a document.""" @@ -465,7 +482,7 @@ class RealEstateObjects: if not dokument: return None - if not self.access.canModify(Dokument, dokumentId): + if not self.checkRbacPermission(Dokument, "update", dokumentId): raise PermissionError(f"User {self.userId} cannot modify document {dokumentId}") for key, value in updateData.items(): @@ -481,7 +498,7 @@ class RealEstateObjects: if not dokument: return False - if not self.access.canModify(Dokument, dokumentId): + if not self.checkRbacPermission(Dokument, "delete", dokumentId): raise PermissionError(f"User {self.userId} cannot delete document {dokumentId}") return self.db.recordDelete(Dokument, dokumentId) @@ -490,36 +507,40 @@ class RealEstateObjects: def createGemeinde(self, gemeinde: Gemeinde) -> Gemeinde: """Create a new municipality.""" + # Check RBAC permission + if not self.checkRbacPermission(Gemeinde, "create"): + raise PermissionError(f"User {self.userId} cannot create municipalities") + if not gemeinde.mandateId: gemeinde.mandateId = self.mandateId - self.access.uam(Gemeinde, []) self.db.recordCreate(Gemeinde, gemeinde.model_dump()) return gemeinde def getGemeinde(self, gemeindeId: str) -> Optional[Gemeinde]: """Get a municipality by ID.""" - records = self.db.getRecordset( + records = getRecordsetWithRBAC( + self.db, Gemeinde, + self.currentUser, recordFilter={"id": gemeindeId} ) if not records: return None - filtered = self.access.uam(Gemeinde, records) - - if not filtered: - return None - - return Gemeinde(**filtered[0]) + return Gemeinde(**records[0]) def getGemeinden(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Gemeinde]: """Get all municipalities matching the filter.""" - records = self.db.getRecordset(Gemeinde, recordFilter=recordFilter or {}) - filtered = self.access.uam(Gemeinde, records) - return [Gemeinde(**r) for r in filtered] + records = getRecordsetWithRBAC( + self.db, + Gemeinde, + self.currentUser, + recordFilter=recordFilter or {} + ) + return [Gemeinde(**r) for r in records] def updateGemeinde(self, gemeindeId: str, updateData: Dict[str, Any]) -> Optional[Gemeinde]: """Update a municipality.""" @@ -527,7 +548,7 @@ class RealEstateObjects: if not gemeinde: return None - if not self.access.canModify(Gemeinde, gemeindeId): + if not self.checkRbacPermission(Gemeinde, "update", gemeindeId): raise PermissionError(f"User {self.userId} cannot modify municipality {gemeindeId}") for key, value in updateData.items(): @@ -543,7 +564,7 @@ class RealEstateObjects: if not gemeinde: return False - if not self.access.canModify(Gemeinde, gemeindeId): + if not self.checkRbacPermission(Gemeinde, "delete", gemeindeId): raise PermissionError(f"User {self.userId} cannot delete municipality {gemeindeId}") return self.db.recordDelete(Gemeinde, gemeindeId) @@ -552,36 +573,40 @@ class RealEstateObjects: def createKanton(self, kanton: Kanton) -> Kanton: """Create a new canton.""" + # Check RBAC permission + if not self.checkRbacPermission(Kanton, "create"): + raise PermissionError(f"User {self.userId} cannot create cantons") + if not kanton.mandateId: kanton.mandateId = self.mandateId - self.access.uam(Kanton, []) self.db.recordCreate(Kanton, kanton.model_dump()) return kanton def getKanton(self, kantonId: str) -> Optional[Kanton]: """Get a canton by ID.""" - records = self.db.getRecordset( + records = getRecordsetWithRBAC( + self.db, Kanton, + self.currentUser, recordFilter={"id": kantonId} ) if not records: return None - filtered = self.access.uam(Kanton, records) - - if not filtered: - return None - - return Kanton(**filtered[0]) + return Kanton(**records[0]) def getKantone(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Kanton]: """Get all cantons matching the filter.""" - records = self.db.getRecordset(Kanton, recordFilter=recordFilter or {}) - filtered = self.access.uam(Kanton, records) - return [Kanton(**r) for r in filtered] + records = getRecordsetWithRBAC( + self.db, + Kanton, + self.currentUser, + recordFilter=recordFilter or {} + ) + return [Kanton(**r) for r in records] def updateKanton(self, kantonId: str, updateData: Dict[str, Any]) -> Optional[Kanton]: """Update a canton.""" @@ -589,7 +614,7 @@ class RealEstateObjects: if not kanton: return None - if not self.access.canModify(Kanton, kantonId): + if not self.checkRbacPermission(Kanton, "update", kantonId): raise PermissionError(f"User {self.userId} cannot modify canton {kantonId}") for key, value in updateData.items(): @@ -605,7 +630,7 @@ class RealEstateObjects: if not kanton: return False - if not self.access.canModify(Kanton, kantonId): + if not self.checkRbacPermission(Kanton, "delete", kantonId): raise PermissionError(f"User {self.userId} cannot delete canton {kantonId}") return self.db.recordDelete(Kanton, kantonId) @@ -614,36 +639,40 @@ class RealEstateObjects: def createLand(self, land: Land) -> Land: """Create a new country.""" + # Check RBAC permission + if not self.checkRbacPermission(Land, "create"): + raise PermissionError(f"User {self.userId} cannot create countries") + if not land.mandateId: land.mandateId = self.mandateId - self.access.uam(Land, []) self.db.recordCreate(Land, land.model_dump()) return land def getLand(self, landId: str) -> Optional[Land]: """Get a country by ID.""" - records = self.db.getRecordset( + records = getRecordsetWithRBAC( + self.db, Land, + self.currentUser, recordFilter={"id": landId} ) if not records: return None - filtered = self.access.uam(Land, records) - - if not filtered: - return None - - return Land(**filtered[0]) + return Land(**records[0]) def getLaender(self, recordFilter: Optional[Dict[str, Any]] = None) -> List[Land]: """Get all countries matching the filter.""" - records = self.db.getRecordset(Land, recordFilter=recordFilter or {}) - filtered = self.access.uam(Land, records) - return [Land(**r) for r in filtered] + records = getRecordsetWithRBAC( + self.db, + Land, + self.currentUser, + recordFilter=recordFilter or {} + ) + return [Land(**r) for r in records] def updateLand(self, landId: str, updateData: Dict[str, Any]) -> Optional[Land]: """Update a country.""" @@ -651,7 +680,7 @@ class RealEstateObjects: if not land: return None - if not self.access.canModify(Land, landId): + if not self.checkRbacPermission(Land, "update", landId): raise PermissionError(f"User {self.userId} cannot modify country {landId}") for key, value in updateData.items(): @@ -667,11 +696,51 @@ class RealEstateObjects: if not land: return False - if not self.access.canModify(Land, landId): + if not self.checkRbacPermission(Land, "delete", landId): raise PermissionError(f"User {self.userId} cannot delete country {landId}") return self.db.recordDelete(Land, landId) + # ===== RBAC Permission Checks ===== + + def checkRbacPermission( + self, + modelClass: type, + operation: str, + recordId: Optional[str] = None + ) -> bool: + """ + Check RBAC permission for a specific operation on a table. + + Args: + modelClass: Pydantic model class for the table + operation: Operation to check ('create', 'update', 'delete', 'read') + recordId: Optional record ID for specific record check + + Returns: + Boolean indicating permission + """ + if not self.rbac or not self.currentUser: + return False + + tableName = modelClass.__name__ + permissions = self.rbac.getUserPermissions( + self.currentUser, + AccessRuleContext.DATA, + tableName + ) + + if operation == "create": + return permissions.create != AccessLevel.NONE + elif operation == "update": + return permissions.update != AccessLevel.NONE + elif operation == "delete": + return permissions.delete != AccessLevel.NONE + elif operation == "read": + return permissions.read != AccessLevel.NONE + else: + return False + # ===== Direct Query Execution (stateless) ===== def executeQuery(self, queryText: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: diff --git a/modules/interfaces/interfaceRbac.py b/modules/interfaces/interfaceRbac.py index fad37e96..e232ae95 100644 --- a/modules/interfaces/interfaceRbac.py +++ b/modules/interfaces/interfaceRbac.py @@ -112,10 +112,24 @@ def getRecordsetWithRBAC( ) elif fieldType == "JSONB" and fieldName in record: if record[fieldName] is None: - if fieldName in ["logs", "messages", "tasks", "expectedDocumentFormats", "resultDocuments"]: - record[fieldName] = [] - elif fieldName in ["execParameters", "stats"]: - record[fieldName] = {} + # Generic type-based default: List types -> [], Dict types -> {} + # Interfaces handle domain-specific defaults + modelFields = modelClass.model_fields + fieldInfo = modelFields.get(fieldName) + if fieldInfo: + fieldAnnotation = fieldInfo.annotation + # Check if it's a List type + if (fieldAnnotation == list or + (hasattr(fieldAnnotation, "__origin__") and + fieldAnnotation.__origin__ is list)): + record[fieldName] = [] + # Check if it's a Dict type + elif (fieldAnnotation == dict or + (hasattr(fieldAnnotation, "__origin__") and + fieldAnnotation.__origin__ is dict)): + record[fieldName] = {} + else: + record[fieldName] = None else: record[fieldName] = None else: diff --git a/modules/workflows/workflowManager copy.py b/modules/workflows/workflowManager copy.py deleted file mode 100644 index 9806060a..00000000 --- a/modules/workflows/workflowManager copy.py +++ /dev/null @@ -1,1385 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -from typing import Dict, Any, List, Optional -import logging -import uuid -import asyncio -import json - -from modules.datamodels.datamodelChat import ( - UserInputRequest, - ChatMessage, - ChatWorkflow, - ChatDocument, - WorkflowModeEnum -) -from modules.datamodels.datamodelChat import TaskContext -from modules.workflows.processing.workflowProcessor import WorkflowProcessor -from modules.workflows.processing.shared.stateTools import WorkflowStoppedException, checkWorkflowStopped - - -logger = logging.getLogger(__name__) - -class WorkflowManager: - """Manager for workflow processing and coordination""" - - def __init__(self, services): - self.services = services - self.workflowProcessor = None - - # Exported functions - - async def workflowStart(self, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow: - """Starts a new workflow or continues an existing one, then launches processing.""" - try: - # Debug log to check workflowMode parameter - logger.info(f"WorkflowManager received workflowMode: {workflowMode}") - currentTime = self.services.utils.timestampGetUtc() - - if workflowId: - workflow = self.services.chat.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Workflow {workflowId} not found") - - # Store workflow in services for reference (this is the ChatWorkflow object) - self.services.workflow = workflow - - # CRITICAL: Update all method instances to use the current Services object with the correct workflow - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - discoverMethods(self.services) - logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") - - if workflow.status == "running": - logger.info(f"Stopping running workflow {workflowId} before processing new prompt") - workflow.status = "stopped" - workflow.lastActivity = currentTime - self.services.chat.updateWorkflow(workflowId, { - "status": "stopped", - "lastActivity": currentTime - }) - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped for new prompt", - "type": "info", - "status": "stopped", - "progress": 1.0 - }) - - newRound = workflow.currentRound + 1 - self.services.chat.updateWorkflow(workflowId, { - "status": "running", - "lastActivity": currentTime, - "currentRound": newRound, - "workflowMode": workflowMode # Update workflow mode for existing workflows - }) - - # Reflect updates on the in-memory object without reloading - workflow.status = "running" - workflow.lastActivity = currentTime - workflow.currentRound = newRound - workflow.workflowMode = workflowMode - - self.services.chat.storeLog(workflow, { - "message": f"Workflow resumed (round {workflow.currentRound}) with mode: {workflowMode}", - "type": "info", - "status": "running", - "progress": 0 - }) - - else: - workflowData = { - "name": "New Workflow", - "status": "running", - "startedAt": currentTime, - "lastActivity": currentTime, - "currentRound": 1, - "currentTask": 0, - "currentAction": 0, - "totalTasks": 0, - "totalActions": 0, - "mandateId": self.services.user.mandateId, - "messageIds": [], - "workflowMode": workflowMode, - "maxSteps": 10 , # Set maxSteps - } - - workflow = self.services.chat.createWorkflow(workflowData) - logger.info(f"Created workflow with mode: {getattr(workflow, 'workflowMode', 'NOT_SET')}") - logger.info(f"Workflow data passed: {workflowData.get('workflowMode', 'NOT_IN_DATA')}") - - # Store workflow in services (this is the ChatWorkflow object) - self.services.workflow = workflow - - # CRITICAL: Update all method instances to use the current Services object with the correct workflow - # This ensures cached method instances don't use stale workflow IDs from previous workflows - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - discoverMethods(self.services) - logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") - - # Start workflow processing asynchronously - asyncio.create_task(self._workflowProcess(userInput)) - - return workflow - except Exception as e: - logger.error(f"Error starting workflow: {str(e)}") - raise - - async def workflowStop(self, workflowId: str) -> ChatWorkflow: - """Stops a running workflow.""" - try: - workflow = self.services.chat.getWorkflow(workflowId) - if not workflow: - raise ValueError(f"Workflow {workflowId} not found") - - # Store workflow in services (this is the ChatWorkflow object) - self.services.workflow = workflow - - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflowId, { - "status": "stopped", - "lastActivity": workflow.lastActivity - }) - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped", - "type": "warning", - "status": "stopped", - "progress": 1.0 - }) - return workflow - except Exception as e: - logger.error(f"Error stopping workflow: {str(e)}") - raise - - # Main processor - - async def _workflowProcess(self, userInput: UserInputRequest) -> None: - """Process a workflow with user input""" - try: - # Send ChatLog message immediately when workflow starts - workflow = self.services.workflow - self.services.chat.storeLog(workflow, { - "message": "Workflow started...", - "type": "info", - "status": "running", - "progress": 0.0 - }) - - # Store the current user prompt in services for easy access throughout the workflow - self.services.rawUserPrompt = userInput.prompt - self.services.currentUserPrompt = userInput.prompt - - # Reset progress logger for new workflow - self.services.chat._progressLogger = None - - # Reset workflow history flag at start of each workflow - setattr(self.services, '_needsWorkflowHistory', False) - - self.workflowProcessor = WorkflowProcessor(self.services) - - # Get workflow mode to determine if combined analysis is needed - workflowMode = getattr(self.services.workflow, 'workflowMode', None) - skipCombinedAnalysis = (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION) - - if skipCombinedAnalysis: - logger.info("Skipping combined analysis for AUTOMATION mode - using predefined plan") - complexity = "moderate" # Default for automation workflows - needsWorkflowHistory = False # Automation workflows don't need history - detectedLanguage = None # No language detection in automation mode - normalizedRequest = userInput.prompt - intentText = userInput.prompt - contextItems = [] - workflowIntent = None - else: - # Process user-uploaded documents from userInput for combined analysis - documents = [] - if userInput.listFileId: - try: - documents = await self._processFileIds(userInput.listFileId, None) - except Exception as e: - logger.warning(f"Failed to process user fileIds for combined analysis: {e}") - - # Phase 1+2: Kombinierte Analyse: Intent + Komplexität in einem AI-Call - analysisResult = await self._analyzeUserInputAndComplexity(userInput.prompt, documents) - - # Extract results - detectedLanguage = analysisResult.get('detectedLanguage') - normalizedRequest = analysisResult.get('normalizedRequest') - intentText = analysisResult.get('intent') or userInput.prompt - contextItems = analysisResult.get('contextItems', []) - complexity = analysisResult.get('complexity', 'moderate') - needsWorkflowHistory = analysisResult.get('needsWorkflowHistory', False) - fastTrack = analysisResult.get('fastTrack', False) - - # Extract intent analysis fields and store as workflowIntent - workflowIntent = { - 'intent': intentText, # Use intent instead of primaryGoal - 'dataType': analysisResult.get('dataType', 'unknown'), - 'expectedFormats': analysisResult.get('expectedFormats', []), - 'qualityRequirements': analysisResult.get('qualityRequirements', {}), - 'successCriteria': analysisResult.get('successCriteria', []), - 'languageUserDetected': detectedLanguage, - 'needsWorkflowHistory': needsWorkflowHistory - } - - # Store needsWorkflowHistory in services - setattr(self.services, '_needsWorkflowHistory', bool(needsWorkflowHistory)) - - # Store workflowIntent in workflow object for reuse - if hasattr(self.services, 'workflow') and self.services.workflow: - self.services.workflow._workflowIntent = workflowIntent - - # Store normalized request and intent - # CRITICAL: normalizedRequest MUST be used if available, do NOT fall back to intent - self.services.currentUserPrompt = intentText or userInput.prompt - if normalizedRequest and normalizedRequest.strip(): - # Use normalizedRequest if available and not empty - self.services.currentUserPromptNormalized = normalizedRequest - logger.info(f"Stored normalized request (length: {len(normalizedRequest)}, preview: {normalizedRequest[:100]}...)") - else: - # Fallback only if normalizedRequest is None or empty - logger.warning(f"normalizedRequest is None or empty, falling back to intentText. normalizedRequest={normalizedRequest}, intentText={intentText[:100] if intentText else None}...") - self.services.currentUserPromptNormalized = intentText or userInput.prompt - if contextItems is not None: - self.services.currentUserContextItems = contextItems - - # Set detected language - if detectedLanguage and isinstance(detectedLanguage, str): - self._setUserLanguage(detectedLanguage) - try: - setattr(self.services, 'currentUserLanguage', detectedLanguage) - except Exception: - pass - - logger.info(f"Combined analysis: complexity={complexity}, needsWorkflowHistory={needsWorkflowHistory}, language={detectedLanguage}, fastTrack={fastTrack}") - - # Route to fast path for simple requests if history is not needed - # Skip fast path for automation mode or if history is needed - if not skipCombinedAnalysis and complexity == "simple" and not needsWorkflowHistory: - logger.info("Routing to fast path for simple request") - await self._executeFastPath(userInput, documents) - return # Fast path completes the workflow - - # Now send the first message (use already analyzed data if available) - await self._sendFirstMessage(userInput, skipIntentionAnalysis=not skipCombinedAnalysis) - - # Route to full workflow for moderate/complex requests or automation mode - logger.info(f"Routing to full workflow for {complexity} request" + (" (automation mode)" if skipCombinedAnalysis else "")) - taskPlan = await self._planTasks(userInput) - await self._executeTasks(taskPlan) - await self._processWorkflowResults() - - except WorkflowStoppedException: - self._handleWorkflowStop() - - except Exception as e: - self._handleWorkflowError(e) - - # Helper functions - - async def _analyzeUserInputAndComplexity( - self, - userPrompt: str, - documents: List[ChatDocument] - ) -> Dict[str, Any]: - """ - Phase 1+2: Kombinierte Analyse: Intent + Komplexität in einem AI-Call. - - Args: - userPrompt: User-Anfrage - documents: Liste der Dokumente - - Returns: - Dict mit: - - detectedLanguage: ISO 639-1 Sprachcode - - normalizedRequest: Vollständige, explizite Umformulierung - - intent: Kurze Kern-Anfrage - - contextItems: Große Datenblöcke als separate Dokumente - - complexity: "simple" | "moderate" | "complex" - - needsWorkflowHistory: bool - - fastTrack: bool - - dataType: Datentyp - - expectedFormats: Erwartete Formate - - qualityRequirements: Qualitätsanforderungen - - successCriteria: Erfolgskriterien - """ - # Baue Dokument-Liste für Prompt - docListText = "" - if documents: - for i, doc in enumerate(documents, 1): - docListText += f"\n{i}. {doc.fileName} ({doc.mimeType}, {doc.fileSize} bytes)" - - analysisPrompt = f"""You are an input analyzer. From the user's message, perform ALL of the following in one pass: - -1. detectedLanguage: Detect ISO 639-1 language code (e.g., de, en, fr, it) -2. normalizedRequest: Full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details -3. intent: Concise single-paragraph core request in the detected language for high-level routing -4. contextItems: Supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content -5. complexity: "simple" | "moderate" | "complex" - - "simple": Only if NO documents AND NO web search required. Single question, straightforward answer (5-15s) - - "moderate": Multiple steps, some documents, structured response requiring some processing, or web search needed (30-60s) - - "complex": Multi-task workflow, many documents, research needed, content generation required, multi-step planning (60-120s) -6. needsWorkflowHistory: Boolean indicating if this request needs previous workflow rounds/history (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work) -7. fastTrack: Boolean indicating if Fast Track is possible (simple requests without documents and without workflow history) -8. dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown) -9. expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., ["xlsx", "pdf"]). If format is unclear or not specified, use empty list [] -10. qualityRequirements: Quality requirements they have (accuracy, completeness) as {{accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}} -11. successCriteria: Specific success criteria that define completion (array of strings) - -Rules: -- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained -- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear -- Preserve critical references (URLs, filenames) in intent -- Normalize to the primary detected language if mixed-language -- Consider number of documents provided when determining complexity -- Consider need for external research or web search when determining complexity - -Documents provided: {len(documents)} document(s) -{docListText} - -Return ONLY JSON (no markdown) with this exact structure: -{{ - "detectedLanguage": "de|en|fr|it|...", - "normalizedRequest": "Full explicit instruction in detected language", - "intent": "Concise normalized request...", - "contextItems": [ - {{ - "title": "User context 1", - "mimeType": "text/plain", - "content": "Full extracted content block here" - }} - ], - "complexity": "simple" | "moderate" | "complex", - "needsWorkflowHistory": true|false, - "fastTrack": true|false, - "dataType": "numbers|text|documents|analysis|code|unknown", - "expectedFormats": ["pdf", "docx", "xlsx", "txt", "json", "csv", "html", "md"], - "qualityRequirements": {{ - "accuracyThreshold": 0.0-1.0, - "completenessThreshold": 0.0-1.0 - }}, - "successCriteria": ["specific criterion 1", "specific criterion 2"] -}} - -## User Message -The following is the user's original input message. Analyze intent, normalize the request, determine complexity, and identify any large context blocks that should be moved to separate documents: - -################ USER INPUT START ################# -{userPrompt.replace('{', '{{').replace('}', '}}') if userPrompt else ''} -################ USER INPUT FINISH ################# -""" - - # AI-Call (verwende callAiPlanning für einfache JSON-Responses) - # Debug-Logs werden bereits von callAiPlanning geschrieben - aiResponse = await self.services.ai.callAiPlanning( - prompt=analysisPrompt, - placeholders=None, - debugType="user_input_analysis" - ) - - # Parse Result - try: - jsonStart = aiResponse.find('{') if aiResponse else -1 - jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0 - if jsonStart != -1 and jsonEnd > jsonStart: - result = json.loads(aiResponse[jsonStart:jsonEnd]) - return result - else: - logger.warning("Could not parse combined analysis response, using defaults") - return self._getDefaultAnalysisResult() - except Exception as e: - logger.warning(f"Error parsing combined analysis response: {str(e)}, using defaults") - return self._getDefaultAnalysisResult() - - def _getDefaultAnalysisResult(self) -> Dict[str, Any]: - """Fallback Default-Werte wenn Parsing fehlschlägt.""" - return { - "detectedLanguage": "en", - "normalizedRequest": "", - "intent": "", - "contextItems": [], - "complexity": "moderate", - "needsWorkflowHistory": False, - "fastTrack": False, - "dataType": "unknown", - "expectedFormats": [], - "qualityRequirements": { - "accuracyThreshold": 0.8, - "completenessThreshold": 0.8 - }, - "successCriteria": [] - } - - async def _executeFastPath(self, userInput: UserInputRequest, documents: List[ChatDocument]) -> None: - """Execute fast path for simple requests and deliver result to user""" - try: - workflow = self.services.workflow - checkWorkflowStopped(self.services) - - # Get user language if available - userLanguage = getattr(self.services, 'currentUserLanguage', None) - - # Execute fast path - use normalizedRequest if available, otherwise use raw prompt - normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt - result = await self.workflowProcessor.fastPathExecute( - prompt=normalizedPrompt, - documents=documents, - userLanguage=userLanguage - ) - - if not result.success: - # Fast path failed, fall back to full workflow - logger.warning(f"Fast path failed: {result.error}, falling back to full workflow") - taskPlan = await self._planTasks(userInput) - await self._executeTasks(taskPlan) - await self._processWorkflowResults() - return - - # Extract response text from ActionResult - responseText = "" - chatDocuments = [] - - if result.documents and len(result.documents) > 0: - # Get response text from first document - firstDoc = result.documents[0] - if hasattr(firstDoc, 'documentData'): - docData = firstDoc.documentData - if isinstance(docData, bytes): - responseText = docData.decode('utf-8') - else: - responseText = str(docData) - - # Convert ActionDocuments to ChatDocuments for persistence - for actionDoc in result.documents: - if hasattr(actionDoc, 'documentData') and actionDoc.documentData: - # Create file in component storage - fileItem = self.services.interfaceDbComponent.createFile( - name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else "fast_path_response.txt", - mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain", - content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8') - ) - # Persist file data - self.services.interfaceDbComponent.createFileData(fileItem.id, actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8')) - - # Get file info - fileInfo = self.services.chat.getFileInfo(fileItem.id) - - # Create ChatDocument as dict (messageId will be assigned by createMessage) - # Don't create ChatDocument object directly - it requires messageId which doesn't exist yet - chatDoc = { - "fileId": fileItem.id, - "fileName": fileInfo.get("fileName", actionDoc.documentName) if fileInfo else actionDoc.documentName, - "fileSize": fileInfo.get("size", len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))) if fileInfo else (len(actionDoc.documentData) if isinstance(actionDoc.documentData, bytes) else len(actionDoc.documentData.encode('utf-8'))), - "mimeType": fileInfo.get("mimeType", actionDoc.mimeType) if fileInfo else actionDoc.mimeType, - "roundNumber": workflow.currentRound, - "taskNumber": 0, # Fast path doesn't have tasks - "actionNumber": 0 - } - chatDocuments.append(chatDoc) - - # Mark workflow as completed BEFORE storing message (so UI polling stops) - workflow.status = "completed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "completed", - "lastActivity": workflow.lastActivity - }) - - # Create ChatMessage with fast path response (in user's language) - messageData = { - "workflowId": workflow.id, - "role": "assistant", - "message": responseText or "Fast path response completed", - "status": "last", # Fast path completes the workflow - UI polling stops on this - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "fast_path_response", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, # Fast path doesn't have tasks - "actionNumber": 0, - # Add progress status - "taskProgress": "success", - "actionProgress": "success" - } - - # Store message with documents - self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments) - - logger.info(f"Fast path completed successfully, response length: {len(responseText)} chars") - - except Exception as e: - logger.error(f"Error in _executeFastPath: {str(e)}") - # Fall back to full workflow on error - logger.info("Falling back to full workflow due to fast path error") - taskPlan = await self._planTasks(userInput) - await self._executeTasks(taskPlan) - await self._processWorkflowResults() - - async def _sendFirstMessage(self, userInput: UserInputRequest, skipIntentionAnalysis: bool = False) -> None: - """Send first message to start workflow""" - try: - workflow = self.services.workflow - checkWorkflowStopped(self.services) - - # Create initial message using interface - # For first user message, include round info in the user context label - roundNum = workflow.currentRound - contextLabel = f"round{roundNum}_usercontext" - - # Use normalized request if available (from combined analysis), otherwise use original prompt - # This ensures the first message uses the normalized request for security - normalizedRequest = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt - - messageData = { - "workflowId": workflow.id, - "role": "user", - "message": normalizedRequest, # Use normalized request instead of original prompt - "status": "first", - "sequenceNr": 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": contextLabel, - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "pending", - "actionProgress": "pending" - } - - # Analyze the user's input to detect language, normalize request, extract intent, and offload bulky context into documents - # SKIP user intention analysis if already done in combined analysis (skipIntentionAnalysis=True) - # or for AUTOMATION mode - it uses predefined JSON plans - createdDocs = [] - workflowMode = getattr(workflow, 'workflowMode', None) - skipIntentionAnalysis = skipIntentionAnalysis or (workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION) - - if skipIntentionAnalysis: - logger.info("Skipping user intention analysis (already done in combined analysis or AUTOMATION mode)") - # Use already analyzed data if available, otherwise use user input directly - detectedLanguage = getattr(self.services, 'currentUserLanguage', None) - normalizedRequest = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt - intentText = getattr(self.services, 'currentUserPrompt', None) or userInput.prompt - contextItems = getattr(self.services, 'currentUserContextItems', None) or [] - workflowIntent = getattr(workflow, '_workflowIntent', None) - - # Create documents for context items (if available from combined analysis) - if contextItems and isinstance(contextItems, list): - for idx, item in enumerate(contextItems): - try: - title = item.get('title') if isinstance(item, dict) else None - mime = item.get('mimeType') if isinstance(item, dict) else None - content = item.get('content') if isinstance(item, dict) else None - if not content: - continue - fileName = (title or f"user_context_{idx+1}.txt").strip() - mimeType = (mime or "text/plain").strip() - - # Neutralize content before storing if neutralization is enabled - contentBytes = content.encode('utf-8') - contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType) - - # Create file in component storage - fileItem = self.services.interfaceDbComponent.createFile( - name=fileName, - mimeType=mimeType, - content=contentBytes - ) - # Persist file data - self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes) - - # Collect file info - fileInfo = self.services.chat.getFileInfo(fileItem.id) - from modules.datamodels.datamodelChat import ChatDocument - doc = ChatDocument( - fileId=fileItem.id, - fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName, - fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes), - mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType - ) - createdDocs.append(doc) - except Exception: - continue - else: - try: - analyzerPrompt = ( - "You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n" - "1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n" - "2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n" - "3) intent: concise single-paragraph core request in the detected language for high-level routing.\n" - "4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent (large literal content, long lists/tables, code/JSON blocks, transcripts, CSV fragments, detailed specs). Keep URLs in the intent unless they embed large pasted content.\n" - "5) dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown).\n" - "6) expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., [\"xlsx\", \"pdf\"]). If format is unclear or not specified, use empty list [].\n" - "7) qualityRequirements: Quality requirements they have (accuracy, completeness) as {accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}.\n" - "8) successCriteria: Specific success criteria that define completion (array of strings).\n" - "9) needsWorkflowHistory: Boolean indicating if this request needs previous workflow rounds/history to be understood or completed (e.g., 'continue', 'retry', 'fix', 'improve', 'update', 'modify', 'based on previous', 'build on', references to earlier work). Return true if the request is a continuation, retry, modification, or builds upon previous work.\n\n" - "Rules:\n" - "- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n" - "- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n" - "- Preserve critical references (URLs, filenames) in intent.\n" - "- Normalize to the primary detected language if mixed-language.\n\n" - "Return ONLY JSON (no markdown) with this shape:\n" - "{\n" - " \"detectedLanguage\": \"de|en|fr|it|...\",\n" - " \"normalizedRequest\": \"Full explicit instruction in detected language\",\n" - " \"intent\": \"Concise normalized request...\",\n" - " \"contextItems\": [\n" - " {\n" - " \"title\": \"User context 1\",\n" - " \"mimeType\": \"text/plain\",\n" - " \"content\": \"Full extracted content block here\"\n" - " }\n" - " ],\n" - " \"dataType\": \"numbers|text|documents|analysis|code|unknown\",\n" - " \"expectedFormats\": [\"pdf\", \"docx\", \"xlsx\", \"txt\", \"json\", \"csv\", \"html\", \"md\"],\n" - " \"qualityRequirements\": {\n" - " \"accuracyThreshold\": 0.0-1.0,\n" - " \"completenessThreshold\": 0.0-1.0\n" - " },\n" - " \"successCriteria\": [\"specific criterion 1\", \"specific criterion 2\"],\n" - " \"needsWorkflowHistory\": true|false\n" - "}\n\n" - "## User Message\n" - "The following is the user's original input message. Extract intent, normalize the request, and identify any large context blocks that should be moved to separate documents:\n\n" - "################ USER INPUT START #################\n" - f"{userInput.prompt.replace('{', '{{').replace('}', '}}') if userInput.prompt else ''}\n" - "################ USER INPUT FINISH #################" - ) - - # Call AI analyzer (planning call - will use static parameters) - aiResponse = await self.services.ai.callAiPlanning( - prompt=analyzerPrompt, - placeholders=None, - debugType="userintention" - ) - - detectedLanguage = None - normalizedRequest = None - intentText = userInput.prompt - contextItems = [] - workflowIntent = None - - # Parse analyzer response (JSON expected) - try: - jsonStart = aiResponse.find('{') if aiResponse else -1 - jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0 - if jsonStart != -1 and jsonEnd > jsonStart: - parsed = json.loads(aiResponse[jsonStart:jsonEnd]) - detectedLanguage = parsed.get('detectedLanguage') or None - normalizedRequest = parsed.get('normalizedRequest') or None - if parsed.get('intent'): - intentText = parsed.get('intent') - contextItems = parsed.get('contextItems') or [] - - # Extract intent analysis fields and store as workflowIntent - intentText = parsed.get('intent') or userInput.prompt - workflowIntent = { - 'intent': intentText, # Use intent instead of primaryGoal - 'dataType': parsed.get('dataType', 'unknown'), - 'expectedFormats': parsed.get('expectedFormats', []), - 'qualityRequirements': parsed.get('qualityRequirements', {}), - 'successCriteria': parsed.get('successCriteria', []), - 'languageUserDetected': detectedLanguage, - 'needsWorkflowHistory': parsed.get('needsWorkflowHistory', False) - } - - # Store needsWorkflowHistory in services for fast path decision - needsHistoryFromIntention = parsed.get('needsWorkflowHistory', False) - # Always set the value - default to False if not a boolean - setattr(self.services, '_needsWorkflowHistory', bool(needsHistoryFromIntention) if isinstance(needsHistoryFromIntention, bool) else False) - - # Store workflowIntent in workflow object for reuse - if hasattr(self.services, 'workflow') and self.services.workflow: - self.services.workflow._workflowIntent = workflowIntent - except Exception: - contextItems = [] - workflowIntent = None - # Ensure needsWorkflowHistory is False if parsing fails - setattr(self.services, '_needsWorkflowHistory', False) - - # Update services state - # CRITICAL: Validate language from AI response - # If AI didn't return language or invalid → use user language - # If user language not set → use "en" - validatedLanguage = None - - # Validate AI-detected language - if detectedLanguage and isinstance(detectedLanguage, str): - detectedLanguage = detectedLanguage.strip().lower() - # Check if it's a valid 2-character ISO code - if len(detectedLanguage) == 2 and detectedLanguage.isalpha(): - validatedLanguage = detectedLanguage - - # If AI didn't return valid language, use user language - if not validatedLanguage: - userLanguage = getattr(self.services.user, 'language', None) if hasattr(self.services, 'user') and self.services.user else None - if userLanguage and isinstance(userLanguage, str): - userLanguage = userLanguage.strip().lower() - if len(userLanguage) == 2 and userLanguage.isalpha(): - validatedLanguage = userLanguage - - # Final fallback to "en" - if not validatedLanguage: - validatedLanguage = "en" - logger.warning("Language not detected from AI and user language not set - using default 'en'") - - # Set validated language - self._setUserLanguage(validatedLanguage) - try: - setattr(self.services, 'currentUserLanguage', validatedLanguage) - logger.debug(f"Set currentUserLanguage to validated value: {validatedLanguage}") - except Exception: - pass - self.services.currentUserPrompt = intentText or userInput.prompt - # Always set currentUserPromptNormalized - use normalizedRequest if available, otherwise fallback to currentUserPrompt - # CRITICAL: normalizedRequest MUST be used if available, do NOT fall back to intent - if normalizedRequest and normalizedRequest.strip(): - # Use normalizedRequest if available and not empty - self.services.currentUserPromptNormalized = normalizedRequest - logger.debug(f"Stored normalized request from analysis (length: {len(normalizedRequest)})") - else: - # Fallback only if normalizedRequest is None or empty - logger.warning(f"normalizedRequest is None or empty in analysis, falling back to intentText. normalizedRequest={normalizedRequest}, intentText={intentText}") - self.services.currentUserPromptNormalized = intentText or userInput.prompt - if contextItems is not None: - self.services.currentUserContextItems = contextItems - - # Update message with normalized request if analysis produced one - if normalizedRequest and normalizedRequest != userInput.prompt: - messageData["message"] = normalizedRequest - logger.debug(f"Updated first message with normalized request (length: {len(normalizedRequest)})") - - # Create documents for context items - if contextItems and isinstance(contextItems, list): - for idx, item in enumerate(contextItems): - try: - title = item.get('title') if isinstance(item, dict) else None - mime = item.get('mimeType') if isinstance(item, dict) else None - content = item.get('content') if isinstance(item, dict) else None - if not content: - continue - fileName = (title or f"user_context_{idx+1}.txt").strip() - mimeType = (mime or "text/plain").strip() - - # Neutralize content before storing if neutralization is enabled - contentBytes = content.encode('utf-8') - contentBytes = await self._neutralizeContentIfEnabled(contentBytes, mimeType) - - # Create file in component storage - fileItem = self.services.interfaceDbComponent.createFile( - name=fileName, - mimeType=mimeType, - content=contentBytes - ) - # Persist file data - self.services.interfaceDbComponent.createFileData(fileItem.id, contentBytes) - - # Collect file info - fileInfo = self.services.chat.getFileInfo(fileItem.id) - from modules.datamodels.datamodelChat import ChatDocument - doc = ChatDocument( - fileId=fileItem.id, - fileName=fileInfo.get("fileName", fileName) if fileInfo else fileName, - fileSize=fileInfo.get("size", len(contentBytes)) if fileInfo else len(contentBytes), - mimeType=fileInfo.get("mimeType", mimeType) if fileInfo else mimeType - ) - createdDocs.append(doc) - except Exception: - continue - except Exception as e: - logger.warning(f"Prompt analysis failed or skipped: {str(e)}") - - # Process user-uploaded documents (fileIds) and combine with context documents - if userInput.listFileId: - try: - userDocs = await self._processFileIds(userInput.listFileId, None) - if userDocs: - createdDocs.extend(userDocs) - except Exception as e: - logger.warning(f"Failed to process user fileIds: {e}") - - # Finally, persist and bind the first message with combined documents (context + user) - self.services.chat.storeMessageWithDocuments(workflow, messageData, createdDocs) - - # Create ChatMessage with success criteria (KPI) AFTER the first user message - # This ensures the KPI message appears after the user message in the UI - workflowIntent = getattr(workflow, '_workflowIntent', None) - if workflowIntent and isinstance(workflowIntent, dict): - successCriteria = workflowIntent.get('successCriteria', []) - if successCriteria and isinstance(successCriteria, list) and len(successCriteria) > 0: - try: - # Format success criteria as message with "KPI" title - criteriaText = "**KPI**\n\n" + "\n".join([f"• {criterion}" for criterion in successCriteria]) - - kpiMessageData = { - "workflowId": workflow.id, - "role": "system", - "message": criteriaText, - "summary": f"KPI: {len(successCriteria)} success criteria", - "status": "step", - "sequenceNr": len(workflow.messages) + 1, # After user message - "publishedAt": self.services.utils.timestampGetUtc(), - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0 - } - - self.services.chat.storeMessageWithDocuments(workflow, kpiMessageData, []) - logger.info(f"Created KPI message with {len(successCriteria)} success criteria after first user message") - except Exception as e: - logger.error(f"Error creating KPI message: {str(e)}") - - except Exception as e: - logger.error(f"Error sending first message: {str(e)}") - raise - - async def _planTasks(self, userInput: UserInputRequest): - """Generate task plan for workflow execution""" - workflow = self.services.workflow - handling = self.workflowProcessor - # Generate task plan first (shared for both modes) - # Use normalizedRequest instead of raw userInput.prompt for security - normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt - taskPlan = await handling.generateTaskPlan(normalizedPrompt, workflow) - if not taskPlan or not taskPlan.tasks: - raise Exception("No tasks generated in task plan.") - workflowMode = getattr(workflow, 'workflowMode') - logger.info(f"Workflow object attributes: {workflow.__dict__ if hasattr(workflow, '__dict__') else 'No __dict__'}") - logger.info(f"Executing workflow mode={workflowMode} with {len(taskPlan.tasks)} tasks") - return taskPlan - - async def _executeTasks(self, taskPlan) -> None: - """Execute all tasks in the task plan and update workflow status.""" - workflow = self.services.workflow - handling = self.workflowProcessor - totalTasks = len(taskPlan.tasks) - allTaskResults: List = [] - previousResults: List[str] = [] - - # Create "Service Workflow Execution" root entry - parent of all tasks - workflowExecOperationId = f"workflowExec_{workflow.id}" - self.services.chat.progressLogStart( - workflowExecOperationId, - "Service", - "Workflow Execution", - f"Executing {totalTasks} task(s)" - ) - - # Store workflow execution operationId in workflowProcessor for task hierarchy - handling.workflowExecOperationId = workflowExecOperationId - - try: - for idx, taskStep in enumerate(taskPlan.tasks): - currentTaskIndex = idx + 1 - logger.info(f"Task {currentTaskIndex}/{totalTasks}: {taskStep.objective}") - - # Update workflow state before executing task (fixes "Task 0" issue) - handling.updateWorkflowBeforeExecutingTask(currentTaskIndex) - - # Build TaskContext (mode-specific behavior is inside WorkflowProcessor) - taskContext = TaskContext( - taskStep=taskStep, - workflow=workflow, - workflowId=workflow.id, - availableDocuments=None, - availableConnections=None, - previousResults=previousResults, - previousHandover=None, - improvements=[], - retryCount=0, - previousActionResults=[], - previousReviewResult=None, - isRegeneration=False, - failurePatterns=[], - failedActions=[], - successfulActions=[], - criteriaProgress={ - 'met_criteria': set(), - 'unmet_criteria': set(), - 'attempt_history': [] - } - ) - - taskResult = await handling.executeTask(taskStep, workflow, taskContext) - - # Persist task result for cross-task/round document references - # Convert ChatTaskResult to WorkflowTaskResult for persistence - from modules.datamodels.datamodelWorkflow import TaskResult as WorkflowTaskResult - from modules.datamodels.datamodelChat import ActionResult - - # Get final ActionResult from task execution (last action result) - finalActionResult = None - if hasattr(taskResult, 'actionResult'): - finalActionResult = taskResult.actionResult - elif taskContext.previousActionResults and len(taskContext.previousActionResults) > 0: - # Use last action result from context - finalActionResult = taskContext.previousActionResults[-1] - - # Create WorkflowTaskResult for persistence - if finalActionResult: - workflowTaskResult = WorkflowTaskResult( - taskId=taskStep.id, - actionResult=finalActionResult - ) - # Persist task result (creates ChatMessage + ChatDocuments) - await handling.persistTaskResult(workflowTaskResult, workflow, taskContext) - - handoverData = await handling.prepareTaskHandover(taskStep, [], taskResult, workflow) - allTaskResults.append({ - 'taskStep': taskStep, - 'taskResult': taskResult, - 'handoverData': handoverData - }) - if taskResult.success and taskResult.feedback: - previousResults.append(taskResult.feedback) - - # Mark workflow as completed; error/stop cases update status elsewhere - workflow.status = "completed" - finally: - # Finish "Service Workflow Execution" entry - self.services.chat.progressLogFinish(workflowExecOperationId, True) - - return None - - async def _processWorkflowResults(self) -> None: - """Process workflow results based on workflow status and create appropriate messages""" - try: - workflow = self.services.workflow - try: - checkWorkflowStopped(self.services) - except WorkflowStoppedException: - logger.info(f"Workflow {workflow.id} was stopped during result processing") - - # Create final stopped message - stoppedMessage = { - "workflowId": workflow.id, - "role": "assistant", - "message": "🛑 Workflow stopped by user", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_stopped", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "stopped", - "actionProgress": "stopped" - } - self.services.chat.storeMessageWithDocuments(workflow, stoppedMessage, []) - - # Update workflow status to stopped - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "stopped", - "lastActivity": workflow.lastActivity - }) - return - - if workflow.status == 'stopped': - # Create stopped message - stopped_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": "🛑 Workflow stopped by user", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_stopped", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "stopped", - "actionProgress": "stopped" - } - self.services.chat.storeMessageWithDocuments(workflow, stopped_message, []) - - # Update workflow status to stopped - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "stopped", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Add stopped log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped by user", - "type": "warning", - "status": "stopped", - "progress": 1.0 - }) - return - elif workflow.status == 'failed': - # Create error message - errorMessage = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Workflow failed: {'Unknown error'}", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_failure", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "fail", - "actionProgress": "fail" - } - self.services.chat.storeMessageWithDocuments(workflow, errorMessage, []) - - # Update workflow status to failed - workflow.status = "failed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "failed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Add failed log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow failed: Unknown error", - "type": "error", - "status": "failed", - "progress": 1.0 - }) - return - - # For successful workflows, send detailed completion message - await self._sendLastMessage() - - except Exception as e: - logger.error(f"Error processing workflow results: {str(e)}") - # Create error message - error_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Error processing workflow results: {str(e)}", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_error", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "fail", - "actionProgress": "fail" - } - self.services.chat.storeMessageWithDocuments(workflow, error_message, []) - - # Update workflow status to failed - workflow.status = "failed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "failed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - async def _sendLastMessage(self) -> None: - """Send last message to complete workflow (only for successful workflows)""" - try: - workflow = self.services.workflow - # Safety check: ensure this is only called for successful workflows - if workflow.status in ['stopped', 'failed']: - logger.warning(f"Attempted to send last message for {workflow.status} workflow {workflow.id}") - return - - # Generate feedback - feedback = await self._generateWorkflowFeedback() - - # Create last message using interface - messageData = { - "workflowId": workflow.id, - "role": "assistant", - "message": feedback, - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_feedback", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "success", - "actionProgress": "success" - } - - # Create message using interface - self.services.chat.storeMessageWithDocuments(workflow, messageData, []) - - # Update workflow status to completed - workflow.status = "completed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - - # Update workflow in database - self.services.chat.updateWorkflow(workflow.id, { - "status": "completed", - "lastActivity": workflow.lastActivity - }) - - # Add completion log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow completed", - "type": "success", - "status": "completed", - "progress": 1.0 - }) - - except Exception as e: - logger.error(f"Error sending last message: {str(e)}") - raise - - async def _generateWorkflowFeedback(self) -> str: - """Generate feedback message for workflow completion""" - try: - workflow = self.services.workflow - checkWorkflowStopped(self.services) - - # Count messages by role - userMessages = [msg for msg in workflow.messages if msg.role == 'user'] - assistantMessages = [msg for msg in workflow.messages if msg.role == 'assistant'] - - # Generate summary feedback - feedback = f"Workflow completed.\n\n" - feedback += f"Processed {len(userMessages)} user inputs and generated {len(assistantMessages)} responses.\n" - - # Add final status - if workflow.status == "completed": - feedback += "All tasks completed successfully." - elif workflow.status == "partial": - feedback += "Some tasks completed with partial success." - else: - feedback += f"Workflow status: {workflow.status}" - - return feedback - - except Exception as e: - logger.error(f"Error generating workflow feedback: {str(e)}") - return "Workflow processing completed." - - def _handleWorkflowStop(self) -> None: - """Handle workflow stop exception""" - workflow = self.services.workflow - logger.info("Workflow stopped by user") - - # Update workflow status to stopped - workflow.status = "stopped" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "stopped", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Create final stopped message - stopped_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": "🛑 Workflow stopped by user", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_stopped", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "pending", - "actionProgress": "pending" - } - self.services.chat.storeMessageWithDocuments(workflow, stopped_message, []) - - # Add log entry - self.services.chat.storeLog(workflow, { - "message": "Workflow stopped by user", - "type": "warning", - "status": "stopped", - "progress": 1.0 - }) - - def _handleWorkflowError(self, error: Exception) -> None: - """Handle workflow error exception""" - workflow = self.services.workflow - logger.error(f"Workflow processing error: {str(error)}") - - # Update workflow status to failed - workflow.status = "failed" - workflow.lastActivity = self.services.utils.timestampGetUtc() - self.services.chat.updateWorkflow(workflow.id, { - "status": "failed", - "lastActivity": workflow.lastActivity, - "totalTasks": workflow.totalTasks, - "totalActions": workflow.totalActions - }) - - # Create error message - error_message = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Workflow processing failed: {str(error)}", - "status": "last", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": self.services.utils.timestampGetUtc(), - "documentsLabel": "workflow_error", - "documents": [], - # Add workflow context fields - "roundNumber": workflow.currentRound, - "taskNumber": 0, - "actionNumber": 0, - # Add progress status - "taskProgress": "fail", - "actionProgress": "fail" - } - self.services.chat.storeMessageWithDocuments(workflow, error_message, []) - - # Add error log entry - self.services.chat.storeLog(workflow, { - "message": f"Workflow failed: {str(error)}", - "type": "error", - "status": "failed", - "progress": 1.0 - }) - - raise - - async def _processFileIds(self, fileIds: List[str], messageId: str = None) -> List[ChatDocument]: - """Process file IDs from existing files and return ChatDocument objects. - - NOTE: Neutralization is NOT performed here. For dynamic workflows, neutralization - should happen AFTER content extraction (in extractContent action) to neutralize - extracted data (ContentPart.data), not ChatDocuments. This ensures neutralization - happens after extraction but before AI processing. - """ - documents = [] - - workflow = self.services.workflow - - for fileId in fileIds: - try: - # Get file info from chat service - fileInfo = self.services.chat.getFileInfo(fileId) - if not fileInfo: - logger.warning(f"No file info found for file ID {fileId}") - continue - - originalFileName = fileInfo.get("fileName", "unknown") - originalMimeType = fileInfo.get("mimeType", "application/octet-stream") - fileSizeToUse = fileInfo.get("size", 0) - - # NOTE: Neutralization removed from here - it should happen in extractContent action - # after content extraction but before AI processing (for dynamic workflows) - # This ensures we neutralize extracted data (ContentPart.data), not ChatDocuments - - # Create document with original file ID (no neutralization) - document = ChatDocument( - id=str(uuid.uuid4()), - messageId=messageId or "", - fileId=fileId, - fileName=originalFileName, - fileSize=fileSizeToUse, - mimeType=originalMimeType - ) - documents.append(document) - logger.info(f"Processed file ID {fileId} -> {document.fileName}") - except Exception as e: - errorMsg = f"Error processing file ID {fileId}: {str(e)}" - logger.error(errorMsg) - self.services.chat.storeLog(workflow, { - "message": errorMsg, - "type": "error", - "status": "error", - "progress": -1 - }) - return documents - - - def _setUserLanguage(self, language: str) -> None: - """Set user language for the service center""" - self.services.user.language = language - - async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes: - """Neutralize content if neutralization is enabled in user settings""" - try: - # Check if neutralization is enabled - config = self.services.neutralization.getConfig() - if not config or not config.enabled: - return contentBytes - - # Decode content to text for neutralization - try: - textContent = contentBytes.decode('utf-8') - except UnicodeDecodeError: - # Try alternative encodings - for enc in ['latin-1', 'cp1252', 'iso-8859-1']: - try: - textContent = contentBytes.decode(enc) - break - except UnicodeDecodeError: - continue - else: - # If unable to decode, return original bytes (binary content) - logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}") - return contentBytes - - # Neutralize the text content - # Note: The neutralization service should use names from config when processing - result = self.services.neutralization.processText(textContent) - if result and 'neutralized_text' in result: - neutralizedText = result['neutralized_text'] - # Encode back to bytes using the same encoding - try: - return neutralizedText.encode('utf-8') - except Exception as e: - logger.warning(f"Error encoding neutralized text: {str(e)}") - return contentBytes - else: - logger.warning("Neutralization did not return neutralized_text") - return contentBytes - except Exception as e: - logger.error(f"Error during content neutralization: {str(e)}") - # Return original content on error - return contentBytes - - def _checkIfHistoryAvailable(self) -> bool: - """Check if workflow history is available (previous rounds exist). - - Returns True if there are previous workflow rounds with messages. - """ - try: - from modules.workflows.processing.shared.placeholderFactory import getPreviousRoundContext - - history = getPreviousRoundContext(self.services) - - # Check if history contains actual content (not just "No previous round context available") - if history and history != "No previous round context available": - return True - - return False - except Exception as e: - logger.error(f"Error checking if history is available: {str(e)}") - return False