From 472353fea03ef15b64c395f3a925f4cc6f1a9bf8 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 23 Sep 2025 22:47:54 +0200
Subject: [PATCH] Refactor full workflow engine 3.0
---
app.py | 49 +-
.../chatPlayground/mainChatPlayground.py | 11 +-
.../mainNeutralizePlayground.py | 752 +++-------
modules/features/syncDelta/mainSyncDelta.py | 31 +-
modules/interfaces/interfaceAiCalls.py | 527 -------
modules/interfaces/interfaceAiModel.py | 30 +
modules/interfaces/interfaceAiObjects.py | 117 ++
modules/interfaces/interfaceAppObjects.py | 543 +++-----
modules/interfaces/interfaceTicketObjects.py | 8 +-
modules/routes/routeDataNeutralization.py | 2 +-
modules/routes/routeSecurityGoogle.py | 74 +-
modules/routes/routeSecurityLocal.py | 18 +-
modules/routes/routeSecurityMsft.py | 9 +-
modules/security/auth.py | 101 +-
modules/security/jwtService.py | 72 +
modules/security/tokenManager.py | 66 +-
modules/security/tokenRefreshService.py | 8 +-
modules/services/__init__.py | 100 ++
modules/services/serviceAi/mainServiceAi.py | 137 ++
modules/services/serviceCenter.py | 1206 -----------------
...on.py => mainServiceDocumentExtraction.py} | 119 +-
...on.py => mainServiceDocumentGeneration.py} | 2 +-
.../mainNeutralization.py | 206 +++
.../serviceNeutralization/neutralizer.py | 112 --
.../services/serviceNeutralization/readme.md | 91 --
.../serviceNeutralization/subProcessCommon.py | 14 +
.../serviceNeutralization/subProcessText.py | 3 +-
.../serviceSharepoint/mainSharepoint.py} | 2 +-
.../serviceWorkflows/mainServiceWorkflows.py | 546 ++++++++
modules/shared/eventManagement.py | 120 ++
modules/workflows/methods/methodAi.py | 62 +-
modules/workflows/methods/methodDocument.py | 47 +-
modules/workflows/methods/methodOutlook.py | 39 +-
modules/workflows/methods/methodSharepoint.py | 5 +-
modules/workflows/methods/methodWeb.py | 19 +-
.../executionState.py | 0
.../handlingTasks.py | 286 +++-
.../promptFactory.py | 184 ++-
modules/workflows/workflowManager.py | 120 +-
tool_stats_durations_from_log.py | 2 +-
40 files changed, 2605 insertions(+), 3235 deletions(-)
delete mode 100644 modules/interfaces/interfaceAiCalls.py
create mode 100644 modules/interfaces/interfaceAiModel.py
create mode 100644 modules/interfaces/interfaceAiObjects.py
create mode 100644 modules/security/jwtService.py
create mode 100644 modules/services/__init__.py
create mode 100644 modules/services/serviceAi/mainServiceAi.py
delete mode 100644 modules/services/serviceCenter.py
rename modules/services/serviceDocument/{documentExtraction.py => mainServiceDocumentExtraction.py} (95%)
rename modules/services/serviceDocument/{documentGeneration.py => mainServiceDocumentGeneration.py} (99%)
create mode 100644 modules/services/serviceNeutralization/mainNeutralization.py
delete mode 100644 modules/services/serviceNeutralization/neutralizer.py
delete mode 100644 modules/services/serviceNeutralization/readme.md
rename modules/{connectors/connectorSharepoint.py => services/serviceSharepoint/mainSharepoint.py} (99%)
create mode 100644 modules/services/serviceWorkflows/mainServiceWorkflows.py
create mode 100644 modules/shared/eventManagement.py
rename modules/workflows/{_transfer => processing}/executionState.py (100%)
rename modules/workflows/{_transfer => processing}/handlingTasks.py (88%)
rename modules/workflows/{_transfer => processing}/promptFactory.py (83%)
diff --git a/app.py b/app.py
index a86f0d37..ccf0c7d1 100644
--- a/app.py
+++ b/app.py
@@ -4,7 +4,7 @@ os.environ["NUMEXPR_MAX_THREADS"] = "12"
from fastapi import FastAPI, HTTPException, Depends, Body, status, Response
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
-from zoneinfo import ZoneInfo
+
import logging
from logging.handlers import RotatingFileHandler
@@ -12,8 +12,7 @@ from datetime import timedelta, datetime
import pathlib
from modules.shared.configuration import APP_CONFIG
-from apscheduler.schedulers.asyncio import AsyncIOScheduler
-from apscheduler.triggers.cron import CronTrigger
+from modules.shared.eventManagement import eventManager
class DailyRotatingFileHandler(RotatingFileHandler):
@@ -202,46 +201,15 @@ instanceLabel = APP_CONFIG.get("APP_ENV_LABEL")
# Define lifespan context manager for application startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
- # Startup logic
logger.info("Application is starting up")
-
- # Setup APScheduler for JIRA sync
- scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich"))
- try:
- from modules.features.syncDelta.mainSyncDelta import perform_sync_jira_delta_group
- # Schedule sync every 20 minutes (at minutes 00, 20, 40)
- scheduler.add_job(
- perform_sync_jira_delta_group,
- CronTrigger(minute="0,20,40"),
- id="jira_delta_group_sync",
- replace_existing=True,
- coalesce=True,
- max_instances=1,
- misfire_grace_time=1800,
- )
- scheduler.start()
- logger.info("APScheduler started (jira_delta_group_sync every 20 minutes at 00, 20, 40)")
-
- # Run initial sync on startup (non-blocking failure)
- try:
- logger.info("Running initial JIRA sync on app startup...")
- await perform_sync_jira_delta_group()
- logger.info("Initial JIRA sync completed successfully")
- except Exception as e:
- logger.error(f"Initial JIRA sync failed: {str(e)}")
- except Exception as e:
- logger.error(f"Failed to initialize scheduler or JIRA sync: {str(e)}")
-
+ eventManager.start()
yield
-
- # Shutdown logic
+ eventManager.stop()
logger.info("Application has been shut down")
- try:
- if 'scheduler' in locals() and scheduler.running:
- scheduler.shutdown(wait=False)
- logger.info("APScheduler stopped")
- except Exception as e:
- logger.error(f"Error shutting down scheduler: {str(e)}")
+
+
+
+
# START APP
app = FastAPI(
@@ -250,7 +218,6 @@ app = FastAPI(
lifespan=lifespan
)
-
# Parse CORS origins from environment variable
def get_allowed_origins():
origins_str = APP_CONFIG.get("APP_ALLOWED_ORIGINS", "http://localhost:8080")
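Note: modules/shared/eventManagement.py is created by this patch, but its hunk is not part of this excerpt. Based on the call sites (eventManager.start()/stop() in the lifespan above, register_cron in mainSyncDelta.py below), a minimal sketch of the singleton could look like the following; the class internals and the timezone are assumptions, not the actual file contents:

    # Hypothetical sketch of modules/shared/eventManagement.py
    import logging
    from zoneinfo import ZoneInfo

    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.triggers.cron import CronTrigger

    logger = logging.getLogger(__name__)


    class EventManagement:
        """Thin APScheduler wrapper so features can register jobs at import time."""

        def __init__(self):
            # Timezone assumed to match the inline scheduler this patch replaces
            self._scheduler = AsyncIOScheduler(timezone=ZoneInfo("Europe/Zurich"))

        def register_cron(self, job_id: str, func, cron_kwargs: dict, **job_kwargs):
            # add_job works before start(); jobs stay pending until the scheduler runs
            self._scheduler.add_job(func, CronTrigger(**cron_kwargs), id=job_id, **job_kwargs)

        def start(self):
            if not self._scheduler.running:
                self._scheduler.start()
                logger.info("EventManagement scheduler started")

        def stop(self):
            if self._scheduler.running:
                self._scheduler.shutdown(wait=False)
                logger.info("EventManagement scheduler stopped")


    # Shared singleton imported by app.py and feature modules
    eventManager = EventManagement()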
diff --git a/modules/features/chatPlayground/mainChatPlayground.py b/modules/features/chatPlayground/mainChatPlayground.py
index 07d43043..6fa9ab1a 100644
--- a/modules/features/chatPlayground/mainChatPlayground.py
+++ b/modules/features/chatPlayground/mainChatPlayground.py
@@ -24,8 +24,11 @@ async def chatStart(interfaceChat, currentUser: User, userInput: UserInputReques
"""
try:
from modules.workflows.workflowManager import WorkflowManager
- workflowManager = WorkflowManager(interfaceChat, currentUser)
- return await workflowManager.workflowStart(userInput, workflowId, workflowMode)
+ from modules.services import getInterface as getServices
+ services = getServices(currentUser, None)
+ workflowManager = WorkflowManager(services)
+ workflow = await workflowManager.workflowStart(userInput, workflowId, workflowMode)
+ return workflow
except Exception as e:
logger.error(f"Error starting chat: {str(e)}")
raise
@@ -34,7 +37,9 @@ async def chatStop(interfaceChat, currentUser: User, workflowId: str) -> ChatWor
"""Stops a running chat."""
try:
from modules.workflows.workflowManager import WorkflowManager
- workflowManager = WorkflowManager(interfaceChat, currentUser)
+ from modules.services import getInterface as getServices
+ services = getServices(currentUser, None)
+ workflowManager = WorkflowManager(services)
return await workflowManager.workflowStop(workflowId)
except Exception as e:
logger.error(f"Error stopping chat: {str(e)}")
diff --git a/modules/features/neutralizePlayground/mainNeutralizePlayground.py b/modules/features/neutralizePlayground/mainNeutralizePlayground.py
index 877ca8aa..4b48a495 100644
--- a/modules/features/neutralizePlayground/mainNeutralizePlayground.py
+++ b/modules/features/neutralizePlayground/mainNeutralizePlayground.py
@@ -1,587 +1,285 @@
-"""
-Data Neutralization Service
-Handles file processing for data neutralization including SharePoint integration
-"""
-
import logging
-import os
-import uuid
-from typing import Dict, List, Any, Optional, Tuple
-from datetime import datetime
-from pathlib import Path
-import mimetypes
+from typing import Any, Dict, List, Optional
-from modules.interfaces.interfaceAppObjects import getInterface
-from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
-from modules.services.serviceNeutralization.neutralizer import DataAnonymizer
-from modules.shared.timezoneUtils import get_utc_timestamp
+from modules.interfaces.interfaceAppModel import User
+from modules.services.serviceNeutralization.mainNeutralization import NeutralizationService
logger = logging.getLogger(__name__)
-class NeutralizationService:
- """Service for handling data neutralization operations"""
-
- def __init__(self, current_user: User):
- """Initialize the service with user context"""
- self.current_user = current_user
- self.app_interface = getInterface(current_user)
-
- def get_config(self) -> Optional[DataNeutraliserConfig]:
- """Get the neutralization configuration for the current user's mandate"""
- return self.app_interface.getNeutralizationConfig()
-
- def save_config(self, config_data: Dict[str, Any]) -> DataNeutraliserConfig:
- """Save or update the neutralization configuration"""
- return self.app_interface.createOrUpdateNeutralizationConfig(config_data)
-
- def neutralize_text(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
- """Neutralize text content and return results with attribute mappings"""
- return self.app_interface.neutralizeText(text, file_id)
-
- def get_attributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]:
- """Get neutralization attributes, optionally filtered by file ID"""
- return self.app_interface.getNeutralizationAttributes(file_id)
-
- def resolve_text(self, text: str) -> str:
- """Resolve UIDs in neutralized text back to original text"""
- return self.app_interface.resolveNeutralizedText(text)
-
- async def process_sharepoint_files(self, source_path: str, target_path: str) -> Dict[str, Any]:
- """
- Process files from SharePoint source path, neutralize them, and store in target path
-
- Args:
- source_path: SharePoint path to read files from
- target_path: SharePoint path to store neutralized files
-
- Returns:
- Dictionary with processing results
- """
+
+class NeutralizationPlayground:
+ """Feature/UI wrapper around NeutralizationService for playground & routes."""
+
+ def __init__(self, currentUser: User):
+ self.currentUser = currentUser
+ self.service = NeutralizationService(currentUser)
+
+ def processText(self, text: str) -> Dict[str, Any]:
+ return self.service.processText(text)
+
+ def processFiles(self, fileIds: List[str]) -> Dict[str, Any]:
+ results: List[Dict[str, Any]] = []
+ errors: List[str] = []
+ for fileId in fileIds:
+ try:
+ res = self.service.processFile(fileId)
+ results.append({
+ 'file_id': fileId,
+ 'neutralized_file_name': res.get('neutralized_file_name'),
+ 'attributes_count': len(res.get('attributes', []))
+ })
+ except Exception as e:
+ logger.error(f"Error processing file {fileId}: {str(e)}")
+ errors.append(f"{fileId}: {str(e)}")
+ return {
+ 'success': len(errors) == 0,
+ 'total_files': len(fileIds),
+ 'successful_files': len(results),
+ 'failed_files': len(errors),
+ 'results': results,
+ 'errors': errors,
+ }
+
+ async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
+        # SharepointProcessor is defined below in this module; the name resolves at call time
+ processor = SharepointProcessor(self.currentUser, self.service)
+ return await processor.processSharepointFiles(sourcePath, targetPath)
+
+ # Cleanup attributes
+ def cleanAttributes(self, fileId: str) -> bool:
+ if not self.service.app_interface:
+ return False
+ return self.service.app_interface.deleteNeutralizationAttributes(fileId)
+
+ # Stats
+ def getStats(self) -> Dict[str, Any]:
try:
- logger.info(f"Processing SharePoint files from {source_path} to {target_path}")
-
- # Get user's SharePoint connection that matches the source path
- sharepoint_connection = await self._get_sharepoint_connection(source_path)
- if not sharepoint_connection:
+ allAttributes = self.service._getAttributes()
+ patternCounts: Dict[str, int] = {}
+ for attr in allAttributes:
+ patternType = attr.patternType
+ patternCounts[patternType] = patternCounts.get(patternType, 0) + 1
+ uniqueFiles = set(attr.fileId for attr in allAttributes if attr.fileId)
+ return {
+ 'total_attributes': len(allAttributes),
+ 'unique_files': len(uniqueFiles),
+ 'pattern_counts': patternCounts,
+ 'mandate_id': self.currentUser.mandateId if self.currentUser else None,
+ }
+ except Exception as e:
+ logger.error(f"Error getting stats: {str(e)}")
+ return {
+ 'total_attributes': 0,
+ 'unique_files': 0,
+ 'pattern_counts': {},
+ 'error': str(e),
+ }
+
+
+# Internal SharePoint helper class, kept separate from the playground wrapper to keep feature logic tidy
+class SharepointProcessor:
+ def __init__(self, currentUser: User, service: NeutralizationService):
+ self.currentUser = currentUser
+ self.service = service
+
+ async def processSharepointFiles(self, sourcePath: str, targetPath: str) -> Dict[str, Any]:
+ try:
+ logger.info(f"Processing SharePoint files from {sourcePath} to {targetPath}")
+ connection = await self._getSharepointConnection(sourcePath)
+ if not connection:
return {
- "success": False,
- "message": "No SharePoint connection found for user",
- "processed_files": 0,
- "errors": ["No SharePoint connection found"]
+ 'success': False,
+ 'message': 'No SharePoint connection found for user',
+ 'processed_files': 0,
+ 'errors': ['No SharePoint connection found'],
}
-
- logger.info(f"Using SharePoint connection: {sharepoint_connection.get('id')} for path: {source_path}")
-
- # Get SharePoint access token
- sharepoint_token = self.app_interface.getConnectionToken(sharepoint_connection["id"])
- if not sharepoint_token:
+ from modules.security.tokenManager import TokenManager
+ token = TokenManager().getFreshToken(self.service.app_interface, connection['id'])
+ if not token:
return {
- "success": False,
- "message": "No SharePoint access token found",
- "processed_files": 0,
- "errors": ["No SharePoint access token found"]
+ 'success': False,
+ 'message': 'No SharePoint access token found',
+ 'processed_files': 0,
+ 'errors': ['No SharePoint access token found'],
}
-
- # Process files asynchronously
- return await self._process_sharepoint_files_async(
- source_path, target_path, sharepoint_token.tokenAccess
- )
-
+ return await self._processSharepointFilesAsync(sourcePath, targetPath, token.tokenAccess)
except Exception as e:
logger.error(f"Error processing SharePoint files: {str(e)}")
return {
- "success": False,
- "message": f"Error processing SharePoint files: {str(e)}",
- "processed_files": 0,
- "errors": [str(e)]
+ 'success': False,
+ 'message': f'Error processing SharePoint files: {str(e)}',
+ 'processed_files': 0,
+ 'errors': [str(e)],
}
-
- async def _get_sharepoint_connection(self, sharepoint_path: str = None):
- """Get user's SharePoint connection that matches the given path"""
+
+ async def _getSharepointConnection(self, sharepointPath: str = None):
try:
- # Get all user connections
from modules.interfaces.interfaceAppModel import UserConnection
- connections = self.app_interface.db.getRecordset(
+ connections = self.service.app_interface.db.getRecordset(
UserConnection,
- recordFilter={"userId": self.app_interface.userId}
+ recordFilter={"userId": self.service.app_interface.userId}
)
-
- # Find all Microsoft connections
- msft_connections = [conn for conn in connections if conn.get("authority") == "msft"]
-
- if not msft_connections:
- logger.warning("No Microsoft connections found for user")
+ msftConnections = [c for c in connections if c.get('authority') == 'msft']
+ if not msftConnections:
+ logger.warning('No Microsoft connections found for user')
return None
-
- if len(msft_connections) == 1:
- logger.info(f"Found single Microsoft connection: {msft_connections[0].get('id')}")
- return msft_connections[0]
-
- # If multiple connections and we have a path, try to match
- if sharepoint_path:
- return await self._match_connection_to_path(msft_connections, sharepoint_path)
-
- # If no path provided, return the first one
- logger.info(f"Multiple Microsoft connections found, using first one: {msft_connections[0].get('id')}")
- return msft_connections[0]
-
- except Exception as e:
- logger.error(f"Error getting SharePoint connection: {str(e)}")
+ if len(msftConnections) == 1:
+ logger.info(f"Found single Microsoft connection: {msftConnections[0].get('id')}")
+ return msftConnections[0]
+ if sharepointPath:
+ return await self._matchConnectionToPath(msftConnections, sharepointPath)
+ logger.info(f"Multiple Microsoft connections found, using first one: {msftConnections[0].get('id')}")
+ return msftConnections[0]
+ except Exception:
+            logger.exception('Error getting SharePoint connection')
return None
-
- async def _match_connection_to_path(self, connections: list, sharepoint_path: str):
- """Match a connection to the SharePoint path by testing access"""
+
+ async def _matchConnectionToPath(self, connections: list, sharepointPath: str):
try:
- # Extract domain from the path
from urllib.parse import urlparse
- parsed_url = urlparse(sharepoint_path)
- target_domain = parsed_url.netloc.lower()
-
- logger.info(f"Looking for connection matching domain: {target_domain}")
-
- # Try each connection to see which one can access the site
+ targetDomain = urlparse(sharepointPath).netloc.lower()
+ logger.info(f"Looking for connection matching domain: {targetDomain}")
+ from modules.security.tokenManager import TokenManager
for connection in connections:
try:
- # Get token for this connection
- token = self.app_interface.getConnectionToken(connection["id"])
+ token = TokenManager().getFreshToken(self.service.app_interface, connection['id'])
if not token:
continue
-
- # Test if this connection can access the SharePoint site
- if await self._test_sharepoint_access(token.tokenAccess, sharepoint_path):
- logger.info(f"Found matching connection for domain {target_domain}: {connection.get('id')}")
+ if await self._testSharepointAccess(token.tokenAccess, sharepointPath):
+ logger.info(f"Found matching connection for domain {targetDomain}: {connection.get('id')}")
return connection
-
- except Exception as e:
+ except Exception:
continue
-
- # If no specific match found, return the first connection
- logger.warning(f"No specific connection match found for {target_domain}, using first available")
+ logger.warning(f"No specific connection match found for {targetDomain}, using first available")
return connections[0]
-
- except Exception as e:
- logger.error(f"Error matching connection to path: {str(e)}")
+ except Exception:
+            logger.exception('Error matching connection to path')
return connections[0] if connections else None
-
- async def _test_sharepoint_access(self, access_token: str, sharepoint_path: str) -> bool:
- """Test if the access token can access the given SharePoint path"""
+
+ async def _testSharepointAccess(self, accessToken: str, sharepointPath: str) -> bool:
try:
- return await self._test_sharepoint_access_async(access_token, sharepoint_path)
- except Exception as e:
+ return await self._testSharepointAccessAsync(accessToken, sharepointPath)
+ except Exception:
return False
-
- async def _test_sharepoint_access_async(self, access_token: str, sharepoint_path: str) -> bool:
- """Async test for SharePoint access"""
+
+ async def _testSharepointAccessAsync(self, accessToken: str, sharepointPath: str) -> bool:
try:
- from modules.connectors.connectorSharepoint import ConnectorSharepoint
-
- connector = ConnectorSharepoint(access_token=access_token)
-
- # Parse the path to get site URL
- site_url, _ = self._parse_sharepoint_path(sharepoint_path)
- if not site_url:
+ from modules.services.serviceSharepoint.mainSharepoint import SharepointService
+ connector = SharepointService(access_token=accessToken)
+ siteUrl, _ = self._parseSharepointPath(sharepointPath)
+ if not siteUrl:
return False
-
- # Try to find the site
- site_info = await connector.find_site_by_web_url(site_url)
- return site_info is not None
-
- except Exception as e:
+ siteInfo = await connector.find_site_by_web_url(siteUrl)
+ return siteInfo is not None
+ except Exception:
return False
-
- async def _process_sharepoint_files_async(self, source_path: str, target_path: str, access_token: str) -> Dict[str, Any]:
- """Process SharePoint files asynchronously"""
+
+ async def _processSharepointFilesAsync(self, sourcePath: str, targetPath: str, accessToken: str) -> Dict[str, Any]:
try:
import asyncio
- from modules.connectors.connectorSharepoint import ConnectorSharepoint
-
- # Initialize SharePoint connector
- connector = ConnectorSharepoint(access_token=access_token)
-
- # Parse source and target paths to extract site and folder info
- source_site, source_folder = self._parse_sharepoint_path(source_path)
- target_site, target_folder = self._parse_sharepoint_path(target_path)
-
- if not source_site or not target_site:
- return {
- "success": False,
- "message": "Invalid SharePoint path format",
- "processed_files": 0,
- "errors": ["Invalid SharePoint path format"]
- }
-
- # Find source site
- source_site_info = await connector.find_site_by_web_url(source_site)
- if not source_site_info:
- return {
- "success": False,
- "message": f"Source site not found: {source_site}",
- "processed_files": 0,
- "errors": [f"Source site not found: {source_site}"]
- }
-
- # Find target site
- target_site_info = await connector.find_site_by_web_url(target_site)
- if not target_site_info:
- return {
- "success": False,
- "message": f"Target site not found: {target_site}",
- "processed_files": 0,
- "errors": [f"Target site not found: {target_site}"]
- }
-
- # List files in source folder
- logger.info(f"Listing files in folder: {source_folder} for site: {source_site_info['id']}")
- files = await connector.list_folder_contents(source_site_info["id"], source_folder)
-
- # If no files found, try listing the root folder to see what's available
+ from modules.services.serviceSharepoint.mainSharepoint import SharepointService
+ connector = SharepointService(access_token=accessToken)
+ sourceSite, sourceFolder = self._parseSharepointPath(sourcePath)
+ targetSite, targetFolder = self._parseSharepointPath(targetPath)
+ if not sourceSite or not targetSite:
+ return {'success': False, 'message': 'Invalid SharePoint path format', 'processed_files': 0, 'errors': ['Invalid SharePoint path format']}
+ sourceSiteInfo = await connector.find_site_by_web_url(sourceSite)
+ if not sourceSiteInfo:
+ return {'success': False, 'message': f'Source site not found: {sourceSite}', 'processed_files': 0, 'errors': [f'Source site not found: {sourceSite}']}
+ targetSiteInfo = await connector.find_site_by_web_url(targetSite)
+ if not targetSiteInfo:
+ return {'success': False, 'message': f'Target site not found: {targetSite}', 'processed_files': 0, 'errors': [f'Target site not found: {targetSite}']}
+ logger.info(f"Listing files in folder: {sourceFolder} for site: {sourceSiteInfo['id']}")
+ files = await connector.list_folder_contents(sourceSiteInfo['id'], sourceFolder)
if not files:
- logger.warning(f"No files found in folder '{source_folder}', trying root folder")
- files = await connector.list_folder_contents(source_site_info["id"], "")
-
+ logger.warning(f"No files found in folder '{sourceFolder}', trying root folder")
+ files = await connector.list_folder_contents(sourceSiteInfo['id'], '')
if files:
- # List available folders for debugging
- folders = [f for f in files if f.get("type") == "folder"]
- folder_names = [f.get('name') for f in folders]
- logger.info(f"Available folders in root: {folder_names}")
-
- # Format folder list for better UI display
- folder_list = ", ".join(folder_names) if folder_names else "None"
-
+ folders = [f for f in files if f.get('type') == 'folder']
+ folderNames = [f.get('name') for f in folders]
+ logger.info(f"Available folders in root: {folderNames}")
+ folderList = ", ".join(folderNames) if folderNames else "None"
return {
- "success": False,
- "message": f"Folder '{source_folder}' not found. Available folders in root: {folder_list}",
- "processed_files": 0,
- "errors": [f"Folder '{source_folder}' not found. Available folders: {folder_list}"],
- "available_folders": folder_names
+ 'success': False,
+ 'message': f"Folder '{sourceFolder}' not found. Available folders in root: {folderList}",
+ 'processed_files': 0,
+ 'errors': [f"Folder '{sourceFolder}' not found. Available folders: {folderList}"],
+ 'available_folders': folderNames,
}
else:
- return {
- "success": False,
- "message": f"No files found in source folder: {source_folder}",
- "processed_files": 0,
- "errors": [f"No files found in source folder: {source_folder}"]
- }
-
- # Filter for text files only
- text_files = [f for f in files if f.get("type") == "file" and self._is_text_file(f.get("name", ""))]
-
- if not text_files:
- return {
- "success": False,
- "message": "No text files found in source folder",
- "processed_files": 0,
- "errors": ["No text files found in source folder"]
- }
-
- # Process files in parallel for better performance
- processed_files = []
- errors = []
-
- # Create tasks for parallel processing
- async def process_single_file(file_info):
- """Process a single file - download, neutralize, upload"""
+ return {'success': False, 'message': f'No files found in source folder: {sourceFolder}', 'processed_files': 0, 'errors': [f'No files found in source folder: {sourceFolder}']}
+
+        sourceFiles = [f for f in files if f.get('type') == 'file']  # all file types; the old text-extension filter was dropped
+ processed: List[Dict[str, Any]] = []
+ errors: List[str] = []
+
+ async def _processSingle(fileInfo: Dict[str, Any]):
try:
- # Download file
- file_content = await connector.download_file(source_site_info["id"], file_info["id"])
- if not file_content:
- return {"error": f"Failed to download file: {file_info['name']}"}
-
- # Convert to text
+ fileContent = await connector.download_file(sourceSiteInfo['id'], fileInfo['id'])
+ if not fileContent:
+ return {'error': f"Failed to download file: {fileInfo['name']}"}
try:
- text_content = file_content.decode('utf-8')
+ textContent = fileContent.decode('utf-8')
except UnicodeDecodeError:
- text_content = file_content.decode('latin-1')
-
- # Neutralize the text
- neutralization_result = self.app_interface.neutralizeText(text_content, file_info["id"])
-
- # Create neutralized filename
- neutralized_filename = f"neutralized_{file_info['name']}"
-
- # Upload neutralized file
- neutralized_content = neutralization_result["neutralized_text"].encode('utf-8')
- upload_result = await connector.upload_file(
- target_site_info["id"],
- target_folder,
- neutralized_filename,
- neutralized_content
- )
-
- if "error" in upload_result:
- return {"error": f"Failed to upload neutralized file: {neutralized_filename} - {upload_result['error']}"}
- else:
- return {
- "success": True,
- "original_name": file_info["name"],
- "neutralized_name": neutralized_filename,
- "attributes_count": len(neutralization_result.get("attributes", []))
- }
-
+ textContent = fileContent.decode('latin-1')
+ result = self.service._neutralizeText(textContent, 'text')
+ neutralizedFilename = f"neutralized_{fileInfo['name']}"
+ uploadResult = await connector.upload_file(targetSiteInfo['id'], targetFolder, neutralizedFilename, result['neutralized_text'].encode('utf-8'))
+ if 'error' in uploadResult:
+ return {'error': f"Failed to upload neutralized file: {neutralizedFilename} - {uploadResult['error']}"}
+ return {
+ 'success': True,
+ 'original_name': fileInfo['name'],
+ 'neutralized_name': neutralizedFilename,
+ 'attributes_count': len(result.get('attributes', [])),
+ }
except Exception as e:
- error_msg = f"Error processing file {file_info['name']}: {str(e)}"
- logger.error(error_msg)
- return {"error": error_msg}
-
- # Process all files in parallel
- logger.info(f"Processing {len(text_files)} files in parallel...")
- tasks = [process_single_file(file_info) for file_info in text_files]
+ return {'error': f"Error processing file {fileInfo['name']}: {str(e)}"}
+
+        tasks = [_processSingle(f) for f in sourceFiles]
results = await asyncio.gather(*tasks, return_exceptions=True)
-
- # Process results
- for i, result in enumerate(results):
- if isinstance(result, Exception):
- error_msg = f"Exception processing file {text_files[i]['name']}: {str(result)}"
- errors.append(error_msg)
- logger.error(error_msg)
- elif isinstance(result, dict) and "error" in result:
- errors.append(result["error"])
- elif isinstance(result, dict) and result.get("success"):
- processed_files.append({
- "original_name": result["original_name"],
- "neutralized_name": result["neutralized_name"],
- "attributes_count": result["attributes_count"]
+ for i, r in enumerate(results):
+ if isinstance(r, Exception):
+ errors.append(f"Exception processing file {textFiles[i]['name']}: {str(r)}")
+ elif isinstance(r, dict) and 'error' in r:
+ errors.append(r['error'])
+ elif isinstance(r, dict) and r.get('success'):
+ processed.append({
+ 'original_name': r['original_name'],
+ 'neutralized_name': r['neutralized_name'],
+ 'attributes_count': r['attributes_count'],
})
- logger.info(f"Successfully processed file: {result['original_name']} -> {result['neutralized_name']}")
else:
- error_msg = f"Unknown result processing file {text_files[i]['name']}: {result}"
- errors.append(error_msg)
- logger.error(error_msg)
-
+ errors.append(f"Unknown result processing file {textFiles[i]['name']}: {r}")
return {
- "success": len(processed_files) > 0,
- "message": f"Processed {len(processed_files)} files successfully",
- "processed_files": len(processed_files),
- "files": processed_files,
- "errors": errors
+ 'success': len(processed) > 0,
+ 'message': f"Processed {len(processed)} files successfully",
+ 'processed_files': len(processed),
+ 'files': processed,
+ 'errors': errors,
}
-
except Exception as e:
logger.error(f"Error in async SharePoint processing: {str(e)}")
- return {
- "success": False,
- "message": f"Error in async SharePoint processing: {str(e)}",
- "processed_files": 0,
- "errors": [str(e)]
- }
-
- def _parse_sharepoint_path(self, path: str) -> tuple[str, str]:
- """Parse SharePoint path to extract site URL and folder path"""
+ return {'success': False, 'message': f'Error in async SharePoint processing: {str(e)}', 'processed_files': 0, 'errors': [str(e)]}
+
+ def _parseSharepointPath(self, path: str) -> tuple[str, str]:
try:
- # Expected format: https://domain.sharepoint.com/sites/sitename/folder/path
- if not path.startswith("https://"):
+ if not path.startswith('https://'):
return None, None
-
- # Remove query parameters
- if "?" in path:
- path = path.split("?")[0]
-
- # Split by /sites/
- if "/sites/" not in path:
+ if '?' in path:
+ path = path.split('?')[0]
+ if '/sites/' not in path:
return None, None
-
- parts = path.split("/sites/", 1)
+ parts = path.split('/sites/', 1)
if len(parts) != 2:
return None, None
-
- # Extract domain and site name
- domain = parts[0].replace("https://", "")
- site_name = parts[1].split("/")[0]
-
- # Create proper site URL for Graph API
- site_url = f"https://{domain}/sites/{site_name}"
-
- # Extract folder path (everything after the site name)
- folder_parts = parts[1].split("/")[1:]
- folder_path = "/".join(folder_parts) if folder_parts else ""
-
- # URL decode the folder path
+ domain = parts[0].replace('https://', '')
+ siteName = parts[1].split('/')[0]
+ siteUrl = f"https://{domain}/sites/{siteName}"
+ folderParts = parts[1].split('/')[1:]
from urllib.parse import unquote
- folder_path = unquote(folder_path)
-
-
- return site_url, folder_path
-
- except Exception as e:
- logger.error(f"Error parsing SharePoint path '{path}': {str(e)}")
+ folderPath = unquote('/'.join(folderParts) if folderParts else '')
+ return siteUrl, folderPath
+ except Exception:
+ logger.error(f"Error parsing SharePoint path '{path}'")
return None, None
-
- def _is_text_file(self, filename: str) -> bool:
- """Check if file is a text file based on extension"""
- text_extensions = [
- '.txt', '.csv', '.json', '.xml', '.md', '.log',
- '.doc', '.docx', '.rtf', '.odt', # Document formats
- '.html', '.htm', '.css', '.js', '.ts', '.py', '.java', '.cpp', '.c', '.h', # Code files
- '.ini', '.cfg', '.conf', '.properties', # Config files
- '.sql', '.yaml', '.yml', '.toml', # Data/config files
- '.ps1', '.bat', '.sh', '.bash' # Script files
- ]
- return any(filename.lower().endswith(ext) for ext in text_extensions)
-
- def process_file_content(self, file_content: bytes, file_name: str, mime_type: str) -> Dict[str, Any]:
- """
- Process file content for neutralization
-
- Args:
- file_content: Binary file content
- file_name: Name of the file
- mime_type: MIME type of the file
-
- Returns:
- Dictionary with neutralization results
- """
- try:
- # Determine content type based on MIME type
- content_type = self._get_content_type_from_mime(mime_type)
-
- # Decode content to text
- try:
- text_content = file_content.decode('utf-8')
- except UnicodeDecodeError:
- # Try with different encodings
- for encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
- try:
- text_content = file_content.decode(encoding)
- break
- except UnicodeDecodeError:
- continue
- else:
- raise ValueError("Unable to decode file content")
-
- # Generate a temporary file ID for tracking
- temp_file_id = str(uuid.uuid4())
-
- # Neutralize the content
- neutralization_result = self.neutralize_text(text_content, temp_file_id)
-
- # Encode the neutralized content back to bytes
- neutralized_content = neutralization_result["neutralized_text"].encode('utf-8')
-
- # Generate neutralized file name
- neutralized_file_name = f"neutralized_{file_name}"
-
- return {
- "success": True,
- "original_content": text_content,
- "neutralized_content": neutralization_result["neutralized_text"],
- "neutralized_file_name": neutralized_file_name,
- "attributes": neutralization_result["attributes"],
- "mapping": neutralization_result["mapping"],
- "file_id": temp_file_id
- }
-
- except Exception as e:
- logger.error(f"Error processing file content: {str(e)}")
- return {
- "success": False,
- "error": str(e),
- "original_content": None,
- "neutralized_content": None
- }
-
- def _get_content_type_from_mime(self, mime_type: str) -> str:
- """Determine content type from MIME type for neutralization processing"""
- if mime_type.startswith('text/'):
- return 'text'
- elif mime_type in ['application/json', 'application/xml', 'text/xml']:
- return 'json' if 'json' in mime_type else 'xml'
- elif mime_type in ['text/csv', 'application/csv']:
- return 'csv'
- else:
- return 'text' # Default to text processing
-
- def batch_neutralize_files(self, files_data: List[Dict[str, Any]]) -> Dict[str, Any]:
- """
- Process multiple files for neutralization
-
- Args:
- files_data: List of dictionaries containing file information
- Each dict should have: content, name, mime_type
-
- Returns:
- Dictionary with batch processing results
- """
- try:
- results = []
- total_files = len(files_data)
- successful_files = 0
- errors = []
-
- for file_data in files_data:
- try:
- result = self.process_file_content(
- file_data['content'],
- file_data['name'],
- file_data['mime_type']
- )
-
- if result['success']:
- successful_files += 1
- results.append({
- 'file_name': file_data['name'],
- 'neutralized_file_name': result['neutralized_file_name'],
- 'file_id': result['file_id'],
- 'attributes_count': len(result['attributes'])
- })
- else:
- errors.append(f"Failed to process {file_data['name']}: {result['error']}")
-
- except Exception as e:
- error_msg = f"Error processing {file_data['name']}: {str(e)}"
- errors.append(error_msg)
- logger.error(error_msg)
-
- return {
- "success": len(errors) == 0,
- "total_files": total_files,
- "successful_files": successful_files,
- "failed_files": len(errors),
- "results": results,
- "errors": errors
- }
-
- except Exception as e:
- logger.error(f"Error in batch neutralization: {str(e)}")
- return {
- "success": False,
- "total_files": len(files_data),
- "successful_files": 0,
- "failed_files": len(files_data),
- "results": [],
- "errors": [str(e)]
- }
-
- def cleanup_file_attributes(self, file_id: str) -> bool:
- """Clean up neutralization attributes for a specific file"""
- return self.app_interface.deleteNeutralizationAttributes(file_id)
-
- def get_processing_stats(self) -> Dict[str, Any]:
- """Get statistics about neutralization processing"""
- try:
- # Get all attributes for the current mandate
- all_attributes = self.get_attributes()
-
- # Group by pattern type
- pattern_counts = {}
- for attr in all_attributes:
- pattern_type = attr.patternType
- pattern_counts[pattern_type] = pattern_counts.get(pattern_type, 0) + 1
-
- # Get unique files
- unique_files = set(attr.fileId for attr in all_attributes if attr.fileId)
-
- return {
- "total_attributes": len(all_attributes),
- "unique_files": len(unique_files),
- "pattern_counts": pattern_counts,
- "mandate_id": self.current_user.mandateId
- }
-
- except Exception as e:
- logger.error(f"Error getting processing stats: {str(e)}")
- return {
- "total_attributes": 0,
- "unique_files": 0,
- "pattern_counts": {},
- "error": str(e)
- }
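With the SharePoint plumbing moved out, the playground wrapper reduces to a thin facade over NeutralizationService. A usage sketch based on the class above (the User instance and file IDs are placeholders):

    from modules.features.neutralizePlayground.mainNeutralizePlayground import NeutralizationPlayground

    playground = NeutralizationPlayground(currentUser)  # currentUser: an authenticated User

    # Ad-hoc text neutralization, delegated to the service
    textResult = playground.processText("Contact John Doe at john@example.com")

    # Batch processing: per-file errors are collected rather than raised
    batch = playground.processFiles(["file-id-1", "file-id-2"])
    print(batch["successful_files"], batch["failed_files"], batch["errors"])

    # Aggregate statistics over stored neutralization attributes
    stats = playground.getStats()
    print(stats["total_attributes"], stats["pattern_counts"])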
diff --git a/modules/features/syncDelta/mainSyncDelta.py b/modules/features/syncDelta/mainSyncDelta.py
index f2c5dbe6..3fac4f7b 100644
--- a/modules/features/syncDelta/mainSyncDelta.py
+++ b/modules/features/syncDelta/mainSyncDelta.py
@@ -11,7 +11,7 @@ import csv
import io
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional
-from modules.connectors.connectorSharepoint import ConnectorSharepoint
+from modules.services.serviceSharepoint.mainSharepoint import SharepointService
from modules.connectors.connectorTicketJira import ConnectorTicketJira
from modules.interfaces.interfaceAppObjects import getRootInterface
from modules.interfaces.interfaceAppModel import UserInDB
@@ -232,6 +232,10 @@ class ManagerSyncDelta:
self.jira_connector = None
self.sharepoint_connector = None
self.target_site = None
+ # Initialize centralized services with root user
+ from modules.services import getInterface as getServices
+ root_user = self.root_interface.getUserByUsername("admin")
+ self.services = getServices(root_user, None)
def get_sync_file_name(self) -> str:
"""Get the appropriate sync file name based on the sync mode."""
@@ -294,8 +298,9 @@ class ManagerSyncDelta:
logger.info(f"Found SharePoint connection: {sharepoint_connection.id}")
- # Get SharePoint token for this connection
- sharepoint_token = self.root_interface.getConnectionToken(sharepoint_connection.id)
+ # Get fresh SharePoint token for this connection
+ from modules.security.tokenManager import TokenManager
+ sharepoint_token = TokenManager().getFreshToken(self.root_interface, sharepoint_connection.id)
if not sharepoint_token:
logger.error("No SharePoint token found for Delta Group user connection")
return False
@@ -303,7 +308,7 @@ class ManagerSyncDelta:
logger.info(f"Found SharePoint token: {sharepoint_token.id}")
# Initialize SharePoint connector with Graph API
- self.sharepoint_connector = ConnectorSharepoint(access_token=sharepoint_token.tokenAccess)
+ self.sharepoint_connector = SharepointService(access_token=sharepoint_token.tokenAccess)
# Resolve the site by hostname + site path to get the real site ID
logger.info(
@@ -552,3 +557,21 @@ async def perform_sync_jira_delta_group() -> bool:
except Exception as e:
logger.error(f"Error in perform_sync_jira_delta_group: {str(e)}")
return False
+
+# Register scheduled job on import using the shared event manager
+try:
+ from modules.shared.eventManagement import eventManager
+
+ # Schedule sync every 20 minutes (at minutes 00, 20, 40)
+ eventManager.register_cron(
+ job_id="jira_delta_group_sync",
+ func=perform_sync_jira_delta_group,
+ cron_kwargs={"minute": "0,20,40"},
+ replace_existing=True,
+ coalesce=True,
+ max_instances=1,
+ misfire_grace_time=1800,
+ )
+ logger.info("Registered jira_delta_group_sync via EventManagement (every 20 minutes)")
+except Exception as e:
+ logger.error(f"Failed to register jira_delta_group_sync: {str(e)}")
\ No newline at end of file
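The getConnectionToken call is replaced by TokenManager().getFreshToken(interface, connectionId) here and in the neutralization feature above; the tokenManager.py changes themselves are outside this excerpt. The call sites imply roughly the following shape — the expiry check and refresh helper are assumptions for illustration only:

    # Hypothetical sketch of the refresh-aware lookup implied by the call sites
    from datetime import datetime, UTC


    class TokenManager:
        def getFreshToken(self, interface, connectionId: str):
            """Return a usable token for the connection, refreshing it first if stale."""
            token = interface.getConnectionToken(connectionId)
            if not token:
                return None
            expiry = getattr(token, "tokenExpiry", None)  # attribute name assumed
            if expiry and expiry <= datetime.now(UTC):
                # Refresh path assumed to live in tokenRefreshService.py
                token = self._refresh(interface, connectionId)
            return token

        def _refresh(self, interface, connectionId: str):
            raise NotImplementedError("placeholder; the real refresh logic is not shown in this patch")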
diff --git a/modules/interfaces/interfaceAiCalls.py b/modules/interfaces/interfaceAiCalls.py
deleted file mode 100644
index 6f0de9c9..00000000
--- a/modules/interfaces/interfaceAiCalls.py
+++ /dev/null
@@ -1,527 +0,0 @@
-import logging
-from typing import Dict, Any, List, Union, Optional
-from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException
-from modules.connectors.connectorAiAnthropic import AiAnthropic
-from modules.services.serviceDocument.documentExtraction import DocumentExtraction
-from modules.interfaces.interfaceChatModel import ChatDocument
-
-logger = logging.getLogger(__name__)
-
-# AI Model Registry with Performance Data
-AI_MODELS = {
- "openai_gpt4o": {
- "connector": "openai",
- "max_tokens": 128000,
- "cost_per_1k_tokens": 0.03, # Input
- "cost_per_1k_tokens_output": 0.06, # Output
- "speed_rating": 8, # 1-10
- "quality_rating": 9, # 1-10
- "supports_images": True,
- "supports_documents": True,
- "context_length": 128000,
- "model_name": "gpt-4o"
- },
- "openai_gpt35": {
- "connector": "openai",
- "max_tokens": 16000,
- "cost_per_1k_tokens": 0.0015,
- "cost_per_1k_tokens_output": 0.002,
- "speed_rating": 9,
- "quality_rating": 7,
- "supports_images": False,
- "supports_documents": True,
- "context_length": 16000,
- "model_name": "gpt-3.5-turbo"
- },
- "anthropic_claude": {
- "connector": "anthropic",
- "max_tokens": 200000,
- "cost_per_1k_tokens": 0.015,
- "cost_per_1k_tokens_output": 0.075,
- "speed_rating": 7,
- "quality_rating": 10,
- "supports_images": True,
- "supports_documents": True,
- "context_length": 200000,
- "model_name": "claude-3-sonnet-20240229"
- }
-}
-
-class AiCalls:
- """Interface for AI service interactions with centralized call method"""
-
- def __init__(self):
- self.openaiService = AiOpenai()
- self.anthropicService = AiAnthropic()
- self.document_extractor = DocumentExtraction()
-
- async def callAi(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- operation_type: str = "general",
- priority: str = "balanced", # "speed", "quality", "cost", "balanced"
- compress_prompt: bool = True,
- compress_documents: bool = True,
- process_documents_individually: bool = False,
- max_cost: float = None,
- max_processing_time: int = None
- ) -> str:
- """
-        Central AI call method with intelligent model selection and content processing.
-
-        Args:
-            prompt: The main prompt for the AI
-            documents: List of documents to process
-            operation_type: Type of operation ("general", "document_analysis", "image_analysis", etc.)
-            priority: Priority for model selection ("speed", "quality", "cost", "balanced")
-            compress_prompt: Whether to compress the prompt
-            compress_documents: Whether to compress documents
-            process_documents_individually: Whether to process documents individually
-            max_cost: Maximum cost for the call
-            max_processing_time: Maximum processing time in seconds
-
-        Returns:
-            AI response as a string
- """
- try:
-            # 1. Process documents if present
- document_content = ""
- if documents:
- document_content = await self._process_documents_for_ai(
- documents,
- operation_type,
- compress_documents,
- process_documents_individually
- )
-
-            # 2. Select the best model based on priority and content
- selected_model = self._select_optimal_model(
- prompt,
- document_content,
- priority,
- operation_type,
- max_cost,
- max_processing_time
- )
-
-            # 3. Optimize the content for the selected model
- optimized_prompt, optimized_content = await self._optimize_content_for_model(
- prompt,
- document_content,
- selected_model,
- compress_prompt,
- compress_documents
- )
-
-            # 4. Execute the AI call with failover
- return await self._execute_ai_call_with_failover(
- selected_model,
- optimized_prompt,
- optimized_content
- )
-
- except Exception as e:
- logger.error(f"Error in centralized AI call: {str(e)}")
- return f"Error: {str(e)}"
-
- def _select_optimal_model(
- self,
- prompt: str,
- document_content: str,
- priority: str,
- operation_type: str,
- max_cost: float = None,
- max_processing_time: int = None
- ) -> str:
- """Wählt das optimale Modell basierend auf Priorität und Content aus"""
-
-        # Calculate the content size
- total_content_size = len(prompt.encode('utf-8')) + len(document_content.encode('utf-8'))
-
-        # Filter the available models
- available_models = {}
- for model_name, model_info in AI_MODELS.items():
-            # Check whether the model can handle the content size
-            if total_content_size > model_info["context_length"] * 0.8: # 80% reserved for content
- continue
-
-            # Check the cost limit
- if max_cost:
- estimated_cost = self._estimate_cost(model_info, total_content_size)
- if estimated_cost > max_cost:
- continue
-
-            # Check operation-type compatibility
- if operation_type == "image_analysis" and not model_info["supports_images"]:
- continue
-
- available_models[model_name] = model_info
-
- if not available_models:
-            # Fall back to the smallest model
- return "openai_gpt35"
-
-        # Select the model based on priority
- if priority == "speed":
- return max(available_models.keys(), key=lambda x: available_models[x]["speed_rating"])
- elif priority == "quality":
- return max(available_models.keys(), key=lambda x: available_models[x]["quality_rating"])
- elif priority == "cost":
- return min(available_models.keys(), key=lambda x: available_models[x]["cost_per_1k_tokens"])
- else: # balanced
-            # Weighted score: 40% quality, 30% speed, 30% cost
- def balanced_score(model_name):
- model_info = available_models[model_name]
- quality_score = model_info["quality_rating"] * 0.4
- speed_score = model_info["speed_rating"] * 0.3
-                cost_score = (10 - (model_info["cost_per_1k_tokens"] * 1000)) * 0.3 # lower cost = higher score
- return quality_score + speed_score + cost_score
-
- return max(available_models.keys(), key=balanced_score)
-
- def _estimate_cost(self, model_info: Dict, content_size: int) -> float:
- """Schätzt die Kosten für einen AI Call"""
- # Grobe Schätzung: 1 Token ≈ 4 Zeichen
- estimated_tokens = content_size / 4
- input_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens"]
-        output_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens_output"] * 0.1 # output assumed at 10% of input
- return input_cost + output_cost
-
- async def _process_documents_for_ai(
- self,
- documents: List[ChatDocument],
- operation_type: str,
- compress_documents: bool,
- process_individually: bool
- ) -> str:
- """Verarbeitet Dokumente für AI Call mit documentExtraction.py"""
-
- if not documents:
- return ""
-
- processed_contents = []
-
- for doc in documents:
- try:
-                # Extract content using documentExtraction.py
- extracted = await self.document_extractor.processFileData(
- doc.fileData,
- doc.fileName,
- doc.mimeType,
- prompt=f"Extract relevant content for {operation_type}",
- documentId=doc.id,
- enableAI=True
- )
-
-                # Combine all content items
- doc_content = []
- for content_item in extracted.contents:
- if content_item.data and content_item.data.strip():
- doc_content.append(content_item.data)
-
- if doc_content:
- combined_doc_content = "\n\n".join(doc_content)
-
-                    # Compress if requested
-                    if compress_documents and len(combined_doc_content.encode('utf-8')) > 10000: # 10KB limit
- combined_doc_content = await self._compress_content(
- combined_doc_content,
- 10000,
- "document"
- )
-
- processed_contents.append(f"Document: {doc.fileName}\n{combined_doc_content}")
-
- except Exception as e:
- logger.warning(f"Error processing document {doc.fileName}: {str(e)}")
- processed_contents.append(f"Document: {doc.fileName}\n[Error processing document: {str(e)}]")
-
- return "\n\n---\n\n".join(processed_contents)
-
- async def _optimize_content_for_model(
- self,
- prompt: str,
- document_content: str,
- model_name: str,
- compress_prompt: bool,
- compress_documents: bool
- ) -> tuple[str, str]:
- """Optimiert Content für das gewählte Modell"""
-
- model_info = AI_MODELS[model_name]
-        max_content_size = model_info["context_length"] * 0.7 # 70% reserved for content
-
- optimized_prompt = prompt
- optimized_content = document_content
-
-        # Compress the prompt if requested
-        if compress_prompt and len(prompt.encode('utf-8')) > 2000: # 2KB limit for the prompt
- optimized_prompt = await self._compress_content(prompt, 2000, "prompt")
-
-        # Compress the document content if requested
- if compress_documents and document_content:
- content_size = len(document_content.encode('utf-8'))
- if content_size > max_content_size:
- optimized_content = await self._compress_content(
- document_content,
- int(max_content_size),
- "document"
- )
-
- return optimized_prompt, optimized_content
-
- async def _compress_content(self, content: str, target_size: int, content_type: str) -> str:
- """Komprimiert Content intelligent basierend auf Typ"""
-
- if len(content.encode('utf-8')) <= target_size:
- return content
-
- try:
-            # Use AI for intelligent compression
-            compression_prompt = f"""
-            Compress the following {content_type} to at most {target_size} characters,
-            but keep all important information:
-
-            {content}
-
-            Return only the compressed content, without additional explanations.
- """
-
-            # Use the fastest available model for compression
- compression_model = "openai_gpt35"
- model_info = AI_MODELS[compression_model]
- connector = getattr(self, f"{model_info['connector']}Service")
-
- messages = [{"role": "user", "content": compression_prompt}]
-
- if model_info["connector"] == "openai":
- compressed = await connector.callAiBasic(messages)
- else:
- response = await connector.callAiBasic(messages)
- compressed = response["choices"][0]["message"]["content"]
-
- return compressed
-
- except Exception as e:
- logger.warning(f"AI compression failed, using truncation: {str(e)}")
-            # Fallback: simple truncation
- return content[:target_size] + "... [truncated]"
-
- async def _execute_ai_call_with_failover(
- self,
- model_name: str,
- prompt: str,
- document_content: str
- ) -> str:
- """Führt AI Call mit automatischem Failover aus"""
-
- try:
- model_info = AI_MODELS[model_name]
- connector = getattr(self, f"{model_info['connector']}Service")
-
-            # Prepare the messages
- messages = []
- if document_content:
- messages.append({
- "role": "system",
- "content": f"Context from documents:\n{document_content}"
- })
-
- messages.append({
- "role": "user",
- "content": prompt
- })
-
-            # Execute the AI call
- if model_info["connector"] == "openai":
- return await connector.callAiBasic(messages)
- else: # anthropic
- response = await connector.callAiBasic(messages)
- return response["choices"][0]["message"]["content"]
-
- except ContextLengthExceededException:
- logger.warning(f"Context length exceeded for {model_name}, trying fallback")
-            # Fall back to a model with a larger context
- fallback_model = self._find_fallback_model(model_name)
- if fallback_model:
- return await self._execute_ai_call_with_failover(fallback_model, prompt, document_content)
- else:
-                # Last resort: compress the content further
- compressed_prompt = await self._compress_content(prompt, 1000, "prompt")
- compressed_content = await self._compress_content(document_content, 5000, "document")
- return await self._execute_ai_call_with_failover("openai_gpt35", compressed_prompt, compressed_content)
-
- except Exception as e:
- logger.warning(f"AI call failed with {model_name}: {e}")
-            # General fallback
- return await self._execute_ai_call_with_failover("openai_gpt35", prompt, document_content)
-
- def _find_fallback_model(self, current_model: str) -> Optional[str]:
- """Findet ein Fallback-Modell mit größerem Context"""
- current_context = AI_MODELS[current_model]["context_length"]
-
-        # Look for a model with a larger context
- for model_name, model_info in AI_MODELS.items():
- if model_info["context_length"] > current_context:
- return model_name
-
- return None
-
- # Legacy methods
-
- async def callAiTextBasic(self, prompt: str, context: Optional[str] = None) -> str:
- """
- Basic text processing - now uses centralized AI call method.
-
- Args:
- prompt: The user prompt to process
- context: Optional system context/prompt
-
- Returns:
- The AI response as text
- """
- # Combine context with prompt if provided
- full_prompt = prompt
- if context:
- full_prompt = f"Context: {context}\n\nUser Request: {prompt}"
-
- # Use centralized AI call with speed priority for basic calls
- return await self.callAi(
- prompt=full_prompt,
- priority="speed",
- compress_prompt=True,
- compress_documents=False
- )
-
- async def callAiTextAdvanced(self, prompt: str, context: Optional[str] = None, _is_fallback: bool = False) -> str:
- """
- Advanced text processing - now uses centralized AI call method.
-
- Args:
- prompt: The user prompt to process
- context: Optional system context/prompt
- _is_fallback: Internal flag (kept for compatibility)
-
- Returns:
- The AI response as text
- """
- # Combine context with prompt if provided
- full_prompt = prompt
- if context:
- full_prompt = f"Context: {context}\n\nUser Request: {prompt}"
-
- # Use centralized AI call with quality priority for advanced calls
- return await self.callAi(
- prompt=full_prompt,
- priority="quality",
- compress_prompt=False,
- compress_documents=False
- )
-
- async def callAiImageBasic(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
- """
- Basic image processing - now uses centralized AI call method.
-
- Args:
- prompt: The prompt for image analysis
- imageData: The image data (file path or bytes)
- mimeType: Optional MIME type of the image
-
- Returns:
- The AI response as text
- """
- try:
- # For image processing, use the original connector directly
- # as the centralized method doesn't handle images yet
- return await self.openaiService.callAiImage(prompt, imageData, mimeType)
- except Exception as e:
- logger.error(f"Error in OpenAI image call: {str(e)}")
- return f"Error: {str(e)}"
-
- async def callAiImageAdvanced(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
- """
- Advanced image processing - now uses centralized AI call method.
-
- Args:
- prompt: The prompt for image analysis
- imageData: The image data (file path or bytes)
- mimeType: Optional MIME type of the image
-
- Returns:
- The AI response as text
- """
- try:
- # For image processing, use the original connector directly
- # as the centralized method doesn't handle images yet
- return await self.anthropicService.callAiImage(prompt, imageData, mimeType)
- except Exception as e:
- logger.error(f"Error in Anthropic image call: {str(e)}")
- return f"Error: {str(e)}"
-
- # Convenience methods for common use cases
-
- async def callAiForDocumentAnalysis(
- self,
- prompt: str,
- documents: List[ChatDocument],
- priority: str = "balanced"
- ) -> str:
- """Convenience method for document analysis"""
- return await self.callAi(
- prompt=prompt,
- documents=documents,
- operation_type="document_analysis",
- priority=priority,
- compress_documents=True,
- process_documents_individually=False
- )
-
- async def callAiForReportGeneration(
- self,
- prompt: str,
- documents: List[ChatDocument],
- priority: str = "quality"
- ) -> str:
- """Convenience method for report generation"""
- return await self.callAi(
- prompt=prompt,
- documents=documents,
- operation_type="report_generation",
- priority=priority,
- compress_documents=True,
- process_documents_individually=True
- )
-
- async def callAiForEmailComposition(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- priority: str = "speed"
- ) -> str:
- """Convenience method for email composition"""
- return await self.callAi(
- prompt=prompt,
- documents=documents,
- operation_type="email_composition",
- priority=priority,
- compress_prompt=True,
- compress_documents=True
- )
-
- async def callAiForTaskPlanning(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- priority: str = "balanced"
- ) -> str:
- """Convenience method for task planning"""
- return await self.callAi(
- prompt=prompt,
- documents=documents,
- operation_type="task_planning",
- priority=priority,
- compress_prompt=False,
- compress_documents=True
- )
-
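For orientation, the cost heuristic deleted here carries over unchanged into interfaceAiObjects.py below; worked through with the gpt-4o registry values for 8,000 bytes of content:

    estimated_tokens = 8000 / 4                 # 2000.0 tokens (1 token ≈ 4 characters)
    input_cost = (2000 / 1000) * 0.03           # 0.06 (cost_per_1k_tokens)
    output_cost = (2000 / 1000) * 0.06 * 0.1    # 0.012 (output assumed at 10% of input)
    total = input_cost + output_cost            # 0.072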
diff --git a/modules/interfaces/interfaceAiModel.py b/modules/interfaces/interfaceAiModel.py
new file mode 100644
index 00000000..6bd541b2
--- /dev/null
+++ b/modules/interfaces/interfaceAiModel.py
@@ -0,0 +1,30 @@
+from typing import Optional
+from pydantic import BaseModel, Field
+
+
+class AiCallOptions(BaseModel):
+ """Options for centralized AI processing (no document extraction here)."""
+
+ operationType: str = Field(default="general", description="Type of operation")
+ priority: str = Field(default="balanced", description="speed|quality|cost|balanced")
+ compressPrompt: bool = Field(default=True, description="Whether to compress the prompt")
+ compressContext: bool = Field(default=True, description="Whether to compress optional context")
+ maxCost: Optional[float] = Field(default=None, description="Max cost budget")
+ maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds")
+
+
+class AiCallRequest(BaseModel):
+ """Centralized AI call request payload for interface use."""
+
+ prompt: str = Field(description="The user prompt")
+ context: Optional[str] = Field(default=None, description="Optional external context (e.g., extracted docs)")
+ options: AiCallOptions = Field(default_factory=AiCallOptions)
+
+
+class AiCallResponse(BaseModel):
+ """Standardized AI call response."""
+
+ content: str = Field(description="AI response content")
+ modelName: str = Field(description="Selected model name")
+ usedTokens: Optional[int] = Field(default=None, description="Estimated used tokens")
+ costEstimate: Optional[float] = Field(default=None, description="Estimated cost of the call")
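+
+
+# A minimal usage sketch (values are illustrative, not part of the interface):
+#
+#   request = AiCallRequest(
+#       prompt="Summarize the quarterly figures",
+#       context="...previously extracted document text...",
+#       options=AiCallOptions(priority="speed", maxCost=0.05),
+#   )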
diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py
new file mode 100644
index 00000000..6f59eace
--- /dev/null
+++ b/modules/interfaces/interfaceAiObjects.py
@@ -0,0 +1,117 @@
+import logging
+from typing import Dict, Any, List
+
+from modules.connectors.connectorAiOpenai import AiOpenai
+from modules.connectors.connectorAiAnthropic import AiAnthropic
+from modules.interfaces.interfaceAiModel import AiCallOptions, AiCallRequest, AiCallResponse
+
+
+logger = logging.getLogger(__name__)
+
+
+# Model registry (connector specifications); it belongs in the interface layer, not in a service
+aiModels: Dict[str, Dict[str, Any]] = {
+ "openai_gpt4o": {
+ "connector": "openai",
+ "contextLength": 128000,
+ "costPer1kTokens": 0.03,
+ "costPer1kTokensOutput": 0.06,
+ "speedRating": 8,
+ "qualityRating": 9,
+ },
+ "openai_gpt35": {
+ "connector": "openai",
+ "contextLength": 16000,
+ "costPer1kTokens": 0.0015,
+ "costPer1kTokensOutput": 0.002,
+ "speedRating": 9,
+ "qualityRating": 7,
+ },
+ "anthropic_claude": {
+ "connector": "anthropic",
+ "contextLength": 200000,
+ "costPer1kTokens": 0.015,
+ "costPer1kTokensOutput": 0.075,
+ "speedRating": 7,
+ "qualityRating": 10,
+ },
+}
+
+
+class AiObjects:
+ """Centralized AI interface: selects model and calls connector. No document handling."""
+
+ def __init__(self):
+ self.openaiService = AiOpenai()
+ self.anthropicService = AiAnthropic()
+
+ def _estimateCost(self, modelInfo: Dict[str, Any], contentSize: int) -> float:
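+        # Rough heuristic: ~4 bytes of UTF-8 text per token.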
+ estimatedTokens = contentSize / 4
+ inputCost = (estimatedTokens / 1000) * modelInfo["costPer1kTokens"]
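+        # Assumption baked into the estimate: output is roughly 10% of input tokens.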
+ outputCost = (estimatedTokens / 1000) * modelInfo["costPer1kTokensOutput"] * 0.1
+ return inputCost + outputCost
+
+ def _selectModel(self, prompt: str, context: str, options: AiCallOptions) -> str:
+ totalSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8"))
+ candidates: Dict[str, Dict[str, Any]] = {}
+ for name, info in aiModels.items():
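+            # contextLength is measured in tokens while totalSize is in bytes
+            # (~4 bytes/token), so this gate is deliberately conservative.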
+ if totalSize > info["contextLength"] * 0.8:
+ continue
+ if options.maxCost is not None:
+ if self._estimateCost(info, totalSize) > options.maxCost:
+ continue
+ candidates[name] = info
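+        # Fallback when no model passes the size/budget gates: the smallest,
+        # cheapest model; the downstream call may still fail on oversize input.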
+ if not candidates:
+ return "openai_gpt35"
+ if options.priority == "speed":
+ return max(candidates, key=lambda k: candidates[k]["speedRating"])
+ if options.priority == "quality":
+ return max(candidates, key=lambda k: candidates[k]["qualityRating"])
+ if options.priority == "cost":
+ return min(candidates, key=lambda k: candidates[k]["costPer1kTokens"])
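+        # Balanced: weighted mix of quality, speed and inverted cost. Expensive
+        # models can score negative on the cost term, which still ranks them
+        # correctly relative to cheaper candidates.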
+ def balancedScore(name: str) -> float:
+ info = candidates[name]
+ return info["qualityRating"] * 0.4 + info["speedRating"] * 0.3 + (10 - info["costPer1kTokens"] * 1000) * 0.3
+ return max(candidates, key=balancedScore)
+
+ def _connectorFor(self, modelName: str):
+ return self.openaiService if aiModels[modelName]["connector"] == "openai" else self.anthropicService
+
+ async def call(self, request: AiCallRequest) -> AiCallResponse:
+ prompt = request.prompt
+ context = request.context or ""
+ options = request.options
+
+        # Optionally compress prompt/context; only a simple truncation fallback is implemented here
+ def maybeTruncate(text: str, limit: int) -> str:
+ data = text.encode("utf-8")
+ if len(data) <= limit:
+ return text
+ return data[:limit].decode("utf-8", errors="ignore") + "... [truncated]"
+
+ if options.compressPrompt and len(prompt.encode("utf-8")) > 2000:
+ prompt = maybeTruncate(prompt, 2000)
+ if options.compressContext and len(context.encode("utf-8")) > 70000:
+ context = maybeTruncate(context, 70000)
+
+ modelName = self._selectModel(prompt, context, options)
+
+ messages: List[Dict[str, Any]] = []
+ if context:
+ messages.append({"role": "system", "content": f"Context from documents:\n{context}"})
+ messages.append({"role": "user", "content": prompt})
+
+ connector = self._connectorFor(modelName)
+ if aiModels[modelName]["connector"] == "openai":
+ content = await connector.callAiBasic(messages)
+ else:
+ response = await connector.callAiBasic(messages)
+ content = response["choices"][0]["message"]["content"]
+
+ # Estimate cost/tokens
+ totalSize = len((prompt + context).encode("utf-8"))
+ cost = self._estimateCost(aiModels[modelName], totalSize)
+ usedTokens = int(totalSize / 4)
+
+ return AiCallResponse(content=content, modelName=modelName, usedTokens=usedTokens, costEstimate=cost)
+
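+# Sketch of a call site (names as defined above; the awaiting context is
+# assumed to be an async function elsewhere in the app):
+#
+#   ai = AiObjects()
+#   response = await ai.call(AiCallRequest(prompt="Classify this ticket"))
+#   print(response.modelName, response.costEstimate)
+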
diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py
index ccd471f1..069556cf 100644
--- a/modules/interfaces/interfaceAppObjects.py
+++ b/modules/interfaces/interfaceAppObjects.py
@@ -201,7 +201,6 @@ class AppObjects:
"""
return self.access.canModify(model_class, recordId)
-
def getInitialId(self, model_class: type) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(model_class)
@@ -268,105 +267,6 @@ class AppObjects:
logger.error(f"Error getting user by ID: {str(e)}")
return None
- def getUserConnections(self, userId: str) -> List[UserConnection]:
- """Returns all connections for a user."""
- try:
- # Get connections for this user
- connections = self.db.getRecordset(UserConnection, recordFilter={"userId": userId})
-
- # Convert to UserConnection objects
- result = []
- for conn_dict in connections:
- try:
- # Create UserConnection object
- connection = UserConnection(
- id=conn_dict["id"],
- userId=conn_dict["userId"],
- authority=conn_dict.get("authority"),
- externalId=conn_dict.get("externalId", ""),
- externalUsername=conn_dict.get("externalUsername", ""),
- externalEmail=conn_dict.get("externalEmail"),
- status=conn_dict.get("status", "pending"),
- connectedAt=conn_dict.get("connectedAt"),
- lastChecked=conn_dict.get("lastChecked"),
- expiresAt=conn_dict.get("expiresAt")
- )
- result.append(connection)
- except Exception as e:
- logger.error(f"Error converting connection dict to object: {str(e)}")
- continue
- return result
-
- except Exception as e:
- logger.error(f"Error getting user connections: {str(e)}")
- return []
-
- def addUserConnection(self, userId: str, authority: AuthAuthority, externalId: str,
- externalUsername: str, externalEmail: Optional[str] = None,
- status: ConnectionStatus = ConnectionStatus.PENDING) -> UserConnection:
- """
- Adds a new connection for a user.
-
- Args:
- userId: The ID of the user
- authority: The authentication authority (e.g., MSFT, GOOGLE)
- externalId: The external ID from the authority
- externalUsername: The username from the authority
- externalEmail: Optional email from the authority
- status: The connection status (defaults to PENDING)
-
- Returns:
- The created UserConnection object
- """
- try:
- # Get the user
- user = self.getUser(userId)
- if not user:
- raise ValueError(f"User not found: {userId}")
-
- # Create new connection with all required fields
- connection = UserConnection(
- id=str(uuid.uuid4()),
- userId=userId,
- authority=authority,
- externalId=externalId,
- externalUsername=externalUsername,
- externalEmail=externalEmail,
- status=status,
- connectedAt=get_utc_timestamp(),
- lastChecked=get_utc_timestamp(),
- expiresAt=None # Optional field, set to None by default
- )
-
- # Save to connections table
- self.db.recordCreate(UserConnection, connection)
-
-
- return connection
-
- except Exception as e:
- logger.error(f"Error adding user connection: {str(e)}")
- raise ValueError(f"Failed to add user connection: {str(e)}")
-
- def removeUserConnection(self, connectionId: str) -> None:
- """Remove a connection to an external service"""
- try:
- # Get connection
- connections = self.db.getRecordset(UserConnection, recordFilter={
- "id": connectionId
- })
-
- if not connections:
- raise ValueError(f"Connection {connectionId} not found")
-
- # Delete connection
- self.db.recordDelete(UserConnection, connectionId)
-
-
- except Exception as e:
- logger.error(f"Error removing user connection: {str(e)}")
- raise ValueError(f"Failed to remove user connection: {str(e)}")
-
def authenticateLocalUser(self, username: str, password: str) -> Optional[User]:
"""Authenticates a user by username and password using local authentication."""
# Clear the users table from cache and reload it
@@ -551,6 +451,154 @@ class AppObjects:
logger.error(f"Error deleting user: {str(e)}")
raise ValueError(f"Failed to delete user: {str(e)}")
+ def _getInitialUser(self) -> Optional[Dict[str, Any]]:
+ """Get the initial user record directly from database without access control."""
+ try:
+ initialUserId = self.getInitialId(UserInDB)
+ if not initialUserId:
+ return None
+
+ users = self.db.getRecordset(UserInDB, recordFilter={"id": initialUserId})
+ return users[0] if users else None
+ except Exception as e:
+ logger.error(f"Error getting initial user: {str(e)}")
+ return None
+
+ def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]:
+ """Checks if a username is available for registration."""
+ try:
+ username = checkData.get("username")
+ authenticationAuthority = checkData.get("authenticationAuthority", "local")
+
+ if not username:
+ return {
+ "available": False,
+ "message": "Username is required"
+ }
+
+ # Get user by username
+ user = self.getUserByUsername(username)
+
+ # Check if user exists (User model instance)
+ if user is not None:
+ return {
+ "available": False,
+ "message": "Username is already taken"
+ }
+
+ return {
+ "available": True,
+ "message": "Username is available"
+ }
+
+ except Exception as e:
+ logger.error(f"Error checking username availability: {str(e)}")
+ return {
+ "available": False,
+ "message": f"Error checking username availability: {str(e)}"
+ }
+
+ # Connection methods
+
+ def getUserConnections(self, userId: str) -> List[UserConnection]:
+ """Returns all connections for a user."""
+ try:
+ # Get connections for this user
+ connections = self.db.getRecordset(UserConnection, recordFilter={"userId": userId})
+
+ # Convert to UserConnection objects
+ result = []
+ for conn_dict in connections:
+ try:
+ # Create UserConnection object
+ connection = UserConnection(
+ id=conn_dict["id"],
+ userId=conn_dict["userId"],
+ authority=conn_dict.get("authority"),
+ externalId=conn_dict.get("externalId", ""),
+ externalUsername=conn_dict.get("externalUsername", ""),
+ externalEmail=conn_dict.get("externalEmail"),
+ status=conn_dict.get("status", "pending"),
+ connectedAt=conn_dict.get("connectedAt"),
+ lastChecked=conn_dict.get("lastChecked"),
+ expiresAt=conn_dict.get("expiresAt")
+ )
+ result.append(connection)
+ except Exception as e:
+ logger.error(f"Error converting connection dict to object: {str(e)}")
+ continue
+ return result
+
+ except Exception as e:
+ logger.error(f"Error getting user connections: {str(e)}")
+ return []
+
+ def addUserConnection(self, userId: str, authority: AuthAuthority, externalId: str,
+ externalUsername: str, externalEmail: Optional[str] = None,
+ status: ConnectionStatus = ConnectionStatus.PENDING) -> UserConnection:
+ """
+ Adds a new connection for a user.
+
+ Args:
+ userId: The ID of the user
+ authority: The authentication authority (e.g., MSFT, GOOGLE)
+ externalId: The external ID from the authority
+ externalUsername: The username from the authority
+ externalEmail: Optional email from the authority
+ status: The connection status (defaults to PENDING)
+
+ Returns:
+ The created UserConnection object
+ """
+ try:
+ # Get the user
+ user = self.getUser(userId)
+ if not user:
+ raise ValueError(f"User not found: {userId}")
+
+ # Create new connection with all required fields
+ connection = UserConnection(
+ id=str(uuid.uuid4()),
+ userId=userId,
+ authority=authority,
+ externalId=externalId,
+ externalUsername=externalUsername,
+ externalEmail=externalEmail,
+ status=status,
+ connectedAt=get_utc_timestamp(),
+ lastChecked=get_utc_timestamp(),
+ expiresAt=None # Optional field, set to None by default
+ )
+
+ # Save to connections table
+ self.db.recordCreate(UserConnection, connection)
+
+
+ return connection
+
+ except Exception as e:
+ logger.error(f"Error adding user connection: {str(e)}")
+ raise ValueError(f"Failed to add user connection: {str(e)}")
+
+ def removeUserConnection(self, connectionId: str) -> None:
+ """Remove a connection to an external service"""
+ try:
+ # Get connection
+ connections = self.db.getRecordset(UserConnection, recordFilter={
+ "id": connectionId
+ })
+
+ if not connections:
+ raise ValueError(f"Connection {connectionId} not found")
+
+ # Delete connection
+ self.db.recordDelete(UserConnection, connectionId)
+
+
+ except Exception as e:
+ logger.error(f"Error removing user connection: {str(e)}")
+ raise ValueError(f"Failed to remove user connection: {str(e)}")
+
# Mandate methods
def getAllMandates(self) -> List[Mandate]:
@@ -650,52 +698,7 @@ class AppObjects:
logger.error(f"Error deleting mandate: {str(e)}")
raise ValueError(f"Failed to delete mandate: {str(e)}")
- def _getInitialUser(self) -> Optional[Dict[str, Any]]:
- """Get the initial user record directly from database without access control."""
- try:
- initialUserId = self.getInitialId(UserInDB)
- if not initialUserId:
- return None
-
- users = self.db.getRecordset(UserInDB, recordFilter={"id": initialUserId})
- return users[0] if users else None
- except Exception as e:
- logger.error(f"Error getting initial user: {str(e)}")
- return None
-
- def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]:
- """Checks if a username is available for registration."""
- try:
- username = checkData.get("username")
- authenticationAuthority = checkData.get("authenticationAuthority", "local")
-
- if not username:
- return {
- "available": False,
- "message": "Username is required"
- }
-
- # Get user by username
- user = self.getUserByUsername(username)
-
- # Check if user exists (User model instance)
- if user is not None:
- return {
- "available": False,
- "message": "Username is already taken"
- }
-
- return {
- "available": True,
- "message": "Username is available"
- }
-
- except Exception as e:
- logger.error(f"Error checking username availability: {str(e)}")
- return {
- "available": False,
- "message": f"Error checking username availability: {str(e)}"
- }
+ # Token methods
def saveAccessToken(self, token: Token, replace_existing: bool = True) -> None:
"""Save an access token for the current user (must NOT have connectionId)"""
@@ -803,56 +806,8 @@ class AppObjects:
logger.error(f"Error saving connection token: {str(e)}")
raise
- def getAccessToken(self, authority: str, auto_refresh: bool = True) -> Optional[Token]:
- """Get the latest valid access token for the current user and authority, optionally auto-refresh if expired"""
- try:
- # Validate that we're not looking for connection tokens
- if not self.currentUser or not self.currentUser.id:
- raise ValueError("No valid user context available for token retrieval")
-
- # Get access tokens for this user and authority (must NOT have connectionId)
- tokens = self.db.getRecordset(Token, recordFilter={
- "userId": self.currentUser.id,
- "authority": authority,
- "connectionId": None # Ensure we only get access tokens
- })
-
- if not tokens:
- return None
-
- # Sort by creation date and get the latest
- tokens.sort(key=lambda x: x.get("createdAt", ""), reverse=True)
- latest_token = Token(**tokens[0])
-
- # Check if token is expired
- if latest_token.expiresAt and latest_token.expiresAt < get_utc_timestamp():
- if auto_refresh:
- # Import TokenManager here to avoid circular imports
- from modules.security.tokenManager import TokenManager
- token_manager = TokenManager()
-
- # Try to refresh the token
- refreshed_token = token_manager.refresh_token(latest_token)
- if refreshed_token:
- # Save the new token (which will automatically replace old ones)
- self.saveAccessToken(refreshed_token)
-
- return refreshed_token
- else:
- logger.warning(f"Failed to refresh expired access token for {authority}")
- return None
- else:
- logger.warning(f"Access token for {authority} is expired (expiresAt: {latest_token.expiresAt})")
- return None
-
- return latest_token
-
- except Exception as e:
- logger.error(f"Error getting access token: {str(e)}")
- return None
-
- def getConnectionToken(self, connectionId: str, auto_refresh: bool = True) -> Optional[Token]:
- """Get the connection token for a specific connectionId, optionally auto-refresh if expired"""
+ def getConnectionToken(self, connectionId: str) -> Optional[Token]:
+ """Get the latest stored token for a specific connectionId (no refresh)."""
try:
# Validate connectionId
if not connectionId:
@@ -873,31 +828,7 @@ class AppObjects:
tokens.sort(key=lambda x: x.get("expiresAt", 0), reverse=True)
latest_token = Token(**tokens[0])
- # Check if token is expired or expires within 30 minutes
- current_time = get_utc_timestamp()
- thirty_minutes = 30 * 60 # 30 minutes in seconds
-
- if latest_token.expiresAt and latest_token.expiresAt < (current_time + thirty_minutes):
- if auto_refresh:
- # Import TokenManager here to avoid circular imports
- from modules.security.tokenManager import TokenManager
- token_manager = TokenManager()
-
- # Try to refresh the token
- refreshed_token = token_manager.refresh_token(latest_token)
-
- if refreshed_token:
- # Save the new token (which will automatically replace old ones)
- self.saveConnectionToken(refreshed_token)
-
- logger.info(f"Proactively refreshed connection token for connectionId {connectionId} (expired in {latest_token.expiresAt - current_time} seconds)")
- return refreshed_token
- else:
- logger.warning(f"Token refresh failed for connectionId {connectionId}")
- return None
- else:
- logger.warning(f"Connection token for connectionId {connectionId} expires soon (expiresAt: {latest_token.expiresAt})")
- return None
+ # No auto-refresh here. Callers should use a higher-level service to refresh when needed.
return latest_token
@@ -905,53 +836,6 @@ class AppObjects:
logger.error(f"Error getting connection token for connectionId {connectionId}: {str(e)}")
return None
- def deleteAccessToken(self, authority: str) -> None:
- """Delete all access tokens for the current user and authority"""
- try:
- # Validate user context
- if not self.currentUser or not self.currentUser.id:
- raise ValueError("No valid user context available for token deletion")
-
- # Get access tokens to delete (must NOT have connectionId)
- tokens = self.db.getRecordset(Token, recordFilter={
- "userId": self.currentUser.id,
- "authority": authority,
- "connectionId": None # Ensure we only delete access tokens
- })
-
- # Delete each token
- for token in tokens:
- self.db.recordDelete(Token, token["id"])
-
-
- except Exception as e:
- logger.error(f"Error deleting access token: {str(e)}")
- raise
-
- def deleteConnectionTokenByConnectionId(self, connectionId: str) -> None:
- """Delete all connection tokens for a specific connectionId"""
- try:
- # Validate connectionId
- if not connectionId:
- raise ValueError("connectionId is required for deleteConnectionTokenByConnectionId")
-
- # Get connection tokens to delete
- tokens = self.db.getRecordset(Token, recordFilter={
- "connectionId": connectionId
- })
-
- # Delete each token
- for token in tokens:
- self.db.recordDelete(Token, token["id"])
-
-
- except Exception as e:
- logger.error(f"Error deleting connection token for connectionId {connectionId}: {str(e)}")
- raise
-
- # =====================
- # Token revocation (LOCAL gateway JWTs)
- # =====================
def findActiveTokenById(self, tokenId: str, userId: str, authority: AuthAuthority, sessionId: str = None, mandateId: str = None) -> Optional[Token]:
"""Find an active access token by its id (jti) with optional session/tenant scoping."""
try:
@@ -1088,7 +972,7 @@ class AppObjects:
logger.error(f"Error during logout: {str(e)}")
raise
- # Data Neutralization methods
+ # Neutralization methods
def getNeutralizationConfig(self) -> Optional[DataNeutraliserConfig]:
"""Get the data neutralization configuration for the current user's mandate"""
@@ -1138,98 +1022,6 @@ class AppObjects:
logger.error(f"Error creating/updating neutralization config: {str(e)}")
raise ValueError(f"Failed to create/update neutralization config: {str(e)}")
- def neutralizeText(self, text: str, file_id: Optional[str] = None) -> Dict[str, Any]:
- """Neutralize text content and store attribute mappings"""
- try:
- from modules.services.serviceNeutralization.neutralizer import DataAnonymizer
-
- # Get neutralization configuration to extract namesToParse
- config = self.getNeutralizationConfig()
- names_to_parse = []
- if config and hasattr(config, 'namesToParse') and config.namesToParse:
- # Split by newlines and filter out empty strings
- names_to_parse = [name.strip() for name in config.namesToParse.split('\n') if name.strip()]
-
- # Initialize anonymizer with custom names
- anonymizer = DataAnonymizer(names_to_parse=names_to_parse)
-
- # Process the text
- result = anonymizer.process_content(text, 'text')
-
- # Store attribute mappings in database
- stored_attributes = []
- for original_text, neutralized_text in result.mapping.items():
- # Extract pattern type and UUID from the neutralized text format [type.uuid]
- pattern_type = "unknown"
- placeholder_uuid = None
-
- if neutralized_text.startswith("[") and "." in neutralized_text and neutralized_text.endswith("]"):
- # Extract type and UUID from [type.uuid] format
- inner = neutralized_text[1:-1] # Remove [ and ]
- if "." in inner:
- pattern_type, placeholder_uuid = inner.split(".", 1)
-
- # Check if this exact original text already has a placeholder in the database
- existing_attribute = self.getExistingPlaceholder(original_text)
-
- if existing_attribute:
- # Reuse existing placeholder
- existing_uuid = existing_attribute.id
- existing_pattern_type = existing_attribute.patternType
-
- # Update the neutralized text to use the existing UUID
- result.data = result.data.replace(neutralized_text, f"[{existing_pattern_type}.{existing_uuid}]")
- result.mapping[original_text] = f"[{existing_pattern_type}.{existing_uuid}]"
-
- stored_attributes.append(existing_attribute)
- else:
- # Create new attribute record with the UUID that the neutralizer generated
- attribute_data = {
- "id": placeholder_uuid, # Use the UUID from the neutralizer
- "mandateId": self.mandateId,
- "userId": self.userId,
- "originalText": original_text,
- "fileId": file_id,
- "patternType": pattern_type
- }
-
- attribute = DataNeutralizerAttributes.from_dict(attribute_data)
- created_attribute = self.db.recordCreate(DataNeutralizerAttributes, attribute)
- stored_attributes.append(created_attribute)
-
-
- # The neutralized text is already in the correct [type.uuid] format
- # No need to replace it, as it's already properly formatted
-
- return {
- "neutralized_text": result.data,
- "attributes": stored_attributes,
- "mapping": result.mapping,
- "replaced_fields": result.replaced_fields,
- "processed_info": result.processed_info
- }
-
- except Exception as e:
- logger.error(f"Error neutralizing text: {str(e)}")
- raise ValueError(f"Failed to neutralize text: {str(e)}")
-
- def getExistingPlaceholder(self, original_text: str) -> Optional[DataNeutralizerAttributes]:
- """Get existing placeholder for original text if it exists"""
- try:
- existing_attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
- "mandateId": self.mandateId,
- "userId": self.userId,
- "originalText": original_text
- })
-
- if existing_attributes:
- return DataNeutralizerAttributes.from_dict(existing_attributes[0])
- return None
-
- except Exception as e:
- logger.error(f"Error getting existing placeholder: {str(e)}")
- return None
-
def getNeutralizationAttributes(self, file_id: Optional[str] = None) -> List[DataNeutralizerAttributes]:
"""Get neutralization attributes, optionally filtered by file ID"""
try:
@@ -1246,35 +1038,6 @@ class AppObjects:
logger.error(f"Error getting neutralization attributes: {str(e)}")
return []
- def resolveNeutralizedText(self, text: str) -> str:
- """Resolve UIDs in neutralized text back to original text"""
- try:
- # Find all placeholders in the new format [type.uuid]
- placeholder_pattern = r'\[([a-z]+)\.([a-f0-9-]{36})\]'
- matches = re.findall(placeholder_pattern, text)
-
- resolved_text = text
- for placeholder_type, uid in matches:
- # Find the attribute with this UID (which is the record ID)
- attributes = self.db.getRecordset(DataNeutralizerAttributes, recordFilter={
- "mandateId": self.mandateId,
- "id": uid
- })
-
- if attributes:
- attribute = attributes[0]
- # Replace placeholder with original text
- placeholder = f"[{placeholder_type}.{uid}]"
- resolved_text = resolved_text.replace(placeholder, attribute["originalText"])
- else:
- logger.warning(f"No attribute found for UID {uid}")
-
- return resolved_text
-
- except Exception as e:
- logger.error(f"Error resolving neutralized text: {str(e)}")
- return text
-
def deleteNeutralizationAttributes(self, file_id: str) -> bool:
"""Delete all neutralization attributes for a specific file"""
try:
diff --git a/modules/interfaces/interfaceTicketObjects.py b/modules/interfaces/interfaceTicketObjects.py
index 73894ab8..28dd3c3d 100644
--- a/modules/interfaces/interfaceTicketObjects.py
+++ b/modules/interfaces/interfaceTicketObjects.py
@@ -6,7 +6,7 @@ import pandas as pd
import openpyxl
from modules.shared.timezoneUtils import get_utc_now
-from modules.connectors.connectorSharepoint import ConnectorSharepoint
+from modules.services.serviceSharepoint.mainSharepoint import SharepointService
from modules.interfaces.interfaceTicketModel import TicketBase, Task
@@ -14,7 +14,7 @@ from modules.interfaces.interfaceTicketModel import TicketBase, Task
@dataclass(slots=True)
class TicketSharepointSyncInterface:
connector_ticket: TicketBase
- connector_sharepoint: ConnectorSharepoint
+ connector_sharepoint: SharepointService
task_sync_definition: dict
sync_folder: str
sync_file: str
@@ -26,7 +26,7 @@ class TicketSharepointSyncInterface:
async def create(
cls,
connector_ticket: TicketBase,
- connector_sharepoint: ConnectorSharepoint,
+ connector_sharepoint: SharepointService,
task_sync_definition: dict,
sync_folder: str,
sync_file: str,
@@ -700,7 +700,7 @@ class TicketSharepointSyncInterface:
def _transform_tasks(
self, tasks: list[Task], include_put: bool = False
- ) -> list[Task]:
+ ) -> list[Task]:
"""Transforms tasks according to the task_sync_definition."""
transformed_tasks = []
diff --git a/modules/routes/routeDataNeutralization.py b/modules/routes/routeDataNeutralization.py
index 939c4422..322d5398 100644
--- a/modules/routes/routeDataNeutralization.py
+++ b/modules/routes/routeDataNeutralization.py
@@ -7,7 +7,7 @@ from modules.security.auth import limiter, getCurrentUser
# Import interfaces
from modules.interfaces.interfaceAppModel import User, DataNeutraliserConfig, DataNeutralizerAttributes
-from modules.features.neutralizePlayground.mainNeutralizePlayground import NeutralizationService
+from modules.services.serviceNeutralization.mainNeutralization import NeutralizationService
# Configure logger
logger = logging.getLogger(__name__)
diff --git a/modules/routes/routeSecurityGoogle.py b/modules/routes/routeSecurityGoogle.py
index 9cca2b3a..bf87259e 100644
--- a/modules/routes/routeSecurityGoogle.py
+++ b/modules/routes/routeSecurityGoogle.py
@@ -339,7 +339,7 @@ async def auth_callback(code: str, state: str, request: Request) -> HTMLResponse
)
# Create JWT token data (like Microsoft does)
- from modules.security.auth import createAccessToken
+ from modules.security.jwtService import createAccessToken
jwt_token_data = {
"sub": user.username,
"mandateId": str(user.mandateId),
@@ -637,29 +637,19 @@ async def verify_token(
detail="No Google connection found for current user"
)
- # Get the current token
- current_token = appInterface.getConnectionToken(google_connection.id, auto_refresh=False)
-
+ # Get a fresh token via TokenManager convenience method
+ from modules.security.tokenManager import TokenManager
+ current_token = TokenManager().getFreshToken(appInterface, google_connection.id)
+
if not current_token:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No Google token found for this connection"
)
- # Verify the token
+ # Verify the (fresh) token
token_verification = await verify_google_token(current_token.tokenAccess)
- if not token_verification.get("valid"):
- # Try to refresh the token if verification failed
- from modules.security.tokenManager import TokenManager
- token_manager = TokenManager()
- refreshed_token = token_manager.refresh_token(current_token)
-
- if refreshed_token:
- appInterface.saveConnectionToken(refreshed_token)
- # Verify the refreshed token
- token_verification = await verify_google_token(refreshed_token.tokenAccess)
-
return {
"valid": token_verification.get("valid", False),
"scopes": token_verification.get("scopes", []),
@@ -721,8 +711,9 @@ async def refresh_token(
logger.debug(f"Found Google connection: {google_connection.id}, status={google_connection.status}")
- # Get the token for this specific connection using the new method
- current_token = appInterface.getConnectionToken(google_connection.id, auto_refresh=False)
+ # Get the token for this specific connection (fresh if expiring soon)
+ from modules.security.tokenManager import TokenManager
+ current_token = TokenManager().getFreshToken(appInterface, google_connection.id)
if not current_token:
raise HTTPException(
@@ -731,38 +722,25 @@ async def refresh_token(
)
- # Always attempt refresh (as per your requirement)
- from modules.security.tokenManager import TokenManager
- token_manager = TokenManager()
+ # Update the connection status and timing
+ google_connection.expiresAt = float(current_token.expiresAt) if current_token.expiresAt else google_connection.expiresAt
+ google_connection.lastChecked = get_utc_timestamp()
+ google_connection.status = ConnectionStatus.ACTIVE
+ appInterface.db.recordModify(UserConnection, google_connection.id, google_connection.to_dict())
- refreshed_token = token_manager.refresh_token(current_token)
- if refreshed_token:
- # Save the new connection token (which will automatically replace old ones)
- appInterface.saveConnectionToken(refreshed_token)
-
- # Update the connection's expiration time
- google_connection.expiresAt = float(refreshed_token.expiresAt)
- google_connection.lastChecked = get_utc_timestamp()
- google_connection.status = ConnectionStatus.ACTIVE
-
- # Save updated connection
- appInterface.db.recordModify(UserConnection, google_connection.id, google_connection.to_dict())
-
- # Calculate time until expiration
- current_time = get_utc_timestamp()
- expires_in = int(refreshed_token.expiresAt - current_time)
-
- return {
- "message": "Token refreshed successfully",
- "expires_at": refreshed_token.expiresAt,
- "expires_in_seconds": expires_in
- }
- else:
- raise HTTPException(
- status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
- detail="Failed to refresh token"
- )
+ # Calculate time until expiration
+ current_time = get_utc_timestamp()
+ expires_in = int(current_token.expiresAt - current_time) if current_token.expiresAt else 0
+
+ return {
+ "message": "Token refreshed successfully",
+ "expires_at": current_token.expiresAt,
+ "expires_in_seconds": expires_in
+ }
except HTTPException:
raise
diff --git a/modules/routes/routeSecurityLocal.py b/modules/routes/routeSecurityLocal.py
index 7b396f77..017e7e90 100644
--- a/modules/routes/routeSecurityLocal.py
+++ b/modules/routes/routeSecurityLocal.py
@@ -13,7 +13,8 @@ from jose import jwt
from pydantic import BaseModel
# Import auth modules
-from modules.security.auth import createAccessToken, createAccessTokenWithCookie, setRefreshTokenCookie, getCurrentUser, limiter, SECRET_KEY, ALGORITHM
+from modules.security.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
+from modules.security.jwtService import createAccessToken, createRefreshToken, setAccessTokenCookie, setRefreshTokenCookie
from modules.interfaces.interfaceAppObjects import getInterface, getRootInterface
from modules.interfaces.interfaceAppModel import User, UserInDB, AuthAuthority, UserPrivilege, Token
from modules.shared.attributeUtils import ModelMixin
@@ -91,11 +92,13 @@ async def login(
session_id = str(uuid.uuid4())
token_data["sid"] = session_id
- # Create access token with httpOnly cookie
- access_token = createAccessTokenWithCookie(token_data, response)
+ # Create access token + set cookie
+ access_token, _access_expires = createAccessToken(token_data)
+ setAccessTokenCookie(response, access_token)
- # Create refresh token with httpOnly cookie
- refresh_token = setRefreshTokenCookie(token_data, response)
+ # Create refresh token + set cookie
+ refresh_token, _refresh_expires = createRefreshToken(token_data)
+ setRefreshTokenCookie(response, refresh_token)
# Get expiration time for response
try:
@@ -287,8 +290,9 @@ async def refresh_token(
"authenticationAuthority": currentUser.authenticationAuthority
}
- # Create new access token with cookie
- access_token = createAccessTokenWithCookie(token_data, response)
+ # Create new access token + set cookie
+ access_token, _expires = createAccessToken(token_data)
+ setAccessTokenCookie(response, access_token)
# Get expiration time
try:
diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py
index 8c2d8856..2b73db59 100644
--- a/modules/routes/routeSecurityMsft.py
+++ b/modules/routes/routeSecurityMsft.py
@@ -14,7 +14,8 @@ import httpx
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceAppObjects import getInterface, getRootInterface
from modules.interfaces.interfaceAppModel import AuthAuthority, User, Token, ConnectionStatus, UserConnection
-from modules.security.auth import getCurrentUser, limiter, createAccessToken
+from modules.security.auth import getCurrentUser, limiter
+from modules.security.jwtService import createAccessToken
from modules.shared.attributeUtils import ModelMixin
from modules.shared.timezoneUtils import get_utc_now, create_expiration_timestamp, get_utc_timestamp
@@ -559,9 +560,9 @@ async def refresh_token(
logger.debug(f"Found Microsoft connection: {msft_connection.id}, status={msft_connection.status}")
- # Get the token for this specific connection using the new method
- # Enable auto-refresh to handle expired tokens gracefully
- current_token = appInterface.getConnectionToken(msft_connection.id, auto_refresh=True)
+ # Get a fresh token via TokenManager convenience method
+ from modules.security.tokenManager import TokenManager
+ current_token = TokenManager().getFreshToken(appInterface, msft_connection.id)
if not current_token:
raise HTTPException(
diff --git a/modules/security/auth.py b/modules/security/auth.py
index 5b882203..a457c60c 100644
--- a/modules/security/auth.py
+++ b/modules/security/auth.py
@@ -54,106 +54,7 @@ limiter = Limiter(key_func=get_remote_address)
# Logger
logger = logging.getLogger(__name__)
-def createAccessToken(data: dict, expiresDelta: Optional[timedelta] = None) -> Tuple[str, datetime]:
- """
- Creates a JWT Access Token.
-
- Args:
- data: Data to encode (usually user ID or username)
- expiresDelta: Validity duration of the token (optional)
-
- Returns:
- Tuple of (JWT Token as string, expiration datetime)
- """
- toEncode = data.copy()
- # Ensure a token id (jti) exists for revocation tracking (only required for local, harmless otherwise)
- if "jti" not in toEncode or not toEncode.get("jti"):
- toEncode["jti"] = str(uuid.uuid4())
-
- if expiresDelta:
- expire = get_utc_now() + expiresDelta
- else:
- expire = get_utc_now() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
-
- toEncode.update({"exp": expire})
- encodedJwt = jwt.encode(toEncode, SECRET_KEY, algorithm=ALGORITHM)
-
- return encodedJwt, expire
-
-def createAccessTokenWithCookie(data: dict, response: Response, expiresDelta: Optional[timedelta] = None) -> str:
- """
- Creates a JWT Access Token and sets it as an httpOnly cookie.
-
- Args:
- data: Data to encode (usually user ID or username)
- response: FastAPI Response object to set cookie
- expiresDelta: Validity duration of the token (optional)
-
- Returns:
- JWT Token as string
- """
- access_token, expires_at = createAccessToken(data, expiresDelta)
-
- # Set httpOnly cookie
- response.set_cookie(
- key="auth_token",
- value=access_token,
- httponly=True,
- secure=True, # HTTPS only in production
- samesite="strict",
- max_age=int(expiresDelta.total_seconds()) if expiresDelta else ACCESS_TOKEN_EXPIRE_MINUTES * 60
- )
-
- return access_token
-
-def createRefreshToken(data: dict) -> Tuple[str, datetime]:
- """
- Creates a JWT Refresh Token with longer expiration.
-
- Args:
- data: Data to encode (usually user ID or username)
-
- Returns:
- Tuple of (JWT Refresh Token as string, expiration datetime)
- """
- toEncode = data.copy()
- # Ensure a token id (jti) exists for revocation tracking
- if "jti" not in toEncode or not toEncode.get("jti"):
- toEncode["jti"] = str(uuid.uuid4())
-
- # Add refresh token type
- toEncode["type"] = "refresh"
-
- expire = get_utc_now() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
- toEncode.update({"exp": expire})
- encodedJwt = jwt.encode(toEncode, SECRET_KEY, algorithm=ALGORITHM)
-
- return encodedJwt, expire
-
-def setRefreshTokenCookie(data: dict, response: Response) -> str:
- """
- Creates a JWT Refresh Token and sets it as an httpOnly cookie.
-
- Args:
- data: Data to encode (usually user ID or username)
- response: FastAPI Response object to set cookie
-
- Returns:
- JWT Refresh Token as string
- """
- refresh_token, expires_at = createRefreshToken(data)
-
- # Set httpOnly cookie for refresh token
- response.set_cookie(
- key="refresh_token",
- value=refresh_token,
- httponly=True,
- secure=True, # HTTPS only in production
- samesite="strict",
- max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60 # Days to seconds
- )
-
- return refresh_token
+# Note: JWT creation and cookie helpers moved to modules.security.jwtService
def _getUserBase(token: str = Depends(cookieAuth)) -> User:
"""
diff --git a/modules/security/jwtService.py b/modules/security/jwtService.py
new file mode 100644
index 00000000..5e09e63e
--- /dev/null
+++ b/modules/security/jwtService.py
@@ -0,0 +1,72 @@
+"""
+JWT Service
+Centralizes local JWT creation and cookie helpers.
+"""
+
+import uuid
+from datetime import datetime, timedelta
+from typing import Optional, Tuple
+from fastapi import Response
+from jose import jwt
+
+from modules.shared.configuration import APP_CONFIG
+from modules.shared.timezoneUtils import get_utc_now
+
+# Config
+SECRET_KEY = APP_CONFIG.get("APP_JWT_KEY_SECRET")
+ALGORITHM = APP_CONFIG.get("Auth_ALGORITHM")
+ACCESS_TOKEN_EXPIRE_MINUTES = int(APP_CONFIG.get("APP_TOKEN_EXPIRY"))
+REFRESH_TOKEN_EXPIRE_DAYS = int(APP_CONFIG.get("APP_REFRESH_TOKEN_EXPIRY", "7"))
+
+
+def createAccessToken(data: dict, expiresDelta: Optional[timedelta] = None) -> Tuple[str, datetime]:
+ """Create a JWT access token and return (token, expiresAt)."""
+ toEncode = data.copy()
+ if "jti" not in toEncode or not toEncode.get("jti"):
+ import uuid
+ toEncode["jti"] = str(uuid.uuid4())
+
+ expire = get_utc_now() + (expiresDelta if expiresDelta else timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
+ toEncode.update({"exp": expire})
+ encodedJwt = jwt.encode(toEncode, SECRET_KEY, algorithm=ALGORITHM)
+ return encodedJwt, expire
+
+
+def createRefreshToken(data: dict) -> Tuple[str, datetime]:
+ """Create a JWT refresh token and return (token, expiresAt)."""
+ toEncode = data.copy()
+ if "jti" not in toEncode or not toEncode.get("jti"):
+ import uuid
+ toEncode["jti"] = str(uuid.uuid4())
+ toEncode["type"] = "refresh"
+
+ expire = get_utc_now() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
+ toEncode.update({"exp": expire})
+ encodedJwt = jwt.encode(toEncode, SECRET_KEY, algorithm=ALGORITHM)
+ return encodedJwt, expire
+
+
+def setAccessTokenCookie(response: Response, token: str, expiresDelta: Optional[timedelta] = None) -> None:
+ """Set access token as httpOnly cookie."""
+ maxAge = int(expiresDelta.total_seconds()) if expiresDelta else ACCESS_TOKEN_EXPIRE_MINUTES * 60
+ response.set_cookie(
+ key="auth_token",
+ value=token,
+ httponly=True,
+ secure=True,
+ samesite="strict",
+ max_age=maxAge
+ )
+
+
+def setRefreshTokenCookie(response: Response, token: str) -> None:
+ """Set refresh token as httpOnly cookie."""
+ response.set_cookie(
+ key="refresh_token",
+ value=token,
+ httponly=True,
+ secure=True,
+ samesite="strict",
+ max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
+ )
+
+
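+# Sketch of the intended route usage (the `response` object is a FastAPI
+# Response, as in the login routes that import these helpers):
+#
+#   token, _expires = createAccessToken({"sub": user.username})
+#   setAccessTokenCookie(response, token)
+#   refresh, _ = createRefreshToken({"sub": user.username})
+#   setRefreshTokenCookie(response, refresh)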
diff --git a/modules/security/tokenManager.py b/modules/security/tokenManager.py
index c27c9939..92fa747f 100644
--- a/modules/security/tokenManager.py
+++ b/modules/security/tokenManager.py
@@ -6,7 +6,7 @@ Handles all token operations including automatic refresh for backend services.
import logging
import httpx
from datetime import datetime
-from typing import Optional, Dict, Any
+from typing import Optional, Dict, Any, Callable
from modules.interfaces.interfaceAppModel import Token, AuthAuthority
 from modules.shared.configuration import APP_CONFIG
+from modules.shared.timezoneUtils import get_utc_timestamp
@@ -198,4 +198,66 @@ class TokenManager:
except Exception as e:
logger.error(f"Error refreshing token: {str(e)}")
return None
-
\ No newline at end of file
+
+ def ensure_fresh_token(self, token: Token, *, seconds_before_expiry: int = 30 * 60, save_callback: Optional[Callable[[Token], None]] = None) -> Optional[Token]:
+ """Ensure a token is fresh; refresh if expiring within threshold.
+
+ Args:
+ token: Existing token to validate/refresh.
+ seconds_before_expiry: Threshold window to proactively refresh.
+ save_callback: Optional function to persist a refreshed token.
+
+ Returns:
+ A fresh token (refreshed or original) or None if refresh failed.
+ """
+ try:
+ if token is None:
+ return None
+
+ now_ts = get_utc_timestamp()
+ expires_at = token.expiresAt or 0
+
+ # If token expires within the threshold, try to refresh
+ if expires_at and expires_at < (now_ts + seconds_before_expiry):
+ logger.info(
+ f"ensure_fresh_token: Token for connection {token.connectionId} expiring soon "
+ f"(in {max(0, expires_at - now_ts)}s). Attempting proactive refresh."
+ )
+ refreshed = self.refresh_token(token)
+ if refreshed:
+ if save_callback is not None:
+ try:
+ save_callback(refreshed)
+ except Exception as e:
+ logger.warning(f"ensure_fresh_token: Failed to persist refreshed token: {e}")
+ return refreshed
+ else:
+ logger.warning("ensure_fresh_token: Token refresh failed")
+ return None
+
+ # Token is sufficiently fresh
+ return token
+ except Exception as e:
+ logger.error(f"ensure_fresh_token: Error ensuring fresh token: {e}")
+ return None
+
+ # Convenience wrapper to fetch and ensure fresh token for a connection via interface layer
+ def getFreshToken(self, interfaceApp, connectionId: str, secondsBeforeExpiry: int = 30 * 60) -> Optional[Token]:
+ """Return a fresh token for a connection, refreshing when expiring soon.
+
+ Reads the latest stored token via interfaceApp.getConnectionToken, then
+ uses ensure_fresh_token to refresh if needed and persists the refreshed
+ token via interfaceApp.saveConnectionToken.
+ """
+ try:
+ token = interfaceApp.getConnectionToken(connectionId)
+ if not token:
+ return None
+ return self.ensure_fresh_token(
+ token,
+ seconds_before_expiry=secondsBeforeExpiry,
+ save_callback=lambda t: interfaceApp.saveConnectionToken(t)
+ )
+ except Exception as e:
+ logger.error(f"getFreshToken: Error fetching or refreshing token for connection {connectionId}: {e}")
+ return None
\ No newline at end of file
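+
+# Typical call site (appInterface is an AppObjects instance; see the routes
+# that switched to this helper):
+#
+#   token = TokenManager().getFreshToken(appInterface, connection.id)
+#   if token is None:
+#       ...  # surface an auth error to the caller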
diff --git a/modules/security/tokenRefreshService.py b/modules/security/tokenRefreshService.py
index 649960bc..5dcef46a 100644
--- a/modules/security/tokenRefreshService.py
+++ b/modules/security/tokenRefreshService.py
@@ -51,8 +51,8 @@ class TokenRefreshService:
try:
logger.debug(f"Refreshing Google token for connection {connection.id}")
- # Get current token
- current_token = interface.getConnectionToken(connection.id, auto_refresh=False)
+ # Get current token (no refresh in interface layer)
+ current_token = interface.getConnectionToken(connection.id)
if not current_token:
logger.warning(f"No Google token found for connection {connection.id}")
return False
@@ -100,8 +100,8 @@ class TokenRefreshService:
try:
logger.debug(f"Refreshing Microsoft token for connection {connection.id}")
- # Get current token
- current_token = interface.getConnectionToken(connection.id, auto_refresh=False)
+ # Get current token (no refresh in interface layer)
+ current_token = interface.getConnectionToken(connection.id)
if not current_token:
logger.warning(f"No Microsoft token found for connection {connection.id}")
return False
diff --git a/modules/services/__init__.py b/modules/services/__init__.py
new file mode 100644
index 00000000..842a2176
--- /dev/null
+++ b/modules/services/__init__.py
@@ -0,0 +1,100 @@
+from typing import Any
+
+from modules.interfaces.interfaceAppModel import User
+from modules.interfaces.interfaceChatModel import ChatWorkflow
+
+class PublicService:
+ """Lightweight proxy exposing only public callable attributes of a target.
+
+ - Hides names starting with '_'
+ - Optionally restricts to callables only
+ - Optional name_filter predicate for allow-list patterns
+ """
+
+ def __init__(self, target: Any, functions_only: bool = True, name_filter=None):
+ self._target = target
+ self._functions_only = functions_only
+ self._name_filter = name_filter
+
+ def __getattr__(self, name: str):
+ if name.startswith('_'):
+ raise AttributeError(f"'{type(self._target).__name__}' attribute '{name}' is private")
+ if self._name_filter and not self._name_filter(name):
+ raise AttributeError(f"'{name}' not exposed by policy")
+ attr = getattr(self._target, name)
+ if self._functions_only and not callable(attr):
+ raise AttributeError(f"'{name}' is not a function")
+ return attr
+
+ def __dir__(self):
+ names = [
+ n for n in dir(self._target)
+ if not n.startswith('_')
+ and (not self._functions_only or callable(getattr(self._target, n, None)))
+ and (self._name_filter(n) if self._name_filter else True)
+ ]
+ return sorted(names)
+
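+
+# Example of the proxy's behavior (illustrative):
+#
+#   svc = PublicService(SomeService())
+#   svc.process(...)      # OK: public callable is passed through
+#   svc._internal         # raises AttributeError: private
+#   svc.someField         # raises AttributeError when functions_only=True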
+
+class Services:
+
+ def __init__(self, user: User, workflow: ChatWorkflow):
+ self.user: User = user
+ self.workflow: ChatWorkflow = workflow
+
+ # Directly expose existing service modules
+
+        # Extraction and generation are bound to distinct attributes; binding
+        # both to the same name would silently overwrite the first service.
+        # (The attribute names below are an assumption of this refactor.)
+        from .serviceDocument.mainServiceDocumentExtraction import DocumentExtractionService
+        self.documentExtraction = PublicService(DocumentExtractionService(self))
+
+        from .serviceDocument.mainServiceDocumentGeneration import DocumentGenerationService
+        self.documentGeneration = PublicService(DocumentGenerationService(self))
+
+ from .serviceNeutralization.mainNeutralization import NeutralizationService
+ self.neutralization = PublicService(NeutralizationService())
+
+        from .serviceSharepoint.mainSharepoint import SharepointService
+        self.sharepoint = PublicService(SharepointService(self))
+
+        from .serviceAi.mainServiceAi import AiService
+        self.ai = PublicService(AiService())
+
+        from .serviceWorkflows.mainServiceWorkflows import WorkflowService
+        # Bound as 'workflows' so it does not shadow the ChatWorkflow stored
+        # in self.workflow above.
+        self.workflows = PublicService(WorkflowService(self))
+
+ # Initialize chat interface for workflow operations
+ from modules.interfaces.interfaceChatObjects import getInterface as getChatInterface
+ self.chatInterface = getChatInterface(user)
+
+ # Chat interface wrapper methods
+ def getWorkflow(self, workflowId: str):
+ return self.chatInterface.getWorkflow(workflowId)
+
+ def createWorkflow(self, workflowData: dict):
+ return self.chatInterface.createWorkflow(workflowData)
+
+ def updateWorkflow(self, workflowId: str, workflowData: dict):
+ return self.chatInterface.updateWorkflow(workflowId, workflowData)
+
+ def createMessage(self, messageData: dict):
+ return self.chatInterface.createMessage(messageData)
+
+ def updateMessage(self, messageId: str, messageData: dict):
+ return self.chatInterface.updateMessage(messageId, messageData)
+
+ def createLog(self, logData: dict):
+ return self.chatInterface.createLog(logData)
+
+ def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0):
+ return self.chatInterface.updateWorkflowStats(workflowId, bytesSent, bytesReceived, tokenCount)
+
+ @property
+ def mandateId(self):
+ return self.chatInterface.mandateId
+
+
+def getInterface(user: User, workflow: ChatWorkflow) -> Services:
+ return Services(user, workflow)
+
+
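+# Sketch of how a workflow handler might obtain the service bundle
+# (user/workflow objects come from the caller's context):
+#
+#   services = getInterface(current_user, chat_workflow)
+#   text = await services.ai.callAi("Draft a status update")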
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
new file mode 100644
index 00000000..c7458756
--- /dev/null
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -0,0 +1,137 @@
+import logging
+from typing import List, Optional
+
+from modules.interfaces.interfaceChatModel import ChatDocument
+from modules.services.serviceDocument.mainServiceDocumentExtraction import DocumentExtractionService
+from modules.interfaces.interfaceAiModel import AiCallRequest, AiCallOptions
+from modules.interfaces.interfaceAiObjects import AiObjects
+
+
+logger = logging.getLogger(__name__)
+
+
+# The model registry now lives in the interface layer (interfaceAiObjects.aiModels)
+
+
+class AiService:
+ """Centralized AI service orchestrating documents, model selection and failover.
+
+ The concrete connector instances (OpenAI/Anthropic) are injected by the interface layer.
+ """
+
+    def __init__(self, aiObjects: Optional[AiObjects] = None) -> None:
+ # Only depend on interfaces
+ self.aiObjects = aiObjects or AiObjects()
+ self.documentExtractor = DocumentExtractionService()
+
+ async def callAi(
+ self,
+ prompt: str,
+ documents: Optional[List[ChatDocument]] = None,
+ processDocumentsIndividually: bool = False,
+ options: Optional[AiCallOptions] = None,
+ ) -> str:
+        try:
+            effectiveOptions = options or AiCallOptions()
+
+            documentContent = ""
+            if documents:
+                documentContent = await self._processDocumentsForAi(
+                    documents,
+                    effectiveOptions.operationType,
+                    effectiveOptions.compressContext,
+                    processDocumentsIndividually,
+                )
+
+            request = AiCallRequest(
+                prompt=prompt,
+                context=documentContent or None,
+                options=effectiveOptions,
+            )
+
+ response = await self.aiObjects.call(request)
+ return response.content
+ except Exception as e:
+ logger.error(f"Error in centralized AI call: {str(e)}")
+ return f"Error: {str(e)}"
+
+    # Model selection and cost estimation are handled by the interface layer (AiObjects)
+
+ async def _processDocumentsForAi(
+ self,
+ documents: List[ChatDocument],
+ operationType: str,
+ compressDocuments: bool,
+ processIndividually: bool,
+ ) -> str:
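+        # NOTE: processIndividually is accepted for API parity but is not yet
+        # used here; every document is extracted and concatenated below.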
+ if not documents:
+ return ""
+
+ processedContents: List[str] = []
+ for doc in documents:
+ try:
+ extracted = await self.documentExtractor.processFileData(
+ doc.fileData,
+ doc.fileName,
+ doc.mimeType,
+ prompt=f"Extract relevant content for {operationType}",
+ documentId=doc.id,
+ enableAI=True,
+ )
+
+ docContent: List[str] = []
+ for contentItem in extracted.contents:
+ if contentItem.data and contentItem.data.strip():
+ docContent.append(contentItem.data)
+
+ if docContent:
+ combinedDocContent = "\n\n".join(docContent)
+ if (
+ compressDocuments
+ and len(combinedDocContent.encode("utf-8")) > 10000
+ ):
+ combinedDocContent = await self._compressContent(
+ combinedDocContent, 10000, "document"
+ )
+ processedContents.append(
+ f"Document: {doc.fileName}\n{combinedDocContent}"
+ )
+ except Exception as e:
+ logger.warning(
+ f"Error processing document {doc.fileName}: {str(e)}"
+ )
+ processedContents.append(
+ f"Document: {doc.fileName}\n[Error processing document: {str(e)}]"
+ )
+
+ return "\n\n---\n\n".join(processedContents)
+
+ # Prompt/context optimization (compression) handled by interface
+
+ async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str:
+ if len(content.encode("utf-8")) <= targetSize:
+ return content
+
+        # Services must not call AI connectors directly, so no AI-based
+        # compression happens here; fall back to byte-safe truncation.
+        try:
+            data = content.encode("utf-8")
+            return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]"
+        except Exception as e:
+            logger.warning(f"Byte-safe truncation failed, using character slice: {str(e)}")
+            return content[:targetSize] + "... [truncated]"
+
+    # Failover and fallback model selection are handled by the interface layer
+    # (AiObjects); this service delegates a single call.
+
+
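+# Sketch of a direct service call (documents come from the chat layer;
+# the variable names are illustrative):
+#
+#   service = AiService()
+#   answer = await service.callAi(
+#       "Compare these contracts",
+#       documents=chatDocuments,
+#       options=AiCallOptions(operationType="document_analysis", priority="quality"),
+#   )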
diff --git a/modules/services/serviceCenter.py b/modules/services/serviceCenter.py
deleted file mode 100644
index 0b999772..00000000
--- a/modules/services/serviceCenter.py
+++ /dev/null
@@ -1,1206 +0,0 @@
-import logging
-import importlib
-import pkgutil
-import inspect
-import os
-from typing import Dict, Any, List, Optional
-from modules.interfaces.interfaceAppModel import User, UserConnection
-from modules.interfaces.interfaceChatModel import (
- TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange, ExtractedContent
-)
-from modules.interfaces.interfaceAiCalls import AiCalls
-from modules.interfaces.interfaceChatObjects import getInterface as getChatObjects
-from modules.interfaces.interfaceChatModel import ActionResult
-from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects
-from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects
-from modules.services.serviceDocument.documentExtraction import DocumentExtraction
-from modules.services.serviceDocument.documentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
-from modules.workflows.methods.methodBase import MethodBase
-from modules.shared.timezoneUtils import get_utc_timestamp
-import uuid
-
-import asyncio
-
-logger = logging.getLogger(__name__)
-
-class ServiceCenter:
- """Service center that provides access to all services and their functions"""
-
- def __init__(self, currentUser: User, workflow: ChatWorkflow):
- # Core services
- self.user = currentUser
- self.workflow = workflow
- self.tasks = workflow.tasks
- self.statusEnums = TaskStatus
- self.currentTask = None # Initialize current task as None
-
- # Initialize managers
- self.interfaceChat = getChatObjects(currentUser)
- self.interfaceComponent = getComponentObjects(currentUser)
- self.interfaceApp = getAppObjects(currentUser)
- self.interfaceAiCalls = AiCalls()
- self.documentProcessor = DocumentExtraction(self)
-
- # Initialize methods catalog
- self.methods = {}
- # Discover additional methods
- self._discoverMethods()
-
- def _discoverMethods(self):
- """Dynamically discover all method classes and their actions in modules methods package"""
- try:
- # Import the methods package
- methodsPackage = importlib.import_module('modules.workflows.methods')
-
- # Discover all modules in the package
- for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__):
- if not isPkg and name.startswith('method'):
- try:
- # Import the module
- module = importlib.import_module(f'modules.workflows.methods.{name}')
-
- # Find all classes in the module that inherit from MethodBase
- for itemName, item in inspect.getmembers(module):
- if (inspect.isclass(item) and
- issubclass(item, MethodBase) and
- item != MethodBase):
- # Instantiate the method
- methodInstance = item(self)
-
- # Discover actions from public methods
- actions = {}
- for methodName, method in inspect.getmembers(type(methodInstance), predicate=inspect.iscoroutinefunction):
- if not methodName.startswith('_'):
- # Bind the method to the instance
- bound_method = method.__get__(methodInstance, type(methodInstance))
- sig = inspect.signature(method)
- params = {}
- for paramName, param in sig.parameters.items():
- if paramName not in ['self']:
- # Get parameter type
- paramType = param.annotation if param.annotation != param.empty else Any
-
- # Get parameter description from the default value's docstring, if present
- paramDesc = None
- if param.default != param.empty and hasattr(param.default, '__doc__'):
- paramDesc = param.default.__doc__
-
- params[paramName] = {
- 'type': paramType,
- 'required': param.default == param.empty,
- 'description': paramDesc,
- 'default': param.default if param.default != param.empty else None
- }
-
- actions[methodName] = {
- 'description': method.__doc__ or '',
- 'parameters': params,
- 'method': bound_method
- }
-
- # Add method instance with discovered actions
- self.methods[methodInstance.name] = {
- 'instance': methodInstance,
- 'description': methodInstance.description,
- 'actions': actions
- }
- logger.info(f"Discovered method: {methodInstance.name} with {len(actions)} actions")
-
- except Exception as e:
- logger.error(f"Error loading method module {name}: {str(e)}", exc_info=True)
-
- except Exception as e:
- logger.error(f"Error discovering methods: {str(e)}")
-
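- # Illustrative example of a module this discovery would pick up
- # (module, class, and action names are hypothetical):
- #
- #   # modules/workflows/methods/methodExample.py
- #   class MethodExample(MethodBase):
- #       name = "example"
- #       description = "Demo method"
- #
- #       async def echo(self, text: str) -> str:
- #           """Return the given text unchanged"""
- #           return text
- #
- # The public coroutine `echo` would be registered as the action "echo"
- # with one required string parameter derived from its signature.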
-
-
- # ===== Functions for Prompts: Context =====
-
- def getMethodsList(self) -> List[str]:
- """Get list of available methods with their signatures in the required format"""
- methodList = []
- for methodName, method in self.methods.items():
- methodInstance = method['instance']
- for actionName, action in method['actions'].items():
- # Use the new signature format from MethodBase
- signature = methodInstance.getActionSignature(actionName)
- if signature:
- methodList.append(signature)
- return methodList
-
- async def summarizeChat(self, messages: List[ChatMessage]) -> str:
- """
- Summarize chat messages, walking backwards from the most recent message to the first message with status="first"
-
- Args:
- messages: List of chat messages to summarize
-
- Returns:
- str: Summary of the chat in user's language
- """
- try:
- # Get messages from last to first, stopping at first message with status="first"
- relevantMessages = []
- for msg in reversed(messages):
- relevantMessages.append(msg)
- if msg.status == "first":
- break
-
- # Create prompt for AI
- prompt = f"""You are an AI assistant providing a summary of a chat conversation.
-Please respond in '{self.user.language}' language.
-
-Chat History:
-{chr(10).join(f"- {msg.message}" for msg in reversed(relevantMessages))}
-
-Instructions:
-1. Summarize the conversation's key points and outcomes
-2. Be concise but informative
-3. Use a professional but friendly tone
-4. Focus on important decisions and next steps if any
-
-Please provide a comprehensive summary of this conversation."""
-
- # Get summary using AI
- return await self.callAiTextBasic(prompt)
-
- except Exception as e:
- logger.error(f"Error summarizing chat: {str(e)}")
- return f"Error summarizing chat: {str(e)}"
-
- # ===== Functions for Prompts + Actions: Document References generation and resolution =====
-
- def getEnhancedDocumentContext(self) -> str:
- """Get enhanced document context formatted for action planning prompts with proper docList and docItem references"""
- try:
- document_list = self.getDocumentReferenceList()
-
- # Build technical context string for AI action planning
- context = "AVAILABLE DOCUMENTS:\n\n"
-
- # Process chat exchanges (current round)
- if document_list["chat"]:
- context += "CURRENT ROUND DOCUMENTS:\n"
- for exchange in document_list["chat"]:
- # Generate docList reference for the exchange (using message ID and label)
- # Find the message that corresponds to this exchange
- message_id = None
- for message in self.workflow.messages:
- if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange.documentsLabel:
- message_id = message.id
- break
-
- if message_id:
- doc_list_ref = f"docList:{message_id}:{exchange.documentsLabel}"
- else:
- # Fallback to label-only format if message ID not found
- doc_list_ref = f"docList:{exchange.documentsLabel}"
-
- logger.debug(f"Using document label for action planning: {exchange.documentsLabel} (message_id: {message_id})")
- context += f"- {doc_list_ref} contains:\n"
- # Generate docItem references for each document in the list
- for doc_ref in exchange.documents:
- if doc_ref.startswith("docItem:"):
- context += f" - {doc_ref}\n"
- else:
- # Convert to proper docItem format if needed
- context += f" - docItem:{doc_ref}\n"
- context += "\n"
-
- # Process history exchanges (previous rounds)
- if document_list["history"]:
- context += "WORKFLOW HISTORY DOCUMENTS:\n"
- for exchange in document_list["history"]:
- # Generate docList reference for the exchange (using message ID and label)
- # Find the message that corresponds to this exchange
- message_id = None
- for message in self.workflow.messages:
- if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange.documentsLabel:
- message_id = message.id
- break
-
- if message_id:
- doc_list_ref = f"docList:{message_id}:{exchange.documentsLabel}"
- else:
- # Fallback to label-only format if message ID not found
- doc_list_ref = f"docList:{exchange.documentsLabel}"
-
- logger.debug(f"Using history document label for action planning: {exchange.documentsLabel} (message_id: {message_id})")
- context += f"- {doc_list_ref} contains:\n"
- # Generate docItem references for each document in the list
- for doc_ref in exchange.documents:
- if doc_ref.startswith("docItem:"):
- context += f" - {doc_ref}\n"
- else:
- # Convert to proper docItem format if needed
- context += f" - docItem:{doc_ref}\n"
- context += "\n"
-
- if not document_list["chat"] and not document_list["history"]:
- context += "NO DOCUMENTS AVAILABLE - This workflow has no documents to process.\n"
-
- return context
-
- except Exception as e:
- logger.error(f"Error generating enhanced document context: {str(e)}")
- return "NO DOCUMENTS AVAILABLE - Error generating document context."
-
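- # Example of the context string this produces (the IDs, label, and
- # file name are illustrative):
- #
- #   AVAILABLE DOCUMENTS:
- #
- #   CURRENT ROUND DOCUMENTS:
- #   - docList:7f3a:round1_task2_action1_context contains:
- #     - docItem:91c2:report.pdf
-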
- def getDocumentReferenceList(self) -> Dict[str, List[DocumentExchange]]:
- """Get list of document exchanges with new labeling format, sorted by recency"""
- # Collect all documents first and refresh their attributes
- all_documents = []
- for message in self.workflow.messages:
- if message.documents:
- all_documents.extend(message.documents)
-
- # Refresh file attributes for all documents
- if all_documents:
- self._refreshDocumentFileAttributes(all_documents)
-
- chat_exchanges = []
- history_exchanges = []
-
- # Process messages in reverse order; "first" marks boundary
- in_current_round = True
- for message in reversed(self.workflow.messages):
- is_first = getattr(message, 'status', None) == "first"
-
- # Build a DocumentExchange if message has documents
- doc_exchange = None
- if message.documents:
- if message.actionId and message.documentsLabel:
- # Validate that we use the same label as in the message
- validated_label = self._validateDocumentLabelConsistency(message)
-
- # Use the message's actual documentsLabel
- doc_refs = []
- for doc in message.documents:
- doc_ref = self._getDocumentReferenceFromChatDocument(doc, message)
- doc_refs.append(doc_ref)
-
- doc_exchange = DocumentExchange(
- documentsLabel=validated_label,
- documents=doc_refs
- )
- else:
- # Generate new labels for documents without explicit labels
- doc_refs = []
- for doc in message.documents:
- doc_ref = self._getDocumentReferenceFromChatDocument(doc, message)
- doc_refs.append(doc_ref)
-
- if doc_refs:
- # Create a label based on message context
- context_prefix = self._generateWorkflowContextPrefix(message)
- context_label = f"{context_prefix}_context"
-
- doc_exchange = DocumentExchange(
- documentsLabel=context_label,
- documents=doc_refs
- )
-
- # Append to appropriate container based on boundary
- if doc_exchange:
- if in_current_round:
- chat_exchanges.append(doc_exchange)
- else:
- history_exchanges.append(doc_exchange)
-
- # Flip boundary after including the "first" message in chat
- if in_current_round and is_first:
- in_current_round = False
-
- # Sort by recency: most recent first, then current round, then earlier rounds
- # Sort chat exchanges by message sequence number (most recent first)
- chat_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x), reverse=True)
- # Sort history exchanges by message sequence number (most recent first)
- history_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x), reverse=True)
-
- return {
- "chat": chat_exchanges,
- "history": history_exchanges
- }
-
- def _refreshDocumentFileAttributes(self, documents: List[ChatDocument]) -> None:
- """Update file attributes (fileName, fileSize, mimeType) for documents"""
- for doc in documents:
- try:
- file_item = self.interfaceComponent.getFile(doc.fileId)
- if file_item:
- doc.fileName = file_item.fileName
- doc.fileSize = file_item.fileSize
- doc.mimeType = file_item.mimeType
- else:
- logger.warning(f"File not found for document {doc.id}, fileId: {doc.fileId}")
- except Exception as e:
- logger.error(f"Error refreshing file attributes for document {doc.id}: {e}")
-
- def _generateWorkflowContextPrefix(self, message: ChatMessage) -> str:
- """Generate workflow context prefix: round{num}_task{num}_action{num}"""
- round_num = getattr(message, 'roundNumber', 1)
- task_num = getattr(message, 'taskNumber', 0)
- action_num = getattr(message, 'actionNumber', 0)
- return f"round{round_num}_task{task_num}_action{action_num}"
-
- def _getDocumentReferenceFromChatDocument(self, document: ChatDocument, message: ChatMessage) -> str:
- """Get document reference using document ID and filename."""
- try:
- # Use document ID and filename for simple reference
- return f"docItem:{document.id}:{document.fileName}"
- except Exception as e:
- logger.error(f"Critical error creating document reference for document {document.id}: {str(e)}")
- # Re-raise the error to prevent workflow from continuing with invalid data
- raise
-
- def _getMessageSequenceForExchange(self, exchange: DocumentExchange) -> int:
- """Get message sequence number for sorting exchanges by recency"""
- try:
- # Extract message ID from the first document reference
- if exchange.documents and len(exchange.documents) > 0:
- first_doc_ref = exchange.documents[0]
- if first_doc_ref.startswith("docItem:"):
- # Reference format: docItem:<documentId>:<fileName>