diff --git a/modules/aicore/aicorePluginPrivateLlm.py b/modules/aicore/aicorePluginPrivateLlm.py index 718c5905..38baa35e 100644 --- a/modules/aicore/aicorePluginPrivateLlm.py +++ b/modules/aicore/aicorePluginPrivateLlm.py @@ -7,9 +7,9 @@ Connects to the private-llm service running on-premise with Ollama backend. Provides OCR and Vision capabilities via local AI models. Models: -- poweron-ocr-general: Text extraction and OCR (deepseek backend) -- poweron-vision-general: General vision tasks (qwen2.5vl backend) -- poweron-vision-deep: Deep vision analysis (granite3.2 backend) +- poweron-text-general: Text (qwen2.5); NEUTRALIZATION_TEXT + data/plan ops +- poweron-vision-general: Vision (qwen2.5vl); IMAGE_ANALYSE + NEUTRALIZATION_IMAGE +- poweron-vision-deep: Vision (granite3.2); IMAGE_ANALYSE + NEUTRALIZATION_IMAGE Pricing (CHF per call): - Text models: CHF 0.010 @@ -245,6 +245,7 @@ class AiPrivateLlm(BaseConnectorAi): (OperationTypeEnum.DATA_ANALYSE, 8), (OperationTypeEnum.DATA_GENERATE, 8), (OperationTypeEnum.DATA_EXTRACT, 8), + (OperationTypeEnum.NEUTRALIZATION_TEXT, 9), ), version="qwen2.5:7b", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_TEXT_PER_CALL @@ -270,6 +271,7 @@ class AiPrivateLlm(BaseConnectorAi): processingMode=ProcessingModeEnum.ADVANCED, operationTypes=createOperationTypeRatings( (OperationTypeEnum.IMAGE_ANALYSE, 9), + (OperationTypeEnum.NEUTRALIZATION_IMAGE, 9), ), version="qwen2.5vl:7b", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_VISION_PER_CALL @@ -295,6 +297,7 @@ class AiPrivateLlm(BaseConnectorAi): processingMode=ProcessingModeEnum.DETAILED, operationTypes=createOperationTypeRatings( (OperationTypeEnum.IMAGE_ANALYSE, 9), + (OperationTypeEnum.NEUTRALIZATION_IMAGE, 9), ), version="granite3.2-vision", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_VISION_PER_CALL diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index c31d5696..96e05185 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -22,6 +22,10 @@ class OperationTypeEnum(str, Enum): IMAGE_ANALYSE = "imageAnalyse" IMAGE_GENERATE = "imageGenerate" + # Neutralization (dedicated model selection; text vs vision backends) + NEUTRALIZATION_TEXT = "neutralizationText" + NEUTRALIZATION_IMAGE = "neutralizationImage" + # Web Operations WEB_SEARCH_DATA = "webSearch" # Returns list of URLs only WEB_CRAWL = "webCrawl" # Web crawl for a given URL diff --git a/modules/datamodels/datamodelBilling.py b/modules/datamodels/datamodelBilling.py index 2d3bfdb1..ccf1f4a1 100644 --- a/modules/datamodels/datamodelBilling.py +++ b/modules/datamodels/datamodelBilling.py @@ -28,6 +28,7 @@ class ReferenceTypeEnum(str, Enum): ADMIN = "ADMIN" # Admin adjustment SYSTEM = "SYSTEM" # System credit (e.g., initial credit) STORAGE = "STORAGE" # Metered storage overage (prepay pool) + SUBSCRIPTION = "SUBSCRIPTION" # AI budget credit from subscription plan class PeriodTypeEnum(str, Enum): diff --git a/modules/datamodels/datamodelKnowledge.py b/modules/datamodels/datamodelKnowledge.py index 3742a84b..7ac12c15 100644 --- a/modules/datamodels/datamodelKnowledge.py +++ b/modules/datamodels/datamodelKnowledge.py @@ -3,8 +3,10 @@ """Knowledge Store data models: FileContentIndex, ContentChunk, WorkflowMemory. These models support the 3-tier RAG architecture: -- Shared Layer: mandateId-scoped, isShared=True -- Instance Layer: userId + featureInstanceId-scoped +- Personal Layer: scope=personal, userId-scoped +- Instance Layer: scope=featureInstance, featureInstanceId-scoped +- Mandate Layer: scope=mandate, mandateId-scoped (visible to all mandate users) +- Global Layer: scope=global (sysAdmin only) - Workflow Layer: workflowId-scoped (WorkflowMemory) Vector fields use json_schema_extra={"db_type": "vector(1536)"} for pgvector. @@ -20,12 +22,11 @@ import uuid class FileContentIndex(PowerOnModel): """Structural index of a file's content objects. Created without AI. - Lives in the Instance Layer; optionally promoted to Shared Layer via isShared.""" + Scope is mirrored from FileItem (poweron_management) at indexing time.""" id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key (typically = fileId)") userId: str = Field(description="Owner user ID") featureInstanceId: str = Field(default="", description="Feature instance scope") mandateId: str = Field(default="", description="Mandate scope") - isShared: bool = Field(default=False, description="Visible in Shared Layer for all mandate users") fileName: str = Field(description="Original file name") mimeType: str = Field(description="MIME type of the file") containerPath: Optional[str] = Field(default=None, description="Path within a container (e.g. 'archive.zip/folder/report.pdf')") @@ -57,7 +58,6 @@ registerModelLabels( "userId": {"en": "User ID", "fr": "ID utilisateur"}, "featureInstanceId": {"en": "Feature Instance ID", "fr": "ID de l'instance"}, "mandateId": {"en": "Mandate ID", "fr": "ID du mandat"}, - "isShared": {"en": "Shared", "fr": "Partagé"}, "fileName": {"en": "File Name", "fr": "Nom de fichier"}, "mimeType": {"en": "MIME Type", "fr": "Type MIME"}, "containerPath": {"en": "Container Path", "fr": "Chemin du conteneur"}, diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py index 741ce3d5..e0c4f13c 100644 --- a/modules/datamodels/datamodelUam.py +++ b/modules/datamodels/datamodelUam.py @@ -10,7 +10,7 @@ Multi-Tenant Design: """ import uuid -from typing import Optional, List, Dict +from typing import Optional, List, Dict, Any from enum import Enum from pydantic import BaseModel, Field, EmailStr, field_validator, computed_field from modules.datamodels.datamodelBase import PowerOnModel @@ -303,6 +303,33 @@ registerModelLabels( ) +def _normalizeTtsVoiceMap(value: Any) -> Optional[Dict[str, str]]: + """ + Coerce ttsVoiceMap payloads to Dict[str, str]. + + UI/clients may send per-locale objects like {"voiceName": "de-DE-Chirp3-HD-Achird"}; + storage and model field type are locale -> voice id string. + """ + if value is None: + return None + if not isinstance(value, dict): + return None + out: Dict[str, str] = {} + for rawKey, rawVal in value.items(): + key = str(rawKey) + if rawVal is None: + continue + if isinstance(rawVal, str): + out[key] = rawVal + elif isinstance(rawVal, dict): + vn = rawVal.get("voiceName") + if vn is not None and str(vn).strip() != "": + out[key] = str(vn).strip() + else: + out[key] = str(rawVal) + return out if out else None + + class UserVoicePreferences(PowerOnModel): """User-level voice/language preferences, shared across all features.""" id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") @@ -315,6 +342,11 @@ class UserVoicePreferences(PowerOnModel): translationSourceLanguage: Optional[str] = Field(default=None, description="Source language for translations") translationTargetLanguage: Optional[str] = Field(default=None, description="Target language for translations") + @field_validator("ttsVoiceMap", mode="before") + @classmethod + def _validateTtsVoiceMap(cls, value: Any) -> Optional[Dict[str, str]]: + return _normalizeTtsVoiceMap(value) + registerModelLabels( "UserVoicePreferences", diff --git a/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py b/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py index c803b375..e583c60b 100644 --- a/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py +++ b/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py @@ -203,6 +203,89 @@ class NeutralizationService: 'processed_info': {'type': 'binary', 'status': 'error', 'error': str(e)} } + async def processImageAsync(self, imageBytes: bytes, fileName: str, mimeType: str = "image/png") -> Dict[str, Any]: + """Analyze image via internal vision model to check for sensitive content. + + Returns dict with: + - 'status': 'ok' | 'blocked' | 'error' + - 'hasSensitiveContent': bool + - 'analysis': str (model's analysis text, if available) + - 'processed_info': dict with details + + Uses NEUTRALIZATION_IMAGE operation type → only internal Private-LLM models. + If no internal model available → returns 'blocked'. + """ + import base64 + try: + aiService = None + if self._getService: + try: + aiService = self._getService("ai") + except Exception: + pass + if not aiService or not hasattr(aiService, 'callAi'): + logger.warning(f"processImage: AI service not available — blocking image '{fileName}'") + return { + 'status': 'blocked', + 'hasSensitiveContent': True, + 'analysis': '', + 'processed_info': {'type': 'image', 'status': 'blocked', 'reason': 'AI service unavailable'} + } + + from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum + + _b64Data = base64.b64encode(imageBytes).decode('utf-8') + _dataUrl = f"data:{mimeType};base64,{_b64Data}" + + _prompt = ( + "Analyze this image for personally identifiable information (PII). " + "Check for: names, addresses, phone numbers, email addresses, ID numbers, " + "faces, signatures, handwritten text, license plates, financial data. " + "Respond with JSON: {\"hasPII\": true/false, \"findings\": [\"...\"]}" + ) + + _request = AiCallRequest( + prompt=_prompt, + options=AiCallOptions(operationType=OperationTypeEnum.NEUTRALIZATION_IMAGE), + messages=[{"role": "user", "content": [ + {"type": "text", "text": _prompt}, + {"type": "image_url", "image_url": {"url": _dataUrl}}, + ]}], + ) + + _response = await aiService.callAi(_request) + + _hasPII = False + _analysis = _response.content if _response and hasattr(_response, 'content') else '' + if _analysis: + _lowerAnalysis = _analysis.lower() + if '"haspii": true' in _lowerAnalysis or '"haspii":true' in _lowerAnalysis: + _hasPII = True + + return { + 'status': 'blocked' if _hasPII else 'ok', + 'hasSensitiveContent': _hasPII, + 'analysis': _analysis, + 'processed_info': {'type': 'image', 'status': 'blocked' if _hasPII else 'ok', 'fileName': fileName} + } + except Exception as e: + logger.error(f"processImage failed for '{fileName}': {e}") + return { + 'status': 'blocked', + 'hasSensitiveContent': True, + 'analysis': '', + 'processed_info': {'type': 'image', 'status': 'error', 'error': str(e)} + } + + def processImage(self, imageBytes: bytes, fileName: str, mimeType: str = "image/png") -> Dict[str, Any]: + """Sync wrapper for processImageAsync. Uses asyncio.run when no event loop is running.""" + import asyncio + try: + return asyncio.run(self.processImageAsync(imageBytes, fileName, mimeType)) + except RuntimeError: + loop = asyncio.get_event_loop() + return loop.run_until_complete(self.processImageAsync(imageBytes, fileName, mimeType)) + def resolveText(self, text: str) -> str: if not self.interfaceNeutralizer: return text @@ -295,9 +378,21 @@ class NeutralizationService: p = part if isinstance(part, dict) else part.model_dump() if hasattr(part, 'model_dump') else part type_group = p.get('typeGroup', '') data = p.get('data', '') - if type_group in ('binary', 'image') or not (data and str(data).strip()): + if type_group == 'binary' or not (data and str(data).strip()): neutralized_parts.append(part) continue + if type_group == 'image': + import base64 as _b64img + try: + _imgBytes = _b64img.b64decode(str(data)) + _imgResult = await self.processImageAsync(_imgBytes, fileName) + if _imgResult.get("status") == "ok": + neutralized_parts.append(part) + else: + logger.warning(f"Image part blocked in binary file '{fileName}' (PII detected), removing") + except Exception as _imgErr: + logger.warning(f"Image check failed in binary file '{fileName}': {_imgErr}, removing (fail-safe)") + continue nr = self._neutralizeText(str(data), 'text' if type_group != 'table' else 'csv') proc = nr.get('processed_info', {}) or {} if isinstance(proc, dict) and proc.get('type') == 'error': diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index 7eccb3ee..93b17d6a 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -201,6 +201,13 @@ def initBootstrap(db: DatabaseConnector) -> None: except Exception as e: logger.error(f"Voice & documents migration failed: {e}") + # Backfill FileContentIndex scope fields from FileItem (one-time) + try: + from modules.migration.migrateRagScopeFields import runMigration as migrateRagScope + migrateRagScope(appDb=db) + except Exception as e: + logger.error(f"RAG scope fields migration failed: {e}") + # After migration: root mandate is purely technical — no feature instances if not migrationDone and mandateId: initRootMandateFeatures(db, mandateId) diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py index 13179634..ffde890f 100644 --- a/modules/interfaces/interfaceDbApp.py +++ b/modules/interfaces/interfaceDbApp.py @@ -1931,14 +1931,26 @@ class AppObjects: raise logger.debug(f"Subscription capacity check skipped: {e}") - def _syncSubscriptionQuantity(self, mandateId: str) -> None: - """Sync Stripe subscription quantities after a resource mutation.""" + def _syncSubscriptionQuantity(self, mandateId: str, *, raiseOnError: bool = False) -> None: + """Sync Stripe subscription quantities after a resource mutation. + + Args: + raiseOnError: If True, propagate errors (billing-critical paths). + """ try: from modules.interfaces.interfaceDbSubscription import getInterface as getSubInterface from modules.security.rootAccess import getRootUser subIf = getSubInterface(getRootUser(), mandateId) - subIf.syncQuantityToStripe(mandateId) + operative = subIf.getOperativeForMandate(mandateId) + if not operative: + if raiseOnError: + raise ValueError(f"Kein operatives Abonnement für Mandant {mandateId}") + logger.debug("No operative subscription for mandate %s — quantity sync skipped", mandateId) + return + subIf.syncQuantityToStripe(operative["id"], raiseOnError=raiseOnError) except Exception as e: + if raiseOnError: + raise logger.debug(f"Subscription quantity sync skipped: {e}") def deleteUserMandate(self, userId: str, mandateId: str) -> bool: diff --git a/modules/interfaces/interfaceDbBilling.py b/modules/interfaces/interfaceDbBilling.py index 1069314f..bb2dc5c9 100644 --- a/modules/interfaces/interfaceDbBilling.py +++ b/modules/interfaces/interfaceDbBilling.py @@ -970,6 +970,41 @@ class BillingObjects: ) return created + # ========================================================================= + # Subscription AI-Budget Credit + # ========================================================================= + + def creditSubscriptionBudget(self, mandateId: str, planKey: str, periodLabel: str = "") -> Optional[Dict[str, Any]]: + """Credit the plan's budgetAiCHF to the mandate pool account. + + Should be called once per billing period (initial activation + each invoice.paid). + Returns the created CREDIT transaction or None if budget is 0.""" + from modules.datamodels.datamodelSubscription import _getPlan + + plan = _getPlan(planKey) + if not plan or not plan.budgetAiCHF or plan.budgetAiCHF <= 0: + return None + + poolAccount = self.getOrCreateMandateAccount(mandateId) + description = f"AI-Budget ({planKey})" + if periodLabel: + description += f" – {periodLabel}" + + transaction = BillingTransaction( + accountId=poolAccount["id"], + transactionType=TransactionTypeEnum.CREDIT, + amount=plan.budgetAiCHF, + description=description, + referenceType=ReferenceTypeEnum.SUBSCRIPTION, + referenceId=mandateId, + ) + created = self.createTransaction(transaction) + logger.info( + "AI-Budget credited mandate=%s plan=%s amount=%.2f CHF", + mandateId, planKey, plan.budgetAiCHF, + ) + return created + # ========================================================================= # Workflow Cost Query # ========================================================================= diff --git a/modules/interfaces/interfaceDbKnowledge.py b/modules/interfaces/interfaceDbKnowledge.py index c7b9e29a..ede37c87 100644 --- a/modules/interfaces/interfaceDbKnowledge.py +++ b/modules/interfaces/interfaceDbKnowledge.py @@ -294,7 +294,6 @@ class KnowledgeObjects: userId: str = None, featureInstanceId: str = None, mandateId: str = None, - isShared: bool = None, scope: str = None, limit: int = 10, minScore: float = None, @@ -305,10 +304,9 @@ class KnowledgeObjects: Args: queryVector: Query embedding vector. - userId: Filter by user (Instance Layer). + userId: Filter by user (personal scope). featureInstanceId: Filter by feature instance. - mandateId: Filter by mandate (for Shared Layer lookups). - isShared: If True, search Shared Layer via FileContentIndex join. + mandateId: Filter by mandate (scope=mandate means visible to all mandate users). scope: If provided, filter by this specific scope value. If not provided, use scope-union approach (personal + featureInstance + mandate + global). limit: Max results. @@ -323,8 +321,13 @@ class KnowledgeObjects: recordFilter["contentType"] = contentType if scope: + scopeFilter: Dict[str, Any] = {"scope": scope} + if mandateId: + scopeFilter["mandateId"] = mandateId + if featureInstanceId: + scopeFilter["featureInstanceId"] = featureInstanceId scopedFileIds = self.db.getRecordset( - FileContentIndex, recordFilter={"scope": scope} + FileContentIndex, recordFilter=scopeFilter ) fileIds = [ idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None) @@ -334,16 +337,6 @@ class KnowledgeObjects: if not fileIds: return [] recordFilter["fileId"] = fileIds - elif isShared and mandateId: - sharedIndexes = self.db.getRecordset( - FileContentIndex, - recordFilter={"mandateId": mandateId, "isShared": True}, - ) - sharedFileIds = [idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None) for idx in sharedIndexes] - sharedFileIds = [fid for fid in sharedFileIds if fid] - if not sharedFileIds: - return [] - recordFilter["fileId"] = sharedFileIds elif userId or featureInstanceId or mandateId: scopedFileIds = self._getScopedFileIds( userId=userId, @@ -410,7 +403,7 @@ class KnowledgeObjects: if mandateId: files_shared = self.db.getRecordset( FileContentIndex, - recordFilter={"mandateId": mandateId, "isShared": True}, + recordFilter={"mandateId": mandateId, "scope": "mandate"}, ) by_id: Dict[str, Dict[str, Any]] = {} @@ -559,6 +552,76 @@ class KnowledgeObjects: } +def aggregateMandateRagTotalBytes(mandateId: str) -> int: + """Sum FileContentIndex.totalSize for a mandate. + + Primary strategy (relies on correct scope fields on FileContentIndex): + 1. FileContentIndex rows with mandateId on the index + 2. FileContentIndex rows with featureInstanceId of any mandate FeatureInstance + Deduplicates by id. + """ + if not mandateId: + return 0 + from modules.datamodels.datamodelFeatures import FeatureInstance + from modules.interfaces.interfaceDbApp import getRootInterface + + knowDb = getInterface(None).db + appDb = getRootInterface().db + byId: Dict[str, Dict[str, Any]] = {} + + for row in knowDb.getRecordset(FileContentIndex, recordFilter={"mandateId": mandateId}): + rid = row.get("id") + if rid: + byId[str(rid)] = row + + instances = appDb.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) + instIds = [str(inst.get("id", "")) for inst in instances if inst.get("id")] + + for instId in instIds: + for row in knowDb.getRecordset(FileContentIndex, recordFilter={"featureInstanceId": instId}): + rid = row.get("id") + if rid and str(rid) not in byId: + byId[str(rid)] = row + + # DEPRECATED: file-ID-correlation fallback from poweron_management. + # Only needed for pre-migration data where mandateId/featureInstanceId on the + # FileContentIndex are empty. Remove once migrateRagScopeFields has been run. + _fallbackCount = 0 + try: + from modules.datamodels.datamodelFiles import FileItem + from modules.interfaces.interfaceDbManagement import ComponentObjects + mgmtDb = ComponentObjects().db + knowledgeIf = getInterface(None) + + fileIds: set = set() + for f in mgmtDb.getRecordset(FileItem, recordFilter={"mandateId": mandateId}): + fid = f.get("id") if isinstance(f, dict) else getattr(f, "id", None) + if fid: + fileIds.add(str(fid)) + for instId in instIds: + for f in mgmtDb.getRecordset(FileItem, recordFilter={"featureInstanceId": instId}): + fid = f.get("id") if isinstance(f, dict) else getattr(f, "id", None) + if fid: + fileIds.add(str(fid)) + + for fid in fileIds: + if fid in byId: + continue + row = knowledgeIf.getFileContentIndex(fid) + if row: + byId[fid] = row + _fallbackCount += 1 + except Exception as e: + logger.warning("aggregateMandateRagTotalBytes fallback failed: %s", e) + + total = sum(int(r.get("totalSize") or 0) for r in byId.values()) + logger.info( + "aggregateMandateRagTotalBytes(%s): %d indexes, %d bytes (fallback: %d)", + mandateId, len(byId), total, _fallbackCount, + ) + return total + + def getInterface(currentUser: Optional[User] = None) -> KnowledgeObjects: """Get or create a KnowledgeObjects singleton.""" if "default" not in _instances: diff --git a/modules/interfaces/interfaceDbManagement.py b/modules/interfaces/interfaceDbManagement.py index 28842958..0a16b734 100644 --- a/modules/interfaces/interfaceDbManagement.py +++ b/modules/interfaces/interfaceDbManagement.py @@ -1053,15 +1053,20 @@ class ComponentObjects: # Ensure fileName is unique uniqueName = self._generateUniquefileName(name) - # Use mandateId and featureInstanceId from context for proper data isolation - # Convert None to empty string to satisfy Pydantic validation mandateId = self.mandateId or "" featureInstanceId = self.featureInstanceId or "" - - # Create FileItem instance + + if featureInstanceId: + scope = "featureInstance" + elif mandateId: + scope = "mandate" + else: + scope = "personal" + fileItem = FileItem( mandateId=mandateId, featureInstanceId=featureInstanceId, + scope=scope, fileName=uniqueName, mimeType=mimeType, fileSize=fileSize, diff --git a/modules/interfaces/interfaceDbSubscription.py b/modules/interfaces/interfaceDbSubscription.py index d6832f14..f1d7ccf7 100644 --- a/modules/interfaces/interfaceDbSubscription.py +++ b/modules/interfaces/interfaceDbSubscription.py @@ -309,13 +309,11 @@ class SubscriptionObjects: return self._getMandateDataVolumeMB(mandateId) def _getMandateDataVolumeMB(self, mandateId: str) -> float: - """Sum RAG index size (FileContentIndex.totalSize) across all feature instances of the mandate.""" + """Sum RAG index size (FileContentIndex.totalSize) for the mandate; reads poweron_knowledge.""" try: - from modules.datamodels.datamodelKnowledge import FileContentIndex - knowledgeDb = _getAppDatabaseConnector() - indexes = knowledgeDb.getRecordset(FileContentIndex, recordFilter={"mandateId": mandateId}) - totalBytes = sum(int(idx.get("totalSize") or 0) for idx in indexes) - return totalBytes / (1024 * 1024) + from modules.interfaces.interfaceDbKnowledge import aggregateMandateRagTotalBytes + + return aggregateMandateRagTotalBytes(mandateId) / (1024 * 1024) except Exception: return 0.0 @@ -359,11 +357,18 @@ class SubscriptionObjects: # Stripe quantity sync # ========================================================================= - def syncQuantityToStripe(self, subscriptionId: str) -> None: + def syncQuantityToStripe(self, subscriptionId: str, *, raiseOnError: bool = False) -> None: """Update Stripe subscription item quantities to match actual active counts. - Takes subscriptionId, not mandateId.""" + Takes subscriptionId, not mandateId. + + Args: + raiseOnError: If True, propagate Stripe API errors instead of logging them. + Use True for billing-critical paths (store activation). + """ sub = self.getById(subscriptionId) if not sub or not sub.get("stripeSubscriptionId"): + if raiseOnError: + raise ValueError(f"Subscription {subscriptionId} hat keine Stripe-Anbindung — Abrechnung nicht möglich.") return mandateId = sub["mandateId"] @@ -389,3 +394,5 @@ class SubscriptionObjects: logger.info("Stripe quantity synced for sub %s: users=%d, instances=%d", subscriptionId, activeUsers, activeInstances) except Exception as e: logger.error("syncQuantityToStripe(%s) failed: %s", subscriptionId, e) + if raiseOnError: + raise diff --git a/modules/migration/migrateRagScopeFields.py b/modules/migration/migrateRagScopeFields.py new file mode 100644 index 00000000..82e0e3fb --- /dev/null +++ b/modules/migration/migrateRagScopeFields.py @@ -0,0 +1,114 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Migration: Backfill FileContentIndex scope fields from FileItem (Single Source of Truth). + +Fixes legacy rows in poweron_knowledge where scope/mandateId/featureInstanceId +are empty or default ("personal") despite the corresponding FileItem having correct values. + +Idempotent — safe to run multiple times. Uses a DB flag to skip if already completed. +""" + +import logging +from modules.shared.configuration import APP_CONFIG +from modules.connectors.connectorDbPostgre import _get_cached_connector + +logger = logging.getLogger(__name__) + +_MIGRATION_FLAG_KEY = "migration_rag_scope_fields_completed" + + +def _isMigrationCompleted(appDb) -> bool: + try: + from modules.datamodels.datamodelUam import Mandate + records = appDb.getRecordset(Mandate, recordFilter={"name": _MIGRATION_FLAG_KEY}) + return len(records) > 0 + except Exception: + return False + + +def _setMigrationCompleted(appDb) -> None: + try: + from modules.datamodels.datamodelUam import Mandate + flag = Mandate(name=_MIGRATION_FLAG_KEY, description="RAG scope fields migration completed") + appDb.recordCreate(Mandate, flag) + except Exception as e: + logger.error("Could not set migration flag: %s", e) + + +def runMigration(appDb=None) -> dict: + """Backfill FileContentIndex rows from FileItem metadata. + + Returns dict with counts: {total, updated, skipped, orphaned}. + """ + from modules.datamodels.datamodelKnowledge import FileContentIndex + from modules.datamodels.datamodelFiles import FileItem + from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface + from modules.interfaces.interfaceDbManagement import ComponentObjects + + if appDb is None: + from modules.interfaces.interfaceDbApp import getRootInterface + appDb = getRootInterface().db + + if _isMigrationCompleted(appDb): + logger.info("migrateRagScopeFields: already completed, skipping") + return {"total": 0, "updated": 0, "skipped": 0, "orphaned": 0} + + knowDb = getKnowledgeInterface(None).db + mgmtDb = ComponentObjects().db + + allIndexes = knowDb.getRecordset(FileContentIndex, recordFilter={}) + total = len(allIndexes) + updated = 0 + skipped = 0 + orphaned = 0 + + logger.info("migrateRagScopeFields: processing %d FileContentIndex rows", total) + + for idx in allIndexes: + idxId = idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None) + if not idxId: + skipped += 1 + continue + + fileItem = mgmtDb._loadRecord(FileItem, str(idxId)) + if not fileItem: + orphaned += 1 + continue + + _get = (lambda k, d="": fileItem.get(k, d)) if isinstance(fileItem, dict) else (lambda k, d="": getattr(fileItem, k, d)) + + fiScope = _get("scope") or "personal" + fiMandateId = str(_get("mandateId") or "") + fiFeatureInstanceId = str(_get("featureInstanceId") or "") + + idxGet = (lambda k, d="": idx.get(k, d)) if isinstance(idx, dict) else (lambda k, d="": getattr(idx, k, d)) + currentScope = idxGet("scope") or "personal" + currentMandateId = str(idxGet("mandateId") or "") + currentFeatureInstanceId = str(idxGet("featureInstanceId") or "") + + updates = {} + if fiScope != currentScope: + updates["scope"] = fiScope + if fiMandateId and fiMandateId != currentMandateId: + updates["mandateId"] = fiMandateId + if fiFeatureInstanceId and fiFeatureInstanceId != currentFeatureInstanceId: + updates["featureInstanceId"] = fiFeatureInstanceId + + if updates: + try: + knowDb.recordModify(FileContentIndex, str(idxId), updates) + updated += 1 + logger.debug("migrateRagScopeFields: updated %s -> %s", idxId, updates) + except Exception as e: + logger.error("migrateRagScopeFields: failed to update %s: %s", idxId, e) + skipped += 1 + else: + skipped += 1 + + _setMigrationCompleted(appDb) + logger.info( + "migrateRagScopeFields complete: total=%d, updated=%d, skipped=%d, orphaned=%d", + total, updated, skipped, orphaned, + ) + return {"total": total, "updated": updated, "skipped": skipped, "orphaned": orphaned} diff --git a/modules/routes/routeAdminFeatures.py b/modules/routes/routeAdminFeatures.py index 12206b06..e69df7b9 100644 --- a/modules/routes/routeAdminFeatures.py +++ b/modules/routes/routeAdminFeatures.py @@ -576,14 +576,15 @@ def create_feature_instance( config=data.config ) - # Sync Stripe quantity after successful creation try: from modules.interfaces.interfaceDbSubscription import getInterface as _getSubIf2 from modules.security.rootAccess import getRootUser as _getRU _subIf2 = _getSubIf2(_getRU(), mandateIdStr) - _subIf2.syncQuantityToStripe(mandateIdStr) - except Exception: - pass + _operative = _subIf2.getOperativeForMandate(mandateIdStr) + if _operative: + _subIf2.syncQuantityToStripe(_operative["id"], raiseOnError=True) + except Exception as e: + logger.error("Stripe quantity sync failed for admin feature creation in mandate %s: %s", mandateIdStr, e) logger.info( f"User {context.user.id} created feature instance '{data.label}' " diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py index 13e94559..0f612d45 100644 --- a/modules/routes/routeBilling.py +++ b/modules/routes/routeBilling.py @@ -1104,6 +1104,12 @@ def _handleSubscriptionCheckoutCompleted(session, eventId: str) -> None: updatedSub = subInterface.getById(subscriptionRecordId) _notifySubscriptionChange(mandateId, "activated", plan, subscriptionRecord=updatedSub, platformUrl=platformUrl) + try: + billingIf = _getRootInterface() + billingIf.creditSubscriptionBudget(mandateId, planKey, periodLabel="Erstaktivierung") + except Exception as ex: + logger.error("creditSubscriptionBudget on activation failed: %s", ex) + logger.info( "Checkout completed: sub=%s -> %s, mandate=%s, plan=%s", subscriptionRecordId, toStatus.value, mandateId, planKey, @@ -1162,9 +1168,14 @@ def _handleSubscriptionWebhook(event) -> None: if stripeStatus == "active" and currentStatus == SubscriptionStatusEnum.SCHEDULED: subInterface.transitionStatus(subId, SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.ACTIVE) subService.invalidateCache(mandateId) - plan = _getPlan(sub.get("planKey", "")) + planKey = sub.get("planKey", "") + plan = _getPlan(planKey) refreshedSub = subInterface.getById(subId) _notifySubscriptionChange(mandateId, "activated", plan, subscriptionRecord=refreshedSub, platformUrl=webhookPlatformUrl) + try: + _getRootInterface().creditSubscriptionBudget(mandateId, planKey, periodLabel="Erstaktivierung") + except Exception as ex: + logger.error("creditSubscriptionBudget SCHEDULED->ACTIVE failed: %s", ex) logger.info("SCHEDULED -> ACTIVE for sub %s (mandate %s)", subId, mandateId) elif stripeStatus == "active" and currentStatus == SubscriptionStatusEnum.PAST_DUE: @@ -1231,14 +1242,24 @@ def _handleSubscriptionWebhook(event) -> None: elif event.type == "invoice.paid": period_ts = obj.get("period_start") + periodLabel = "" if period_ts: period_start_at = datetime.fromtimestamp(int(period_ts), tz=timezone.utc) + periodLabel = period_start_at.strftime("%Y-%m-%d") try: billing_if = _getRootInterface() billing_if.resetStorageBillingPeriod(mandateId, period_start_at) billing_if.reconcileMandateStorageBilling(mandateId) except Exception as ex: logger.error("Storage billing on invoice.paid failed: %s", ex) + + planKey = sub.get("planKey", "") + try: + billing_if = _getRootInterface() + billing_if.creditSubscriptionBudget(mandateId, planKey, periodLabel=periodLabel or "Periodenverlängerung") + except Exception as ex: + logger.error("creditSubscriptionBudget on invoice.paid failed: %s", ex) + logger.info("Invoice paid for sub %s (mandate %s)", subId, mandateId) return None diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py index e95da174..17e0ef56 100644 --- a/modules/routes/routeDataFiles.py +++ b/modules/routes/routeDataFiles.py @@ -1,6 +1,6 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. -from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Form, Path, Request, status, Query, Response, Body +from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Form, Path, Request, status, Query, Response, Body, BackgroundTasks from fastapi.responses import JSONResponse from typing import List, Dict, Any, Optional import logging @@ -41,13 +41,16 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user): file_meta = mgmtInterface.getFile(fileId) feature_instance_id = "" mandate_id = "" + file_scope = "personal" if file_meta: if isinstance(file_meta, dict): feature_instance_id = file_meta.get("featureInstanceId") or "" mandate_id = file_meta.get("mandateId") or "" + file_scope = file_meta.get("scope") or "personal" else: feature_instance_id = getattr(file_meta, "featureInstanceId", None) or "" mandate_id = getattr(file_meta, "mandateId", None) or "" + file_scope = getattr(file_meta, "scope", None) or "personal" logger.info(f"Auto-index starting for {fileName} ({len(rawBytes)} bytes, {mimeType})") @@ -61,6 +64,7 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user): userId=userId, featureInstanceId=str(feature_instance_id) if feature_instance_id else "", mandateId=str(mandate_id) if mandate_id else "", + scope=file_scope, ) logger.info( f"Pre-scan complete for {fileName}: " @@ -667,6 +671,7 @@ def batch_move_items( @limiter.limit("30/minute") def updateFileScope( request: Request, + background_tasks: BackgroundTasks, fileId: str = Path(..., description="ID of the file"), scope: str = Body(..., embed=True), context: RequestContext = Depends(getRequestContext), @@ -700,19 +705,18 @@ def updateFileScope( except Exception as e: logger.warning(f"Failed to update FileContentIndex scope for file {fileId}: {e}") - # Trigger re-indexing so RAG embeddings metadata reflects the new scope - try: - fileMeta = managementInterface.getFile(fileId) - if fileMeta: - import asyncio - asyncio.ensure_future(_autoIndexFile( - fileId=fileId, - fileName=fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", ""), - mimeType=fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", ""), - user=context.user, - )) - except Exception as e: - logger.warning(f"Failed to trigger re-index after scope change for file {fileId}: {e}") + fileMeta = managementInterface.getFile(fileId) + if fileMeta: + fn = fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", "") + mt = fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", "") + + async def _runReindexAfterScopeChange(): + try: + await _autoIndexFile(fileId=fileId, fileName=fn, mimeType=mt, user=context.user) + except Exception as ex: + logger.warning("Re-index after scope change failed for %s: %s", fileId, ex) + + background_tasks.add_task(_runReindexAfterScopeChange) return {"fileId": fileId, "scope": scope, "updated": True} except HTTPException: @@ -726,11 +730,18 @@ def updateFileScope( @limiter.limit("30/minute") def updateFileNeutralize( request: Request, + background_tasks: BackgroundTasks, fileId: str = Path(..., description="ID of the file"), neutralize: bool = Body(..., embed=True), context: RequestContext = Depends(getRequestContext), ) -> Dict[str, Any]: - """Toggle neutralization flag on a file.""" + """Toggle neutralization flag on a file. + + FAILSAFE: When turning neutralize ON, the existing Knowledge Store index + and all content chunks are deleted SYNCHRONOUSLY before the response is + returned. The re-index happens in a background task. If re-indexing + fails the file simply has no index — no un-neutralized data can leak. + """ try: managementInterface = interfaceDbManagement.getInterface( context.user, @@ -740,35 +751,54 @@ def updateFileNeutralize( managementInterface.updateFile(fileId, {"neutralize": neutralize}) - # Update FileContentIndex neutralization metadata - try: - from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface - from modules.datamodels.datamodelKnowledge import FileContentIndex - knowledgeDb = getKnowledgeInterface() - neutralizationStatus = "neutralized" if neutralize else "original" - indices = knowledgeDb.db.getRecordset(FileContentIndex, recordFilter={"id": fileId}) - for idx in indices: - idxId = idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None) - if idxId: - knowledgeDb.db.recordModify(FileContentIndex, idxId, {"neutralizationStatus": neutralizationStatus}) - except Exception as e: - logger.warning(f"Failed to update FileContentIndex neutralize for file {fileId}: {e}") + from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface + knowledgeDb = getKnowledgeInterface() - # Trigger re-indexing so content is re-processed with/without neutralization - try: - fileMeta = managementInterface.getFile(fileId) - if fileMeta: - import asyncio - asyncio.ensure_future(_autoIndexFile( - fileId=fileId, - fileName=fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", ""), - mimeType=fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", ""), - user=context.user, - )) - except Exception as e: - logger.warning(f"Failed to trigger re-index after neutralize change for file {fileId}: {e}") + if neutralize: + # ── CRITICAL: purge existing (potentially un-neutralized) index + # This MUST succeed before the response is sent so that no stale + # raw-text chunks remain searchable while re-indexing runs. + try: + knowledgeDb.deleteFileContentIndex(fileId) + logger.info("Neutralize toggle ON: deleted index + chunks for file %s", fileId) + except Exception as e: + logger.error("Neutralize toggle ON: FAILED to delete index for file %s: %s", fileId, e) + raise HTTPException( + status_code=500, + detail=f"Could not purge existing index for neutralization — aborting toggle. Error: {e}", + ) + else: + # Turning neutralize OFF: update metadata only; re-index will overwrite + try: + from modules.datamodels.datamodelKnowledge import FileContentIndex + indices = knowledgeDb.db.getRecordset(FileContentIndex, recordFilter={"id": fileId}) + for idx in indices: + idxId = idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None) + if idxId: + knowledgeDb.db.recordModify(FileContentIndex, idxId, { + "neutralizationStatus": "original", + "isNeutralized": False, + }) + except Exception as e: + logger.warning("Failed to update FileContentIndex after neutralize-OFF for %s: %s", fileId, e) + + # Background re-index (safe: if it fails, there is simply no index) + fileMeta = managementInterface.getFile(fileId) + if fileMeta: + fn = fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", "") + mt = fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", "") + + async def _runReindexAfterNeutralizeToggle(): + try: + await _autoIndexFile(fileId=fileId, fileName=fn, mimeType=mt, user=context.user) + except Exception as ex: + logger.error("Re-index after neutralize toggle failed for %s: %s (file has NO index until next re-index)", fileId, ex) + + background_tasks.add_task(_runReindexAfterNeutralizeToggle) return {"fileId": fileId, "neutralize": neutralize, "updated": True} + except HTTPException: + raise except Exception as e: logger.error(f"Error updating file neutralize flag: {e}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/modules/routes/routeStore.py b/modules/routes/routeStore.py index c9512d3f..5c6f782a 100644 --- a/modules/routes/routeStore.py +++ b/modules/routes/routeStore.py @@ -282,8 +282,9 @@ def activateStoreFeature( context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ - Activate a store feature. Creates a new FeatureInstance in the target mandate. - If user has no admin mandate, auto-creates a personal mandate. + Activate a store feature. Billing-gated: a feature instance is ONLY created + if the Stripe subscription quantity update succeeds (proration confirmed). + On any billing failure the provisioned instance is rolled back. """ featureCode = data.featureCode userId = str(context.user.id) @@ -302,21 +303,39 @@ def activateStoreFeature( if not _isUserAdminInMandate(db, userId, mandateId): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not admin in target mandate") - # Check subscription capacity - from modules.datamodels.datamodelSubscription import MandateSubscription, BUILTIN_PLANS - subs = db.getRecordset(MandateSubscription, recordFilter={"mandateId": mandateId}) - if subs: - sub = subs[0] - plan = BUILTIN_PLANS.get(sub.get("planKey")) - if plan and plan.maxFeatureInstances is not None: - currentInstances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) - if len(currentInstances) >= plan.maxFeatureInstances: - raise HTTPException( - status_code=status.HTTP_402_PAYMENT_REQUIRED, - detail=f"Feature instance limit reached ({plan.maxFeatureInstances}). Upgrade your plan." - ) + # ── 1. Resolve subscription & plan ────────────────────────────── + from modules.datamodels.datamodelSubscription import MandateSubscription, BUILTIN_PLANS, SubscriptionStatusEnum + from modules.interfaces.interfaceDbSubscription import _getRootInterface as _getSubRoot - # Create new FeatureInstance + subInterface = _getSubRoot() + operative = subInterface.getOperativeForMandate(mandateId) + if not operative: + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail="Kein aktives Abonnement. Bitte zuerst ein Abo abschliessen.", + ) + + planKey = operative.get("planKey", "") + plan = BUILTIN_PLANS.get(planKey) + isBillable = plan is not None and (plan.pricePerFeatureInstanceCHF or 0) > 0 + + if isBillable: + if not operative.get("stripeSubscriptionId") or not operative.get("stripeItemIdInstances"): + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail="Stripe-Abonnement ist nicht vollständig eingerichtet — Aktivierung nicht möglich.", + ) + + # ── 2. Capacity check ─────────────────────────────────────────── + if plan and plan.maxFeatureInstances is not None: + currentInstances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) + if len(currentInstances) >= plan.maxFeatureInstances: + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail=f"Feature-Instanz-Limit erreicht ({plan.maxFeatureInstances}). Bitte Plan upgraden.", + ) + + # ── 3. Provision instance ─────────────────────────────────────── featureInterface = getFeatureInterface(db) featureLabel = featureDef.get("label", {}).get("en", featureCode) instance = featureInterface.createFeatureInstance( @@ -332,7 +351,6 @@ def activateStoreFeature( instanceId = instance.get("id") if isinstance(instance, dict) else instance.id - # Grant FeatureAccess with admin role — MUST be feature-specific (e.g. workspace-admin) instanceRoles = db.getRecordset(Role, recordFilter={"featureInstanceId": instanceId}) adminRoleId = None for ir in instanceRoles: @@ -342,21 +360,34 @@ def activateStoreFeature( break if not adminRoleId: + _rollbackInstance(db, instanceId) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"No feature-specific admin role (e.g. {featureCode}-admin) found for instance {instanceId}. " - f"Template roles were not correctly copied.", + detail=f"Keine Feature-Admin-Rolle für {featureCode} gefunden — Rollback.", ) rootInterface.createFeatureAccess(userId, instanceId, roleIds=[adminRoleId]) - # Sync subscription quantity - try: - rootInterface._syncSubscriptionQuantity(mandateId) - except Exception as e: - logger.warning(f"Failed to sync subscription quantity: {e}") + # ── 4. Billing gate: Stripe quantity sync (MUST succeed) ──────── + if isBillable: + try: + rootInterface._syncSubscriptionQuantity(mandateId, raiseOnError=True) + except Exception as e: + logger.error("Stripe billing for feature activation failed — rolling back instance %s: %s", instanceId, e) + _rollbackInstance(db, instanceId, userId=userId) + raise HTTPException( + status_code=status.HTTP_402_PAYMENT_REQUIRED, + detail=f"Stripe-Abrechnung fehlgeschlagen: {e}. Feature wurde NICHT aktiviert.", + ) + else: + try: + rootInterface._syncSubscriptionQuantity(mandateId) + except Exception as e: + logger.warning("Non-critical Stripe sync failed for free feature: %s", e) - logger.info(f"User {userId} activated '{featureCode}' in mandate {mandateId} (instance={instanceId})") + # ── 5. Confirmed — notify ────────────────────────────────────── + _notifyFeatureActivation(mandateId, featureLabel, featureCode, sub=operative, plan=plan) + logger.info("User %s activated '%s' in mandate %s (instance=%s, billed=%s)", userId, featureCode, mandateId, instanceId, isBillable) return { "featureCode": featureCode, @@ -412,11 +443,10 @@ def deactivateStoreFeature( instanceDeleted = True logger.info(f"Orphan Control: deleted instance {instanceId} (no remaining accesses)") - # Sync subscription quantity try: - rootInterface._syncSubscriptionQuantity(mandateId) + rootInterface._syncSubscriptionQuantity(mandateId, raiseOnError=True) except Exception as e: - logger.warning(f"Failed to sync subscription quantity: {e}") + logger.error("Stripe quantity sync after deactivation failed for mandate %s: %s", mandateId, e) logger.info(f"User {userId} deactivated instance {instanceId} in mandate {mandateId} (deleted={instanceDeleted})") @@ -433,3 +463,52 @@ def deactivateStoreFeature( except Exception as e: logger.error(f"Error deactivating store feature: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + + +# ============================================================================ +# Internal helpers +# ============================================================================ + +def _rollbackInstance(db, instanceId: str, userId: str = None) -> None: + """Delete a freshly provisioned FeatureInstance (and its access) on billing failure.""" + try: + if userId: + accesses = db.getRecordset(FeatureAccess, recordFilter={"userId": userId, "featureInstanceId": instanceId}) + for a in accesses: + db.recordDelete(FeatureAccess, a.get("id")) + db.recordDelete(FeatureInstance, instanceId) + logger.info("Rolled back feature instance %s (billing gate)", instanceId) + except Exception as e: + logger.error("Rollback of instance %s failed: %s", instanceId, e) + + +def _notifyFeatureActivation( + mandateId: str, + featureLabel: str, + featureCode: str, + sub: dict = None, + plan = None, +) -> None: + """Send email notification to mandate admins about a newly activated feature.""" + try: + from modules.shared.notifyMandateAdmins import notifyMandateAdmins + + priceLine = "" + if plan and plan.pricePerFeatureInstanceCHF: + priceLine = f"Kosten: CHF {plan.pricePerFeatureInstanceCHF:.2f} / {plan.billingPeriod.value} (anteilig via Stripe-Proration)." + + bodyParagraphs = [ + f"Die Feature-Instanz «{featureLabel}» ({featureCode}) wurde soeben für Ihren Mandanten aktiviert.", + ] + if priceLine: + bodyParagraphs.append(priceLine) + bodyParagraphs.append("Die Stripe-Abrechnung wird automatisch angepasst.") + + notifyMandateAdmins( + mandateId=mandateId, + subject=f"Feature aktiviert: {featureLabel}", + headline="Neue Feature-Instanz aktiviert", + bodyParagraphs=bodyParagraphs, + ) + except Exception as e: + logger.warning("_notifyFeatureActivation failed for mandate %s: %s", mandateId, e) diff --git a/modules/routes/routeSubscription.py b/modules/routes/routeSubscription.py index 7aad386f..88d0b21c 100644 --- a/modules/routes/routeSubscription.py +++ b/modules/routes/routeSubscription.py @@ -183,7 +183,7 @@ def activatePlan( @router.post("/cancel", response_model=Dict[str, Any]) -@limiter.limit("5/minute") +@limiter.limit("30/minute") def cancelSubscription( request: Request, data: CancelRequest, @@ -209,7 +209,7 @@ def cancelSubscription( @router.post("/reactivate", response_model=Dict[str, Any]) -@limiter.limit("5/minute") +@limiter.limit("30/minute") def reactivateSubscription( request: Request, data: ReactivateRequest, @@ -235,7 +235,7 @@ def reactivateSubscription( @router.post("/force-cancel", response_model=Dict[str, Any]) -@limiter.limit("5/minute") +@limiter.limit("30/minute") def forceCancel( request: Request, data: ForceCancelRequest, @@ -451,46 +451,47 @@ def _getDataVolumeUsage( """Calculate current data volume usage for a mandate vs. plan limit.""" from modules.interfaces.interfaceDbApp import getRootInterface from modules.datamodels.datamodelFiles import FileItem - from modules.datamodels.datamodelSubscription import MandateSubscription, SubscriptionPlan - from modules.datamodels.datamodelFeature import FeatureInstance + from modules.datamodels.datamodelFeatures import FeatureInstance + from modules.interfaces.interfaceDbKnowledge import aggregateMandateRagTotalBytes + from modules.interfaces.interfaceDbManagement import getInterface as getMgmtInterface + from modules.interfaces.interfaceDbSubscription import _getRootInterface as _getSubRootIf rootIf = getRootInterface() mandateId = targetMandateId instances = rootIf.db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId}) - totalBytes = 0 - for inst in instances: - instId = inst.get("id") if isinstance(inst, dict) else getattr(inst, "id", None) - if not instId: - continue - files = rootIf.db.getRecordset(FileItem, recordFilter={"featureInstanceId": instId}) + instIds = [str(inst.get("id") or "") for inst in instances if inst.get("id")] + + mgmtDb = getMgmtInterface().db + totalFileBytes = 0 + for instId in instIds: + files = mgmtDb.getRecordset(FileItem, recordFilter={"featureInstanceId": instId}) for f in files: size = f.get("fileSize") if isinstance(f, dict) else getattr(f, "fileSize", 0) - totalBytes += (size or 0) + totalFileBytes += (size or 0) + mandateFiles = mgmtDb.getRecordset(FileItem, recordFilter={"mandateId": mandateId}) + for f in mandateFiles: + size = f.get("fileSize") if isinstance(f, dict) else getattr(f, "fileSize", 0) + totalFileBytes += (size or 0) + filesMB = round(totalFileBytes / (1024 * 1024), 2) - filesMB = round(totalBytes / (1024 * 1024), 2) - - from modules.datamodels.datamodelKnowledge import FileContentIndex - ragIndexes = rootIf.db.getRecordset(FileContentIndex, recordFilter={"mandateId": mandateId}) - ragBytes = sum(int(idx.get("totalSize") or 0) if isinstance(idx, dict) else int(getattr(idx, "totalSize", 0) or 0) for idx in ragIndexes) + ragBytes = aggregateMandateRagTotalBytes(mandateId) ragMB = round(ragBytes / (1024 * 1024), 2) maxMB = None - subs = rootIf.db.getRecordset(MandateSubscription, recordFilter={"mandateId": mandateId}) - for sub in subs: - planKey = sub.get("planKey") if isinstance(sub, dict) else getattr(sub, "planKey", "") - if planKey: - plans = rootIf.db.getRecordset(SubscriptionPlan, recordFilter={"planKey": planKey}) - for plan in plans: - limit = plan.get("maxDataVolumeMB") if isinstance(plan, dict) else getattr(plan, "maxDataVolumeMB", None) - if limit: - maxMB = limit - break - if maxMB: - break + subIf = _getSubRootIf() + operative = subIf.getOperativeForMandate(mandateId) + if operative: + plan = subIf.getPlan(operative.get("planKey") or "") + if plan and plan.maxDataVolumeMB is not None: + maxMB = int(plan.maxDataVolumeMB) usedMB = ragMB percentUsed = round((usedMB / maxMB) * 100, 1) if maxMB else None + logger.info( + "data-volume mandate=%s: files=%.2f MB, rag=%.2f MB, max=%s MB", + mandateId, filesMB, ragMB, maxMB, + ) return { "mandateId": mandateId, "usedMB": usedMB, diff --git a/modules/routes/routeVoiceGoogle.py b/modules/routes/routeVoiceGoogle.py index dc0c7a85..1c796361 100644 --- a/modules/routes/routeVoiceGoogle.py +++ b/modules/routes/routeVoiceGoogle.py @@ -463,7 +463,7 @@ async def save_voice_settings( currentUser: User = Depends(getCurrentUser) ): """Save voice settings for the current user (writes to UserVoicePreferences).""" - from modules.datamodels.datamodelUam import UserVoicePreferences + from modules.datamodels.datamodelUam import UserVoicePreferences, _normalizeTtsVoiceMap from modules.security.rootAccess import getRootInterface rootInterface = getRootInterface() userId = str(currentUser.id) @@ -473,6 +473,8 @@ async def save_voice_settings( "translationSourceLanguage", "translationTargetLanguage", } updateData = {k: v for k, v in settings.items() if k in allowedFields} + if "ttsVoiceMap" in updateData: + updateData["ttsVoiceMap"] = _normalizeTtsVoiceMap(updateData["ttsVoiceMap"]) existing = rootInterface.db.getRecordset( UserVoicePreferences, recordFilter={"userId": userId} diff --git a/modules/routes/routeVoiceUser.py b/modules/routes/routeVoiceUser.py index 9b628eeb..2f21662b 100644 --- a/modules/routes/routeVoiceUser.py +++ b/modules/routes/routeVoiceUser.py @@ -14,7 +14,7 @@ from typing import Any, Dict from fastapi import APIRouter, Body, Depends, HTTPException, Query, Request, status from modules.auth import getCurrentUser, limiter -from modules.datamodels.datamodelUam import User, UserVoicePreferences +from modules.datamodels.datamodelUam import User, UserVoicePreferences, _normalizeTtsVoiceMap from modules.interfaces.interfaceDbApp import getRootInterface from modules.interfaces.interfaceVoiceObjects import getVoiceInterface @@ -79,6 +79,8 @@ def updateVoicePreferences( "translationTargetLanguage", } updateData = {k: v for k, v in preferences.items() if k in allowedFields} + if "ttsVoiceMap" in updateData: + updateData["ttsVoiceMap"] = _normalizeTtsVoiceMap(updateData["ttsVoiceMap"]) if existing: existingRecord = existing[0] diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py index 23a749ab..4529ede0 100644 --- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py +++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py @@ -27,6 +27,28 @@ _MAX_TOOL_RESULT_CHARS = 50_000 _BINARY_SIGNATURES = (b"%PDF", b"\x89PNG", b"\xff\xd8\xff", b"GIF8", b"PK\x03\x04", b"Rar!", b"\x1f\x8b") +def _resolveFileScope(fileId: str, context: dict) -> tuple: + """Resolve featureInstanceId and mandateId for a file from context or management DB. + + Returns (featureInstanceId, mandateId) — never None, always strings. + """ + fiId = context.get("featureInstanceId", "") or "" + mId = context.get("mandateId", "") or "" + if fiId and mId: + return fiId, mId + try: + from modules.datamodels.datamodelFiles import FileItem + from modules.interfaces.interfaceDbManagement import ComponentObjects + fm = ComponentObjects().db._loadRecord(FileItem, fileId) + if fm: + _get = (lambda k: fm.get(k, "")) if isinstance(fm, dict) else (lambda k: getattr(fm, k, "")) + fiId = fiId or str(_get("featureInstanceId") or "") + mId = mId or str(_get("mandateId") or "") + except Exception: + pass + return fiId, mId + + def _looksLikeBinary(data: bytes, sampleSize: int = 1024) -> bool: """Detect binary content by checking for magic bytes and non-printable char ratio.""" if any(data[:8].startswith(sig) for sig in _BINARY_SIGNATURES): @@ -602,16 +624,29 @@ def _registerCoreTools(registry: ToolRegistry, services): if knowledgeService: try: userId = context.get("userId", "") + _fiId, _mId = _resolveFileScope(fileId, context) await knowledgeService.indexFile( fileId=fileId, fileName=fileName, mimeType=mimeType, userId=userId, contentObjects=contentObjects, + featureInstanceId=_fiId, + mandateId=_mId, ) except Exception: pass - textParts = [o["data"] for o in contentObjects if o["contentType"] != "image"] - if textParts: - joined = "\n\n".join(textParts) + joined = "" + if knowledgeService: + _chunks = knowledgeService._knowledgeDb.getContentChunks(fileId) + _textChunks = [ + c for c in (_chunks or []) + if c.get("contentType") != "image" and c.get("data") + ] + if _textChunks: + joined = "\n\n".join(c["data"] for c in _textChunks) + if not joined: + textParts = [o["data"] for o in contentObjects if o["contentType"] != "image"] + joined = "\n\n".join(textParts) if textParts else "" + if joined: chunked = _applyOffsetLimit(joined, offset, limit) if chunked is not None: return ToolResult(toolCallId="", toolName="readFile", success=True, data=chunked) @@ -642,6 +677,36 @@ def _registerCoreTools(registry: ToolRegistry, services): try: text = rawBytes.decode(encoding) if text.strip(): + _fileNeedNeutralize = False + try: + from modules.datamodels.datamodelFiles import FileItem as _FI + from modules.interfaces.interfaceDbManagement import ComponentObjects as _CO + _fRec = _CO().db._loadRecord(_FI, fileId) + if _fRec: + _fG = (lambda k, d=None: _fRec.get(k, d)) if isinstance(_fRec, dict) else (lambda k, d=None: getattr(_fRec, k, d)) + _fileNeedNeutralize = bool(_fG("neutralize", False)) + except Exception: + pass + if _fileNeedNeutralize: + try: + _nSvc = services.getService("neutralization") if hasattr(services, "getService") else None + if _nSvc and hasattr(_nSvc, 'processText'): + _nResult = _nSvc.processText(text) + if _nResult and _nResult.get("neutralized_text"): + text = _nResult["neutralized_text"] + logger.debug(f"readFile: neutralized text for file {fileId}") + else: + logger.warning(f"readFile: neutralization failed for file {fileId}, blocking text (fail-safe)") + return ToolResult(toolCallId="", toolName="readFile", success=True, + data="[File requires neutralization but neutralization failed. Content blocked for data protection.]") + else: + logger.warning(f"readFile: neutralization required but service unavailable for file {fileId}") + return ToolResult(toolCallId="", toolName="readFile", success=True, + data="[File requires neutralization but service unavailable. Content blocked for data protection.]") + except Exception as _nErr: + logger.error(f"readFile: neutralization error for file {fileId}: {_nErr}") + return ToolResult(toolCallId="", toolName="readFile", success=True, + data="[File requires neutralization but an error occurred. Content blocked for data protection.]") chunked = _applyOffsetLimit(text, offset, limit) if chunked is not None: return ToolResult(toolCallId="", toolName="readFile", success=True, data=chunked) @@ -1562,7 +1627,7 @@ def _registerCoreTools(registry: ToolRegistry, services): } async def _resolveDataSource(dsId: str): - """Resolve a DataSource record and return (connectionId, service, path) or raise.""" + """Resolve a DataSource record and return (connectionId, service, path, neutralize) or raise.""" chatService = services.chat ds = chatService.getDataSource(dsId) if hasattr(chatService, "getDataSource") else None if not ds: @@ -1571,11 +1636,12 @@ def _registerCoreTools(registry: ToolRegistry, services): sourceType = ds.get("sourceType", "") path = ds.get("path", "/") label = ds.get("label", "") + neutralize = bool(ds.get("neutralize", False)) service = _SOURCE_TYPE_TO_SERVICE.get(sourceType, sourceType) if not connectionId: raise ValueError(f"DataSource '{dsId}' has no connectionId") - logger.info(f"Resolved DataSource '{dsId}' ({label}): sourceType={sourceType}, service={service}, connectionId={connectionId}, path={path[:80]}") - return connectionId, service, path + logger.info(f"Resolved DataSource '{dsId}' ({label}): sourceType={sourceType}, service={service}, connectionId={connectionId}, path={path[:80]}, neutralize={neutralize}") + return connectionId, service, path, neutralize _MAIL_SERVICES = {"outlook", "gmail"} @@ -1589,7 +1655,7 @@ def _registerCoreTools(registry: ToolRegistry, services): error="Provide either dataSourceId OR connectionId+service") try: if dsId: - connectionId, service, basePath = await _resolveDataSource(dsId) + connectionId, service, basePath, _neutralize = await _resolveDataSource(dsId) else: connectionId, service, basePath = directConnId, directService, args.get("path", "/") if subPath: @@ -1632,7 +1698,7 @@ def _registerCoreTools(registry: ToolRegistry, services): error="Provide either dataSourceId OR connectionId+service") try: if dsId: - connectionId, service, basePath = await _resolveDataSource(dsId) + connectionId, service, basePath, _neutralize = await _resolveDataSource(dsId) else: connectionId, service, basePath = directConnId, directService, args.get("path", "/") from modules.connectors.connectorResolver import ConnectorResolver @@ -1666,8 +1732,9 @@ def _registerCoreTools(registry: ToolRegistry, services): try: from modules.connectors.connectorResolver import ConnectorResolver from modules.connectors.connectorProviderBase import DownloadResult as _DR + _sourceNeutralize = False if dsId: - connectionId, service, basePath = await _resolveDataSource(dsId) + connectionId, service, basePath, _sourceNeutralize = await _resolveDataSource(dsId) else: connectionId, service, basePath = directConnId, directService, "/" fullPath = filePath if filePath.startswith("/") else f"{basePath.rstrip('/')}/{filePath}" @@ -1710,6 +1777,8 @@ def _registerCoreTools(registry: ToolRegistry, services): fiId = context.get("featureInstanceId") or (services.featureInstanceId if services else "") if fiId: chatService.interfaceDbComponent.updateFile(fileItem.id, {"featureInstanceId": fiId}) + if _sourceNeutralize: + chatService.interfaceDbComponent.updateFile(fileItem.id, {"neutralize": True}) tempFolderId = _getOrCreateTempFolder(chatService) if tempFolderId: chatService.interfaceDbComponent.updateFile(fileItem.id, {"folderId": tempFolderId}) @@ -2040,9 +2109,12 @@ def _registerCoreTools(registry: ToolRegistry, services): }) if contentObjects: + _diFiId, _diMId = _resolveFileScope(fileId, context) await knowledgeService.indexFile( fileId=fileId, fileName=fileName, mimeType=fileMime, userId=context.get("userId", ""), contentObjects=contentObjects, + featureInstanceId=_diFiId, + mandateId=_diMId, ) chunks = knowledgeService._knowledgeDb.getContentChunks(fileId) @@ -2088,9 +2160,22 @@ def _registerCoreTools(registry: ToolRegistry, services): dataUrl = f"data:{mimeType};base64,{imageData}" from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum as OTE + _opType = OTE.IMAGE_ANALYSE + try: + from modules.datamodels.datamodelFiles import FileItem as _FileItemModel + from modules.interfaces.interfaceDbManagement import ComponentObjects as _CO + _fRow = _CO().db._loadRecord(_FileItemModel, fileId) + if _fRow: + _fGet = (lambda k, d=None: _fRow.get(k, d)) if isinstance(_fRow, dict) else (lambda k, d=None: getattr(_fRow, k, d)) + if bool(_fGet("neutralize", False)): + _opType = OTE.NEUTRALIZATION_IMAGE + logger.info(f"describeImage: file {fileId} has neutralize=True, using NEUTRALIZATION_IMAGE (internal models only)") + except Exception: + pass + visionRequest = AiCallRequest( prompt=prompt, - options=AiCallOptions(operationType=OTE.IMAGE_ANALYSE), + options=AiCallOptions(operationType=_opType), messages=[{"role": "user", "content": [ {"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": dataUrl}}, @@ -3099,6 +3184,11 @@ def _registerCoreTools(registry: ToolRegistry, services): recordFilter={"featureInstanceId": featureInstanceId, "workspaceInstanceId": workspaceInstanceId}, ) + _anySourceNeutralize = any( + bool(ds.get("neutralize", False) if isinstance(ds, dict) else getattr(ds, "neutralize", False)) + for ds in (featureDataSources or []) + ) + from modules.security.rbacCatalog import getCatalogService catalog = getCatalogService() if not featureDataSources: @@ -3133,6 +3223,8 @@ def _registerCoreTools(registry: ToolRegistry, services): ) async def _subAgentAiCall(req): + if _anySourceNeutralize: + req.requireNeutralization = True return await aiService.callAi(req) try: diff --git a/modules/serviceCenter/services/serviceAi/mainServiceAi.py b/modules/serviceCenter/services/serviceAi/mainServiceAi.py index e2de43e6..9ff6437d 100644 --- a/modules/serviceCenter/services/serviceAi/mainServiceAi.py +++ b/modules/serviceCenter/services/serviceAi/mainServiceAi.py @@ -200,10 +200,6 @@ class AiService: finally: self.aiObjects.billingCallback = None - # Rehydrate neutralization placeholders in response - if _wasNeutralized and response and hasattr(response, 'content') and response.content: - response.content = self._rehydrateResponse(response.content) - # Attach neutralization exclusion metadata if any parts failed if _excludedDocs and response: if not hasattr(response, 'metadata') or response.metadata is None: @@ -240,10 +236,7 @@ class AiService: self.aiObjects.billingCallback = self._createBillingCallback() try: async for chunk in self.aiObjects.callWithTextContextStream(request): - # Rehydrate the final AiCallResponse (non-str chunks are the final response) if not isinstance(chunk, str): - if _wasNeutralized and hasattr(chunk, 'content') and chunk.content: - chunk.content = self._rehydrateResponse(chunk.content) if _excludedDocs: if not hasattr(chunk, 'metadata') or chunk.metadata is None: chunk.metadata = {} @@ -566,34 +559,70 @@ detectedIntent-Werte: def _shouldNeutralize(self, request: AiCallRequest) -> bool: """Check if this AI request should have neutralization applied. - Per-request override: requireNeutralization=True forces it, False skips it. - Only applies to text prompts -- not embeddings or image processing.""" + + OR-logic across three sources (any True → neutralize): + 1. Feature-Instance config (NeutralizationConfig.enabled) + 2. Workflow/Session (context.requireNeutralization) + 3. Per-request (request.requireNeutralization) + + No source can override another's True with False. + """ try: - if request.requireNeutralization is False: - return False - if not request.prompt and not request.messages: + if not request.prompt and not request.messages and not request.context: return False + + _sources = [] + + # Source 1: Feature-Instance config + _neutralSvc = self._get_service("neutralization") + if _neutralSvc and hasattr(_neutralSvc, 'getConfig'): + _config = _neutralSvc.getConfig() + if _config and getattr(_config, 'enabled', False): + _sources.append("featureInstance") + + # Source 2: Workflow / Session context + _ctx = getattr(self.services, '_context', None) + _ctxFlag = getattr(_ctx, "requireNeutralization", None) if _ctx else None + if _ctxFlag is True: + _sources.append("context") + + # Source 3: Per-request flag if request.requireNeutralization is True: + _sources.append("request") + + if _sources: + logger.debug(f"Neutralization required by: {', '.join(_sources)}") + request.requireNeutralization = True return True - neutralSvc = self._get_service("neutralization") - if not neutralSvc: - return False - config = neutralSvc.getConfig() if hasattr(neutralSvc, 'getConfig') else None - if not config or not getattr(config, 'enabled', False): - return False - return True - except Exception: + + return False + except Exception as e: + logger.error(f"_shouldNeutralize check failed: {e} — defaulting to False") return False def _neutralizeRequest(self, request: AiCallRequest) -> Tuple[AiCallRequest, bool, List[str]]: """Neutralize the prompt text and messages in an AiCallRequest. + Returns (modifiedRequest, wasNeutralized, excludedDocs). - Fail-safe: failing parts are excluded instead of aborting the entire call.""" + + FAILSAFE behaviour when ``requireNeutralization is True`` (explicit): + - Service unavailable → raises (caller must not send raw data to AI). + - Prompt neutralization fails → raises. + - Individual message neutralization fails → message is **removed** + (not kept in original form) and noted in excludedDocs. + + When neutralization is only config-driven (requireNeutralization is + None) the behaviour is softer: failures are logged and originals are + kept — but a warning is emitted. + """ + _hardMode = request.requireNeutralization is True excludedDocs: List[str] = [] neutralSvc = self._get_service("neutralization") if not neutralSvc or not hasattr(neutralSvc, 'processText'): - logger.warning("Neutralization required but neutralization service is unavailable — continuing without neutralization") + if _hardMode: + raise RuntimeError("Neutralization explicitly required but service unavailable — AI call BLOCKED") + logger.warning("Neutralization required by config but service unavailable — continuing without neutralization") excludedDocs.append("Neutralization service unavailable; prompt sent un-neutralized") return request, False, excludedDocs @@ -607,28 +636,148 @@ detectedIntent-Werte: _wasNeutralized = True logger.debug("Neutralized prompt in AiCallRequest") else: + if _hardMode: + raise RuntimeError(f"Prompt neutralization returned empty — AI call BLOCKED (hard mode)") logger.warning("Neutralization of prompt returned no neutralized_text — sending original prompt") excludedDocs.append("Prompt neutralization failed; original prompt used") + except RuntimeError: + raise except Exception as e: + if _hardMode: + raise RuntimeError(f"Prompt neutralization failed — AI call BLOCKED: {e}") from e logger.warning(f"Neutralization of prompt failed: {e} — sending original prompt") excludedDocs.append(f"Prompt neutralization error: {e}") + if request.context: + try: + result = neutralSvc.processText(request.context) + if result and result.get("neutralized_text"): + request.context = result["neutralized_text"] + _wasNeutralized = True + logger.debug("Neutralized context in AiCallRequest") + else: + if _hardMode: + raise RuntimeError("Context neutralization returned empty — AI call BLOCKED (hard mode)") + logger.warning("Neutralization of context returned no neutralized_text — sending original context") + excludedDocs.append("Context neutralization failed; original context used") + except RuntimeError: + raise + except Exception as e: + if _hardMode: + raise RuntimeError(f"Context neutralization failed — AI call BLOCKED: {e}") from e + logger.warning(f"Neutralization of context failed: {e} — sending original context") + excludedDocs.append(f"Context neutralization error: {e}") + if request.messages and isinstance(request.messages, list): + cleanMessages = [] for idx, msg in enumerate(request.messages): content = msg.get("content") if isinstance(msg, dict) else None - if not isinstance(content, str) or not content: + if content is None: + cleanMessages.append(msg) continue - try: - result = neutralSvc.processText(content) - if result and result.get("neutralized_text"): - msg["content"] = result["neutralized_text"] - _wasNeutralized = True + if isinstance(content, str): + if not content: + cleanMessages.append(msg) + continue + try: + result = neutralSvc.processText(content) + if result and result.get("neutralized_text"): + msg["content"] = result["neutralized_text"] + _wasNeutralized = True + cleanMessages.append(msg) + else: + if _hardMode: + logger.warning(f"Message[{idx}] neutralization empty — REMOVING message (hard mode)") + excludedDocs.append(f"Message[{idx}] neutralization failed; message REMOVED") + else: + logger.warning(f"Neutralization of message[{idx}] returned no neutralized_text — keeping original") + excludedDocs.append(f"Message[{idx}] neutralization failed; original kept") + cleanMessages.append(msg) + except Exception as e: + if _hardMode: + logger.warning(f"Message[{idx}] neutralization error — REMOVING message (hard mode): {e}") + excludedDocs.append(f"Message[{idx}] neutralization error; message REMOVED: {e}") + else: + logger.warning(f"Neutralization of message[{idx}] failed: {e} — keeping original") + excludedDocs.append(f"Message[{idx}] neutralization error: {e}") + cleanMessages.append(msg) + elif isinstance(content, list): + _cleanParts = [] + for _partIdx, _part in enumerate(content): + if not isinstance(_part, dict): + _cleanParts.append(_part) + continue + _partType = _part.get("type", "") + if _partType == "text" and _part.get("text"): + try: + _result = neutralSvc.processText(_part["text"]) + if _result and _result.get("neutralized_text"): + _part["text"] = _result["neutralized_text"] + _wasNeutralized = True + _cleanParts.append(_part) + else: + if _hardMode: + logger.warning(f"Message[{idx}].content[{_partIdx}] text neutralization empty — REMOVING part") + excludedDocs.append(f"Message[{idx}].content[{_partIdx}] text removed") + else: + _cleanParts.append(_part) + except Exception as e: + if _hardMode: + logger.warning(f"Message[{idx}].content[{_partIdx}] text neutralization error — REMOVING: {e}") + excludedDocs.append(f"Message[{idx}].content[{_partIdx}] text error: {e}") + else: + _cleanParts.append(_part) + elif _partType == "image_url": + if _hardMode: + logger.warning(f"Message[{idx}].content[{_partIdx}] image_url — REMOVING (neutralization active)") + excludedDocs.append(f"Message[{idx}].content[{_partIdx}] image removed (neutralization)") + else: + _cleanParts.append(_part) + else: + _cleanParts.append(_part) + if _cleanParts: + msg["content"] = _cleanParts + cleanMessages.append(msg) + elif _hardMode: + logger.warning(f"Message[{idx}] all parts removed — REMOVING message") + excludedDocs.append(f"Message[{idx}] fully removed after neutralization") + else: + cleanMessages.append(msg) + request.messages = cleanMessages + + if hasattr(request, 'contentParts') and request.contentParts: + _cleanParts = [] + for _cpIdx, _cp in enumerate(request.contentParts): + _tg = getattr(_cp, 'typeGroup', '') or '' + _data = getattr(_cp, 'data', '') or '' + if _tg in ('text', 'table') and _data: + try: + _result = neutralSvc.processText(str(_data)) + if _result and _result.get("neutralized_text"): + _cp.data = _result["neutralized_text"] + _wasNeutralized = True + _cleanParts.append(_cp) + else: + if _hardMode: + logger.warning(f"ContentPart[{_cpIdx}] neutralization empty — REMOVING") + excludedDocs.append(f"ContentPart[{_cpIdx}] removed") + else: + _cleanParts.append(_cp) + except Exception as e: + if _hardMode: + logger.warning(f"ContentPart[{_cpIdx}] neutralization error — REMOVING: {e}") + excludedDocs.append(f"ContentPart[{_cpIdx}] error: {e}") + else: + _cleanParts.append(_cp) + elif _tg == 'image': + if _hardMode: + logger.warning(f"ContentPart[{_cpIdx}] image — REMOVING (neutralization active)") + excludedDocs.append(f"ContentPart[{_cpIdx}] image removed") else: - logger.warning(f"Neutralization of message[{idx}] returned no neutralized_text — keeping original") - excludedDocs.append(f"Message[{idx}] neutralization failed; original kept") - except Exception as e: - logger.warning(f"Neutralization of message[{idx}] failed: {e} — keeping original") - excludedDocs.append(f"Message[{idx}] neutralization error: {e}") + _cleanParts.append(_cp) + else: + _cleanParts.append(_cp) + request.contentParts = _cleanParts return request, _wasNeutralized, excludedDocs diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py index 0f20bc7f..49774a38 100644 --- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py +++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py @@ -83,12 +83,47 @@ class KnowledgeService: """ contentObjects = contentObjects or [] - # 1. Create FileContentIndex + # 1. Resolve scope fields from FileItem (Single Source of Truth) + # FileItem lives in poweron_management; its scope/mandateId/featureInstanceId + # are authoritative and must be mirrored onto the FileContentIndex. + resolvedScope = "personal" + resolvedMandateId = mandateId + resolvedFeatureInstanceId = featureInstanceId + resolvedUserId = userId + _shouldNeutralize = False + try: + from modules.datamodels.datamodelFiles import FileItem as _FileItem + _dbComponent = getattr(self._context, "interfaceDbComponent", None) + _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else [] + if not _fileRecords: + from modules.interfaces.interfaceDbManagement import ComponentObjects + _row = ComponentObjects().db._loadRecord(_FileItem, fileId) + if _row: + _fileRecords = [_row] + if _fileRecords: + _fileRecord = _fileRecords[0] + _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d)) + _shouldNeutralize = bool(_get("neutralize", False)) + _fileScope = _get("scope") + if _fileScope: + resolvedScope = _fileScope + if not resolvedMandateId: + resolvedMandateId = str(_get("mandateId", "") or "") + if not resolvedFeatureInstanceId: + resolvedFeatureInstanceId = str(_get("featureInstanceId", "") or "") + _fileCreatedBy = _get("sysCreatedBy") + if _fileCreatedBy: + resolvedUserId = str(_fileCreatedBy) + except Exception: + pass + + # 2. Create FileContentIndex with correct scope from the start index = FileContentIndex( id=fileId, - userId=userId, - featureInstanceId=featureInstanceId, - mandateId=mandateId, + userId=resolvedUserId, + featureInstanceId=resolvedFeatureInstanceId, + mandateId=resolvedMandateId, + scope=resolvedScope, fileName=fileName, mimeType=mimeType, containerPath=containerPath, @@ -108,28 +143,9 @@ class KnowledgeService: ) self._knowledgeDb.upsertFileContentIndex(index) - # 2. Chunk text content objects and create embeddings + # 3. Chunk text content objects and create embeddings textObjects = [o for o in contentObjects if o.get("contentType") == "text"] - # Read FileItem attributes for index metadata and neutralization - _shouldNeutralize = False - try: - from modules.datamodels.datamodelFiles import FileItem as _FileItem - _dbComponent = getattr(self._context, 'interfaceDbComponent', None) - _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else [] - if _fileRecords: - _fileRecord = _fileRecords[0] - _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d)) - _shouldNeutralize = bool(_get("neutralize", False)) - _fileScope = _get("scope") - if _fileScope: - index.scope = _fileScope - _fileCreatedBy = _get("sysCreatedBy") - if _fileCreatedBy: - index.userId = str(_fileCreatedBy) - except Exception: - pass - if _shouldNeutralize and textObjects: _neutralizedObjects = [] try: @@ -142,9 +158,7 @@ class KnowledgeService: if not _textContent: continue try: - _neutralResult = _neutralSvc.processText( - _textContent, userId=userId, featureInstanceId=featureInstanceId - ) + _neutralResult = _neutralSvc.processText(_textContent) if _neutralResult and _neutralResult.get("neutralized_text"): _obj["data"] = _neutralResult["neutralized_text"] _neutralizedObjects.append(_obj) @@ -176,8 +190,8 @@ class KnowledgeService: contentChunk = ContentChunk( contentObjectId=chunk["contentObjectId"], fileId=fileId, - userId=userId, - featureInstanceId=featureInstanceId, + userId=resolvedUserId, + featureInstanceId=resolvedFeatureInstanceId, contentType="text", data=chunk["data"], contextRef=chunk["contextRef"], @@ -185,14 +199,36 @@ class KnowledgeService: ) self._knowledgeDb.upsertContentChunk(contentChunk) - # 3. Store non-text content objects (images, etc.) without embedding + # 4. Store non-text content objects (images, etc.) without embedding nonTextObjects = [o for o in contentObjects if o.get("contentType") != "text"] + if _shouldNeutralize and nonTextObjects: + import base64 as _b64 + _filteredNonText = [] + for _obj in nonTextObjects: + if _obj.get("contentType") != "image": + _filteredNonText.append(_obj) + continue + _imgData = (_obj.get("data", "") or "").strip() + if not _imgData: + _filteredNonText.append(_obj) + continue + try: + _imgBytes = _b64.b64decode(_imgData) + _imgResult = await _neutralSvc.processImageAsync(_imgBytes, fileName) + if _imgResult.get("status") == "ok": + _filteredNonText.append(_obj) + logger.debug(f"Image chunk OK for file {fileId}, storing") + else: + logger.warning(f"Image chunk blocked for file {fileId} (PII detected), skipping (fail-safe)") + except Exception as _imgErr: + logger.warning(f"Image neutralization check failed for file {fileId}: {_imgErr}, skipping (fail-safe)") + nonTextObjects = _filteredNonText for obj in nonTextObjects: contentChunk = ContentChunk( contentObjectId=obj.get("contentObjectId", ""), fileId=fileId, - userId=userId, - featureInstanceId=featureInstanceId, + userId=resolvedUserId, + featureInstanceId=resolvedFeatureInstanceId, contentType=obj.get("contentType", "other"), data=obj.get("data", ""), contextRef=obj.get("contextRef", {}), @@ -200,21 +236,23 @@ class KnowledgeService: ) self._knowledgeDb.upsertContentChunk(contentChunk) - self._knowledgeDb.updateFileStatus(fileId, "indexed") + # 5. Final upsert ALWAYS — persists scope, neutralization status, etc. index.status = "indexed" if _shouldNeutralize: - try: - index.neutralizationStatus = "completed" - index.isNeutralized = True - self._knowledgeDb.upsertFileContentIndex(index) - except Exception as e: - logger.debug(f"Could not set neutralizationStatus for file {fileId}: {e}") - logger.info(f"Indexed file {fileId} ({fileName}): {len(contentObjects)} objects, {len(textObjects)} text chunks") - if mandateId: + index.neutralizationStatus = "completed" + index.isNeutralized = True + self._knowledgeDb.upsertFileContentIndex(index) + + logger.info( + "Indexed file %s (%s): %d objects, %d text chunks, scope=%s, mandate=%s, instance=%s", + fileId, fileName, len(contentObjects), len(textObjects), + resolvedScope, resolvedMandateId, resolvedFeatureInstanceId, + ) + if resolvedMandateId: try: from modules.interfaces.interfaceDbBilling import _getRootInterface - _getRootInterface().reconcileMandateStorageBilling(str(mandateId)) + _getRootInterface().reconcileMandateStorageBilling(str(resolvedMandateId)) except Exception as ex: logger.warning("reconcileMandateStorageBilling after index failed: %s", ex) return index @@ -328,17 +366,18 @@ class KnowledgeService: if entities: builder.add(priority=3, label="Workflow Context", items=entities, isKeyValue=True, maxChars=2000) - # Layer 3: Shared Layer (mandate-wide shared documents) - sharedChunks = self._knowledgeDb.semanticSearch( - queryVector=queryVector, - mandateId=mandateId, - isShared=True, - limit=10, - minScore=0.7, - isSysAdmin=isSysAdmin, - ) - if sharedChunks: - builder.add(priority=4, label="Shared Knowledge", items=sharedChunks, maxChars=2000) + # Layer 3: Mandate-scoped documents (visible to all mandate users) + if mandateId: + mandateChunks = self._knowledgeDb.semanticSearch( + queryVector=queryVector, + scope="mandate", + mandateId=mandateId, + limit=10, + minScore=0.7, + isSysAdmin=isSysAdmin, + ) + if mandateChunks: + builder.add(priority=4, label="Shared Knowledge", items=mandateChunks, maxChars=2000) # Layer 4: Cross-workflow hint (other conversations in this workspace) if workflowHintItems: diff --git a/modules/serviceCenter/services/serviceKnowledge/subPreScan.py b/modules/serviceCenter/services/serviceKnowledge/subPreScan.py index e025dd99..0688deb2 100644 --- a/modules/serviceCenter/services/serviceKnowledge/subPreScan.py +++ b/modules/serviceCenter/services/serviceKnowledge/subPreScan.py @@ -31,6 +31,7 @@ async def preScanDocument( userId: str = "", featureInstanceId: str = "", mandateId: str = "", + scope: str = "personal", ) -> FileContentIndex: """Create a structural FileContentIndex without AI. @@ -56,6 +57,7 @@ async def preScanDocument( userId=userId, featureInstanceId=featureInstanceId, mandateId=mandateId, + scope=scope, fileName=fileName, mimeType=mimeType, totalObjects=totalObjects, diff --git a/modules/shared/notifyMandateAdmins.py b/modules/shared/notifyMandateAdmins.py index 27445afb..6bef921d 100644 --- a/modules/shared/notifyMandateAdmins.py +++ b/modules/shared/notifyMandateAdmins.py @@ -7,9 +7,7 @@ All mandate-level notifications (subscription changes, billing warnings, etc.) MUST go through notifyMandateAdmins() to ensure consistent recipient resolution and delivery. -Recipients are the union of: -1. BillingSettings.notifyEmails for the mandate (configured contact addresses) -2. All users with the mandate-level "admin" RBAC role +Recipients: all users with the mandate-level "admin" RBAC role. """ from __future__ import annotations @@ -96,10 +94,10 @@ def _resolveMandateAdminEmails(mandateId: str) -> List[str]: def _resolveAllRecipients(mandateId: str) -> List[str]: - """Union of BillingSettings.notifyEmails + all mandate admin user emails, deduplicated.""" + """Mandate admin user emails only (RBAC-resolved), deduplicated.""" seen: Set[str] = set() result: List[str] = [] - for email in _resolveMandateContactEmails(mandateId) + _resolveMandateAdminEmails(mandateId): + for email in _resolveMandateAdminEmails(mandateId): if email and email not in seen: seen.add(email) result.append(email) @@ -233,7 +231,7 @@ def notifyMandateAdmins( rawHtmlBlock: Optional[str] = None, ) -> int: """ - Send a styled HTML notification to all mandate admins and configured contacts. + Send a styled HTML notification to all mandate admins. Args: mandateId: The mandate to notify admins for. diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py index 466165ad..9fb2e7f4 100644 --- a/modules/workflows/methods/methodContext/actions/extractContent.py +++ b/modules/workflows/methods/methodContext/actions/extractContent.py @@ -6,7 +6,7 @@ import time from typing import Dict, Any from modules.datamodels.datamodelChat import ActionResult, ActionDocument from modules.datamodels.datamodelDocref import DocumentReferenceList -from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy, ContentExtracted, ContentPart +from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy logger = logging.getLogger(__name__) @@ -101,74 +101,6 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: # Pass operationId for hierarchical per-document progress logging extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId) - # Check if neutralization is enabled and should be applied automatically - neutralizationEnabled = False - try: - config = self.services.neutralization.getConfig() - neutralizationEnabled = config and config.enabled - except Exception as e: - logger.debug(f"Could not check neutralization config: {str(e)}") - - # Neutralize extracted data if enabled (for dynamic mode: after extraction, before AI processing) - if neutralizationEnabled: - self.services.chat.progressLogUpdate(operationId, 0.7, "Neutralizing extracted data") - logger.info("Neutralization enabled - neutralizing extracted content data") - - # Neutralize each ContentExtracted result - for extracted in extractedResults: - if extracted.parts: - neutralizedParts = [] - for part in extracted.parts: - if not isinstance(part, ContentPart): - # Try to parse as ContentPart if it's a dict - if isinstance(part, dict): - try: - part = ContentPart(**part) - except Exception as e: - logger.warning(f"Could not parse ContentPart: {str(e)}") - neutralizedParts.append(part) - continue - else: - neutralizedParts.append(part) - continue - - # Neutralize the data field if it contains text - if part.data: - try: - # Call neutralization service - neutralizationResult = self.services.neutralization.processText(part.data) - - if neutralizationResult and 'neutralized_text' in neutralizationResult: - # Replace data with neutralized text - neutralizedData = neutralizationResult['neutralized_text'] - - # Create new ContentPart with neutralized data - neutralizedPart = ContentPart( - id=part.id, - parentId=part.parentId, - label=part.label, - typeGroup=part.typeGroup, - mimeType=part.mimeType, - data=neutralizedData, - metadata=part.metadata.copy() if part.metadata else {} - ) - neutralizedParts.append(neutralizedPart) - else: - # Neutralization failed, use original part - logger.warning(f"Neutralization did not return neutralized_text for part {part.id}") - neutralizedParts.append(part) - except Exception as e: - logger.error(f"Error neutralizing part {part.id}: {str(e)}") - # On error, use original part - neutralizedParts.append(part) - else: - # No data to neutralize, keep original part - neutralizedParts.append(part) - - # Update extracted result with neutralized parts - extracted.parts = neutralizedParts - logger.info(f"Neutralized {len(neutralizedParts)} content parts") - # Build ActionDocuments from ContentExtracted results self.services.chat.progressLogUpdate(operationId, 0.8, "Building result documents") actionDocuments = [] @@ -190,7 +122,6 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: "documentIndex": i, "extractedId": extracted.id, "partCount": len(extracted.parts) if extracted.parts else 0, - "neutralized": neutralizationEnabled, "originalFileName": originalDoc.fileName if originalDoc and hasattr(originalDoc, 'fileName') else None } actionDoc = ActionDocument( diff --git a/modules/workflows/methods/methodContext/actions/neutralizeData.py b/modules/workflows/methods/methodContext/actions/neutralizeData.py index a1fc6b91..bd032cac 100644 --- a/modules/workflows/methods/methodContext/actions/neutralizeData.py +++ b/modules/workflows/methods/methodContext/actions/neutralizeData.py @@ -16,14 +16,13 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult: workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"context_neutralize_{workflowId}_{int(time.time())}" - # Check if neutralization is enabled neutralizationEnabled = False try: config = self.services.neutralization.getConfig() neutralizationEnabled = config and config.enabled except Exception as e: logger.debug(f"Could not check neutralization config: {str(e)}") - + if not neutralizationEnabled: logger.info("Neutralization is not enabled, returning documents unchanged") # Return original documents if neutralization is disabled @@ -144,8 +143,25 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult: neutralizedParts.append(part) continue - # Neutralize the data field if it contains text - if part.data: + # Neutralize the data field based on typeGroup + _typeGroup = getattr(part, 'typeGroup', '') or '' + if _typeGroup == 'image' and part.data: + import base64 as _b64 + try: + self.services.chat.progressLogUpdate( + operationId, + 0.3 + (i / len(chatDocuments)) * 0.6, + f"Checking image part {len(neutralizedParts) + 1} of document {i+1}" + ) + _imgBytes = _b64.b64decode(str(part.data)) + _imgResult = await self.services.neutralization.processImageAsync(_imgBytes, f"part_{part.id}") + if _imgResult.get("status") == "ok": + neutralizedParts.append(part) + else: + logger.warning(f"Fail-Safe: Image part {part.id} blocked (PII detected), SKIPPING") + except Exception as _imgErr: + logger.error(f"Fail-Safe: Image check failed for part {part.id}: {_imgErr}, SKIPPING") + elif part.data: try: self.services.chat.progressLogUpdate( operationId, @@ -153,14 +169,11 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult: f"Neutralizing part {len(neutralizedParts) + 1} of document {i+1}" ) - # Call neutralization service neutralizationResult = self.services.neutralization.processText(part.data) if neutralizationResult and 'neutralized_text' in neutralizationResult: - # Replace data with neutralized text neutralizedData = neutralizationResult['neutralized_text'] - # Create new ContentPart with neutralized data neutralizedPart = ContentPart( id=part.id, parentId=part.parentId, @@ -172,15 +185,12 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult: ) neutralizedParts.append(neutralizedPart) else: - # Fail-Safe: neutralization incomplete, skip this part logger.warning(f"Fail-Safe: Neutralization incomplete for part {part.id}, SKIPPING (not passing original)") continue except Exception as e: logger.error(f"Fail-Safe: Error neutralizing part {part.id}, SKIPPING document (not passing original): {str(e)}") - # Fail-Safe: do NOT pass original data to AI continue else: - # No data to neutralize, keep original part neutralizedParts.append(part) # Create neutralized ContentExtracted object diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index de332c31..58b76908 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -352,12 +352,6 @@ class WorkflowManager: for i, doc in enumerate(documents, 1): docListText += f"\n{i}. {doc.fileName} ({doc.mimeType}, {doc.fileSize} bytes)" - _userId = getattr(getattr(self.services, 'user', None), 'id', '') or '' - _featureInstanceId = getattr(self.services, 'featureInstanceId', '') or '' - _promptForAnalysis, _wasNeutralized, _mappingId = await self._neutralizePromptIfRequired( - userPrompt, userId=_userId, featureInstanceId=_featureInstanceId - ) - analysisPrompt = f"""You are an input analyzer. From the user's message, perform ALL of the following in one pass: 1. detectedLanguage: Detect ISO 639-1 language code (e.g., de, en, fr, it) @@ -407,7 +401,7 @@ Return ONLY JSON (no markdown) with this exact structure: The following is the user's original input message. Analyze intent, normalize the request, and determine complexity: ################ USER INPUT START ################# -{_promptForAnalysis.replace('{', '{{').replace('}', '}}') if _promptForAnalysis else ''} +{userPrompt.replace('{', '{{').replace('}', '}}') if userPrompt else ''} ################ USER INPUT FINISH ################# """ @@ -425,12 +419,6 @@ The following is the user's original input message. Analyze intent, normalize th jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0 if jsonStart != -1 and jsonEnd > jsonStart: result = json.loads(aiResponse[jsonStart:jsonEnd]) - if _wasNeutralized: - for _field in ('normalizedRequest', 'intent', 'workflowName'): - if _field in result and result[_field]: - result[_field] = await self._rehydrateResponseIfNeeded( - result[_field], True, userId=_userId, featureInstanceId=_featureInstanceId - ) return result else: logger.warning("Could not parse combined analysis response, using defaults") @@ -490,7 +478,6 @@ The following is the user's original input message. Analyze intent, normalize th if userInput.prompt: try: originalPromptBytes = userInput.prompt.encode('utf-8') - originalPromptBytes = await self._neutralizeContentIfEnabled(originalPromptBytes, "text/markdown") fileItem = self.services.interfaceDbComponent.createFile( name="user_prompt_original.md", mimeType="text/markdown", @@ -680,7 +667,6 @@ The following is the user's original input message. Analyze intent, normalize th if userInput.prompt: try: originalPromptBytes = userInput.prompt.encode('utf-8') - originalPromptBytes = await self._neutralizeContentIfEnabled(originalPromptBytes, "text/markdown") fileItem = self.services.interfaceDbComponent.createFile( name="user_prompt_original.md", mimeType="text/markdown", @@ -821,7 +807,6 @@ The following is the user's original input message. Analyze intent, normalize th if userInput.prompt: try: originalPromptBytes = userInput.prompt.encode('utf-8') - originalPromptBytes = await self._neutralizeContentIfEnabled(originalPromptBytes, "text/markdown") fileItem = self.services.interfaceDbComponent.createFile( name="user_prompt_original.md", mimeType="text/markdown", @@ -1365,82 +1350,3 @@ The following is the user's original input message. Analyze intent, normalize th """Set user language for the service center""" self.services.user.language = language - async def _neutralizePromptIfRequired(self, prompt: str, userId: str, featureInstanceId: str) -> tuple: - """Neutralize prompt text if the workflow context requires it. - Returns (processedPrompt, wasNeutralized, mappingId).""" - try: - _neutralSvc = getattr(self.services, 'neutralization', None) - if not _neutralSvc: - return prompt, False, None - _config = _neutralSvc.getConfig() if hasattr(_neutralSvc, 'getConfig') else None - if not _config or not getattr(_config, 'enabled', False): - return prompt, False, None - _result = _neutralSvc.processText(prompt, userId=userId, featureInstanceId=featureInstanceId) - if _result and _result.get("neutralized_text"): - return _result["neutralized_text"], True, _result.get("mappingId") - return prompt, False, None - except Exception as e: - logger.warning(f"Prompt neutralization failed: {e}") - return prompt, False, None - - async def _rehydrateResponseIfNeeded(self, response: str, wasNeutralized: bool, userId: str, featureInstanceId: str) -> str: - """Replace placeholders in AI response with original values.""" - if not wasNeutralized or not response: - return response - try: - _neutralSvc = getattr(self.services, 'neutralization', None) - if not _neutralSvc: - return response - _rehydrated = _neutralSvc.resolveText(response, userId=userId, featureInstanceId=featureInstanceId) - return _rehydrated if _rehydrated else response - except Exception as e: - logger.warning(f"Response re-hydration failed: {e}") - return response - - async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes: - """Neutralize content if neutralization is enabled in user settings""" - try: - # Automation hub may not have neutralization service; skip if unavailable - neutralization = getattr(self.services, 'neutralization', None) - if not neutralization: - return contentBytes - # Check if neutralization is enabled - config = neutralization.getConfig() - if not config or not config.enabled: - return contentBytes - - # Decode content to text for neutralization - try: - textContent = contentBytes.decode('utf-8') - except UnicodeDecodeError: - # Try alternative encodings - for enc in ['latin-1', 'cp1252', 'iso-8859-1']: - try: - textContent = contentBytes.decode(enc) - break - except UnicodeDecodeError: - continue - else: - # If unable to decode, return original bytes (binary content) - logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}") - return contentBytes - - # Neutralize the text content - # Note: The neutralization service should use names from config when processing - result = neutralization.processText(textContent) - if result and 'neutralized_text' in result: - neutralizedText = result['neutralized_text'] - # Encode back to bytes using the same encoding - try: - return neutralizedText.encode('utf-8') - except Exception as e: - logger.warning(f"Error encoding neutralized text: {str(e)}") - return contentBytes - else: - logger.warning("Neutralization did not return neutralized_text") - return contentBytes - except Exception as e: - logger.error(f"Error during content neutralization: {str(e)}") - # Return original content on error - return contentBytes -