diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..bf5e88f5 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,74 @@ +# Docker ignore file for Gateway +# Excludes unnecessary files from Docker build context + +# Git +.git +.gitignore +.github + +# Python +__pycache__ +*.pyc +*.pyo +*.pyd +.Python +*.so +*.egg +*.egg-info +dist +build +venv +env +ENV +.venv + +# IDE +.vscode +.idea +*.swp +*.swo +*~ + +# Environment files (env_gcp.env will be copied as .env by workflow) +env_*.env +.env.local +# Note: .env is NOT ignored - it will be created from env_gcp.env by the workflow + +# Logs +*.log +logs/ +local/logs/ + +# Tests +tests/ +.pytest_cache +.coverage +htmlcov/ + +# Documentation +docs/ +*.md +README.txt +LICENSE.txt + +# Local development files +local/ +*.txt +!requirements.txt + +# Debug files +debug/ +test-chat/ + +# OS files +.DS_Store +Thumbs.db + +# Temporary files +tmp/ +temp/ +*.tmp + +# Build artifacts +*.zip +release.zip diff --git a/.gcloudignore b/.gcloudignore new file mode 100644 index 00000000..ca683390 --- /dev/null +++ b/.gcloudignore @@ -0,0 +1,73 @@ +# Google Cloud Build ignore file +# Similar to .dockerignore but for Cloud Build + +# Git +.git +.gitignore +.github + +# Python +__pycache__ +*.pyc +*.pyo +*.pyd +.Python +*.so +*.egg +*.egg-info +dist +build +venv +env +ENV +.venv + +# IDE +.vscode +.idea +*.swp +*.swo +*~ + +# Environment files (will be handled separately) +env_*.env +.env.local + +# Logs +*.log +logs/ +local/logs/ + +# Tests +tests/ +.pytest_cache +.coverage +htmlcov/ + +# Documentation +docs/ +*.md +README.txt +LICENSE.txt + +# Local development files +local/ +*.txt +!requirements.txt + +# Debug files +debug/ +test-chat/ + +# OS files +.DS_Store +Thumbs.db + +# Temporary files +tmp/ +temp/ +*.tmp + +# Build artifacts +*.zip +release.zip diff --git a/.github/workflows/deploy-gcp.yml b/.github/workflows/deploy-gcp.yml new file mode 100644 index 00000000..7cfb2afa --- /dev/null +++ b/.github/workflows/deploy-gcp.yml @@ -0,0 +1,119 @@ +# GitHub Actions workflow for deploying Gateway to Google Cloud Run +# Documentation: https://cloud.google.com/run/docs/deploying +# +# Required GitHub Secrets: +# - GCP_PROJECT_ID: Your Google Cloud Project ID +# - GCP_SA_KEY: Service Account JSON key with Cloud Run Admin and Cloud Build Editor roles +# - GCP_SERVICE_ACCOUNT_EMAIL: Email of the service account to run Cloud Run service as +# +# Required Google Cloud Setup: +# 1. Create a service account with Cloud Run Admin and Cloud Build Editor roles +# 2. Create secret "CONFIG_KEY" in Secret Manager with your master key +# 3. Grant the service account access to Secret Manager secrets +# 4. Create Cloud SQL instance (if not exists) +# 5. 
+#    Create env_gcp.env file with your configuration

name: Deploy Gateway to Google Cloud Run

on:
  push:
    branches:
      - main
    paths:
      - 'gateway/**'
  workflow_dispatch:
    inputs:
      environment:
        description: 'Environment to deploy to'
        required: true
        default: 'prod'
        type: choice
        options:
          - prod
          - int

env:
  PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
  SERVICE_NAME: gateway-prod
  REGION: europe-west6  # Zurich region
  ENV_FILE: env_gcp.env

jobs:
  deploy:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write  # Required for Workload Identity Federation

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}
          # Alternative: Use Workload Identity Federation (more secure)
          # workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
          # service_account: ${{ secrets.WIF_SERVICE_ACCOUNT }}

      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@v2

      - name: Configure Docker for GCR
        run: |
          gcloud auth configure-docker

      - name: Set production environment
        run: |
          cd gateway
          if [ -f "${{ env.ENV_FILE }}" ]; then
            cp ${{ env.ENV_FILE }} .env
          else
            echo "Warning: ${{ env.ENV_FILE }} not found, using env_prod.env as fallback"
            cp env_prod.env .env
          fi
          # Clean up other env files (optional, for security)
          rm -f env_*.env

      - name: Build and push container image
        working-directory: ./gateway
        run: |
          # Build the container image with Cloud Build (uses the Dockerfile in this directory)
          # gcloud builds submit accepts only a single --tag, so :latest is added in a second step
          gcloud builds submit \
            --tag gcr.io/${{ env.PROJECT_ID }}/${{ env.SERVICE_NAME }}:${{ github.sha }} \
            --project ${{ env.PROJECT_ID }}
          gcloud container images add-tag --quiet \
            gcr.io/${{ env.PROJECT_ID }}/${{ env.SERVICE_NAME }}:${{ github.sha }} \
            gcr.io/${{ env.PROJECT_ID }}/${{ env.SERVICE_NAME }}:latest

      - name: Deploy to Cloud Run
        run: |
          gcloud run deploy ${{ env.SERVICE_NAME }} \
            --image gcr.io/${{ env.PROJECT_ID }}/${{ env.SERVICE_NAME }}:${{ github.sha }} \
            --region ${{ env.REGION }} \
            --platform managed \
            --allow-unauthenticated \
            --project ${{ env.PROJECT_ID }} \
            --set-env-vars "APP_ENV_TYPE=prod" \
            --set-secrets "CONFIG_KEY=CONFIG_KEY:latest" \
            --memory 2Gi \
            --cpu 2 \
            --timeout 300 \
            --max-instances 10 \
            --min-instances 1 \
            --port 8000 \
            --service-account ${{ secrets.GCP_SERVICE_ACCOUNT_EMAIL }}

      - name: Get service URL
        id: service-url
        run: |
          SERVICE_URL=$(gcloud run services describe ${{ env.SERVICE_NAME }} \
            --region ${{ env.REGION }} \
            --project ${{ env.PROJECT_ID }} \
            --format 'value(status.url)')
          echo "url=$SERVICE_URL" >> $GITHUB_OUTPUT

      - name: Output deployment URL
        run: |
          echo "🚀 Deployment successful!"
          echo "Service URL: ${{ steps.service-url.outputs.url }}"
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 00000000..ace78e12
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,49 @@
+# Dockerfile for PowerOn Gateway - Google Cloud Run
+# Python 3.11 base image optimized for Cloud Run
+
+FROM python:3.11-slim
+
+# Set working directory
+WORKDIR /app
+
+# Set environment variables
+ENV PYTHONUNBUFFERED=1 \
+    PYTHONDONTWRITEBYTECODE=1 \
+    NUMEXPR_MAX_THREADS=12 \
+    PORT=8000
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+    gcc \
+    g++ \
+    postgresql-client \
+    libpq-dev \
+    && rm -rf /var/lib/apt/lists/*
+
+# Copy requirements first for better caching
+COPY requirements.txt .
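+
+# NOTE: the HEALTHCHECK below imports the requests library at container
+# runtime, so requests must be listed in requirements.txt for it to pass.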
+ +# Install Python dependencies +RUN pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir -r requirements.txt + +# Copy application code (includes .env file created by workflow from env_gcp.env) +COPY . . + +# Create directories for logs (Cloud Run uses /tmp for writable storage) +RUN mkdir -p /tmp/logs /tmp/debug + +# Note: .env file (created from env_gcp.env by workflow) contains encrypted secrets +# These are decrypted at runtime using the master key from Secret Manager +# (mounted as CONFIG_KEY environment variable in Cloud Run) + +# Expose port (Cloud Run sets PORT env var, but we default to 8000) +EXPOSE 8000 + +# Health check for Cloud Run +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD python -c "import requests; requests.get('http://localhost:8000/api/admin/health', timeout=5)" || exit 1 + +# Run the application +# Cloud Run will set PORT env var, uvicorn reads it automatically +CMD exec uvicorn app:app --host 0.0.0.0 --port ${PORT:-8000} --workers 1 diff --git a/app.py b/app.py index 5f7593a8..84324a90 100644 --- a/app.py +++ b/app.py @@ -234,6 +234,7 @@ def initLogging(): "asyncio", "fastapi.security.oauth2", "msal", + "azure.core.pipeline.policies.http_logging_policy", ] for loggerName in noisyLoggers: logging.getLogger(loggerName).setLevel(logging.WARNING) diff --git a/config.ini b/config.ini index 88a80b11..c01c324c 100644 --- a/config.ini +++ b/config.ini @@ -4,6 +4,8 @@ # Auth configuration Auth_ALGORITHM = HS256 Auth_TOKEN_TYPE = bearer +Auth_RESET_TOKEN_EXPIRY_HOURS = 24 + # File management configuration File_Management_MAX_UPLOAD_SIZE_MB = 50 diff --git a/docs/STREAMING_UTILITY_ARCHITECTURE.md b/docs/STREAMING_UTILITY_ARCHITECTURE.md deleted file mode 100644 index 368fb374..00000000 --- a/docs/STREAMING_UTILITY_ARCHITECTURE.md +++ /dev/null @@ -1,314 +0,0 @@ -# Streaming Utility Architecture: Transforming eventManager into a Shared Utility - -## Current State Analysis - -### Existing Implementation -The `eventManager.py` in `modules/features/chatbot/` currently provides: -- **Event Queue Management**: Per-workflow asyncio queues for SSE streaming -- **Event Emission**: `emit_event()` method for chatbot-specific events -- **Cleanup**: Automatic queue cleanup after workflow completion -- **SSE Streaming**: Used in `/api/chatbot/start/stream` endpoint - -### Current Limitations - -1. **Chatbot-Specific**: Hardcoded for chatbot workflows only -2. **Polling Still Required**: Frontend still polls `getUnifiedChatData()` every 0.5 seconds even with SSE -3. **Not Reusable**: Other features (workflows, document generation, etc.) can't use it -4. **Mixed Approach**: SSE endpoint still internally polls database instead of pure event-driven streaming - -### Frontend Polling Pattern -Currently, the frontend uses: -- `useWorkflowPolling.ts` - Polls `/api/workflow/{id}/chatData` every few seconds -- `useWorkflowLifecycle.ts` - Manages polling lifecycle and state -- Rate limit handling and backoff logic for failed polls - -## Proposed Architecture: Shared Streaming Utility - -### 1. Generic Event Manager (`modules/shared/streamingManager.py`) - -```python -class StreamingEventManager: - """ - Generic event manager for real-time streaming across all features. - Supports multiple event types and contexts (workflows, documents, tasks, etc.) 
- """ - - def __init__(self): - self._queues: Dict[str, asyncio.Queue] = {} - self._locks: Dict[str, asyncio.Lock] = {} - self._cleanup_tasks: Dict[str, asyncio.Task] = {} - self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids - - async def emit_event( - self, - context_id: str, # workflow_id, document_id, task_id, etc. - event_type: str, # "message", "log", "status", "progress", "complete", "error" - data: Dict[str, Any], # Flexible data structure - event_category: str = "default" # "chat", "workflow", "document", etc. - ): - """Emit event to all subscribers of a context""" - - def create_stream( - self, - context_id: str, - event_categories: Optional[List[str]] = None # Filter by category - ) -> asyncio.Queue: - """Create a new stream for a context""" - - async def stream_events( - self, - context_id: str, - event_categories: Optional[List[str]] = None - ) -> AsyncIterator[Dict[str, Any]]: - """Async generator for streaming events""" -``` - -### 2. Generic SSE Route Helper (`modules/shared/sseUtils.py`) - -```python -def create_sse_stream( - context_id: str, - event_categories: Optional[List[str]] = None, - initial_data_callback: Optional[Callable] = None, - timeout: float = 300.0 -) -> StreamingResponse: - """ - Create a generic SSE streaming response. - - Args: - context_id: Workflow ID, document ID, or other context identifier - event_categories: Filter events by category (e.g., ["chat", "workflow"]) - initial_data_callback: Optional function to fetch initial state - timeout: Stream timeout in seconds - """ - streaming_manager = get_streaming_manager() - - async def event_stream(): - # Send initial data if callback provided - if initial_data_callback: - initial_data = await initial_data_callback(context_id) - yield format_sse_event("initial", initial_data) - - # Stream events from manager - async for event in streaming_manager.stream_events(context_id, event_categories): - yield format_sse_event(event["type"], event["data"]) - - return StreamingResponse( - event_stream(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no" - } - ) -``` - -### 3. Integration Points - -#### A. Workflow Processing -```python -# In workflow processing code -from modules.shared.streamingManager import get_streaming_manager - -streaming_manager = get_streaming_manager() - -# Emit progress updates -await streaming_manager.emit_event( - context_id=workflow_id, - event_type="progress", - data={"step": "analyzing", "message": "Processing documents..."}, - event_category="workflow" -) - -# Emit new messages -await streaming_manager.emit_event( - context_id=workflow_id, - event_type="message", - data={"role": "assistant", "content": "Response text"}, - event_category="chat" -) -``` - -#### B. 
Route Endpoints -```python -# Generic streaming endpoint for any context -@router.get("/{contextId}/stream") -async def stream_context_updates( - contextId: str, - categories: Optional[str] = Query(None), # Comma-separated categories - currentUser: User = Depends(getCurrentUser) -): - event_categories = categories.split(",") if categories else None - - # Optional: Fetch initial state - async def get_initial_data(ctx_id: str): - interfaceDbChat = getServiceChat(currentUser) - return interfaceDbChat.getUnifiedChatData(ctx_id, None) - - return create_sse_stream( - context_id=contextId, - event_categories=event_categories, - initial_data_callback=get_initial_data - ) -``` - -## Benefits of Streaming vs Polling - -### Performance -- **Reduced Server Load**: No constant database queries every 0.5-3 seconds -- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms) -- **Bandwidth Efficiency**: Only send data when it changes, not empty responses - -### User Experience -- **Real-time Updates**: Users see progress instantly -- **Better Responsiveness**: No perceived delay from polling intervals -- **Reduced Battery**: Mobile devices consume less power without constant polling - -### Scalability -- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ) -- **Connection Management**: Better handling of many concurrent streams -- **Resource Efficiency**: One persistent connection vs many HTTP requests - -## Migration Strategy - -### Phase 1: Create Shared Utility -1. Move `eventManager.py` → `modules/shared/streamingManager.py` -2. Generalize for any context type (not just workflows) -3. Add event categorization and filtering -4. Create `sseUtils.py` helper functions - -### Phase 2: Update Chatbot Feature -1. Update chatbot to use shared streaming manager -2. Replace internal polling in SSE endpoint with pure event-driven streaming -3. Emit events directly when data changes (in database write operations) - -### Phase 3: Migrate Other Features -1. **Workflows**: Add streaming to workflow processing -2. **Document Generation**: Stream document creation progress -3. **Data Processing**: Stream extraction/transformation progress -4. **Any Long-Running Task**: Use streaming for status updates - -### Phase 4: Frontend Migration -1. Replace `useWorkflowPolling` with SSE EventSource connections -2. Create generic `useStreaming` hook -3. Update all components to use streaming instead of polling -4. Remove polling logic entirely - -## Implementation Details - -### Event-Driven Data Emission - -Instead of polling `getUnifiedChatData()`, emit events when data changes: - -```python -# In interfaceDbChatObjects.py - when creating a message -def createMessage(self, workflowId: str, message: ChatMessage): - # ... existing database write ... 
- - # Emit streaming event - from modules.shared.streamingManager import get_streaming_manager - streaming_manager = get_streaming_manager() - asyncio.create_task(streaming_manager.emit_event( - context_id=workflowId, - event_type="message", - data={ - "type": "message", - "createdAt": message.publishedAt, - "item": message.dict() - }, - event_category="chat" - )) -``` - -### Frontend Integration - -```typescript -// Generic streaming hook -function useStreaming( - contextId: string, - categories?: string[], - onEvent?: (event: T) => void -) { - useEffect(() => { - const eventSource = new EventSource( - `/api/stream/${contextId}?categories=${categories?.join(',')}` - ); - - eventSource.onmessage = (e) => { - const event = JSON.parse(e.data); - onEvent?.(event); - }; - - return () => eventSource.close(); - }, [contextId, categories]); -} -``` - -## Key Design Decisions - -### 1. Context-Based Streaming -- Use generic `context_id` instead of `workflow_id` -- Supports workflows, documents, tasks, user sessions, etc. - -### 2. Event Categories -- Allow filtering by category (chat, workflow, document, etc.) -- Enables multiple features to stream from same context - -### 3. Backward Compatibility -- Keep existing polling endpoints during migration -- Gradually migrate features one at a time -- Frontend can use both during transition - -### 4. Error Handling -- Graceful degradation if streaming unavailable -- Automatic reconnection logic in frontend -- Fallback to polling if SSE fails - -## Example: Complete Flow - -### Backend: Workflow Processing -```python -async def process_workflow(workflow_id: str): - streaming = get_streaming_manager() - - # Emit status update - await streaming.emit_event(workflow_id, "status", - {"status": "running"}, "workflow") - - # Process and emit messages - result = await ai_call(...) - await streaming.emit_event(workflow_id, "message", - {"role": "assistant", "content": result}, "chat") - - # Emit completion - await streaming.emit_event(workflow_id, "complete", - {"status": "completed"}, "workflow") -``` - -### Frontend: React Hook -```typescript -function ChatComponent({ workflowId }: { workflowId: string }) { - const [messages, setMessages] = useState([]); - - useStreaming(workflowId, ["chat"], (event) => { - if (event.type === "message") { - setMessages(prev => [...prev, event.item]); - } - }); - - return ; -} -``` - -## Conclusion - -By transforming `eventManager.py` into a shared streaming utility: - -1. **Eliminates Polling**: All features can stream updates in real-time -2. **Improves Performance**: Reduces server load and latency -3. **Better UX**: Instant updates instead of polling delays -4. **Reusable**: Any feature can use streaming with minimal code -5. **Scalable**: Foundation for future real-time features - -The migration can be done incrementally, feature by feature, without breaking existing functionality. diff --git a/docs/WEBSEARCH_FIXES.md b/docs/WEBSEARCH_FIXES.md deleted file mode 100644 index 420a9845..00000000 --- a/docs/WEBSEARCH_FIXES.md +++ /dev/null @@ -1,134 +0,0 @@ -# Web Search Content Extraction Fixes - -## Problem Summary - -The Tavily web search integration was failing to extract content from search results, causing web research to return empty or incomplete data. The main issues were related to handling `None` values and incomplete error recovery. - -## Main Issues Fixed - -### 1. 
Incomplete Content Extraction from Search Results - -**Problem:** -- When Tavily API returned search results, some results had `raw_content` set to `None` (not missing, but explicitly `None`) -- The code used `result.get("raw_content") or result.get("content", "")` which failed when `raw_content` existed but was `None` -- This caused `None` values to propagate through the system instead of falling back to the `content` field or empty string - -**Fix:** -Changed the content extraction in `aicorePluginTavily.py` to properly handle `None` values: -```python -# Before (line 344): -rawContent=result.get("raw_content") or result.get("content", "") - -# After: -rawContent=result.get("raw_content") or result.get("content") or "" -``` - -This ensures that if `raw_content` is `None`, it falls back to `content`, and if that's also `None`, it defaults to an empty string. - -**Additional Fix:** -Added defensive checks in the `webSearch` method to safely extract content even when result objects have unexpected structures: -```python -# Safely extract content with multiple fallbacks -content = "" -if hasattr(result, 'rawContent'): - content = result.rawContent or "" -if not content and hasattr(result, 'content'): - content = result.content or "" -``` - -### 2. NoneType Error When Logging Content Length - -**Problem:** -- Code attempted to check `len(first_result.get('raw_content', ''))` for logging -- When `raw_content` key existed but value was `None`, `.get()` returned `None` instead of the default `''` -- This caused `len(None)` to fail with `TypeError: object of type 'NoneType' has no len()` - -**Fix:** -Changed the logging code to safely handle `None` values: -```python -# Before (line 338): -logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(first_result.get('raw_content', ''))}") - -# After: -raw_content = first_result.get('raw_content') or '' -logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(raw_content)}") -``` - -### 3. Missing Error Recovery in Content Extraction - -**Problem:** -- When processing search results, if one result failed to extract, the entire extraction could fail -- No recovery mechanism to extract at least URLs even when content extraction failed -- Errors were logged but processing stopped, losing potentially useful data - -**Fix:** -Added per-result error handling with recovery: -```python -for result in searchResults: - try: - # Extract URL, content, title safely - # ... extraction logic ... - except Exception as resultError: - logger.warning(f"Error processing individual search result: {resultError}") - # Continue processing other results instead of failing completely - continue -``` - -Also added recovery at the extraction level: -```python -except Exception as extractionError: - logger.error(f"Error extracting URLs and content from search results: {extractionError}") - # Try to recover at least URLs - try: - urls = [result.url for result in searchResults if hasattr(result, 'url') and result.url] - logger.info(f"Recovered {len(urls)} URLs after extraction error") - except Exception: - logger.error("Failed to recover any URLs from search results") -``` - -### 4. 
Incomplete Crawl Result Processing - -**Problem:** -- When crawl returned results but individual page processing failed, entire crawl was lost -- No fallback to extract at least URLs from failed crawl results -- Missing content fields could cause errors when formatting results - -**Fix:** -Added error handling for individual page processing: -```python -for i, result in enumerate(crawlResults, 1): - try: - # Format page content - # ... formatting logic ... - except Exception as pageError: - logger.warning(f"Error formatting page {i} from crawl: {pageError}") - # Try to add at least the URL - try: - pageUrls.append(result.url if hasattr(result, 'url') and result.url else webCrawlPrompt.url) - except Exception: - pass -``` - -Also ensured all result fields have safe defaults: -```python -results.append(WebCrawlResult( - url=result_url or url, # Fallback to base URL - content=result_content, # Already ensured to be string - title=result_title # Already ensured to be string -)) -``` - -## Impact - -These fixes ensure that: -1. **Content is always extracted** - Even when `raw_content` is `None`, the system falls back to `content` field or empty string -2. **Partial results are preserved** - If some results fail, others are still processed and returned -3. **URLs are recovered** - Even when content extraction fails completely, URLs can still be extracted for crawling -4. **No crashes from None values** - All `None` values are properly handled before operations like `len()` are called - -## Testing Recommendations - -- Test with Tavily search results that have `raw_content` set to `None` -- Test with mixed results (some with content, some without) -- Test error recovery when individual results fail -- Verify that URLs are still extracted even when content extraction fails diff --git a/env_dev.env b/env_dev.env index a2aca22d..19d718d1 100644 --- a/env_dev.env +++ b/env_dev.env @@ -88,3 +88,8 @@ PREPROCESS_ALTHAUS_CHAT_SECRET = None # Preprocessor API Configuration PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990 PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query + +# Azure Communication Services Email Configuration +MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt +MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss + diff --git a/env_int.env b/env_int.env index bee09a6f..7047dc6e 100644 --- a/env_int.env +++ b/env_int.env @@ -88,3 +88,7 @@ PREPROCESS_ALTHAUS_CHAT_SECRET = INT_ENC:Z0FBQUFBQnBaSnM4UkNBelhvckxCQUVjZm94N3B # Preprocessor API Configuration PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990 PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query + +# Azure Communication Services Email Configuration +MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt +MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss diff --git a/env_prod.env b/env_prod.env index e41d2486..32cfe3ad 100644 --- a/env_prod.env +++ b/env_prod.env @@ -88,3 +88,7 @@ PREPROCESS_ALTHAUS_CHAT_SECRET = PROD_ENC:Z0FBQUFBQnBaSnM4RVRmYW5IelNIbklTUDZIME # Preprocessor API Configuration PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990 
PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query + +# Azure Communication Services Email Configuration +MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt +MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py index 554e7fd6..adea38b0 100644 --- a/modules/datamodels/datamodelUam.py +++ b/modules/datamodels/datamodelUam.py @@ -165,8 +165,14 @@ registerModelLabels( class UserInDB(User): hashedPassword: Optional[str] = Field(None, description="Hash of the user password") + resetToken: Optional[str] = Field(None, description="Password reset token (UUID)") + resetTokenExpires: Optional[float] = Field(None, description="Reset token expiration (UTC timestamp in seconds)") registerModelLabels( "UserInDB", {"en": "User Access", "fr": "Accès de l'utilisateur"}, - {"hashedPassword": {"en": "Password hash", "fr": "Hachage de mot de passe"}}, + { + "hashedPassword": {"en": "Password hash", "fr": "Hachage de mot de passe"}, + "resetToken": {"en": "Reset Token", "fr": "Jeton de réinitialisation"}, + "resetTokenExpires": {"en": "Reset Token Expires", "fr": "Expiration du jeton"}, + }, ) diff --git a/modules/interfaces/interfaceDbAppObjects.py b/modules/interfaces/interfaceDbAppObjects.py index dd9aef3d..cab79d76 100644 --- a/modules/interfaces/interfaceDbAppObjects.py +++ b/modules/interfaces/interfaceDbAppObjects.py @@ -587,6 +587,11 @@ class AppObjects: # Get the full user record with password hash for verification userRecord = self.db.getRecordset(UserInDB, recordFilter={"id": user.id})[0] + + # Check if user has a reset token set (password reset required) + if userRecord.get("resetToken"): + raise ValueError("Passwort-Zurücksetzung erforderlich. 
Bitte prüfen Sie Ihre E-Mail.") + if not userRecord.get("hashedPassword"): raise ValueError("User has no password set") @@ -613,11 +618,14 @@ class AppObjects: try: # Ensure username is a string username = str(username).strip() + + # Check global username uniqueness (across ALL mandates) + if not self.isUsernameGloballyUnique(username): + raise ValueError(f"Username '{username}' is already taken") - # Validate password for local authentication - if authenticationAuthority == AuthAuthority.LOCAL: - if not password: - raise ValueError("Password is required for local authentication") + # For LOCAL auth: password is set exclusively via magic link, not during registration + # Password parameter is only used for internal/admin operations (e.g., resetUserPassword) + if password: if not isinstance(password, str): raise ValueError("Password must be a string") if not password.strip(): @@ -740,6 +748,277 @@ class AppObjects: """Enables a user if current user has permission.""" return self.updateUser(userId, {"enabled": True}) + def resetUserPassword(self, userId: str, newPassword: str) -> bool: + """Reset a user's password (admin function).""" + try: + if not newPassword or len(newPassword) < 8: + raise ValueError("Password must be at least 8 characters long") + + hashedPassword = self._getPasswordHash(newPassword) + self.db.recordModify(UserInDB, userId, {"hashedPassword": hashedPassword}) + logger.info(f"Password reset for user {userId}") + return True + except Exception as e: + logger.error(f"Error resetting password for user {userId}: {str(e)}") + return False + + def generateResetTokenAndExpiry(self) -> tuple: + """Generate a new reset token and expiration timestamp. + + Returns: + tuple: (tokenUuid: str, expiresTimestamp: float) + """ + token = str(uuid.uuid4()) + expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24")) + expires = getUtcTimestamp() + (expiryHours * 3600) + return token, expires + + def findUserByEmailLocalAuth(self, email: str) -> Optional[User]: + """Find LOCAL auth user by email (searches across all mandates). + + Note: If multiple users exist with the same email (in different mandates), + this returns only the first one. Use findAllUsersByEmailLocalAuth() to get all. + + Args: + email: Email address to search for (case-insensitive) + + Returns: + User if found, None otherwise + """ + users = self.findAllUsersByEmailLocalAuth(email) + return users[0] if users else None + + def findAllUsersByEmailLocalAuth(self, email: str) -> List[User]: + """Find ALL LOCAL auth users by email (searches across all mandates). + + Use this when a user might have multiple accounts with the same email + in different mandates. 
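+        The lookup deliberately bypasses RBAC filtering so that accounts in
+        every mandate are matched.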
+ + Args: + email: Email address to search for (case-insensitive) + + Returns: + List of Users (empty list if none found) + """ + if not email: + return [] + + normalizedEmail = email.lower().strip() + + try: + # Search directly without RBAC for cross-mandate search + users = self.db.getRecordset( + UserInDB, + recordFilter={ + "email": normalizedEmail, + "authenticationAuthority": AuthAuthority.LOCAL.value + } + ) + + result = [] + for userRecord in users: + cleanedUser = {k: v for k, v in userRecord.items() if not k.startswith("_")} + if cleanedUser.get("roleLabels") is None: + cleanedUser["roleLabels"] = [] + result.append(User(**cleanedUser)) + + return result + except Exception as e: + logger.error(f"Error finding users by email: {str(e)}") + return [] + + def findUserByEmailAndUsernameLocalAuth(self, email: str, username: str) -> Optional[User]: + """Find LOCAL auth user by email AND username combination. + + This uniquely identifies a user even if they have multiple accounts + with the same email in different mandates. + + Args: + email: Email address to search for (case-insensitive) + username: Username to search for (case-sensitive) + + Returns: + User if found, None otherwise + """ + if not email or not username: + return None + + normalizedEmail = email.lower().strip() + + try: + # Search directly without RBAC for cross-mandate search + users = self.db.getRecordset( + UserInDB, + recordFilter={ + "email": normalizedEmail, + "username": username, + "authenticationAuthority": AuthAuthority.LOCAL.value + } + ) + + if users: + cleanedUser = {k: v for k, v in users[0].items() if not k.startswith("_")} + if cleanedUser.get("roleLabels") is None: + cleanedUser["roleLabels"] = [] + return User(**cleanedUser) + + return None + except Exception as e: + logger.error(f"Error finding user by email and username: {str(e)}") + return None + + def isUsernameGloballyUnique(self, username: str) -> bool: + """Check if username is unique across ALL mandates (no RBAC filtering). + + This is used for registration to ensure usernames are globally unique. + + Args: + username: Username to check + + Returns: + True if username is available (not used), False if already taken + """ + if not username: + return False + + try: + # Search directly without RBAC for cross-mandate search + users = self.db.getRecordset( + UserInDB, + recordFilter={"username": username} + ) + + return len(users) == 0 + except Exception as e: + logger.error(f"Error checking username uniqueness: {str(e)}") + return False # Fail safe - assume not unique on error + + def findUserByUsernameLocalAuth(self, username: str) -> Optional[User]: + """Find LOCAL auth user by username (searches across all mandates). + + Username is globally unique, so this returns at most one user. 
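+        Like the e-mail lookups above, the query bypasses RBAC so the username
+        resolves regardless of mandate.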
+ + Args: + username: Username to search for + + Returns: + User if found, None otherwise + """ + if not username: + return None + + try: + # Search directly without RBAC for cross-mandate search + users = self.db.getRecordset( + UserInDB, + recordFilter={ + "username": username, + "authenticationAuthority": AuthAuthority.LOCAL.value + } + ) + + if users: + cleanedUser = {k: v for k, v in users[0].items() if not k.startswith("_")} + if cleanedUser.get("roleLabels") is None: + cleanedUser["roleLabels"] = [] + return User(**cleanedUser) + + return None + except Exception as e: + logger.error(f"Error finding user by username: {str(e)}") + return None + + def setResetToken(self, userId: str, token: str, expires: float, clearPassword: bool = True) -> bool: + """Set reset token for a user. + + Args: + userId: User ID + token: Reset token UUID + expires: Expiration timestamp (float) + clearPassword: If True, clears the password hash + """ + try: + updateData = { + "resetToken": token, + "resetTokenExpires": expires + } + if clearPassword: + updateData["hashedPassword"] = None + + self.db.recordModify(UserInDB, userId, updateData) + return True + except Exception as e: + logger.error(f"Error setting reset token for user {userId}: {str(e)}") + return False + + def verifyResetToken(self, token: str) -> Optional[User]: + """Verify reset token and return user if valid. + + Returns: + User if token is valid and not expired, None otherwise + """ + if not token: + return None + + try: + users = self.db.getRecordset(UserInDB, recordFilter={"resetToken": token}) + + if not users: + return None + + userRecord = users[0] + + # Check expiration - ensure expires is converted to float for comparison + expires = userRecord.get("resetTokenExpires") + if expires is not None: + try: + expires = float(expires) + except (ValueError, TypeError): + logger.warning(f"Invalid resetTokenExpires value for user {userRecord.get('id')}: {expires}") + return None + + if not expires or getUtcTimestamp() > expires: + logger.warning(f"Reset token expired for user {userRecord.get('id')}") + return None + + cleanedUser = {k: v for k, v in userRecord.items() if not k.startswith("_")} + if cleanedUser.get("roleLabels") is None: + cleanedUser["roleLabels"] = [] + return User(**cleanedUser) + except Exception as e: + logger.error(f"Error verifying reset token: {str(e)}") + return None + + def resetPasswordWithToken(self, token: str, newPassword: str) -> bool: + """Reset password using token (atomic operation). 
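+
+        Verifies the token, validates the new password, then in a single
+        record update sets the new hash, clears the token, and enables the user.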
+ + Returns: + True if successful, False otherwise + """ + try: + user = self.verifyResetToken(token) + if not user: + return False + + if not newPassword or len(newPassword) < 8: + raise ValueError("Password must be at least 8 characters long") + + hashedPassword = self._getPasswordHash(newPassword) + + # Atomic update: set password, clear token, enable user + self.db.recordModify(UserInDB, user.id, { + "hashedPassword": hashedPassword, + "resetToken": None, + "resetTokenExpires": None, + "enabled": True + }) + + logger.info(f"Password reset completed for user {user.id}") + return True + except Exception as e: + logger.error(f"Error in resetPasswordWithToken: {str(e)}") + return False + def _deleteUserReferencedData(self, userId: str) -> None: """Deletes all data associated with a user.""" try: @@ -810,19 +1089,20 @@ class AppObjects: return None def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]: - """Checks if a username is available for registration.""" + """Checks if a username is available for registration. + + Uses global uniqueness check (across ALL mandates) to ensure + usernames are unique system-wide. + """ try: username = checkData.get("username") - authenticationAuthority = checkData.get("authenticationAuthority", "local") + # authenticationAuthority not used - usernames must be globally unique if not username: return {"available": False, "message": "Username is required"} - # Get user by username - user = self.getUserByUsername(username) - - # Check if user exists (User model instance) - if user is not None: + # Check global uniqueness (across all mandates, no RBAC filtering) + if not self.isUsernameGloballyUnique(username): return {"available": False, "message": "Username is already taken"} return {"available": True, "message": "Username is available"} diff --git a/modules/routes/routeSecurityLocal.py b/modules/routes/routeSecurityLocal.py index 414a292e..7d58ad8d 100644 --- a/modules/routes/routeSecurityLocal.py +++ b/modules/routes/routeSecurityLocal.py @@ -19,6 +19,7 @@ from modules.auth import createAccessToken, createRefreshToken, setAccessTokenCo from modules.interfaces.interfaceDbAppObjects import getInterface, getRootInterface from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority from modules.datamodels.datamodelSecurity import Token +from modules.shared.configuration import APP_CONFIG # Configure logger logger = logging.getLogger(__name__) @@ -189,14 +190,19 @@ async def login( detail=error_msg ) -@router.post("/register", response_model=User) +@router.post("/register") @limiter.limit("10/minute") async def register_user( request: Request, userData: User = Body(...), - password: str = Body(..., embed=True) -) -> User: - """Register a new local user.""" + frontendUrl: str = Body(..., embed=True) +) -> Dict[str, Any]: + """Register a new local user (magic link based - no password required). 
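+
+    The user is created without a password hash; a reset token is generated and
+    e-mailed as a magic link so the user can set their own password.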
+ + Args: + userData: User data (username, email, fullName, language) + frontendUrl: The frontend URL to use in magic link (REQUIRED - provided by frontend) + """ try: # Get gateway interface with root privileges since this is a public endpoint appInterface = getRootInterface() @@ -213,16 +219,23 @@ async def register_user( # Set the mandate ID on the interface appInterface.mandateId = defaultMandateId - # Create user with local authentication - # Set safe default role for new registrations - # New users are disabled by default and require admin approval + # Frontend URL is required - no fallback + baseUrl = frontendUrl.rstrip("/") + + # Normalize email + normalizedEmail = userData.email.lower().strip() if userData.email else None + + # Note: Email can be shared across multiple users (different mandates) + # Username uniqueness is enforced in createUser() - that's the primary constraint + + # Create user with local authentication (no password - magic link based) user = appInterface.createUser( username=userData.username, - password=password, - email=userData.email, + password=None, # No password - will be set via magic link + email=normalizedEmail, fullName=userData.fullName, language=userData.language, - enabled=False, # New users are disabled by default + enabled=True, # Users are enabled by default (can login after setting password) roleLabels=["user"], # Default role for new registrations authenticationAuthority=AuthAuthority.LOCAL ) @@ -232,8 +245,51 @@ async def register_user( status_code=status.HTTP_400_BAD_REQUEST, detail="Failed to register user" ) + + # Generate reset token for password setup + token, expires = appInterface.generateResetTokenAndExpiry() + appInterface.setResetToken(user.id, token, expires, clearPassword=False) + + # Send registration email with magic link + try: + from modules.services import Services + services = Services(user) - return user + magicLink = f"{baseUrl}/reset.html?token={token}" + expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24")) + + emailSubject = "PowerOn Registrierung - Passwort setzen" + emailBody = f""" +Hallo {user.fullName or user.username}, + +Vielen Dank für Ihre Registrierung bei PowerOn. + +Ihr Benutzername: {user.username} + +Klicken Sie auf den folgenden Link, um Ihr Passwort zu setzen: +{magicLink} + +Dieser Link ist {expiryHours} Stunden gültig. + +Falls Sie sich nicht registriert haben, können Sie diese E-Mail ignorieren. +""" + + emailSent = services.messaging.sendEmailDirect( + recipient=user.email, + subject=emailSubject, + message=emailBody, + userId=str(user.id) + ) + + if not emailSent: + logger.warning(f"Failed to send registration email to {user.email}") + except Exception as emailErr: + logger.error(f"Error sending registration email: {str(emailErr)}") + # Don't fail registration if email fails - user can request reset later + + return { + "message": "Registrierung erfolgreich! Bitte prüfen Sie Ihre E-Mail für den Link zum Setzen Ihres Passworts." + } except ValueError as e: raise HTTPException( @@ -432,4 +488,146 @@ async def check_username_availability( raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to check username availability: {str(e)}" - ) + ) + +@router.post("/password-reset-request") +@limiter.limit("5/minute") +async def passwordResetRequest( + request: Request, + username: str = Body(..., embed=True), + frontendUrl: str = Body(..., embed=True) +) -> Dict[str, Any]: + """Request password reset email. 
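+
+    Requesting a reset clears the user's current password, and the response is
+    identical whether or not the username exists.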
+ + Finds user by username (globally unique) and sends reset email to their email address. + + Args: + username: User's username (globally unique identifier) + frontendUrl: The frontend URL to use in magic link (REQUIRED - provided by frontend) + """ + try: + rootInterface = getRootInterface() + + # Frontend URL is required - no fallback + baseUrl = frontendUrl.rstrip("/") + + # Find user by username (username is globally unique) + user = rootInterface.findUserByUsernameLocalAuth(username) + + if user and user.email: + from modules.services import Services + expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24")) + + try: + # Generate reset token + token, expires = rootInterface.generateResetTokenAndExpiry() + + # Set reset token (clears password) + rootInterface.setResetToken(user.id, token, expires) + + # Get services for email sending + services = Services(user) + + # Generate magic link using provided frontend URL + magicLink = f"{baseUrl}/reset.html?token={token}" + + # Send email + emailSubject = "PowerOn - Passwort zurücksetzen" + emailBody = f""" +Hallo {user.fullName or user.username}, + +Sie haben eine Passwort-Zurücksetzung für Ihren PowerOn Account angefordert. + +Benutzername: {user.username} + +Klicken Sie auf den folgenden Link, um Ihr Passwort zurückzusetzen: +{magicLink} + +Dieser Link ist {expiryHours} Stunden gültig. + +Falls Sie diese Anforderung nicht gestellt haben, können Sie diese E-Mail ignorieren. +""" + + services.messaging.sendEmailDirect( + recipient=user.email, + subject=emailSubject, + message=emailBody, + userId=str(user.id) + ) + + logger.info(f"Password reset email sent to {user.email} for user {user.username}") + except Exception as userErr: + logger.error(f"Failed to send reset email for user {username}: {str(userErr)}") + else: + logger.info(f"Password reset requested for unknown username: {username}") + + # Always return same message (security - don't reveal if user exists) + return { + "message": "Falls ein Konto mit diesem Benutzernamen existiert, wurde ein Reset-Link an die hinterlegte E-Mail-Adresse gesendet." + } + + except Exception as e: + logger.error(f"Error in password reset request: {str(e)}") + # Still return success for security + return { + "message": "Falls ein Konto mit diesem Benutzernamen existiert, wurde ein Reset-Link an die hinterlegte E-Mail-Adresse gesendet." 
+ } + +@router.post("/password-reset") +@limiter.limit("10/minute") +async def passwordReset( + request: Request, + token: str = Body(..., embed=True), + password: str = Body(..., embed=True) +) -> Dict[str, Any]: + """Reset password using token from magic link.""" + try: + # Validate token format (UUID) + try: + uuid.UUID(token) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Ungültiger oder abgelaufener Reset-Link" + ) + + # Validate password strength + if len(password) < 8: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Passwort muss mindestens 8 Zeichen lang sein" + ) + + rootInterface = getRootInterface() + + # Verify and reset + success = rootInterface.resetPasswordWithToken(token, password) + + if not success: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Ungültiger oder abgelaufener Reset-Link" + ) + + # Log success + try: + from modules.shared.auditLogger import audit_logger + audit_logger.logSecurityEvent( + userId="unknown", + mandateId="unknown", + action="password_reset_via_token", + details="Password reset completed via magic link" + ) + except Exception: + pass + + return {"message": "Passwort erfolgreich gesetzt"} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error in password reset: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Passwort-Zurücksetzung fehlgeschlagen" + ) diff --git a/modules/services/serviceMessaging/mainServiceMessaging.py b/modules/services/serviceMessaging/mainServiceMessaging.py index 23fce81d..10c79e31 100644 --- a/modules/services/serviceMessaging/mainServiceMessaging.py +++ b/modules/services/serviceMessaging/mainServiceMessaging.py @@ -75,12 +75,17 @@ class MessagingService: ) try: + # Convert plain text to HTML for email channel + messageToSend = message + if registration.channel == MessagingChannel.EMAIL: + messageToSend = self._textToHtml(message) + # Versende über interfaceMessaging success = self._getMessagingInterface().send( channel=registration.channel, recipient=registration.channelConfig, subject=subject, - message=message + message=messageToSend ) if success: @@ -130,6 +135,108 @@ class MessagingService: errorMessage=str(e) ) + def _textToHtml(self, text: str) -> str: + """ + Convert plain text to simple HTML for email display. + + - Escapes HTML special characters + - Converts newlines to
tags + - Wraps URLs in clickable links + - Wraps in a basic HTML structure with nice styling + + Args: + text: Plain text message + + Returns: + HTML formatted message + """ + import html + + # Check if already HTML (contains HTML tags) + if re.search(r'<[^>]+>', text): + return text + + # Escape HTML special characters + escaped = html.escape(text) + + # Convert URLs to clickable links (before converting newlines) + urlPattern = r'(https?://[^\s<>"\']+)' + escaped = re.sub(urlPattern, r'\1', escaped) + + # Convert newlines to
tags + escaped = escaped.replace('\n', '
\n') + + # Wrap in a nice HTML structure + htmlContent = f""" + + + + + + +{escaped} + +""" + + return htmlContent + + def sendEmailDirect( + self, + recipient: str, + subject: str, + message: str, + userId: Optional[str] = None + ) -> bool: + """ + Send email directly without requiring a subscription. + Used for authentication flows (registration, password reset). + + Plain text messages are automatically converted to HTML format. + + Args: + recipient: Email address of the recipient + subject: Email subject + message: Email body (can be HTML or plain text - plain text is auto-converted) + userId: Optional user ID for logging/audit purposes + + Returns: + bool: True if email was sent successfully, False otherwise + """ + try: + # Convert plain text to HTML if needed + htmlMessage = self._textToHtml(message) + + messagingInterface = self._getMessagingInterface() + success = messagingInterface.send( + channel=MessagingChannel.EMAIL, + recipient=recipient, + subject=subject, + message=htmlMessage + ) + + if success: + logger.info(f"Email sent successfully to {recipient} (userId: {userId})") + else: + logger.warning(f"Failed to send email to {recipient} (userId: {userId})") + + return success + except Exception as e: + logger.error(f"Error sending email to {recipient}: {str(e)}", exc_info=True) + return False + def executeSubscription( self, subscriptionId: str,