From 6c8cc843ce0db07d411eb77d7186b65b6f3c82e6 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Tue, 17 Mar 2026 19:19:27 +0100 Subject: [PATCH] file system and stt and ttss reevisions --- env_dev.env | 1 - env_int.env | 3 +- env_prod.env | 3 +- modules/connectors/connectorVoiceGoogle.py | 152 ++++- modules/datamodels/datamodelFiles.py | 4 +- .../commcoach/routeFeatureCommcoach.py | 5 +- .../features/commcoach/serviceCommcoach.py | 16 +- .../connectors/accountingConnectorRma.py | 21 +- modules/interfaces/interfaceDbBilling.py | 23 + modules/interfaces/interfaceDbManagement.py | 346 +++++++++++ modules/interfaces/interfaceVoiceObjects.py | 43 +- modules/routes/routeBilling.py | 248 ++++++-- modules/routes/routeDataFiles.py | 239 ++++++++ modules/routes/routeVoiceGoogle.py | 394 +++++++------ .../services/serviceAgent/mainServiceAgent.py | 538 +++++++++++++++++- .../services/serviceAgent/sandboxExecutor.py | 108 ++++ .../services/serviceBilling/stripeCheckout.py | 46 +- .../services/serviceChat/mainServiceChat.py | 17 +- .../renderers/rendererDocx.py | 10 + 19 files changed, 1922 insertions(+), 295 deletions(-) create mode 100644 modules/serviceCenter/services/serviceAgent/sandboxExecutor.py diff --git a/env_dev.env b/env_dev.env index d33c5598..7dfa75e4 100644 --- a/env_dev.env +++ b/env_dev.env @@ -39,7 +39,6 @@ Service_GOOGLE_REDIRECT_URI = http://localhost:8000/api/google/auth/callback STRIPE_SECRET_KEY_SECRET = DEV_ENC:Z0FBQUFBQnBudkpGWDkxSldfM0NCZ3dmbHY5cS1nQlI3UWZ4ZWRrNVdUdEFKa25RckRiQWY0c1E5MjVsZzlfRkZEU0VFU2tNQ01qZnRNQ0pZVU9hVFN6OEU0RXhwdTl3algzLWJlSXRhYmZlMHltSC1XejlGWEU5TDF1LUlYNEh1aG9tRFI4YmlCYzUyei02U1dabWoyb0N2dVFSb1RhWTNnQjBCZkFjV0FfOWdYdDVpX1k5R2pYM1R6SHRiaE10V1l1dnQybjVHWDRiQUJLM0UxRDZnczhJZGFsc3JhOU82QT09 STRIPE_WEBHOOK_SECRET = DEV_ENC:Z0FBQUFBQnBudkpGcHNWTWpBWkFHRExtdU01N3RyZzNsMjhUS3NiVTNCZmMwN2NEcFZ6UkQ1a2I0aUkyNU4wR2dUdHJXYmtkaEFRUnFpcThObHBEQmJkdEFnT1FXeUxOTlU3UDFNRzl6LWdpRFpYdExvY3FTTG9MTkswdEhrVkNKQVFucnBjSnhLNm4= STRIPE_API_VERSION = 2026-01-28.clover -APP_FRONTEND_URL = http://localhost:5176 # AI configuration Connector_AiOpenai_API_SECRET = DEV_ENC:Z0FBQUFBQnBaSnM4TWFRRmxVQmNQblVIYmc1Y0Q3aW9zZUtDWlNWdGZjbFpncGp2NHN2QjkxMWxibUJnZDBId252MWk5TXN3Yk14ajFIdi1CTkx2ZWx2QzF5OFR6LUx5azQ3dnNLaXJBOHNxc0tlWmtZcTFVelF4eXBSM2JkbHd2eTM0VHNXdHNtVUprZWtPVzctNlJsZHNmM20tU1N6Q1Q2cHFYSi1tNlhZNDNabTVuaEVGWmIydEhadTcyMlBURmw2aUJxOF9GTzR0dTZiNGZfOFlHaVpPZ1A1LXhhOEFtN1J5TEVNNWtMcGpyNkMzSl8xRnZsaTF1WTZrOUZmb0cxVURjSGFLS2dIYTQyZEJtTm90bEYxVWxNNXVPdTVjaVhYbXhxT3JsVDM5VjZMVFZKSE1tZnM9 diff --git a/env_int.env b/env_int.env index 95e28430..783c7461 100644 --- a/env_int.env +++ b/env_int.env @@ -36,10 +36,9 @@ Service_MSFT_REDIRECT_URI = https://gateway-int.poweron-center.net/api/msft/auth Service_GOOGLE_REDIRECT_URI = https://gateway-int.poweron-center.net/api/google/auth/callback # Stripe Billing (both end with _SECRET for encryption script) -STRIPE_SECRET_KEY_SECRET = INT_ENC:Z0FBQUFBQnBudkpGOTVvaGhuSTVRTW5Fa0x3akFwZktQX21DZnFGQWgteDJwUWpnbTV5enRmbmZyWXlpY2lKVVNINkFManhNMnFQZ1VnOUxSQ0FTZFBVNmdhSGhwWFBHaDNnSzZXVnIxRmNUcnBQN0c3R19Xb2g1QnBxVXpiSXRRTk5NOGtzcU5HcUNiWDNvdmhYbGFkWkRCR25iVEJKTmwzcGRBZjNjaVNiWDJDaWlhLWpfdkdXYlQyUWk2NndKYW5lYXBaTkRzMWZsZjlFb3JOX1NzbkM4NWFyQU9MajZlZz09 +STRIPE_SECRET_KEY_SECRET = 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 STRIPE_WEBHOOK_SECRET = INT_ENC:Z0FBQUFBQnBudkpGamJBNW91VUdEaThWRTFiTWpyb3NqSDJJcGtjNkhUVVZqVElxUWExY05KcllSYVk1SkRuS1NjYWpZUk1uU29nb2pzdXUxRzBsOEgyRWtmUEw3dUF4ejFIXzNwTVZRM1R1bVVhTUs4ZHJMT0V4Xy1pcHVfWlBaQV9wVXo5MGlQYXA= STRIPE_API_VERSION = 2026-01-28.clover -APP_FRONTEND_URL = https://nyla-int.poweron-center.net # AI configuration Connector_AiOpenai_API_SECRET = INT_ENC:Z0FBQUFBQnBaSnM4MENkQ2xJVmE5WFZKUkh2SHJFby1YVXN3ZmVxRkptS3ZWRmlwdU93ZEJjSjlMV2NGbU5mS3NCdmFfcmFYTEJNZXFIQ3ozTWE4ZC1pemlQNk9wbjU1d3BPS0ZCTTZfOF8yWmVXMWx0TU1DamlJLVFhSTJXclZsY3hMVWlPcXVqQWtMdER4T252NHZUWEhUOTdIN1VGR3ltazEweXFqQ0lvb0hYWmxQQnpxb0JwcFNhRDNGWXdoRTVJWm9FalZpTUF5b1RqZlRaYnVKYkp0NWR5Vko1WWJ0Wmg2VWJzYXZ0Z3Q4UkpsTldDX2dsekhKMmM4YjRoa2RwemMwYVQwM2cyMFlvaU5mOTVTWGlROU8xY2ZVRXlxZzJqWkxURWlGZGI2STZNb0NpdEtWUnM9 diff --git a/env_prod.env b/env_prod.env index 5e60702c..e2d1189d 100644 --- a/env_prod.env +++ b/env_prod.env @@ -36,10 +36,9 @@ Service_MSFT_REDIRECT_URI = https://gateway-prod.poweron-center.net/api/msft/aut Service_GOOGLE_REDIRECT_URI = https://gateway-prod.poweron-center.net/api/google/auth/callback # Stripe Billing (both end with _SECRET for encryption script) -STRIPE_SECRET_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBudkpGNmx3N3Q4QWdlcXVBSlJuYzVJX2hZRW8wbklJYUI0Rzh2YWRPcWY5dC1rMjhfUDRTOE91TlZyLTBEZkY1N015dmg5akEta1d0M0NpNk9oNDZpQTlMUGlLalV6aVowbl9Jc2hKMVlxbE9aaTZNRUxDQ3VGSnJxN040VERUMDFiekhITXdTR0N4aUxwWGxtcHdlU2NtOVNsSlVpOE0xTkRSdGhnN09UWGxuLURUaFdfQWJ4ZEw3R0c0bVRQaTA1NURhVEZudHY4d2gtTzItOF9TcmMwajFmZz09 +STRIPE_SECRET_KEY_SECRET = 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 STRIPE_WEBHOOK_SECRET = PROD_ENC:Z0FBQUFBQnBudkpGNUpTWldsakYydFhFelBrR1lSaWxYT3kyMENOMUljZTJUZHBWcEhhdWVCMzYxZXQ5b3VlTFVRalFiTVdsbGxrdUx0RDFwSEpsOC1sTDJRTEJNQlA3S3ZaQzBtV1h6bWp5VnlMZUgwUlF3cXYxcnljZVE5SWdzLVg3V0syOWRYS08= STRIPE_API_VERSION = 2026-01-28.clover -APP_FRONTEND_URL = https://nyla.poweron-center.net # AI configuration Connector_AiOpenai_API_SECRET = PROD_ENC:Z0FBQUFBQnBaSnM4TWJOVm4xVkx6azRlNDdxN3UxLUdwY2hhdGYxRGp4VFJqYXZIcmkxM1ZyOWV2M0Z4MHdFNkVYQ0ROb1d6LUZFUEdvMHhLMEtXYVBCRzM5TlYyY3ROYWtJRk41cDZxd0tYYi00MjVqMTh4QVcyTXl0bmVocEFHbXQwREpwNi1vODdBNmwzazE5bkpNelE2WXpvblIzWlQwbGdEelI2WXFqT1RibXVHcjNWbVhwYzBOM25XTzNmTDAwUjRvYk4yNjIyZHc5c2RSZzREQUFCdUwyb0ZuOXN1dzI2c2FKdXI4NGxEbk92czZWamJXU3ZSbUlLejZjRklRRk4tLV9aVUFZekI2bTU4OHYxNTUybDg3RVo0ZTh6dXNKRW5GNXVackZvcm9laGI0X3R6V3M9 diff --git a/modules/connectors/connectorVoiceGoogle.py b/modules/connectors/connectorVoiceGoogle.py index 31f5f728..ddb0d864 100644 --- a/modules/connectors/connectorVoiceGoogle.py +++ b/modules/connectors/connectorVoiceGoogle.py @@ -9,7 +9,8 @@ import json import html import asyncio import logging -from typing import Dict, Optional, Any, List +import time +from typing import AsyncGenerator, Dict, Optional, Any, List, Tuple from google.cloud import speech from google.cloud import translate_v2 as translate from google.cloud import texttospeech @@ -403,6 +404,155 @@ class ConnectorGoogleSpeech: "error": str(e) } + async def streamingRecognize( + self, + audioQueue: asyncio.Queue, + language: str = "de-DE", + phraseHints: Optional[list] = None, + ) -> AsyncGenerator[Dict[str, Any], None]: + """ + Stream audio chunks to Google Cloud Speech-to-Text Streaming API. + Google handles silence/endpoint detection natively. + + Args: + audioQueue: Queue of (bytes, bool) tuples. bytes=audio data, bool=isLast. + Send (b"", True) to signal end of stream. + language: Language code + phraseHints: Optional boost phrases + + Yields: + Dicts with keys: isFinal, transcript, confidence, stabilityScore, audioDurationSec + """ + STREAM_LIMIT_SEC = 290 + streamStartTs = time.time() + totalAudioBytes = 0 + + configParams = { + "encoding": speech.RecognitionConfig.AudioEncoding.WEBM_OPUS, + "sample_rate_hertz": 48000, + "audio_channel_count": 1, + "language_code": language, + "enable_automatic_punctuation": True, + "model": "latest_long", + "use_enhanced": True, + } + if phraseHints: + configParams["speech_contexts"] = [speech.SpeechContext(phrases=phraseHints, boost=15.0)] + + recognitionConfig = speech.RecognitionConfig(**configParams) + streamingConfig = speech.StreamingRecognitionConfig( + config=recognitionConfig, + interim_results=True, + single_utterance=False, + ) + + import queue as threadQueue + audioInQ: threadQueue.Queue = threadQueue.Queue() + resultOutQ: asyncio.Queue = asyncio.Queue() + + async def _pumpAudioToThread(): + try: + while True: + item = await audioQueue.get() + audioInQ.put(item) + if item[1]: + return + except asyncio.CancelledError: + audioInQ.put((b"", True)) + + def _requestGenerator(): + nonlocal totalAudioBytes + while True: + try: + chunk, isLast = audioInQ.get(timeout=30.0) + except threadQueue.Empty: + return + if isLast or not chunk: + return + totalAudioBytes += len(chunk) + yield speech.StreamingRecognizeRequest(audio_content=chunk) + + def _runStreamingInThread(): + try: + responseStream = self.speech_client.streaming_recognize( + config=streamingConfig, + requests=_requestGenerator(), + ) + for response in responseStream: + elapsed = time.time() - streamStartTs + estimatedDurationSec = totalAudioBytes / (48000 * 1 * 2) if totalAudioBytes else 0 + + finalTexts = [] + interimTexts = [] + lastFinalConfidence = 0.0 + + for result in response.results: + alt = result.alternatives[0] if result.alternatives else None + if not alt or not alt.transcript.strip(): + continue + if result.is_final: + finalTexts.append(alt.transcript.strip()) + lastFinalConfidence = alt.confidence + else: + interimTexts.append(alt.transcript.strip()) + + for ft in finalTexts: + asyncio.run_coroutine_threadsafe(resultOutQ.put({ + "isFinal": True, + "transcript": ft, + "confidence": lastFinalConfidence, + "stabilityScore": 0.0, + "audioDurationSec": estimatedDurationSec, + }), loop) + + if interimTexts: + combined = " ".join(interimTexts) + asyncio.run_coroutine_threadsafe(resultOutQ.put({ + "isFinal": False, + "transcript": combined, + "confidence": 0.0, + "stabilityScore": 0.0, + "audioDurationSec": estimatedDurationSec, + }), loop) + if elapsed >= STREAM_LIMIT_SEC: + logger.info("Streaming STT approaching 5-min limit, client should reconnect") + asyncio.run_coroutine_threadsafe(resultOutQ.put({ + "isFinal": False, "transcript": "", "confidence": 0.0, + "reconnectRequired": True, "audioDurationSec": 0, + }), loop) + return + except Exception as e: + logger.error(f"Google Streaming STT error: {e}") + asyncio.run_coroutine_threadsafe(resultOutQ.put({ + "error": str(e), + }), loop) + finally: + asyncio.run_coroutine_threadsafe(resultOutQ.put(None), loop) + + loop = asyncio.get_running_loop() + pumpTask = asyncio.ensure_future(_pumpAudioToThread()) + streamFuture = loop.run_in_executor(None, _runStreamingInThread) + + try: + while True: + item = await resultOutQ.get() + if item is None: + break + if "error" in item: + raise RuntimeError(item["error"]) + yield item + finally: + pumpTask.cancel() + await asyncio.shield(streamFuture) + + def calculateSttCostCHF(self, audioDurationSec: float) -> float: + """Google STT cost: ~$0.016/min (standard model).""" + return round((audioDurationSec / 60.0) * 0.016, 8) + + def calculateTtsCostCHF(self, characterCount: int) -> float: + """Google TTS WaveNet cost: ~$0.000004/char.""" + return round(characterCount * 0.000004, 8) + async def translateText(self, text: str, targetLanguage: str = "en", sourceLanguage: str = "de") -> Dict: """ diff --git a/modules/datamodels/datamodelFiles.py b/modules/datamodels/datamodelFiles.py index e14879a0..afaad996 100644 --- a/modules/datamodels/datamodelFiles.py +++ b/modules/datamodels/datamodelFiles.py @@ -14,7 +14,7 @@ class FileItem(BaseModel): model_config = ConfigDict(extra='allow') # Preserve system fields (_createdBy, _createdAt, etc.) id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) mandateId: Optional[str] = Field(default="", description="ID of the mandate this file belongs to", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) - featureInstanceId: Optional[str] = Field(default="", description="ID of the feature instance this file belongs to", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) + featureInstanceId: Optional[str] = Field(default="", description="ID of the feature instance this file belongs to", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False, "frontend_fk_source": "/api/features/instances", "frontend_fk_display_field": "label"}) fileName: str = Field(description="Name of the file", json_schema_extra={"frontend_type": "text", "frontend_readonly": False, "frontend_required": True}) mimeType: str = Field(description="MIME type of the file", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) fileHash: str = Field(description="Hash of the file", json_schema_extra={"frontend_type": "text", "frontend_readonly": True, "frontend_required": False}) @@ -31,7 +31,7 @@ registerModelLabels( { "id": {"en": "ID", "fr": "ID"}, "mandateId": {"en": "Mandate ID", "fr": "ID du mandat"}, - "featureInstanceId": {"en": "Feature Instance ID", "fr": "ID de l'instance de fonctionnalité"}, + "featureInstanceId": {"en": "Feature Instance", "fr": "Instance de fonctionnalité"}, "fileName": {"en": "fileName", "fr": "Nom de fichier"}, "mimeType": {"en": "MIME Type", "fr": "Type MIME"}, "fileHash": {"en": "File Hash", "fr": "Hash du fichier"}, diff --git a/modules/features/commcoach/routeFeatureCommcoach.py b/modules/features/commcoach/routeFeatureCommcoach.py index 424b94f3..81585a19 100644 --- a/modules/features/commcoach/routeFeatureCommcoach.py +++ b/modules/features/commcoach/routeFeatureCommcoach.py @@ -10,8 +10,10 @@ import json import asyncio import base64 import uuid + + from typing import Optional -from fastapi import APIRouter, HTTPException, Depends, Request +from fastapi import APIRouter, HTTPException, Depends, Request, WebSocket, WebSocketDisconnect, Query from fastapi.responses import StreamingResponse, Response from modules.auth import limiter, getRequestContext, RequestContext @@ -31,7 +33,6 @@ from .datamodelCommcoach import ( StartSessionRequest, CreatePersonaRequest, UpdatePersonaRequest, ) from .serviceCommcoach import CommcoachService, emitSessionEvent, getSessionEventQueue, cleanupSessionEvents - logger = logging.getLogger(__name__) _activeProcessTasks: dict = {} diff --git a/modules/features/commcoach/serviceCommcoach.py b/modules/features/commcoach/serviceCommcoach.py index ac9ad085..be47a917 100644 --- a/modules/features/commcoach/serviceCommcoach.py +++ b/modules/features/commcoach/serviceCommcoach.py @@ -1011,15 +1011,15 @@ class CommcoachService: async def _callAi(self, systemPrompt: str, userPrompt: str): """Call the AI service with the given prompts.""" - from modules.serviceCenter.services.serviceAi.mainServiceAi import AiService + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext - serviceContext = type('Ctx', (), { - 'user': self.currentUser, - 'mandateId': self.mandateId, - 'featureInstanceId': self.instanceId, - 'featureCode': 'commcoach', - })() - aiService = AiService(serviceCenter=serviceContext) + serviceContext = ServiceCenterContext( + user=self.currentUser, + mandate_id=self.mandateId, + feature_instance_id=self.instanceId, + ) + aiService = getService("ai", serviceContext) await aiService.ensureAiObjectsInitialized() aiRequest = AiCallRequest( diff --git a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py index 4ec1ebd7..86dbcac9 100644 --- a/modules/features/trustee/accounting/connectors/accountingConnectorRma.py +++ b/modules/features/trustee/accounting/connectors/accountingConnectorRma.py @@ -3,7 +3,8 @@ """Run My Accounts (Infoniqa) accounting connector. API docs: https://runmyaccountsag.github.io/runmyaccounts-rest-api/ -Auth: API key (incl. ``pat_`` tokens since Sep 2025) via ``X-RMA-KEY`` request header. +Auth: PAT tokens (``pat_...``) via ``Authorization: Bearer``. +Fallback for legacy API keys via ``X-RMA-KEY``. Base URL: https://service.runmyaccounts.com/api/latest/clients/{clientName}/ """ @@ -60,11 +61,15 @@ class AccountingConnectorRma(BaseAccountingConnector): def _buildHeaders(self, config: Dict[str, Any]) -> Dict[str, str]: apiKey = config.get("apiKey", "") - return { - "X-RMA-KEY": apiKey, + headers = { "Accept": "application/json, application/xml, */*", "Content-Type": "application/json", } + if str(apiKey).startswith("pat_"): + headers["Authorization"] = f"Bearer {apiKey}" + else: + headers["X-RMA-KEY"] = apiKey + return headers async def testConnection(self, config: Dict[str, Any]) -> SyncResult: clientName = config.get("clientName", "") @@ -75,13 +80,17 @@ class AccountingConnectorRma(BaseAccountingConnector): url = self._buildUrl(config, "customers") headers = self._buildHeaders(config) - logger.info("RMA testConnection: url=%s, clientName=%s, apiKey=%s...", url, clientName, apiKey[:6] if len(apiKey) > 6 else "***") + authMethod = "Bearer" if str(apiKey).startswith("pat_") else "X-RMA-KEY" + logger.info( + "RMA testConnection: url=%s, clientName=%s, apiKey=%s..., auth=%s", + url, clientName, apiKey[:6] if len(apiKey) > 6 else "***", authMethod, + ) try: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp: if resp.status == 200: - logger.info("RMA connection successful") - return SyncResult(success=True) + logger.info("RMA connection successful with auth method: %s", authMethod) + return SyncResult(success=True, rawResponse={"authMethod": authMethod}) body = await resp.text() logger.warning("RMA testConnection failed: status=%s, url=%s, body=%s", resp.status, url, body[:500]) return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:300]}") diff --git a/modules/interfaces/interfaceDbBilling.py b/modules/interfaces/interfaceDbBilling.py index 08c43189..3075966a 100644 --- a/modules/interfaces/interfaceDbBilling.py +++ b/modules/interfaces/interfaceDbBilling.py @@ -679,6 +679,29 @@ class BillingObjects: """ record = StripeWebhookEvent(event_id=event_id) return self.db.recordCreate(StripeWebhookEvent, record.model_dump()) + + def getPaymentTransactionByReferenceId(self, referenceId: str) -> Optional[Dict[str, Any]]: + """ + Find an existing Stripe payment credit transaction by Checkout Session ID. + + Args: + referenceId: Stripe Checkout Session ID (cs_xxx) + + Returns: + Transaction record if found, else None + """ + try: + results = self.db.getRecordset( + BillingTransaction, + recordFilter={ + "referenceType": ReferenceTypeEnum.PAYMENT.value, + "referenceId": referenceId, + } + ) + return results[0] if results else None + except Exception as e: + logger.error(f"Error checking Stripe payment transaction by referenceId: {e}") + return None # ========================================================================= # Balance Check Operations diff --git a/modules/interfaces/interfaceDbManagement.py b/modules/interfaces/interfaceDbManagement.py index e065bf6d..f439a7f1 100644 --- a/modules/interfaces/interfaceDbManagement.py +++ b/modules/interfaces/interfaceDbManagement.py @@ -10,6 +10,7 @@ import logging import base64 import hashlib import math +import mimetypes from typing import Dict, Any, List, Optional, Union from modules.connectors.connectorDbPostgre import DatabaseConnector, _get_cached_connector @@ -18,6 +19,7 @@ from modules.security.rbac import RbacClass from modules.datamodels.datamodelRbac import AccessRuleContext from modules.datamodels.datamodelUam import AccessLevel from modules.datamodels.datamodelFiles import FilePreview, FileItem, FileData +from modules.datamodels.datamodelFileFolder import FileFolder from modules.datamodels.datamodelUtils import Prompt from modules.datamodels.datamodelVoice import VoiceSettings from modules.datamodels.datamodelMessaging import ( @@ -1143,6 +1145,350 @@ class ComponentObjects: logger.error(f"Error deleting file {fileId}: {str(e)}") raise FileDeletionError(f"Error deleting file: {str(e)}") + def deleteFilesBatch(self, fileIds: List[str]) -> Dict[str, Any]: + """Delete multiple files in a single SQL batch call.""" + uniqueIds = [str(fid) for fid in dict.fromkeys(fileIds or []) if fid] + if not uniqueIds: + return {"deletedFiles": 0} + + try: + self.db._ensure_connection() + with self.db.connection.cursor() as cursor: + cursor.execute( + 'SELECT "id" FROM "FileItem" WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (uniqueIds, self.userId or ""), + ) + accessibleIds = [row["id"] for row in cursor.fetchall()] + + if len(accessibleIds) != len(uniqueIds): + missingIds = sorted(set(uniqueIds) - set(accessibleIds)) + raise FileNotFoundError(f"Files not found or not accessible: {missingIds}") + + cursor.execute('DELETE FROM "FileData" WHERE "id" = ANY(%s)', (accessibleIds,)) + cursor.execute( + 'DELETE FROM "FileItem" WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (accessibleIds, self.userId or ""), + ) + deletedFiles = cursor.rowcount + + self.db.connection.commit() + return {"deletedFiles": deletedFiles} + except Exception as e: + logger.error(f"Error deleting files in batch: {e}") + self.db.connection.rollback() + raise FileDeletionError(f"Error deleting files in batch: {str(e)}") + + # ---- Folder methods ---- + + _RESERVED_FOLDER_NAMES = {"(Global)"} + + def _validateFolderName(self, name: str, parentId: Optional[str], excludeFolderId: Optional[str] = None): + """Ensures folder name is not reserved and is unique within parent.""" + if name in self._RESERVED_FOLDER_NAMES: + raise ValueError(f"Folder name '{name}' is reserved") + if not name or not name.strip(): + raise ValueError("Folder name cannot be empty") + existingFolders = self.db.getRecordset(FileFolder, recordFilter={"parentId": parentId or ""}) + for f in existingFolders: + if f.get("name") == name and f.get("id") != excludeFolderId: + raise ValueError(f"Folder '{name}' already exists in this directory") + + def _isDescendantOf(self, folderId: str, ancestorId: str) -> bool: + """Checks if folderId is a descendant of ancestorId (circular reference check).""" + visited = set() + currentId = folderId + while currentId: + if currentId == ancestorId: + return True + if currentId in visited: + break + visited.add(currentId) + folders = self.db.getRecordset(FileFolder, recordFilter={"id": currentId}) + if not folders: + break + currentId = folders[0].get("parentId") + return False + + def getFolder(self, folderId: str) -> Optional[Dict[str, Any]]: + """Returns a folder by ID if it belongs to the current user.""" + folders = self.db.getRecordset(FileFolder, recordFilter={"id": folderId, "_createdBy": self.userId or ""}) + return folders[0] if folders else None + + def listFolders(self, parentId: Optional[str] = None) -> List[Dict[str, Any]]: + """List folders for current user, optionally filtered by parentId.""" + recordFilter = {"_createdBy": self.userId or ""} + if parentId is not None: + recordFilter["parentId"] = parentId + return self.db.getRecordset(FileFolder, recordFilter=recordFilter) + + def createFolder(self, name: str, parentId: Optional[str] = None) -> Dict[str, Any]: + """Create a new folder with unique name validation.""" + self._validateFolderName(name, parentId) + folder = FileFolder( + name=name, + parentId=parentId, + mandateId=self.mandateId or "", + featureInstanceId=self.featureInstanceId or "", + ) + return self.db.recordCreate(FileFolder, folder) + + def renameFolder(self, folderId: str, newName: str) -> bool: + """Rename a folder with unique name validation.""" + folder = self.getFolder(folderId) + if not folder: + raise FileNotFoundError(f"Folder {folderId} not found") + self._validateFolderName(newName, folder.get("parentId"), excludeFolderId=folderId) + return self.db.recordModify(FileFolder, folderId, {"name": newName}) + + def moveFolder(self, folderId: str, targetParentId: Optional[str] = None) -> bool: + """Move a folder to a new parent, with circular reference and unique name checks.""" + folder = self.getFolder(folderId) + if not folder: + raise FileNotFoundError(f"Folder {folderId} not found") + if targetParentId and self._isDescendantOf(targetParentId, folderId): + raise ValueError("Cannot move folder into its own subtree") + self._validateFolderName(folder.get("name", ""), targetParentId, excludeFolderId=folderId) + return self.db.recordModify(FileFolder, folderId, {"parentId": targetParentId}) + + def moveFilesBatch(self, fileIds: List[str], targetFolderId: Optional[str] = None) -> Dict[str, Any]: + """Move multiple files with one SQL update.""" + uniqueIds = [str(fid) for fid in dict.fromkeys(fileIds or []) if fid] + if not uniqueIds: + return {"movedFiles": 0} + + if targetFolderId: + targetFolder = self.getFolder(targetFolderId) + if not targetFolder: + raise FileNotFoundError(f"Target folder {targetFolderId} not found") + + try: + self.db._ensure_connection() + with self.db.connection.cursor() as cursor: + cursor.execute( + 'SELECT "id" FROM "FileItem" WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (uniqueIds, self.userId or ""), + ) + accessibleIds = [row["id"] for row in cursor.fetchall()] + if len(accessibleIds) != len(uniqueIds): + missingIds = sorted(set(uniqueIds) - set(accessibleIds)) + raise FileNotFoundError(f"Files not found or not accessible: {missingIds}") + + cursor.execute( + 'UPDATE "FileItem" SET "folderId" = %s, "_modifiedAt" = %s, "_modifiedBy" = %s ' + 'WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (targetFolderId, getUtcTimestamp(), self.userId or "", accessibleIds, self.userId or ""), + ) + movedFiles = cursor.rowcount + + self.db.connection.commit() + return {"movedFiles": movedFiles} + except Exception as e: + logger.error(f"Error moving files in batch: {e}") + self.db.connection.rollback() + raise FileError(f"Error moving files in batch: {str(e)}") + + def moveFoldersBatch(self, folderIds: List[str], targetParentId: Optional[str] = None) -> Dict[str, Any]: + """Move multiple folders with one SQL update after validation.""" + uniqueIds = [str(fid) for fid in dict.fromkeys(folderIds or []) if fid] + if not uniqueIds: + return {"movedFolders": 0} + + foldersToMove: List[Dict[str, Any]] = [] + for folderId in uniqueIds: + folder = self.getFolder(folderId) + if not folder: + raise FileNotFoundError(f"Folder {folderId} not found") + if targetParentId and self._isDescendantOf(targetParentId, folderId): + raise ValueError("Cannot move folder into its own subtree") + foldersToMove.append(folder) + + existingInTarget = self.db.getRecordset( + FileFolder, + recordFilter={"parentId": targetParentId or "", "_createdBy": self.userId or ""}, + ) + existingNames = {f.get("name"): f.get("id") for f in existingInTarget} + movingNames: Dict[str, str] = {} + movingIds = set(uniqueIds) + + for folder in foldersToMove: + name = folder.get("name", "") + folderId = folder.get("id") + if name in movingNames and movingNames[name] != folderId: + raise ValueError(f"Folder '{name}' already exists in this move batch") + movingNames[name] = folderId + + existingId = existingNames.get(name) + if existingId and existingId not in movingIds: + raise ValueError(f"Folder '{name}' already exists in target directory") + + try: + self.db._ensure_connection() + with self.db.connection.cursor() as cursor: + cursor.execute( + 'UPDATE "FileFolder" SET "parentId" = %s, "_modifiedAt" = %s, "_modifiedBy" = %s ' + 'WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (targetParentId, getUtcTimestamp(), self.userId or "", uniqueIds, self.userId or ""), + ) + movedFolders = cursor.rowcount + + self.db.connection.commit() + return {"movedFolders": movedFolders} + except Exception as e: + logger.error(f"Error moving folders in batch: {e}") + self.db.connection.rollback() + raise FileError(f"Error moving folders in batch: {str(e)}") + + def deleteFolder(self, folderId: str, recursive: bool = False) -> Dict[str, Any]: + """Delete a folder. If recursive, deletes all contents. Returns summary of deletions.""" + folder = self.getFolder(folderId) + if not folder: + raise FileNotFoundError(f"Folder {folderId} not found") + + childFolders = self.db.getRecordset(FileFolder, recordFilter={"parentId": folderId, "_createdBy": self.userId or ""}) + childFiles = self._getFilesByCurrentUser(recordFilter={"folderId": folderId}) + + if not recursive and (childFolders or childFiles): + raise ValueError( + f"Folder '{folder.get('name')}' is not empty " + f"({len(childFiles)} files, {len(childFolders)} subfolders). " + f"Use recursive=true to delete contents." + ) + + deletedFiles = 0 + deletedFolders = 0 + + if recursive: + for subFolder in childFolders: + subResult = self.deleteFolder(subFolder["id"], recursive=True) + deletedFiles += subResult.get("deletedFiles", 0) + deletedFolders += subResult.get("deletedFolders", 0) + for childFile in childFiles: + try: + self.deleteFile(childFile["id"]) + deletedFiles += 1 + except Exception as e: + logger.warning(f"Failed to delete file {childFile['id']} during folder deletion: {e}") + + self.db.recordDelete(FileFolder, folderId) + deletedFolders += 1 + + return {"deletedFiles": deletedFiles, "deletedFolders": deletedFolders} + + def deleteFoldersBatch(self, folderIds: List[str], recursive: bool = True) -> Dict[str, Any]: + """Delete multiple folders and their content in batched SQL calls.""" + uniqueIds = [str(fid) for fid in dict.fromkeys(folderIds or []) if fid] + if not uniqueIds: + return {"deletedFiles": 0, "deletedFolders": 0} + + if not recursive: + deletedFiles = 0 + deletedFolders = 0 + for folderId in uniqueIds: + result = self.deleteFolder(folderId, recursive=False) + deletedFiles += result.get("deletedFiles", 0) + deletedFolders += result.get("deletedFolders", 0) + return {"deletedFiles": deletedFiles, "deletedFolders": deletedFolders} + + try: + self.db._ensure_connection() + with self.db.connection.cursor() as cursor: + cursor.execute( + 'SELECT "id" FROM "FileFolder" WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (uniqueIds, self.userId or ""), + ) + rootAccessibleIds = [row["id"] for row in cursor.fetchall()] + if len(rootAccessibleIds) != len(uniqueIds): + missingIds = sorted(set(uniqueIds) - set(rootAccessibleIds)) + raise FileNotFoundError(f"Folders not found or not accessible: {missingIds}") + + cursor.execute( + """ + WITH RECURSIVE folder_tree AS ( + SELECT "id" + FROM "FileFolder" + WHERE "id" = ANY(%s) AND "_createdBy" = %s + UNION ALL + SELECT child."id" + FROM "FileFolder" child + INNER JOIN folder_tree ft ON child."parentId" = ft."id" + WHERE child."_createdBy" = %s + ) + SELECT DISTINCT "id" FROM folder_tree + """, + (rootAccessibleIds, self.userId or "", self.userId or ""), + ) + allFolderIds = [row["id"] for row in cursor.fetchall()] + + cursor.execute( + 'SELECT "id" FROM "FileItem" WHERE "folderId" = ANY(%s) AND "_createdBy" = %s', + (allFolderIds, self.userId or ""), + ) + allFileIds = [row["id"] for row in cursor.fetchall()] + + if allFileIds: + cursor.execute('DELETE FROM "FileData" WHERE "id" = ANY(%s)', (allFileIds,)) + cursor.execute( + 'DELETE FROM "FileItem" WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (allFileIds, self.userId or ""), + ) + deletedFiles = cursor.rowcount + else: + deletedFiles = 0 + + cursor.execute( + 'DELETE FROM "FileFolder" WHERE "id" = ANY(%s) AND "_createdBy" = %s', + (allFolderIds, self.userId or ""), + ) + deletedFolders = cursor.rowcount + + self.db.connection.commit() + return {"deletedFiles": deletedFiles, "deletedFolders": deletedFolders} + except Exception as e: + logger.error(f"Error deleting folders in batch: {e}") + self.db.connection.rollback() + raise FileDeletionError(f"Error deleting folders in batch: {str(e)}") + + def copyFile(self, sourceFileId: str, targetFolderId: Optional[str] = None, newFileName: Optional[str] = None) -> FileItem: + """Create a full duplicate of a file (FileItem + FileData).""" + sourceFile = self.getFile(sourceFileId) + if not sourceFile: + raise FileNotFoundError(f"File {sourceFileId} not found") + + sourceData = self.getFileData(sourceFileId) + if sourceData is None: + raise FileStorageError(f"No data found for file {sourceFileId}") + + fileName = newFileName or sourceFile.fileName + copiedFile = self.createFile(fileName, sourceFile.mimeType, sourceData) + + if targetFolderId: + self.updateFile(copiedFile.id, {"folderId": targetFolderId}) + elif sourceFile.folderId: + self.updateFile(copiedFile.id, {"folderId": sourceFile.folderId}) + + self.createFileData(copiedFile.id, sourceData) + return copiedFile + + def updateFileData(self, fileId: str, data: bytes) -> bool: + """Replace existing file data (delete + create). Updates FileItem metadata.""" + file = self.getFile(fileId) + if not file: + raise FileNotFoundError(f"File {fileId} not found") + + try: + self.db.recordDelete(FileData, fileId) + logger.debug(f"Deleted existing FileData for {fileId}") + except Exception as e: + logger.debug(f"No existing FileData to delete for {fileId}: {e}") + + success = self.createFileData(fileId, data) + if success: + newSize = len(data) + newHash = hashlib.sha256(data).hexdigest() + self.db.recordModify(FileItem, fileId, {"fileSize": newSize, "fileHash": newHash}) + logger.info(f"Updated file data for {fileId} ({newSize} bytes)") + return success + # FileData methods - data operations def createFileData(self, fileId: str, data: bytes) -> bool: diff --git a/modules/interfaces/interfaceVoiceObjects.py b/modules/interfaces/interfaceVoiceObjects.py index 5c84c047..dc391bae 100644 --- a/modules/interfaces/interfaceVoiceObjects.py +++ b/modules/interfaces/interfaceVoiceObjects.py @@ -6,8 +6,9 @@ Provides a generic interface layer between routes and voice connectors. Handles voice operations including speech-to-text, text-to-speech, and translation. """ +import asyncio import logging -from typing import Dict, Any, Optional, List +from typing import AsyncGenerator, Callable, Dict, Any, Optional, List from modules.connectors.connectorVoiceGoogle import ConnectorGoogleSpeech from modules.datamodels.datamodelVoice import VoiceSettings @@ -30,6 +31,7 @@ class VoiceObjects: self.currentUser: Optional[User] = None self.userId: Optional[str] = None self._google_speech_connector: Optional[ConnectorGoogleSpeech] = None + self.billingCallback: Optional[Callable[[Dict[str, Any]], None]] = None def setUserContext(self, currentUser: User, mandateId: Optional[str] = None): """Set the user context for the interface. @@ -115,6 +117,32 @@ class VoiceObjects: "error": str(e) } + async def streamingSpeechToText( + self, + audioQueue: asyncio.Queue, + language: str = "de-DE", + phraseHints: Optional[list] = None, + ) -> AsyncGenerator[Dict[str, Any], None]: + """ + Stream audio to Google Streaming STT and yield interim/final results. + Billing is recorded for each final result. + """ + connector = self._getGoogleSpeechConnector() + async for event in connector.streamingRecognize(audioQueue, language, phraseHints): + if event.get("isFinal") and self.billingCallback: + durationSec = event.get("audioDurationSec", 0) + priceCHF = connector.calculateSttCostCHF(durationSec) + if priceCHF > 0: + try: + self.billingCallback({ + "operation": "stt-streaming", + "priceCHF": priceCHF, + "audioDurationSec": durationSec, + }) + except Exception as e: + logger.warning(f"Voice STT billing callback failed: {e}") + yield event + # Translation Operations async def detectLanguage(self, text: str) -> Dict[str, Any]: @@ -277,7 +305,18 @@ class VoiceObjects: if result["success"]: logger.info(f"✅ Text-to-Speech successful: {len(result['audio_content'])} bytes") - # Map connector snake_case keys to camelCase for consistent API + if self.billingCallback: + connector = self._getGoogleSpeechConnector() + priceCHF = connector.calculateTtsCostCHF(len(text)) + if priceCHF > 0: + try: + self.billingCallback({ + "operation": "tts-wavenet", + "priceCHF": priceCHF, + "characterCount": len(text), + }) + except Exception as e: + logger.warning(f"Voice TTS billing callback failed: {e}") return { "success": True, "audioContent": result["audio_content"], diff --git a/modules/routes/routeBilling.py b/modules/routes/routeBilling.py index 194ebba0..ab12c568 100644 --- a/modules/routes/routeBilling.py +++ b/modules/routes/routeBilling.py @@ -236,6 +236,7 @@ class CheckoutCreateRequest(BaseModel): """Request model for creating Stripe Checkout Session.""" userId: Optional[str] = Field(None, description="Target user ID (for PREPAY_USER model)") amount: float = Field(..., gt=0, description="Amount to pay in CHF (must be in allowed presets)") + returnUrl: str = Field(..., min_length=1, description="Absolute frontend URL used for Stripe success/cancel redirects") class CheckoutCreateResponse(BaseModel): @@ -243,6 +244,20 @@ class CheckoutCreateResponse(BaseModel): redirectUrl: str = Field(..., description="Stripe Checkout URL for redirect") +class CheckoutConfirmRequest(BaseModel): + """Request model for confirming Stripe Checkout after redirect.""" + sessionId: str = Field(..., min_length=1, description="Stripe Checkout Session ID (cs_xxx)") + + +class CheckoutConfirmResponse(BaseModel): + """Response model for Stripe Checkout confirmation.""" + credited: bool = Field(..., description="True if a new billing credit was created") + alreadyCredited: bool = Field(..., description="True if session was already credited before") + sessionId: str = Field(..., description="Stripe Checkout Session ID") + mandateId: str = Field(..., description="Mandate ID from Stripe metadata") + amountChf: float = Field(..., description="Credited amount in CHF") + + class BillingSettingsUpdate(BaseModel): """Request model for updating billing settings.""" billingModel: Optional[BillingModelEnum] = None @@ -345,6 +360,107 @@ class UserTransactionResponse(BaseModel): userName: Optional[str] = None +def _getStripeClient(): + """Initialize and return configured Stripe SDK module.""" + import stripe + from modules.shared.configuration import APP_CONFIG + + api_version = APP_CONFIG.get("STRIPE_API_VERSION") + if api_version: + stripe.api_version = api_version + + secret_key = APP_CONFIG.get("STRIPE_SECRET_KEY_SECRET") or APP_CONFIG.get("STRIPE_SECRET_KEY") + if not secret_key: + raise ValueError("STRIPE_SECRET_KEY_SECRET not configured") + + stripe.api_key = secret_key + return stripe + + +def _creditStripeSessionIfNeeded( + billingInterface, + session: Dict[str, Any], + eventId: Optional[str] = None, +) -> CheckoutConfirmResponse: + """ + Credit balance from Stripe Checkout session if not already credited. + Uses Checkout session ID for idempotency across webhook + manual confirmation flows. + """ + from modules.serviceCenter.services.serviceBilling.stripeCheckout import ALLOWED_AMOUNTS_CHF + + session_id = session.get("id") + metadata = session.get("metadata") or {} + mandate_id = metadata.get("mandateId") + user_id = metadata.get("userId") or None + amount_chf_str = metadata.get("amountChf", "0") + + if not session_id: + raise HTTPException(status_code=400, detail="Stripe session id missing") + if not mandate_id: + raise HTTPException(status_code=400, detail="Invalid session metadata: mandateId missing") + + existing_payment_tx = billingInterface.getPaymentTransactionByReferenceId(session_id) + if existing_payment_tx: + if eventId and not billingInterface.getStripeWebhookEventByEventId(eventId): + billingInterface.createStripeWebhookEvent(eventId) + return CheckoutConfirmResponse( + credited=False, + alreadyCredited=True, + sessionId=session_id, + mandateId=mandate_id, + amountChf=float(existing_payment_tx.get("amount", 0.0)), + ) + + try: + amount_chf = float(amount_chf_str) + except (TypeError, ValueError): + amount_chf = None + + if amount_chf is None or amount_chf not in ALLOWED_AMOUNTS_CHF: + amount_total = session.get("amount_total") + if amount_total is not None: + amount_chf = amount_total / 100.0 + else: + raise HTTPException(status_code=400, detail="Invalid amount in Stripe session") + + settings = billingInterface.getSettings(mandate_id) + if not settings: + raise HTTPException(status_code=404, detail="Billing settings not found") + + billing_model = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + if billing_model == BillingModelEnum.PREPAY_USER: + if not user_id: + raise HTTPException(status_code=400, detail="userId required for PREPAY_USER") + account = billingInterface.getOrCreateUserAccount(mandate_id, user_id, initialBalance=0.0) + elif billing_model in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]: + account = billingInterface.getOrCreateMandateAccount(mandate_id, initialBalance=0.0) + else: + raise HTTPException(status_code=400, detail=f"Cannot add credit to {billing_model.value}") + + transaction = BillingTransaction( + accountId=account["id"], + transactionType=TransactionTypeEnum.CREDIT, + amount=amount_chf, + description="Stripe-Zahlung", + referenceType=ReferenceTypeEnum.PAYMENT, + referenceId=session_id, + createdByUserId=user_id, + ) + billingInterface.createTransaction(transaction) + + if eventId and not billingInterface.getStripeWebhookEventByEventId(eventId): + billingInterface.createStripeWebhookEvent(eventId) + + logger.info(f"Stripe credit applied: {amount_chf} CHF for session {session_id} on mandate {mandate_id}") + return CheckoutConfirmResponse( + credited=True, + alreadyCredited=False, + sessionId=session_id, + mandateId=mandate_id, + amountChf=amount_chf, + ) + + # ============================================================================= # Router Setup # ============================================================================= @@ -769,7 +885,8 @@ def createCheckoutSession( redirect_url = create_checkout_session( mandate_id=targetMandateId, user_id=checkoutRequest.userId, - amount_chf=checkoutRequest.amount + amount_chf=checkoutRequest.amount, + return_url=checkoutRequest.returnUrl ) return CheckoutCreateResponse(redirectUrl=redirect_url) @@ -782,6 +899,65 @@ def createCheckoutSession( raise HTTPException(status_code=500, detail=str(e)) +@router.post("/checkout/confirm", response_model=CheckoutConfirmResponse) +@limiter.limit("20/minute") +def confirmCheckoutSession( + request: Request, + confirmRequest: CheckoutConfirmRequest = Body(...), + ctx: RequestContext = Depends(getRequestContext), +): + """ + Confirm Stripe Checkout success by session ID and apply credit idempotently. + This is a fallback/reconciliation path in addition to webhook processing. + """ + try: + stripe = _getStripeClient() + session = stripe.checkout.Session.retrieve(confirmRequest.sessionId) + if not session: + raise HTTPException(status_code=404, detail="Stripe Checkout Session not found") + + session_dict = session.to_dict_recursive() if hasattr(session, "to_dict_recursive") else dict(session) + metadata = session_dict.get("metadata") or {} + mandate_id = metadata.get("mandateId") + user_id = metadata.get("userId") or None + + if not mandate_id: + raise HTTPException(status_code=400, detail="Invalid session metadata: mandateId missing") + + payment_status = session_dict.get("payment_status") + if payment_status != "paid": + raise HTTPException(status_code=409, detail=f"Payment not completed yet (payment_status={payment_status})") + + billingInterface = getBillingInterface(ctx.user, mandate_id) + settings = billingInterface.getSettings(mandate_id) + if not settings: + raise HTTPException(status_code=404, detail="Billing settings not found") + + billing_model = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) + if billing_model == BillingModelEnum.PREPAY_USER: + if not user_id: + raise HTTPException(status_code=400, detail="userId required for PREPAY_USER") + if str(user_id) != str(ctx.user.id): + raise HTTPException(status_code=403, detail="Users can only confirm their own payment sessions") + if not _isMemberOfMandate(ctx, mandate_id): + raise HTTPException(status_code=403, detail="User is not a member of this mandate") + elif billing_model in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]: + if not _isAdminOfMandate(ctx, mandate_id): + raise HTTPException(status_code=403, detail="Mandate admin role required") + else: + raise HTTPException(status_code=400, detail=f"Cannot add credit to {billing_model.value}") + + root_billing_interface = _getRootInterface() + return _creditStripeSessionIfNeeded(root_billing_interface, session_dict, eventId=None) + except HTTPException: + raise + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error confirming checkout session {confirmRequest.sessionId}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.post("/webhook/stripe") async def stripeWebhook( request: Request, @@ -792,8 +968,7 @@ async def stripeWebhook( No JWT auth - Stripe authenticates via Stripe-Signature header. """ from modules.shared.configuration import APP_CONFIG - from modules.serviceCenter.services.serviceBilling.stripeCheckout import ALLOWED_AMOUNTS_CHF - + webhook_secret = APP_CONFIG.get("STRIPE_WEBHOOK_SECRET") if not webhook_secret: logger.error("STRIPE_WEBHOOK_SECRET not configured") @@ -816,71 +991,26 @@ async def stripeWebhook( logger.warning(f"Stripe webhook signature verification failed: {e}") raise HTTPException(status_code=400, detail="Invalid signature") - if event.type != "checkout.session.completed": + logger.info(f"Stripe webhook received: event={event.id}, type={event.type}") + + accepted_event_types = {"checkout.session.completed", "checkout.session.async_payment_succeeded"} + if event.type not in accepted_event_types: return {"received": True} session = event.data.object event_id = event.id - session_id = session.id - + billingInterface = _getRootInterface() - if billingInterface.getStripeWebhookEventByEventId(event_id): logger.info(f"Stripe event {event_id} already processed, skipping") return {"received": True} - - metadata = session.get("metadata") or {} - mandate_id = metadata.get("mandateId") - user_id = metadata.get("userId") or None - amount_chf_str = metadata.get("amountChf", "0") - - if not mandate_id: - logger.error(f"Stripe webhook missing mandateId in session {session_id}") - raise HTTPException(status_code=400, detail="Invalid session metadata") - - try: - amount_chf = float(amount_chf_str) - except (TypeError, ValueError): - amount_chf = None - - if amount_chf is None or amount_chf not in ALLOWED_AMOUNTS_CHF: - amount_total = session.get("amount_total") - if amount_total is not None: - amount_chf = amount_total / 100.0 - else: - logger.error(f"Stripe webhook invalid amount for session {session_id}") - raise HTTPException(status_code=400, detail="Invalid amount") - - settings = billingInterface.getSettings(mandate_id) - if not settings: - logger.error(f"Stripe webhook: billing settings not found for mandate {mandate_id}") - raise HTTPException(status_code=404, detail="Billing settings not found") - - billing_model = BillingModelEnum(settings.get("billingModel", BillingModelEnum.UNLIMITED.value)) - - if billing_model == BillingModelEnum.PREPAY_USER: - if not user_id: - logger.error(f"Stripe webhook: userId required for PREPAY_USER mandate {mandate_id}") - raise HTTPException(status_code=400, detail="userId required") - account = billingInterface.getOrCreateUserAccount(mandate_id, user_id, initialBalance=0.0) - elif billing_model in [BillingModelEnum.PREPAY_MANDATE, BillingModelEnum.CREDIT_POSTPAY]: - account = billingInterface.getOrCreateMandateAccount(mandate_id, initialBalance=0.0) - else: - logger.error(f"Stripe webhook: cannot credit mandate {mandate_id} with model {billing_model}") - raise HTTPException(status_code=400, detail=f"Cannot add credit to {billing_model.value}") - - transaction = BillingTransaction( - accountId=account["id"], - transactionType=TransactionTypeEnum.CREDIT, - amount=amount_chf, - description="Stripe-Zahlung", - referenceType=ReferenceTypeEnum.PAYMENT, - referenceId=session_id + + session_dict = session.to_dict_recursive() if hasattr(session, "to_dict_recursive") else dict(session) + result = _creditStripeSessionIfNeeded(billingInterface, session_dict, eventId=event_id) + logger.info( + f"Stripe webhook processed session {result.sessionId}: " + f"credited={result.credited}, alreadyCredited={result.alreadyCredited}" ) - billingInterface.createTransaction(transaction) - billingInterface.createStripeWebhookEvent(event_id) - - logger.info(f"Stripe webhook: credited {amount_chf} CHF to account {account['id']} (session {session_id})") return {"received": True} diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py index 46182f60..c3138aed 100644 --- a/modules/routes/routeDataFiles.py +++ b/modules/routes/routeDataFiles.py @@ -12,6 +12,7 @@ from modules.auth import limiter, getCurrentUser, getRequestContext, RequestCont # Import interfaces import modules.interfaces.interfaceDbManagement as interfaceDbManagement from modules.datamodels.datamodelFiles import FileItem, FilePreview +from modules.datamodels.datamodelFileFolder import FileFolder from modules.shared.attributeUtils import getModelAttributeDefinitions from modules.datamodels.datamodelUam import User from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict @@ -310,6 +311,222 @@ async def upload_file( detail=f"Error during file upload: {str(e)}" ) +# ── Folder endpoints (MUST be before /{fileId} catch-all) ───────────────────── + +@router.get("/folders", response_model=List[Dict[str, Any]]) +@limiter.limit("30/minute") +def list_folders( + request: Request, + parentId: Optional[str] = Query(None, description="Parent folder ID (omit for all folders)"), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> List[Dict[str, Any]]: + """List folders for the current user.""" + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + if parentId is not None: + return mgmt.listFolders(parentId=parentId) + return mgmt.listFolders() + except Exception as e: + logger.error(f"Error listing folders: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/folders", status_code=status.HTTP_201_CREATED) +@limiter.limit("10/minute") +def create_folder( + request: Request, + body: Dict[str, Any] = Body(...), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Create a new folder.""" + name = body.get("name", "") + parentId = body.get("parentId") + if not name: + raise HTTPException(status_code=400, detail="name is required") + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + return mgmt.createFolder(name=name, parentId=parentId) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error creating folder: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.put("/folders/{folderId}") +@limiter.limit("10/minute") +def rename_folder( + request: Request, + folderId: str = Path(...), + body: Dict[str, Any] = Body(...), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Rename a folder.""" + newName = body.get("name", "") + if not newName: + raise HTTPException(status_code=400, detail="name is required") + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + mgmt.renameFolder(folderId, newName) + return {"success": True, "folderId": folderId, "name": newName} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error renaming folder: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.delete("/folders/{folderId}") +@limiter.limit("10/minute") +def delete_folder( + request: Request, + folderId: str = Path(...), + recursive: bool = Query(False, description="Delete folder contents recursively"), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete a folder. Use recursive=true to delete non-empty folders.""" + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + return mgmt.deleteFolder(folderId, recursive=recursive) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error deleting folder: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/folders/{folderId}/move") +@limiter.limit("10/minute") +def move_folder( + request: Request, + folderId: str = Path(...), + body: Dict[str, Any] = Body(...), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Move a folder to a new parent.""" + targetParentId = body.get("targetParentId") + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + mgmt.moveFolder(folderId, targetParentId) + return {"success": True, "folderId": folderId, "parentId": targetParentId} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error moving folder: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/batch-delete") +@limiter.limit("10/minute") +def batch_delete_items( + request: Request, + body: Dict[str, Any] = Body(...), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Batch delete files/folders with a single SQL-backed operation per type.""" + fileIds = body.get("fileIds") or [] + folderIds = body.get("folderIds") or [] + recursiveFolders = bool(body.get("recursiveFolders", True)) + + if not isinstance(fileIds, list) or not isinstance(folderIds, list): + raise HTTPException(status_code=400, detail="fileIds and folderIds must be arrays") + + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + + result = {"deletedFiles": 0, "deletedFolders": 0} + + if fileIds: + fileResult = mgmt.deleteFilesBatch(fileIds) + result["deletedFiles"] += fileResult.get("deletedFiles", 0) + + if folderIds: + folderResult = mgmt.deleteFoldersBatch(folderIds, recursive=recursiveFolders) + result["deletedFiles"] += folderResult.get("deletedFiles", 0) + result["deletedFolders"] += folderResult.get("deletedFolders", 0) + + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error in batch delete: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/batch-move") +@limiter.limit("10/minute") +def batch_move_items( + request: Request, + body: Dict[str, Any] = Body(...), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Batch move files/folders with a single SQL-backed operation per type.""" + fileIds = body.get("fileIds") or [] + folderIds = body.get("folderIds") or [] + targetFolderId = body.get("targetFolderId") + targetParentId = body.get("targetParentId") + + if not isinstance(fileIds, list) or not isinstance(folderIds, list): + raise HTTPException(status_code=400, detail="fileIds and folderIds must be arrays") + + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + + result = {"movedFiles": 0, "movedFolders": 0} + + if fileIds: + fileResult = mgmt.moveFilesBatch(fileIds, targetFolderId=targetFolderId) + result["movedFiles"] += fileResult.get("movedFiles", 0) + + if folderIds: + folderResult = mgmt.moveFoldersBatch(folderIds, targetParentId=targetParentId) + result["movedFolders"] += folderResult.get("movedFolders", 0) + + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error in batch move: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +# ── File endpoints with path parameters (catch-all /{fileId}) ───────────────── + @router.get("/{fileId}", response_model=FileItem) @limiter.limit("30/minute") def get_file( @@ -557,3 +774,25 @@ def preview_file( ) +@router.post("/{fileId}/move") +@limiter.limit("10/minute") +def move_file( + request: Request, + fileId: str = Path(...), + body: Dict[str, Any] = Body(...), + currentUser: User = Depends(getCurrentUser), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Move a file to a different folder.""" + targetFolderId = body.get("targetFolderId") + try: + mgmt = interfaceDbManagement.getInterface( + currentUser, + mandateId=str(context.mandateId) if context.mandateId else None, + featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None, + ) + mgmt.updateFile(fileId, {"folderId": targetFolderId}) + return {"success": True, "fileId": fileId, "folderId": targetFolderId} + except Exception as e: + logger.error(f"Error moving file: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/modules/routes/routeVoiceGoogle.py b/modules/routes/routeVoiceGoogle.py index 8e72207c..af4db355 100644 --- a/modules/routes/routeVoiceGoogle.py +++ b/modules/routes/routeVoiceGoogle.py @@ -6,13 +6,16 @@ Replaces Azure voice services with Google Cloud Speech-to-Text and Translation Includes WebSocket support for real-time voice streaming """ +import asyncio import logging import json import base64 -from fastapi import APIRouter, File, Form, UploadFile, Depends, HTTPException, Body, WebSocket, WebSocketDisconnect +import secrets +import time +from fastapi import APIRouter, File, Form, UploadFile, Depends, HTTPException, Body, Query, Request, WebSocket, WebSocketDisconnect from fastapi.responses import Response from typing import Optional, Dict, Any, List -from modules.auth import getCurrentUser +from modules.auth import getCurrentUser, getRequestContext, RequestContext, limiter from modules.datamodels.datamodelUam import User from modules.interfaces.interfaceVoiceObjects import getVoiceInterface, VoiceObjects @@ -290,10 +293,11 @@ async def realtime_interpreter( @router.post("/text-to-speech") async def text_to_speech( + request: Request, text: str = Form(...), language: str = Form("de-DE"), voice: str = Form(None), - currentUser: User = Depends(getCurrentUser) + context: RequestContext = Depends(getRequestContext), ): """Convert text to speech using Google Cloud Text-to-Speech.""" try: @@ -305,7 +309,20 @@ async def text_to_speech( detail="Empty text provided for text-to-speech" ) - voiceInterface = _getVoiceInterface(currentUser) + mandateId = str(getattr(context, "mandateId", "") or "") + voiceInterface = getVoiceInterface(context.user, mandateId) + try: + from modules.serviceCenter.services.serviceBilling.mainServiceBilling import getService as getBillingService + billingService = getBillingService(context.user, mandateId) + def _billingCb(data): + priceCHF = data.get("priceCHF", 0.0) + operation = data.get("operation", "voice") + if priceCHF > 0: + billingService.recordUsage(priceCHF=priceCHF, aicoreProvider="google-voice", aicoreModel=operation, description=f"Voice {operation}") + voiceInterface.billingCallback = _billingCb + except Exception as e: + logger.warning(f"TTS billing setup skipped: {e}") + result = await voiceInterface.textToSpeech( text=text, languageCode=language, @@ -314,12 +331,12 @@ async def text_to_speech( if result["success"]: return Response( - content=result["audio_content"], + content=result["audioContent"], media_type="audio/mpeg", headers={ "Content-Disposition": "attachment; filename=speech.mp3", - "X-Voice-Name": result["voice_name"], - "X-Language-Code": result["language_code"] + "X-Voice-Name": result.get("voiceName", ""), + "X-Language-Code": result.get("languageCode", language), } ) else: @@ -533,189 +550,192 @@ async def save_voice_settings( detail=f"Failed to save voice settings: {str(e)}" ) -# WebSocket endpoints for real-time voice streaming +# ========================================================================= +# STT Streaming WebSocket — generic, used by all features +# ========================================================================= -@router.websocket("/ws/realtime-interpreter") -async def websocket_realtime_interpreter( - websocket: WebSocket, - userId: str = "default", - fromLanguage: str = "de-DE", - toLanguage: str = "en-US" -): - """WebSocket endpoint for real-time voice interpretation""" - connectionId = f"realtime_{userId}_{fromLanguage}_{toLanguage}" - - try: - await manager.connect(websocket, connectionId) - - # Send connection confirmation - await manager.sendPersonalMessage({ - "type": "connected", - "connection_id": connectionId, - "message": "Connected to real-time interpreter" - }, websocket) - - # Initialize voice interface - voiceInterface = _getVoiceInterface(User(id=userId)) - - while True: - # Receive message from client - data = await websocket.receive_text() - message = json.loads(data) - - if message["type"] == "audio_chunk": - # Process audio chunk - try: - # Decode base64 audio data - audioData = base64.b64decode(message["data"]) - - # For now, just acknowledge receipt - # In a full implementation, this would: - # 1. Buffer audio chunks - # 2. Process with Google Cloud Speech-to-Text streaming - # 3. Send partial results back - # 4. Handle translation - - await manager.sendPersonalMessage({ - "type": "audio_received", - "chunk_size": len(audioData), - "timestamp": message.get("timestamp") - }, websocket) - - except Exception as e: - logger.error(f"Error processing audio chunk: {e}") - await manager.send_personal_message({ - "type": "error", - "error": f"Failed to process audio: {str(e)}" - }, websocket) - - elif message["type"] == "ping": - # Respond to ping - await manager.sendPersonalMessage({ - "type": "pong", - "timestamp": message.get("timestamp") - }, websocket) - - else: - logger.warning(f"Unknown message type: {message['type']}") - - except WebSocketDisconnect: - manager.disconnect(websocket, connectionId) - logger.info(f"Client disconnected: {connectionId}") - except Exception as e: - logger.error(f"WebSocket error: {e}") - manager.disconnect(websocket, connectionId) +_sttTokens: Dict[str, Dict[str, Any]] = {} +_STT_TOKEN_TTL = 45 -@router.websocket("/ws/speech-to-text") -async def websocket_speech_to_text( - websocket: WebSocket, - userId: str = "default", - language: str = "de-DE" -): - """WebSocket endpoint for real-time speech-to-text""" - connectionId = f"stt_{userId}_{language}" - - try: - await manager.connect(websocket, connectionId) - - await manager.sendPersonalMessage({ - "type": "connected", - "connection_id": connectionId, - "message": "Connected to speech-to-text" - }, websocket) - - # Initialize voice interface - voiceInterface = _getVoiceInterface(User(id=userId)) - - while True: - data = await websocket.receive_text() - message = json.loads(data) - - if message["type"] == "audio_chunk": - try: - audioData = base64.b64decode(message["data"]) - - # Process audio chunk - # This would integrate with Google Cloud Speech-to-Text streaming API - - await manager.sendPersonalMessage({ - "type": "transcription_result", - "text": "Audio chunk received", # Placeholder - "confidence": 0.95, - "is_final": False - }, websocket) - - except Exception as e: - logger.error(f"Error processing audio: {e}") - await manager.sendPersonalMessage({ - "type": "error", - "error": f"Failed to process audio: {str(e)}" - }, websocket) - - elif message["type"] == "ping": - await manager.sendPersonalMessage({ - "type": "pong", - "timestamp": message.get("timestamp") - }, websocket) - - except WebSocketDisconnect: - manager.disconnect(websocket, connectionId) - except Exception as e: - logger.error(f"WebSocket error: {e}") - manager.disconnect(websocket, connectionId) -@router.websocket("/ws/text-to-speech") -async def websocket_text_to_speech( - websocket: WebSocket, - userId: str = "default", - language: str = "de-DE", - voice: str = "de-DE-Wavenet-A" +def _cleanupSttTokens(): + now = time.time() + expired = [t for t, p in _sttTokens.items() if p.get("expiresAt", 0) <= now] + for t in expired: + _sttTokens.pop(t, None) + + +@router.post("/stt/token") +@limiter.limit("60/minute") +async def createSttToken( + request: Request, + context: RequestContext = Depends(getRequestContext), ): - """WebSocket endpoint for real-time text-to-speech""" - connectionId = f"tts_{userId}_{language}_{voice}" - + """Issue a short-lived single-use token for the STT streaming WebSocket.""" + _cleanupSttTokens() + token = secrets.token_urlsafe(32) + _sttTokens[token] = { + "userId": str(context.user.id), + "mandateId": str(getattr(context, "mandateId", "") or ""), + "expiresAt": time.time() + _STT_TOKEN_TTL, + } + return {"wsToken": token, "expiresInSeconds": _STT_TOKEN_TTL} + + +@router.websocket("/stt/stream") +async def sttStream( + websocket: WebSocket, + wsToken: Optional[str] = Query(None), +): + """ + Generic STT streaming WebSocket. + + Protocol: + Client sends JSON: + {"type": "open", "language": "de-DE"} + {"type": "audio", "chunk": ""} + {"type": "close"} + Server sends JSON: + {"type": "interim", "text": "..."} + {"type": "final", "text": "...", "confidence": 0.95} + {"type": "error", "message": "..."} + {"type": "closed"} + """ + await websocket.accept() + + # --- authenticate via wsToken --- + if not wsToken: + await websocket.send_json({"type": "error", "code": "ws_token_required", "message": "wsToken query param required"}) + await websocket.close(code=1008) + return + + _cleanupSttTokens() + tokenPayload = _sttTokens.pop(wsToken, None) + if not tokenPayload: + await websocket.send_json({"type": "error", "code": "ws_token_invalid", "message": "Invalid or expired wsToken"}) + await websocket.close(code=1008) + return + + tokenUserId = tokenPayload["userId"] + tokenMandateId = tokenPayload.get("mandateId", "") + + # Resolve real user for billing + from modules.interfaces.interfaceDbApp import getRootInterface + rootInterface = getRootInterface() + currentUser = rootInterface.getUser(tokenUserId) + if not currentUser: + await websocket.send_json({"type": "error", "code": "user_not_found", "message": "User not found"}) + await websocket.close(code=1008) + return + + # --- billing pre-flight --- + billingService = None try: - await manager.connect(websocket, connectionId) - - await manager.sendPersonalMessage({ - "type": "connected", - "connection_id": connectionId, - "message": "Connected to text-to-speech" - }, websocket) - - while True: - data = await websocket.receive_text() - message = json.loads(data) - - if message["type"] == "text_to_speak": - try: - text = message["text"] - - # Process text-to-speech - # This would integrate with Google Cloud Text-to-Speech API - - # For now, send a placeholder response - await manager.sendPersonalMessage({ - "type": "audio_data", - "audio": "base64_encoded_audio_here", # Placeholder - "format": "mp3" - }, websocket) - - except Exception as e: - logger.error(f"Error processing text-to-speech: {e}") - await manager.sendPersonalMessage({ - "type": "error", - "error": f"Failed to process text: {str(e)}" - }, websocket) - - elif message["type"] == "ping": - await manager.sendPersonalMessage({ - "type": "pong", - "timestamp": message.get("timestamp") - }, websocket) - - except WebSocketDisconnect: - manager.disconnect(websocket, connectionId) + from modules.serviceCenter.services.serviceBilling.mainServiceBilling import getService as getBillingService + billingService = getBillingService(currentUser, tokenMandateId) + billingCheck = billingService.checkBalance(0.0) + if not billingCheck.allowed: + await websocket.send_json({"type": "error", "code": "billing_insufficient", "message": "Insufficient balance for voice services"}) + await websocket.close(code=1008) + return except Exception as e: - logger.error(f"WebSocket error: {e}") - manager.disconnect(websocket, connectionId) + logger.warning(f"STT billing pre-flight skipped: {e}") + + audioQueue: asyncio.Queue = asyncio.Queue() + language = "de-DE" + streamingTask: Optional[asyncio.Task] = None + voiceInterface: Optional[VoiceObjects] = None + + async def _sendJson(payload: Dict[str, Any]) -> bool: + try: + await websocket.send_json(payload) + return True + except Exception: + return False + + async def _runStreaming(): + nonlocal voiceInterface + voiceInterface = getVoiceInterface(currentUser, tokenMandateId) + if billingService: + def _billingCb(data): + priceCHF = data.get("priceCHF", 0.0) + operation = data.get("operation", "voice") + if priceCHF > 0: + billingService.recordUsage( + priceCHF=priceCHF, + aicoreProvider="google-voice", + aicoreModel=operation, + description=f"Voice {operation}", + ) + voiceInterface.billingCallback = _billingCb + + try: + async for event in voiceInterface.streamingSpeechToText(audioQueue, language): + if event.get("reconnectRequired"): + await _sendJson({"type": "reconnect_required"}) + return + if event.get("isFinal"): + if event.get("transcript"): + await _sendJson({"type": "final", "text": event["transcript"], "confidence": event.get("confidence", 0.0)}) + else: + if event.get("transcript"): + await _sendJson({"type": "interim", "text": event["transcript"]}) + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"STT streaming error: {e}") + await _sendJson({"type": "error", "message": str(e)}) + + try: + await _sendJson({"type": "status", "label": "STT stream connected"}) + + while True: + raw = await websocket.receive_text() + msg = json.loads(raw) + msgType = (msg.get("type") or "").strip() + + if msgType == "open": + language = msg.get("language") or "de-DE" + if streamingTask and not streamingTask.done(): + await audioQueue.put((b"", True)) + streamingTask.cancel() + audioQueue = asyncio.Queue() + streamingTask = asyncio.create_task(_runStreaming()) + await _sendJson({"type": "status", "label": "Listening..."}) + + elif msgType == "audio": + chunkB64 = msg.get("chunk") + if not chunkB64: + continue + chunkBytes = base64.b64decode(chunkB64) + if len(chunkBytes) > 400_000: + await _sendJson({"type": "error", "code": "chunk_too_large", "message": "Audio chunk too large"}) + continue + await audioQueue.put((chunkBytes, False)) + + elif msgType == "close": + await audioQueue.put((b"", True)) + if streamingTask: + try: + await asyncio.wait_for(streamingTask, timeout=10.0) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + await _sendJson({"type": "closed"}) + await websocket.close() + break + + elif msgType == "ping": + await _sendJson({"type": "pong"}) + + except WebSocketDisconnect: + logger.info(f"STT WebSocket disconnected: userId={tokenUserId}") + except Exception as e: + logger.error(f"STT WebSocket error: {e}", exc_info=True) + try: + await websocket.send_json({"type": "error", "message": str(e)}) + except Exception: + pass + finally: + await audioQueue.put((b"", True)) + if streamingTask and not streamingTask.done(): + streamingTask.cancel() diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py index 7a088911..77a98bf1 100644 --- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py +++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py @@ -186,9 +186,32 @@ class AgentService: for fid in fileIds: try: info = chatService.getFileInfo(fid) - fileName = info.get("fileName", fid) if info else fid - mimeType = info.get("mimeType", "unknown") if info else "unknown" - fileSize = info.get("size", "?") if info else "?" + + if not info: + folderInfo = chatService.interfaceDbComponent.getFolder(fid) + if folderInfo: + folderName = folderInfo.get("name", fid) + folderFiles = chatService.listFiles(folderId=fid) + desc = f"### Folder: {folderName}\n - id: {fid}\n - type: folder\n - contains: {len(folderFiles)} file(s)" + if folderFiles: + desc += "\n - files:" + for ff in folderFiles[:30]: + ffName = ff.get("fileName", "?") + ffId = ff.get("id", "?") + ffMime = ff.get("mimeType", "?") + ffSize = ff.get("fileSize", ff.get("size", "?")) + desc += f"\n * {ffName} (id: {ffId}, type: {ffMime}, size: {ffSize} bytes)" + if len(folderFiles) > 30: + desc += f"\n ... and {len(folderFiles) - 30} more files" + desc += f'\nUse `listFiles(folderId="{fid}")` to get the full file list, then `readFile(fileId)` to read individual files.' + fileDescriptions.append(desc) + continue + fileDescriptions.append(f"### File id: {fid}") + continue + + fileName = info.get("fileName", fid) + mimeType = info.get("mimeType", "unknown") + fileSize = info.get("size", "?") desc = f"### File: {fileName}\n - id: {fid}\n - type: {mimeType}\n - size: {fileSize} bytes" @@ -226,10 +249,11 @@ class AgentService: if fileDescriptions: header = ( - "## Attached Files\n" - "These files have been uploaded and processed through the extraction pipeline.\n" + "## Attached Files & Folders\n" + "These files/folders have been uploaded and processed through the extraction pipeline.\n" "Use `readFile(fileId)` to read text content, `readContentObjects(fileId)` for structured access, " "or `describeImage(fileId)` for image analysis.\n" + "For folders, use `listFiles(folderId)` to get the files inside, then `readFile(fileId)` for each.\n" "When generating documents with `renderDocument`, embed images using `![alt text](file:fileId)` in the markdown content.\n\n" ) header += "\n\n".join(fileDescriptions) @@ -322,6 +346,24 @@ class AgentService: return _buildRagContext +def _getOrCreateTempFolder(chatService) -> Optional[str]: + """Return the ID of the root-level 'Temp' folder, creating it if it doesn't exist.""" + try: + allFolders = chatService.interfaceDbComponent.listFolders() + tempFolder = next( + (f for f in allFolders + if f.get("name") == "Temp" and not f.get("parentId")), + None, + ) + if tempFolder: + return tempFolder.get("id") + newFolder = chatService.interfaceDbComponent.createFolder("Temp", parentId=None) + return newFolder.get("id") if newFolder else None + except Exception as e: + logger.warning(f"Could not get/create Temp folder: {e}") + return None + + def _registerCoreTools(registry: ToolRegistry, services): """Register built-in core tools: file operations, search, and folder management.""" from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult @@ -730,6 +772,336 @@ def _registerCoreTools(registry: ToolRegistry, services): readOnly=False ) + # ---- Phase 1: deleteFile, renameFile, readUrl, translateText ---- + + async def _deleteFile(args: Dict[str, Any], context: Dict[str, Any]): + fileId = args.get("fileId", "") + if not fileId: + return ToolResult(toolCallId="", toolName="deleteFile", success=False, error="fileId is required") + try: + chatService = services.chat + file = chatService.interfaceDbComponent.getFile(fileId) + if not file: + return ToolResult(toolCallId="", toolName="deleteFile", success=False, error=f"File {fileId} not found") + fileName = file.fileName + try: + knowledgeService = services.getService("knowledge") + if knowledgeService and hasattr(knowledgeService, "removeFile"): + knowledgeService.removeFile(fileId) + except Exception: + pass + chatService.interfaceDbComponent.deleteFile(fileId) + return ToolResult( + toolCallId="", toolName="deleteFile", success=True, + data=f"File '{fileName}' (id: {fileId}) deleted", + sideEvents=[{"type": "fileDeleted", "data": {"fileId": fileId, "fileName": fileName}}], + ) + except Exception as e: + return ToolResult(toolCallId="", toolName="deleteFile", success=False, error=str(e)) + + async def _renameFile(args: Dict[str, Any], context: Dict[str, Any]): + fileId = args.get("fileId", "") + newName = args.get("newName", "") + if not fileId or not newName: + return ToolResult(toolCallId="", toolName="renameFile", success=False, error="fileId and newName are required") + try: + chatService = services.chat + chatService.interfaceDbComponent.updateFile(fileId, {"fileName": newName}) + return ToolResult( + toolCallId="", toolName="renameFile", success=True, + data=f"File {fileId} renamed to '{newName}'", + sideEvents=[{"type": "fileUpdated", "data": {"fileId": fileId, "fileName": newName}}], + ) + except Exception as e: + return ToolResult(toolCallId="", toolName="renameFile", success=False, error=str(e)) + + async def _readUrl(args: Dict[str, Any], context: Dict[str, Any]): + url = args.get("url", "") + if not url: + return ToolResult(toolCallId="", toolName="readUrl", success=False, error="url is required") + try: + webService = services.getService("web") + result = await webService._performWebCrawl( + instruction="Extract all content from this page", + urls=[url], + maxDepth=1, + maxWidth=1, + ) + if isinstance(result, list) and result: + content = "\n\n".join( + item.get("content", "") or item.get("text", "") or str(item) + for item in result if item + ) + elif isinstance(result, dict): + content = result.get("content", "") or result.get("summary", "") or str(result) + else: + content = str(result) if result else "No content retrieved" + _MAX = 30000 + if len(content) > _MAX: + content = content[:_MAX] + f"\n\n... (truncated at {_MAX} chars)" + return ToolResult(toolCallId="", toolName="readUrl", success=True, data=content) + except Exception as e: + return ToolResult(toolCallId="", toolName="readUrl", success=False, error=str(e)) + + async def _translateText(args: Dict[str, Any], context: Dict[str, Any]): + text = args.get("text", "") + targetLanguage = args.get("targetLanguage", "") + if not text or not targetLanguage: + return ToolResult(toolCallId="", toolName="translateText", success=False, error="text and targetLanguage are required") + try: + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + mandateId = context.get("mandateId", "") + voiceInterface = getVoiceInterface(currentUser=None, mandateId=mandateId) + sourceLanguage = args.get("sourceLanguage", "auto") + result = await voiceInterface.translateText(text, sourceLanguage=sourceLanguage, targetLanguage=targetLanguage) + if result and result.get("success"): + translated = result.get("translated_text", "") + return ToolResult(toolCallId="", toolName="translateText", success=True, data=translated) + return ToolResult(toolCallId="", toolName="translateText", success=False, error=result.get("error", "Translation failed")) + except Exception as e: + return ToolResult(toolCallId="", toolName="translateText", success=False, error=str(e)) + + registry.register( + "deleteFile", _deleteFile, + description="Delete a file from the workspace. Use when the user asks to remove or delete a file.", + parameters={ + "type": "object", + "properties": { + "fileId": {"type": "string", "description": "The file ID to delete"}, + }, + "required": ["fileId"] + }, + readOnly=False + ) + + registry.register( + "renameFile", _renameFile, + description="Rename a file in the workspace.", + parameters={ + "type": "object", + "properties": { + "fileId": {"type": "string", "description": "The file ID to rename"}, + "newName": {"type": "string", "description": "New file name including extension"}, + }, + "required": ["fileId", "newName"] + }, + readOnly=False + ) + + registry.register( + "readUrl", _readUrl, + description=( + "Read and extract content from a specific URL. " + "Use when the user provides a specific URL to read, or when you need to fetch content from a known web page. " + "For general information searches, use webSearch instead." + ), + parameters={ + "type": "object", + "properties": { + "url": {"type": "string", "description": "The URL to read"}, + }, + "required": ["url"] + }, + readOnly=True + ) + + registry.register( + "translateText", _translateText, + description=( + "Translate text to a target language using Google Cloud Translation. " + "More efficient than AI translation for large text volumes. " + "Use ISO language codes (e.g. 'en', 'de', 'fr', 'es', 'it', 'pt', 'zh', 'ja', 'ko', 'ar')." + ), + parameters={ + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to translate"}, + "targetLanguage": {"type": "string", "description": "Target language ISO code (e.g. 'en', 'de', 'fr')"}, + "sourceLanguage": {"type": "string", "description": "Source language ISO code (default: auto-detect)"}, + }, + "required": ["text", "targetLanguage"] + }, + readOnly=True + ) + + # ---- Phase 2: deleteFolder, renameFolder, moveFolder, copyFile, editFile ---- + + async def _deleteFolder(args: Dict[str, Any], context: Dict[str, Any]): + folderId = args.get("folderId", "") + recursive = args.get("recursive", False) + if not folderId: + return ToolResult(toolCallId="", toolName="deleteFolder", success=False, error="folderId is required") + try: + chatService = services.chat + result = chatService.interfaceDbComponent.deleteFolder(folderId, recursive=recursive) + summary = f"Deleted {result.get('deletedFolders', 1)} folder(s) and {result.get('deletedFiles', 0)} file(s)" + return ToolResult( + toolCallId="", toolName="deleteFolder", success=True, data=summary, + sideEvents=[{"type": "folderDeleted", "data": {"folderId": folderId, **result}}], + ) + except Exception as e: + return ToolResult(toolCallId="", toolName="deleteFolder", success=False, error=str(e)) + + async def _renameFolder(args: Dict[str, Any], context: Dict[str, Any]): + folderId = args.get("folderId", "") + newName = args.get("newName", "") + if not folderId or not newName: + return ToolResult(toolCallId="", toolName="renameFolder", success=False, error="folderId and newName are required") + try: + chatService = services.chat + chatService.interfaceDbComponent.renameFolder(folderId, newName) + return ToolResult( + toolCallId="", toolName="renameFolder", success=True, + data=f"Folder {folderId} renamed to '{newName}'", + sideEvents=[{"type": "folderUpdated", "data": {"folderId": folderId, "name": newName}}], + ) + except Exception as e: + return ToolResult(toolCallId="", toolName="renameFolder", success=False, error=str(e)) + + async def _moveFolder(args: Dict[str, Any], context: Dict[str, Any]): + folderId = args.get("folderId", "") + targetParentId = args.get("targetParentId") + if not folderId: + return ToolResult(toolCallId="", toolName="moveFolder", success=False, error="folderId is required") + try: + chatService = services.chat + chatService.interfaceDbComponent.moveFolder(folderId, targetParentId) + return ToolResult( + toolCallId="", toolName="moveFolder", success=True, + data=f"Folder {folderId} moved to {targetParentId or 'root'}", + sideEvents=[{"type": "folderUpdated", "data": {"folderId": folderId, "parentId": targetParentId}}], + ) + except Exception as e: + return ToolResult(toolCallId="", toolName="moveFolder", success=False, error=str(e)) + + async def _copyFile(args: Dict[str, Any], context: Dict[str, Any]): + fileId = args.get("fileId", "") + if not fileId: + return ToolResult(toolCallId="", toolName="copyFile", success=False, error="fileId is required") + try: + chatService = services.chat + copiedFile = chatService.interfaceDbComponent.copyFile( + fileId, + targetFolderId=args.get("targetFolderId"), + newFileName=args.get("newFileName"), + ) + return ToolResult( + toolCallId="", toolName="copyFile", success=True, + data=f"File copied as '{copiedFile.fileName}' (id: {copiedFile.id})", + sideEvents=[{ + "type": "fileCreated", + "data": {"fileId": copiedFile.id, "fileName": copiedFile.fileName, + "mimeType": copiedFile.mimeType, "fileSize": copiedFile.fileSize}, + }], + ) + except Exception as e: + return ToolResult(toolCallId="", toolName="copyFile", success=False, error=str(e)) + + async def _editFile(args: Dict[str, Any], context: Dict[str, Any]): + fileId = args.get("fileId", "") + content = args.get("content", "") + if not fileId or not content: + return ToolResult(toolCallId="", toolName="editFile", success=False, error="fileId and content are required") + try: + chatService = services.chat + dbMgmt = chatService.interfaceDbComponent + file = dbMgmt.getFile(fileId) + if not file: + return ToolResult(toolCallId="", toolName="editFile", success=False, error=f"File {fileId} not found") + if not dbMgmt.isTextMimeType(file.mimeType): + return ToolResult( + toolCallId="", toolName="editFile", success=False, + error=f"Cannot edit binary file ({file.mimeType}). Only text-based files are supported." + ) + contentBytes = content.encode("utf-8") + success = dbMgmt.updateFileData(fileId, contentBytes) + if not success: + return ToolResult(toolCallId="", toolName="editFile", success=False, error="Failed to update file data") + return ToolResult( + toolCallId="", toolName="editFile", success=True, + data=f"File '{file.fileName}' updated ({len(contentBytes)} bytes)", + sideEvents=[{ + "type": "fileUpdated", + "data": {"fileId": fileId, "fileName": file.fileName, "fileSize": len(contentBytes)}, + }], + ) + except Exception as e: + return ToolResult(toolCallId="", toolName="editFile", success=False, error=str(e)) + + registry.register( + "deleteFolder", _deleteFolder, + description="Delete a folder. Set recursive=true to delete folder with all contents.", + parameters={ + "type": "object", + "properties": { + "folderId": {"type": "string", "description": "The folder ID to delete"}, + "recursive": {"type": "boolean", "description": "If true, delete folder and all contents (files and subfolders). Default: false"}, + }, + "required": ["folderId"] + }, + readOnly=False + ) + + registry.register( + "renameFolder", _renameFolder, + description="Rename a folder. Folder names must be unique within their parent.", + parameters={ + "type": "object", + "properties": { + "folderId": {"type": "string", "description": "The folder ID to rename"}, + "newName": {"type": "string", "description": "New folder name"}, + }, + "required": ["folderId", "newName"] + }, + readOnly=False + ) + + registry.register( + "moveFolder", _moveFolder, + description="Move a folder to a different parent folder. Cannot move a folder into its own subtree.", + parameters={ + "type": "object", + "properties": { + "folderId": {"type": "string", "description": "The folder ID to move"}, + "targetParentId": {"type": "string", "description": "Target parent folder ID (null/omit for root)"}, + }, + "required": ["folderId"] + }, + readOnly=False + ) + + registry.register( + "copyFile", _copyFile, + description="Create a full copy of a file. The copy is independent and can be edited separately.", + parameters={ + "type": "object", + "properties": { + "fileId": {"type": "string", "description": "The file ID to copy"}, + "targetFolderId": {"type": "string", "description": "Target folder for the copy (default: same folder)"}, + "newFileName": {"type": "string", "description": "New file name (default: same name, auto-numbered if duplicate)"}, + }, + "required": ["fileId"] + }, + readOnly=False + ) + + registry.register( + "editFile", _editFile, + description=( + "Update the content of an existing text file. Only works for text-based files " + "(text/*, application/json, etc.). For binary files, create a new file instead." + ), + parameters={ + "type": "object", + "properties": { + "fileId": {"type": "string", "description": "The file ID to edit"}, + "content": {"type": "string", "description": "New file content (replaces entire file content)"}, + }, + "required": ["fileId", "content"] + }, + readOnly=False + ) + # ---- Connection tools (external data sources) ---- def _buildResolverDb(): @@ -803,11 +1175,18 @@ def _registerCoreTools(registry: ToolRegistry, services): fileName = path.split("/")[-1] or "downloaded_file" chatService = services.chat fileItem, _ = chatService.interfaceDbComponent.saveUploadedFile(fileBytes, fileName) + fid = fileItem.id if hasattr(fileItem, "id") else fileItem.get("id", "?") + fiId = context.get("featureInstanceId") or (services.featureInstanceId if services else "") + if fiId: + chatService.interfaceDbComponent.updateFile(fid, {"featureInstanceId": fiId}) + tempFolderId = _getOrCreateTempFolder(chatService) + if tempFolderId: + chatService.interfaceDbComponent.updateFile(fid, {"folderId": tempFolderId}) ext = fileName.rsplit(".", 1)[-1].lower() if "." in fileName else "" hint = "Use readFile to read text content." if ext in ("doc", "docx", "txt", "csv", "json", "xml", "html", "md", "rtf", "odt", "xls", "xlsx", "pptx") else "Use readFile to access the content." return ToolResult( toolCallId="", toolName="externalDownload", success=True, - data=f"Downloaded '{fileName}' ({len(fileBytes)} bytes) → local file id: {fileItem.id}. {hint}" + data=f"Downloaded '{fileName}' ({len(fileBytes)} bytes) → local file id: {fid}. {hint}" ) except Exception as e: return ToolResult(toolCallId="", toolName="externalDownload", success=False, error=str(e)) @@ -1710,6 +2089,9 @@ def _registerCoreTools(registry: ToolRegistry, services): fiId = context.get("featureInstanceId") or (services.featureInstanceId if services else "") if fiId: chatService.interfaceDbComponent.updateFile(fid, {"featureInstanceId": fiId}) + tempFolderId = _getOrCreateTempFolder(chatService) + if tempFolderId: + chatService.interfaceDbComponent.updateFile(fid, {"folderId": tempFolderId}) savedFiles.append(f"- {docName} (id: {fid})") sideEvents.append({ "type": "fileCreated", @@ -1947,6 +2329,9 @@ def _registerCoreTools(registry: ToolRegistry, services): fiId = context.get("featureInstanceId") or (services.featureInstanceId if services else "") if fiId: chatService.interfaceDbComponent.updateFile(fid, {"featureInstanceId": fiId}) + tempFolderId = _getOrCreateTempFolder(chatService) + if tempFolderId: + chatService.interfaceDbComponent.updateFile(fid, {"folderId": tempFolderId}) savedFiles.append(f"- {docName} (id: {fid})") sideEvents.append({ "type": "fileCreated", @@ -1984,3 +2369,144 @@ def _registerCoreTools(registry: ToolRegistry, services): }, readOnly=False, ) + + # ── Phase 3: speechToText, detectLanguage, neutralizeData, executeCode ── + + async def _speechToText(args: Dict[str, Any], context: Dict[str, Any]): + fileId = args.get("fileId", "") + if not fileId: + return ToolResult(toolCallId="", toolName="speechToText", success=False, error="fileId is required") + try: + chatService = services.chat + audioData = chatService.interfaceDbComponent.getFileData(fileId) + if not audioData: + return ToolResult(toolCallId="", toolName="speechToText", success=False, error=f"No data found for file {fileId}") + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + mandateId = context.get("mandateId", "") + voiceInterface = getVoiceInterface(currentUser=None, mandateId=mandateId) + language = args.get("language", "de-DE") + result = await voiceInterface.speechToText(audioData, language=language) + if result and result.get("success"): + transcript = result.get("text", "") + confidence = result.get("confidence", 0) + return ToolResult( + toolCallId="", toolName="speechToText", success=True, + data=f"Transcript (confidence: {confidence:.0%}):\n{transcript}" + ) + return ToolResult(toolCallId="", toolName="speechToText", success=False, error=result.get("error", "Transcription failed")) + except Exception as e: + return ToolResult(toolCallId="", toolName="speechToText", success=False, error=str(e)) + + async def _detectLanguage(args: Dict[str, Any], context: Dict[str, Any]): + text = args.get("text", "") + if not text: + return ToolResult(toolCallId="", toolName="detectLanguage", success=False, error="text is required") + try: + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + mandateId = context.get("mandateId", "") + voiceInterface = getVoiceInterface(currentUser=None, mandateId=mandateId) + result = await voiceInterface.detectLanguage(text) + if result and result.get("success"): + lang = result.get("language", "unknown") + return ToolResult(toolCallId="", toolName="detectLanguage", success=True, data=f"Detected language: {lang}") + return ToolResult(toolCallId="", toolName="detectLanguage", success=False, error=result.get("error", "Detection failed")) + except Exception as e: + return ToolResult(toolCallId="", toolName="detectLanguage", success=False, error=str(e)) + + async def _neutralizeData(args: Dict[str, Any], context: Dict[str, Any]): + text = args.get("text", "") + fileId = args.get("fileId", "") + if not text and not fileId: + return ToolResult(toolCallId="", toolName="neutralizeData", success=False, error="text or fileId is required") + try: + neutralizationService = services.getService("neutralization") + if not neutralizationService: + return ToolResult(toolCallId="", toolName="neutralizeData", success=False, error="Neutralization service not available") + if text: + result = neutralizationService.processText(text) + else: + result = neutralizationService.processFile(fileId) + if result: + neutralized = result.get("neutralized_text", "") or result.get("result", str(result)) + return ToolResult(toolCallId="", toolName="neutralizeData", success=True, data=neutralized) + return ToolResult(toolCallId="", toolName="neutralizeData", success=False, error="Neutralization returned no result") + except Exception as e: + return ToolResult(toolCallId="", toolName="neutralizeData", success=False, error=str(e)) + + async def _executeCode(args: Dict[str, Any], context: Dict[str, Any]): + code = args.get("code", "") + language = args.get("language", "python") + if not code: + return ToolResult(toolCallId="", toolName="executeCode", success=False, error="code is required") + if language != "python": + return ToolResult(toolCallId="", toolName="executeCode", success=False, error=f"Language '{language}' not supported. Only 'python' is available.") + try: + from modules.serviceCenter.services.serviceAgent.sandboxExecutor import executePython + result = await executePython(code) + if result.get("success"): + output = result.get("output", "(no output)") + return ToolResult(toolCallId="", toolName="executeCode", success=True, data=output) + error = result.get("error", "Execution failed") + tb = result.get("traceback", "") + return ToolResult(toolCallId="", toolName="executeCode", success=False, error=f"{error}\n{tb}" if tb else error) + except Exception as e: + return ToolResult(toolCallId="", toolName="executeCode", success=False, error=str(e)) + + registry.register( + "speechToText", _speechToText, + description="Transcribe an audio file to text. Provide the fileId of an audio file from the workspace.", + parameters={ + "type": "object", + "properties": { + "fileId": {"type": "string", "description": "Audio file ID from the workspace"}, + "language": {"type": "string", "description": "BCP-47 language code (e.g. 'de-DE', 'en-US'). Default: 'de-DE'"}, + }, + "required": ["fileId"] + }, + readOnly=True + ) + + registry.register( + "detectLanguage", _detectLanguage, + description="Detect the language of a text.", + parameters={ + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to analyze"}, + }, + "required": ["text"] + }, + readOnly=True + ) + + registry.register( + "neutralizeData", _neutralizeData, + description="Anonymize/neutralize text or file content. Replaces personal data (names, addresses, etc.) with placeholders. Does not modify the original.", + parameters={ + "type": "object", + "properties": { + "text": {"type": "string", "description": "Text to anonymize"}, + "fileId": {"type": "string", "description": "File ID to anonymize (alternative to text)"}, + }, + }, + readOnly=True + ) + + registry.register( + "executeCode", _executeCode, + description=( + "Execute Python code in a sandboxed environment for calculations and data analysis. " + "Available modules: math, statistics, json, csv, re, datetime, collections, itertools, functools, decimal, fractions, random. " + "No file system, network, or OS access. Max 30s execution time. " + "Use print() to produce output." + ), + parameters={ + "type": "object", + "properties": { + "code": {"type": "string", "description": "Python code to execute"}, + "language": {"type": "string", "description": "Programming language (only 'python' supported)", "default": "python"}, + }, + "required": ["code"] + }, + readOnly=True + ) diff --git a/modules/serviceCenter/services/serviceAgent/sandboxExecutor.py b/modules/serviceCenter/services/serviceAgent/sandboxExecutor.py new file mode 100644 index 00000000..1882d7eb --- /dev/null +++ b/modules/serviceCenter/services/serviceAgent/sandboxExecutor.py @@ -0,0 +1,108 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Sandboxed code execution for the AI agent executeCode tool.""" + +import logging +import signal +import sys +import io +import traceback +from typing import Dict, Any + +logger = logging.getLogger(__name__) + +_PYTHON_ALLOWED_MODULES = { + "math", "statistics", "json", "csv", "re", "datetime", + "collections", "itertools", "functools", "decimal", "fractions", + "random", "string", "textwrap", "operator", "copy", +} + +_PYTHON_BLOCKED_BUILTINS = { + "open", "exec", "eval", "compile", "__import__", "globals", "locals", + "getattr", "setattr", "delattr", "breakpoint", "exit", "quit", + "input", "memoryview", "type", +} + +_MAX_EXECUTION_TIME_S = 30 +_MAX_OUTPUT_CHARS = 50000 + + +def _safeImport(name, *args, **kwargs): + """Restricted import that only allows whitelisted modules.""" + if name not in _PYTHON_ALLOWED_MODULES: + raise ImportError(f"Module '{name}' is not allowed. Permitted: {', '.join(sorted(_PYTHON_ALLOWED_MODULES))}") + return __builtins__["__import__"](name, *args, **kwargs) if isinstance(__builtins__, dict) else __import__(name, *args, **kwargs) + + +def _buildRestrictedGlobals() -> Dict[str, Any]: + """Build a restricted globals dict for exec().""" + import builtins + safeBuiltins = {} + for name in dir(builtins): + if name.startswith("_"): + continue + if name in _PYTHON_BLOCKED_BUILTINS: + continue + safeBuiltins[name] = getattr(builtins, name) + + safeBuiltins["__import__"] = _safeImport + safeBuiltins["__name__"] = "__sandbox__" + safeBuiltins["__builtins__"] = safeBuiltins + + for modName in _PYTHON_ALLOWED_MODULES: + try: + safeBuiltins[modName] = __import__(modName) + except ImportError: + pass + + return {"__builtins__": safeBuiltins} + + +async def executePython(code: str) -> Dict[str, Any]: + """Execute Python code in a restricted sandbox. Returns {success, output, error}.""" + import asyncio + + def _run(): + restrictedGlobals = _buildRestrictedGlobals() + capturedOutput = io.StringIO() + oldStdout = sys.stdout + oldStderr = sys.stderr + + try: + sys.stdout = capturedOutput + sys.stderr = capturedOutput + + if sys.platform != "win32": + signal.signal(signal.SIGALRM, lambda *_: (_ for _ in ()).throw(TimeoutError("Execution timed out"))) + signal.alarm(_MAX_EXECUTION_TIME_S) + + exec(compile(code, "", "exec"), restrictedGlobals) + + if sys.platform != "win32": + signal.alarm(0) + + output = capturedOutput.getvalue() + if len(output) > _MAX_OUTPUT_CHARS: + output = output[:_MAX_OUTPUT_CHARS] + f"\n... (truncated at {_MAX_OUTPUT_CHARS} chars)" + return {"success": True, "output": output} + + except TimeoutError: + return {"success": False, "error": f"Execution timed out after {_MAX_EXECUTION_TIME_S}s"} + except Exception as e: + tb = traceback.format_exc() + return {"success": False, "error": f"{type(e).__name__}: {e}", "traceback": tb} + finally: + sys.stdout = oldStdout + sys.stderr = oldStderr + if sys.platform != "win32": + signal.alarm(0) + + loop = asyncio.get_event_loop() + try: + result = await asyncio.wait_for( + loop.run_in_executor(None, _run), + timeout=_MAX_EXECUTION_TIME_S + 5, + ) + return result + except asyncio.TimeoutError: + return {"success": False, "error": f"Execution timed out after {_MAX_EXECUTION_TIME_S}s"} diff --git a/modules/serviceCenter/services/serviceBilling/stripeCheckout.py b/modules/serviceCenter/services/serviceBilling/stripeCheckout.py index 692e5087..9f3f7e68 100644 --- a/modules/serviceCenter/services/serviceBilling/stripeCheckout.py +++ b/modules/serviceCenter/services/serviceBilling/stripeCheckout.py @@ -7,6 +7,7 @@ Creates Checkout Sessions for redirect-based payment flow. import logging from typing import Optional +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit from modules.shared.configuration import APP_CONFIG @@ -16,10 +17,45 @@ logger = logging.getLogger(__name__) ALLOWED_AMOUNTS_CHF = [10, 25, 50, 100, 250, 500] +def _normalizeReturnUrl(returnUrl: str) -> str: + """ + Validate and normalize an absolute frontend return URL. + + Allowed examples: + - https://nyla.poweron-center.net/billing/transactions + - https://nyla-int.poweron-center.net/billing/transactions?tab=overview + """ + if not returnUrl: + raise ValueError("returnUrl is required") + + parsed = urlsplit(returnUrl.strip()) + + if parsed.scheme not in ("http", "https"): + raise ValueError("returnUrl must use http or https") + + if not parsed.netloc: + raise ValueError("returnUrl must contain a host") + + if parsed.username or parsed.password: + raise ValueError("returnUrl must not contain credentials") + + query_items = [ + (key, value) + for key, value in parse_qsl(parsed.query, keep_blank_values=True) + if key not in {"success", "canceled", "session_id"} + ] + normalized_query = urlencode(query_items, doseq=True) + normalized_path = parsed.path or "/" + + # Keep scheme + host + path + query, strip fragment for deterministic redirects. + return urlunsplit((parsed.scheme, parsed.netloc, normalized_path, normalized_query, "")) + + def create_checkout_session( mandate_id: str, user_id: Optional[str], - amount_chf: float + amount_chf: float, + return_url: str ) -> str: """ Create a Stripe Checkout Session for credit top-up. @@ -58,10 +94,10 @@ def create_checkout_session( stripe.api_key = secret_key - frontend_url = APP_CONFIG.get("APP_FRONTEND_URL", "https://nyla-int.poweron-center.net") - base_path = "/admin/billing" - success_url = f"{frontend_url.rstrip('/')}{base_path}?success=true&session_id={{CHECKOUT_SESSION_ID}}" - cancel_url = f"{frontend_url.rstrip('/')}{base_path}?canceled=true" + base_return_url = _normalizeReturnUrl(return_url) + query_separator = "&" if "?" in base_return_url else "?" + success_url = f"{base_return_url}{query_separator}success=true&session_id={{CHECKOUT_SESSION_ID}}" + cancel_url = f"{base_return_url}{query_separator}canceled=true" # Amount in cents for Stripe (CHF uses 2 decimal places) amount_cents = int(round(amount_chf * 100)) diff --git a/modules/serviceCenter/services/serviceChat/mainServiceChat.py b/modules/serviceCenter/services/serviceChat/mainServiceChat.py index 026dc70c..3ec1c504 100644 --- a/modules/serviceCenter/services/serviceChat/mainServiceChat.py +++ b/modules/serviceCenter/services/serviceChat/mainServiceChat.py @@ -492,24 +492,17 @@ class ChatService: """List file folders for the current user. Args: - parentId: Parent folder ID (None = root folders). + parentId: Optional parent folder ID to filter by. + None = return ALL folders (for tree building). Returns: List of folder dicts. """ - from modules.datamodels.datamodelFileFolder import FileFolder - recordFilter = {"_createdBy": self.user.id if self.user else ""} - if parentId is not None: - recordFilter["parentId"] = parentId - else: - recordFilter["parentId"] = None - return self.interfaceDbComponent.db.getRecordset(FileFolder, recordFilter=recordFilter) + return self.interfaceDbComponent.listFolders(parentId=parentId) def createFolder(self, name: str, parentId: str = None) -> Dict[str, Any]: - """Create a new file folder.""" - from modules.datamodels.datamodelFileFolder import FileFolder - folder = FileFolder(name=name, parentId=parentId) - return self.interfaceDbComponent.db.recordCreate(FileFolder, folder) + """Create a new file folder with unique name validation.""" + return self.interfaceDbComponent.createFolder(name=name, parentId=parentId) # ---- DataSource CRUD ---- diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/rendererDocx.py b/modules/serviceCenter/services/serviceGeneration/renderers/rendererDocx.py index e580b07d..850f6aa8 100644 --- a/modules/serviceCenter/services/serviceGeneration/renderers/rendererDocx.py +++ b/modules/serviceCenter/services/serviceGeneration/renderers/rendererDocx.py @@ -545,6 +545,10 @@ class RendererDocx(BaseRenderer): # Append table to document body body.append(tbl) + # Add an empty paragraph after the table to prevent Word from merging consecutive tables + separatorParagraph = OxmlElement('w:p') + body.append(separatorParagraph) + total_time = time.time() - create_start totalCells = (rowCount + 1) * len(headers) rate = totalCells / total_time if total_time > 0 else 0 @@ -1352,6 +1356,9 @@ class RendererDocx(BaseRenderer): # Style the table self._styleTable(table) + # Add an empty paragraph after the table to prevent Word from merging consecutive tables + doc.add_paragraph() + except Exception as e: self.logger.warning(f"Could not add table: {str(e)}") @@ -1505,6 +1512,9 @@ class RendererDocx(BaseRenderer): for run in paragraph.runs: run.bold = True + # Add an empty paragraph after the table to prevent Word from merging consecutive tables + doc.add_paragraph() + # Add placeholder to mark where table was inserted processed_lines.append(f"[TABLE_INSERTED_{len(processed_lines)}]")