Compare commits

..

No commits in common. "4f8473bd701ed35bc2d4e1fafad0958c502ebb15" and "e0caad0a75a6ace5e6dfe01ef8549f9f3db918c1" have entirely different histories.

67 changed files with 953 additions and 819 deletions

11
app.py
View file

@ -311,17 +311,6 @@ async def lifespan(app: FastAPI):
# AI connectors already pre-warmed at module-load via _eager_prewarm() in aicoreModelRegistry.
# Register system-component lifecycle hooks (Composition Root — inverts L4->L5b dependency)
from modules.shared.systemComponentRegistry import registerLifecycleHook
from modules.workflowAutomation.mainWorkflowAutomation import (
onBootstrap as _waOnBootstrap,
onMandateDelete as _waOnMandateDelete,
onInstanceCreate as _waOnInstanceCreate,
)
registerLifecycleHook("onBootstrap", _waOnBootstrap)
registerLifecycleHook("onMandateDelete", _waOnMandateDelete)
registerLifecycleHook("onInstanceCreate", _waOnInstanceCreate)
# Bootstrap database if needed (creates initial users, mandates, roles, etc.)
# This must happen before getting root interface
from modules.security.rootAccess import getRootDbAppConnector

View file

@ -247,7 +247,7 @@ from modules.datamodels.datamodelFeatures import AutoWorkflow
@i18nModel("Workflow (Ansicht)")
class AutoWorkflowView(AutoWorkflow):
class Automation2WorkflowView(AutoWorkflow):
"""AutoWorkflow extended with computed dashboard fields.
Used exclusively for /api/attributes/ so the frontend can resolve column

View file

@ -53,7 +53,7 @@ class AutoTemplateScope(str, Enum):
SYSTEM = "system"
WORKFLOW_AUTOMATION_DATABASE = "poweron_graphicaleditor"
GRAPHICAL_EDITOR_DATABASE = "poweron_graphicaleditor"
# ---------------------------------------------------------------------------

View file

@ -60,7 +60,7 @@ class InvestorDemo2026(BaseDemoConfig):
label = "Investor Demo April 2026"
description = (
"Two mandates (HappyLife AG + Alpina Treuhand AG), one SysAdmin user, "
"trustee with RMA, workspace, workflow automation, and neutralization."
"trustee with RMA, workspace, graph editor, and neutralization."
)
credentials = [
{
@ -554,21 +554,20 @@ class InvestorDemo2026(BaseDemoConfig):
try:
from modules.datamodels.datamodelWorkflowAutomation import (
AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask,
WORKFLOW_AUTOMATION_DATABASE,
)
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
waDb = DatabaseConnector(
geDb = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
dbDatabase="poweron_graphicaleditor",
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
userId=None,
)
workflows = waDb.getRecordset(AutoWorkflow, recordFilter={
workflows = geDb.getRecordset(AutoWorkflow, recordFilter={
"mandateId": mandateId,
"featureInstanceId": featureInstanceId,
}) or []
@ -578,20 +577,20 @@ class InvestorDemo2026(BaseDemoConfig):
if not wfId:
continue
for version in waDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []:
waDb.recordDelete(AutoVersion, version.get("id"))
for version in geDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []:
geDb.recordDelete(AutoVersion, version.get("id"))
runs = waDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or []
runs = geDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or []
for run in runs:
runId = run.get("id")
for stepLog in waDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
waDb.recordDelete(AutoStepLog, stepLog.get("id"))
waDb.recordDelete(AutoRun, runId)
for stepLog in geDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
geDb.recordDelete(AutoStepLog, stepLog.get("id"))
geDb.recordDelete(AutoRun, runId)
for task in waDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []:
waDb.recordDelete(AutoTask, task.get("id"))
for task in geDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []:
geDb.recordDelete(AutoTask, task.get("id"))
waDb.recordDelete(AutoWorkflow, wfId)
geDb.recordDelete(AutoWorkflow, wfId)
if workflows:
summary["removed"].append(f"{len(workflows)} AutoWorkflows in {mandateLabel}")

View file

@ -51,6 +51,9 @@ _FEATURES_PWG = [
{"code": "neutralization", "label": "Datenschutz"},
]
# Filename markers used to identify the imported pilot workflow on remove().
_PILOT_WORKFLOW_LABEL = "PWG Pilot: Jahresmietzinsbestätigung"
_PILOT_WORKFLOW_FILE = "pwg-mietzinsbestaetigung-pilot.workflow.json"
_SEED_TRUSTEE_FILE = "_seedTrusteeData.json"
@ -59,7 +62,8 @@ class PwgDemo2026(BaseDemoConfig):
label = "PWG Pilot Demo (Mietzinsbestätigungen)"
description = (
"Stiftung PWG, ein Demo-Sachbearbeiter, Trustee mit fiktiven Mietern, "
"Workflow-Automation (als File importiert, active=false). Idempotent."
"Graph-Editor mit dem Pilot-Workflow für Jahresmietzinsbestätigungen "
"(als File importiert, active=false). Idempotent."
)
credentials = [
{
@ -532,6 +536,92 @@ class PwgDemo2026(BaseDemoConfig):
if skippedTenants:
summary["skipped"].append(f"PWG seed: {skippedTenants} tenants already present")
def _ensurePilotWorkflow(self, mandateId: str, featureInstanceId: str, summary: Dict):
"""Import the pilot workflow JSON into the WorkflowAutomation DB.
Uses the schema-aware import pipeline introduced in Phase 1
(``_workflowFileSchema.envelopeToWorkflowData`` +
``WorkflowAutomationObjects.importWorkflowFromDict``). The workflow is
always created with ``active=False`` so a manual trigger is required
this matches the demo-bootstrap safety default.
"""
envelopePath = _demoDataDir() / "workflows" / _PILOT_WORKFLOW_FILE
if not envelopePath.is_file():
summary["errors"].append(f"Pilot workflow file missing: {envelopePath}")
return
try:
envelope = json.loads(envelopePath.read_text(encoding="utf-8"))
except Exception as exc:
summary["errors"].append(f"Pilot workflow file unreadable: {exc}")
return
try:
geDb = _openWorkflowAutomationDb()
except Exception as exc:
summary["errors"].append(f"WorkflowAutomation DB connection failed: {exc}")
return
from modules.nodeCatalog._workflowFileSchema import (
envelopeToWorkflowData,
validateFileEnvelope,
)
from modules.datamodels.datamodelWorkflowAutomation import AutoWorkflow
from modules.workflowAutomation.editor.nodeRegistry import STATIC_NODE_TYPES
existing = geDb.getRecordset(AutoWorkflow, recordFilter={
"mandateId": mandateId,
"featureInstanceId": featureInstanceId,
"label": _PILOT_WORKFLOW_LABEL,
}) or []
if existing:
summary["skipped"].append(f"Pilot workflow already imported ({existing[0].get('id')})")
return
knownTypes = [n.get("id") for n in STATIC_NODE_TYPES if isinstance(n, dict) and n.get("id")]
try:
normalized, warnings = validateFileEnvelope(envelope, knownNodeTypes=knownTypes)
except Exception as exc:
summary["errors"].append(f"Pilot workflow envelope invalid: {exc}")
return
if warnings:
summary["created"].append(f"Pilot workflow warnings: {warnings}")
data = envelopeToWorkflowData(
normalized,
mandateId=mandateId,
featureInstanceId=featureInstanceId,
)
# Inject the trustee feature-instance id into the parameters so the
# node runtime resolves it without manual editor cleanup.
trusteeInstanceId = self._guessTrusteeInstanceId(mandateId)
if trusteeInstanceId:
for node in data.get("graph", {}).get("nodes", []) or []:
params = node.get("parameters") or {}
if "featureInstanceId" in params and not params["featureInstanceId"]:
params["featureInstanceId"] = trusteeInstanceId
node["parameters"] = params
# Force-import: AutoWorkflow.create accepts our envelope-derived data
# (graph, label, invocations, …) verbatim; we add ids/timestamps that
# AutoWorkflow expects.
record = AutoWorkflow(
id=str(uuid.uuid4()),
mandateId=mandateId,
featureInstanceId=featureInstanceId,
label=data.get("label") or _PILOT_WORKFLOW_LABEL,
description=data.get("description") or "",
tags=data.get("tags") or [],
graph=data.get("graph") or {"nodes": [], "connections": []},
invocations=data.get("invocations") or [],
templateScope=data.get("templateScope") or "instance",
sharedReadOnly=bool(data.get("sharedReadOnly")),
notifyOnFailure=bool(data.get("notifyOnFailure", True)),
active=False,
)
created = geDb.recordCreate(AutoWorkflow, record)
summary["created"].append(f"Pilot workflow imported (active=false, id={created.get('id')})")
logger.info(f"Imported pilot workflow into workflowAutomation instance {featureInstanceId}")
def _guessTrusteeInstanceId(self, mandateId: str) -> Optional[str]:
"""Return the first trustee feature-instance id of the given mandate.
@ -638,23 +728,23 @@ class PwgDemo2026(BaseDemoConfig):
AutoVersion,
AutoWorkflow,
)
waDb = _openWorkflowAutomationDb()
workflows = waDb.getRecordset(AutoWorkflow, recordFilter={
geDb = _openWorkflowAutomationDb()
workflows = geDb.getRecordset(AutoWorkflow, recordFilter={
"mandateId": mandateId,
"featureInstanceId": featureInstanceId,
}) or []
for wf in workflows:
wfId = wf.get("id")
for version in waDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []:
waDb.recordDelete(AutoVersion, version.get("id"))
for run in waDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or []:
for version in geDb.getRecordset(AutoVersion, recordFilter={"workflowId": wfId}) or []:
geDb.recordDelete(AutoVersion, version.get("id"))
for run in geDb.getRecordset(AutoRun, recordFilter={"workflowId": wfId}) or []:
runId = run.get("id")
for step in waDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
waDb.recordDelete(AutoStepLog, step.get("id"))
waDb.recordDelete(AutoRun, runId)
for task in waDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []:
waDb.recordDelete(AutoTask, task.get("id"))
waDb.recordDelete(AutoWorkflow, wfId)
for step in geDb.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
geDb.recordDelete(AutoStepLog, step.get("id"))
geDb.recordDelete(AutoRun, runId)
for task in geDb.getRecordset(AutoTask, recordFilter={"workflowId": wfId}) or []:
geDb.recordDelete(AutoTask, task.get("id"))
geDb.recordDelete(AutoWorkflow, wfId)
if workflows:
summary["removed"].append(f"{len(workflows)} AutoWorkflows in {mandateLabel}")
except Exception as e:
@ -724,13 +814,12 @@ def _openTrusteeDb():
def _openWorkflowAutomationDb():
"""Open a privileged DB connection to the workflow-automation database."""
"""Open a privileged DB connection to ``poweron_graphicaleditor``."""
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelWorkflowAutomation import WORKFLOW_AUTOMATION_DATABASE
from modules.shared.configuration import APP_CONFIG
return DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
dbDatabase="poweron_graphicaleditor",
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),

View file

@ -9,8 +9,7 @@ from urllib.parse import urlparse, unquote
from modules.datamodels.datamodelUam import User
from .datamodelFeatureNeutralizer import DataNeutralizerAttributes, DataNeutraliserConfig, DataNeutralizationSnapshot
from .interfaceFeatureNeutralizer import getInterface as _getNeutralizerInterface
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.serviceHub import getInterface as getServices
logger = logging.getLogger(__name__)
@ -22,13 +21,10 @@ class NeutralizationPlayground:
self.currentUser = currentUser
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self._ctx = ServiceCenterContext(user=currentUser, mandate_id=mandateId, feature_instance_id=featureInstanceId)
def _getService(self, name: str):
return getService(name, self._ctx)
self.services = getServices(currentUser, None, mandateId=mandateId, featureInstanceId=featureInstanceId)
def processText(self, text: str) -> Dict[str, Any]:
return self._getService("neutralization").processText(text)
return self.services.neutralization.processText(text)
async def processUploadedFileAsync(self, file_bytes: bytes, filename: str) -> Dict[str, Any]:
"""Process an uploaded file (bytes + filename). Returns neutralized result for text or binary.
@ -47,35 +43,32 @@ class NeutralizationPlayground:
original_file_id = None
neutralized_file_id = None
neutralizationService = self._getService("neutralization")
try:
chatService = self._getService("chat")
except Exception:
chatService = None
if chatService:
# Save original file to user files
if self.services.interfaceDbComponent:
try:
file_item, _ = chatService.saveUploadedFile(file_bytes, filename)
file_item, _ = self.services.interfaceDbComponent.saveUploadedFile(file_bytes, filename)
original_file_id = str(file_item.id)
except Exception as e:
logger.warning(f"Could not save original file to user files: {e}")
if is_binary:
result = await neutralizationService.processBinaryBytesAsync(file_bytes, filename, mime)
result = await self.services.neutralization.processBinaryBytesAsync(file_bytes, filename, mime)
neu_bytes = result.get('neutralized_bytes')
logger.debug(f"Binary result: neu_bytes type={type(neu_bytes).__name__}, len={len(neu_bytes) if neu_bytes is not None else 0}")
if neu_bytes is not None and len(neu_bytes) > 0:
result['neutralized_file_base64'] = base64.b64encode(neu_bytes).decode('ascii')
result['neutralized_file_name'] = result.get('neutralized_file_name', f'neutralized_{filename}')
result['mime_type'] = result.get('mime_type', mime)
if chatService:
# Save neutralized binary to user files
if self.services.interfaceDbComponent:
try:
neu_name = result['neutralized_file_name']
file_item, _ = chatService.saveUploadedFile(neu_bytes, neu_name)
file_item, _ = self.services.interfaceDbComponent.saveUploadedFile(neu_bytes, neu_name)
neutralized_file_id = str(file_item.id)
except Exception as e:
logger.warning(f"Could not save neutralized file to user files: {e}")
# Remove raw bytes before JSON response (avoid serialization issues; use base64 only)
result.pop('neutralized_bytes', None)
result['original_file_id'] = original_file_id
result['neutralized_file_id'] = neutralized_file_id
@ -93,14 +86,15 @@ class NeutralizationPlayground:
'neutralized_file_id': None,
'processed_info': {'type': 'error', 'error': 'File could not be decoded as text. Supported: UTF-8, Latin-1. For PDF/Word/Excel, use supported binary formats.'}
}
result = await neutralizationService.processTextAsync(text_content)
result = await self.services.neutralization.processTextAsync(text_content)
result['neutralized_file_name'] = f'neutralized_{filename}'
if chatService and result.get('neutralized_text') is not None:
# Save neutralized text as file to user files
if self.services.interfaceDbComponent and result.get('neutralized_text') is not None:
try:
neu_text = result['neutralized_text']
neu_bytes = neu_text.encode('utf-8')
neu_name = result['neutralized_file_name']
file_item, _ = chatService.saveUploadedFile(neu_bytes, neu_name)
file_item, _ = self.services.interfaceDbComponent.saveUploadedFile(neu_bytes, neu_name)
neutralized_file_id = str(file_item.id)
except Exception as e:
logger.warning(f"Could not save neutralized text file to user files: {e}")
@ -117,7 +111,7 @@ class NeutralizationPlayground:
errors: List[str] = []
for fileId in fileIds:
try:
res = self._getService("neutralization").processFile(fileId)
res = self.services.neutralization.processFile(fileId)
results.append({
'file_id': fileId,
'neutralized_file_name': res.get('neutralized_file_name'),
@ -143,12 +137,12 @@ class NeutralizationPlayground:
# Cleanup attributes
def cleanAttributes(self, fileId: str) -> bool:
return self._getService("neutralization").deleteNeutralizationAttributes(fileId)
return self.services.neutralization.deleteNeutralizationAttributes(fileId)
# Stats
def getStats(self) -> Dict[str, Any]:
try:
allAttributes = self._getService("neutralization").getAttributes()
allAttributes = self.services.neutralization.getAttributes()
patternCounts: Dict[str, int] = {}
for attr in allAttributes:
# Handle both dict and object access patterns
@ -190,24 +184,24 @@ class NeutralizationPlayground:
# Additional methods needed by the route
def getConfig(self) -> Optional[DataNeutraliserConfig]:
"""Get neutralization configuration"""
return self._getService("neutralization").getConfig()
return self.services.neutralization.getConfig()
def saveConfig(self, configData: Dict[str, Any]) -> DataNeutraliserConfig:
"""Save neutralization configuration"""
return self._getService("neutralization").saveConfig(configData)
return self.services.neutralization.saveConfig(configData)
def neutralizeText(self, text: str, fileId: str = None) -> Dict[str, Any]:
"""Neutralize text content"""
return self._getService("neutralization").processText(text)
return self.services.neutralization.processText(text)
def resolveText(self, text: str) -> str:
"""Resolve UIDs in neutralized text back to original text"""
return self._getService("neutralization").resolveText(text)
return self.services.neutralization.resolveText(text)
def getSnapshots(self) -> List[DataNeutralizationSnapshot]:
"""Return stored neutralization text snapshots."""
try:
return self._getService("neutralization").getSnapshots()
return self.services.neutralization.getSnapshots()
except Exception as e:
logger.error(f"Error getting snapshots: {e}")
return []
@ -215,7 +209,7 @@ class NeutralizationPlayground:
def getAttributes(self, fileId: str = None) -> List[DataNeutralizerAttributes]:
"""Get neutralization attributes, optionally filtered by file ID"""
try:
allAttributes = self._getService("neutralization").getAttributes()
allAttributes = self.services.neutralization.getAttributes()
if fileId:
want = str(fileId).strip()
@ -233,7 +227,8 @@ class NeutralizationPlayground:
async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
"""Process files from SharePoint source path and store neutralized files in target path"""
processor = SharepointProcessor(self.currentUser, self._ctx)
from modules.serviceCenter.services.serviceSharepoint.mainServiceSharepoint import SharepointService
processor = SharepointProcessor(self.currentUser, self.services)
return await processor.processSharepointFiles(sourcePath, targetPath)
def batchNeutralizeFiles(self, filesData: List[Dict[str, Any]]) -> Dict[str, Any]:
@ -252,18 +247,15 @@ class NeutralizationPlayground:
# Internal SharePoint helper module separated to keep feature logic tidy
class SharepointProcessor:
def __init__(self, currentUser: User, ctx: ServiceCenterContext):
def __init__(self, currentUser: User, services):
self.currentUser = currentUser
self._ctx = ctx
self._sharepoint = getService("sharepoint", ctx)
self._neutralization = getService("neutralization", ctx)
from modules.interfaces.interfaceDbApp import getInterface as _getAppInterface
self._interfaceDbApp = _getAppInterface(currentUser, mandateId=ctx.mandate_id)
self.services = services
async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
try:
logger.info(f"Processing SharePoint files from {sourcePath} to {targetPath}")
# Get SharePoint connection
connection = await self._getSharepointConnection(sourcePath)
if not connection:
return {
@ -273,7 +265,8 @@ class SharepointProcessor:
'errors': ['No SharePoint connection found'],
}
if not self._sharepoint.setAccessTokenFromConnection(connection):
# Set access token for SharePoint service
if not self.services.sharepoint.setAccessTokenFromConnection(connection):
return {
'success': False,
'message': 'Failed to set SharePoint access token',
@ -293,7 +286,8 @@ class SharepointProcessor:
async def _getSharepointConnection(self, sharepointPath: str = None):
try:
connections = self._interfaceDbApp.getUserConnections(self._interfaceDbApp.userId)
# Use interface method to get user connections
connections = self.services.interfaceDbApp.getUserConnections(self.services.interfaceDbApp.userId)
def _is_msft_connection(c):
av = c.authority.value if hasattr(c.authority, 'value') else str(getattr(c, 'authority', ''))
return av and str(av).lower() == 'msft'
@ -328,7 +322,7 @@ class SharepointProcessor:
for connection in connections:
try:
if not self._sharepoint.setAccessTokenFromConnection(connection):
if not self.services.sharepoint.setAccessTokenFromConnection(connection):
continue
if await self._testSharepointAccess(sharepointPath):
logger.info(f"Found matching connection for domain {targetDomain}: {connection.get('id')}")
@ -346,7 +340,7 @@ class SharepointProcessor:
siteUrl, _ = self._parseSharepointPath(sharepointPath)
if not siteUrl:
return False
siteInfo = await self._sharepoint.findSiteByWebUrl(siteUrl)
siteInfo = await self.services.sharepoint.findSiteByWebUrl(siteUrl)
return siteInfo is not None
except Exception:
return False
@ -357,17 +351,17 @@ class SharepointProcessor:
targetSite, targetFolder = self._parseSharepointPath(targetPath)
if not sourceSite or not targetSite:
return {'success': False, 'message': 'Invalid SharePoint path format', 'processed_files': 0, 'errors': ['Invalid SharePoint path format']}
sourceSiteInfo = await self._sharepoint.findSiteByWebUrl(sourceSite)
sourceSiteInfo = await self.services.sharepoint.findSiteByWebUrl(sourceSite)
if not sourceSiteInfo:
return {'success': False, 'message': f'Source site not found: {sourceSite}', 'processed_files': 0, 'errors': [f'Source site not found: {sourceSite}']}
targetSiteInfo = await self._sharepoint.findSiteByWebUrl(targetSite)
targetSiteInfo = await self.services.sharepoint.findSiteByWebUrl(targetSite)
if not targetSiteInfo:
return {'success': False, 'message': f'Target site not found: {targetSite}', 'processed_files': 0, 'errors': [f'Target site not found: {targetSite}']}
logger.info(f"Listing files in folder: {sourceFolder} for site: {sourceSiteInfo['id']}")
files = await self._sharepoint.listFolderContents(sourceSiteInfo['id'], sourceFolder)
files = await self.services.sharepoint.listFolderContents(sourceSiteInfo['id'], sourceFolder)
if not files:
logger.warning(f"No files found in folder '{sourceFolder}', trying root folder")
files = await self._sharepoint.listFolderContents(sourceSiteInfo['id'], '')
files = await self.services.sharepoint.listFolderContents(sourceSiteInfo['id'], '')
if files:
folders = [f for f in files if f.get('type') == 'folder']
folderNames = [f.get('name') for f in folders]
@ -391,7 +385,7 @@ class SharepointProcessor:
async def _processSingle(fileInfo: Dict[str, Any]):
try:
fileContent = await self._sharepoint.downloadFile(sourceSiteInfo['id'], fileInfo['id'])
fileContent = await self.services.sharepoint.downloadFile(sourceSiteInfo['id'], fileInfo['id'])
if not fileContent:
return {'error': f"Failed to download file: {fileInfo['name']}"}
name_lower = (fileInfo.get('name') or '').lower()
@ -408,7 +402,7 @@ class SharepointProcessor:
mime = next((mime_map[ext] for ext in BINARY_EXTS if name_lower.endswith(ext)), 'text/plain')
if is_binary:
result = self._neutralization.processBinaryBytes(fileContent, fileInfo['name'], mime)
result = self.services.neutralization.processBinaryBytes(fileContent, fileInfo['name'], mime)
if result.get('neutralized_bytes'):
content_to_upload = result['neutralized_bytes']
else:
@ -418,11 +412,11 @@ class SharepointProcessor:
textContent = fileContent.decode('utf-8')
except UnicodeDecodeError:
textContent = fileContent.decode('latin-1')
result = await self._neutralization.processTextAsync(textContent)
result = await self.services.neutralization.processTextAsync(textContent)
content_to_upload = (result.get('neutralized_text') or '').encode('utf-8')
neutralizedFilename = f"neutralized_{fileInfo['name']}"
uploadResult = await self._sharepoint.uploadFile(targetSiteInfo['id'], targetFolder, neutralizedFilename, content_to_upload)
uploadResult = await self.services.sharepoint.uploadFile(targetSiteInfo['id'], targetFolder, neutralizedFilename, content_to_upload)
if 'error' in uploadResult:
return {'error': f"Failed to upload neutralized file: {neutralizedFilename} - {uploadResult['error']}"}
return {

View file

@ -51,6 +51,7 @@ class NeutralizationService:
"""
self.services = serviceCenter
self._getService = getServiceFn
self.interfaceDbComponent = getattr(serviceCenter, "interfaceDbComponent", None)
# Create feature-specific interface for neutralizer DB operations
self.interfaceNeutralizer: InterfaceFeatureNeutralizer = None
@ -304,20 +305,19 @@ class NeutralizationService:
raise
def processFile(self, fileId: str) -> Dict[str, Any]:
"""Neutralize a file referenced by its fileId using ChatService.
"""Neutralize a file referenced by its fileId using component interface.
Supports text files directly; PDF/DOCX/XLSX/PPTX via extract -> neutralize -> generate."""
chatService = self._getService("chat") if self._getService else None
if not chatService:
raise ValueError("Chat service is required to process a file by fileId")
if not self.interfaceDbComponent:
raise ValueError("Component interface is required to process a file by fileId")
fileInfo = None
try:
fileInfo = chatService.getFile(fileId)
fileInfo = self.interfaceDbComponent.getFile(fileId)
except Exception:
fileInfo = None
fileName = getattr(fileInfo, 'fileName', None) if fileInfo else None
mimeType = getattr(fileInfo, 'mimeType', None) if fileInfo else None
fileData = chatService.getFileData(fileId)
fileData = self.interfaceDbComponent.getFileData(fileId)
if not fileData:
raise ValueError(f"No file data found for fileId: {fileId}")

View file

@ -24,8 +24,7 @@ from .datamodelFeatureRealEstate import (
Kanton,
Land,
)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.serviceHub import getInterface as getServices
from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface
from .serviceGeometry import fetch_parcel_polygon_from_swisstopo
@ -232,8 +231,8 @@ async def processNaturalLanguageCommand(
logger.info(f"Processing natural language command for user {currentUser.id} (mandate: {mandateId})")
logger.debug(f"User input: {userInput}")
ctx = ServiceCenterContext(user=currentUser, mandate_id=mandateId)
aiService = getService("ai", ctx)
services = getServices(currentUser, workflow=None, mandateId=mandateId)
aiService = services.ai
intentAnalysis = await analyzeUserIntent(aiService, userInput)

View file

@ -12,8 +12,7 @@ from fastapi import HTTPException, status
from modules.datamodels.datamodelUam import User
from .datamodelFeatureRealEstate import DokumentTyp
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.serviceHub import getInterface as getServices
from .interfaceFeatureRealEstate import getInterface as getRealEstateInterface
from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface
from modules.features.realEstate.bzoDocumentRetriever import BZODocumentRetriever
@ -234,8 +233,10 @@ async def extract_bzo_information(
bzo_params_result = None
try:
ctx = ServiceCenterContext(user=currentUser, mandate_id=_mandateId, feature_instance_id=featureInstanceId)
ai_service = getService("ai", ctx)
services = getServices(
currentUser, workflow=None, mandateId=_mandateId, featureInstanceId=featureInstanceId
)
ai_service = services.ai
bzo_params_result = await run_bzo_params_extraction(
extracted_content=all_extracted_content,
bauzone=bauzone,
@ -520,8 +521,10 @@ async def generate_bauzone_ai_summary(
AI-generated summary string
"""
try:
ctx = ServiceCenterContext(user=currentUser, mandate_id=mandateId, feature_instance_id=featureInstanceId)
aiService = getService("ai", ctx)
services = getServices(
currentUser, workflow=None, mandateId=mandateId, featureInstanceId=featureInstanceId
)
aiService = services.ai
context_parts = []

View file

@ -158,7 +158,7 @@ def _backfillTargetFeatureInstanceId() -> None:
"""
def _do() -> None:
from modules.shared.configuration import APP_CONFIG
from modules.datamodels.datamodelWorkflowAutomation import AutoWorkflow, WORKFLOW_AUTOMATION_DATABASE
from modules.datamodels.datamodelWorkflowAutomation import AutoWorkflow
dbHost = APP_CONFIG.get("DB_HOST", "localhost")
dbUser = APP_CONFIG.get("DB_USER")
@ -166,7 +166,7 @@ def _backfillTargetFeatureInstanceId() -> None:
dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
geDb = DatabaseConnector(
dbHost=dbHost,
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
dbDatabase="poweron_graphicaleditor",
dbUser=dbUser,
dbPassword=dbPassword,
dbPort=dbPort,

View file

@ -110,13 +110,12 @@ def initBootstrap(db: DatabaseConnector) -> None:
except Exception as e:
logger.warning(f"Mandate retention purge failed: {e}")
# System-component lifecycle hooks (registered via app.py Composition Root)
from modules.shared.systemComponentRegistry import getLifecycleHooks
for _scHook in getLifecycleHooks("onBootstrap"):
try:
_scHook()
except Exception as _scErr:
logger.warning(f"onBootstrap hook for system component failed: {_scErr}")
# WorkflowAutomation bootstrap (system component, not auto-discovered)
try:
from modules.workflowAutomation.mainWorkflowAutomation import onBootstrap as _waBootstrap
_waBootstrap()
except Exception as _waBootErr:
logger.warning(f"onBootstrap hook for 'workflowAutomation' failed: {_waBootErr}")
# Let features run their own bootstrap logic via lifecycle hooks
from modules.shared.featureDiscovery import loadFeatureMainModules

View file

@ -1870,13 +1870,12 @@ class AppObjects:
instances = self.db.getRecordset(FeatureInstance, recordFilter={"mandateId": mandateId})
# 0-pre-sc. System-component cascade-delete (registered via app.py Composition Root)
from modules.shared.systemComponentRegistry import getLifecycleHooks
for _scHook in getLifecycleHooks("onMandateDelete"):
try:
_scHook(mandateId, instances)
except Exception as _scErr:
logger.warning(f"onMandateDelete hook for system component failed: {_scErr}")
# 0-pre-wa. WorkflowAutomation cascade-delete (system component, not auto-discovered)
try:
from modules.workflowAutomation.mainWorkflowAutomation import onMandateDelete as _waDeleteHook
_waDeleteHook(mandateId, instances)
except Exception as _waDelErr:
logger.warning(f"onMandateDelete hook for 'workflowAutomation' failed: {_waDelErr}")
# 0-pre. Let features cascade-delete their own data via lifecycle hooks
from modules.shared.featureDiscovery import loadFeatureMainModules

View file

@ -807,7 +807,7 @@ class ComponentObjects:
next ``updateFile`` / ``getFile`` then rejects with
``File with ID ... not found`` -- the well-known "ghost duplicate"
symptom seen when ``interfaceDbComponent`` is initialised without an
``featureInstanceId`` (e.g. via ``serviceCenter``) but a same-hash+name
``featureInstanceId`` (e.g. via ``serviceHub``) but a same-hash+name
file exists in another featureInstance under the same mandate.
We therefore cross-check the candidate through the RBAC-aware ``getFile``
before returning it; if RBAC blocks it, we treat it as "no duplicate
@ -933,7 +933,9 @@ class ComponentObjects:
If pagination is provided: PaginatedResult with items and metadata
"""
def _convertFileItems(files):
from modules.shared.workflowArtifactVisibility import suppressWorkflowFileInWorkspaceUi
from modules.workflowAutomation.engine.workflowArtifactVisibility import (
suppress_workflow_file_in_workspace_ui,
)
fileItems = []
for file in files:
@ -947,7 +949,7 @@ class ComponentObjects:
fileName = file.get("fileName")
if not fileName or fileName == "None":
continue
if suppressWorkflowFileInWorkspaceUi(file):
if suppress_workflow_file_in_workspace_ui(file):
continue
if file.get("scope") is None:

View file

@ -321,12 +321,7 @@ class FeatureInterface:
f"for feature '{featureCode}' to instance {instanceId} (mandate={mandateId})"
)
from modules.shared.systemComponentRegistry import getLifecycleHooks
_onInstanceCreateHooks = getLifecycleHooks("onInstanceCreate")
if not _onInstanceCreateHooks:
logger.warning("_copyTemplateWorkflows: no onInstanceCreate hooks registered")
return 0
_waOnInstanceCreate = _onInstanceCreateHooks[0]
from modules.workflowAutomation.mainWorkflowAutomation import onInstanceCreate as _waOnInstanceCreate
try:
copied = _waOnInstanceCreate(mandateId, instanceId, featureCode, templateWorkflows)

View file

@ -46,7 +46,7 @@ def _make_json_serializable(obj: Any, _depth: int = 0) -> Any:
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelWorkflowAutomation import (
WORKFLOW_AUTOMATION_DATABASE,
GRAPHICAL_EDITOR_DATABASE,
AutoWorkflow,
AutoVersion,
AutoRun,
@ -59,15 +59,15 @@ from modules.dbHelpers.dbRegistry import registerDatabase
logger = logging.getLogger(__name__)
workflowAutomationDatabase = WORKFLOW_AUTOMATION_DATABASE
workflowAutomationDatabase = GRAPHICAL_EDITOR_DATABASE
registerDatabase(workflowAutomationDatabase)
_CALLBACK_WORKFLOW_CHANGED = "workflowAutomation.workflow.changed"
def _invocationsSyncedWithGraph(graph, invocations):
"""Sync invocations with graph trigger nodes (via nodeCatalog, L2)."""
from modules.nodeCatalog.entryPoints import invocationsSyncedWithGraph
return invocationsSyncedWithGraph(graph, invocations)
"""Lazy-load entryPoints to avoid L4->L5 top-level import."""
from modules.workflowAutomation.editor.entryPoints import invocations_synced_with_graph
return invocations_synced_with_graph(graph, invocations)
def _getWorkflowAutomationInterface(

View file

@ -2,7 +2,7 @@
# All rights reserved.
"""
Stripe webhook and subscription business logic for billing.
Handles checkout credit, subscription lifecycle transitions, and invoice events.
Extracted from routeBilling.py for maintainability.
"""
import logging

View file

@ -332,7 +332,7 @@ def _getStripeClient():
def _creditStripeSessionIfNeeded(billingInterface, session: Dict[str, Any], eventId: Optional[str] = None) -> CheckoutConfirmResponse:
"""Credit balance from Stripe Checkout session if not already credited."""
from modules.serviceCenter.services.serviceBilling.billingWebhookHandler import creditStripeSessionIfNeeded
from .billingWebhookHandler import creditStripeSessionIfNeeded
return creditStripeSessionIfNeeded(billingInterface, session, eventId, CheckoutConfirmResponse)
@ -907,8 +907,8 @@ def createCheckoutSession(
mandateLabel = targetMandateId
invoiceAddress = None
from modules.serviceCenter.services.serviceBilling.stripeCheckout import createCheckoutSession
redirect_url = createCheckoutSession(
from modules.serviceCenter.services.serviceBilling.stripeCheckout import create_checkout_session
redirect_url = create_checkout_session(
mandate_id=targetMandateId,
user_id=checkoutRequest.userId,
amount_chf=checkoutRequest.amount,
@ -1079,13 +1079,13 @@ async def stripeWebhook(
def handleSubscriptionCheckoutCompleted(session, eventId: str) -> None:
"""Handle checkout.session.completed for mode=subscription."""
from modules.serviceCenter.services.serviceBilling.billingWebhookHandler import handleSubscriptionCheckoutCompleted as _handler
from .billingWebhookHandler import handleSubscriptionCheckoutCompleted as _handler
_handler(session, eventId, getRootInterface)
def _handleSubscriptionWebhook(event) -> None:
"""Process Stripe subscription webhook events."""
from modules.serviceCenter.services.serviceBilling.billingWebhookHandler import handleSubscriptionWebhook as _handler
from .billingWebhookHandler import handleSubscriptionWebhook as _handler
_handler(event, getRootInterface)

View file

@ -9,8 +9,7 @@ from fastapi import APIRouter, Depends, HTTPException, Path, Query, Request, sta
from modules.auth import getCurrentUser, limiter
from modules.datamodels.datamodelUam import AuthAuthority, User, UserConnection
from modules.interfaces.interfaceDbApp import getInterface
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.serviceHub import getInterface as getServices
from modules.shared.i18nRegistry import apiRouteContext
routeApiMsg = apiRouteContext("routeClickup")
@ -60,14 +59,13 @@ def _clickup_connection_or_404(interface, connection_id: str, user_id: str) -> U
def _svc_for_connection(current_user: User, connection: UserConnection):
ctx = ServiceCenterContext(user=current_user)
clickupService = getService("clickup", ctx)
if not clickupService.setAccessTokenFromConnection(connection):
services = getServices(current_user, None)
if not services.clickup.setAccessTokenFromConnection(connection):
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=routeApiMsg("Failed to set ClickUp access token. Connection may be expired or invalid."),
)
return clickupService
return services.clickup
@router.get("/{connectionId}/teams/{teamId}", response_model=Dict[str, Any])

View file

@ -12,8 +12,7 @@ from fastapi import APIRouter, HTTPException, Depends, Path, Query, Request, sta
from modules.auth import limiter, getCurrentUser
from modules.datamodels.datamodelUam import User, UserConnection
from modules.interfaces.interfaceDbApp import getInterface
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.serviceHub import getInterface as getServices
from modules.shared.i18nRegistry import apiRouteContext
routeApiMsg = apiRouteContext("routeSharepoint")
@ -123,17 +122,19 @@ async def getSharepointFolderOptionsByReference(
detail=f"Connection is not a Microsoft connection (authority: {authority})"
)
ctx = ServiceCenterContext(user=currentUser)
sharepointService = getService("sharepoint", ctx)
# Initialize services
services = getServices(currentUser, None)
if not sharepointService.setAccessTokenFromConnection(connection):
# Set access token on SharePoint service
if not services.sharepoint.setAccessTokenFromConnection(connection):
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=routeApiMsg("Failed to set SharePoint access token. Connection may be expired or invalid.")
)
# Mode 1: Return sites list if no siteId specified
if not siteId:
sites = await sharepointService.discoverSites()
sites = await services.sharepoint.discoverSites()
return [
{
"type": "site",
@ -147,8 +148,9 @@ async def getSharepointFolderOptionsByReference(
for site in sites
]
# Mode 2: Return folders within specific site
folderPath = path or ""
items = await sharepointService.listFolderContents(siteId, folderPath)
items = await services.sharepoint.listFolderContents(siteId, folderPath)
if not items:
return []

View file

@ -839,12 +839,12 @@ def _buildIntegrationsOverviewPayload(userId: str, user=None) -> Dict[str, Any]:
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelPagination import PaginationParams
from modules.datamodels.datamodelWorkflowAutomation import (
AutoWorkflow, AutoRun, WORKFLOW_AUTOMATION_DATABASE,
AutoWorkflow, AutoRun,
)
wfDb = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
dbDatabase="poweron_graphicaleditor",
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),

View file

@ -31,7 +31,7 @@ from modules.datamodels.datamodelPagination import PaginationParams, PaginationM
from modules.datamodels.datamodelWorkflowAutomation import (
AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask,
)
from modules.dbHelpers.paginationHelpers import applyFiltersAndSort, paginateInMemory
from modules.dbHelpers.paginationHelpers import applyFiltersAndSort
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.shared.i18nRegistry import apiRouteContext, resolveText
from modules.workflowAutomation.helpers import (
@ -75,12 +75,9 @@ async def _listWorkflows(
scopeFilter = {"mandateId": mandateId}
params = _parsePaginationOr400(pagination)
records = db.getRecordset(AutoWorkflow, recordFilter=scopeFilter)
if params:
filtered = applyFiltersAndSort(records or [], params)
pageItems, totalItems = paginateInMemory(filtered, params)
return {"items": pageItems, "total": totalItems}
return {"items": records or [], "total": len(records or [])}
records = db.getRecordset(AutoWorkflow, recordFilter=scopeFilter, pagination=params)
total = db.getRecordCount(AutoWorkflow, recordFilter=scopeFilter) if params else len(records or [])
return {"items": records or [], "total": total}
finally:
db.close()
@ -184,12 +181,9 @@ async def _listRuns(
scopeFilter = {**(scopeFilter or {}), "workflowId": workflowId}
params = _parsePaginationOr400(pagination)
records = db.getRecordset(AutoRun, recordFilter=scopeFilter)
if params:
filtered = applyFiltersAndSort(records or [], params)
pageItems, totalItems = paginateInMemory(filtered, params)
return {"items": pageItems, "total": totalItems}
return {"items": records or [], "total": len(records or [])}
records = db.getRecordset(AutoRun, recordFilter=scopeFilter, pagination=params)
total = db.getRecordCount(AutoRun, recordFilter=scopeFilter) if params else len(records or [])
return {"items": records or [], "total": total}
finally:
db.close()
@ -240,12 +234,9 @@ async def _listTasks(
scopeFilter = {**(scopeFilter or {}), "status": status}
params = _parsePaginationOr400(pagination)
records = db.getRecordset(AutoTask, recordFilter=scopeFilter)
if params:
filtered = applyFiltersAndSort(records or [], params)
pageItems, totalItems = paginateInMemory(filtered, params)
return {"items": pageItems, "total": totalItems}
return {"items": records or [], "total": len(records or [])}
records = db.getRecordset(AutoTask, recordFilter=scopeFilter, pagination=params)
total = db.getRecordCount(AutoTask, recordFilter=scopeFilter) if params else len(records or [])
return {"items": records or [], "total": total}
finally:
db.close()
@ -1252,11 +1243,11 @@ def _getRunDetail(
except Exception as e:
logger.warning("_getRunDetail: file lookup failed: %s", e)
from modules.shared.workflowArtifactVisibility import suppressWorkflowFileInWorkspaceUi
from modules.workflowAutomation.engine.workflowArtifactVisibility import suppress_workflow_file_in_workspace_ui
def _resolveFileList(ids: set) -> list:
rows = [dict(fileMetaById[fid]) for fid in ids if fid in fileMetaById]
return [m for m in rows if not suppressWorkflowFileInWorkspaceUi(m)]
return [m for m in rows if not suppress_workflow_file_in_workspace_ui(m)]
assignedFileIds: set = set()
for step, (inputIds, outputIds) in zip(steps, perStepFileIds):
@ -1314,7 +1305,7 @@ def _buildExecuteRunEnvelope(
merge_run_envelope,
normalize_run_envelope,
)
from modules.nodeCatalog.entryPoints import findInvocation
from modules.workflowAutomation.editor.entryPoints import find_invocation
if isinstance(body.get("runEnvelope"), dict):
env = normalize_run_envelope(body["runEnvelope"], user_id=userId)
@ -1330,7 +1321,7 @@ def _buildExecuteRunEnvelope(
status_code=400,
detail=routeApiMsg("entryPointId requires a saved workflow (workflowId must refer to a stored workflow)"),
)
inv = findInvocation(workflow, entryPointId)
inv = find_invocation(workflow, entryPointId)
if not inv:
raise HTTPException(status_code=400, detail=routeApiMsg("entryPointId not found on workflow"))
if not inv.get("enabled", True):

View file

@ -16,10 +16,9 @@ from modules.serviceCenter.registry import (
)
from modules.serviceCenter.resolver import (
resolve,
getResolutionCache,
clearCache,
get_resolution_cache,
clear_cache,
)
from modules.serviceCenter.services.serviceAgent.mainServiceAgent import ServicesBag
logger = logging.getLogger(__name__)
@ -38,7 +37,7 @@ def getService(
Returns:
Service instance
"""
cache = getResolutionCache()
cache = get_resolution_cache()
resolving = set()
return resolve(key, context, cache, resolving)
@ -81,13 +80,13 @@ def registerServiceObjects(catalogService) -> bool:
return False
def canAccessService(
def can_access_service(
user,
rbac,
serviceKey: str,
mandateId: Optional[str] = None,
featureInstanceId: Optional[str] = None,
allowWhenNoRbac: bool = True,
service_key: str,
mandate_id: Optional[str] = None,
feature_instance_id: Optional[str] = None,
allow_when_no_rbac: bool = True,
) -> bool:
"""
Check if user has permission to access the given service.
@ -95,42 +94,40 @@ def canAccessService(
Args:
user: User object
rbac: RbacClass instance (e.g. from interfaceDbApp.rbac)
serviceKey: Service key (e.g., "web", "extraction")
mandateId: Optional mandate context
featureInstanceId: Optional feature instance context
allowWhenNoRbac: If True, allow when rbac is None (migration/default)
service_key: Service key (e.g., "web", "extraction")
mandate_id: Optional mandate context
feature_instance_id: Optional feature instance context
allow_when_no_rbac: If True, allow when rbac is None (migration/default)
Returns:
True if user has view permission on the service
"""
if not rbac:
return allowWhenNoRbac
if serviceKey not in IMPORTABLE_SERVICES:
return allow_when_no_rbac
if service_key not in IMPORTABLE_SERVICES:
return False
obj = IMPORTABLE_SERVICES[serviceKey]
objectKey = obj.get("objectKey")
if not objectKey:
obj = IMPORTABLE_SERVICES[service_key]
object_key = obj.get("objectKey")
if not object_key:
return False
from modules.datamodels.datamodelRbac import AccessRuleContext
permissions = rbac.getUserPermissions(
user,
AccessRuleContext.RESOURCE,
objectKey,
mandateId=mandateId,
featureInstanceId=featureInstanceId,
object_key,
mandateId=mandate_id,
featureInstanceId=feature_instance_id,
)
return permissions.view if permissions else False
__all__ = [
"ServiceCenterContext",
"ServicesBag",
"getService",
"preWarm",
"clearCache",
"clear_cache",
"registerServiceObjects",
"canAccessService",
"can_access_service",
"SERVICE_RBAC_OBJECTS",
"CORE_SERVICES",
"IMPORTABLE_SERVICES",

View file

@ -75,21 +75,18 @@ except ImportError:
pass
def getResolutionCache() -> Dict[str, Any]:
def get_resolution_cache() -> Dict[str, Any]:
"""Get the module-level resolution cache (for preWarm/clear)."""
return _resolution_cache
def clearCache() -> None:
def clear_cache() -> None:
"""Clear the resolution cache."""
lock = _cache_lock if _cache_lock is not None else _DummyLock()
with lock:
_resolution_cache.clear()
class _DummyLock:
def __enter__(self):
return self

View file

@ -0,0 +1,189 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Service Hub.
Consumer-facing aggregation layer for services, DB interfaces, and runtime state.
Architecture:
- serviceHub delegates service resolution to serviceCenter (DI container)
- serviceHub owns DB interface initialization and runtime state
- serviceCenter knows nothing about serviceHub (one-way dependency)
Import-Regelwerk:
- Zentrale Module (wie dieses) duerfen KEINE Feature-Container importieren
- Feature-spezifische Services werden dynamisch geladen
- Shared Services werden via serviceCenter resolved
"""
import os
import importlib
import glob
from typing import Any, Optional, TYPE_CHECKING
import logging
from modules.datamodels.datamodelUam import User
if TYPE_CHECKING:
from modules.datamodels.datamodelChat import ChatWorkflow
logger = logging.getLogger(__name__)
_FEATURES_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "features")
class PublicService:
"""Lightweight proxy exposing only public callable attributes of a target."""
def __init__(self, target: Any, functionsOnly: bool = True, nameFilter=None):
self._target = target
self._functionsOnly = functionsOnly
self._nameFilter = nameFilter
def __getattr__(self, name: str):
if name.startswith('_'):
raise AttributeError(f"'{type(self._target).__name__}' attribute '{name}' is private")
if self._nameFilter and not self._nameFilter(name):
raise AttributeError(f"'{name}' not exposed by policy")
attr = getattr(self._target, name)
if self._functionsOnly and not callable(attr):
raise AttributeError(f"'{name}' is not a function")
return attr
def __dir__(self):
return sorted([
n for n in dir(self._target)
if not n.startswith('_')
and (not self._functionsOnly or callable(getattr(self._target, n, None)))
and (self._nameFilter(n) if self._nameFilter else True)
])
class ServiceHub:
"""
Consumer-facing aggregation of services, DB interfaces, and runtime state.
Services are lazy-resolved via serviceCenter on first access.
DB interfaces and runtime state are initialized eagerly.
Feature services/interfaces are discovered dynamically from features/.
"""
_SERVICE_CENTER_WRAPPING = {
"ai": {"functionsOnly": False},
}
def __init__(self, user: User, workflow: "ChatWorkflow" = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
self.user: User = user
self.workflow = workflow
self.mandateId: Optional[str] = mandateId
self.featureInstanceId: Optional[str] = featureInstanceId
self.currentUserPrompt: str = ""
self.rawUserPrompt: str = ""
from modules.serviceCenter.context import ServiceCenterContext
self._serviceCenterContext = ServiceCenterContext(
user=user,
workflow=workflow,
mandate_id=mandateId,
feature_instance_id=featureInstanceId,
)
from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
self.interfaceDbApp = getAppInterface(user, mandateId=mandateId)
from modules.interfaces.interfaceDbManagement import getInterface as getComponentInterface
self.interfaceDbComponent = getComponentInterface(user, mandateId=mandateId)
self.rbac = self.interfaceDbApp.rbac if self.interfaceDbApp else None
from modules.interfaces.interfaceDbChat import getInterface as getChatInterface
self.interfaceDbChat = getChatInterface(user, mandateId=mandateId, featureInstanceId=featureInstanceId)
self._loadFeatureInterfaces()
self._loadFeatureServices()
def __getattr__(self, name: str):
"""Lazy-resolve services via serviceCenter on first access."""
if name.startswith('_'):
raise AttributeError(name)
try:
from modules.serviceCenter import getService
service = getService(name, self._serviceCenterContext)
wrapping = self._SERVICE_CENTER_WRAPPING.get(name, {})
functionsOnly = wrapping.get("functionsOnly", True)
wrapped = PublicService(service, functionsOnly=functionsOnly)
setattr(self, name, wrapped)
return wrapped
except KeyError:
raise AttributeError(f"'{type(self).__name__}' has no attribute '{name}'")
def _loadFeatureInterfaces(self):
"""Dynamically load interfaces from feature containers by filename pattern."""
pattern = os.path.join(_FEATURES_DIR, "*", "interfaceFeature*.py")
for filepath in glob.glob(pattern):
try:
featureDir = os.path.basename(os.path.dirname(filepath))
filename = os.path.basename(filepath)[:-3]
modulePath = f"modules.features.{featureDir}.{filename}"
module = importlib.import_module(modulePath)
if hasattr(module, "getInterface"):
interface = module.getInterface(self.user, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId)
attrName = filename.replace("interfaceFeature", "interfaceDb")
setattr(self, attrName, interface)
logger.debug(f"Loaded interface: {attrName} from {modulePath}")
except Exception as e:
logger.debug(f"Could not load interface from {filepath}: {e}")
def _loadFeatureServices(self):
"""Dynamically load services from feature containers by filename pattern."""
pattern = os.path.join(_FEATURES_DIR, "*", "service*", "mainService*.py")
for filepath in glob.glob(pattern):
try:
serviceDir = os.path.basename(os.path.dirname(filepath))
featureDir = os.path.basename(os.path.dirname(os.path.dirname(filepath)))
filename = os.path.basename(filepath)[:-3]
modulePath = f"modules.features.{featureDir}.{serviceDir}.{filename}"
module = importlib.import_module(modulePath)
serviceClass = None
for attrName in dir(module):
if attrName.endswith("Service") and not attrName.startswith("_"):
cls = getattr(module, attrName)
if isinstance(cls, type):
serviceClass = cls
break
if serviceClass:
attrName = serviceDir.replace("service", "").lower()
if not attrName:
attrName = serviceDir.lower()
functionsOnly = attrName != "ai"
def _makeServiceResolver(hub):
def _resolver(depKey: str):
return getattr(hub, depKey)
return _resolver
import inspect
sig = inspect.signature(serviceClass.__init__)
paramCount = len([p for p in sig.parameters if p != 'self'])
if paramCount >= 2:
serviceInstance = serviceClass(self, _makeServiceResolver(self))
else:
serviceInstance = serviceClass(self)
setattr(self, attrName, PublicService(serviceInstance, functionsOnly=functionsOnly))
logger.debug(f"Loaded service: {attrName} from {modulePath}")
except Exception as e:
logger.debug(f"Could not load service from {filepath}: {e}")
# Backward-compatible alias
Services = ServiceHub
def getInterface(user: User, workflow: "ChatWorkflow" = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None) -> ServiceHub:
"""Get ServiceHub instance for the given user, mandate, and feature instance context."""
return ServiceHub(user, workflow, mandateId=mandateId, featureInstanceId=featureInstanceId)

View file

@ -274,7 +274,7 @@ def _persistLargeDocument(doc, services, context: Dict[str, Any]) -> Optional[Di
docName = getattr(doc, "documentName", "unnamed")
docMime = getattr(doc, "mimeType", "application/octet-stream")
try:
fileItem, _ = chatService.saveUploadedFile(docBytes, docName)
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(docBytes, docName)
from modules.serviceCenter.services.serviceAgent.coreTools._helpers import (
_attachFileAsChatDocument,
@ -295,7 +295,7 @@ def _persistLargeDocument(doc, services, context: Dict[str, Any]) -> Optional[Di
updateFields["mandateId"] = mandateId
if updateFields:
logger.debug("_persistLargeDocument: updating file %s with %s", fileItem.id, updateFields)
chatService.updateFile(fileItem.id, updateFields)
chatService.interfaceDbComponent.updateFile(fileItem.id, updateFields)
else:
logger.warning("_persistLargeDocument: no updateFields for file %s (tempFolderId=%s, fiId=%s)", fileItem.id, tempFolderId, fiId)

View file

@ -88,11 +88,12 @@ def registerConnectionTools(registry: ToolRegistry, services):
graphAttachments: List[Dict[str, Any]] = []
if attachmentFileIds:
chatService = services.chat
dbMgmt = chatService.interfaceDbComponent
for fid in attachmentFileIds:
fileRow = chatService.getFile(fid)
fileRow = dbMgmt.getFile(fid)
if not fileRow:
return ToolResult(toolCallId="", toolName="sendMail", success=False, error=f"Attachment file not found: {fid}")
rawBytes = chatService.getFileData(fid)
rawBytes = dbMgmt.getFileData(fid)
if not rawBytes:
return ToolResult(toolCallId="", toolName="sendMail", success=False, error=f"Attachment file has no data: {fid}")
graphAttachments.append({

View file

@ -27,7 +27,8 @@ def registerCrossWorkflowTools(registry: ToolRegistry, services):
"""List all chat workflows in this workspace with metadata."""
try:
chatService = services.chat
allWorkflows = chatService.getWorkflows() or []
chatInterface = chatService.interfaceDbChat
allWorkflows = chatInterface.getWorkflows() or []
allWorkflows.sort(
key=lambda w: w.get("sysCreatedAt") or w.get("startedAt") or 0,
@ -42,7 +43,7 @@ def registerCrossWorkflowTools(registry: ToolRegistry, services):
createdAt = wf.get("sysCreatedAt") or wf.get("startedAt") or 0
lastActivity = wf.get("lastActivity") or createdAt
msgs = chatService.getMessages(wfId) or []
msgs = chatInterface.getMessages(wfId) or []
messageCount = len(msgs)
lastPreview = ""
if msgs:
@ -101,7 +102,8 @@ def registerCrossWorkflowTools(registry: ToolRegistry, services):
try:
chatService = services.chat
allMsgs = chatService.getMessages(targetWorkflowId) or []
chatInterface = chatService.interfaceDbChat
allMsgs = chatInterface.getMessages(targetWorkflowId) or []
sliced = allMsgs[offset:offset + limit]
items = []

View file

@ -359,7 +359,7 @@ def registerDataSourceTools(registry: ToolRegistry, services):
elif fileBytes[:2] == b"PK":
fileName = f"{fileName}.zip"
chatService = services.chat
fileItem, _ = chatService.saveUploadedFile(fileBytes, fileName)
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(fileBytes, fileName)
updateFields = {}
tempFolderId = _getOrCreateTempFolder(chatService)
if tempFolderId:
@ -370,7 +370,7 @@ def registerDataSourceTools(registry: ToolRegistry, services):
if _sourceNeutralize:
updateFields["neutralize"] = True
if updateFields:
chatService.updateFile(fileItem.id, updateFields)
chatService.interfaceDbComponent.updateFile(fileItem.id, updateFields)
chatDocId = _attachFileAsChatDocument(
services, fileItem,

View file

@ -173,6 +173,11 @@ def registerFeatureSubAgentTools(registry: ToolRegistry, services):
neutralizePolicy[tn] = {"tableActive": tableActive, "explicitFields": explicitFields}
neutralizationService = services.getService("neutralization") if hasattr(services, "getService") else None
if neutralizationService is not None and not getattr(neutralizationService, "interfaceDbComponent", None):
try:
neutralizationService.interfaceDbComponent = services.chat.interfaceDbComponent
except Exception:
pass
cacheKey = f"{featureInstanceId}:{hashlib.md5(question.encode()).hexdigest()}"
if cacheKey in _featureQueryCache:

View file

@ -48,16 +48,23 @@ def _looksLikeBinary(data: bytes, sampleSize: int = 1024) -> bool:
def _getOrCreateTempFolder(chatService) -> Optional[str]:
"""Return the ID of the user's 'Temp' folder, creating it if it doesn't exist."""
ifc = getattr(chatService, "interfaceDbComponent", None)
if not ifc:
logger.warning("_getOrCreateTempFolder: no interfaceDbComponent on chatService")
return None
userId = getattr(ifc, "userId", None)
if not userId:
logger.warning("_getOrCreateTempFolder: userId is None on interfaceDbComponent")
return None
try:
ownFolders = chatService.getOwnFolderTree()
ownFolders = ifc.getOwnFolderTree()
for f in ownFolders:
if f.get("name") == "Temp":
folderId = f.get("id")
logger.debug("_getOrCreateTempFolder: found existing Temp folder %s", folderId)
return str(folderId) if folderId else None
newFolder = chatService.createFolder("Temp")
newFolder = ifc.createFolder("Temp")
folderId = newFolder.get("id") if isinstance(newFolder, dict) else getattr(newFolder, "id", None)
userId = getattr(getattr(chatService, "interfaceDbComponent", None), "userId", None)
logger.info("_getOrCreateTempFolder: created Temp folder %s for user %s", folderId, userId)
return str(folderId) if folderId else None
except Exception as e:

View file

@ -46,8 +46,8 @@ def registerMediaTools(registry: ToolRegistry, services):
if sourceFileId:
try:
chatService = services.chat
fileRow = chatService.getFile(sourceFileId)
dbMgmt = services.chat.interfaceDbComponent
fileRow = dbMgmt.getFile(sourceFileId)
if not fileRow:
return ToolResult(
toolCallId="",
@ -55,7 +55,7 @@ def registerMediaTools(registry: ToolRegistry, services):
success=False,
error=f"sourceFileId not found: {sourceFileId}",
)
rawBytes = chatService.getFileData(sourceFileId)
rawBytes = dbMgmt.getFileData(sourceFileId)
if not rawBytes:
return ToolResult(
toolCallId="",
@ -244,7 +244,11 @@ def registerMediaTools(registry: ToolRegistry, services):
if not docName.lower().endswith(f".{outputFormat}"):
docName = f"{sanitizedTitle}.{outputFormat}"
fileItem, _ = chatService.saveUploadedFile(docData, docName)
fileItem = None
if hasattr(chatService.interfaceDbComponent, "saveGeneratedFile"):
fileItem = chatService.interfaceDbComponent.saveGeneratedFile(docData, docName, docMime)
else:
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(docData, docName)
if fileItem:
fid = fileItem.id if hasattr(fileItem, "id") else fileItem.get("id", "?")
@ -256,7 +260,7 @@ def registerMediaTools(registry: ToolRegistry, services):
if fiId:
updateFields["featureInstanceId"] = fiId
if updateFields:
chatService.updateFile(fid, updateFields)
chatService.interfaceDbComponent.updateFile(fid, updateFields)
chatDocId = _attachFileAsChatDocument(
services, fileItem,
label=f"renderDocument:{docName}",
@ -540,7 +544,11 @@ def registerMediaTools(registry: ToolRegistry, services):
if not docName.lower().endswith(".png"):
docName = f"{sanitizedTitle}.png"
fileItem, _ = chatService.saveUploadedFile(docData, docName)
fileItem = None
if hasattr(chatService.interfaceDbComponent, "saveGeneratedFile"):
fileItem = chatService.interfaceDbComponent.saveGeneratedFile(docData, docName, docMime)
else:
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(docData, docName)
if fileItem:
fid = fileItem.id if hasattr(fileItem, "id") else fileItem.get("id", "?")
@ -552,7 +560,7 @@ def registerMediaTools(registry: ToolRegistry, services):
if fiId:
updateFields["featureInstanceId"] = fiId
if updateFields:
chatService.updateFile(fid, updateFields)
chatService.interfaceDbComponent.updateFile(fid, updateFields)
chatDocId = _attachFileAsChatDocument(
services, fileItem,
label=f"generateImage:{docName}",
@ -701,7 +709,10 @@ def registerMediaTools(registry: ToolRegistry, services):
sanitizedTitle = re.sub(r'[^\w._-]', '_', title, flags=re.UNICODE).strip('_') or "chart"
fileName = f"{sanitizedTitle}.png"
fileItem, _ = chatService.saveUploadedFile(pngData, fileName)
if hasattr(chatService.interfaceDbComponent, "saveGeneratedFile"):
fileItem = chatService.interfaceDbComponent.saveGeneratedFile(pngData, fileName, "image/png")
else:
fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(pngData, fileName)
fid = fileItem.id if hasattr(fileItem, "id") else fileItem.get("id", "?") if isinstance(fileItem, dict) else "?"
if fid != "?":
@ -713,7 +724,7 @@ def registerMediaTools(registry: ToolRegistry, services):
if fiId:
updateFields["featureInstanceId"] = fiId
if updateFields:
chatService.updateFile(fid, updateFields)
chatService.interfaceDbComponent.updateFile(fid, updateFields)
chatDocId = _attachFileAsChatDocument(
services, fileItem,
@ -800,7 +811,7 @@ def registerMediaTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="speechToText", success=False, error="fileId is required")
try:
chatService = services.chat
audioData = chatService.getFileData(fileId)
audioData = chatService.interfaceDbComponent.getFileData(fileId)
if not audioData:
return ToolResult(toolCallId="", toolName="speechToText", success=False, error=f"No data found for file {fileId}")
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
@ -844,6 +855,8 @@ def registerMediaTools(registry: ToolRegistry, services):
neutralizationService = services.getService("neutralization")
if not neutralizationService:
return ToolResult(toolCallId="", toolName="neutralizeData", success=False, error="Neutralization service not available")
if not neutralizationService.interfaceDbComponent:
neutralizationService.interfaceDbComponent = services.chat.interfaceDbComponent
if text:
result = await neutralizationService.processTextAsync(text, fileId or None)
else:
@ -877,13 +890,16 @@ def registerMediaTools(registry: ToolRegistry, services):
if not neutralizationService or not hasattr(neutralizationService, "resolveText"):
return ToolResult(toolCallId="", toolName="revealDocument", success=False,
error="Neutralization service not available")
if not getattr(neutralizationService, "interfaceDbComponent", None):
neutralizationService.interfaceDbComponent = services.chat.interfaceDbComponent
if fileId and not text:
chatService = services.chat
fileRow = chatService.getFile(fileId)
dbMgmt = services.chat.interfaceDbComponent
fileRow = dbMgmt.getFile(fileId)
if not fileRow:
return ToolResult(toolCallId="", toolName="revealDocument", success=False,
error=f"fileId not found: {fileId}")
rawBytes = chatService.getFileData(fileId)
rawBytes = dbMgmt.getFileData(fileId)
if not rawBytes:
return ToolResult(toolCallId="", toolName="revealDocument", success=False,
error="File data not accessible")

View file

@ -283,7 +283,7 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="tagFile", success=False, error="fileId is required")
try:
chatService = services.chat
chatService.updateFile(fileId, {"tags": tags})
chatService.interfaceDbComponent.updateFile(fileId, {"tags": tags})
return ToolResult(
toolCallId="", toolName="tagFile", success=True,
data=f"Tags updated to {tags} for file {fileId}"
@ -302,21 +302,22 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
try:
chatService = services.chat
dbMgmt = chatService.interfaceDbComponent
if mode == "append":
if not fileId:
return ToolResult(toolCallId="", toolName="writeFile", success=False, error="fileId is required for mode=append")
file = chatService.getFile(fileId)
file = dbMgmt.getFile(fileId)
if not file:
return ToolResult(toolCallId="", toolName="writeFile", success=False, error=f"File {fileId} not found")
existingData = chatService.getFileData(fileId) or b""
existingData = dbMgmt.getFileData(fileId) or b""
try:
existingText = existingData.decode("utf-8")
except UnicodeDecodeError:
existingText = existingData.decode("latin-1", errors="replace")
newContent = existingText + content
chatService.updateFileData(fileId, newContent.encode("utf-8"))
chatService.updateFile(fileId, {"fileSize": len(newContent.encode("utf-8"))})
dbMgmt.updateFileData(fileId, newContent.encode("utf-8"))
dbMgmt.updateFile(fileId, {"fileSize": len(newContent.encode("utf-8"))})
return ToolResult(
toolCallId="", toolName="writeFile", success=True,
data=f"Appended {len(content)} chars to '{file.fileName}' (id: {fileId}, total: {len(newContent)} chars)",
@ -326,11 +327,11 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
if mode == "overwrite":
if not fileId:
return ToolResult(toolCallId="", toolName="writeFile", success=False, error="fileId is required for mode=overwrite")
file = chatService.getFile(fileId)
file = dbMgmt.getFile(fileId)
if not file:
return ToolResult(toolCallId="", toolName="writeFile", success=False, error=f"File {fileId} not found")
chatService.updateFileData(fileId, content.encode("utf-8"))
chatService.updateFile(fileId, {"fileSize": len(content.encode("utf-8"))})
dbMgmt.updateFileData(fileId, content.encode("utf-8"))
dbMgmt.updateFile(fileId, {"fileSize": len(content.encode("utf-8"))})
return ToolResult(
toolCallId="", toolName="writeFile", success=True,
data=f"Overwritten '{file.fileName}' (id: {fileId}, {len(content)} chars)",
@ -340,7 +341,7 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
# mode == "create" (default)
if not name:
return ToolResult(toolCallId="", toolName="writeFile", success=False, error="name is required for mode=create")
fileItem, _ = chatService.saveUploadedFile(content.encode("utf-8"), name)
fileItem, _ = dbMgmt.saveUploadedFile(content.encode("utf-8"), name)
fiId = context.get("featureInstanceId") or (services.featureInstanceId if services else "")
updateFields: Dict[str, Any] = {}
if fiId:
@ -350,7 +351,7 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
if args.get("tags"):
updateFields["tags"] = args["tags"]
if updateFields:
chatService.updateFile(fileItem.id, updateFields)
dbMgmt.updateFile(fileItem.id, updateFields)
chatDocId = _attachFileAsChatDocument(
services, fileItem,
@ -497,7 +498,7 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="deleteFile", success=False, error="fileId is required")
try:
chatService = services.chat
file = chatService.getFile(fileId)
file = chatService.interfaceDbComponent.getFile(fileId)
if not file:
return ToolResult(toolCallId="", toolName="deleteFile", success=False, error=f"File {fileId} not found")
fileName = file.fileName
@ -507,7 +508,7 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
knowledgeService.removeFile(fileId)
except Exception as e:
logger.warning(f"deleteFile: knowledge store cleanup failed for {fileId}: {e}")
chatService.deleteFile(fileId)
chatService.interfaceDbComponent.deleteFile(fileId)
return ToolResult(
toolCallId="", toolName="deleteFile", success=True,
data=f"File '{fileName}' (id: {fileId}) deleted",
@ -523,7 +524,7 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="renameFile", success=False, error="fileId and newName are required")
try:
chatService = services.chat
chatService.updateFile(fileId, {"fileName": newName})
chatService.interfaceDbComponent.updateFile(fileId, {"fileName": newName})
return ToolResult(
toolCallId="", toolName="renameFile", success=True,
data=f"File {fileId} renamed to '{newName}'",
@ -650,7 +651,7 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="copyFile", success=False, error="fileId is required")
try:
chatService = services.chat
copiedFile = chatService.copyFile(
copiedFile = chatService.interfaceDbComponent.copyFile(
fileId,
newFileName=args.get("newFileName"),
)
@ -675,15 +676,16 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="replaceInFile", success=False, error="fileId and oldText are required")
try:
chatService = services.chat
file = chatService.getFile(fileId)
dbMgmt = chatService.interfaceDbComponent
file = dbMgmt.getFile(fileId)
if not file:
return ToolResult(toolCallId="", toolName="replaceInFile", success=False, error=f"File {fileId} not found")
if not chatService.isTextMimeType(file.mimeType):
if not dbMgmt.isTextMimeType(file.mimeType):
return ToolResult(
toolCallId="", toolName="replaceInFile", success=False,
error=f"Cannot edit binary file ({file.mimeType}). Only text-based files are supported."
)
rawData = chatService.getFileData(fileId)
rawData = dbMgmt.getFileData(fileId)
if not rawData:
return ToolResult(toolCallId="", toolName="replaceInFile", success=False, error="File has no content")
try:
@ -748,7 +750,8 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="createFolder", success=False, error="name is required")
try:
chatService = services.chat
folder = chatService.createFolder(name, parentId=parentId)
dbMgmt = chatService.interfaceDbComponent
folder = dbMgmt.createFolder(name, parentId=parentId)
folderId = folder.get("id") if isinstance(folder, dict) else getattr(folder, "id", None)
folderName = folder.get("name") if isinstance(folder, dict) else getattr(folder, "name", name)
return ToolResult(
@ -762,7 +765,8 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
async def _listFolders(args: Dict[str, Any], context: Dict[str, Any]):
try:
chatService = services.chat
folders = chatService.getOwnFolderTree()
dbMgmt = chatService.interfaceDbComponent
folders = dbMgmt.getOwnFolderTree()
if not folders:
return ToolResult(toolCallId="", toolName="listFolders", success=True, data="No folders found.")
lines = []
@ -791,10 +795,11 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="moveFile", success=False, error="fileId is required")
try:
chatService = services.chat
file = chatService.getFile(fileId)
dbMgmt = chatService.interfaceDbComponent
file = dbMgmt.getFile(fileId)
if not file:
return ToolResult(toolCallId="", toolName="moveFile", success=False, error=f"File {fileId} not found")
chatService.updateFile(fileId, {"folderId": folderId or None})
dbMgmt.updateFile(fileId, {"folderId": folderId or None})
targetLabel = f"folder {folderId}" if folderId else "root"
return ToolResult(
toolCallId="", toolName="moveFile", success=True,
@ -838,7 +843,8 @@ def registerWorkspaceTools(registry: ToolRegistry, services):
return ToolResult(toolCallId="", toolName="renameFolder", success=False, error="folderId and newName are required")
try:
chatService = services.chat
folder = chatService.renameFolder(folderId, newName)
dbMgmt = chatService.interfaceDbComponent
folder = dbMgmt.renameFolder(folderId, newName)
return ToolResult(
toolCallId="", toolName="renameFolder", success=True,
data=f"Folder {folderId} renamed to '{newName}'",

View file

@ -26,7 +26,7 @@ from modules.serviceCenter.services.serviceBilling.mainServiceBilling import (
logger = logging.getLogger(__name__)
def _toolbox_connection_authorities(services: "ServicesBag") -> List[str]:
def _toolbox_connection_authorities(services: "_ServicesAdapter") -> List[str]:
"""Collect connection authority strings for toolbox gating (requiresConnection).
The optional ``connection`` service is not always registered; fall back to
@ -59,10 +59,8 @@ def _toolbox_connection_authorities(services: "ServicesBag") -> List[str]:
return list(seen)
class ServicesBag:
"""Canonical services bag providing service access from (context, get_service).
Used by AgentService and WorkflowAutomation as the single source of truth
for service resolution, RBAC checks, and context-scoped properties."""
class _ServicesAdapter:
"""Adapter providing service access from (context, get_service)."""
def __init__(self, context, getService: Callable[[str], Any]):
self._context = context
@ -107,6 +105,13 @@ class ServicesBag:
def extraction(self):
return self._getService("extraction")
@property
def interfaceDbComponent(self):
try:
return self.chat.interfaceDbComponent
except Exception:
return None
@property
def rbac(self):
"""Same RbacClass as workflow hub (MethodBase permission checks during discoverMethods)."""
@ -123,15 +128,6 @@ class ServicesBag:
"""Access any service by name."""
return self._getService(name)
def canAccessService(self, serviceKey: str) -> bool:
"""Check if current user has RBAC permission for a service."""
from modules.serviceCenter import canAccessService
return canAccessService(
self.user, self.rbac, serviceKey,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId,
)
def __getattr__(self, name: str):
"""Resolve e.g. services.clickup for MethodClickup / ActionExecutor (discoverMethods)."""
if name.startswith("_"):
@ -161,7 +157,7 @@ class AgentService:
def __init__(self, context, get_service: Callable[[str], Any]):
self._context = context
self._getService = get_service
self.services = ServicesBag(context, get_service)
self.services = _ServicesAdapter(context, get_service)
async def runAgent(
self,
@ -680,7 +676,8 @@ def _buildWorkflowHintItems(
Limited to 10 most recent other workflows to keep the hint small.
"""
try:
allWorkflows = services.chat.getWorkflows() or []
chatInterface = services.chat.interfaceDbChat
allWorkflows = chatInterface.getWorkflows() or []
except Exception:
return []

View file

@ -261,7 +261,7 @@ class ContentExtractor:
# Check if it's standardized JSON format (has "documents" or "sections")
if document.mimeType == "application/json":
docBytes = self.services.chat.getFileData(document.fileId)
docBytes = self.services.interfaceDbComponent.getFileData(document.fileId)
if docBytes:
try:
docData = docBytes.decode('utf-8')
@ -349,7 +349,7 @@ class ContentExtractor:
if document.mimeType.startswith("image/") or self._isBinary(document.mimeType):
try:
# Lade Binary-Daten (getFileData ist nicht async - keine await nötig)
binaryData = self.services.chat.getFileData(document.fileId)
binaryData = self.services.interfaceDbComponent.getFileData(document.fileId)
if not binaryData:
logger.warning(f"No binary data found for document {document.id}")
continue

View file

@ -155,7 +155,7 @@ class DocumentIntentAnalyzer:
return None
try:
docBytes = self.services.chat.getFileData(document.fileId)
docBytes = self.services.interfaceDbComponent.getFileData(document.fileId)
if not docBytes:
return None

View file

@ -212,7 +212,7 @@ def _normalizeReturnUrl(returnUrl: str) -> str:
return urlunsplit((parsed.scheme, parsed.netloc, normalized_path, normalized_query, ""))
def createCheckoutSession(
def create_checkout_session(
mandate_id: str,
user_id: Optional[str],
amount_chf: float,

View file

@ -788,151 +788,6 @@ class ChatService:
'workflowId': 'unknown'
}
def createActionItem(self, actionData: Dict[str, Any]):
"""Create an ActionItem record in the chat DB.
Encapsulates low-level _separateObjectFields + db.recordCreate so callers
never need direct interfaceDbChat access."""
from modules.datamodels.datamodelChat import ActionItem
simpleFields, _objectFields = self.interfaceDbChat._separateObjectFields(ActionItem, actionData)
return self.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)
def getUserConnectionById(self, connectionId: str):
"""Get a single UserConnection by ID, delegating to interfaceDbApp."""
try:
if self.interfaceDbApp and hasattr(self.interfaceDbApp, "getUserConnectionById"):
return self.interfaceDbApp.getUserConnectionById(str(connectionId))
except Exception as e:
logger.error(f"Error getting user connection by ID {connectionId}: {e}")
return None
# ---- File-Write operations (delegate to interfaceDbComponent) ----
def saveUploadedFile(self, fileContent: bytes, fileName: str):
"""Save uploaded file bytes. Returns (fileItem, duplicateStatus)."""
try:
return self.interfaceDbComponent.saveUploadedFile(fileContent, fileName)
except Exception as e:
logger.error(f"Error saving uploaded file '{fileName}': {e}")
raise
def createFile(self, name: str, mimeType: str, content: bytes, folderId=None):
"""Create a new file record with content."""
try:
return self.interfaceDbComponent.createFile(name, mimeType, content, folderId=folderId)
except Exception as e:
logger.error(f"Error creating file '{name}': {e}")
raise
def createFileData(self, fileId: str, data: bytes):
"""Write binary data for an existing file record."""
try:
return self.interfaceDbComponent.createFileData(fileId, data)
except Exception as e:
logger.error(f"Error creating file data for fileId '{fileId}': {e}")
raise
def updateFile(self, fileId: str, updateData: dict):
"""Update file metadata (tags, fileName, fileSize, folderId, etc.)."""
try:
return self.interfaceDbComponent.updateFile(fileId, updateData)
except Exception as e:
logger.error(f"Error updating file '{fileId}': {e}")
raise
def updateFileData(self, fileId: str, data: bytes):
"""Replace file binary content."""
try:
return self.interfaceDbComponent.updateFileData(fileId, data)
except Exception as e:
logger.error(f"Error updating file data for fileId '{fileId}': {e}")
raise
# ---- File-Manage operations (delegate to interfaceDbComponent) ----
def getFile(self, fileId: str):
"""Get file metadata object by ID."""
try:
return self.interfaceDbComponent.getFile(fileId)
except Exception as e:
logger.error(f"Error getting file '{fileId}': {e}")
return None
def deleteFile(self, fileId: str):
"""Delete a file by ID."""
try:
return self.interfaceDbComponent.deleteFile(fileId)
except Exception as e:
logger.error(f"Error deleting file '{fileId}': {e}")
raise
def copyFile(self, sourceFileId: str, newFileName=None):
"""Copy a file, optionally with a new name."""
try:
return self.interfaceDbComponent.copyFile(sourceFileId, newFileName=newFileName)
except Exception as e:
logger.error(f"Error copying file '{sourceFileId}': {e}")
raise
def isTextMimeType(self, mimeType: str) -> bool:
"""Check if a MIME type represents text content."""
try:
return self.interfaceDbComponent.isTextMimeType(mimeType)
except Exception as e:
logger.error(f"Error checking MIME type '{mimeType}': {e}")
return False
def getMimeType(self, fileName: str) -> str:
"""Determine MIME type from file name."""
try:
return self.interfaceDbComponent.getMimeType(fileName)
except Exception as e:
logger.error(f"Error getting MIME type for '{fileName}': {e}")
return "application/octet-stream"
# ---- Folder operations (delegate to interfaceDbComponent) ----
def createFolder(self, name: str, parentId=None):
"""Create a folder, optionally under a parent."""
try:
return self.interfaceDbComponent.createFolder(name, parentId=parentId)
except Exception as e:
logger.error(f"Error creating folder '{name}': {e}")
raise
def getOwnFolderTree(self):
"""Get the user's folder tree."""
try:
return self.interfaceDbComponent.getOwnFolderTree()
except Exception as e:
logger.error(f"Error getting folder tree: {e}")
return None
def renameFolder(self, folderId: str, newName: str):
"""Rename a folder."""
try:
return self.interfaceDbComponent.renameFolder(folderId, newName)
except Exception as e:
logger.error(f"Error renaming folder '{folderId}': {e}")
raise
# ---- Workflow-Listing operations (delegate to interfaceDbChat) ----
def getWorkflows(self, pagination=None):
"""Get all workflows for the current context."""
try:
return self.interfaceDbChat.getWorkflows(pagination)
except Exception as e:
logger.error(f"Error getting workflows: {e}")
return []
def getMessages(self, workflowId: str, pagination=None):
"""Get messages for a specific workflow."""
try:
return self.interfaceDbChat.getMessages(workflowId, pagination)
except Exception as e:
logger.error(f"Error getting messages for workflow '{workflowId}': {e}")
return []
def createWorkflow(self, workflowData: Dict[str, Any]):
"""Create a new workflow by delegating to the chat interface"""
try:

View file

@ -2,6 +2,6 @@
# All rights reserved.
"""ClickUp service."""
from .mainServiceClickup import ClickupService
from .mainServiceClickup import ClickupService, clickup_authorization_header
__all__ = ["ClickupService"]
__all__ = ["ClickupService", "clickup_authorization_header"]

View file

@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
_CLICKUP_API_BASE = "https://api.clickup.com/api/v2"
def _clickupAuthorizationHeader(token: str) -> str:
def clickup_authorization_header(token: str) -> str:
"""ClickUp: personal tokens are `pk_...` without Bearer; OAuth uses Bearer."""
return clickupAuthorizationHeader(token)

View file

@ -31,6 +31,7 @@ class _ServicesAdapter:
self.mandateId = context.mandate_id
self.featureInstanceId = context.feature_instance_id
chat = get_service("chat")
self.interfaceDbComponent = chat.interfaceDbComponent
self.interfaceDbChat = chat.interfaceDbChat
@property
@ -55,6 +56,7 @@ class GenerationService:
"""Initialize with ServiceCenterContext and service resolver."""
self.services = _ServicesAdapter(context, get_service)
self._get_service = get_service
self.interfaceDbComponent = self.services.interfaceDbComponent
self.interfaceDbChat = self.services.interfaceDbChat
def processActionResultDocuments(self, actionResult, action) -> List[Dict[str, Any]]:
@ -287,11 +289,10 @@ class GenerationService:
logger.warning(f"Could not set workflow context on document: {str(e)}")
def _createDocument(self, fileName: str, mimeType: str, content: str, base64encoded: bool = True, messageId: str = None) -> Optional[ChatDocument]:
"""Create file and ChatDocument using chat service."""
"""Create file and ChatDocument using interfaces without service indirection."""
try:
chat = self.services.chat
if not chat:
logger.error("Chat service not available for document creation")
if not self.interfaceDbComponent:
logger.error("Component interface not available for document creation")
return None
# Convert content to bytes
if base64encoded:
@ -300,12 +301,12 @@ class GenerationService:
else:
content_bytes = content.encode('utf-8')
# Create file and store data
file_item = chat.createFile(
file_item = self.interfaceDbComponent.createFile(
name=fileName,
mimeType=mimeType,
content=content_bytes
)
chat.createFileData(file_item.id, content_bytes)
self.interfaceDbComponent.createFileData(file_item.id, content_bytes)
# Collect file info
file_info = self._getFileInfo(file_item.id)
if not file_info:
@ -320,6 +321,12 @@ class GenerationService:
fileSize=file_info.get("size", 0),
mimeType=file_info.get("mimeType", mimeType)
)
# Ensure document can access component interface later
if hasattr(document, 'setComponentInterface') and self.interfaceDbComponent:
try:
document.setComponentInterface(self.interfaceDbComponent)
except Exception:
pass
return document
except Exception as e:
logger.error(f"Error creating document: {str(e)}")
@ -327,10 +334,9 @@ class GenerationService:
def _getFileInfo(self, fileId: str) -> Optional[Dict[str, Any]]:
try:
chat = self.services.chat
if not chat:
if not self.interfaceDbComponent:
return None
file_item = chat.getFile(fileId)
file_item = self.interfaceDbComponent.getFile(fileId)
if file_item:
return {
"id": file_item.id,

View file

@ -1,32 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
"""
System-component lifecycle-hook registry (Layer L0 shared).
Higher-layer system components (e.g. workflowAutomation) register their
lifecycle hooks here at boot time via ``app.py`` (Composition Root, L7).
Interface modules read the registry generically no upward imports needed.
Supported events: ``onBootstrap``, ``onMandateDelete``, ``onInstanceCreate``.
This is the same inversion pattern used by
``serviceAgent/externalToolRegistry.py`` for agent tools.
"""
import logging
from typing import Any, Callable, Dict, List
logger = logging.getLogger(__name__)
_hooks: Dict[str, List[Callable[..., Any]]] = {}
def registerLifecycleHook(eventName: str, handler: Callable[..., Any]) -> None:
"""Register a lifecycle handler for *eventName*."""
_hooks.setdefault(eventName, []).append(handler)
logger.info("Registered system-component lifecycle hook: %s -> %s",
eventName, getattr(handler, "__qualname__", repr(handler)))
def getLifecycleHooks(eventName: str) -> List[Callable[..., Any]]:
"""Return all registered handlers for *eventName* (may be empty)."""
return list(_hooks.get(eventName, []))

View file

@ -32,7 +32,7 @@ def checkWorkflowStopped(services: Any) -> None:
try:
# Get the current workflow status from the database to avoid stale data
currentWorkflow = services.chat.getWorkflow(workflow.id)
currentWorkflow = services.interfaceDbChat.getWorkflow(workflow.id)
if currentWorkflow and currentWorkflow.status == "stopped":
logger.info("Workflow stopped by user, aborting operation")
raise WorkflowStoppedException("Workflow was stopped by user")

View file

@ -3,28 +3,27 @@
Workflow entry points (Starts) configuration outside the flow editor.
Kinds align with run envelope trigger.type where applicable.
Canonical location: modules.nodeCatalog.entryPoints (L2).
Depends only on stdlib no cross-module imports.
"""
import uuid
from typing import Any, Dict, List, Optional
# On-demand (gear: Manueller Trigger, Formular)
KINDS_ON_DEMAND = frozenset({"manual", "form", "api"})
# Always-on (gear: Zeitplan, Immer aktiv, plus legacy listener kinds)
KINDS_ALWAYS_ON = frozenset({"schedule", "always_on", "email", "webhook", "event"})
ALL_KINDS = KINDS_ON_DEMAND | KINDS_ALWAYS_ON
def categoryForKind(kind: str) -> str:
def category_for_kind(kind: str) -> str:
if kind in KINDS_ALWAYS_ON:
return "always_on"
return "on_demand"
def defaultManualEntryPoint() -> Dict[str, Any]:
def default_manual_entry_point() -> Dict[str, Any]:
"""Single default manual start when a workflow has no invocations yet."""
return {
"id": str(uuid.uuid4()),
@ -37,7 +36,7 @@ def defaultManualEntryPoint() -> Dict[str, Any]:
}
def _normalizeTitle(title: Any) -> str:
def _normalize_title(title: Any) -> str:
"""Extract a plain string from a title value for storage (not display)."""
if isinstance(title, dict):
picked = title.get("xx") or next((v for v in title.values() if v), None)
@ -47,14 +46,14 @@ def _normalizeTitle(title: Any) -> str:
return "Start"
def normalizeInvocationEntry(raw: Dict[str, Any]) -> Dict[str, Any]:
def normalize_invocation_entry(raw: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and normalize a single entry point dict."""
kind = (raw.get("kind") or "manual").strip()
if kind not in ALL_KINDS:
kind = "manual"
cat = raw.get("category")
if cat not in ("on_demand", "always_on"):
cat = categoryForKind(kind)
cat = category_for_kind(kind)
eid = raw.get("id") or str(uuid.uuid4())
enabled = raw.get("enabled", True)
if not isinstance(enabled, bool):
@ -66,21 +65,21 @@ def normalizeInvocationEntry(raw: Dict[str, Any]) -> Dict[str, Any]:
"kind": kind,
"category": cat,
"enabled": enabled,
"title": _normalizeTitle(raw.get("title")),
"title": _normalize_title(raw.get("title")),
"description": desc,
"config": config,
}
def normalizeInvocationsList(items: Optional[List[Any]]) -> List[Dict[str, Any]]:
def normalize_invocations_list(items: Optional[List[Any]]) -> List[Dict[str, Any]]:
if not items:
return [defaultManualEntryPoint()]
return [default_manual_entry_point()]
out: List[Dict[str, Any]] = []
for raw in items:
if isinstance(raw, dict):
out.append(normalizeInvocationEntry(raw))
out.append(normalize_invocation_entry(raw))
if not out:
return [defaultManualEntryPoint()]
return [default_manual_entry_point()]
return out
@ -91,36 +90,26 @@ _NODE_TYPE_TO_KIND = {
}
def _getTriggerNodes(nodes: List[Dict]) -> List[Dict]:
"""Return start/trigger nodes: type ``trigger.*``, or category ``trigger`` / ``start``."""
return [
n
for n in nodes
if (
str(n.get("type", "")).startswith("trigger.")
or n.get("category") in ("trigger", "start")
)
]
def invocationsSyncedWithGraph(
def invocations_synced_with_graph(
graph: Optional[Dict[str, Any]],
storedInvocations: Optional[List[Any]],
stored_invocations: Optional[List[Any]],
) -> List[Dict[str, Any]]:
"""Derive primary invocation (index 0) from the first start node in ``graph``.
If the graph has no start node, only non-primary stored invocations are kept
(no injected default). Document order in ``nodes`` defines which start wins.
"""
from modules.workflowAutomation.engine.graphUtils import getTriggerNodes
g = graph if isinstance(graph, dict) else {}
nodes = g.get("nodes") or []
stored = list(storedInvocations or [])
stored = list(stored_invocations or [])
rest: List[Dict[str, Any]] = []
for raw in stored[1:]:
if isinstance(raw, dict):
rest.append(normalizeInvocationEntry(raw))
rest.append(normalize_invocation_entry(raw))
triggers = _getTriggerNodes(nodes)
triggers = getTriggerNodes(nodes)
if not triggers:
return rest
@ -130,28 +119,29 @@ def invocationsSyncedWithGraph(
nid = node.get("id")
if not nid:
nid = str(uuid.uuid4())
rawTitle = node.get("title") or node.get("label") or "Start"
raw_title = node.get("title") or node.get("label") or "Start"
oldPrimary = stored[0] if stored and isinstance(stored[0], dict) else {}
old_primary = stored[0] if stored and isinstance(stored[0], dict) else {}
config: Dict[str, Any] = {}
if isinstance(oldPrimary.get("config"), dict) and oldPrimary.get("kind") == kind:
config = dict(oldPrimary["config"])
desc = oldPrimary.get("description") if isinstance(oldPrimary.get("description"), dict) else {}
if isinstance(old_primary.get("config"), dict) and old_primary.get("kind") == kind:
config = dict(old_primary["config"])
desc = old_primary.get("description") if isinstance(old_primary.get("description"), dict) else {}
primaryRaw: Dict[str, Any] = {
primary_raw: Dict[str, Any] = {
"id": str(nid),
"kind": kind,
"enabled": True,
"title": rawTitle,
"title": raw_title,
"description": desc,
"config": config,
}
primary = normalizeInvocationEntry(primaryRaw)
primary = normalize_invocation_entry(primary_raw)
return [primary] + rest
# POST .../execute with entryPointId set to a schedule entry — no separate in-process scheduler here yet.
def findInvocation(workflow: Dict[str, Any], entryPointId: str) -> Optional[Dict[str, Any]]:
def find_invocation(workflow: Dict[str, Any], entry_point_id: str) -> Optional[Dict[str, Any]]:
for inv in workflow.get("invocations") or []:
if isinstance(inv, dict) and inv.get("id") == entryPointId:
if isinstance(inv, dict) and inv.get("id") == entry_point_id:
return inv
return None

View file

@ -34,8 +34,8 @@ from modules.nodeCatalog.nodeDefinitions import STATIC_NODE_TYPES
from modules.datamodels.serviceExceptions import SubscriptionInactiveException as _SubscriptionInactiveException, BillingContextError as _BillingContextError
from modules.workflowAutomation.engine.runFileLogger import (
RunFileLogger,
workflowAutomationRunFileLoggingEnabled,
mergeRunContextWithWaLogPrefix,
graphical_editor_run_file_logging_enabled,
merge_run_context_with_ge_log_prefix,
)
from modules.workflowAutomation.engine.runEnvelope import normalize_run_envelope
@ -383,7 +383,7 @@ async def _ge_log_node_finished(
exec_rec["output"] = (
_stripBinaryValues(output) if isinstance(output, dict) else {"value": _stripBinaryValues(output)}
)
await file_logger.appendNodeExecutionLine(exec_rec)
await file_logger.append_node_execution_line(exec_rec)
ctx_rec: Dict[str, Any] = {
"timestamp": ts,
@ -398,7 +398,7 @@ async def _ge_log_node_finished(
ctx_rec["loopIndex"] = loop_index
if loop_node_id is not None:
ctx_rec["loopNodeId"] = loop_node_id
await file_logger.appendContextSnapshotLine(ctx_rec)
await file_logger.append_context_snapshot_line(ctx_rec)
async def _executeWithRetry(executor, node, context, maxRetries: int = 0, retryDelaySeconds: float = 1.0):
@ -511,7 +511,7 @@ async def _run_post_loop_done_nodes(
automation2_interface: Optional[Any],
runId: Optional[str],
processed_in_loop: Set[str],
waFileLogger: Optional[RunFileLogger] = None,
ge_file_logger: Optional[RunFileLogger] = None,
) -> Optional[Dict[str, Any]]:
"""After all loop iterations: merge upstream into loop output and run the Done (output 1) branch once."""
_prim_in = getLoopPrimaryInputSource(loop_node_id, connectionMap, body_ids)
@ -553,7 +553,7 @@ async def _run_post_loop_done_nodes(
if _skId:
_updateStepLog(automation2_interface, _skId, "skipped")
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -586,7 +586,7 @@ async def _run_post_loop_done_nodes(
output=_dres if isinstance(_dres, dict) else {"value": _dres},
durationMs=_dDur, tokensUsed=_dTok, retryCount=_dRetry)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -603,7 +603,7 @@ async def _run_post_loop_done_nodes(
_updateStepLog(automation2_interface, _dStepId, "completed",
durationMs=int((time.time() - _dStart) * 1000))
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -619,7 +619,7 @@ async def _run_post_loop_done_nodes(
_updateStepLog(automation2_interface, _dStepId, "completed",
durationMs=int((time.time() - _dStart) * 1000))
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -636,7 +636,7 @@ async def _run_post_loop_done_nodes(
_updateStepLog(automation2_interface, _dStepId, "failed",
error="Subscription/Billing error", durationMs=_dFailDur)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -654,7 +654,7 @@ async def _run_post_loop_done_nodes(
_updateStepLog(automation2_interface, _dStepId, "failed",
error=str(_dex), durationMs=_dFailDur2)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -767,7 +767,7 @@ async def executeGraph(
except Exception as valErr:
logger.warning("executeGraph resume: schema validation failed for %s: %s", startAfterNodeId, valErr)
waFileLogger: Optional[RunFileLogger] = None
ge_file_logger: Optional[RunFileLogger] = None
nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {})
if not runId and automation2_interface and workflowId and not is_resume:
run_context = {
@ -805,8 +805,8 @@ async def executeGraph(
)
runId = run.get("id") if run else None
logger.info("executeGraph created run %s label=%s", runId, run_label)
if runId and workflowAutomationRunFileLoggingEnabled():
waFileLogger = RunFileLogger.bootstrapNewRun(
if runId and graphical_editor_run_file_logging_enabled():
ge_file_logger = RunFileLogger.bootstrap_new_run(
automation2_interface,
runId,
run_context,
@ -842,12 +842,12 @@ async def executeGraph(
_activeRunContexts[runId] = context
if (
workflowAutomationRunFileLoggingEnabled()
graphical_editor_run_file_logging_enabled()
and automation2_interface
and runId
and waFileLogger is None
and ge_file_logger is None
):
waFileLogger = RunFileLogger.ensureAttached(
ge_file_logger = RunFileLogger.ensure_attached(
automation2_interface,
runId,
)
@ -916,7 +916,7 @@ async def executeGraph(
output=result if isinstance(result, dict) else {"value": result},
durationMs=_rDur, retryCount=_rRetry)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -940,7 +940,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _rStepId, "completed",
durationMs=_rPauseDur)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -964,7 +964,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _rStepId, "completed",
durationMs=_rEmailDur)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -984,7 +984,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _rStepId, "failed",
error="Subscription/Billing error", durationMs=_rFailDurSb)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1005,7 +1005,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _rStepId, "failed",
error=str(ex), durationMs=_rFailDurEx)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1049,7 +1049,7 @@ async def executeGraph(
automation2_interface=automation2_interface,
runId=runId,
processed_in_loop=processed_in_loop,
waFileLogger=waFileLogger,
ge_file_logger=ge_file_logger,
)
for i, node in enumerate(ordered):
@ -1088,7 +1088,7 @@ async def executeGraph(
if _skipStepId:
_updateStepLog(automation2_interface, _skipStepId, "skipped")
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1206,7 +1206,7 @@ async def executeGraph(
output=bres if isinstance(bres, dict) else {"value": bres},
durationMs=_bDur, retryCount=_bRetry)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1230,7 +1230,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _bStepId, "completed",
durationMs=_bHd)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1256,7 +1256,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _bStepId, "completed",
durationMs=_bEd)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1277,7 +1277,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _bStepId, "failed",
error="Subscription/Billing error", durationMs=_bSb)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1299,7 +1299,7 @@ async def executeGraph(
_updateStepLog(automation2_interface, _bStepId, "failed",
error=str(ex), durationMs=_bFail)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1393,7 +1393,7 @@ async def executeGraph(
automation2_interface=automation2_interface,
runId=runId,
processed_in_loop=processed_in_loop,
waFileLogger=waFileLogger,
ge_file_logger=ge_file_logger,
)
_loopDurMs = int((time.time() - _stepStartMs) * 1000)
@ -1407,7 +1407,7 @@ async def executeGraph(
output=_loopStepOut,
durationMs=_loopDurMs)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1441,7 +1441,7 @@ async def executeGraph(
output=result if isinstance(result, dict) else {"value": result},
durationMs=_mergeDurMs, tokensUsed=_mergeTok, retryCount=retryCount)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1471,7 +1471,7 @@ async def executeGraph(
output=result if isinstance(result, dict) else {"value": result},
durationMs=_durMs, tokensUsed=_tokens, retryCount=retryCount)
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1500,7 +1500,7 @@ async def executeGraph(
if _ge_in is None:
_ge_in = locals().get("_loopInputSnap") or {}
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1528,7 +1528,7 @@ async def executeGraph(
if _ge_email_in is None:
_ge_email_in = locals().get("_loopInputSnap") or {}
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
@ -1564,7 +1564,7 @@ async def executeGraph(
}
if automation2_interface and e.runId:
prev_ctx = dict((automation2_interface.getRun(e.runId) or {}).get("context") or {})
run_ctx = mergeRunContextWithWaLogPrefix(prev_ctx, run_ctx)
run_ctx = merge_run_context_with_ge_log_prefix(prev_ctx, run_ctx)
automation2_interface.updateRun(
e.runId,
status="paused",
@ -1589,7 +1589,7 @@ async def executeGraph(
if _ge_fail_in is None:
_ge_fail_in = locals().get("_loopInputSnap") or {}
await _ge_log_node_finished(
waFileLogger,
ge_file_logger,
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),

View file

@ -210,10 +210,10 @@ def _resolveConnectionIdToReference(chatService, connectionId: str, services=Non
return f"connection:{authority}:{username}"
except Exception as e:
logger.debug("_resolveConnectionIdToReference chatService: %s", e)
chatSvc = getattr(services, "chat", None) if services else None
if chatSvc and hasattr(chatSvc, "getUserConnectionById"):
app = getattr(services, "interfaceDbApp", None) if services else None
if app and hasattr(app, "getUserConnectionById"):
try:
conn = chatSvc.getUserConnectionById(str(connectionId))
conn = app.getUserConnectionById(str(connectionId))
if conn:
authority = getattr(conn, "authority", None)
if hasattr(authority, "value"):
@ -542,7 +542,8 @@ class ActionNodeExecutor:
resolvedParams[pname] = _wired
# 3. Resolve connectionReference
_resolveConnectionParam(resolvedParams, self.services.chat, self.services)
chatService = getattr(self.services, "chat", None)
_resolveConnectionParam(resolvedParams, chatService, self.services)
# 3b. Optional graph-level injections declared on the node definition.
# - injectUpstreamPayload: True → ``_upstreamPayload`` (port 0 source output, transit-unwrapped)
@ -579,10 +580,12 @@ class ActionNodeExecutor:
# 6. Create progress parent so nested actions have a hierarchy
nodeOperationId = f"node_{nodeId}_{context.get('_runId', 'x')}_{int(time.time())}"
try:
self.services.chat.progressLogStart(nodeOperationId, methodName.capitalize(), actionName, f"Node {nodeId}")
except Exception:
pass
chatService = getattr(self.services, "chat", None)
if chatService:
try:
chatService.progressLogStart(nodeOperationId, methodName.capitalize(), actionName, f"Node {nodeId}")
except Exception:
pass
resolvedParams["parentOperationId"] = nodeOperationId
# 9. Execute action
@ -629,7 +632,26 @@ class ActionNodeExecutor:
rawBytes = coerceDocumentDataToBytes(rawData)
if isinstance(dumped, dict) and rawBytes:
try:
_mgmt = self.services.interfaceDbComponent
from modules.interfaces.interfaceDbManagement import getInterface as _getMgmtInterface
from modules.interfaces.interfaceDbApp import getInterface as _getAppInterface
from modules.security.rootAccess import getRootUser
_userId = context.get("userId")
_mandateId = context.get("mandateId")
_instanceId = context.get("instanceId")
_owner = None
if _userId:
try:
_umap = _getAppInterface(getRootUser()).getUsersByIds([str(_userId)])
_owner = _umap.get(str(_userId))
except Exception as _ue:
logger.warning("Could not resolve workflow user for file persistence: %s", _ue)
if _owner is None:
_owner = getRootUser()
logger.debug(
"Persisting workflow document as root user (no resolved owner userId=%r)",
_userId,
)
_mgmt = _getMgmtInterface(_owner, mandateId=_mandateId, featureInstanceId=_instanceId)
_docName = dumped.get("documentName") or f"workflow-result-{nodeId}.bin"
_mimeType = dumped.get("mimeType") or "application/octet-stream"
_fileItem = _mgmt.createFile(_docName, _mimeType, rawBytes, folderId=persist_folder_id)

View file

@ -47,9 +47,9 @@ class InputExecutor:
)
taskId = task.get("id")
from modules.workflowAutomation.engine.runFileLogger import mergePersistedRunContext
from modules.workflowAutomation.engine.runFileLogger import merge_persisted_run_context
_pause_ctx = mergePersistedRunContext(
_pause_ctx = merge_persisted_run_context(
self.automation2,
runId,
{

View file

@ -1,5 +1,5 @@
# Copyright (c) 2025 Patrick Motsch
"""Per-run NDJSON logs for persisted workflow-automation runs."""
"""Per-run NDJSON logs for persisted Automation2 / graphical-editor runs."""
from __future__ import annotations
@ -16,40 +16,40 @@ from modules.shared.debugLogger import ensureDir, resolve_app_log_dir
logger = logging.getLogger(__name__)
RUN_FILE_LOG_RELATIVE_ROOT = "workflow_automation_runs"
CONTEXT_KEY = "_waRunFileLogRelativeDir"
RUN_FILE_LOG_RELATIVE_ROOT = "graphical_editor_runs"
CONTEXT_KEY = "_geRunFileLogRelativeDir"
EXECUTION_FILENAME = "node_execution.ndjson"
CONTEXT_SNAPSHOT_FILENAME = "workflow_context.ndjson"
def workflowAutomationRunFileLoggingEnabled() -> bool:
def graphical_editor_run_file_logging_enabled() -> bool:
"""True when NDJSON files should be written for each persisted run."""
raw = APP_CONFIG.get("APP_WORKFLOW_AUTOMATION_RUN_FILE_LOGGING") or APP_CONFIG.get("APP_GRAPHICAL_EDITOR_RUN_FILE_LOGGING", False)
raw = APP_CONFIG.get("APP_GRAPHICAL_EDITOR_RUN_FILE_LOGGING", False)
if isinstance(raw, bool):
return raw
s = str(raw).strip().lower()
return s in ("1", "true", "yes", "on")
def mergeRunContextWithWaLogPrefix(
baseContext: Optional[Dict[str, Any]],
def merge_run_context_with_ge_log_prefix(
base_context: Optional[Dict[str, Any]],
incoming: Dict[str, Any],
) -> Dict[str, Any]:
"""Copy ``CONTEXT_KEY`` from *baseContext* onto *incoming* if present (pause paths)."""
"""Copy ``CONTEXT_KEY`` from *base_context* onto *incoming* if present (pause paths)."""
out = dict(incoming or {})
prev = (baseContext or {}).get(CONTEXT_KEY)
prev = (base_context or {}).get(CONTEXT_KEY)
if prev is not None:
out[CONTEXT_KEY] = prev
return out
def mergePersistedRunContext(
workflowAutomationInterface: Any,
runId: str,
def merge_persisted_run_context(
automation2_interface: Any,
run_id: str,
replacement: Dict[str, Any],
) -> Dict[str, Any]:
"""``{**db_context, **replacement}`` so *_waRunFileLogRelativeDir* and other keys survive pause updates."""
prev = dict((workflowAutomationInterface.getRun(runId) or {}).get("context") or {})
"""``{**db_context, **replacement}`` so *_geRunFileLogRelativeDir* and other keys survive pause updates."""
prev = dict((automation2_interface.getRun(run_id) or {}).get("context") or {})
return {**prev, **(replacement or {})}
@ -58,65 +58,65 @@ class RunFileLogger:
__slots__ = ("_exec_path", "_ctx_path", "_lock", "_run_id")
def __init__(self, runId: str, absoluteRunDir: str) -> None:
self._run_id = runId
ensureDir(absoluteRunDir)
self._exec_path = os.path.join(absoluteRunDir, EXECUTION_FILENAME)
self._ctx_path = os.path.join(absoluteRunDir, CONTEXT_SNAPSHOT_FILENAME)
def __init__(self, run_id: str, absolute_run_dir: str) -> None:
self._run_id = run_id
ensureDir(absolute_run_dir)
self._exec_path = os.path.join(absolute_run_dir, EXECUTION_FILENAME)
self._ctx_path = os.path.join(absolute_run_dir, CONTEXT_SNAPSHOT_FILENAME)
self._lock = asyncio.Lock()
@property
def runId(self) -> str:
def run_id(self) -> str:
return self._run_id
@staticmethod
def freshRunSubdirectoryName(runId: str) -> str:
def fresh_run_subdirectory_name(run_id: str) -> str:
ts = datetime.now(timezone.utc).strftime("%Y_%m_%d_%H_%M_%S")
return f"{ts}__{runId}"
return f"{ts}__{run_id}"
@staticmethod
def relativeRunPath(subdirName: str) -> str:
def relative_run_path(subdir_name: str) -> str:
"""Path relative to ``APP_LOGGING_LOG_DIR`` (POSIX-style segments)."""
return "/".join((RUN_FILE_LOG_RELATIVE_ROOT, subdirName))
return "/".join((RUN_FILE_LOG_RELATIVE_ROOT, subdir_name))
@classmethod
def bootstrapNewRun(cls, workflowAutomationInterface: Any, runId: str, runContext: Dict[str, Any]) -> RunFileLogger | None:
def bootstrap_new_run(cls, automation2_interface: Any, run_id: str, run_context: Dict[str, Any]) -> RunFileLogger | None:
"""Create filesystem folder + persist CONTEXT_KEY via ``updateRun``."""
if not workflowAutomationRunFileLoggingEnabled():
if not graphical_editor_run_file_logging_enabled():
return None
if not workflowAutomationInterface or not runId:
if not automation2_interface or not run_id:
return None
subdir = cls.freshRunSubdirectoryName(runId)
rel = cls.relativeRunPath(subdir)
subdir = cls.fresh_run_subdirectory_name(run_id)
rel = cls.relative_run_path(subdir)
base = resolve_app_log_dir()
absolute = os.path.join(base, RUN_FILE_LOG_RELATIVE_ROOT, subdir)
merged = dict(runContext or {})
merged = dict(run_context or {})
merged[CONTEXT_KEY] = rel
try:
workflowAutomationInterface.updateRun(runId, context=merged)
automation2_interface.updateRun(run_id, context=merged)
except Exception as ex:
logger.warning("WaRunFileLog: could not persist log dir on run=%s: %s", runId, ex)
logger.warning("GeRunFileLog: could not persist log dir on run=%s: %s", run_id, ex)
return None
logger.info(
"WaRunFileLog: created run folder %s (run=%s)",
"GeRunFileLog: created run folder %s (run=%s)",
absolute,
runId,
run_id,
)
return cls(runId, absolute)
return cls(run_id, absolute)
@classmethod
def openFromRunRecord(cls, workflowAutomationInterface: Any, runId: str) -> RunFileLogger | None:
def open_from_run_record(cls, automation2_interface: Any, run_id: str) -> RunFileLogger | None:
"""Open logger for an existing run using CONTEXT_KEY from DB."""
if not workflowAutomationRunFileLoggingEnabled():
if not graphical_editor_run_file_logging_enabled():
return None
if not workflowAutomationInterface or not runId:
if not automation2_interface or not run_id:
return None
try:
run = workflowAutomationInterface.getRun(runId) or {}
run = automation2_interface.getRun(run_id) or {}
except Exception as ex:
logger.debug("WaRunFileLog: getRun failed run=%s: %s", runId, ex)
logger.debug("GeRunFileLog: getRun failed run=%s: %s", run_id, ex)
return None
rel = (run.get("context") or {}).get(CONTEXT_KEY)
if not rel or not isinstance(rel, str):
@ -126,21 +126,21 @@ class RunFileLogger:
cand = os.path.realpath(os.path.join(base_norm, *rel.replace("\\", "/").split("/")))
if cand != allowed_root and not cand.startswith(allowed_root + os.sep):
logger.warning(
"WaRunFileLog: path outside log root denied for run=%s rel=%s",
runId,
"GeRunFileLog: path outside log root denied for run=%s rel=%s",
run_id,
rel,
)
return None
absolute = cand
return cls(runId, absolute)
return cls(run_id, absolute)
@classmethod
def findExistingAbsoluteDir(cls, runId: str) -> Optional[str]:
def find_existing_absolute_dir(cls, run_id: str) -> Optional[str]:
"""If a folder named ``*{timestamp}__{run_id}`` exists under the log root, return its absolute path."""
root = os.path.realpath(os.path.join(resolve_app_log_dir(), RUN_FILE_LOG_RELATIVE_ROOT))
if not os.path.isdir(root):
return None
suffix = f"__{runId}"
suffix = f"__{run_id}"
try:
names = sorted((n for n in os.listdir(root) if n.endswith(suffix)), reverse=True)
except OSError:
@ -154,62 +154,62 @@ class RunFileLogger:
return cand if os.path.isdir(cand) else None
@classmethod
def ensureAttached(cls, workflowAutomationInterface: Any, runId: str) -> RunFileLogger | None:
"""Open logger from DB, or reattach an on-disk folder for *runId*, or create a new one."""
opened = cls.openFromRunRecord(workflowAutomationInterface, runId)
def ensure_attached(cls, automation2_interface: Any, run_id: str) -> RunFileLogger | None:
"""Open logger from DB, or reattach an on-disk folder for *run_id*, or create a new one."""
opened = cls.open_from_run_record(automation2_interface, run_id)
if opened is not None:
return opened
if not workflowAutomationRunFileLoggingEnabled():
if not graphical_editor_run_file_logging_enabled():
return None
if not workflowAutomationInterface or not runId:
if not automation2_interface or not run_id:
return None
try:
run = workflowAutomationInterface.getRun(runId) or {}
run = automation2_interface.getRun(run_id) or {}
except Exception as ex:
logger.debug("WaRunFileLog: ensure getRun failed run=%s: %s", runId, ex)
logger.debug("GeRunFileLog: ensure getRun failed run=%s: %s", run_id, ex)
return None
prev_ctx = dict(run.get("context") or {})
existing_abs = cls.findExistingAbsoluteDir(runId)
existing_abs = cls.find_existing_absolute_dir(run_id)
if existing_abs:
base_norm = os.path.realpath(resolve_app_log_dir())
rel = os.path.relpath(existing_abs, base_norm).replace(os.sep, "/")
merged = {**prev_ctx, CONTEXT_KEY: rel}
try:
workflowAutomationInterface.updateRun(runId, context=merged)
automation2_interface.updateRun(run_id, context=merged)
except Exception as ex:
logger.warning("WaRunFileLog: reattach persist failed run=%s: %s", runId, ex)
logger.warning("GeRunFileLog: reattach persist failed run=%s: %s", run_id, ex)
return None
logger.info("WaRunFileLog: reattached existing folder for run=%s -> %s", runId, existing_abs)
return cls(runId, existing_abs)
logger.info("GeRunFileLog: reattached existing folder for run=%s -> %s", run_id, existing_abs)
return cls(run_id, existing_abs)
subdir = cls.freshRunSubdirectoryName(runId)
rel = cls.relativeRunPath(subdir)
subdir = cls.fresh_run_subdirectory_name(run_id)
rel = cls.relative_run_path(subdir)
base = resolve_app_log_dir()
absolute = os.path.join(base, RUN_FILE_LOG_RELATIVE_ROOT, subdir)
merged = {**prev_ctx, CONTEXT_KEY: rel}
try:
workflowAutomationInterface.updateRun(runId, context=merged)
automation2_interface.updateRun(run_id, context=merged)
except Exception as ex:
logger.warning("WaRunFileLog: ensure new folder persist failed run=%s: %s", runId, ex)
logger.warning("GeRunFileLog: ensure new folder persist failed run=%s: %s", run_id, ex)
return None
logger.info("WaRunFileLog: created late attach folder %s (run=%s)", absolute, runId)
return cls(runId, absolute)
logger.info("GeRunFileLog: created late attach folder %s (run=%s)", absolute, run_id)
return cls(run_id, absolute)
async def appendNodeExecutionLine(self, record: Dict[str, Any]) -> None:
async def append_node_execution_line(self, record: Dict[str, Any]) -> None:
line = json.dumps(record, ensure_ascii=False, default=str)
async with self._lock:
try:
with open(self._exec_path, "a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception as ex:
logger.warning("WaRunFileLog: append execution failed run=%s: %s", self._run_id, ex)
logger.warning("GeRunFileLog: append execution failed run=%s: %s", self._run_id, ex)
async def appendContextSnapshotLine(self, record: Dict[str, Any]) -> None:
async def append_context_snapshot_line(self, record: Dict[str, Any]) -> None:
line = json.dumps(record, ensure_ascii=False, default=str)
async with self._lock:
try:
with open(self._ctx_path, "a", encoding="utf-8") as f:
f.write(line + "\n")
except Exception as ex:
logger.warning("WaRunFileLog: append context snapshot failed run=%s: %s", self._run_id, ex)
logger.warning("GeRunFileLog: append context snapshot failed run=%s: %s", self._run_id, ex)

View file

@ -9,13 +9,13 @@ from typing import Any, Mapping, Optional
_WORKFLOW_INTERNAL_FILE_TAG = "_workflowInternal"
def suppressWorkflowFileInWorkspaceUi(meta: Optional[Mapping[str, Any]]) -> bool:
def suppress_workflow_file_in_workspace_ui(meta: Optional[Mapping[str, Any]]) -> bool:
"""True when a file row should not appear in user-facing file lists.
Used by Automation Workspace **and** ``/api/files/list`` (Meine Dateien).
Matches persisted JSON handovers from transient runs (``extracted_content_transient*``),
internal extract image files (``extract_media_*``), the ``_workflowInternal`` tag, and
optional explicit flags.
optional explicit flags.
"""
if not isinstance(meta, Mapping):
return False

View file

@ -22,7 +22,7 @@ from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.datamodels.datamodelPagination import PaginationParams, normalize_pagination_dict
from modules.datamodels.datamodelWorkflowAutomation import (
AutoRun, AutoStepLog, AutoWorkflow, AutoTask, AutoVersion,
WORKFLOW_AUTOMATION_DATABASE,
GRAPHICAL_EDITOR_DATABASE,
)
from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface
from modules.shared.configuration import APP_CONFIG
@ -38,7 +38,7 @@ def _getWorkflowAutomationDb() -> DatabaseConnector:
"""Get a DatabaseConnector for the WorkflowAutomation (graphicaleditor) DB."""
return DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
dbDatabase=GRAPHICAL_EDITOR_DATABASE,
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),

View file

@ -39,12 +39,12 @@ def _getWorkflowAutomationServices(
mandateId: Optional[str] = None,
featureInstanceId: Optional[str] = None,
workflow=None,
):
) -> "_WorkflowAutomationServiceHub":
"""
Get a ServicesBag for WorkflowAutomation using the service center.
Get a service hub for WorkflowAutomation using the service center.
Used for methodDiscovery (I/O nodes) and execution (ActionExecutor).
"""
from modules.serviceCenter import getService, ServicesBag
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
_workflow = workflow
@ -61,7 +61,55 @@ def _getWorkflowAutomationServices(
feature_instance_id=featureInstanceId,
workflow=_workflow,
)
return ServicesBag(ctx, lambda key: getService(key, ctx))
hub = _WorkflowAutomationServiceHub()
hub.user = user
hub.mandateId = mandateId
hub.featureInstanceId = featureInstanceId
hub._service_context = ctx
hub.workflow = _workflow
hub.featureCode = COMPONENT_CODE
for spec in REQUIRED_SERVICES:
key = spec["serviceKey"]
try:
svc = getService(key, ctx)
setattr(hub, key, svc)
except Exception as e:
logger.warning(f"Could not resolve service '{key}' for workflowAutomation: {e}")
setattr(hub, key, None)
if hub.chat:
hub.interfaceDbApp = getattr(hub.chat, "interfaceDbApp", None)
hub.interfaceDbComponent = getattr(hub.chat, "interfaceDbComponent", None)
hub.interfaceDbChat = getattr(hub.chat, "interfaceDbChat", None)
hub.rbac = getattr(hub.interfaceDbApp, "rbac", None) if getattr(hub, "interfaceDbApp", None) else None
return hub
class _WorkflowAutomationServiceHub:
"""Lightweight hub for WorkflowAutomation (methodDiscovery, execution)."""
user = None
mandateId = None
featureInstanceId = None
_service_context = None
workflow = None
featureCode = COMPONENT_CODE
interfaceDbApp = None
interfaceDbComponent = None
interfaceDbChat = None
rbac = None
chat = None
ai = None
utils = None
extraction = None
sharepoint = None
clickup = None
generation = None
# ---------------------------------------------------------------------------
@ -71,7 +119,7 @@ def _getWorkflowAutomationServices(
def onMandateDelete(mandateId: str, instances: list) -> None:
"""Cascade-delete all AutoWorkflow data for this mandate."""
from modules.datamodels.datamodelWorkflowAutomation import (
WORKFLOW_AUTOMATION_DATABASE, AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask,
GRAPHICAL_EDITOR_DATABASE, AutoWorkflow, AutoVersion, AutoRun, AutoStepLog, AutoTask,
)
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
@ -79,7 +127,7 @@ def onMandateDelete(mandateId: str, instances: list) -> None:
try:
waDb = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
dbDatabase=GRAPHICAL_EDITOR_DATABASE,
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
@ -197,14 +245,14 @@ def onBootstrap() -> None:
_migrateRbacNamespace()
_registerAgentTools()
from modules.datamodels.datamodelWorkflowAutomation import WORKFLOW_AUTOMATION_DATABASE, AutoWorkflow
from modules.datamodels.datamodelWorkflowAutomation import GRAPHICAL_EDITOR_DATABASE, AutoWorkflow
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
try:
waDb = DatabaseConnector(
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
dbDatabase=GRAPHICAL_EDITOR_DATABASE,
dbUser=APP_CONFIG.get("DB_USER"),
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
)

View file

@ -209,7 +209,7 @@ class WorkflowScheduler:
from modules.workflowAutomation.mainWorkflowAutomation import _getWorkflowAutomationServices
from modules.workflowAutomation.engine.executionEngine import executeGraph
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
from modules.nodeCatalog.entryPoints import findInvocation
from modules.workflowAutomation.editor.entryPoints import find_invocation
from modules.workflowAutomation.engine.runEnvelope import default_run_envelope, normalize_run_envelope
iface = _getWorkflowAutomationInterface(eventUser, mandateId, instanceId)
@ -221,7 +221,7 @@ class WorkflowScheduler:
logger.info("WorkflowScheduler: workflow %s inactive, skipping", workflowId)
return
inv = findInvocation(wf, entryPointId)
inv = find_invocation(wf, entryPointId)
if inv and (inv.get("kind") != "schedule" or not inv.get("enabled", True)):
logger.info("WorkflowScheduler: entry point %s disabled for workflow %s", entryPointId, workflowId)
return

View file

@ -40,7 +40,7 @@ def _action_docs_to_content_parts(services, docs: List[Any]) -> List[ContentPart
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
all_parts = []
extraction = services.extraction
extraction = getattr(services, "extraction", None)
if not extraction:
logger.warning("ai.process: No extraction service - cannot extract from inline documents")
return []
@ -80,24 +80,25 @@ def _resolve_file_refs_to_content_parts(services, fileIdRefs) -> List[ContentPar
via ``getChatDocumentsFromDocumentList`` instead."""
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
extraction = services.extraction
if not extraction:
logger.warning("_resolve_file_refs_to_content_parts: missing extraction service")
mgmt = getattr(services, 'interfaceDbComponent', None)
extraction = getattr(services, 'extraction', None)
if not mgmt or not extraction:
logger.warning("_resolve_file_refs_to_content_parts: missing interfaceDbComponent or extraction service")
return []
allParts: List[ContentPart] = []
opts = ExtractionOptions(prompt="", mergeStrategy=MergeStrategy())
for ref in fileIdRefs:
fileId = ref.documentId
fileMeta = services.chat.getFile(fileId)
fileMeta = mgmt.getFile(fileId)
if not fileMeta:
logger.warning("_resolve_file_refs_to_content_parts: file %s not found "
"(lookup scope: mandate=%s, featureInstanceId=%s, userId=%s)",
fileId, getattr(services, "mandateId", "?"),
getattr(services, "featureInstanceId", "?"),
getattr(services, "userId", "?"))
fileId, getattr(mgmt, "mandateId", "?"),
getattr(mgmt, "featureInstanceId", "?"),
getattr(mgmt, "userId", "?"))
continue
fileData = services.chat.getFileData(fileId)
fileData = mgmt.getFileData(fileId)
if not fileData:
logger.warning(f"_resolve_file_refs_to_content_parts: no data for file {fileId}")
continue
@ -264,7 +265,7 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult:
try:
documents = self.services.chat.getChatDocumentsFromDocumentList(documentList)
simpleParts = _action_docs_to_content_parts(self.services, [
{"documentData": self.services.chat.getFileData(doc.fileId),
{"documentData": self.services.interfaceDbComponent.getFileData(doc.fileId),
"documentName": getattr(doc, 'fileName', ''),
"mimeType": getattr(doc, 'mimeType', 'application/octet-stream')}
for doc in documents if hasattr(doc, 'fileId') and doc.fileId

View file

@ -44,6 +44,8 @@ def _build_research_prompt(parameters: Dict[str, Any]) -> str:
async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult:
from modules.serviceCenter import ServiceCenterContext, getService, can_access_service
operationId = None
try:
prompt = _build_research_prompt(parameters)
@ -51,10 +53,25 @@ async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult:
return ActionResult.isFailure(error="Research prompt is required")
# RBAC: Check service-level permission
if hasattr(self.services, "canAccessService") and not self.services.canAccessService("web"):
rbac = getattr(self.services, "rbac", None)
if rbac and not can_access_service(
self.services.user,
rbac,
"web",
mandate_id=getattr(self.services, "mandateId", None),
feature_instance_id=getattr(self.services, "featureInstanceId", None),
):
return ActionResult.isFailure(error="Permission denied: Web research service")
web_service = self.services.getService("web")
# Build context for service center
context = ServiceCenterContext(
user=self.services.user,
mandate_id=getattr(self.services, "mandateId", None),
feature_instance_id=getattr(self.services, "featureInstanceId", None),
workflow_id=self.services.workflow.id if self.services.workflow else None,
workflow=self.services.workflow,
)
web_service = getService("web", context)
# Init progress logger
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"

View file

@ -133,7 +133,7 @@ class MethodBase:
return False
# Get current user from services.user (not from chat service)
currentUser = self.services.user
currentUser = getattr(self.services, 'user', None)
if not currentUser:
self.logger.warning(f"No current user found (services.user is None). Action {actionId} will be denied.")
return False
@ -141,8 +141,8 @@ class MethodBase:
# RBAC-Check: RESOURCE context, item = actionId
# mandateId/featureInstanceId from services context needed to resolve user roles
try:
mandateId = self.services.mandateId
featureInstanceId = self.services.featureInstanceId
mandateId = getattr(self.services, 'mandateId', None)
featureInstanceId = getattr(self.services, 'featureInstanceId', None)
permissions = self.services.rbac.getUserPermissions(
user=currentUser,
context=AccessRuleContext.RESOURCE,

View file

@ -1177,7 +1177,6 @@ def _persist_extracted_image_parts(
*,
name_stem: str,
run_context: Optional[Dict[str, Any]],
services=None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Decode base64 image parts, persist bytes, replace with ``embeddedImageFileId``; return artifacts meta."""
artifacts: List[Dict[str, Any]] = []
@ -1194,19 +1193,27 @@ def _persist_extracted_image_parts(
)
return content_extracted_serial, artifacts
if services and hasattr(services, "interfaceDbComponent"):
mgmt = services.interfaceDbComponent
else:
try:
from modules.interfaces.interfaceDbManagement import getInterface as _get_mgmt
from modules.interfaces.interfaceDbApp import getInterface as _get_app
from modules.security.rootAccess import getRootUser
try:
mgmt = _get_mgmt(getRootUser(), mandateId=str(mandate_id), featureInstanceId=str(instance_id))
except Exception as exc:
logger.warning("extractContent image persist: mgmt interface failed: %s", exc)
return content_extracted_serial, artifacts
except Exception as exc:
logger.warning("extractContent image persist: import failed: %s", exc)
return content_extracted_serial, artifacts
if not mgmt:
logger.warning("extractContent image persist: no interfaceDbComponent available")
owner = getRootUser()
uid = run_context.get("userId")
if uid:
try:
umap = _get_app(getRootUser()).getUsersByIds([str(uid)])
owner = umap.get(str(uid)) or owner
except Exception:
pass
try:
mgmt = _get_mgmt(owner, mandateId=str(mandate_id), featureInstanceId=str(instance_id))
except Exception as exc:
logger.warning("extractContent image persist: mgmt interface failed: %s", exc)
return content_extracted_serial, artifacts
stem = re.sub(r"[^\w\-]+", "_", name_stem).strip("_") or "extract"
@ -1819,7 +1826,6 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
content_extracted_serial,
name_stem=stem,
run_context=run_ctx if isinstance(run_ctx, dict) else None,
services=self.services,
)
presentation = build_presentation_for_serial_extractions(content_extracted_serial, file_names, pres_cfg)

View file

@ -58,9 +58,22 @@ def _persistDocumentsToUserFiles(
) -> None:
"""Persist file.create output documents to user's file storage (like upload).
Adds fileId to each document's validationMetadata for download links in UI."""
chat = getattr(services, "chat", None)
if not chat:
logger.warning("file.create: chat service not available for persistence")
mgmt = getattr(services, "interfaceDbComponent", None)
if not mgmt:
try:
import modules.interfaces.interfaceDbManagement as iface
user = getattr(services, "user", None)
if not user:
return
mgmt = iface.getInterface(
user,
mandateId=getattr(services, "mandateId", None) or "",
featureInstanceId=getattr(services, "featureInstanceId", None) or "",
)
except Exception as e:
logger.warning("file.create: could not get management interface for persistence: %s", e)
return
if not mgmt:
return
for doc in action_documents:
try:
@ -84,8 +97,8 @@ def _persistDocumentsToUserFiles(
or doc.get("mimeType")
or "application/octet-stream"
)
file_item = chat.createFile(doc_name, mime, content, folderId=folder_id)
chat.createFileData(file_item.id, content)
file_item = mgmt.createFile(doc_name, mime, content, folderId=folder_id)
mgmt.createFileData(file_item.id, content)
meta = getattr(doc, "validationMetadata", None) or doc.get("validationMetadata") or {}
if isinstance(meta, dict):
meta["fileId"] = file_item.id
@ -105,11 +118,23 @@ def _sanitize_output_stem(title: str) -> str:
def _get_management_interface(services) -> Optional[Any]:
"""Get chat service for file operations."""
chat = getattr(services, "chat", None)
if chat:
return chat
return None
mgmt = getattr(services, "interfaceDbComponent", None)
if mgmt:
return mgmt
try:
import modules.interfaces.interfaceDbManagement as iface
user = getattr(services, "user", None)
if not user:
return None
return iface.getInterface(
user,
mandateId=getattr(services, "mandateId", None) or "",
featureInstanceId=getattr(services, "featureInstanceId", None) or "",
)
except Exception as e:
logger.warning("file.create: could not get management interface: %s", e)
return None
def _load_image_bytes_from_action_doc(doc: dict, services) -> Optional[bytes]:

View file

@ -89,15 +89,17 @@ async def downloadFileByPath(self, parameters: Dict[str, Any]) -> ActionResult:
"downloadFileByPath"
)
# Save to user's Files (FileItem + FileData) via chat service appears in Files UI
# Save to user's Files (FileItem + FileData) via interfaceDbComponent appears in Files UI
fileItem = None
try:
mimeType = self.services.chat.getMimeType(filename)
fileItem = self.services.chat.createFile(name=filename, mimeType=mimeType, content=fileContent)
self.services.chat.createFileData(fileItem.id, fileContent)
logger.info(f"Saved SharePoint file to user Files: {filename} (id={fileItem.id})")
except Exception as e:
logger.warning(f"Could not save to user Files: {e}")
db = getattr(self.services, "interfaceDbComponent", None)
if db:
try:
mimeType = db.getMimeType(filename) if hasattr(db, "getMimeType") else "application/octet-stream"
fileItem = db.createFile(name=filename, mimeType=mimeType, content=fileContent)
db.createFileData(fileItem.id, fileContent)
logger.info(f"Saved SharePoint file to user Files: {filename} (id={fileItem.id})")
except Exception as e:
logger.warning(f"Could not save to user Files: {e}")
# Encode as base64 for workflow context (AI, data nodes)
fileBase64 = base64.b64encode(fileContent).decode('utf-8')

View file

@ -349,7 +349,7 @@ class AutomationMode(BaseMode):
workflow = self.services.workflow
updateData = {"totalActions": totalActions}
workflow.totalActions = totalActions
self.services.chat.updateWorkflow(workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
logger.info(f"Updated workflow {workflow.id} after action planning: {updateData}")
except Exception as e:
logger.error(f"Error updating workflow after action planning: {str(e)}")
@ -369,7 +369,7 @@ class AutomationMode(BaseMode):
updateData["totalActions"] = totalActions
if updateData:
self.services.chat.updateWorkflow(workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
logger.info(f"Updated workflow {workflow.id} totals: {updateData}")
except Exception as e:
logger.error(f"Error setting workflow totals: {str(e)}")

View file

@ -67,7 +67,8 @@ class BaseMode(ABC):
if "execParameters" not in actionData:
actionData["execParameters"] = {}
createdAction = self.services.chat.createActionItem(actionData)
simpleFields, objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData)
createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)
return ActionItem(
id=createdAction["id"],
@ -102,7 +103,7 @@ class BaseMode(ABC):
workflow.currentTask = taskNumber
workflow.currentAction = 0
workflow.totalActions = 0
self.services.chat.updateWorkflow(workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
logger.info(f"Updated workflow {workflow.id} before executing task {taskNumber}")
except Exception as e:
logger.error(f"Error updating workflow before executing task: {str(e)}")
@ -113,7 +114,7 @@ class BaseMode(ABC):
workflow = self.services.workflow
updateData = {"currentAction": actionNumber}
workflow.currentAction = actionNumber
self.services.chat.updateWorkflow(workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
logger.info(f"Updated workflow {workflow.id} before executing action {actionNumber}")
except Exception as e:
logger.error(f"Error updating workflow before executing action: {str(e)}")

View file

@ -190,7 +190,7 @@ class WorkflowProcessor:
self.workflow.totalActions = 0
# Update in database
self.services.chat.updateWorkflow(self.workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} after task plan creation: {updateData}")
except Exception as e:
@ -211,7 +211,7 @@ class WorkflowProcessor:
self.workflow.totalActions = 0
# Update in database
self.services.chat.updateWorkflow(self.workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} before executing task {taskNumber}: {updateData}")
except Exception as e:
@ -228,7 +228,7 @@ class WorkflowProcessor:
self.workflow.totalActions = totalActions
# Update in database
self.services.chat.updateWorkflow(self.workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} after action planning: {updateData}")
except Exception as e:
@ -245,7 +245,7 @@ class WorkflowProcessor:
self.workflow.currentAction = actionNumber
# Update in database
self.services.chat.updateWorkflow(self.workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} before executing action {actionNumber}: {updateData}")
except Exception as e:
@ -266,7 +266,7 @@ class WorkflowProcessor:
# Update workflow object in database if we have changes
if updateData:
self.services.chat.updateWorkflow(self.workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Updated workflow {self.workflow.id} totals in database: {updateData}")
logger.debug(f"Updated workflow totals: Tasks {self.workflow.totalTasks if hasattr(self.workflow, 'totalTasks') else 'N/A'}, Actions {self.workflow.totalActions if hasattr(self.workflow, 'totalActions') else 'N/A'}")
@ -290,7 +290,7 @@ class WorkflowProcessor:
self.workflow.totalActions = 0
# Update in database
self.services.chat.updateWorkflow(self.workflow.id, updateData)
self.services.interfaceDbChat.updateWorkflow(self.workflow.id, updateData)
logger.info(f"Reset workflow {self.workflow.id} for new session: {updateData}")
except Exception as e:
@ -636,12 +636,12 @@ class WorkflowProcessor:
else:
contentBytes = json.dumps(rawData, ensure_ascii=False).encode('utf-8')
fileItem = self.services.chat.createFile(
fileItem = self.services.interfaceDbComponent.createFile(
name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else f"task_{taskResult.taskId}_result.txt",
mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain",
content=contentBytes
)
self.services.chat.createFileData(
self.services.interfaceDbComponent.createFileData(
fileItem.id,
contentBytes
)

View file

@ -118,7 +118,7 @@ class WorkflowManager:
"totalTasks": 0,
"totalActions": 0,
"mandateId": self.services.mandateId,
"featureInstanceId": self.services.featureInstanceId,
"featureInstanceId": getattr(self.services, 'featureInstanceId', None), # Feature instance ID for isolation
"messageIds": [],
"workflowMode": workflowMode,
"maxSteps": 10 , # Set maxSteps
@ -478,12 +478,12 @@ The following is the user's original input message. Analyze intent, normalize th
if userInput.prompt:
try:
originalPromptBytes = userInput.prompt.encode('utf-8')
fileItem = self.services.chat.createFile(
fileItem = self.services.interfaceDbComponent.createFile(
name="user_prompt_original.md",
mimeType="text/markdown",
content=originalPromptBytes
)
self.services.chat.createFileData(fileItem.id, originalPromptBytes)
self.services.interfaceDbComponent.createFileData(fileItem.id, originalPromptBytes)
fileInfo = self.services.chat.getFileInfo(fileItem.id)
doc = {
"fileId": fileItem.id,
@ -544,13 +544,13 @@ The following is the user's original input message. Analyze intent, normalize th
for actionDoc in result.documents:
if hasattr(actionDoc, 'documentData') and actionDoc.documentData:
# Create file in component storage
fileItem = self.services.chat.createFile(
fileItem = self.services.interfaceDbComponent.createFile(
name=actionDoc.documentName if hasattr(actionDoc, 'documentName') else "fast_path_response.txt",
mimeType=actionDoc.mimeType if hasattr(actionDoc, 'mimeType') else "text/plain",
content=actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8')
)
# Persist file data
self.services.chat.createFileData(fileItem.id, actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8'))
self.services.interfaceDbComponent.createFileData(fileItem.id, actionDoc.documentData if isinstance(actionDoc.documentData, bytes) else actionDoc.documentData.encode('utf-8'))
# Get file info
fileInfo = self.services.chat.getFileInfo(fileItem.id)
@ -667,12 +667,12 @@ The following is the user's original input message. Analyze intent, normalize th
if userInput.prompt:
try:
originalPromptBytes = userInput.prompt.encode('utf-8')
fileItem = self.services.chat.createFile(
fileItem = self.services.interfaceDbComponent.createFile(
name="user_prompt_original.md",
mimeType="text/markdown",
content=originalPromptBytes
)
self.services.chat.createFileData(fileItem.id, originalPromptBytes)
self.services.interfaceDbComponent.createFileData(fileItem.id, originalPromptBytes)
fileInfo = self.services.chat.getFileInfo(fileItem.id)
doc = {
"fileId": fileItem.id,
@ -807,12 +807,12 @@ The following is the user's original input message. Analyze intent, normalize th
if userInput.prompt:
try:
originalPromptBytes = userInput.prompt.encode('utf-8')
fileItem = self.services.chat.createFile(
fileItem = self.services.interfaceDbComponent.createFile(
name="user_prompt_original.md",
mimeType="text/markdown",
content=originalPromptBytes
)
self.services.chat.createFileData(fileItem.id, originalPromptBytes)
self.services.interfaceDbComponent.createFileData(fileItem.id, originalPromptBytes)
fileInfo = self.services.chat.getFileInfo(fileItem.id)
doc = {
"fileId": fileItem.id,

View file

@ -409,39 +409,20 @@ def _extractNumbers(text: str) -> List[float]:
def _bootstrapServices() -> Tuple[Any, str, str]:
"""Spin up a minimal services bag bound to the root user + initial mandate.
"""Spin up a minimal service hub bound to the root user + initial mandate.
Returns a services bag, the user id, and the mandate id used for billing.
Returns the ServiceHub, the user id, and the mandate id used for billing.
"""
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelUam import Mandate
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.serviceHub import getInterface as getServices
rootInterface = getRootInterface()
user = rootInterface.currentUser
mandateId = rootInterface.getInitialId(Mandate)
if not mandateId:
raise RuntimeError("No initial mandate available -- run bootstrap loader first.")
ctx = ServiceCenterContext(user=user, mandate_id=mandateId)
class _BenchmarkServicesBag:
def __init__(self, ctx):
self._ctx = ctx
self.user = ctx.user
self.mandateId = ctx.mandate_id
self.featureInstanceId = ctx.feature_instance_id
self.workflow = ctx.workflow
def __getattr__(self, name):
if name.startswith("_"):
raise AttributeError(name)
svc = getService(name, self._ctx)
setattr(self, name, svc)
return svc
services = _BenchmarkServicesBag(ctx)
services = getServices(user, workflow=None, mandateId=mandateId, featureInstanceId=None)
return services, user.id, mandateId

View file

@ -19,8 +19,7 @@ _gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".
if _gateway_path not in sys.path:
sys.path.insert(0, _gateway_path)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
from modules.serviceCenter.serviceHub import getInterface as getServices
from modules.datamodels.datamodelAi import (
AiCallOptions,
AiCallRequest,
@ -34,23 +33,6 @@ from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
class _TestServicesBag:
"""Mutable services bag for tests — lazy-resolves via getService, allows attribute overrides."""
def __init__(self, ctx):
self._ctx = ctx
self.user = ctx.user
self.mandateId = ctx.mandate_id
self.featureInstanceId = ctx.feature_instance_id
self.workflow = ctx.workflow
def __getattr__(self, name):
if name.startswith("_"):
raise AttributeError(name)
svc = getService(name, self._ctx)
setattr(self, name, svc)
return svc
class ModelSelectionTester:
def __init__(self) -> None:
testUser = User(
@ -61,8 +43,7 @@ class ModelSelectionTester:
language="en",
mandateId="test_mandate",
)
ctx = ServiceCenterContext(user=testUser)
self.services = _TestServicesBag(ctx)
self.services = getServices(testUser, None)
async def initialize(self) -> None:
from modules.serviceCenter.services.serviceAi.mainServiceAi import AiService

View file

@ -31,31 +31,14 @@ _gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".
if _gateway_path not in sys.path:
sys.path.insert(0, _gateway_path)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
# Import the service initialization
from modules.serviceCenter.serviceHub import getInterface as getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User
class _TestServicesBag:
"""Mutable services bag for tests — lazy-resolves via getService, allows attribute overrides."""
def __init__(self, ctx):
self._ctx = ctx
self.user = ctx.user
self.mandateId = ctx.mandate_id
self.featureInstanceId = ctx.feature_instance_id
self.workflow = ctx.workflow
def __getattr__(self, name):
if name.startswith("_"):
raise AttributeError(name)
svc = getService(name, self._ctx)
setattr(self, name, svc)
return svc
class AIModelsTester:
def __init__(self):
# Create a minimal user context for testing
testUser = User(
id="test_user",
username="test_user",
@ -65,8 +48,8 @@ class AIModelsTester:
mandateId="test_mandate"
)
ctx = ServiceCenterContext(user=testUser)
self.services = _TestServicesBag(ctx)
# Initialize services using the existing system
self.services = getServices(testUser, None) # Test user, no workflow
self.testResults = []
# Create logs directory if it doesn't exist (go up 2 levels from tests/unit/services/)

View file

@ -20,8 +20,6 @@ if _gateway_path not in sys.path:
from modules.datamodels.datamodelAi import OperationTypeEnum
from modules.datamodels.datamodelChat import ChatWorkflow, ChatDocument, WorkflowModeEnum
from modules.datamodels.datamodelUam import User
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
class MethodAiOperationsTester:
@ -98,27 +96,15 @@ class MethodAiOperationsTester:
import logging
logging.getLogger().setLevel(logging.DEBUG)
# Import and initialize services
import modules.interfaces.interfaceDbChat as interfaceFeatureAiChat
interfaceDbChat = interfaceFeatureAiChat.getInterface(self.testUser)
interfaceDbChat = interfaceDbChat.getInterface(self.testUser)
ctx = ServiceCenterContext(user=self.testUser, mandate_id=self.testMandateId)
# Import and initialize services
from modules.serviceCenter.serviceHub import getInterface as getServices
class _TestServicesBag:
def __init__(self, ctx):
self._ctx = ctx
self.user = ctx.user
self.mandateId = ctx.mandate_id
self.featureInstanceId = ctx.feature_instance_id
self.workflow = ctx.workflow
def __getattr__(self, name):
if name.startswith("_"):
raise AttributeError(name)
svc = getService(name, self._ctx)
setattr(self, name, svc)
return svc
self.services = _TestServicesBag(ctx)
# Get services first
self.services = getServices(self.testUser, None)
# Now create AND SAVE workflow in database using the interface
import uuid

View file

@ -16,40 +16,26 @@ _gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".
if _gateway_path not in sys.path:
sys.path.insert(0, _gateway_path)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
# Import the service initialization
from modules.serviceCenter.serviceHub import getInterface as getServices
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelWorkflow import AiResponse
class _TestServicesBag:
"""Mutable services bag for tests — lazy-resolves via getService, allows attribute overrides."""
def __init__(self, ctx):
self._ctx = ctx
self.user = ctx.user
self.mandateId = ctx.mandate_id
self.featureInstanceId = ctx.feature_instance_id
self.workflow = ctx.workflow
def __getattr__(self, name):
if name.startswith("_"):
raise AttributeError(name)
svc = getService(name, self._ctx)
setattr(self, name, svc)
return svc
# The test uses the AI service which handles JSON template internally
class AIBehaviorTester:
def __init__(self):
# Use root user for testing (has full access to everything)
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelUam import Mandate
rootInterface = getRootInterface()
self.testUser = rootInterface.currentUser
# Get initial mandate ID for testing (User has no mandateId - use initial mandate)
self.testMandateId = rootInterface.getInitialId(Mandate)
ctx = ServiceCenterContext(user=self.testUser)
self.services = _TestServicesBag(ctx)
# Initialize services using the existing system
self.services = getServices(self.testUser, None) # Test user, no workflow
self.testResults = []
async def initialize(self):

View file

@ -395,14 +395,14 @@ def test_action_result_contract_new_extract_payload_keys():
def test_automation_workspace_suppresses_extract_artifacts():
from modules.shared.workflowArtifactVisibility import suppressWorkflowFileInWorkspaceUi
from modules.workflowAutomation.engine.workflowArtifactVisibility import suppress_workflow_file_in_workspace_ui
assert suppressWorkflowFileInWorkspaceUi({"fileName": "extracted_content_transient-abc_99.json"})
assert suppressWorkflowFileInWorkspaceUi({"fileName": "extract_media_stem_uuid.png"})
assert not suppressWorkflowFileInWorkspaceUi({"fileName": "export_2026.csv"})
assert suppressWorkflowFileInWorkspaceUi({"fileName": "", "suppressInWorkflowFileLists": True})
assert suppressWorkflowFileInWorkspaceUi({"fileName": "report.pdf", "tags": ["_workflowInternal"]})
assert not suppressWorkflowFileInWorkspaceUi({"fileName": "report.pdf", "tags": ["invoice"]})
assert suppress_workflow_file_in_workspace_ui({"fileName": "extracted_content_transient-abc_99.json"})
assert suppress_workflow_file_in_workspace_ui({"fileName": "extract_media_stem_uuid.png"})
assert not suppress_workflow_file_in_workspace_ui({"fileName": "export_2026.csv"})
assert suppress_workflow_file_in_workspace_ui({"fileName": "", "suppressInWorkflowFileLists": True})
assert suppress_workflow_file_in_workspace_ui({"fileName": "report.pdf", "tags": ["_workflowInternal"]})
assert not suppress_workflow_file_in_workspace_ui({"fileName": "report.pdf", "tags": ["invoice"]})
def test_normalize_presentation_envelopes_action_result_and_list():