From 7e8800572197da35727e28ae6bf927563d3247ea Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 29 Mar 2026 21:55:09 +0200
Subject: [PATCH] Unified fail-safe neutralization architecture
---
modules/aicore/aicorePluginPrivateLlm.py | 9 +-
modules/datamodels/datamodelAi.py | 4 +
modules/datamodels/datamodelBilling.py | 1 +
modules/datamodels/datamodelKnowledge.py | 10 +-
modules/datamodels/datamodelUam.py | 34 ++-
.../mainServiceNeutralization.py | 97 +++++++-
modules/interfaces/interfaceBootstrap.py | 7 +
modules/interfaces/interfaceDbApp.py | 18 +-
modules/interfaces/interfaceDbBilling.py | 35 +++
modules/interfaces/interfaceDbKnowledge.py | 95 ++++++--
modules/interfaces/interfaceDbManagement.py | 13 +-
modules/interfaces/interfaceDbSubscription.py | 23 +-
modules/migration/migrateRagScopeFields.py | 114 ++++++++++
modules/routes/routeAdminFeatures.py | 9 +-
modules/routes/routeBilling.py | 23 +-
modules/routes/routeDataFiles.py | 112 +++++----
modules/routes/routeStore.py | 135 ++++++++---
modules/routes/routeSubscription.py | 59 ++---
modules/routes/routeVoiceGoogle.py | 4 +-
modules/routes/routeVoiceUser.py | 4 +-
.../services/serviceAgent/mainServiceAgent.py | 112 ++++++++-
.../services/serviceAi/mainServiceAi.py | 215 +++++++++++++++---
.../serviceKnowledge/mainServiceKnowledge.py | 145 +++++++-----
.../services/serviceKnowledge/subPreScan.py | 2 +
modules/shared/notifyMandateAdmins.py | 10 +-
.../methodContext/actions/extractContent.py | 71 +-----
.../methodContext/actions/neutralizeData.py | 30 ++-
modules/workflows/workflowManager.py | 96 +-------
28 files changed, 1064 insertions(+), 423 deletions(-)
create mode 100644 modules/migration/migrateRagScopeFields.py
diff --git a/modules/aicore/aicorePluginPrivateLlm.py b/modules/aicore/aicorePluginPrivateLlm.py
index 718c5905..38baa35e 100644
--- a/modules/aicore/aicorePluginPrivateLlm.py
+++ b/modules/aicore/aicorePluginPrivateLlm.py
@@ -7,9 +7,9 @@ Connects to the private-llm service running on-premise with Ollama backend.
Provides OCR and Vision capabilities via local AI models.
Models:
-- poweron-ocr-general: Text extraction and OCR (deepseek backend)
-- poweron-vision-general: General vision tasks (qwen2.5vl backend)
-- poweron-vision-deep: Deep vision analysis (granite3.2 backend)
+- poweron-text-general: Text (qwen2.5); NEUTRALIZATION_TEXT + data/plan ops
+- poweron-vision-general: Vision (qwen2.5vl); IMAGE_ANALYSE + NEUTRALIZATION_IMAGE
+- poweron-vision-deep: Vision (granite3.2); IMAGE_ANALYSE + NEUTRALIZATION_IMAGE
Pricing (CHF per call):
- Text models: CHF 0.010
@@ -245,6 +245,7 @@ class AiPrivateLlm(BaseConnectorAi):
(OperationTypeEnum.DATA_ANALYSE, 8),
(OperationTypeEnum.DATA_GENERATE, 8),
(OperationTypeEnum.DATA_EXTRACT, 8),
+ (OperationTypeEnum.NEUTRALIZATION_TEXT, 9),
),
version="qwen2.5:7b",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_TEXT_PER_CALL
@@ -270,6 +271,7 @@ class AiPrivateLlm(BaseConnectorAi):
processingMode=ProcessingModeEnum.ADVANCED,
operationTypes=createOperationTypeRatings(
(OperationTypeEnum.IMAGE_ANALYSE, 9),
+ (OperationTypeEnum.NEUTRALIZATION_IMAGE, 9),
),
version="qwen2.5vl:7b",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_VISION_PER_CALL
@@ -295,6 +297,7 @@ class AiPrivateLlm(BaseConnectorAi):
processingMode=ProcessingModeEnum.DETAILED,
operationTypes=createOperationTypeRatings(
(OperationTypeEnum.IMAGE_ANALYSE, 9),
+ (OperationTypeEnum.NEUTRALIZATION_IMAGE, 9),
),
version="granite3.2-vision",
calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_VISION_PER_CALL
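Reviewer note (not part of the patch): with the ratings above, NEUTRALIZATION_TEXT and NEUTRALIZATION_IMAGE are rated only by this Private-LLM connector, so operation-type routing cannot select an external provider for neutralization work. A minimal caller-side sketch, assuming callAi() picks the highest-rated connector for the requested operation type; the helper name is hypothetical:

from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum

async def neutralizeTextViaInternalModel(aiService, rawText: str) -> str:
    # NEUTRALIZATION_TEXT is rated only by poweron-text-general above,
    # so this request stays on the internal model.
    request = AiCallRequest(
        prompt=f"Replace all PII in the following text with neutral placeholders:\n{rawText}",
        options=AiCallOptions(operationType=OperationTypeEnum.NEUTRALIZATION_TEXT),
    )
    response = await aiService.callAi(request)
    return response.content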
diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py
index c31d5696..96e05185 100644
--- a/modules/datamodels/datamodelAi.py
+++ b/modules/datamodels/datamodelAi.py
@@ -22,6 +22,10 @@ class OperationTypeEnum(str, Enum):
IMAGE_ANALYSE = "imageAnalyse"
IMAGE_GENERATE = "imageGenerate"
+ # Neutralization (dedicated model selection; text vs vision backends)
+ NEUTRALIZATION_TEXT = "neutralizationText"
+ NEUTRALIZATION_IMAGE = "neutralizationImage"
+
# Web Operations
WEB_SEARCH_DATA = "webSearch" # Returns list of URLs only
WEB_CRAWL = "webCrawl" # Web crawl for a given URL
diff --git a/modules/datamodels/datamodelBilling.py b/modules/datamodels/datamodelBilling.py
index 2d3bfdb1..ccf1f4a1 100644
--- a/modules/datamodels/datamodelBilling.py
+++ b/modules/datamodels/datamodelBilling.py
@@ -28,6 +28,7 @@ class ReferenceTypeEnum(str, Enum):
ADMIN = "ADMIN" # Admin adjustment
SYSTEM = "SYSTEM" # System credit (e.g., initial credit)
STORAGE = "STORAGE" # Metered storage overage (prepay pool)
+ SUBSCRIPTION = "SUBSCRIPTION" # AI budget credit from subscription plan
class PeriodTypeEnum(str, Enum):
diff --git a/modules/datamodels/datamodelKnowledge.py b/modules/datamodels/datamodelKnowledge.py
index 3742a84b..7ac12c15 100644
--- a/modules/datamodels/datamodelKnowledge.py
+++ b/modules/datamodels/datamodelKnowledge.py
@@ -3,8 +3,10 @@
"""Knowledge Store data models: FileContentIndex, ContentChunk, WorkflowMemory.
-These models support the 3-tier RAG architecture:
+These models support the scope-based RAG architecture:
-- Shared Layer: mandateId-scoped, isShared=True
-- Instance Layer: userId + featureInstanceId-scoped
+- Personal Layer: scope=personal, userId-scoped
+- Instance Layer: scope=featureInstance, featureInstanceId-scoped
+- Mandate Layer: scope=mandate, mandateId-scoped (visible to all mandate users)
+- Global Layer: scope=global (sysAdmin only)
- Workflow Layer: workflowId-scoped (WorkflowMemory)
Vector fields use json_schema_extra={"db_type": "vector(1536)"} for pgvector.
@@ -20,12 +22,11 @@ import uuid
class FileContentIndex(PowerOnModel):
"""Structural index of a file's content objects. Created without AI.
- Lives in the Instance Layer; optionally promoted to Shared Layer via isShared."""
+ Scope is mirrored from FileItem (poweron_management) at indexing time."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key (typically = fileId)")
userId: str = Field(description="Owner user ID")
featureInstanceId: str = Field(default="", description="Feature instance scope")
mandateId: str = Field(default="", description="Mandate scope")
- isShared: bool = Field(default=False, description="Visible in Shared Layer for all mandate users")
fileName: str = Field(description="Original file name")
mimeType: str = Field(description="MIME type of the file")
containerPath: Optional[str] = Field(default=None, description="Path within a container (e.g. 'archive.zip/folder/report.pdf')")
@@ -57,7 +58,6 @@ registerModelLabels(
"userId": {"en": "User ID", "fr": "ID utilisateur"},
"featureInstanceId": {"en": "Feature Instance ID", "fr": "ID de l'instance"},
"mandateId": {"en": "Mandate ID", "fr": "ID du mandat"},
- "isShared": {"en": "Shared", "fr": "Partagé"},
"fileName": {"en": "File Name", "fr": "Nom de fichier"},
"mimeType": {"en": "MIME Type", "fr": "Type MIME"},
"containerPath": {"en": "Container Path", "fr": "Chemin du conteneur"},
diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py
index 741ce3d5..e0c4f13c 100644
--- a/modules/datamodels/datamodelUam.py
+++ b/modules/datamodels/datamodelUam.py
@@ -10,7 +10,7 @@ Multi-Tenant Design:
"""
import uuid
-from typing import Optional, List, Dict
+from typing import Optional, List, Dict, Any
from enum import Enum
from pydantic import BaseModel, Field, EmailStr, field_validator, computed_field
from modules.datamodels.datamodelBase import PowerOnModel
@@ -303,6 +303,33 @@ registerModelLabels(
)
+def _normalizeTtsVoiceMap(value: Any) -> Optional[Dict[str, str]]:
+ """
+ Coerce ttsVoiceMap payloads to Dict[str, str].
+
+ UI/clients may send per-locale objects like {"voiceName": "de-DE-Chirp3-HD-Achird"};
+ storage and model field type are locale -> voice id string.
+ """
+ if value is None:
+ return None
+ if not isinstance(value, dict):
+ return None
+ out: Dict[str, str] = {}
+ for rawKey, rawVal in value.items():
+ key = str(rawKey)
+ if rawVal is None:
+ continue
+ if isinstance(rawVal, str):
+ out[key] = rawVal
+ elif isinstance(rawVal, dict):
+ vn = rawVal.get("voiceName")
+ if vn is not None and str(vn).strip() != "":
+ out[key] = str(vn).strip()
+ else:
+ out[key] = str(rawVal)
+ return out if out else None
+
+
class UserVoicePreferences(PowerOnModel):
"""User-level voice/language preferences, shared across all features."""
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
@@ -315,6 +342,11 @@ class UserVoicePreferences(PowerOnModel):
translationSourceLanguage: Optional[str] = Field(default=None, description="Source language for translations")
translationTargetLanguage: Optional[str] = Field(default=None, description="Target language for translations")
+ @field_validator("ttsVoiceMap", mode="before")
+ @classmethod
+ def _validateTtsVoiceMap(cls, value: Any) -> Optional[Dict[str, str]]:
+ return _normalizeTtsVoiceMap(value)
+
registerModelLabels(
"UserVoicePreferences",
diff --git a/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py b/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py
index c803b375..e583c60b 100644
--- a/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py
+++ b/modules/features/neutralization/serviceNeutralization/mainServiceNeutralization.py
@@ -203,6 +203,89 @@ class NeutralizationService:
'processed_info': {'type': 'binary', 'status': 'error', 'error': str(e)}
}
+ async def processImageAsync(self, imageBytes: bytes, fileName: str, mimeType: str = "image/png") -> Dict[str, Any]:
+ """Analyze image via internal vision model to check for sensitive content.
+
+        Returns dict with:
+        - 'status': 'ok' | 'blocked' (fail-safe: errors also report 'blocked')
+        - 'hasSensitiveContent': bool
+        - 'analysis': str (model's analysis text, if available)
+        - 'processed_info': dict with details ('status' is 'error' on exceptions)
+
+        Uses the NEUTRALIZATION_IMAGE operation type → only internal Private-LLM models.
+        If no internal model is available → returns 'blocked'.
+        """
+ import base64
+ try:
+ aiService = None
+ if self._getService:
+ try:
+ aiService = self._getService("ai")
+ except Exception:
+ pass
+ if not aiService or not hasattr(aiService, 'callAi'):
+ logger.warning(f"processImage: AI service not available — blocking image '{fileName}'")
+ return {
+ 'status': 'blocked',
+ 'hasSensitiveContent': True,
+ 'analysis': '',
+ 'processed_info': {'type': 'image', 'status': 'blocked', 'reason': 'AI service unavailable'}
+ }
+
+ from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
+
+ _b64Data = base64.b64encode(imageBytes).decode('utf-8')
+ _dataUrl = f"data:{mimeType};base64,{_b64Data}"
+
+ _prompt = (
+ "Analyze this image for personally identifiable information (PII). "
+ "Check for: names, addresses, phone numbers, email addresses, ID numbers, "
+ "faces, signatures, handwritten text, license plates, financial data. "
+ "Respond with JSON: {\"hasPII\": true/false, \"findings\": [\"...\"]}"
+ )
+
+ _request = AiCallRequest(
+ prompt=_prompt,
+ options=AiCallOptions(operationType=OperationTypeEnum.NEUTRALIZATION_IMAGE),
+ messages=[{"role": "user", "content": [
+ {"type": "text", "text": _prompt},
+ {"type": "image_url", "image_url": {"url": _dataUrl}},
+ ]}],
+ )
+
+ _response = await aiService.callAi(_request)
+
+ _hasPII = False
+ _analysis = _response.content if _response and hasattr(_response, 'content') else ''
+ if _analysis:
+ _lowerAnalysis = _analysis.lower()
+ if '"haspii": true' in _lowerAnalysis or '"haspii":true' in _lowerAnalysis:
+ _hasPII = True
+
+ return {
+ 'status': 'blocked' if _hasPII else 'ok',
+ 'hasSensitiveContent': _hasPII,
+ 'analysis': _analysis,
+ 'processed_info': {'type': 'image', 'status': 'blocked' if _hasPII else 'ok', 'fileName': fileName}
+ }
+ except Exception as e:
+ logger.error(f"processImage failed for '{fileName}': {e}")
+ return {
+ 'status': 'blocked',
+ 'hasSensitiveContent': True,
+ 'analysis': '',
+ 'processed_info': {'type': 'image', 'status': 'error', 'error': str(e)}
+ }
+
+    def processImage(self, imageBytes: bytes, fileName: str, mimeType: str = "image/png") -> Dict[str, Any]:
+        """Sync wrapper for processImageAsync. Uses asyncio.run when no event loop is
+        running; otherwise executes the coroutine on a fresh loop in a worker thread."""
+        import asyncio
+        try:
+            return asyncio.run(self.processImageAsync(imageBytes, fileName, mimeType))
+        except RuntimeError:
+            # A loop is already running in this thread, so run_until_complete would
+            # fail as well; execute the coroutine on its own loop in a worker thread.
+            import concurrent.futures
+            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
+                return pool.submit(asyncio.run, self.processImageAsync(imageBytes, fileName, mimeType)).result()
+
def resolveText(self, text: str) -> str:
if not self.interfaceNeutralizer:
return text
@@ -295,9 +378,21 @@ class NeutralizationService:
p = part if isinstance(part, dict) else part.model_dump() if hasattr(part, 'model_dump') else part
type_group = p.get('typeGroup', '')
data = p.get('data', '')
- if type_group in ('binary', 'image') or not (data and str(data).strip()):
+ if type_group == 'binary' or not (data and str(data).strip()):
neutralized_parts.append(part)
continue
+ if type_group == 'image':
+ import base64 as _b64img
+ try:
+ _imgBytes = _b64img.b64decode(str(data))
+ _imgResult = await self.processImageAsync(_imgBytes, fileName)
+ if _imgResult.get("status") == "ok":
+ neutralized_parts.append(part)
+ else:
+ logger.warning(f"Image part blocked in binary file '{fileName}' (PII detected), removing")
+ except Exception as _imgErr:
+ logger.warning(f"Image check failed in binary file '{fileName}': {_imgErr}, removing (fail-safe)")
+ continue
nr = self._neutralizeText(str(data), 'text' if type_group != 'table' else 'csv')
proc = nr.get('processed_info', {}) or {}
if isinstance(proc, dict) and proc.get('type') == 'error':
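Reviewer note (not part of the patch): the fail-safe contract of processImageAsync is that every outcome other than a confirmed 'ok' drops the image: PII hits, a missing internal model, and exceptions all surface as 'blocked'. A caller sketch under that contract; the helper name is hypothetical:

async def keepOnlySafeImages(neutralizationService, images):
    """images: iterable of (fileName, rawBytes); returns only model-cleared pairs."""
    safe = []
    for fileName, rawBytes in images:
        result = await neutralizationService.processImageAsync(rawBytes, fileName)
        if result.get("status") == "ok":  # 'blocked' covers PII, no model, and errors
            safe.append((fileName, rawBytes))
    return safe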
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index 7eccb3ee..93b17d6a 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -201,6 +201,13 @@ def initBootstrap(db: DatabaseConnector) -> None:
except Exception as e:
logger.error(f"Voice & documents migration failed: {e}")
+ # Backfill FileContentIndex scope fields from FileItem (one-time)
+ try:
+ from modules.migration.migrateRagScopeFields import runMigration as migrateRagScope
+ migrateRagScope(appDb=db)
+ except Exception as e:
+ logger.error(f"RAG scope fields migration failed: {e}")
+
# After migration: root mandate is purely technical — no feature instances
if not migrationDone and mandateId:
initRootMandateFeatures(db, mandateId)
diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py
index 13179634..ffde890f 100644
--- a/modules/interfaces/interfaceDbApp.py
+++ b/modules/interfaces/interfaceDbApp.py
@@ -1931,14 +1931,26 @@ class AppObjects:
raise
logger.debug(f"Subscription capacity check skipped: {e}")
- def _syncSubscriptionQuantity(self, mandateId: str) -> None:
- """Sync Stripe subscription quantities after a resource mutation."""
+ def _syncSubscriptionQuantity(self, mandateId: str, *, raiseOnError: bool = False) -> None:
+ """Sync Stripe subscription quantities after a resource mutation.
+
+ Args:
+ raiseOnError: If True, propagate errors (billing-critical paths).
+ """
try:
from modules.interfaces.interfaceDbSubscription import getInterface as getSubInterface
from modules.security.rootAccess import getRootUser
subIf = getSubInterface(getRootUser(), mandateId)
- subIf.syncQuantityToStripe(mandateId)
+ operative = subIf.getOperativeForMandate(mandateId)
+ if not operative:
+ if raiseOnError:
+ raise ValueError(f"Kein operatives Abonnement für Mandant {mandateId}")
+ logger.debug("No operative subscription for mandate %s — quantity sync skipped", mandateId)
+ return
+ subIf.syncQuantityToStripe(operative["id"], raiseOnError=raiseOnError)
except Exception as e:
+ if raiseOnError:
+ raise
logger.debug(f"Subscription quantity sync skipped: {e}")
def deleteUserMandate(self, userId: str, mandateId: str) -> bool:
diff --git a/modules/interfaces/interfaceDbBilling.py b/modules/interfaces/interfaceDbBilling.py
index 1069314f..bb2dc5c9 100644
--- a/modules/interfaces/interfaceDbBilling.py
+++ b/modules/interfaces/interfaceDbBilling.py
@@ -970,6 +970,41 @@ class BillingObjects:
)
return created
+ # =========================================================================
+ # Subscription AI-Budget Credit
+ # =========================================================================
+
+ def creditSubscriptionBudget(self, mandateId: str, planKey: str, periodLabel: str = "") -> Optional[Dict[str, Any]]:
+ """Credit the plan's budgetAiCHF to the mandate pool account.
+
+ Should be called once per billing period (initial activation + each invoice.paid).
+ Returns the created CREDIT transaction or None if budget is 0."""
+ from modules.datamodels.datamodelSubscription import _getPlan
+
+ plan = _getPlan(planKey)
+ if not plan or not plan.budgetAiCHF or plan.budgetAiCHF <= 0:
+ return None
+
+ poolAccount = self.getOrCreateMandateAccount(mandateId)
+ description = f"AI-Budget ({planKey})"
+ if periodLabel:
+ description += f" – {periodLabel}"
+
+ transaction = BillingTransaction(
+ accountId=poolAccount["id"],
+ transactionType=TransactionTypeEnum.CREDIT,
+ amount=plan.budgetAiCHF,
+ description=description,
+ referenceType=ReferenceTypeEnum.SUBSCRIPTION,
+ referenceId=mandateId,
+ )
+ created = self.createTransaction(transaction)
+ logger.info(
+ "AI-Budget credited mandate=%s plan=%s amount=%.2f CHF",
+ mandateId, planKey, plan.budgetAiCHF,
+ )
+ return created
+
# =========================================================================
# Workflow Cost Query
# =========================================================================
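Reviewer note (not part of the patch): creditSubscriptionBudget returns None for plans without budgetAiCHF, so the webhook handlers can call it unconditionally per billing event; deduplication of Stripe webhook retries still has to happen at the caller. Sketch of the two call sites added in routeBilling.py (period-label values illustrative):

billingIf = _getRootInterface()
billingIf.creditSubscriptionBudget(mandateId, planKey, periodLabel="Erstaktivierung")  # activation
billingIf.creditSubscriptionBudget(mandateId, planKey, periodLabel="2026-04-01")       # invoice.paid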
diff --git a/modules/interfaces/interfaceDbKnowledge.py b/modules/interfaces/interfaceDbKnowledge.py
index c7b9e29a..ede37c87 100644
--- a/modules/interfaces/interfaceDbKnowledge.py
+++ b/modules/interfaces/interfaceDbKnowledge.py
@@ -294,7 +294,6 @@ class KnowledgeObjects:
userId: str = None,
featureInstanceId: str = None,
mandateId: str = None,
- isShared: bool = None,
scope: str = None,
limit: int = 10,
minScore: float = None,
@@ -305,10 +304,9 @@ class KnowledgeObjects:
Args:
queryVector: Query embedding vector.
- userId: Filter by user (Instance Layer).
+ userId: Filter by user (personal scope).
featureInstanceId: Filter by feature instance.
- mandateId: Filter by mandate (for Shared Layer lookups).
- isShared: If True, search Shared Layer via FileContentIndex join.
+ mandateId: Filter by mandate (scope=mandate means visible to all mandate users).
scope: If provided, filter by this specific scope value.
If not provided, use scope-union approach (personal + featureInstance + mandate + global).
limit: Max results.
@@ -323,8 +321,13 @@ class KnowledgeObjects:
recordFilter["contentType"] = contentType
if scope:
+ scopeFilter: Dict[str, Any] = {"scope": scope}
+ if mandateId:
+ scopeFilter["mandateId"] = mandateId
+ if featureInstanceId:
+ scopeFilter["featureInstanceId"] = featureInstanceId
scopedFileIds = self.db.getRecordset(
- FileContentIndex, recordFilter={"scope": scope}
+ FileContentIndex, recordFilter=scopeFilter
)
fileIds = [
idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None)
@@ -334,16 +337,6 @@ class KnowledgeObjects:
if not fileIds:
return []
recordFilter["fileId"] = fileIds
- elif isShared and mandateId:
- sharedIndexes = self.db.getRecordset(
- FileContentIndex,
- recordFilter={"mandateId": mandateId, "isShared": True},
- )
- sharedFileIds = [idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None) for idx in sharedIndexes]
- sharedFileIds = [fid for fid in sharedFileIds if fid]
- if not sharedFileIds:
- return []
- recordFilter["fileId"] = sharedFileIds
elif userId or featureInstanceId or mandateId:
scopedFileIds = self._getScopedFileIds(
userId=userId,
@@ -410,7 +403,7 @@ class KnowledgeObjects:
if mandateId:
files_shared = self.db.getRecordset(
FileContentIndex,
- recordFilter={"mandateId": mandateId, "isShared": True},
+ recordFilter={"mandateId": mandateId, "scope": "mandate"},
)
by_id: Dict[str, Dict[str, Any]] = {}
@@ -559,6 +552,76 @@ class KnowledgeObjects:
}
+def aggregateMandateRagTotalBytes(mandateId: str) -> int:
+ """Sum FileContentIndex.totalSize for a mandate.
+
+ Primary strategy (relies on correct scope fields on FileContentIndex):
+ 1. FileContentIndex rows with mandateId on the index
+ 2. FileContentIndex rows with featureInstanceId of any mandate FeatureInstance
+ Deduplicates by id.
+ """
+ if not mandateId:
+ return 0
+ from modules.datamodels.datamodelFeatures import FeatureInstance
+ from modules.interfaces.interfaceDbApp import getRootInterface
+
+ knowDb = getInterface(None).db
+ appDb = getRootInterface().db
+ byId: Dict[str, Dict[str, Any]] = {}
+
+ for row in knowDb.getRecordset(FileContentIndex, recordFilter={"mandateId": mandateId}):
+ rid = row.get("id")
+ if rid:
+ byId[str(rid)] = row
+
+ instances = appDb.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId})
+ instIds = [str(inst.get("id", "")) for inst in instances if inst.get("id")]
+
+ for instId in instIds:
+ for row in knowDb.getRecordset(FileContentIndex, recordFilter={"featureInstanceId": instId}):
+ rid = row.get("id")
+ if rid and str(rid) not in byId:
+ byId[str(rid)] = row
+
+ # DEPRECATED: file-ID-correlation fallback from poweron_management.
+ # Only needed for pre-migration data where mandateId/featureInstanceId on the
+ # FileContentIndex are empty. Remove once migrateRagScopeFields has been run.
+ _fallbackCount = 0
+ try:
+ from modules.datamodels.datamodelFiles import FileItem
+ from modules.interfaces.interfaceDbManagement import ComponentObjects
+ mgmtDb = ComponentObjects().db
+ knowledgeIf = getInterface(None)
+
+ fileIds: set = set()
+ for f in mgmtDb.getRecordset(FileItem, recordFilter={"mandateId": mandateId}):
+ fid = f.get("id") if isinstance(f, dict) else getattr(f, "id", None)
+ if fid:
+ fileIds.add(str(fid))
+ for instId in instIds:
+ for f in mgmtDb.getRecordset(FileItem, recordFilter={"featureInstanceId": instId}):
+ fid = f.get("id") if isinstance(f, dict) else getattr(f, "id", None)
+ if fid:
+ fileIds.add(str(fid))
+
+ for fid in fileIds:
+ if fid in byId:
+ continue
+ row = knowledgeIf.getFileContentIndex(fid)
+ if row:
+ byId[fid] = row
+ _fallbackCount += 1
+ except Exception as e:
+ logger.warning("aggregateMandateRagTotalBytes fallback failed: %s", e)
+
+ total = sum(int(r.get("totalSize") or 0) for r in byId.values())
+ logger.info(
+ "aggregateMandateRagTotalBytes(%s): %d indexes, %d bytes (fallback: %d)",
+ mandateId, len(byId), total, _fallbackCount,
+ )
+ return total
+
+
def getInterface(currentUser: Optional[User] = None) -> KnowledgeObjects:
"""Get or create a KnowledgeObjects singleton."""
if "default" not in _instances:
diff --git a/modules/interfaces/interfaceDbManagement.py b/modules/interfaces/interfaceDbManagement.py
index 28842958..0a16b734 100644
--- a/modules/interfaces/interfaceDbManagement.py
+++ b/modules/interfaces/interfaceDbManagement.py
@@ -1053,15 +1053,20 @@ class ComponentObjects:
# Ensure fileName is unique
uniqueName = self._generateUniquefileName(name)
- # Use mandateId and featureInstanceId from context for proper data isolation
- # Convert None to empty string to satisfy Pydantic validation
mandateId = self.mandateId or ""
featureInstanceId = self.featureInstanceId or ""
-
- # Create FileItem instance
+
+ if featureInstanceId:
+ scope = "featureInstance"
+ elif mandateId:
+ scope = "mandate"
+ else:
+ scope = "personal"
+
fileItem = FileItem(
mandateId=mandateId,
featureInstanceId=featureInstanceId,
+ scope=scope,
fileName=uniqueName,
mimeType=mimeType,
fileSize=fileSize,
diff --git a/modules/interfaces/interfaceDbSubscription.py b/modules/interfaces/interfaceDbSubscription.py
index d6832f14..f1d7ccf7 100644
--- a/modules/interfaces/interfaceDbSubscription.py
+++ b/modules/interfaces/interfaceDbSubscription.py
@@ -309,13 +309,11 @@ class SubscriptionObjects:
return self._getMandateDataVolumeMB(mandateId)
def _getMandateDataVolumeMB(self, mandateId: str) -> float:
- """Sum RAG index size (FileContentIndex.totalSize) across all feature instances of the mandate."""
+ """Sum RAG index size (FileContentIndex.totalSize) for the mandate; reads poweron_knowledge."""
try:
- from modules.datamodels.datamodelKnowledge import FileContentIndex
- knowledgeDb = _getAppDatabaseConnector()
- indexes = knowledgeDb.getRecordset(FileContentIndex, recordFilter={"mandateId": mandateId})
- totalBytes = sum(int(idx.get("totalSize") or 0) for idx in indexes)
- return totalBytes / (1024 * 1024)
+ from modules.interfaces.interfaceDbKnowledge import aggregateMandateRagTotalBytes
+
+ return aggregateMandateRagTotalBytes(mandateId) / (1024 * 1024)
except Exception:
return 0.0
@@ -359,11 +357,18 @@ class SubscriptionObjects:
# Stripe quantity sync
# =========================================================================
- def syncQuantityToStripe(self, subscriptionId: str) -> None:
+ def syncQuantityToStripe(self, subscriptionId: str, *, raiseOnError: bool = False) -> None:
"""Update Stripe subscription item quantities to match actual active counts.
- Takes subscriptionId, not mandateId."""
+ Takes subscriptionId, not mandateId.
+
+ Args:
+ raiseOnError: If True, propagate Stripe API errors instead of logging them.
+ Use True for billing-critical paths (store activation).
+ """
sub = self.getById(subscriptionId)
if not sub or not sub.get("stripeSubscriptionId"):
+ if raiseOnError:
+ raise ValueError(f"Subscription {subscriptionId} hat keine Stripe-Anbindung — Abrechnung nicht möglich.")
return
mandateId = sub["mandateId"]
@@ -389,3 +394,5 @@ class SubscriptionObjects:
logger.info("Stripe quantity synced for sub %s: users=%d, instances=%d", subscriptionId, activeUsers, activeInstances)
except Exception as e:
logger.error("syncQuantityToStripe(%s) failed: %s", subscriptionId, e)
+ if raiseOnError:
+ raise
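Reviewer note (not part of the patch): the two calling conventions after this change, mirroring the updated interfaceDbApp._syncSubscriptionQuantity:

from modules.interfaces.interfaceDbSubscription import getInterface as getSubInterface
from modules.security.rootAccess import getRootUser

subIf = getSubInterface(getRootUser(), mandateId)
operative = subIf.getOperativeForMandate(mandateId)
if operative:
    subIf.syncQuantityToStripe(operative["id"])                     # best-effort: errors are logged
    subIf.syncQuantityToStripe(operative["id"], raiseOnError=True)  # billing-critical: errors propagate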
diff --git a/modules/migration/migrateRagScopeFields.py b/modules/migration/migrateRagScopeFields.py
new file mode 100644
index 00000000..82e0e3fb
--- /dev/null
+++ b/modules/migration/migrateRagScopeFields.py
@@ -0,0 +1,114 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Migration: Backfill FileContentIndex scope fields from FileItem (Single Source of Truth).
+
+Fixes legacy rows in poweron_knowledge where scope/mandateId/featureInstanceId
+are empty or default ("personal") despite the corresponding FileItem having correct values.
+
+Idempotent — safe to run multiple times. Uses a DB flag to skip if already completed.
+"""
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+_MIGRATION_FLAG_KEY = "migration_rag_scope_fields_completed"
+
+
+def _isMigrationCompleted(appDb) -> bool:
+ try:
+ from modules.datamodels.datamodelUam import Mandate
+ records = appDb.getRecordset(Mandate, recordFilter={"name": _MIGRATION_FLAG_KEY})
+ return len(records) > 0
+ except Exception:
+ return False
+
+
+def _setMigrationCompleted(appDb) -> None:
+ try:
+ from modules.datamodels.datamodelUam import Mandate
+ flag = Mandate(name=_MIGRATION_FLAG_KEY, description="RAG scope fields migration completed")
+ appDb.recordCreate(Mandate, flag)
+ except Exception as e:
+ logger.error("Could not set migration flag: %s", e)
+
+
+def runMigration(appDb=None) -> dict:
+ """Backfill FileContentIndex rows from FileItem metadata.
+
+ Returns dict with counts: {total, updated, skipped, orphaned}.
+ """
+ from modules.datamodels.datamodelKnowledge import FileContentIndex
+ from modules.datamodels.datamodelFiles import FileItem
+ from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
+ from modules.interfaces.interfaceDbManagement import ComponentObjects
+
+ if appDb is None:
+ from modules.interfaces.interfaceDbApp import getRootInterface
+ appDb = getRootInterface().db
+
+ if _isMigrationCompleted(appDb):
+ logger.info("migrateRagScopeFields: already completed, skipping")
+ return {"total": 0, "updated": 0, "skipped": 0, "orphaned": 0}
+
+ knowDb = getKnowledgeInterface(None).db
+ mgmtDb = ComponentObjects().db
+
+ allIndexes = knowDb.getRecordset(FileContentIndex, recordFilter={})
+ total = len(allIndexes)
+ updated = 0
+ skipped = 0
+ orphaned = 0
+
+ logger.info("migrateRagScopeFields: processing %d FileContentIndex rows", total)
+
+ for idx in allIndexes:
+ idxId = idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None)
+ if not idxId:
+ skipped += 1
+ continue
+
+ fileItem = mgmtDb._loadRecord(FileItem, str(idxId))
+ if not fileItem:
+ orphaned += 1
+ continue
+
+ _get = (lambda k, d="": fileItem.get(k, d)) if isinstance(fileItem, dict) else (lambda k, d="": getattr(fileItem, k, d))
+
+ fiScope = _get("scope") or "personal"
+ fiMandateId = str(_get("mandateId") or "")
+ fiFeatureInstanceId = str(_get("featureInstanceId") or "")
+
+ idxGet = (lambda k, d="": idx.get(k, d)) if isinstance(idx, dict) else (lambda k, d="": getattr(idx, k, d))
+ currentScope = idxGet("scope") or "personal"
+ currentMandateId = str(idxGet("mandateId") or "")
+ currentFeatureInstanceId = str(idxGet("featureInstanceId") or "")
+
+ updates = {}
+ if fiScope != currentScope:
+ updates["scope"] = fiScope
+ if fiMandateId and fiMandateId != currentMandateId:
+ updates["mandateId"] = fiMandateId
+ if fiFeatureInstanceId and fiFeatureInstanceId != currentFeatureInstanceId:
+ updates["featureInstanceId"] = fiFeatureInstanceId
+
+ if updates:
+ try:
+ knowDb.recordModify(FileContentIndex, str(idxId), updates)
+ updated += 1
+ logger.debug("migrateRagScopeFields: updated %s -> %s", idxId, updates)
+ except Exception as e:
+ logger.error("migrateRagScopeFields: failed to update %s: %s", idxId, e)
+ skipped += 1
+ else:
+ skipped += 1
+
+ _setMigrationCompleted(appDb)
+ logger.info(
+ "migrateRagScopeFields complete: total=%d, updated=%d, skipped=%d, orphaned=%d",
+ total, updated, skipped, orphaned,
+ )
+ return {"total": total, "updated": updated, "skipped": skipped, "orphaned": orphaned}
diff --git a/modules/routes/routeAdminFeatures.py b/modules/routes/routeAdminFeatures.py
index 12206b06..e69df7b9 100644
--- a/modules/routes/routeAdminFeatures.py
+++ b/modules/routes/routeAdminFeatures.py
@@ -576,14 +576,15 @@ def create_feature_instance(
config=data.config
)
- # Sync Stripe quantity after successful creation
try:
from modules.interfaces.interfaceDbSubscription import getInterface as _getSubIf2
from modules.security.rootAccess import getRootUser as _getRU
_subIf2 = _getSubIf2(_getRU(), mandateIdStr)
- _subIf2.syncQuantityToStripe(mandateIdStr)
- except Exception:
- pass
+ _operative = _subIf2.getOperativeForMandate(mandateIdStr)
+ if _operative:
+ _subIf2.syncQuantityToStripe(_operative["id"], raiseOnError=True)
+ except Exception as e:
+ logger.error("Stripe quantity sync failed for admin feature creation in mandate %s: %s", mandateIdStr, e)
logger.info(
f"User {context.user.id} created feature instance '{data.label}' "
diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py
index 13e94559..0f612d45 100644
--- a/modules/routes/routeBilling.py
+++ b/modules/routes/routeBilling.py
@@ -1104,6 +1104,12 @@ def _handleSubscriptionCheckoutCompleted(session, eventId: str) -> None:
updatedSub = subInterface.getById(subscriptionRecordId)
_notifySubscriptionChange(mandateId, "activated", plan, subscriptionRecord=updatedSub, platformUrl=platformUrl)
+ try:
+ billingIf = _getRootInterface()
+ billingIf.creditSubscriptionBudget(mandateId, planKey, periodLabel="Erstaktivierung")
+ except Exception as ex:
+ logger.error("creditSubscriptionBudget on activation failed: %s", ex)
+
logger.info(
"Checkout completed: sub=%s -> %s, mandate=%s, plan=%s",
subscriptionRecordId, toStatus.value, mandateId, planKey,
@@ -1162,9 +1168,14 @@ def _handleSubscriptionWebhook(event) -> None:
if stripeStatus == "active" and currentStatus == SubscriptionStatusEnum.SCHEDULED:
subInterface.transitionStatus(subId, SubscriptionStatusEnum.SCHEDULED, SubscriptionStatusEnum.ACTIVE)
subService.invalidateCache(mandateId)
- plan = _getPlan(sub.get("planKey", ""))
+ planKey = sub.get("planKey", "")
+ plan = _getPlan(planKey)
refreshedSub = subInterface.getById(subId)
_notifySubscriptionChange(mandateId, "activated", plan, subscriptionRecord=refreshedSub, platformUrl=webhookPlatformUrl)
+ try:
+ _getRootInterface().creditSubscriptionBudget(mandateId, planKey, periodLabel="Erstaktivierung")
+ except Exception as ex:
+ logger.error("creditSubscriptionBudget SCHEDULED->ACTIVE failed: %s", ex)
logger.info("SCHEDULED -> ACTIVE for sub %s (mandate %s)", subId, mandateId)
elif stripeStatus == "active" and currentStatus == SubscriptionStatusEnum.PAST_DUE:
@@ -1231,14 +1242,24 @@ def _handleSubscriptionWebhook(event) -> None:
elif event.type == "invoice.paid":
period_ts = obj.get("period_start")
+ periodLabel = ""
if period_ts:
period_start_at = datetime.fromtimestamp(int(period_ts), tz=timezone.utc)
+ periodLabel = period_start_at.strftime("%Y-%m-%d")
try:
billing_if = _getRootInterface()
billing_if.resetStorageBillingPeriod(mandateId, period_start_at)
billing_if.reconcileMandateStorageBilling(mandateId)
except Exception as ex:
logger.error("Storage billing on invoice.paid failed: %s", ex)
+
+ planKey = sub.get("planKey", "")
+ try:
+ billing_if = _getRootInterface()
+ billing_if.creditSubscriptionBudget(mandateId, planKey, periodLabel=periodLabel or "Periodenverlängerung")
+ except Exception as ex:
+ logger.error("creditSubscriptionBudget on invoice.paid failed: %s", ex)
+
logger.info("Invoice paid for sub %s (mandate %s)", subId, mandateId)
return None
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index e95da174..17e0ef56 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -1,6 +1,6 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
-from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Form, Path, Request, status, Query, Response, Body
+from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Form, Path, Request, status, Query, Response, Body, BackgroundTasks
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Optional
import logging
@@ -41,13 +41,16 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user):
file_meta = mgmtInterface.getFile(fileId)
feature_instance_id = ""
mandate_id = ""
+ file_scope = "personal"
if file_meta:
if isinstance(file_meta, dict):
feature_instance_id = file_meta.get("featureInstanceId") or ""
mandate_id = file_meta.get("mandateId") or ""
+ file_scope = file_meta.get("scope") or "personal"
else:
feature_instance_id = getattr(file_meta, "featureInstanceId", None) or ""
mandate_id = getattr(file_meta, "mandateId", None) or ""
+ file_scope = getattr(file_meta, "scope", None) or "personal"
logger.info(f"Auto-index starting for {fileName} ({len(rawBytes)} bytes, {mimeType})")
@@ -61,6 +64,7 @@ async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user):
userId=userId,
featureInstanceId=str(feature_instance_id) if feature_instance_id else "",
mandateId=str(mandate_id) if mandate_id else "",
+ scope=file_scope,
)
logger.info(
f"Pre-scan complete for {fileName}: "
@@ -667,6 +671,7 @@ def batch_move_items(
@limiter.limit("30/minute")
def updateFileScope(
request: Request,
+ background_tasks: BackgroundTasks,
fileId: str = Path(..., description="ID of the file"),
scope: str = Body(..., embed=True),
context: RequestContext = Depends(getRequestContext),
@@ -700,19 +705,18 @@ def updateFileScope(
except Exception as e:
logger.warning(f"Failed to update FileContentIndex scope for file {fileId}: {e}")
- # Trigger re-indexing so RAG embeddings metadata reflects the new scope
- try:
- fileMeta = managementInterface.getFile(fileId)
- if fileMeta:
- import asyncio
- asyncio.ensure_future(_autoIndexFile(
- fileId=fileId,
- fileName=fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", ""),
- mimeType=fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", ""),
- user=context.user,
- ))
- except Exception as e:
- logger.warning(f"Failed to trigger re-index after scope change for file {fileId}: {e}")
+ fileMeta = managementInterface.getFile(fileId)
+ if fileMeta:
+ fn = fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", "")
+ mt = fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", "")
+
+ async def _runReindexAfterScopeChange():
+ try:
+ await _autoIndexFile(fileId=fileId, fileName=fn, mimeType=mt, user=context.user)
+ except Exception as ex:
+ logger.warning("Re-index after scope change failed for %s: %s", fileId, ex)
+
+ background_tasks.add_task(_runReindexAfterScopeChange)
return {"fileId": fileId, "scope": scope, "updated": True}
except HTTPException:
@@ -726,11 +730,18 @@ def updateFileScope(
@limiter.limit("30/minute")
def updateFileNeutralize(
request: Request,
+ background_tasks: BackgroundTasks,
fileId: str = Path(..., description="ID of the file"),
neutralize: bool = Body(..., embed=True),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
- """Toggle neutralization flag on a file."""
+ """Toggle neutralization flag on a file.
+
+ FAILSAFE: When turning neutralize ON, the existing Knowledge Store index
+ and all content chunks are deleted SYNCHRONOUSLY before the response is
+    returned. The re-index happens in a background task. If re-indexing
+    fails, the file simply has no index — no un-neutralized data can leak.
+ """
try:
managementInterface = interfaceDbManagement.getInterface(
context.user,
@@ -740,35 +751,54 @@ def updateFileNeutralize(
managementInterface.updateFile(fileId, {"neutralize": neutralize})
- # Update FileContentIndex neutralization metadata
- try:
- from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
- from modules.datamodels.datamodelKnowledge import FileContentIndex
- knowledgeDb = getKnowledgeInterface()
- neutralizationStatus = "neutralized" if neutralize else "original"
- indices = knowledgeDb.db.getRecordset(FileContentIndex, recordFilter={"id": fileId})
- for idx in indices:
- idxId = idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None)
- if idxId:
- knowledgeDb.db.recordModify(FileContentIndex, idxId, {"neutralizationStatus": neutralizationStatus})
- except Exception as e:
- logger.warning(f"Failed to update FileContentIndex neutralize for file {fileId}: {e}")
+ from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
+ knowledgeDb = getKnowledgeInterface()
- # Trigger re-indexing so content is re-processed with/without neutralization
- try:
- fileMeta = managementInterface.getFile(fileId)
- if fileMeta:
- import asyncio
- asyncio.ensure_future(_autoIndexFile(
- fileId=fileId,
- fileName=fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", ""),
- mimeType=fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", ""),
- user=context.user,
- ))
- except Exception as e:
- logger.warning(f"Failed to trigger re-index after neutralize change for file {fileId}: {e}")
+ if neutralize:
+ # ── CRITICAL: purge existing (potentially un-neutralized) index
+ # This MUST succeed before the response is sent so that no stale
+ # raw-text chunks remain searchable while re-indexing runs.
+ try:
+ knowledgeDb.deleteFileContentIndex(fileId)
+ logger.info("Neutralize toggle ON: deleted index + chunks for file %s", fileId)
+ except Exception as e:
+ logger.error("Neutralize toggle ON: FAILED to delete index for file %s: %s", fileId, e)
+ raise HTTPException(
+ status_code=500,
+ detail=f"Could not purge existing index for neutralization — aborting toggle. Error: {e}",
+ )
+ else:
+ # Turning neutralize OFF: update metadata only; re-index will overwrite
+ try:
+ from modules.datamodels.datamodelKnowledge import FileContentIndex
+ indices = knowledgeDb.db.getRecordset(FileContentIndex, recordFilter={"id": fileId})
+ for idx in indices:
+ idxId = idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None)
+ if idxId:
+ knowledgeDb.db.recordModify(FileContentIndex, idxId, {
+ "neutralizationStatus": "original",
+ "isNeutralized": False,
+ })
+ except Exception as e:
+ logger.warning("Failed to update FileContentIndex after neutralize-OFF for %s: %s", fileId, e)
+
+ # Background re-index (safe: if it fails, there is simply no index)
+ fileMeta = managementInterface.getFile(fileId)
+ if fileMeta:
+ fn = fileMeta.fileName if hasattr(fileMeta, "fileName") else fileMeta.get("fileName", "")
+ mt = fileMeta.mimeType if hasattr(fileMeta, "mimeType") else fileMeta.get("mimeType", "")
+
+ async def _runReindexAfterNeutralizeToggle():
+ try:
+ await _autoIndexFile(fileId=fileId, fileName=fn, mimeType=mt, user=context.user)
+ except Exception as ex:
+ logger.error("Re-index after neutralize toggle failed for %s: %s (file has NO index until next re-index)", fileId, ex)
+
+ background_tasks.add_task(_runReindexAfterNeutralizeToggle)
return {"fileId": fileId, "neutralize": neutralize, "updated": True}
+ except HTTPException:
+ raise
except Exception as e:
logger.error(f"Error updating file neutralize flag: {e}")
raise HTTPException(status_code=500, detail=str(e))
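Reviewer note (not part of the patch): the failsafe ordering relies on FastAPI running BackgroundTasks only after the response has been sent, so the synchronous purge always completes before any re-index starts. Condensed control flow, where reindex stands in for the _autoIndexFile wrapper:

async def toggleNeutralizeOn(fileId, knowledgeDb, background_tasks, reindex):
    knowledgeDb.deleteFileContentIndex(fileId)   # 1) synchronous purge; route returns 500 if this raises
    background_tasks.add_task(reindex, fileId)   # 2) queued; runs only after the response is sent
    return {"fileId": fileId, "neutralize": True, "updated": True}  # 3) success implies raw chunks are gone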
diff --git a/modules/routes/routeStore.py b/modules/routes/routeStore.py
index c9512d3f..5c6f782a 100644
--- a/modules/routes/routeStore.py
+++ b/modules/routes/routeStore.py
@@ -282,8 +282,9 @@ def activateStoreFeature(
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
- Activate a store feature. Creates a new FeatureInstance in the target mandate.
- If user has no admin mandate, auto-creates a personal mandate.
+    Activate a store feature. Billing-gated: a feature instance only remains
+    active if the Stripe subscription quantity update succeeds (proration
+    confirmed). On any billing failure the provisioned instance is rolled back.
"""
featureCode = data.featureCode
userId = str(context.user.id)
@@ -302,21 +303,39 @@ def activateStoreFeature(
if not _isUserAdminInMandate(db, userId, mandateId):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not admin in target mandate")
- # Check subscription capacity
- from modules.datamodels.datamodelSubscription import MandateSubscription, BUILTIN_PLANS
- subs = db.getRecordset(MandateSubscription, recordFilter={"mandateId": mandateId})
- if subs:
- sub = subs[0]
- plan = BUILTIN_PLANS.get(sub.get("planKey"))
- if plan and plan.maxFeatureInstances is not None:
- currentInstances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId})
- if len(currentInstances) >= plan.maxFeatureInstances:
- raise HTTPException(
- status_code=status.HTTP_402_PAYMENT_REQUIRED,
- detail=f"Feature instance limit reached ({plan.maxFeatureInstances}). Upgrade your plan."
- )
-    # Create new FeatureInstance
+    # ── 1. Resolve subscription & plan ──────────────────────────────
+    from modules.datamodels.datamodelSubscription import MandateSubscription, BUILTIN_PLANS, SubscriptionStatusEnum
+    from modules.interfaces.interfaceDbSubscription import _getRootInterface as _getSubRoot
+
+    subInterface = _getSubRoot()
+ operative = subInterface.getOperativeForMandate(mandateId)
+ if not operative:
+ raise HTTPException(
+ status_code=status.HTTP_402_PAYMENT_REQUIRED,
+ detail="Kein aktives Abonnement. Bitte zuerst ein Abo abschliessen.",
+ )
+
+ planKey = operative.get("planKey", "")
+ plan = BUILTIN_PLANS.get(planKey)
+ isBillable = plan is not None and (plan.pricePerFeatureInstanceCHF or 0) > 0
+
+ if isBillable:
+ if not operative.get("stripeSubscriptionId") or not operative.get("stripeItemIdInstances"):
+ raise HTTPException(
+ status_code=status.HTTP_402_PAYMENT_REQUIRED,
+ detail="Stripe-Abonnement ist nicht vollständig eingerichtet — Aktivierung nicht möglich.",
+ )
+
+ # ── 2. Capacity check ───────────────────────────────────────────
+ if plan and plan.maxFeatureInstances is not None:
+ currentInstances = db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId})
+ if len(currentInstances) >= plan.maxFeatureInstances:
+ raise HTTPException(
+ status_code=status.HTTP_402_PAYMENT_REQUIRED,
+ detail=f"Feature-Instanz-Limit erreicht ({plan.maxFeatureInstances}). Bitte Plan upgraden.",
+ )
+
+ # ── 3. Provision instance ───────────────────────────────────────
featureInterface = getFeatureInterface(db)
featureLabel = featureDef.get("label", {}).get("en", featureCode)
instance = featureInterface.createFeatureInstance(
@@ -332,7 +351,6 @@ def activateStoreFeature(
instanceId = instance.get("id") if isinstance(instance, dict) else instance.id
- # Grant FeatureAccess with admin role — MUST be feature-specific (e.g. workspace-admin)
instanceRoles = db.getRecordset(Role, recordFilter={"featureInstanceId": instanceId})
adminRoleId = None
for ir in instanceRoles:
@@ -342,21 +360,34 @@ def activateStoreFeature(
break
if not adminRoleId:
+ _rollbackInstance(db, instanceId)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail=f"No feature-specific admin role (e.g. {featureCode}-admin) found for instance {instanceId}. "
- f"Template roles were not correctly copied.",
+ detail=f"Keine Feature-Admin-Rolle für {featureCode} gefunden — Rollback.",
)
rootInterface.createFeatureAccess(userId, instanceId, roleIds=[adminRoleId])
- # Sync subscription quantity
- try:
- rootInterface._syncSubscriptionQuantity(mandateId)
- except Exception as e:
- logger.warning(f"Failed to sync subscription quantity: {e}")
+ # ── 4. Billing gate: Stripe quantity sync (MUST succeed) ────────
+ if isBillable:
+ try:
+ rootInterface._syncSubscriptionQuantity(mandateId, raiseOnError=True)
+ except Exception as e:
+ logger.error("Stripe billing for feature activation failed — rolling back instance %s: %s", instanceId, e)
+ _rollbackInstance(db, instanceId, userId=userId)
+ raise HTTPException(
+ status_code=status.HTTP_402_PAYMENT_REQUIRED,
+ detail=f"Stripe-Abrechnung fehlgeschlagen: {e}. Feature wurde NICHT aktiviert.",
+ )
+ else:
+ try:
+ rootInterface._syncSubscriptionQuantity(mandateId)
+ except Exception as e:
+ logger.warning("Non-critical Stripe sync failed for free feature: %s", e)
- logger.info(f"User {userId} activated '{featureCode}' in mandate {mandateId} (instance={instanceId})")
+ # ── 5. Confirmed — notify ──────────────────────────────────────
+ _notifyFeatureActivation(mandateId, featureLabel, featureCode, sub=operative, plan=plan)
+ logger.info("User %s activated '%s' in mandate %s (instance=%s, billed=%s)", userId, featureCode, mandateId, instanceId, isBillable)
return {
"featureCode": featureCode,
@@ -412,11 +443,10 @@ def deactivateStoreFeature(
instanceDeleted = True
logger.info(f"Orphan Control: deleted instance {instanceId} (no remaining accesses)")
- # Sync subscription quantity
try:
- rootInterface._syncSubscriptionQuantity(mandateId)
+ rootInterface._syncSubscriptionQuantity(mandateId, raiseOnError=True)
except Exception as e:
- logger.warning(f"Failed to sync subscription quantity: {e}")
+ logger.error("Stripe quantity sync after deactivation failed for mandate %s: %s", mandateId, e)
logger.info(f"User {userId} deactivated instance {instanceId} in mandate {mandateId} (deleted={instanceDeleted})")
@@ -433,3 +463,52 @@ def deactivateStoreFeature(
except Exception as e:
logger.error(f"Error deactivating store feature: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
+
+
+# ============================================================================
+# Internal helpers
+# ============================================================================
+
+def _rollbackInstance(db, instanceId: str, userId: str = None) -> None:
+ """Delete a freshly provisioned FeatureInstance (and its access) on billing failure."""
+ try:
+ if userId:
+ accesses = db.getRecordset(FeatureAccess, recordFilter={"userId": userId, "featureInstanceId": instanceId})
+ for a in accesses:
+ db.recordDelete(FeatureAccess, a.get("id"))
+ db.recordDelete(FeatureInstance, instanceId)
+ logger.info("Rolled back feature instance %s (billing gate)", instanceId)
+ except Exception as e:
+ logger.error("Rollback of instance %s failed: %s", instanceId, e)
+
+
+def _notifyFeatureActivation(
+ mandateId: str,
+ featureLabel: str,
+ featureCode: str,
+ sub: dict = None,
+ plan = None,
+) -> None:
+ """Send email notification to mandate admins about a newly activated feature."""
+ try:
+ from modules.shared.notifyMandateAdmins import notifyMandateAdmins
+
+ priceLine = ""
+ if plan and plan.pricePerFeatureInstanceCHF:
+ priceLine = f"Kosten: CHF {plan.pricePerFeatureInstanceCHF:.2f} / {plan.billingPeriod.value} (anteilig via Stripe-Proration)."
+
+ bodyParagraphs = [
+ f"Die Feature-Instanz «{featureLabel}» ({featureCode}) wurde soeben für Ihren Mandanten aktiviert.",
+ ]
+ if priceLine:
+ bodyParagraphs.append(priceLine)
+ bodyParagraphs.append("Die Stripe-Abrechnung wird automatisch angepasst.")
+
+ notifyMandateAdmins(
+ mandateId=mandateId,
+ subject=f"Feature aktiviert: {featureLabel}",
+ headline="Neue Feature-Instanz aktiviert",
+ bodyParagraphs=bodyParagraphs,
+ )
+ except Exception as e:
+ logger.warning("_notifyFeatureActivation failed for mandate %s: %s", mandateId, e)
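Reviewer note (not part of the patch): the billing gate in activateStoreFeature reduces to this control flow: provision first, bill synchronously, roll back on any billing failure. The callables are hypothetical stand-ins for the route's steps:

def activationGate(provisionInstance, syncStripe, rollback, isBillable: bool) -> str:
    instanceId = provisionInstance()    # step 3: create instance + grant admin access
    if isBillable:
        try:
            syncStripe()                # step 4: raiseOnError=True, proration must confirm
        except Exception:
            rollback(instanceId)        # instance removed before the 402 reaches the client
            raise
    return instanceId                   # step 5: notify admins, respond to client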
diff --git a/modules/routes/routeSubscription.py b/modules/routes/routeSubscription.py
index 7aad386f..88d0b21c 100644
--- a/modules/routes/routeSubscription.py
+++ b/modules/routes/routeSubscription.py
@@ -183,7 +183,7 @@ def activatePlan(
@router.post("/cancel", response_model=Dict[str, Any])
-@limiter.limit("5/minute")
+@limiter.limit("30/minute")
def cancelSubscription(
request: Request,
data: CancelRequest,
@@ -209,7 +209,7 @@ def cancelSubscription(
@router.post("/reactivate", response_model=Dict[str, Any])
-@limiter.limit("5/minute")
+@limiter.limit("30/minute")
def reactivateSubscription(
request: Request,
data: ReactivateRequest,
@@ -235,7 +235,7 @@ def reactivateSubscription(
@router.post("/force-cancel", response_model=Dict[str, Any])
-@limiter.limit("5/minute")
+@limiter.limit("30/minute")
def forceCancel(
request: Request,
data: ForceCancelRequest,
@@ -451,46 +451,47 @@ def _getDataVolumeUsage(
"""Calculate current data volume usage for a mandate vs. plan limit."""
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelFiles import FileItem
- from modules.datamodels.datamodelSubscription import MandateSubscription, SubscriptionPlan
- from modules.datamodels.datamodelFeature import FeatureInstance
+ from modules.datamodels.datamodelFeatures import FeatureInstance
+ from modules.interfaces.interfaceDbKnowledge import aggregateMandateRagTotalBytes
+ from modules.interfaces.interfaceDbManagement import getInterface as getMgmtInterface
+ from modules.interfaces.interfaceDbSubscription import _getRootInterface as _getSubRootIf
rootIf = getRootInterface()
mandateId = targetMandateId
instances = rootIf.db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId})
- totalBytes = 0
- for inst in instances:
- instId = inst.get("id") if isinstance(inst, dict) else getattr(inst, "id", None)
- if not instId:
- continue
- files = rootIf.db.getRecordset(FileItem, recordFilter={"featureInstanceId": instId})
+ instIds = [str(inst.get("id") or "") for inst in instances if inst.get("id")]
+
+    mgmtDb = getMgmtInterface().db
+    totalFileBytes = 0
+    _countedFileIds = set()
+    for instId in instIds:
+        files = mgmtDb.getRecordset(FileItem, recordFilter={"featureInstanceId": instId})
        for f in files:
            size = f.get("fileSize") if isinstance(f, dict) else getattr(f, "fileSize", 0)
-            totalBytes += (size or 0)
+            fid = f.get("id") if isinstance(f, dict) else getattr(f, "id", None)
+            if fid:
+                _countedFileIds.add(str(fid))
+            totalFileBytes += (size or 0)
+    # Mandate-level files: skip ids already counted via a feature instance
+    mandateFiles = mgmtDb.getRecordset(FileItem, recordFilter={"mandateId": mandateId})
+    for f in mandateFiles:
+        fid = f.get("id") if isinstance(f, dict) else getattr(f, "id", None)
+        if fid and str(fid) in _countedFileIds:
+            continue
+        size = f.get("fileSize") if isinstance(f, dict) else getattr(f, "fileSize", 0)
+        totalFileBytes += (size or 0)
+    filesMB = round(totalFileBytes / (1024 * 1024), 2)
- filesMB = round(totalBytes / (1024 * 1024), 2)
-
- from modules.datamodels.datamodelKnowledge import FileContentIndex
- ragIndexes = rootIf.db.getRecordset(FileContentIndex, recordFilter={"mandateId": mandateId})
- ragBytes = sum(int(idx.get("totalSize") or 0) if isinstance(idx, dict) else int(getattr(idx, "totalSize", 0) or 0) for idx in ragIndexes)
+ ragBytes = aggregateMandateRagTotalBytes(mandateId)
ragMB = round(ragBytes / (1024 * 1024), 2)
maxMB = None
- subs = rootIf.db.getRecordset(MandateSubscription, recordFilter={"mandateId": mandateId})
- for sub in subs:
- planKey = sub.get("planKey") if isinstance(sub, dict) else getattr(sub, "planKey", "")
- if planKey:
- plans = rootIf.db.getRecordset(SubscriptionPlan, recordFilter={"planKey": planKey})
- for plan in plans:
- limit = plan.get("maxDataVolumeMB") if isinstance(plan, dict) else getattr(plan, "maxDataVolumeMB", None)
- if limit:
- maxMB = limit
- break
- if maxMB:
- break
+ subIf = _getSubRootIf()
+ operative = subIf.getOperativeForMandate(mandateId)
+ if operative:
+ plan = subIf.getPlan(operative.get("planKey") or "")
+ if plan and plan.maxDataVolumeMB is not None:
+ maxMB = int(plan.maxDataVolumeMB)
usedMB = ragMB
percentUsed = round((usedMB / maxMB) * 100, 1) if maxMB else None
+ logger.info(
+ "data-volume mandate=%s: files=%.2f MB, rag=%.2f MB, max=%s MB",
+ mandateId, filesMB, ragMB, maxMB,
+ )
return {
"mandateId": mandateId,
"usedMB": usedMB,
diff --git a/modules/routes/routeVoiceGoogle.py b/modules/routes/routeVoiceGoogle.py
index dc0c7a85..1c796361 100644
--- a/modules/routes/routeVoiceGoogle.py
+++ b/modules/routes/routeVoiceGoogle.py
@@ -463,7 +463,7 @@ async def save_voice_settings(
currentUser: User = Depends(getCurrentUser)
):
"""Save voice settings for the current user (writes to UserVoicePreferences)."""
- from modules.datamodels.datamodelUam import UserVoicePreferences
+ from modules.datamodels.datamodelUam import UserVoicePreferences, _normalizeTtsVoiceMap
from modules.security.rootAccess import getRootInterface
rootInterface = getRootInterface()
userId = str(currentUser.id)
@@ -473,6 +473,8 @@ async def save_voice_settings(
"translationSourceLanguage", "translationTargetLanguage",
}
updateData = {k: v for k, v in settings.items() if k in allowedFields}
+ if "ttsVoiceMap" in updateData:
+ updateData["ttsVoiceMap"] = _normalizeTtsVoiceMap(updateData["ttsVoiceMap"])
existing = rootInterface.db.getRecordset(
UserVoicePreferences, recordFilter={"userId": userId}
diff --git a/modules/routes/routeVoiceUser.py b/modules/routes/routeVoiceUser.py
index 9b628eeb..2f21662b 100644
--- a/modules/routes/routeVoiceUser.py
+++ b/modules/routes/routeVoiceUser.py
@@ -14,7 +14,7 @@ from typing import Any, Dict
from fastapi import APIRouter, Body, Depends, HTTPException, Query, Request, status
from modules.auth import getCurrentUser, limiter
-from modules.datamodels.datamodelUam import User, UserVoicePreferences
+from modules.datamodels.datamodelUam import User, UserVoicePreferences, _normalizeTtsVoiceMap
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
@@ -79,6 +79,8 @@ def updateVoicePreferences(
"translationTargetLanguage",
}
updateData = {k: v for k, v in preferences.items() if k in allowedFields}
+ if "ttsVoiceMap" in updateData:
+ updateData["ttsVoiceMap"] = _normalizeTtsVoiceMap(updateData["ttsVoiceMap"])
if existing:
existingRecord = existing[0]
diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
index 23a749ab..4529ede0 100644
--- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
+++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py
@@ -27,6 +27,28 @@ _MAX_TOOL_RESULT_CHARS = 50_000
_BINARY_SIGNATURES = (b"%PDF", b"\x89PNG", b"\xff\xd8\xff", b"GIF8", b"PK\x03\x04", b"Rar!", b"\x1f\x8b")
+def _resolveFileScope(fileId: str, context: dict) -> tuple:
+ """Resolve featureInstanceId and mandateId for a file from context or management DB.
+
+ Returns (featureInstanceId, mandateId) — never None, always strings.
+ """
+ fiId = context.get("featureInstanceId", "") or ""
+ mId = context.get("mandateId", "") or ""
+ if fiId and mId:
+ return fiId, mId
+ try:
+ from modules.datamodels.datamodelFiles import FileItem
+ from modules.interfaces.interfaceDbManagement import ComponentObjects
+ fm = ComponentObjects().db._loadRecord(FileItem, fileId)
+ if fm:
+ _get = (lambda k: fm.get(k, "")) if isinstance(fm, dict) else (lambda k: getattr(fm, k, ""))
+ fiId = fiId or str(_get("featureInstanceId") or "")
+ mId = mId or str(_get("mandateId") or "")
+ except Exception:
+ pass
+ return fiId, mId
+
+
def _looksLikeBinary(data: bytes, sampleSize: int = 1024) -> bool:
"""Detect binary content by checking for magic bytes and non-printable char ratio."""
if any(data[:8].startswith(sig) for sig in _BINARY_SIGNATURES):
@@ -602,16 +624,29 @@ def _registerCoreTools(registry: ToolRegistry, services):
if knowledgeService:
try:
userId = context.get("userId", "")
+ _fiId, _mId = _resolveFileScope(fileId, context)
await knowledgeService.indexFile(
fileId=fileId, fileName=fileName, mimeType=mimeType,
userId=userId, contentObjects=contentObjects,
+ featureInstanceId=_fiId,
+ mandateId=_mId,
)
except Exception:
pass
- textParts = [o["data"] for o in contentObjects if o["contentType"] != "image"]
- if textParts:
- joined = "\n\n".join(textParts)
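+                # Prefer chunks from the knowledge index over the raw extraction:
+                # indexing may have neutralized the text, so re-reading chunks
+                # avoids handing un-neutralized content to the agent.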
+ joined = ""
+ if knowledgeService:
+ _chunks = knowledgeService._knowledgeDb.getContentChunks(fileId)
+ _textChunks = [
+ c for c in (_chunks or [])
+ if c.get("contentType") != "image" and c.get("data")
+ ]
+ if _textChunks:
+ joined = "\n\n".join(c["data"] for c in _textChunks)
+ if not joined:
+ textParts = [o["data"] for o in contentObjects if o["contentType"] != "image"]
+ joined = "\n\n".join(textParts) if textParts else ""
+ if joined:
chunked = _applyOffsetLimit(joined, offset, limit)
if chunked is not None:
return ToolResult(toolCallId="", toolName="readFile", success=True, data=chunked)
@@ -642,6 +677,36 @@ def _registerCoreTools(registry: ToolRegistry, services):
try:
text = rawBytes.decode(encoding)
if text.strip():
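+                    # Fail-safe gate: text from a neutralize-flagged FileItem may
+                    # only be returned after successful neutralization; every
+                    # failure path below returns a blocked-content notice instead.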
+ _fileNeedNeutralize = False
+ try:
+ from modules.datamodels.datamodelFiles import FileItem as _FI
+ from modules.interfaces.interfaceDbManagement import ComponentObjects as _CO
+ _fRec = _CO().db._loadRecord(_FI, fileId)
+ if _fRec:
+ _fG = (lambda k, d=None: _fRec.get(k, d)) if isinstance(_fRec, dict) else (lambda k, d=None: getattr(_fRec, k, d))
+ _fileNeedNeutralize = bool(_fG("neutralize", False))
+ except Exception:
+ pass
+ if _fileNeedNeutralize:
+ try:
+ _nSvc = services.getService("neutralization") if hasattr(services, "getService") else None
+ if _nSvc and hasattr(_nSvc, 'processText'):
+ _nResult = _nSvc.processText(text)
+ if _nResult and _nResult.get("neutralized_text"):
+ text = _nResult["neutralized_text"]
+ logger.debug(f"readFile: neutralized text for file {fileId}")
+ else:
+ logger.warning(f"readFile: neutralization failed for file {fileId}, blocking text (fail-safe)")
+ return ToolResult(toolCallId="", toolName="readFile", success=True,
+ data="[File requires neutralization but neutralization failed. Content blocked for data protection.]")
+ else:
+ logger.warning(f"readFile: neutralization required but service unavailable for file {fileId}")
+ return ToolResult(toolCallId="", toolName="readFile", success=True,
+ data="[File requires neutralization but service unavailable. Content blocked for data protection.]")
+ except Exception as _nErr:
+ logger.error(f"readFile: neutralization error for file {fileId}: {_nErr}")
+ return ToolResult(toolCallId="", toolName="readFile", success=True,
+ data="[File requires neutralization but an error occurred. Content blocked for data protection.]")
chunked = _applyOffsetLimit(text, offset, limit)
if chunked is not None:
return ToolResult(toolCallId="", toolName="readFile", success=True, data=chunked)
@@ -1562,7 +1627,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
}
async def _resolveDataSource(dsId: str):
- """Resolve a DataSource record and return (connectionId, service, path) or raise."""
+ """Resolve a DataSource record and return (connectionId, service, path, neutralize) or raise."""
chatService = services.chat
ds = chatService.getDataSource(dsId) if hasattr(chatService, "getDataSource") else None
if not ds:
@@ -1571,11 +1636,12 @@ def _registerCoreTools(registry: ToolRegistry, services):
sourceType = ds.get("sourceType", "")
path = ds.get("path", "/")
label = ds.get("label", "")
+ neutralize = bool(ds.get("neutralize", False))
service = _SOURCE_TYPE_TO_SERVICE.get(sourceType, sourceType)
if not connectionId:
raise ValueError(f"DataSource '{dsId}' has no connectionId")
- logger.info(f"Resolved DataSource '{dsId}' ({label}): sourceType={sourceType}, service={service}, connectionId={connectionId}, path={path[:80]}")
- return connectionId, service, path
+ logger.info(f"Resolved DataSource '{dsId}' ({label}): sourceType={sourceType}, service={service}, connectionId={connectionId}, path={path[:80]}, neutralize={neutralize}")
+ return connectionId, service, path, neutralize
_MAIL_SERVICES = {"outlook", "gmail"}
@@ -1589,7 +1655,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
error="Provide either dataSourceId OR connectionId+service")
try:
if dsId:
- connectionId, service, basePath = await _resolveDataSource(dsId)
+ connectionId, service, basePath, _neutralize = await _resolveDataSource(dsId)
else:
connectionId, service, basePath = directConnId, directService, args.get("path", "/")
if subPath:
@@ -1632,7 +1698,7 @@ def _registerCoreTools(registry: ToolRegistry, services):
error="Provide either dataSourceId OR connectionId+service")
try:
if dsId:
- connectionId, service, basePath = await _resolveDataSource(dsId)
+ connectionId, service, basePath, _neutralize = await _resolveDataSource(dsId)
else:
connectionId, service, basePath = directConnId, directService, args.get("path", "/")
from modules.connectors.connectorResolver import ConnectorResolver
@@ -1666,8 +1732,9 @@ def _registerCoreTools(registry: ToolRegistry, services):
try:
from modules.connectors.connectorResolver import ConnectorResolver
from modules.connectors.connectorProviderBase import DownloadResult as _DR
+ _sourceNeutralize = False
if dsId:
- connectionId, service, basePath = await _resolveDataSource(dsId)
+ connectionId, service, basePath, _sourceNeutralize = await _resolveDataSource(dsId)
else:
connectionId, service, basePath = directConnId, directService, "/"
fullPath = filePath if filePath.startswith("/") else f"{basePath.rstrip('/')}/{filePath}"
@@ -1710,6 +1777,8 @@ def _registerCoreTools(registry: ToolRegistry, services):
fiId = context.get("featureInstanceId") or (services.featureInstanceId if services else "")
if fiId:
chatService.interfaceDbComponent.updateFile(fileItem.id, {"featureInstanceId": fiId})
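+            # Propagate the source-level neutralize flag onto the downloaded
+            # FileItem so later reads and indexing apply the same protection.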
+ if _sourceNeutralize:
+ chatService.interfaceDbComponent.updateFile(fileItem.id, {"neutralize": True})
tempFolderId = _getOrCreateTempFolder(chatService)
if tempFolderId:
chatService.interfaceDbComponent.updateFile(fileItem.id, {"folderId": tempFolderId})
@@ -2040,9 +2109,12 @@ def _registerCoreTools(registry: ToolRegistry, services):
})
if contentObjects:
+ _diFiId, _diMId = _resolveFileScope(fileId, context)
await knowledgeService.indexFile(
fileId=fileId, fileName=fileName, mimeType=fileMime,
userId=context.get("userId", ""), contentObjects=contentObjects,
+ featureInstanceId=_diFiId,
+ mandateId=_diMId,
)
chunks = knowledgeService._knowledgeDb.getContentChunks(fileId)
@@ -2088,9 +2160,22 @@ def _registerCoreTools(registry: ToolRegistry, services):
dataUrl = f"data:{mimeType};base64,{imageData}"
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum as OTE
+ _opType = OTE.IMAGE_ANALYSE
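+        # Files flagged neutralize=True must stay on internal vision models;
+        # switching the operation type to NEUTRALIZATION_IMAGE restricts
+        # provider selection to connectors that advertise that capability.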
+ try:
+ from modules.datamodels.datamodelFiles import FileItem as _FileItemModel
+ from modules.interfaces.interfaceDbManagement import ComponentObjects as _CO
+ _fRow = _CO().db._loadRecord(_FileItemModel, fileId)
+ if _fRow:
+ _fGet = (lambda k, d=None: _fRow.get(k, d)) if isinstance(_fRow, dict) else (lambda k, d=None: getattr(_fRow, k, d))
+ if bool(_fGet("neutralize", False)):
+ _opType = OTE.NEUTRALIZATION_IMAGE
+ logger.info(f"describeImage: file {fileId} has neutralize=True, using NEUTRALIZATION_IMAGE (internal models only)")
+ except Exception:
+ pass
+
visionRequest = AiCallRequest(
prompt=prompt,
- options=AiCallOptions(operationType=OTE.IMAGE_ANALYSE),
+ options=AiCallOptions(operationType=_opType),
messages=[{"role": "user", "content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": dataUrl}},
@@ -3099,6 +3184,11 @@ def _registerCoreTools(registry: ToolRegistry, services):
recordFilter={"featureInstanceId": featureInstanceId, "workspaceInstanceId": workspaceInstanceId},
)
+ _anySourceNeutralize = any(
+ bool(ds.get("neutralize", False) if isinstance(ds, dict) else getattr(ds, "neutralize", False))
+ for ds in (featureDataSources or [])
+ )
+
from modules.security.rbacCatalog import getCatalogService
catalog = getCatalogService()
if not featureDataSources:
@@ -3133,6 +3223,8 @@ def _registerCoreTools(registry: ToolRegistry, services):
)
async def _subAgentAiCall(req):
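+        # Any neutralize-flagged DataSource taints the whole sub-agent run:
+        # force per-request neutralization on every AI call it issues.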
+ if _anySourceNeutralize:
+ req.requireNeutralization = True
return await aiService.callAi(req)
try:
diff --git a/modules/serviceCenter/services/serviceAi/mainServiceAi.py b/modules/serviceCenter/services/serviceAi/mainServiceAi.py
index e2de43e6..9ff6437d 100644
--- a/modules/serviceCenter/services/serviceAi/mainServiceAi.py
+++ b/modules/serviceCenter/services/serviceAi/mainServiceAi.py
@@ -200,10 +200,6 @@ class AiService:
finally:
self.aiObjects.billingCallback = None
- # Rehydrate neutralization placeholders in response
- if _wasNeutralized and response and hasattr(response, 'content') and response.content:
- response.content = self._rehydrateResponse(response.content)
-
# Attach neutralization exclusion metadata if any parts failed
if _excludedDocs and response:
if not hasattr(response, 'metadata') or response.metadata is None:
@@ -240,10 +236,7 @@ class AiService:
self.aiObjects.billingCallback = self._createBillingCallback()
try:
async for chunk in self.aiObjects.callWithTextContextStream(request):
- # Rehydrate the final AiCallResponse (non-str chunks are the final response)
if not isinstance(chunk, str):
- if _wasNeutralized and hasattr(chunk, 'content') and chunk.content:
- chunk.content = self._rehydrateResponse(chunk.content)
if _excludedDocs:
if not hasattr(chunk, 'metadata') or chunk.metadata is None:
chunk.metadata = {}
@@ -566,34 +559,70 @@ detectedIntent-Werte:
def _shouldNeutralize(self, request: AiCallRequest) -> bool:
"""Check if this AI request should have neutralization applied.
- Per-request override: requireNeutralization=True forces it, False skips it.
- Only applies to text prompts -- not embeddings or image processing."""
+
+ OR-logic across three sources (any True → neutralize):
+ 1. Feature-Instance config (NeutralizationConfig.enabled)
+ 2. Workflow/Session (context.requireNeutralization)
+ 3. Per-request (request.requireNeutralization)
+
+ No source can override another's True with False.
+ """
try:
- if request.requireNeutralization is False:
- return False
- if not request.prompt and not request.messages:
+ if not request.prompt and not request.messages and not request.context:
return False
+
+ _sources = []
+
+ # Source 1: Feature-Instance config
+ _neutralSvc = self._get_service("neutralization")
+ if _neutralSvc and hasattr(_neutralSvc, 'getConfig'):
+ _config = _neutralSvc.getConfig()
+ if _config and getattr(_config, 'enabled', False):
+ _sources.append("featureInstance")
+
+ # Source 2: Workflow / Session context
+ _ctx = getattr(self.services, '_context', None)
+ _ctxFlag = getattr(_ctx, "requireNeutralization", None) if _ctx else None
+ if _ctxFlag is True:
+ _sources.append("context")
+
+ # Source 3: Per-request flag
if request.requireNeutralization is True:
+ _sources.append("request")
+
+ if _sources:
+ logger.debug(f"Neutralization required by: {', '.join(_sources)}")
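+            # Promote to the explicit per-request flag so _neutralizeRequest
+            # runs in hard (blocking) mode regardless of which source fired.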
+ request.requireNeutralization = True
return True
- neutralSvc = self._get_service("neutralization")
- if not neutralSvc:
- return False
- config = neutralSvc.getConfig() if hasattr(neutralSvc, 'getConfig') else None
- if not config or not getattr(config, 'enabled', False):
- return False
- return True
- except Exception:
+
+ return False
+ except Exception as e:
+ logger.error(f"_shouldNeutralize check failed: {e} — defaulting to False")
return False
def _neutralizeRequest(self, request: AiCallRequest) -> Tuple[AiCallRequest, bool, List[str]]:
"""Neutralize the prompt text and messages in an AiCallRequest.
+
Returns (modifiedRequest, wasNeutralized, excludedDocs).
- Fail-safe: failing parts are excluded instead of aborting the entire call."""
+
+ FAILSAFE behaviour when ``requireNeutralization is True`` (explicit):
+ - Service unavailable → raises (caller must not send raw data to AI).
+ - Prompt neutralization fails → raises.
+ - Individual message neutralization fails → message is **removed**
+ (not kept in original form) and noted in excludedDocs.
+
+    Softer behaviour applies only when ``_neutralizeRequest`` is invoked
+    with ``requireNeutralization`` still None, i.e. without passing through
+    ``_shouldNeutralize`` (which promotes any positive source to the
+    explicit flag): failures are logged, originals are kept, and a warning
+    is emitted.
+ """
+ _hardMode = request.requireNeutralization is True
excludedDocs: List[str] = []
neutralSvc = self._get_service("neutralization")
if not neutralSvc or not hasattr(neutralSvc, 'processText'):
- logger.warning("Neutralization required but neutralization service is unavailable — continuing without neutralization")
+ if _hardMode:
+ raise RuntimeError("Neutralization explicitly required but service unavailable — AI call BLOCKED")
+ logger.warning("Neutralization required by config but service unavailable — continuing without neutralization")
excludedDocs.append("Neutralization service unavailable; prompt sent un-neutralized")
return request, False, excludedDocs
@@ -607,28 +636,148 @@ detectedIntent-Werte:
_wasNeutralized = True
logger.debug("Neutralized prompt in AiCallRequest")
else:
+ if _hardMode:
+                    raise RuntimeError("Prompt neutralization returned empty — AI call BLOCKED (hard mode)")
logger.warning("Neutralization of prompt returned no neutralized_text — sending original prompt")
excludedDocs.append("Prompt neutralization failed; original prompt used")
+ except RuntimeError:
+ raise
except Exception as e:
+ if _hardMode:
+ raise RuntimeError(f"Prompt neutralization failed — AI call BLOCKED: {e}") from e
logger.warning(f"Neutralization of prompt failed: {e} — sending original prompt")
excludedDocs.append(f"Prompt neutralization error: {e}")
+ if request.context:
+ try:
+ result = neutralSvc.processText(request.context)
+ if result and result.get("neutralized_text"):
+ request.context = result["neutralized_text"]
+ _wasNeutralized = True
+ logger.debug("Neutralized context in AiCallRequest")
+ else:
+ if _hardMode:
+ raise RuntimeError("Context neutralization returned empty — AI call BLOCKED (hard mode)")
+ logger.warning("Neutralization of context returned no neutralized_text — sending original context")
+ excludedDocs.append("Context neutralization failed; original context used")
+ except RuntimeError:
+ raise
+ except Exception as e:
+ if _hardMode:
+ raise RuntimeError(f"Context neutralization failed — AI call BLOCKED: {e}") from e
+ logger.warning(f"Neutralization of context failed: {e} — sending original context")
+ excludedDocs.append(f"Context neutralization error: {e}")
+
if request.messages and isinstance(request.messages, list):
+ cleanMessages = []
for idx, msg in enumerate(request.messages):
content = msg.get("content") if isinstance(msg, dict) else None
- if not isinstance(content, str) or not content:
+ if content is None:
+ cleanMessages.append(msg)
continue
- try:
- result = neutralSvc.processText(content)
- if result and result.get("neutralized_text"):
- msg["content"] = result["neutralized_text"]
- _wasNeutralized = True
+ if isinstance(content, str):
+ if not content:
+ cleanMessages.append(msg)
+ continue
+ try:
+ result = neutralSvc.processText(content)
+ if result and result.get("neutralized_text"):
+ msg["content"] = result["neutralized_text"]
+ _wasNeutralized = True
+ cleanMessages.append(msg)
+ else:
+ if _hardMode:
+ logger.warning(f"Message[{idx}] neutralization empty — REMOVING message (hard mode)")
+ excludedDocs.append(f"Message[{idx}] neutralization failed; message REMOVED")
+ else:
+ logger.warning(f"Neutralization of message[{idx}] returned no neutralized_text — keeping original")
+ excludedDocs.append(f"Message[{idx}] neutralization failed; original kept")
+ cleanMessages.append(msg)
+ except Exception as e:
+ if _hardMode:
+ logger.warning(f"Message[{idx}] neutralization error — REMOVING message (hard mode): {e}")
+ excludedDocs.append(f"Message[{idx}] neutralization error; message REMOVED: {e}")
+ else:
+ logger.warning(f"Neutralization of message[{idx}] failed: {e} — keeping original")
+ excludedDocs.append(f"Message[{idx}] neutralization error: {e}")
+ cleanMessages.append(msg)
+ elif isinstance(content, list):
+ _cleanParts = []
+ for _partIdx, _part in enumerate(content):
+ if not isinstance(_part, dict):
+ _cleanParts.append(_part)
+ continue
+ _partType = _part.get("type", "")
+ if _partType == "text" and _part.get("text"):
+ try:
+ _result = neutralSvc.processText(_part["text"])
+ if _result and _result.get("neutralized_text"):
+ _part["text"] = _result["neutralized_text"]
+ _wasNeutralized = True
+ _cleanParts.append(_part)
+ else:
+ if _hardMode:
+ logger.warning(f"Message[{idx}].content[{_partIdx}] text neutralization empty — REMOVING part")
+ excludedDocs.append(f"Message[{idx}].content[{_partIdx}] text removed")
+ else:
+ _cleanParts.append(_part)
+ except Exception as e:
+ if _hardMode:
+ logger.warning(f"Message[{idx}].content[{_partIdx}] text neutralization error — REMOVING: {e}")
+ excludedDocs.append(f"Message[{idx}].content[{_partIdx}] text error: {e}")
+ else:
+ _cleanParts.append(_part)
+ elif _partType == "image_url":
+ if _hardMode:
+ logger.warning(f"Message[{idx}].content[{_partIdx}] image_url — REMOVING (neutralization active)")
+ excludedDocs.append(f"Message[{idx}].content[{_partIdx}] image removed (neutralization)")
+ else:
+ _cleanParts.append(_part)
+ else:
+ _cleanParts.append(_part)
+ if _cleanParts:
+ msg["content"] = _cleanParts
+ cleanMessages.append(msg)
+ elif _hardMode:
+ logger.warning(f"Message[{idx}] all parts removed — REMOVING message")
+ excludedDocs.append(f"Message[{idx}] fully removed after neutralization")
+                    else:
+                        cleanMessages.append(msg)
+                else:
+                    # Unexpected content type (neither str nor list): keep the message as-is
+                    cleanMessages.append(msg)
+ request.messages = cleanMessages
+
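+        # Structured contentParts: text/table data is neutralized in place;
+        # images are dropped in hard mode because they cannot be text-scrubbed here.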
+ if hasattr(request, 'contentParts') and request.contentParts:
+ _cleanParts = []
+ for _cpIdx, _cp in enumerate(request.contentParts):
+ _tg = getattr(_cp, 'typeGroup', '') or ''
+ _data = getattr(_cp, 'data', '') or ''
+ if _tg in ('text', 'table') and _data:
+ try:
+ _result = neutralSvc.processText(str(_data))
+ if _result and _result.get("neutralized_text"):
+ _cp.data = _result["neutralized_text"]
+ _wasNeutralized = True
+ _cleanParts.append(_cp)
+ else:
+ if _hardMode:
+ logger.warning(f"ContentPart[{_cpIdx}] neutralization empty — REMOVING")
+ excludedDocs.append(f"ContentPart[{_cpIdx}] removed")
+ else:
+ _cleanParts.append(_cp)
+ except Exception as e:
+ if _hardMode:
+ logger.warning(f"ContentPart[{_cpIdx}] neutralization error — REMOVING: {e}")
+ excludedDocs.append(f"ContentPart[{_cpIdx}] error: {e}")
+ else:
+ _cleanParts.append(_cp)
+ elif _tg == 'image':
+ if _hardMode:
+ logger.warning(f"ContentPart[{_cpIdx}] image — REMOVING (neutralization active)")
+ excludedDocs.append(f"ContentPart[{_cpIdx}] image removed")
else:
- logger.warning(f"Neutralization of message[{idx}] returned no neutralized_text — keeping original")
- excludedDocs.append(f"Message[{idx}] neutralization failed; original kept")
- except Exception as e:
- logger.warning(f"Neutralization of message[{idx}] failed: {e} — keeping original")
- excludedDocs.append(f"Message[{idx}] neutralization error: {e}")
+ _cleanParts.append(_cp)
+ else:
+ _cleanParts.append(_cp)
+ request.contentParts = _cleanParts
return request, _wasNeutralized, excludedDocs
diff --git a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
index 0f20bc7f..49774a38 100644
--- a/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
+++ b/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
@@ -83,12 +83,47 @@ class KnowledgeService:
"""
contentObjects = contentObjects or []
- # 1. Create FileContentIndex
+ # 1. Resolve scope fields from FileItem (Single Source of Truth)
+ # FileItem lives in poweron_management; its scope/mandateId/featureInstanceId
+ # are authoritative and must be mirrored onto the FileContentIndex.
+ resolvedScope = "personal"
+ resolvedMandateId = mandateId
+ resolvedFeatureInstanceId = featureInstanceId
+ resolvedUserId = userId
+ _shouldNeutralize = False
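+        # Look up the authoritative FileItem, preferring the feature-instance
+        # DB interface and falling back to the management DB so files indexed
+        # outside a feature context still resolve their scope.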
+ try:
+ from modules.datamodels.datamodelFiles import FileItem as _FileItem
+ _dbComponent = getattr(self._context, "interfaceDbComponent", None)
+ _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else []
+ if not _fileRecords:
+ from modules.interfaces.interfaceDbManagement import ComponentObjects
+ _row = ComponentObjects().db._loadRecord(_FileItem, fileId)
+ if _row:
+ _fileRecords = [_row]
+ if _fileRecords:
+ _fileRecord = _fileRecords[0]
+ _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d))
+ _shouldNeutralize = bool(_get("neutralize", False))
+ _fileScope = _get("scope")
+ if _fileScope:
+ resolvedScope = _fileScope
+ if not resolvedMandateId:
+ resolvedMandateId = str(_get("mandateId", "") or "")
+ if not resolvedFeatureInstanceId:
+ resolvedFeatureInstanceId = str(_get("featureInstanceId", "") or "")
+ _fileCreatedBy = _get("sysCreatedBy")
+ if _fileCreatedBy:
+ resolvedUserId = str(_fileCreatedBy)
+ except Exception:
+ pass
+
+ # 2. Create FileContentIndex with correct scope from the start
index = FileContentIndex(
id=fileId,
- userId=userId,
- featureInstanceId=featureInstanceId,
- mandateId=mandateId,
+ userId=resolvedUserId,
+ featureInstanceId=resolvedFeatureInstanceId,
+ mandateId=resolvedMandateId,
+ scope=resolvedScope,
fileName=fileName,
mimeType=mimeType,
containerPath=containerPath,
@@ -108,28 +143,9 @@ class KnowledgeService:
)
self._knowledgeDb.upsertFileContentIndex(index)
- # 2. Chunk text content objects and create embeddings
+ # 3. Chunk text content objects and create embeddings
textObjects = [o for o in contentObjects if o.get("contentType") == "text"]
- # Read FileItem attributes for index metadata and neutralization
- _shouldNeutralize = False
- try:
- from modules.datamodels.datamodelFiles import FileItem as _FileItem
- _dbComponent = getattr(self._context, 'interfaceDbComponent', None)
- _fileRecords = _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId}) if _dbComponent else []
- if _fileRecords:
- _fileRecord = _fileRecords[0]
- _get = (lambda k, d=None: _fileRecord.get(k, d)) if isinstance(_fileRecord, dict) else (lambda k, d=None: getattr(_fileRecord, k, d))
- _shouldNeutralize = bool(_get("neutralize", False))
- _fileScope = _get("scope")
- if _fileScope:
- index.scope = _fileScope
- _fileCreatedBy = _get("sysCreatedBy")
- if _fileCreatedBy:
- index.userId = str(_fileCreatedBy)
- except Exception:
- pass
-
if _shouldNeutralize and textObjects:
_neutralizedObjects = []
try:
@@ -142,9 +158,7 @@ class KnowledgeService:
if not _textContent:
continue
try:
- _neutralResult = _neutralSvc.processText(
- _textContent, userId=userId, featureInstanceId=featureInstanceId
- )
+ _neutralResult = _neutralSvc.processText(_textContent)
if _neutralResult and _neutralResult.get("neutralized_text"):
_obj["data"] = _neutralResult["neutralized_text"]
_neutralizedObjects.append(_obj)
@@ -176,8 +190,8 @@ class KnowledgeService:
contentChunk = ContentChunk(
contentObjectId=chunk["contentObjectId"],
fileId=fileId,
- userId=userId,
- featureInstanceId=featureInstanceId,
+ userId=resolvedUserId,
+ featureInstanceId=resolvedFeatureInstanceId,
contentType="text",
data=chunk["data"],
contextRef=chunk["contextRef"],
@@ -185,14 +199,36 @@ class KnowledgeService:
)
self._knowledgeDb.upsertContentChunk(contentChunk)
- # 3. Store non-text content objects (images, etc.) without embedding
+ # 4. Store non-text content objects (images, etc.) without embedding
nonTextObjects = [o for o in contentObjects if o.get("contentType") != "text"]
+ if _shouldNeutralize and nonTextObjects:
+ import base64 as _b64
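+            # Fail-safe: image chunks are stored only if the image-neutralization
+            # check passes; blocked or failing images are dropped, never indexed.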
+ _filteredNonText = []
+ for _obj in nonTextObjects:
+ if _obj.get("contentType") != "image":
+ _filteredNonText.append(_obj)
+ continue
+ _imgData = (_obj.get("data", "") or "").strip()
+ if not _imgData:
+ _filteredNonText.append(_obj)
+ continue
+ try:
+ _imgBytes = _b64.b64decode(_imgData)
+ _imgResult = await _neutralSvc.processImageAsync(_imgBytes, fileName)
+ if _imgResult.get("status") == "ok":
+ _filteredNonText.append(_obj)
+ logger.debug(f"Image chunk OK for file {fileId}, storing")
+ else:
+ logger.warning(f"Image chunk blocked for file {fileId} (PII detected), skipping (fail-safe)")
+ except Exception as _imgErr:
+ logger.warning(f"Image neutralization check failed for file {fileId}: {_imgErr}, skipping (fail-safe)")
+ nonTextObjects = _filteredNonText
for obj in nonTextObjects:
contentChunk = ContentChunk(
contentObjectId=obj.get("contentObjectId", ""),
fileId=fileId,
- userId=userId,
- featureInstanceId=featureInstanceId,
+ userId=resolvedUserId,
+ featureInstanceId=resolvedFeatureInstanceId,
contentType=obj.get("contentType", "other"),
data=obj.get("data", ""),
contextRef=obj.get("contextRef", {}),
@@ -200,21 +236,23 @@ class KnowledgeService:
)
self._knowledgeDb.upsertContentChunk(contentChunk)
- self._knowledgeDb.updateFileStatus(fileId, "indexed")
+ # 5. Final upsert ALWAYS — persists scope, neutralization status, etc.
index.status = "indexed"
if _shouldNeutralize:
- try:
- index.neutralizationStatus = "completed"
- index.isNeutralized = True
- self._knowledgeDb.upsertFileContentIndex(index)
- except Exception as e:
- logger.debug(f"Could not set neutralizationStatus for file {fileId}: {e}")
- logger.info(f"Indexed file {fileId} ({fileName}): {len(contentObjects)} objects, {len(textObjects)} text chunks")
- if mandateId:
+ index.neutralizationStatus = "completed"
+ index.isNeutralized = True
+ self._knowledgeDb.upsertFileContentIndex(index)
+
+ logger.info(
+ "Indexed file %s (%s): %d objects, %d text chunks, scope=%s, mandate=%s, instance=%s",
+ fileId, fileName, len(contentObjects), len(textObjects),
+ resolvedScope, resolvedMandateId, resolvedFeatureInstanceId,
+ )
+ if resolvedMandateId:
try:
from modules.interfaces.interfaceDbBilling import _getRootInterface
- _getRootInterface().reconcileMandateStorageBilling(str(mandateId))
+ _getRootInterface().reconcileMandateStorageBilling(str(resolvedMandateId))
except Exception as ex:
logger.warning("reconcileMandateStorageBilling after index failed: %s", ex)
return index
@@ -328,17 +366,18 @@ class KnowledgeService:
if entities:
builder.add(priority=3, label="Workflow Context", items=entities, isKeyValue=True, maxChars=2000)
- # Layer 3: Shared Layer (mandate-wide shared documents)
- sharedChunks = self._knowledgeDb.semanticSearch(
- queryVector=queryVector,
- mandateId=mandateId,
- isShared=True,
- limit=10,
- minScore=0.7,
- isSysAdmin=isSysAdmin,
- )
- if sharedChunks:
- builder.add(priority=4, label="Shared Knowledge", items=sharedChunks, maxChars=2000)
+ # Layer 3: Mandate-scoped documents (visible to all mandate users)
+ if mandateId:
+ mandateChunks = self._knowledgeDb.semanticSearch(
+ queryVector=queryVector,
+ scope="mandate",
+ mandateId=mandateId,
+ limit=10,
+ minScore=0.7,
+ isSysAdmin=isSysAdmin,
+ )
+ if mandateChunks:
+ builder.add(priority=4, label="Shared Knowledge", items=mandateChunks, maxChars=2000)
# Layer 4: Cross-workflow hint (other conversations in this workspace)
if workflowHintItems:
diff --git a/modules/serviceCenter/services/serviceKnowledge/subPreScan.py b/modules/serviceCenter/services/serviceKnowledge/subPreScan.py
index e025dd99..0688deb2 100644
--- a/modules/serviceCenter/services/serviceKnowledge/subPreScan.py
+++ b/modules/serviceCenter/services/serviceKnowledge/subPreScan.py
@@ -31,6 +31,7 @@ async def preScanDocument(
userId: str = "",
featureInstanceId: str = "",
mandateId: str = "",
+ scope: str = "personal",
) -> FileContentIndex:
"""Create a structural FileContentIndex without AI.
@@ -56,6 +57,7 @@ async def preScanDocument(
userId=userId,
featureInstanceId=featureInstanceId,
mandateId=mandateId,
+ scope=scope,
fileName=fileName,
mimeType=mimeType,
totalObjects=totalObjects,
diff --git a/modules/shared/notifyMandateAdmins.py b/modules/shared/notifyMandateAdmins.py
index 27445afb..6bef921d 100644
--- a/modules/shared/notifyMandateAdmins.py
+++ b/modules/shared/notifyMandateAdmins.py
@@ -7,9 +7,7 @@ All mandate-level notifications (subscription changes, billing warnings, etc.)
MUST go through notifyMandateAdmins() to ensure consistent recipient resolution
and delivery.
-Recipients are the union of:
-1. BillingSettings.notifyEmails for the mandate (configured contact addresses)
-2. All users with the mandate-level "admin" RBAC role
+Recipients: all users with the mandate-level "admin" RBAC role.
"""
from __future__ import annotations
@@ -96,10 +94,10 @@ def _resolveMandateAdminEmails(mandateId: str) -> List[str]:
def _resolveAllRecipients(mandateId: str) -> List[str]:
- """Union of BillingSettings.notifyEmails + all mandate admin user emails, deduplicated."""
+ """Mandate admin user emails only (RBAC-resolved), deduplicated."""
seen: Set[str] = set()
result: List[str] = []
- for email in _resolveMandateContactEmails(mandateId) + _resolveMandateAdminEmails(mandateId):
+ for email in _resolveMandateAdminEmails(mandateId):
if email and email not in seen:
seen.add(email)
result.append(email)
@@ -233,7 +231,7 @@ def notifyMandateAdmins(
rawHtmlBlock: Optional[str] = None,
) -> int:
"""
- Send a styled HTML notification to all mandate admins and configured contacts.
+ Send a styled HTML notification to all mandate admins.
Args:
mandateId: The mandate to notify admins for.
diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py
index 466165ad..9fb2e7f4 100644
--- a/modules/workflows/methods/methodContext/actions/extractContent.py
+++ b/modules/workflows/methods/methodContext/actions/extractContent.py
@@ -6,7 +6,7 @@ import time
from typing import Dict, Any
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList
-from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy, ContentExtracted, ContentPart
+from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
logger = logging.getLogger(__name__)
@@ -101,74 +101,6 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
# Pass operationId for hierarchical per-document progress logging
extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId)
- # Check if neutralization is enabled and should be applied automatically
- neutralizationEnabled = False
- try:
- config = self.services.neutralization.getConfig()
- neutralizationEnabled = config and config.enabled
- except Exception as e:
- logger.debug(f"Could not check neutralization config: {str(e)}")
-
- # Neutralize extracted data if enabled (for dynamic mode: after extraction, before AI processing)
- if neutralizationEnabled:
- self.services.chat.progressLogUpdate(operationId, 0.7, "Neutralizing extracted data")
- logger.info("Neutralization enabled - neutralizing extracted content data")
-
- # Neutralize each ContentExtracted result
- for extracted in extractedResults:
- if extracted.parts:
- neutralizedParts = []
- for part in extracted.parts:
- if not isinstance(part, ContentPart):
- # Try to parse as ContentPart if it's a dict
- if isinstance(part, dict):
- try:
- part = ContentPart(**part)
- except Exception as e:
- logger.warning(f"Could not parse ContentPart: {str(e)}")
- neutralizedParts.append(part)
- continue
- else:
- neutralizedParts.append(part)
- continue
-
- # Neutralize the data field if it contains text
- if part.data:
- try:
- # Call neutralization service
- neutralizationResult = self.services.neutralization.processText(part.data)
-
- if neutralizationResult and 'neutralized_text' in neutralizationResult:
- # Replace data with neutralized text
- neutralizedData = neutralizationResult['neutralized_text']
-
- # Create new ContentPart with neutralized data
- neutralizedPart = ContentPart(
- id=part.id,
- parentId=part.parentId,
- label=part.label,
- typeGroup=part.typeGroup,
- mimeType=part.mimeType,
- data=neutralizedData,
- metadata=part.metadata.copy() if part.metadata else {}
- )
- neutralizedParts.append(neutralizedPart)
- else:
- # Neutralization failed, use original part
- logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
- neutralizedParts.append(part)
- except Exception as e:
- logger.error(f"Error neutralizing part {part.id}: {str(e)}")
- # On error, use original part
- neutralizedParts.append(part)
- else:
- # No data to neutralize, keep original part
- neutralizedParts.append(part)
-
- # Update extracted result with neutralized parts
- extracted.parts = neutralizedParts
- logger.info(f"Neutralized {len(neutralizedParts)} content parts")
-
# Build ActionDocuments from ContentExtracted results
self.services.chat.progressLogUpdate(operationId, 0.8, "Building result documents")
actionDocuments = []
@@ -190,7 +122,6 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
"documentIndex": i,
"extractedId": extracted.id,
"partCount": len(extracted.parts) if extracted.parts else 0,
- "neutralized": neutralizationEnabled,
"originalFileName": originalDoc.fileName if originalDoc and hasattr(originalDoc, 'fileName') else None
}
actionDoc = ActionDocument(
diff --git a/modules/workflows/methods/methodContext/actions/neutralizeData.py b/modules/workflows/methods/methodContext/actions/neutralizeData.py
index a1fc6b91..bd032cac 100644
--- a/modules/workflows/methods/methodContext/actions/neutralizeData.py
+++ b/modules/workflows/methods/methodContext/actions/neutralizeData.py
@@ -16,14 +16,13 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"context_neutralize_{workflowId}_{int(time.time())}"
- # Check if neutralization is enabled
neutralizationEnabled = False
try:
config = self.services.neutralization.getConfig()
neutralizationEnabled = config and config.enabled
except Exception as e:
logger.debug(f"Could not check neutralization config: {str(e)}")
-
+
if not neutralizationEnabled:
logger.info("Neutralization is not enabled, returning documents unchanged")
# Return original documents if neutralization is disabled
@@ -144,8 +143,25 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
neutralizedParts.append(part)
continue
- # Neutralize the data field if it contains text
- if part.data:
+ # Neutralize the data field based on typeGroup
+ _typeGroup = getattr(part, 'typeGroup', '') or ''
+ if _typeGroup == 'image' and part.data:
+ import base64 as _b64
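+                        # Images cannot be text-neutralized; run the PII image
+                        # check and keep the part only on an explicit "ok" status.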
+ try:
+ self.services.chat.progressLogUpdate(
+ operationId,
+ 0.3 + (i / len(chatDocuments)) * 0.6,
+ f"Checking image part {len(neutralizedParts) + 1} of document {i+1}"
+ )
+ _imgBytes = _b64.b64decode(str(part.data))
+ _imgResult = await self.services.neutralization.processImageAsync(_imgBytes, f"part_{part.id}")
+ if _imgResult.get("status") == "ok":
+ neutralizedParts.append(part)
+ else:
+ logger.warning(f"Fail-Safe: Image part {part.id} blocked (PII detected), SKIPPING")
+ except Exception as _imgErr:
+ logger.error(f"Fail-Safe: Image check failed for part {part.id}: {_imgErr}, SKIPPING")
+ elif part.data:
try:
self.services.chat.progressLogUpdate(
operationId,
@@ -153,14 +169,11 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
f"Neutralizing part {len(neutralizedParts) + 1} of document {i+1}"
)
- # Call neutralization service
neutralizationResult = self.services.neutralization.processText(part.data)
if neutralizationResult and 'neutralized_text' in neutralizationResult:
- # Replace data with neutralized text
neutralizedData = neutralizationResult['neutralized_text']
- # Create new ContentPart with neutralized data
neutralizedPart = ContentPart(
id=part.id,
parentId=part.parentId,
@@ -172,15 +185,12 @@ async def neutralizeData(self, parameters: Dict[str, Any]) -> ActionResult:
)
neutralizedParts.append(neutralizedPart)
else:
- # Fail-Safe: neutralization incomplete, skip this part
logger.warning(f"Fail-Safe: Neutralization incomplete for part {part.id}, SKIPPING (not passing original)")
continue
except Exception as e:
                         logger.error(f"Fail-Safe: Error neutralizing part {part.id}, SKIPPING part (not passing original): {str(e)}")
- # Fail-Safe: do NOT pass original data to AI
continue
else:
- # No data to neutralize, keep original part
neutralizedParts.append(part)
# Create neutralized ContentExtracted object
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index de332c31..58b76908 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -352,12 +352,6 @@ class WorkflowManager:
for i, doc in enumerate(documents, 1):
docListText += f"\n{i}. {doc.fileName} ({doc.mimeType}, {doc.fileSize} bytes)"
- _userId = getattr(getattr(self.services, 'user', None), 'id', '') or ''
- _featureInstanceId = getattr(self.services, 'featureInstanceId', '') or ''
- _promptForAnalysis, _wasNeutralized, _mappingId = await self._neutralizePromptIfRequired(
- userPrompt, userId=_userId, featureInstanceId=_featureInstanceId
- )
-
analysisPrompt = f"""You are an input analyzer. From the user's message, perform ALL of the following in one pass:
1. detectedLanguage: Detect ISO 639-1 language code (e.g., de, en, fr, it)
@@ -407,7 +401,7 @@ Return ONLY JSON (no markdown) with this exact structure:
The following is the user's original input message. Analyze intent, normalize the request, and determine complexity:
################ USER INPUT START #################
-{_promptForAnalysis.replace('{', '{{').replace('}', '}}') if _promptForAnalysis else ''}
+{userPrompt.replace('{', '{{').replace('}', '}}') if userPrompt else ''}
################ USER INPUT FINISH #################
"""
@@ -425,12 +419,6 @@ The following is the user's original input message. Analyze intent, normalize th
jsonEnd = aiResponse.rfind('}') + 1 if aiResponse else 0
if jsonStart != -1 and jsonEnd > jsonStart:
result = json.loads(aiResponse[jsonStart:jsonEnd])
- if _wasNeutralized:
- for _field in ('normalizedRequest', 'intent', 'workflowName'):
- if _field in result and result[_field]:
- result[_field] = await self._rehydrateResponseIfNeeded(
- result[_field], True, userId=_userId, featureInstanceId=_featureInstanceId
- )
return result
else:
logger.warning("Could not parse combined analysis response, using defaults")
@@ -490,7 +478,6 @@ The following is the user's original input message. Analyze intent, normalize th
if userInput.prompt:
try:
originalPromptBytes = userInput.prompt.encode('utf-8')
- originalPromptBytes = await self._neutralizeContentIfEnabled(originalPromptBytes, "text/markdown")
fileItem = self.services.interfaceDbComponent.createFile(
name="user_prompt_original.md",
mimeType="text/markdown",
@@ -680,7 +667,6 @@ The following is the user's original input message. Analyze intent, normalize th
if userInput.prompt:
try:
originalPromptBytes = userInput.prompt.encode('utf-8')
- originalPromptBytes = await self._neutralizeContentIfEnabled(originalPromptBytes, "text/markdown")
fileItem = self.services.interfaceDbComponent.createFile(
name="user_prompt_original.md",
mimeType="text/markdown",
@@ -821,7 +807,6 @@ The following is the user's original input message. Analyze intent, normalize th
if userInput.prompt:
try:
originalPromptBytes = userInput.prompt.encode('utf-8')
- originalPromptBytes = await self._neutralizeContentIfEnabled(originalPromptBytes, "text/markdown")
fileItem = self.services.interfaceDbComponent.createFile(
name="user_prompt_original.md",
mimeType="text/markdown",
@@ -1365,82 +1350,3 @@ The following is the user's original input message. Analyze intent, normalize th
"""Set user language for the service center"""
self.services.user.language = language
- async def _neutralizePromptIfRequired(self, prompt: str, userId: str, featureInstanceId: str) -> tuple:
- """Neutralize prompt text if the workflow context requires it.
- Returns (processedPrompt, wasNeutralized, mappingId)."""
- try:
- _neutralSvc = getattr(self.services, 'neutralization', None)
- if not _neutralSvc:
- return prompt, False, None
- _config = _neutralSvc.getConfig() if hasattr(_neutralSvc, 'getConfig') else None
- if not _config or not getattr(_config, 'enabled', False):
- return prompt, False, None
- _result = _neutralSvc.processText(prompt, userId=userId, featureInstanceId=featureInstanceId)
- if _result and _result.get("neutralized_text"):
- return _result["neutralized_text"], True, _result.get("mappingId")
- return prompt, False, None
- except Exception as e:
- logger.warning(f"Prompt neutralization failed: {e}")
- return prompt, False, None
-
- async def _rehydrateResponseIfNeeded(self, response: str, wasNeutralized: bool, userId: str, featureInstanceId: str) -> str:
- """Replace placeholders in AI response with original values."""
- if not wasNeutralized or not response:
- return response
- try:
- _neutralSvc = getattr(self.services, 'neutralization', None)
- if not _neutralSvc:
- return response
- _rehydrated = _neutralSvc.resolveText(response, userId=userId, featureInstanceId=featureInstanceId)
- return _rehydrated if _rehydrated else response
- except Exception as e:
- logger.warning(f"Response re-hydration failed: {e}")
- return response
-
- async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes:
- """Neutralize content if neutralization is enabled in user settings"""
- try:
- # Automation hub may not have neutralization service; skip if unavailable
- neutralization = getattr(self.services, 'neutralization', None)
- if not neutralization:
- return contentBytes
- # Check if neutralization is enabled
- config = neutralization.getConfig()
- if not config or not config.enabled:
- return contentBytes
-
- # Decode content to text for neutralization
- try:
- textContent = contentBytes.decode('utf-8')
- except UnicodeDecodeError:
- # Try alternative encodings
- for enc in ['latin-1', 'cp1252', 'iso-8859-1']:
- try:
- textContent = contentBytes.decode(enc)
- break
- except UnicodeDecodeError:
- continue
- else:
- # If unable to decode, return original bytes (binary content)
- logger.debug(f"Unable to decode content for neutralization, skipping: {mimeType}")
- return contentBytes
-
- # Neutralize the text content
- # Note: The neutralization service should use names from config when processing
- result = neutralization.processText(textContent)
- if result and 'neutralized_text' in result:
- neutralizedText = result['neutralized_text']
- # Encode back to bytes using the same encoding
- try:
- return neutralizedText.encode('utf-8')
- except Exception as e:
- logger.warning(f"Error encoding neutralized text: {str(e)}")
- return contentBytes
- else:
- logger.warning("Neutralization did not return neutralized_text")
- return contentBytes
- except Exception as e:
- logger.error(f"Error during content neutralization: {str(e)}")
- # Return original content on error
- return contentBytes
-