From bbea0ff1153813c6af8a42f4539a588b9c942483 Mon Sep 17 00:00:00 2001
From: patrick-motsch
Date: Sun, 8 Feb 2026 00:25:48 +0100
Subject: [PATCH] revised state machine for workflow backend and ui
---
app.py | 26 +-
modules/datamodels/datamodelChat.py | 4 +-
modules/datamodels/datamodelUam.py | 2 +
.../chatbot/interfaceFeatureChatbot.py | 13 +-
modules/interfaces/interfaceDbApp.py | 27 +-
modules/interfaces/interfaceDbBilling.py | 410 +++++++++++++++++-
modules/interfaces/interfaceDbChat.py | 5 +-
modules/routes/routeBilling.py | 243 ++++++++++-
modules/routes/routeSecurityGoogle.py | 4 +
modules/routes/routeSecurityMsft.py | 3 +
.../serviceBilling/mainServiceBilling.py | 4 +-
.../mainServiceGeneration.py | 8 +
modules/system/mainSystem.py | 14 +-
.../composeAndDraftEmailWithContext.py | 22 +-
.../methodOutlook/actions/sendDraftEmail.py | 15 +-
.../methodOutlook/helpers/connection.py | 8 +
.../methods/methodOutlook/methodOutlook.py | 4 +-
.../workflows/processing/modes/modeDynamic.py | 42 +-
modules/workflows/workflowManager.py | 43 +-
19 files changed, 795 insertions(+), 102 deletions(-)
diff --git a/app.py b/app.py
index 0acdcfce..df7f9306 100644
--- a/app.py
+++ b/app.py
@@ -313,32 +313,18 @@ async def lifespan(app: FastAPI):
from modules.shared.auditLogger import registerAuditLogCleanupScheduler
registerAuditLogCleanupScheduler()
- # Ensure billing settings and accounts exist
+ # Ensure billing settings and accounts exist for all mandates
try:
from modules.interfaces.interfaceDbBilling import _getRootInterface as getBillingRootInterface
- from modules.datamodels.datamodelBilling import BillingSettings, BillingModelEnum
billingInterface = getBillingRootInterface()
- # Ensure root mandate has billing settings
- rootMandate = rootInterface.getRootMandate()
- if rootMandate:
- rootMandateId = rootMandate.get("id") if isinstance(rootMandate, dict) else getattr(rootMandate, "id", None)
- if rootMandateId:
- existingSettings = billingInterface.getSettings(rootMandateId)
- if not existingSettings:
- settings = BillingSettings(
- mandateId=rootMandateId,
- billingModel=BillingModelEnum.PREPAY_USER,
- defaultUserCredit=10.0,
- warningThresholdPercent=10.0,
- blockOnZeroBalance=True,
- notifyOnWarning=True
- )
- billingInterface.createSettings(settings)
- logger.info(f"Created billing settings for root mandate: PREPAY_USER with 10 CHF default credit")
+ # Step 1: Ensure all mandates have billing settings (creates defaults if missing)
+ settingsCreated = billingInterface.ensureAllMandateSettingsExist()
+ if settingsCreated > 0:
+ logger.info(f"Billing startup: Created {settingsCreated} missing mandate billing settings")
- # Efficient bulk check: Ensure all users have billing accounts (3 queries total)
+ # Step 2: Ensure all users have billing accounts (for PREPAY_USER mandates)
accountsCreated = billingInterface.ensureAllUserAccountsExist()
if accountsCreated > 0:
logger.info(f"Billing startup: Created {accountsCreated} missing user accounts")
diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py
index b1e73ae0..e2d631e8 100644
--- a/modules/datamodels/datamodelChat.py
+++ b/modules/datamodels/datamodelChat.py
@@ -12,6 +12,8 @@ import uuid
class ChatStat(BaseModel):
"""Statistics for chat operations. User-owned, no mandate context."""
+ model_config = {"populate_by_name": True, "extra": "allow"} # Allow DB system fields
+
id: str = Field(
default_factory=lambda: str(uuid.uuid4()), description="Primary key"
)
@@ -41,7 +43,7 @@ registerModelLabels(
"errorCount": {"en": "Error Count", "fr": "Nombre d'erreurs"},
"process": {"en": "Process", "fr": "Processus"},
"engine": {"en": "Engine", "fr": "Moteur"},
- "priceCHF": {"en": "Price USD", "fr": "Prix USD"},
+ "priceCHF": {"en": "Price CHF", "fr": "Prix CHF"},
},
)
diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py
index b0e6b468..155047a2 100644
--- a/modules/datamodels/datamodelUam.py
+++ b/modules/datamodels/datamodelUam.py
@@ -114,6 +114,7 @@ class UserConnection(BaseModel):
{"value": "none", "label": {"en": "None", "fr": "Aucun"}},
]})
tokenExpiresAt: Optional[float] = Field(None, description="When the current token expires (UTC timestamp in seconds)", json_schema_extra={"frontend_type": "timestamp", "frontend_readonly": True, "frontend_required": False})
+ grantedScopes: Optional[List[str]] = Field(None, description="OAuth scopes granted for this connection", json_schema_extra={"frontend_type": "list", "frontend_readonly": True, "frontend_required": False})
@computed_field
@computed_field
@@ -146,6 +147,7 @@ registerModelLabels(
"expiresAt": {"en": "Expires At", "de": "Läuft ab am", "fr": "Expire le"},
"tokenStatus": {"en": "Connection Status", "de": "Verbindungsstatus", "fr": "Statut de connexion"},
"tokenExpiresAt": {"en": "Expires At", "de": "Läuft ab am", "fr": "Expire le"},
+ "grantedScopes": {"en": "Granted Scopes", "de": "Gewährte Berechtigungen", "fr": "Autorisations accordées"},
"connectionReference": {"en": "Connection Reference", "de": "Verbindungsreferenz", "fr": "Référence de connexion"},
"displayLabel": {"en": "Display Label", "de": "Anzeigebezeichnung", "fr": "Libellé d'affichage"},
},
diff --git a/modules/features/chatbot/interfaceFeatureChatbot.py b/modules/features/chatbot/interfaceFeatureChatbot.py
index c1e5977e..68474898 100644
--- a/modules/features/chatbot/interfaceFeatureChatbot.py
+++ b/modules/features/chatbot/interfaceFeatureChatbot.py
@@ -1116,7 +1116,7 @@ class ChatObjects:
# Emit message event for streaming (if event manager is available)
try:
- from modules.features.chatbot.eventManager import get_event_manager
+ from modules.features.chatbot.eventManager import get_event_manager # type: ignore
event_manager = get_event_manager()
message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp())
# Emit message event in exact chatData format: {type, createdAt, item}
@@ -1514,7 +1514,7 @@ class ChatObjects:
# Only emit events for chatbot workflows, not for automation or dynamic workflows
if workflow.workflowMode == WorkflowModeEnum.WORKFLOW_CHATBOT:
try:
- from modules.features.chatbot.eventManager import get_event_manager
+ from modules.features.chatbot.eventManager import get_event_manager # type: ignore
event_manager = get_event_manager()
log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp())
# Emit log event in exact chatData format: {type, createdAt, item}
@@ -1563,8 +1563,8 @@ class ChatObjects:
if not stats:
return []
- # Return all stats records sorted by creation time
- stats.sort(key=lambda x: x.get("created_at", ""))
+ # Return all stats records sorted by _createdAt (system field from DB)
+ stats.sort(key=lambda x: x.get("_createdAt", 0))
# Ensure mandateId and featureInstanceId are set for each stat
return [ChatStat(**{**stat, "mandateId": stat.get("mandateId") or self.mandateId or "", "featureInstanceId": stat.get("featureInstanceId") or self.featureInstanceId or ""}) for stat in stats]
@@ -1680,11 +1680,12 @@ class ChatObjects:
"item": chatLog
})
- # Get stats list
+ # Get stats - ChatStat model now supports _createdAt via extra="allow"
stats = self.getStats(workflowId)
for stat in stats:
# Apply timestamp filtering in Python
- stat_timestamp = stat.createdAt if hasattr(stat, 'createdAt') else getUtcTimestamp()
+ # Use _createdAt (system field from DB, preserved via model_config extra="allow")
+ stat_timestamp = getattr(stat, '_createdAt', None) or getUtcTimestamp()
if afterTimestamp is not None and stat_timestamp <= afterTimestamp:
continue
diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py
index 1c082d33..1d8359a5 100644
--- a/modules/interfaces/interfaceDbApp.py
+++ b/modules/interfaces/interfaceDbApp.py
@@ -55,6 +55,9 @@ _gatewayInterfaces = {}
# Root interface instance
_rootAppObjects = None
+# Bootstrap completion flag - ensures bootstrap runs only ONCE per application lifecycle
+_bootstrapCompleted = False
+
# Password-Hashing
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
@@ -200,8 +203,28 @@ class AppObjects:
return simpleFields, objectFields
def _initRecords(self):
- """Initialize standard records if they don't exist."""
- initBootstrap(self.db)
+ """Initialize standard records if they don't exist.
+
+ Uses a global flag to ensure bootstrap only runs ONCE per application lifecycle.
+ The flag is set BEFORE calling bootstrap to prevent recursive calls during bootstrap.
+ """
+ global _bootstrapCompleted
+
+ if _bootstrapCompleted:
+ return
+
+ # Set flag BEFORE bootstrap to prevent recursive calls during bootstrap
+ _bootstrapCompleted = True
+ logger.info("Starting bootstrap (will only run once)")
+
+ try:
+ initBootstrap(self.db)
+ logger.info("Bootstrap completed successfully")
+ except Exception as e:
+ # Reset flag on failure so bootstrap can be retried
+ _bootstrapCompleted = False
+ logger.error(f"Bootstrap failed: {e}")
+ raise
def checkRbacPermission(
diff --git a/modules/interfaces/interfaceDbBilling.py b/modules/interfaces/interfaceDbBilling.py
index b3df9bea..bbb26d20 100644
--- a/modules/interfaces/interfaceDbBilling.py
+++ b/modules/interfaces/interfaceDbBilling.py
@@ -15,7 +15,8 @@ import uuid
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.shared.timeUtils import getUtcTimestamp
-from modules.datamodels.datamodelUam import User
+from modules.datamodels.datamodelUam import User, Mandate
+from modules.datamodels.datamodelMembership import UserMandate
from modules.datamodels.datamodelBilling import (
BillingAccount,
BillingTransaction,
@@ -360,6 +361,60 @@ class BillingObjects:
return created
+ def ensureAllMandateSettingsExist(self) -> int:
+ """
+ Efficiently ensure all mandates have billing settings.
+ Creates default settings (PREPAY_USER) for mandates without settings.
+ Uses bulk queries to minimize database connections.
+
+ Returns:
+ Number of settings created
+ """
+ try:
+ settingsCreated = 0
+
+ # Step 1: Get all existing billing settings in one query (from billing DB)
+ allSettings = self.db.getRecordset(BillingSettings)
+ existingMandateIds = set(s.get("mandateId") for s in allSettings if s.get("mandateId"))
+
+ # Step 2: Get all mandates from APP database (separate connection)
+ appDb = DatabaseConnector(
+ dbDatabase=APP_CONFIG.get('DB_DATABASE', 'poweron_app'),
+ dbHost=APP_CONFIG.get('DB_HOST', 'localhost'),
+ dbPort=int(APP_CONFIG.get('DB_PORT', '5432')),
+ dbUser=APP_CONFIG.get('DB_USER'),
+ dbPassword=APP_CONFIG.get('DB_PASSWORD_SECRET')
+ )
+ allMandates = appDb.getRecordset(Mandate, recordFilter={"enabled": True})
+
+ # Step 3: Create settings for mandates that don't have them
+ for mandate in allMandates:
+ mandateId = mandate.get("id")
+ if not mandateId or mandateId in existingMandateIds:
+ continue
+
+ # Create default billing settings
+ settings = BillingSettings(
+ mandateId=mandateId,
+ billingModel=BillingModelEnum.PREPAY_USER,
+ defaultUserCredit=10.0,
+ warningThresholdPercent=10.0,
+ blockOnZeroBalance=True,
+ notifyOnWarning=True
+ )
+ self.createSettings(settings)
+ existingMandateIds.add(mandateId) # Track newly created
+ settingsCreated += 1
+
+ if settingsCreated > 0:
+ logger.info(f"Created {settingsCreated} missing billing settings for mandates")
+
+ return settingsCreated
+
+ except Exception as e:
+ logger.error(f"Error ensuring mandate settings exist: {e}")
+ return 0
+
def ensureAllUserAccountsExist(self) -> int:
"""
Efficiently ensure all users across all mandates have billing accounts.
@@ -368,10 +423,7 @@ class BillingObjects:
Returns:
Number of accounts created
"""
- from modules.interfaces.interfaceDbApp import getRootInterface as getAppRootInterface
-
try:
- appInterface = getAppRootInterface()
accountsCreated = 0
# Step 1: Get all billing settings in one query (only PREPAY_USER mandates need user accounts)
@@ -385,7 +437,7 @@ class BillingObjects:
logger.debug("No PREPAY_USER mandates found, skipping account check")
return 0
- # Step 2: Get all existing USER accounts in one query
+ # Step 2: Get all existing USER accounts in one query (from billing DB)
allAccounts = self.db.getRecordset(
BillingAccount,
recordFilter={"accountType": AccountTypeEnum.USER.value}
@@ -396,9 +448,16 @@ class BillingObjects:
key = (acc.get("mandateId"), acc.get("userId"))
existingAccountKeys.add(key)
- # Step 3: Get all user-mandate combinations in one query
- allUserMandates = appInterface.db.getRecordset(
- appInterface.db.getModel("UserMandate"),
+ # Step 3: Get all user-mandate combinations from APP database (separate connection)
+ appDb = DatabaseConnector(
+ dbDatabase=APP_CONFIG.get('DB_DATABASE', 'poweron_app'),
+ dbHost=APP_CONFIG.get('DB_HOST', 'localhost'),
+ dbPort=int(APP_CONFIG.get('DB_PORT', '5432')),
+ dbUser=APP_CONFIG.get('DB_USER'),
+ dbPassword=APP_CONFIG.get('DB_PASSWORD_SECRET')
+ )
+ allUserMandates = appDb.getRecordset(
+ UserMandate,
recordFilter={"enabled": True}
)
@@ -855,3 +914,338 @@ class BillingObjects:
logger.error(f"Error getting balances for user: {e}")
return balances
+
+ def getTransactionsForUser(self, userId: str, limit: int = 100) -> List[Dict[str, Any]]:
+ """
+ Get all transactions for a user across all mandates they belong to.
+
+ Args:
+ userId: User ID
+ limit: Maximum number of results
+
+ Returns:
+ List of transaction dicts
+ """
+ from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
+
+ allTransactions = []
+
+ try:
+ appInterface = getAppInterface(self.currentUser)
+ userMandates = appInterface.getUserMandates(userId)
+
+ for um in userMandates:
+ # Handle both Pydantic models and dicts
+ mandateId = getattr(um, 'mandateId', None) or (um.get("mandateId") if isinstance(um, dict) else None)
+ if not mandateId:
+ continue
+
+ # Only include mandates with billing settings
+ settings = self.getSettings(mandateId)
+ if not settings:
+ continue
+
+ # Get transactions for this mandate
+ transactions = self.getTransactionsByMandate(mandateId, limit=limit)
+
+ # Add mandate context to each transaction
+ mandate = appInterface.getMandate(mandateId)
+ mandateName = ""
+ if mandate:
+ mandateName = getattr(mandate, 'name', None) or (mandate.get("name", "") if isinstance(mandate, dict) else "")
+
+ for t in transactions:
+ t["mandateId"] = mandateId
+ t["mandateName"] = mandateName
+ allTransactions.append(t)
+
+ except Exception as e:
+ logger.error(f"Error getting transactions for user: {e}")
+
+ # Sort by creation date descending and limit
+ allTransactions.sort(key=lambda x: x.get("_createdAt", ""), reverse=True)
+ return allTransactions[:limit]
+
+ # =========================================================================
+ # Mandate View Operations (Admin-Level)
+ # =========================================================================
+
+ def getMandateBalances(self, mandateIds: List[str] = None) -> List[Dict[str, Any]]:
+ """
+ Get mandate-level balances.
+
+ Args:
+ mandateIds: Optional list of mandate IDs to filter. If None, returns all.
+
+ Returns:
+ List of mandate balance dicts
+ """
+ from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
+
+ balances = []
+
+ try:
+ appInterface = getAppInterface(self.currentUser)
+
+ # Get settings for filtering
+ if mandateIds:
+ allSettings = [self.getSettings(mId) for mId in mandateIds]
+ allSettings = [s for s in allSettings if s]
+ else:
+ allSettings = self.db.getRecordset(BillingSettings)
+
+ for settings in allSettings:
+ mandateId = settings.get("mandateId")
+ if not mandateId:
+ continue
+
+ billingModel = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value))
+
+ # Get mandate info
+ mandate = appInterface.getMandate(mandateId)
+ mandateName = ""
+ if mandate:
+ mandateName = getattr(mandate, 'name', None) or (mandate.get("name", "") if isinstance(mandate, dict) else "")
+
+ # For PREPAY_MANDATE, get the mandate account balance
+ # For PREPAY_USER, aggregate all user balances
+ if billingModel == BillingModelEnum.PREPAY_MANDATE:
+ account = self.getMandateAccount(mandateId)
+ totalBalance = account.get("balance", 0.0) if account else 0.0
+ userCount = 0
+ elif billingModel == BillingModelEnum.PREPAY_USER:
+ # Get all user accounts for this mandate
+ userAccounts = self.db.getRecordset(
+ BillingAccount,
+ recordFilter={"mandateId": mandateId, "accountType": AccountTypeEnum.USER.value}
+ )
+ totalBalance = sum(acc.get("balance", 0.0) for acc in userAccounts)
+ userCount = len(userAccounts)
+ else:
+ totalBalance = 0.0
+ userCount = 0
+
+ balances.append({
+ "mandateId": mandateId,
+ "mandateName": mandateName,
+ "billingModel": billingModel.value,
+ "totalBalance": totalBalance,
+ "userCount": userCount,
+ "defaultUserCredit": settings.get("defaultUserCredit", 0.0),
+ "warningThresholdPercent": settings.get("warningThresholdPercent", 10.0),
+ "blockOnZeroBalance": settings.get("blockOnZeroBalance", True)
+ })
+
+ except Exception as e:
+ logger.error(f"Error getting mandate balances: {e}")
+
+ return balances
+
+ def getMandateTransactions(self, mandateIds: List[str] = None, limit: int = 100) -> List[Dict[str, Any]]:
+ """
+ Get all transactions for specified mandates.
+
+ Args:
+ mandateIds: Optional list of mandate IDs to filter. If None, returns all.
+ limit: Maximum number of results
+
+ Returns:
+ List of transaction dicts with mandate context
+ """
+ from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
+
+ allTransactions = []
+
+ try:
+ appInterface = getAppInterface(self.currentUser)
+
+ # Determine which mandates to query
+ if mandateIds:
+ targetMandateIds = mandateIds
+ else:
+ allSettings = self.db.getRecordset(BillingSettings)
+ targetMandateIds = [s.get("mandateId") for s in allSettings if s.get("mandateId")]
+
+ for mandateId in targetMandateIds:
+ transactions = self.getTransactionsByMandate(mandateId, limit=limit)
+
+ # Get mandate name
+ mandate = appInterface.getMandate(mandateId)
+ mandateName = ""
+ if mandate:
+ mandateName = getattr(mandate, 'name', None) or (mandate.get("name", "") if isinstance(mandate, dict) else "")
+
+ for t in transactions:
+ t["mandateId"] = mandateId
+ t["mandateName"] = mandateName
+ allTransactions.append(t)
+
+ except Exception as e:
+ logger.error(f"Error getting mandate transactions: {e}")
+
+ # Sort by creation date descending and limit
+ allTransactions.sort(key=lambda x: x.get("_createdAt", ""), reverse=True)
+ return allTransactions[:limit]
+
+ # =========================================================================
+ # User View Operations (User-Level with RBAC)
+ # =========================================================================
+
+ def getUserBalancesForMandates(self, mandateIds: List[str] = None) -> List[Dict[str, Any]]:
+ """
+ Get all user-level balances for specified mandates.
+
+ Args:
+ mandateIds: Optional list of mandate IDs to filter. If None, returns all.
+
+ Returns:
+ List of user balance dicts with mandate and user context
+ """
+ from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
+
+ balances = []
+
+ try:
+ appInterface = getAppInterface(self.currentUser)
+
+ # Get all user accounts
+ accountFilter = {"accountType": AccountTypeEnum.USER.value}
+ allAccounts = self.db.getRecordset(BillingAccount, recordFilter=accountFilter)
+
+ # Filter by mandate if specified
+ if mandateIds:
+ allAccounts = [acc for acc in allAccounts if acc.get("mandateId") in mandateIds]
+
+ # Get all relevant settings in one query
+ settingsMap = {}
+ allSettings = self.db.getRecordset(BillingSettings)
+ for s in allSettings:
+ settingsMap[s.get("mandateId")] = s
+
+ # Get user info efficiently
+ userIds = list(set(acc.get("userId") for acc in allAccounts if acc.get("userId")))
+ userMap = {}
+ for userId in userIds:
+ user = appInterface.getUser(userId)
+ if user:
+ displayName = getattr(user, 'displayName', None) or (user.get("displayName") if isinstance(user, dict) else None)
+ email = getattr(user, 'email', None) or (user.get("email") if isinstance(user, dict) else None)
+ userMap[userId] = displayName or email or userId
+
+ # Get mandate info efficiently
+ mandateMap = {}
+ mandateIdList = list(set(acc.get("mandateId") for acc in allAccounts if acc.get("mandateId")))
+ for mandateId in mandateIdList:
+ mandate = appInterface.getMandate(mandateId)
+ if mandate:
+ mandateName = getattr(mandate, 'name', None) or (mandate.get("name", "") if isinstance(mandate, dict) else "")
+ mandateMap[mandateId] = mandateName
+
+ for account in allAccounts:
+ mandateId = account.get("mandateId")
+ userId = account.get("userId")
+
+ if not mandateId or not userId:
+ continue
+
+ settings = settingsMap.get(mandateId)
+ if not settings:
+ continue
+
+ balance = account.get("balance", 0.0)
+ warningThreshold = account.get("warningThreshold", 0.0)
+
+ balances.append({
+ "accountId": account.get("id"),
+ "mandateId": mandateId,
+ "mandateName": mandateMap.get(mandateId, ""),
+ "userId": userId,
+ "userName": userMap.get(userId, userId),
+ "balance": balance,
+ "warningThreshold": warningThreshold,
+ "isWarning": balance <= warningThreshold,
+ "enabled": account.get("enabled", True)
+ })
+
+ except Exception as e:
+ logger.error(f"Error getting user balances for mandates: {e}")
+
+ return balances
+
+ def getUserTransactionsForMandates(self, mandateIds: List[str] = None, limit: int = 100) -> List[Dict[str, Any]]:
+ """
+ Get all user-level transactions for specified mandates.
+
+ Args:
+ mandateIds: Optional list of mandate IDs to filter. If None, returns all.
+ limit: Maximum number of results
+
+ Returns:
+ List of transaction dicts with mandate and user context
+ """
+ from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
+
+ allTransactions = []
+
+ try:
+ appInterface = getAppInterface(self.currentUser)
+
+ # Get all user accounts
+ accountFilter = {"accountType": AccountTypeEnum.USER.value}
+ allAccounts = self.db.getRecordset(BillingAccount, recordFilter=accountFilter)
+
+ # Filter by mandate if specified
+ if mandateIds:
+ allAccounts = [acc for acc in allAccounts if acc.get("mandateId") in mandateIds]
+
+ # Build account to user/mandate mapping
+ accountMap = {}
+ for acc in allAccounts:
+ accountMap[acc.get("id")] = {
+ "mandateId": acc.get("mandateId"),
+ "userId": acc.get("userId")
+ }
+
+ # Get user info efficiently
+ userIds = list(set(acc.get("userId") for acc in allAccounts if acc.get("userId")))
+ userMap = {}
+ for userId in userIds:
+ user = appInterface.getUser(userId)
+ if user:
+ displayName = getattr(user, 'displayName', None) or (user.get("displayName") if isinstance(user, dict) else None)
+ email = getattr(user, 'email', None) or (user.get("email") if isinstance(user, dict) else None)
+ userMap[userId] = displayName or email or userId
+
+ # Get mandate info efficiently
+ mandateMap = {}
+ mandateIdList = list(set(acc.get("mandateId") for acc in allAccounts if acc.get("mandateId")))
+ for mandateId in mandateIdList:
+ mandate = appInterface.getMandate(mandateId)
+ if mandate:
+ mandateName = getattr(mandate, 'name', None) or (mandate.get("name", "") if isinstance(mandate, dict) else "")
+ mandateMap[mandateId] = mandateName
+
+ # Get transactions for all accounts
+ for account in allAccounts:
+ accountId = account.get("id")
+ if not accountId:
+ continue
+
+ transactions = self.getTransactions(accountId, limit=limit)
+ accountInfo = accountMap.get(accountId, {})
+ mandateId = accountInfo.get("mandateId")
+ userId = accountInfo.get("userId")
+
+ for t in transactions:
+ t["mandateId"] = mandateId
+ t["mandateName"] = mandateMap.get(mandateId, "")
+ t["userId"] = userId
+ t["userName"] = userMap.get(userId, userId)
+ allTransactions.append(t)
+
+ except Exception as e:
+ logger.error(f"Error getting user transactions for mandates: {e}")
+
+ # Sort by creation date descending and limit
+ allTransactions.sort(key=lambda x: x.get("_createdAt", ""), reverse=True)
+ return allTransactions[:limit]
diff --git a/modules/interfaces/interfaceDbChat.py b/modules/interfaces/interfaceDbChat.py
index 0aec7fe0..0a3971a8 100644
--- a/modules/interfaces/interfaceDbChat.py
+++ b/modules/interfaces/interfaceDbChat.py
@@ -1634,11 +1634,12 @@ class ChatObjects:
"item": chatLog
})
- # Get stats list
+ # Get stats - ChatStat model supports _createdAt via model_config extra="allow"
stats = self.getStats(workflowId)
for stat in stats:
# Apply timestamp filtering in Python
- stat_timestamp = stat.createdAt if hasattr(stat, 'createdAt') else getUtcTimestamp()
+ # Use _createdAt (system field from DB, preserved via model_config extra="allow")
+ stat_timestamp = getattr(stat, '_createdAt', None) or getUtcTimestamp()
if afterTimestamp is not None and stat_timestamp <= afterTimestamp:
continue
diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py
index ffeea594..6cc0a03b 100644
--- a/modules/routes/routeBilling.py
+++ b/modules/routes/routeBilling.py
@@ -74,6 +74,8 @@ class TransactionResponse(BaseModel):
featureCode: Optional[str]
aicoreProvider: Optional[str]
createdAt: Optional[datetime]
+ mandateId: Optional[str] = None
+ mandateName: Optional[str] = None
class AccountSummary(BaseModel):
@@ -97,6 +99,53 @@ class UsageReportResponse(BaseModel):
costByFeature: Dict[str, float]
+# =============================================================================
+# Response Models for Mandate/User Views
+# =============================================================================
+
+class MandateBalanceResponse(BaseModel):
+ """Mandate-level balance summary."""
+ mandateId: str
+ mandateName: str
+ billingModel: str
+ totalBalance: float
+ userCount: int
+ defaultUserCredit: float
+ warningThresholdPercent: float
+ blockOnZeroBalance: bool
+
+
+class UserBalanceResponse(BaseModel):
+ """User-level balance summary."""
+ accountId: str
+ mandateId: str
+ mandateName: str
+ userId: str
+ userName: str
+ balance: float
+ warningThreshold: float
+ isWarning: bool
+ enabled: bool
+
+
+class UserTransactionResponse(BaseModel):
+ """User-level transaction with user context."""
+ id: str
+ accountId: str
+ transactionType: TransactionTypeEnum
+ amount: float
+ description: str
+ referenceType: Optional[ReferenceTypeEnum]
+ workflowId: Optional[str]
+ featureCode: Optional[str]
+ aicoreProvider: Optional[str]
+ createdAt: Optional[datetime]
+ mandateId: Optional[str] = None
+ mandateName: Optional[str] = None
+ userId: Optional[str] = None
+ userName: Optional[str] = None
+
+
# =============================================================================
# Router Setup
# =============================================================================
@@ -186,7 +235,7 @@ async def getTransactions(
ctx: RequestContext = Depends(getRequestContext)
):
"""
- Get transaction history for the current mandate.
+ Get transaction history across all mandates the user belongs to.
"""
try:
billingService = getBillingService(
@@ -195,7 +244,8 @@ async def getTransactions(
featureCode="billing"
)
- transactions = billingService.getTransactionHistory(limit=limit)
+ # Fetch enough transactions for pagination
+ transactions = billingService.getTransactionHistory(limit=offset + limit)
# Convert to response model
result = []
@@ -210,7 +260,9 @@ async def getTransactions(
workflowId=t.get("workflowId"),
featureCode=t.get("featureCode"),
aicoreProvider=t.get("aicoreProvider"),
- createdAt=t.get("_createdAt")
+ createdAt=t.get("_createdAt"),
+ mandateId=t.get("mandateId"),
+ mandateName=t.get("mandateName")
))
return result
@@ -607,3 +659,188 @@ async def getTransactionsAdmin(
except Exception as e:
logger.error(f"Error getting billing transactions for mandate {targetMandateId}: {e}")
raise HTTPException(status_code=500, detail=str(e))
+
+
+# =============================================================================
+# Mandate View Endpoints (for Admins)
+# =============================================================================
+
+@router.get("/view/mandates/balances", response_model=List[MandateBalanceResponse])
+@limiter.limit("30/minute")
+async def getMandateViewBalances(
+ request: Request,
+ ctx: RequestContext = Depends(getRequestContext),
+ _admin = Depends(requireSysAdmin)
+):
+ """
+ Get mandate-level balances (SysAdmin only).
+ Shows aggregated balances per mandate.
+ """
+ try:
+ billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
+ balances = billingInterface.getMandateBalances()
+
+ return [MandateBalanceResponse(**b) for b in balances]
+
+ except Exception as e:
+ logger.error(f"Error getting mandate view balances: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/view/mandates/transactions", response_model=List[TransactionResponse])
+@limiter.limit("30/minute")
+async def getMandateViewTransactions(
+ request: Request,
+ limit: int = Query(default=100, ge=1, le=1000),
+ ctx: RequestContext = Depends(getRequestContext),
+ _admin = Depends(requireSysAdmin)
+):
+ """
+ Get all transactions across mandates (SysAdmin only).
+ """
+ try:
+ billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
+ transactions = billingInterface.getMandateTransactions(limit=limit)
+
+ result = []
+ for t in transactions:
+ result.append(TransactionResponse(
+ id=t.get("id"),
+ accountId=t.get("accountId"),
+ transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")),
+ amount=t.get("amount", 0.0),
+ description=t.get("description", ""),
+ referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None,
+ workflowId=t.get("workflowId"),
+ featureCode=t.get("featureCode"),
+ aicoreProvider=t.get("aicoreProvider"),
+ createdAt=t.get("_createdAt"),
+ mandateId=t.get("mandateId"),
+ mandateName=t.get("mandateName")
+ ))
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Error getting mandate view transactions: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+# =============================================================================
+# User View Endpoints (RBAC-based)
+# =============================================================================
+
+@router.get("/view/users/balances", response_model=List[UserBalanceResponse])
+@limiter.limit("30/minute")
+async def getUserViewBalances(
+ request: Request,
+ ctx: RequestContext = Depends(getRequestContext)
+):
+ """
+ Get user-level balances.
+ - SysAdmin: sees all user balances across all mandates
+ - MandateAdmin: sees user balances for mandates they manage
+ - Regular user: sees only their own balances
+ """
+ try:
+ billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
+
+ # Determine which mandates the user has access to
+ if ctx.user.isSysAdmin:
+ # SysAdmin sees all
+ mandateIds = None
+ else:
+ # Get mandates where user is admin or has billing access
+ from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
+ appInterface = getAppInterface(ctx.user)
+ userMandates = appInterface.getUserMandates(ctx.user.id)
+
+ # Filter to only mandates where user has admin role
+ # For simplicity, we'll check if user is admin in any mandate
+ mandateIds = []
+ for um in userMandates:
+ mandateId = getattr(um, 'mandateId', None) or (um.get("mandateId") if isinstance(um, dict) else None)
+ if mandateId:
+ mandateIds.append(mandateId)
+
+ if not mandateIds:
+ return []
+
+ allBalances = billingInterface.getUserBalancesForMandates(mandateIds)
+
+ # Non-admin users only see their own balances
+ if not ctx.user.isSysAdmin:
+ allBalances = [b for b in allBalances if b.get("userId") == ctx.user.id]
+
+ return [UserBalanceResponse(**b) for b in allBalances]
+
+ except Exception as e:
+ logger.error(f"Error getting user view balances: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.get("/view/users/transactions", response_model=List[UserTransactionResponse])
+@limiter.limit("30/minute")
+async def getUserViewTransactions(
+ request: Request,
+ limit: int = Query(default=100, ge=1, le=1000),
+ ctx: RequestContext = Depends(getRequestContext)
+):
+ """
+ Get user-level transactions.
+ - SysAdmin: sees all user transactions across all mandates
+ - MandateAdmin: sees user transactions for mandates they manage
+ - Regular user: sees only their own transactions
+ """
+ try:
+ billingInterface = getBillingInterface(ctx.user, ctx.mandateId)
+
+ # Determine which mandates the user has access to
+ if ctx.user.isSysAdmin:
+ # SysAdmin sees all
+ mandateIds = None
+ else:
+ # Get mandates where user has access
+ from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
+ appInterface = getAppInterface(ctx.user)
+ userMandates = appInterface.getUserMandates(ctx.user.id)
+
+ mandateIds = []
+ for um in userMandates:
+ mandateId = getattr(um, 'mandateId', None) or (um.get("mandateId") if isinstance(um, dict) else None)
+ if mandateId:
+ mandateIds.append(mandateId)
+
+ if not mandateIds:
+ return []
+
+ allTransactions = billingInterface.getUserTransactionsForMandates(mandateIds, limit=limit)
+
+ # Non-admin users only see their own transactions
+ if not ctx.user.isSysAdmin:
+ allTransactions = [t for t in allTransactions if t.get("userId") == ctx.user.id]
+
+ result = []
+ for t in allTransactions:
+ result.append(UserTransactionResponse(
+ id=t.get("id"),
+ accountId=t.get("accountId"),
+ transactionType=TransactionTypeEnum(t.get("transactionType", "DEBIT")),
+ amount=t.get("amount", 0.0),
+ description=t.get("description", ""),
+ referenceType=ReferenceTypeEnum(t["referenceType"]) if t.get("referenceType") else None,
+ workflowId=t.get("workflowId"),
+ featureCode=t.get("featureCode"),
+ aicoreProvider=t.get("aicoreProvider"),
+ createdAt=t.get("_createdAt"),
+ mandateId=t.get("mandateId"),
+ mandateName=t.get("mandateName"),
+ userId=t.get("userId"),
+ userName=t.get("userName")
+ ))
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Error getting user view transactions: {e}")
+ raise HTTPException(status_code=500, detail=str(e))
diff --git a/modules/routes/routeSecurityGoogle.py b/modules/routes/routeSecurityGoogle.py
index d8ef3bef..4ee634ed 100644
--- a/modules/routes/routeSecurityGoogle.py
+++ b/modules/routes/routeSecurityGoogle.py
@@ -487,6 +487,10 @@ async def auth_callback(code: str, state: str, request: Request, response: Respo
connection.externalId = user_info.get("id")
connection.externalUsername = user_info.get("email")
connection.externalEmail = user_info.get("email")
+ # Store actually granted scopes for this connection
+ granted_scopes_list = granted_scopes.split(" ") if granted_scopes else SCOPES
+ connection.grantedScopes = granted_scopes_list
+ logger.info(f"Storing granted scopes for connection {connection_id}: {granted_scopes_list}")
# Update connection record directly
rootInterface.db.recordModify(UserConnection, connection_id, connection.model_dump())
diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py
index 68bf6fe8..0abb2f56 100644
--- a/modules/routes/routeSecurityMsft.py
+++ b/modules/routes/routeSecurityMsft.py
@@ -498,6 +498,9 @@ async def auth_callback(code: str, state: str, request: Request, response: Respo
connection.externalId = user_info.get("id")
connection.externalUsername = user_info.get("userPrincipalName")
connection.externalEmail = user_info.get("mail")
+ # Store granted scopes for this connection
+ connection.grantedScopes = SCOPES
+ logger.info(f"Storing granted scopes for connection {connection_id}: {SCOPES}")
# Update connection record directly
rootInterface.db.recordModify(UserConnection, connection_id, connection.model_dump())
diff --git a/modules/services/serviceBilling/mainServiceBilling.py b/modules/services/serviceBilling/mainServiceBilling.py
index c7a08a1c..472e0b58 100644
--- a/modules/services/serviceBilling/mainServiceBilling.py
+++ b/modules/services/serviceBilling/mainServiceBilling.py
@@ -367,7 +367,7 @@ class BillingService:
def getTransactionHistory(self, limit: int = 100) -> List[Dict[str, Any]]:
"""
- Get transaction history for the current mandate.
+ Get transaction history for the user across all mandates.
Args:
limit: Maximum number of transactions
@@ -375,7 +375,7 @@ class BillingService:
Returns:
List of transactions
"""
- return self._billingInterface.getTransactionsByMandate(self.mandateId, limit=limit)
+ return self._billingInterface.getTransactionsForUser(self.currentUser.id, limit=limit)
# ============================================================================
diff --git a/modules/services/serviceGeneration/mainServiceGeneration.py b/modules/services/serviceGeneration/mainServiceGeneration.py
index a49b78c7..4720c9a0 100644
--- a/modules/services/serviceGeneration/mainServiceGeneration.py
+++ b/modules/services/serviceGeneration/mainServiceGeneration.py
@@ -74,6 +74,14 @@ class GenerationService:
document_data_dict = document_data.dict()
elif isinstance(document_data, dict):
document_data_dict = document_data
+ elif isinstance(document_data, str):
+ # JSON-String: parsen und als dict speichern (z.B. von outlook.composeAndDraftEmailWithContext)
+ import json
+ try:
+ document_data_dict = json.loads(document_data)
+ except json.JSONDecodeError:
+ # Kein valides JSON - als plain text speichern
+ document_data_dict = {"data": document_data}
else:
document_data_dict = {"data": str(document_data)}
diff --git a/modules/system/mainSystem.py b/modules/system/mainSystem.py
index e80efbe6..c48add01 100644
--- a/modules/system/mainSystem.py
+++ b/modules/system/mainSystem.py
@@ -96,21 +96,13 @@ NAVIGATION_SECTIONS = [
"title": {"en": "BILLING", "de": "BILLING", "fr": "FACTURATION"},
"order": 35,
"items": [
- {
- "id": "billing-dashboard",
- "objectKey": "ui.billing.dashboard",
- "label": {"en": "Balance", "de": "Guthaben", "fr": "Solde"},
- "icon": "FaWallet",
- "path": "/billing",
- "order": 10,
- },
{
"id": "billing-transactions",
"objectKey": "ui.billing.transactions",
- "label": {"en": "Transactions", "de": "Transaktionen", "fr": "Transactions"},
- "icon": "FaListAlt",
+ "label": {"en": "Billing", "de": "Billing", "fr": "Facturation"},
+ "icon": "FaWallet",
"path": "/billing/transactions",
- "order": 20,
+ "order": 10,
},
],
},
diff --git a/modules/workflows/methods/methodOutlook/actions/composeAndDraftEmailWithContext.py b/modules/workflows/methods/methodOutlook/actions/composeAndDraftEmailWithContext.py
index 59604896..e8bc94b3 100644
--- a/modules/workflows/methods/methodOutlook/actions/composeAndDraftEmailWithContext.py
+++ b/modules/workflows/methods/methodOutlook/actions/composeAndDraftEmailWithContext.py
@@ -13,16 +13,17 @@ logger = logging.getLogger(__name__)
async def composeAndDraftEmailWithContext(self, parameters: Dict[str, Any]) -> ActionResult:
try:
connectionReference = parameters.get("connectionReference")
- to = parameters.get("to")
+ to = parameters.get("to") or [] # Optional for drafts - can save draft without recipients
context = parameters.get("context")
- documentList = parameters.get("documentList", [])
- cc = parameters.get("cc", [])
- bcc = parameters.get("bcc", [])
- emailStyle = parameters.get("emailStyle", "business")
- maxLength = parameters.get("maxLength", 1000)
+ documentList = parameters.get("documentList") or []
+ cc = parameters.get("cc") or []
+ bcc = parameters.get("bcc") or []
+ emailStyle = parameters.get("emailStyle") or "business"
+ maxLength = parameters.get("maxLength") or 1000
- if not connectionReference or not to or not context:
- return ActionResult.isFailure(error="connectionReference, to, and context are required")
+ # Only connectionReference and context are required - to is optional for drafts
+ if not connectionReference or not context:
+ return ActionResult.isFailure(error="connectionReference and context are required")
# Convert single values to lists for all recipient parameters
if isinstance(to, str):
@@ -82,12 +83,15 @@ async def composeAndDraftEmailWithContext(self, parameters: Dict[str, Any]) -> A
# Escape only the user-controlled context to prevent prompt injection
escaped_context = context.replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r')
+ # Build recipients text for prompt
+ recipients_text = f"Recipients: {to}" if to else "Recipients: (not specified - this is a draft)"
+
ai_prompt = f"""Compose an email based on this context:
-------
{escaped_context}
-------
-Recipients: {to}
+{recipients_text}
Style: {emailStyle}
Max length: {maxLength} characters
{doc_list_text}
diff --git a/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py b/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py
index 9b7fb011..15c35f44 100644
--- a/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py
+++ b/modules/workflows/methods/methodOutlook/actions/sendDraftEmail.py
@@ -90,15 +90,20 @@ async def sendDraftEmail(self, parameters: Dict[str, Any]) -> ActionResult:
else:
jsonContent = str(fileData)
- # Parse JSON - handle both direct JSON and JSON wrapped in documentData
+ # Parse JSON - handle ActionDocument format with validationMetadata wrapper
try:
draftEmailData = json.loads(jsonContent)
- # If the JSON contains a 'documentData' field, extract it
+ # ActionDocument format: { "validationMetadata": {...}, "documentData": {...} }
+ # Extract documentData which contains the actual draft email data
if isinstance(draftEmailData, dict) and 'documentData' in draftEmailData:
- documentDataStr = draftEmailData['documentData']
- if isinstance(documentDataStr, str):
- draftEmailData = json.loads(documentDataStr)
+ documentDataContent = draftEmailData['documentData']
+ # documentData should be a dict (parsed from JSON by processSingleDocument)
+ if isinstance(documentDataContent, dict):
+ draftEmailData = documentDataContent
+ elif isinstance(documentDataContent, str):
+ # Legacy/fallback: parse if still a string
+ draftEmailData = json.loads(documentDataContent)
# Validate draft email structure
if not isinstance(draftEmailData, dict):
diff --git a/modules/workflows/methods/methodOutlook/helpers/connection.py b/modules/workflows/methods/methodOutlook/helpers/connection.py
index 8f3daded..12621fd3 100644
--- a/modules/workflows/methods/methodOutlook/helpers/connection.py
+++ b/modules/workflows/methods/methodOutlook/helpers/connection.py
@@ -84,6 +84,14 @@ class ConnectionHelper:
elif response.status_code == 403:
logger.error("Permission denied - connection lacks necessary mail permissions")
logger.error("Required scopes: Mail.ReadWrite, Mail.Send, Mail.ReadWrite.Shared")
+ logger.error("Solution: User must reconnect and grant mail permissions")
+ return False
+ elif response.status_code == 404:
+ # 404 on /me/mailFolders typically means the token lacks mail scopes
+ # This happens when the connection was created without mail permissions
+ logger.error("Mail API not accessible (404) - token likely lacks mail scopes")
+ logger.error("This usually means the connection was created without Mail.ReadWrite permission")
+ logger.error("Solution: User must delete the connection and reconnect, granting mail permissions")
return False
else:
logger.warning(f"Permission check returned status {response.status_code}")
diff --git a/modules/workflows/methods/methodOutlook/methodOutlook.py b/modules/workflows/methods/methodOutlook/methodOutlook.py
index 4a978b7a..8d80cef5 100644
--- a/modules/workflows/methods/methodOutlook/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook/methodOutlook.py
@@ -150,8 +150,8 @@ class MethodOutlook(MethodBase):
name="to",
type="List[str]",
frontendType=FrontendType.MULTISELECT,
- required=True,
- description="Recipient email addresses"
+ required=False,
+ description="Recipient email addresses (optional for drafts)"
),
"context": WorkflowActionParameter(
name="context",
diff --git a/modules/workflows/processing/modes/modeDynamic.py b/modules/workflows/processing/modes/modeDynamic.py
index 1510e512..e59a9253 100644
--- a/modules/workflows/processing/modes/modeDynamic.py
+++ b/modules/workflows/processing/modes/modeDynamic.py
@@ -204,29 +204,29 @@ class DynamicMode(BaseMode):
if quality_score is None:
quality_score = 0.0
logger.info(f"Content validation: {validationResult.get('overallSuccess', False)} (quality: {quality_score:.2f})")
+
+ # Record validation result for adaptive learning
+ actionValue = selection.get('action', 'unknown')
+ actionContext = {
+ 'actionName': actionValue,
+ 'workflowId': context.workflowId
+ }
+
+ self.adaptiveLearningEngine.recordValidationResult(
+ validationResult,
+ actionContext,
+ context.workflowId,
+ step
+ )
+
+ # Learn from feedback - use taskIntent (task-level), not workflowIntent
+ feedback = self._collectFeedback(result, validationResult, self.taskIntent)
+ self.learningEngine.learnFromFeedback(feedback, context, self.taskIntent)
+
+ # Update progress - use taskIntent (task-level), not workflowIntent
+ self.progressTracker.updateOperation(result, validationResult, self.taskIntent)
else:
logger.info("Content validation skipped: no documents to validate")
-
- # NEW: Record validation result for adaptive learning
- actionValue = selection.get('action', 'unknown')
- actionContext = {
- 'actionName': actionValue,
- 'workflowId': context.workflowId
- }
-
- self.adaptiveLearningEngine.recordValidationResult(
- validationResult,
- actionContext,
- context.workflowId,
- step
- )
-
- # NEW: Learn from feedback - use taskIntent (task-level), not workflowIntent
- feedback = self._collectFeedback(result, validationResult, self.taskIntent)
- self.learningEngine.learnFromFeedback(feedback, context, self.taskIntent)
-
- # NEW: Update progress - use taskIntent (task-level), not workflowIntent
- self.progressTracker.updateOperation(result, validationResult, self.taskIntent)
decision = await self._refineDecide(context, observation)
diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py
index b15c66b7..030a966f 100644
--- a/modules/workflows/workflowManager.py
+++ b/modules/workflows/workflowManager.py
@@ -430,11 +430,33 @@ The following is the user's original input message. Analyze intent, normalize th
workflow = self.services.workflow
checkWorkflowStopped(self.services)
+ # Send "first" message to mark round start (consistent with full workflow)
+ normalizedRequest = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt
+ roundNum = workflow.currentRound
+ contextLabel = f"round{roundNum}_usercontext"
+
+ firstMessageData = {
+ "workflowId": workflow.id,
+ "role": "user",
+ "message": normalizedRequest,
+ "status": "first",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": self.services.utils.timestampGetUtc(),
+ "documentsLabel": contextLabel,
+ "documents": [],
+ "roundNumber": roundNum,
+ "taskNumber": 0,
+ "actionNumber": 0,
+ "taskProgress": "pending",
+ "actionProgress": "pending"
+ }
+ self.services.chat.storeMessageWithDocuments(workflow, firstMessageData, [])
+
# Get user language if available
userLanguage = getattr(self.services, 'currentUserLanguage', None)
# Execute fast path - use normalizedRequest if available, otherwise use raw prompt
- normalizedPrompt = getattr(self.services, 'currentUserPromptNormalized', None) or userInput.prompt
+ normalizedPrompt = normalizedRequest
result = await self.workflowProcessor.fastPathExecute(
prompt=normalizedPrompt,
documents=documents,
@@ -491,14 +513,6 @@ The following is the user's original input message. Analyze intent, normalize th
}
chatDocuments.append(chatDoc)
- # Mark workflow as completed BEFORE storing message (so UI polling stops)
- workflow.status = "completed"
- workflow.lastActivity = self.services.utils.timestampGetUtc()
- self.services.chat.updateWorkflow(workflow.id, {
- "status": "completed",
- "lastActivity": workflow.lastActivity
- })
-
# Create ChatMessage with fast path response (in user's language)
messageData = {
"workflowId": workflow.id,
@@ -518,9 +532,18 @@ The following is the user's original input message. Analyze intent, normalize th
"actionProgress": "success"
}
- # Store message with documents
+ # Store message with documents BEFORE marking workflow as completed
+ # This ensures UI polling sees the "last" message before status changes
self.services.chat.storeMessageWithDocuments(workflow, messageData, chatDocuments)
+ # Mark workflow as completed AFTER storing message
+ workflow.status = "completed"
+ workflow.lastActivity = self.services.utils.timestampGetUtc()
+ self.services.chat.updateWorkflow(workflow.id, {
+ "status": "completed",
+ "lastActivity": workflow.lastActivity
+ })
+
logger.info(f"Fast path completed successfully, response length: {len(responseText)} chars")
except Exception as e: