user registration and password reset with magic link

ValueOn AG 2026-01-12 23:31:20 +01:00
parent 700690f08d
commit 4b49eab26e
15 changed files with 947 additions and 473 deletions

74
.dockerignore Normal file
View file

@@ -0,0 +1,74 @@
# Docker ignore file for Gateway
# Excludes unnecessary files from Docker build context
# Git
.git
.gitignore
.github
# Python
__pycache__
*.pyc
*.pyo
*.pyd
.Python
*.so
*.egg
*.egg-info
dist
build
venv
env
ENV
.venv
# IDE
.vscode
.idea
*.swp
*.swo
*~
# Environment files (env_gcp.env will be copied as .env by workflow)
env_*.env
.env.local
# Note: .env is NOT ignored - it will be created from env_gcp.env by the workflow
# Logs
*.log
logs/
local/logs/
# Tests
tests/
.pytest_cache
.coverage
htmlcov/
# Documentation
docs/
*.md
README.txt
LICENSE.txt
# Local development files
local/
*.txt
!requirements.txt
# Debug files
debug/
test-chat/
# OS files
.DS_Store
Thumbs.db
# Temporary files
tmp/
temp/
*.tmp
# Build artifacts
*.zip
release.zip

73
.gcloudignore Normal file
View file

@@ -0,0 +1,73 @@
# Google Cloud Build ignore file
# Similar to .dockerignore but for Cloud Build
# Git
.git
.gitignore
.github
# Python
__pycache__
*.pyc
*.pyo
*.pyd
.Python
*.so
*.egg
*.egg-info
dist
build
venv
env
ENV
.venv
# IDE
.vscode
.idea
*.swp
*.swo
*~
# Environment files (will be handled separately)
env_*.env
.env.local
# Logs
*.log
logs/
local/logs/
# Tests
tests/
.pytest_cache
.coverage
htmlcov/
# Documentation
docs/
*.md
README.txt
LICENSE.txt
# Local development files
local/
*.txt
!requirements.txt
# Debug files
debug/
test-chat/
# OS files
.DS_Store
Thumbs.db
# Temporary files
tmp/
temp/
*.tmp
# Build artifacts
*.zip
release.zip

119
.github/workflows/deploy-gcp.yml vendored Normal file
View file

@@ -0,0 +1,119 @@
# GitHub Actions workflow for deploying Gateway to Google Cloud Run
# Documentation: https://cloud.google.com/run/docs/deploying
#
# Required GitHub Secrets:
# - GCP_PROJECT_ID: Your Google Cloud Project ID
# - GCP_SA_KEY: Service Account JSON key with Cloud Run Admin and Cloud Build Editor roles
# - GCP_SERVICE_ACCOUNT_EMAIL: Email of the service account to run Cloud Run service as
#
# Required Google Cloud Setup:
# 1. Create a service account with Cloud Run Admin and Cloud Build Editor roles
# 2. Create secret "CONFIG_KEY" in Secret Manager with your master key
# 3. Grant the service account access to Secret Manager secrets
# 4. Create Cloud SQL instance (if not exists)
# 5. Create env_gcp.env file with your configuration
name: Deploy Gateway to Google Cloud Run
on:
push:
branches:
- main
paths:
- 'gateway/**'
workflow_dispatch:
inputs:
environment:
description: 'Environment to deploy to'
required: true
default: 'prod'
type: choice
options:
- prod
- int
env:
PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
SERVICE_NAME: gateway-prod
REGION: europe-west6 # Zurich region
ENV_FILE: env_gcp.env
jobs:
deploy:
runs-on: ubuntu-latest
permissions:
contents: read
id-token: write # Required for Workload Identity Federation
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Authenticate to Google Cloud
uses: google-github-actions/auth@v2
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
# Alternative: Use Workload Identity Federation (more secure)
# workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
# service_account: ${{ secrets.WIF_SERVICE_ACCOUNT }}
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v2
- name: Configure Docker for GCR
run: |
gcloud auth configure-docker
- name: Set productive environment
run: |
cd gateway
if [ -f "${{ env.ENV_FILE }}" ]; then
cp ${{ env.ENV_FILE }} .env
else
echo "Warning: ${{ env.ENV_FILE }} not found, using env_prod.env as fallback"
cp env_prod.env .env
fi
# Clean up other env files (optional, for security)
rm -f env_*.env
- name: Build and push container image
working-directory: ./gateway
run: |
# Build container image using Cloud Build
# If Dockerfile exists, it will be used; otherwise Cloud Buildpacks will be used
gcloud builds submit \
--tag gcr.io/${{ env.PROJECT_ID }}/${{ env.SERVICE_NAME }}:${{ github.sha }} \
--tag gcr.io/${{ env.PROJECT_ID }}/${{ env.SERVICE_NAME }}:latest \
--project ${{ env.PROJECT_ID }}
- name: Deploy to Cloud Run
run: |
gcloud run deploy ${{ env.SERVICE_NAME }} \
--image gcr.io/${{ env.PROJECT_ID }}/${{ env.SERVICE_NAME }}:${{ github.sha }} \
--region ${{ env.REGION }} \
--platform managed \
--allow-unauthenticated \
--project ${{ env.PROJECT_ID }} \
--set-env-vars "APP_ENV_TYPE=prod" \
--set-secrets "CONFIG_KEY=CONFIG_KEY:latest" \
--memory 2Gi \
--cpu 2 \
--timeout 300 \
--max-instances 10 \
--min-instances 1 \
--port 8000 \
--service-account ${{ secrets.GCP_SERVICE_ACCOUNT_EMAIL }}
- name: Get service URL
id: service-url
run: |
SERVICE_URL=$(gcloud run services describe ${{ env.SERVICE_NAME }} \
--region ${{ env.REGION }} \
--project ${{ env.PROJECT_ID }} \
--format 'value(status.url)')
echo "url=$SERVICE_URL" >> $GITHUB_OUTPUT
- name: Output deployment URL
run: |
echo "🚀 Deployment successful!"
echo "Service URL: ${{ steps.service-url.outputs.url }}"

49
Dockerfile Normal file
View file

@@ -0,0 +1,49 @@
# Dockerfile for PowerOn Gateway - Google Cloud Run
# Python 3.11 base image optimized for Cloud Run
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Set environment variables
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
NUMEXPR_MAX_THREADS=12 \
PORT=8000
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
postgresql-client \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# Copy application code (includes .env file created by workflow from env_gcp.env)
COPY . .
# Create directories for logs (Cloud Run uses /tmp for writable storage)
RUN mkdir -p /tmp/logs /tmp/debug
# Note: .env file (created from env_gcp.env by workflow) contains encrypted secrets
# These are decrypted at runtime using the master key from Secret Manager
# (mounted as CONFIG_KEY environment variable in Cloud Run)
# Expose port (Cloud Run sets PORT env var, but we default to 8000)
EXPOSE 8000
# Health check for Cloud Run
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/api/admin/health', timeout=5)" || exit 1
# Run the application
# Cloud Run will set PORT env var, uvicorn reads it automatically
CMD exec uvicorn app:app --host 0.0.0.0 --port ${PORT:-8000} --workers 1

1
app.py
View file

@@ -234,6 +234,7 @@ def initLogging():
"asyncio",
"fastapi.security.oauth2",
"msal",
"azure.core.pipeline.policies.http_logging_policy",
]
for loggerName in noisyLoggers:
logging.getLogger(loggerName).setLevel(logging.WARNING)

View file

@@ -4,6 +4,8 @@
# Auth configuration
Auth_ALGORITHM = HS256
Auth_TOKEN_TYPE = bearer
Auth_RESET_TOKEN_EXPIRY_HOURS = 24
# File management configuration
File_Management_MAX_UPLOAD_SIZE_MB = 50

View file

@@ -1,314 +0,0 @@
# Streaming Utility Architecture: Transforming eventManager into a Shared Utility
## Current State Analysis
### Existing Implementation
The `eventManager.py` in `modules/features/chatbot/` currently provides:
- **Event Queue Management**: Per-workflow asyncio queues for SSE streaming
- **Event Emission**: `emit_event()` method for chatbot-specific events
- **Cleanup**: Automatic queue cleanup after workflow completion
- **SSE Streaming**: Used in `/api/chatbot/start/stream` endpoint
### Current Limitations
1. **Chatbot-Specific**: Hardcoded for chatbot workflows only
2. **Polling Still Required**: Frontend still polls `getUnifiedChatData()` every 0.5 seconds even with SSE
3. **Not Reusable**: Other features (workflows, document generation, etc.) can't use it
4. **Mixed Approach**: SSE endpoint still internally polls database instead of pure event-driven streaming
### Frontend Polling Pattern
Currently, the frontend uses:
- `useWorkflowPolling.ts` - Polls `/api/workflow/{id}/chatData` every few seconds
- `useWorkflowLifecycle.ts` - Manages polling lifecycle and state
- Rate limit handling and backoff logic for failed polls
## Proposed Architecture: Shared Streaming Utility
### 1. Generic Event Manager (`modules/shared/streamingManager.py`)
```python
class StreamingEventManager:
"""
Generic event manager for real-time streaming across all features.
Supports multiple event types and contexts (workflows, documents, tasks, etc.)
"""
def __init__(self):
self._queues: Dict[str, asyncio.Queue] = {}
self._locks: Dict[str, asyncio.Lock] = {}
self._cleanup_tasks: Dict[str, asyncio.Task] = {}
self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids
async def emit_event(
self,
context_id: str, # workflow_id, document_id, task_id, etc.
event_type: str, # "message", "log", "status", "progress", "complete", "error"
data: Dict[str, Any], # Flexible data structure
event_category: str = "default" # "chat", "workflow", "document", etc.
):
"""Emit event to all subscribers of a context"""
def create_stream(
self,
context_id: str,
event_categories: Optional[List[str]] = None # Filter by category
) -> asyncio.Queue:
"""Create a new stream for a context"""
async def stream_events(
self,
context_id: str,
event_categories: Optional[List[str]] = None
) -> AsyncIterator[Dict[str, Any]]:
"""Async generator for streaming events"""
```
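The sketch above only lists signatures. A minimal, hypothetical implementation of the queue plumbing could look like the following; the class name, return types, and cleanup behaviour are illustrative assumptions, not the final code:
```python
# Hypothetical minimal implementation of the queue plumbing sketched above.
# Class name, return types, and cleanup behaviour are assumptions for
# illustration only.
import asyncio
import uuid
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple


class MinimalStreamingEventManager:
    def __init__(self) -> None:
        # context_id -> {queue_id -> (queue, allowed_categories)}
        self._streams: Dict[str, Dict[str, Tuple[asyncio.Queue, Optional[List[str]]]]] = {}

    def create_stream(
        self, context_id: str, event_categories: Optional[List[str]] = None
    ) -> Tuple[str, asyncio.Queue]:
        """Register a new subscriber queue for a context."""
        queue_id = str(uuid.uuid4())
        queue: asyncio.Queue = asyncio.Queue()
        self._streams.setdefault(context_id, {})[queue_id] = (queue, event_categories)
        return queue_id, queue

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "default",
    ) -> None:
        """Fan the event out to every subscriber whose category filter matches."""
        event = {"type": event_type, "category": event_category, "data": data}
        for queue, categories in self._streams.get(context_id, {}).values():
            if categories is None or event_category in categories:
                await queue.put(event)

    async def stream_events(
        self, context_id: str, event_categories: Optional[List[str]] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        """Yield events for a context until a terminal event arrives."""
        queue_id, queue = self.create_stream(context_id, event_categories)
        try:
            while True:
                event = await queue.get()
                yield event
                if event["type"] in ("complete", "error"):
                    break
        finally:
            # Drop the subscription so emit_event stops filling a dead queue
            self._streams.get(context_id, {}).pop(queue_id, None)
```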
### 2. Generic SSE Route Helper (`modules/shared/sseUtils.py`)
```python
def create_sse_stream(
context_id: str,
event_categories: Optional[List[str]] = None,
initial_data_callback: Optional[Callable] = None,
timeout: float = 300.0
) -> StreamingResponse:
"""
Create a generic SSE streaming response.
Args:
context_id: Workflow ID, document ID, or other context identifier
event_categories: Filter events by category (e.g., ["chat", "workflow"])
initial_data_callback: Optional function to fetch initial state
timeout: Stream timeout in seconds
"""
streaming_manager = get_streaming_manager()
async def event_stream():
# Send initial data if callback provided
if initial_data_callback:
initial_data = await initial_data_callback(context_id)
yield format_sse_event("initial", initial_data)
# Stream events from manager
async for event in streaming_manager.stream_events(context_id, event_categories):
yield format_sse_event(event["type"], event["data"])
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
```
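The helper above calls a `format_sse_event` function that is not shown. A plausible sketch of such a formatter (an assumption, not part of the existing codebase):
```python
# Assumed SSE frame formatter used by create_sse_stream above; the real
# project may serialize frames differently.
import json
from typing import Any, Dict


def format_sse_event(event_type: str, data: Dict[str, Any]) -> str:
    """Serialize one Server-Sent Events frame: an `event:` line, a `data:`
    line, and the blank line that terminates the frame."""
    return f"event: {event_type}\ndata: {json.dumps(data, default=str)}\n\n"
```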
### 3. Integration Points
#### A. Workflow Processing
```python
# In workflow processing code
from modules.shared.streamingManager import get_streaming_manager
streaming_manager = get_streaming_manager()
# Emit progress updates
await streaming_manager.emit_event(
context_id=workflow_id,
event_type="progress",
data={"step": "analyzing", "message": "Processing documents..."},
event_category="workflow"
)
# Emit new messages
await streaming_manager.emit_event(
context_id=workflow_id,
event_type="message",
data={"role": "assistant", "content": "Response text"},
event_category="chat"
)
```
#### B. Route Endpoints
```python
# Generic streaming endpoint for any context
@router.get("/{contextId}/stream")
async def stream_context_updates(
contextId: str,
categories: Optional[str] = Query(None), # Comma-separated categories
currentUser: User = Depends(getCurrentUser)
):
event_categories = categories.split(",") if categories else None
# Optional: Fetch initial state
async def get_initial_data(ctx_id: str):
interfaceDbChat = getServiceChat(currentUser)
return interfaceDbChat.getUnifiedChatData(ctx_id, None)
return create_sse_stream(
context_id=contextId,
event_categories=event_categories,
initial_data_callback=get_initial_data
)
```
## Benefits of Streaming vs Polling
### Performance
- **Reduced Server Load**: No constant database queries every 0.5-3 seconds
- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms)
- **Bandwidth Efficiency**: Only send data when it changes, not empty responses
### User Experience
- **Real-time Updates**: Users see progress instantly
- **Better Responsiveness**: No perceived delay from polling intervals
- **Reduced Battery**: Mobile devices consume less power without constant polling
### Scalability
- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ); see the sketch after this list
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection vs many HTTP requests
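As the Horizontal Scaling point above suggests, the per-process asyncio queues could be backed by a broker. A hedged sketch using Redis pub/sub (channel naming and payload shape are assumptions, not part of the existing design):
```python
# Hedged sketch only: distributing emit_event/stream_events across processes
# with Redis pub/sub. Channel naming and payload shape are assumptions; the
# document merely notes that queues "can be distributed (Redis, RabbitMQ)".
import json
from typing import Any, AsyncIterator, Dict

import redis.asyncio as redis


async def emit_event_distributed(r: redis.Redis, context_id: str, event: Dict[str, Any]) -> None:
    # Every gateway instance publishes to the same per-context channel
    await r.publish(f"events:{context_id}", json.dumps(event, default=str))


async def stream_events_distributed(r: redis.Redis, context_id: str) -> AsyncIterator[Dict[str, Any]]:
    # Each SSE connection subscribes to the channel, no matter which
    # instance produced the event
    pubsub = r.pubsub()
    await pubsub.subscribe(f"events:{context_id}")
    try:
        async for message in pubsub.listen():
            if message["type"] == "message":
                yield json.loads(message["data"])
    finally:
        await pubsub.unsubscribe(f"events:{context_id}")
        await pubsub.close()
```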
## Migration Strategy
### Phase 1: Create Shared Utility
1. Move `eventManager.py` → `modules/shared/streamingManager.py`
2. Generalize for any context type (not just workflows)
3. Add event categorization and filtering
4. Create `sseUtils.py` helper functions
### Phase 2: Update Chatbot Feature
1. Update chatbot to use shared streaming manager
2. Replace internal polling in SSE endpoint with pure event-driven streaming
3. Emit events directly when data changes (in database write operations)
### Phase 3: Migrate Other Features
1. **Workflows**: Add streaming to workflow processing
2. **Document Generation**: Stream document creation progress
3. **Data Processing**: Stream extraction/transformation progress
4. **Any Long-Running Task**: Use streaming for status updates
### Phase 4: Frontend Migration
1. Replace `useWorkflowPolling` with SSE EventSource connections
2. Create generic `useStreaming` hook
3. Update all components to use streaming instead of polling
4. Remove polling logic entirely
## Implementation Details
### Event-Driven Data Emission
Instead of polling `getUnifiedChatData()`, emit events when data changes:
```python
# In interfaceDbChatObjects.py - when creating a message
def createMessage(self, workflowId: str, message: ChatMessage):
# ... existing database write ...
# Emit streaming event
from modules.shared.streamingManager import get_streaming_manager
streaming_manager = get_streaming_manager()
asyncio.create_task(streaming_manager.emit_event(
context_id=workflowId,
event_type="message",
data={
"type": "message",
"createdAt": message.publishedAt,
"item": message.dict()
},
event_category="chat"
))
```
### Frontend Integration
```typescript
// Generic streaming hook
function useStreaming<T>(
contextId: string,
categories?: string[],
onEvent?: (event: T) => void
) {
useEffect(() => {
const eventSource = new EventSource(
`/api/stream/${contextId}?categories=${categories?.join(',')}`
);
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
onEvent?.(event);
};
return () => eventSource.close();
}, [contextId, categories]);
}
```
## Key Design Decisions
### 1. Context-Based Streaming
- Use generic `context_id` instead of `workflow_id`
- Supports workflows, documents, tasks, user sessions, etc.
### 2. Event Categories
- Allow filtering by category (chat, workflow, document, etc.)
- Enables multiple features to stream from same context
### 3. Backward Compatibility
- Keep existing polling endpoints during migration
- Gradually migrate features one at a time
- Frontend can use both during transition
### 4. Error Handling
- Graceful degradation if streaming unavailable
- Automatic reconnection logic in frontend
- Fallback to polling if SSE fails
## Example: Complete Flow
### Backend: Workflow Processing
```python
async def process_workflow(workflow_id: str):
streaming = get_streaming_manager()
# Emit status update
await streaming.emit_event(workflow_id, "status",
{"status": "running"}, "workflow")
# Process and emit messages
result = await ai_call(...)
await streaming.emit_event(workflow_id, "message",
{"role": "assistant", "content": result}, "chat")
# Emit completion
await streaming.emit_event(workflow_id, "complete",
{"status": "completed"}, "workflow")
```
### Frontend: React Hook
```typescript
function ChatComponent({ workflowId }: { workflowId: string }) {
const [messages, setMessages] = useState<Message[]>([]);
useStreaming(workflowId, ["chat"], (event) => {
if (event.type === "message") {
setMessages(prev => [...prev, event.item]);
}
});
return <MessageList messages={messages} />;
}
```
## Conclusion
By transforming `eventManager.py` into a shared streaming utility:
1. **Eliminates Polling**: All features can stream updates in real-time
2. **Improves Performance**: Reduces server load and latency
3. **Better UX**: Instant updates instead of polling delays
4. **Reusable**: Any feature can use streaming with minimal code
5. **Scalable**: Foundation for future real-time features
The migration can be done incrementally, feature by feature, without breaking existing functionality.

View file

@@ -1,134 +0,0 @@
# Web Search Content Extraction Fixes
## Problem Summary
The Tavily web search integration was failing to extract content from search results, causing web research to return empty or incomplete data. The main issues were related to handling `None` values and incomplete error recovery.
## Main Issues Fixed
### 1. Incomplete Content Extraction from Search Results
**Problem:**
- When Tavily API returned search results, some results had `raw_content` set to `None` (not missing, but explicitly `None`)
- The code used `result.get("raw_content") or result.get("content", "")` which failed when `raw_content` existed but was `None`
- This caused `None` values to propagate through the system instead of falling back to the `content` field or empty string
**Fix:**
Changed the content extraction in `aicorePluginTavily.py` to properly handle `None` values:
```python
# Before (line 344):
rawContent=result.get("raw_content") or result.get("content", "")
# After:
rawContent=result.get("raw_content") or result.get("content") or ""
```
This ensures that if `raw_content` is `None`, it falls back to `content`, and if that's also `None`, it defaults to an empty string.
**Additional Fix:**
Added defensive checks in the `webSearch` method to safely extract content even when result objects have unexpected structures:
```python
# Safely extract content with multiple fallbacks
content = ""
if hasattr(result, 'rawContent'):
content = result.rawContent or ""
if not content and hasattr(result, 'content'):
content = result.content or ""
```
### 2. NoneType Error When Logging Content Length
**Problem:**
- Code attempted to check `len(first_result.get('raw_content', ''))` for logging
- When `raw_content` key existed but value was `None`, `.get()` returned `None` instead of the default `''`
- This caused `len(None)` to fail with `TypeError: object of type 'NoneType' has no len()`
**Fix:**
Changed the logging code to safely handle `None` values:
```python
# Before (line 338):
logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(first_result.get('raw_content', ''))}")
# After:
raw_content = first_result.get('raw_content') or ''
logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(raw_content)}")
```
### 3. Missing Error Recovery in Content Extraction
**Problem:**
- When processing search results, if one result failed to extract, the entire extraction could fail
- No recovery mechanism to extract at least URLs even when content extraction failed
- Errors were logged but processing stopped, losing potentially useful data
**Fix:**
Added per-result error handling with recovery:
```python
for result in searchResults:
try:
# Extract URL, content, title safely
# ... extraction logic ...
except Exception as resultError:
logger.warning(f"Error processing individual search result: {resultError}")
# Continue processing other results instead of failing completely
continue
```
Also added recovery at the extraction level:
```python
except Exception as extractionError:
logger.error(f"Error extracting URLs and content from search results: {extractionError}")
# Try to recover at least URLs
try:
urls = [result.url for result in searchResults if hasattr(result, 'url') and result.url]
logger.info(f"Recovered {len(urls)} URLs after extraction error")
except Exception:
logger.error("Failed to recover any URLs from search results")
```
### 4. Incomplete Crawl Result Processing
**Problem:**
- When crawl returned results but individual page processing failed, entire crawl was lost
- No fallback to extract at least URLs from failed crawl results
- Missing content fields could cause errors when formatting results
**Fix:**
Added error handling for individual page processing:
```python
for i, result in enumerate(crawlResults, 1):
try:
# Format page content
# ... formatting logic ...
except Exception as pageError:
logger.warning(f"Error formatting page {i} from crawl: {pageError}")
# Try to add at least the URL
try:
pageUrls.append(result.url if hasattr(result, 'url') and result.url else webCrawlPrompt.url)
except Exception:
pass
```
Also ensured all result fields have safe defaults:
```python
results.append(WebCrawlResult(
url=result_url or url, # Fallback to base URL
content=result_content, # Already ensured to be string
title=result_title # Already ensured to be string
))
```
## Impact
These fixes ensure that:
1. **Content is always extracted** - Even when `raw_content` is `None`, the system falls back to `content` field or empty string
2. **Partial results are preserved** - If some results fail, others are still processed and returned
3. **URLs are recovered** - Even when content extraction fails completely, URLs can still be extracted for crawling
4. **No crashes from None values** - All `None` values are properly handled before operations like `len()` are called
## Testing Recommendations
- Test with Tavily search results that have `raw_content` set to `None`
- Test with mixed results (some with content, some without)
- Test error recovery when individual results fail
- Verify that URLs are still extracted even when content extraction fails
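A minimal pytest sketch for the `None`-handling fallback described above; `extract_raw_content` is a stand-in for the fixed expression in `aicorePluginTavily.py`, not an actual function name from the codebase:
```python
# Hypothetical pytest sketch for the None-handling fallback described above.
# extract_raw_content stands in for the fixed expression in
# aicorePluginTavily.py; the result dicts mimic Tavily-style responses.
def extract_raw_content(result: dict) -> str:
    # Mirrors the fix: raw_content -> content -> ""
    return result.get("raw_content") or result.get("content") or ""


def test_raw_content_none_falls_back_to_content():
    assert extract_raw_content({"raw_content": None, "content": "body"}) == "body"


def test_both_fields_none_yield_empty_string():
    assert extract_raw_content({"raw_content": None, "content": None}) == ""


def test_missing_keys_yield_empty_string():
    assert extract_raw_content({}) == ""
```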

View file

@@ -88,3 +88,8 @@ PREPROCESS_ALTHAUS_CHAT_SECRET = None
# Preprocessor API Configuration
PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990
PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query
# Azure Communication Services Email Configuration
MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt
MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss

View file

@@ -88,3 +88,7 @@ PREPROCESS_ALTHAUS_CHAT_SECRET = INT_ENC:Z0FBQUFBQnBaSnM4UkNBelhvckxCQUVjZm94N3B
# Preprocessor API Configuration
PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990
PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query
# Azure Communication Services Email Configuration
MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt
MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss

View file

@@ -88,3 +88,7 @@ PREPROCESS_ALTHAUS_CHAT_SECRET = PROD_ENC:Z0FBQUFBQnBaSnM4RVRmYW5IelNIbklTUDZIME
# Preprocessor API Configuration
PP_QUERY_API_KEY=ouho02j0rj2oijroi3rj2oijro23jr0990
PP_QUERY_BASE_URL=https://poweron-althaus-preprocess-prod-e3fegaatc7faency.switzerlandnorth-01.azurewebsites.net/api/v1/dataquery/query
# Azure Communication Services Email Configuration
MESSAGING_ACS_CONNECTION_STRING = endpoint=https://mailing-poweron-prod.switzerland.communication.azure.com/;accesskey=4UizRfBKBgMhDgQ92IYINM6dJsO1HIeL6W1DvIX9S0GtaS1PjIXqJQQJ99CAACULyCpHwxUcAAAAAZCSuSCt
MESSAGING_ACS_SENDER_EMAIL = DoNotReply@poweron.swiss

View file

@@ -165,8 +165,14 @@ registerModelLabels(
class UserInDB(User):
hashedPassword: Optional[str] = Field(None, description="Hash of the user password")
resetToken: Optional[str] = Field(None, description="Password reset token (UUID)")
resetTokenExpires: Optional[float] = Field(None, description="Reset token expiration (UTC timestamp in seconds)")
registerModelLabels(
"UserInDB",
{"en": "User Access", "fr": "Accès de l'utilisateur"},
{"hashedPassword": {"en": "Password hash", "fr": "Hachage de mot de passe"}},
{
"hashedPassword": {"en": "Password hash", "fr": "Hachage de mot de passe"},
"resetToken": {"en": "Reset Token", "fr": "Jeton de réinitialisation"},
"resetTokenExpires": {"en": "Reset Token Expires", "fr": "Expiration du jeton"},
},
)

View file

@@ -587,6 +587,11 @@ class AppObjects:
# Get the full user record with password hash for verification
userRecord = self.db.getRecordset(UserInDB, recordFilter={"id": user.id})[0]
# Check if user has a reset token set (password reset required)
if userRecord.get("resetToken"):
raise ValueError("Passwort-Zurücksetzung erforderlich. Bitte prüfen Sie Ihre E-Mail.")
if not userRecord.get("hashedPassword"):
raise ValueError("User has no password set")
@@ -613,11 +618,14 @@ class AppObjects:
try:
# Ensure username is a string
username = str(username).strip()
# Check global username uniqueness (across ALL mandates)
if not self.isUsernameGloballyUnique(username):
raise ValueError(f"Username '{username}' is already taken")
# Validate password for local authentication
if authenticationAuthority == AuthAuthority.LOCAL:
if not password:
raise ValueError("Password is required for local authentication")
# For LOCAL auth: password is set exclusively via magic link, not during registration
# Password parameter is only used for internal/admin operations (e.g., resetUserPassword)
if password:
if not isinstance(password, str):
raise ValueError("Password must be a string")
if not password.strip():
@@ -740,6 +748,277 @@ class AppObjects:
"""Enables a user if current user has permission."""
return self.updateUser(userId, {"enabled": True})
def resetUserPassword(self, userId: str, newPassword: str) -> bool:
"""Reset a user's password (admin function)."""
try:
if not newPassword or len(newPassword) < 8:
raise ValueError("Password must be at least 8 characters long")
hashedPassword = self._getPasswordHash(newPassword)
self.db.recordModify(UserInDB, userId, {"hashedPassword": hashedPassword})
logger.info(f"Password reset for user {userId}")
return True
except Exception as e:
logger.error(f"Error resetting password for user {userId}: {str(e)}")
return False
def generateResetTokenAndExpiry(self) -> tuple:
"""Generate a new reset token and expiration timestamp.
Returns:
tuple: (tokenUuid: str, expiresTimestamp: float)
"""
token = str(uuid.uuid4())
expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24"))
expires = getUtcTimestamp() + (expiryHours * 3600)
return token, expires
def findUserByEmailLocalAuth(self, email: str) -> Optional[User]:
"""Find LOCAL auth user by email (searches across all mandates).
Note: If multiple users exist with the same email (in different mandates),
this returns only the first one. Use findAllUsersByEmailLocalAuth() to get all.
Args:
email: Email address to search for (case-insensitive)
Returns:
User if found, None otherwise
"""
users = self.findAllUsersByEmailLocalAuth(email)
return users[0] if users else None
def findAllUsersByEmailLocalAuth(self, email: str) -> List[User]:
"""Find ALL LOCAL auth users by email (searches across all mandates).
Use this when a user might have multiple accounts with the same email
in different mandates.
Args:
email: Email address to search for (case-insensitive)
Returns:
List of Users (empty list if none found)
"""
if not email:
return []
normalizedEmail = email.lower().strip()
try:
# Search directly without RBAC for cross-mandate search
users = self.db.getRecordset(
UserInDB,
recordFilter={
"email": normalizedEmail,
"authenticationAuthority": AuthAuthority.LOCAL.value
}
)
result = []
for userRecord in users:
cleanedUser = {k: v for k, v in userRecord.items() if not k.startswith("_")}
if cleanedUser.get("roleLabels") is None:
cleanedUser["roleLabels"] = []
result.append(User(**cleanedUser))
return result
except Exception as e:
logger.error(f"Error finding users by email: {str(e)}")
return []
def findUserByEmailAndUsernameLocalAuth(self, email: str, username: str) -> Optional[User]:
"""Find LOCAL auth user by email AND username combination.
This uniquely identifies a user even if they have multiple accounts
with the same email in different mandates.
Args:
email: Email address to search for (case-insensitive)
username: Username to search for (case-sensitive)
Returns:
User if found, None otherwise
"""
if not email or not username:
return None
normalizedEmail = email.lower().strip()
try:
# Search directly without RBAC for cross-mandate search
users = self.db.getRecordset(
UserInDB,
recordFilter={
"email": normalizedEmail,
"username": username,
"authenticationAuthority": AuthAuthority.LOCAL.value
}
)
if users:
cleanedUser = {k: v for k, v in users[0].items() if not k.startswith("_")}
if cleanedUser.get("roleLabels") is None:
cleanedUser["roleLabels"] = []
return User(**cleanedUser)
return None
except Exception as e:
logger.error(f"Error finding user by email and username: {str(e)}")
return None
def isUsernameGloballyUnique(self, username: str) -> bool:
"""Check if username is unique across ALL mandates (no RBAC filtering).
This is used for registration to ensure usernames are globally unique.
Args:
username: Username to check
Returns:
True if username is available (not used), False if already taken
"""
if not username:
return False
try:
# Search directly without RBAC for cross-mandate search
users = self.db.getRecordset(
UserInDB,
recordFilter={"username": username}
)
return len(users) == 0
except Exception as e:
logger.error(f"Error checking username uniqueness: {str(e)}")
return False # Fail safe - assume not unique on error
def findUserByUsernameLocalAuth(self, username: str) -> Optional[User]:
"""Find LOCAL auth user by username (searches across all mandates).
Username is globally unique, so this returns at most one user.
Args:
username: Username to search for
Returns:
User if found, None otherwise
"""
if not username:
return None
try:
# Search directly without RBAC for cross-mandate search
users = self.db.getRecordset(
UserInDB,
recordFilter={
"username": username,
"authenticationAuthority": AuthAuthority.LOCAL.value
}
)
if users:
cleanedUser = {k: v for k, v in users[0].items() if not k.startswith("_")}
if cleanedUser.get("roleLabels") is None:
cleanedUser["roleLabels"] = []
return User(**cleanedUser)
return None
except Exception as e:
logger.error(f"Error finding user by username: {str(e)}")
return None
def setResetToken(self, userId: str, token: str, expires: float, clearPassword: bool = True) -> bool:
"""Set reset token for a user.
Args:
userId: User ID
token: Reset token UUID
expires: Expiration timestamp (float)
clearPassword: If True, clears the password hash
"""
try:
updateData = {
"resetToken": token,
"resetTokenExpires": expires
}
if clearPassword:
updateData["hashedPassword"] = None
self.db.recordModify(UserInDB, userId, updateData)
return True
except Exception as e:
logger.error(f"Error setting reset token for user {userId}: {str(e)}")
return False
def verifyResetToken(self, token: str) -> Optional[User]:
"""Verify reset token and return user if valid.
Returns:
User if token is valid and not expired, None otherwise
"""
if not token:
return None
try:
users = self.db.getRecordset(UserInDB, recordFilter={"resetToken": token})
if not users:
return None
userRecord = users[0]
# Check expiration - ensure expires is converted to float for comparison
expires = userRecord.get("resetTokenExpires")
if expires is not None:
try:
expires = float(expires)
except (ValueError, TypeError):
logger.warning(f"Invalid resetTokenExpires value for user {userRecord.get('id')}: {expires}")
return None
if not expires or getUtcTimestamp() > expires:
logger.warning(f"Reset token expired for user {userRecord.get('id')}")
return None
cleanedUser = {k: v for k, v in userRecord.items() if not k.startswith("_")}
if cleanedUser.get("roleLabels") is None:
cleanedUser["roleLabels"] = []
return User(**cleanedUser)
except Exception as e:
logger.error(f"Error verifying reset token: {str(e)}")
return None
def resetPasswordWithToken(self, token: str, newPassword: str) -> bool:
"""Reset password using token (atomic operation).
Returns:
True if successful, False otherwise
"""
try:
user = self.verifyResetToken(token)
if not user:
return False
if not newPassword or len(newPassword) < 8:
raise ValueError("Password must be at least 8 characters long")
hashedPassword = self._getPasswordHash(newPassword)
# Atomic update: set password, clear token, enable user
self.db.recordModify(UserInDB, user.id, {
"hashedPassword": hashedPassword,
"resetToken": None,
"resetTokenExpires": None,
"enabled": True
})
logger.info(f"Password reset completed for user {user.id}")
return True
except Exception as e:
logger.error(f"Error in resetPasswordWithToken: {str(e)}")
return False
def _deleteUserReferencedData(self, userId: str) -> None:
"""Deletes all data associated with a user."""
try:
@@ -810,19 +1089,20 @@ class AppObjects:
return None
def checkUsernameAvailability(self, checkData: Dict[str, Any]) -> Dict[str, Any]:
"""Checks if a username is available for registration."""
"""Checks if a username is available for registration.
Uses global uniqueness check (across ALL mandates) to ensure
usernames are unique system-wide.
"""
try:
username = checkData.get("username")
authenticationAuthority = checkData.get("authenticationAuthority", "local")
# authenticationAuthority not used - usernames must be globally unique
if not username:
return {"available": False, "message": "Username is required"}
# Get user by username
user = self.getUserByUsername(username)
# Check if user exists (User model instance)
if user is not None:
# Check global uniqueness (across all mandates, no RBAC filtering)
if not self.isUsernameGloballyUnique(username):
return {"available": False, "message": "Username is already taken"}
return {"available": True, "message": "Username is available"}

View file

@@ -19,6 +19,7 @@ from modules.auth import createAccessToken, createRefreshToken, setAccessTokenCo
from modules.interfaces.interfaceDbAppObjects import getInterface, getRootInterface
from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority
from modules.datamodels.datamodelSecurity import Token
from modules.shared.configuration import APP_CONFIG
# Configure logger
logger = logging.getLogger(__name__)
@@ -189,14 +190,19 @@ async def login(
detail=error_msg
)
@router.post("/register", response_model=User)
@router.post("/register")
@limiter.limit("10/minute")
async def register_user(
request: Request,
userData: User = Body(...),
password: str = Body(..., embed=True)
) -> User:
"""Register a new local user."""
frontendUrl: str = Body(..., embed=True)
) -> Dict[str, Any]:
"""Register a new local user (magic link based - no password required).
Args:
userData: User data (username, email, fullName, language)
frontendUrl: The frontend URL to use in magic link (REQUIRED - provided by frontend)
"""
try:
# Get gateway interface with root privileges since this is a public endpoint
appInterface = getRootInterface()
@@ -213,16 +219,23 @@ async def register_user(
# Set the mandate ID on the interface
appInterface.mandateId = defaultMandateId
# Create user with local authentication
# Set safe default role for new registrations
# New users are disabled by default and require admin approval
# Frontend URL is required - no fallback
baseUrl = frontendUrl.rstrip("/")
# Normalize email
normalizedEmail = userData.email.lower().strip() if userData.email else None
# Note: Email can be shared across multiple users (different mandates)
# Username uniqueness is enforced in createUser() - that's the primary constraint
# Create user with local authentication (no password - magic link based)
user = appInterface.createUser(
username=userData.username,
password=password,
email=userData.email,
password=None, # No password - will be set via magic link
email=normalizedEmail,
fullName=userData.fullName,
language=userData.language,
enabled=False, # New users are disabled by default
enabled=True, # Users are enabled by default (can login after setting password)
roleLabels=["user"], # Default role for new registrations
authenticationAuthority=AuthAuthority.LOCAL
)
@@ -232,8 +245,51 @@ async def register_user(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to register user"
)
# Generate reset token for password setup
token, expires = appInterface.generateResetTokenAndExpiry()
appInterface.setResetToken(user.id, token, expires, clearPassword=False)
# Send registration email with magic link
try:
from modules.services import Services
services = Services(user)
return user
magicLink = f"{baseUrl}/reset.html?token={token}"
expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24"))
emailSubject = "PowerOn Registrierung - Passwort setzen"
emailBody = f"""
Hallo {user.fullName or user.username},
Vielen Dank für Ihre Registrierung bei PowerOn.
Ihr Benutzername: {user.username}
Klicken Sie auf den folgenden Link, um Ihr Passwort zu setzen:
{magicLink}
Dieser Link ist {expiryHours} Stunden gültig.
Falls Sie sich nicht registriert haben, können Sie diese E-Mail ignorieren.
"""
emailSent = services.messaging.sendEmailDirect(
recipient=user.email,
subject=emailSubject,
message=emailBody,
userId=str(user.id)
)
if not emailSent:
logger.warning(f"Failed to send registration email to {user.email}")
except Exception as emailErr:
logger.error(f"Error sending registration email: {str(emailErr)}")
# Don't fail registration if email fails - user can request reset later
return {
"message": "Registrierung erfolgreich! Bitte prüfen Sie Ihre E-Mail für den Link zum Setzen Ihres Passworts."
}
except ValueError as e:
raise HTTPException(
@@ -432,4 +488,146 @@ async def check_username_availability(
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to check username availability: {str(e)}"
)
)
@router.post("/password-reset-request")
@limiter.limit("5/minute")
async def passwordResetRequest(
request: Request,
username: str = Body(..., embed=True),
frontendUrl: str = Body(..., embed=True)
) -> Dict[str, Any]:
"""Request password reset email.
Finds user by username (globally unique) and sends reset email to their email address.
Args:
username: User's username (globally unique identifier)
frontendUrl: The frontend URL to use in magic link (REQUIRED - provided by frontend)
"""
try:
rootInterface = getRootInterface()
# Frontend URL is required - no fallback
baseUrl = frontendUrl.rstrip("/")
# Find user by username (username is globally unique)
user = rootInterface.findUserByUsernameLocalAuth(username)
if user and user.email:
from modules.services import Services
expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24"))
try:
# Generate reset token
token, expires = rootInterface.generateResetTokenAndExpiry()
# Set reset token (clears password)
rootInterface.setResetToken(user.id, token, expires)
# Get services for email sending
services = Services(user)
# Generate magic link using provided frontend URL
magicLink = f"{baseUrl}/reset.html?token={token}"
# Send email
emailSubject = "PowerOn - Passwort zurücksetzen"
emailBody = f"""
Hallo {user.fullName or user.username},
Sie haben eine Passwort-Zurücksetzung für Ihren PowerOn Account angefordert.
Benutzername: {user.username}
Klicken Sie auf den folgenden Link, um Ihr Passwort zurückzusetzen:
{magicLink}
Dieser Link ist {expiryHours} Stunden gültig.
Falls Sie diese Anforderung nicht gestellt haben, können Sie diese E-Mail ignorieren.
"""
services.messaging.sendEmailDirect(
recipient=user.email,
subject=emailSubject,
message=emailBody,
userId=str(user.id)
)
logger.info(f"Password reset email sent to {user.email} for user {user.username}")
except Exception as userErr:
logger.error(f"Failed to send reset email for user {username}: {str(userErr)}")
else:
logger.info(f"Password reset requested for unknown username: {username}")
# Always return same message (security - don't reveal if user exists)
return {
"message": "Falls ein Konto mit diesem Benutzernamen existiert, wurde ein Reset-Link an die hinterlegte E-Mail-Adresse gesendet."
}
except Exception as e:
logger.error(f"Error in password reset request: {str(e)}")
# Still return success for security
return {
"message": "Falls ein Konto mit diesem Benutzernamen existiert, wurde ein Reset-Link an die hinterlegte E-Mail-Adresse gesendet."
}
@router.post("/password-reset")
@limiter.limit("10/minute")
async def passwordReset(
request: Request,
token: str = Body(..., embed=True),
password: str = Body(..., embed=True)
) -> Dict[str, Any]:
"""Reset password using token from magic link."""
try:
# Validate token format (UUID)
try:
uuid.UUID(token)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ungültiger oder abgelaufener Reset-Link"
)
# Validate password strength
if len(password) < 8:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Passwort muss mindestens 8 Zeichen lang sein"
)
rootInterface = getRootInterface()
# Verify and reset
success = rootInterface.resetPasswordWithToken(token, password)
if not success:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Ungültiger oder abgelaufener Reset-Link"
)
# Log success
try:
from modules.shared.auditLogger import audit_logger
audit_logger.logSecurityEvent(
userId="unknown",
mandateId="unknown",
action="password_reset_via_token",
details="Password reset completed via magic link"
)
except Exception:
pass
return {"message": "Passwort erfolgreich gesetzt"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in password reset: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Passwort-Zurücksetzung fehlgeschlagen"
)

View file

@@ -75,12 +75,17 @@ class MessagingService:
)
try:
# Convert plain text to HTML for email channel
messageToSend = message
if registration.channel == MessagingChannel.EMAIL:
messageToSend = self._textToHtml(message)
# Send via interfaceMessaging
success = self._getMessagingInterface().send(
channel=registration.channel,
recipient=registration.channelConfig,
subject=subject,
message=message
message=messageToSend
)
if success:
@@ -130,6 +135,108 @@ class MessagingService:
errorMessage=str(e)
)
def _textToHtml(self, text: str) -> str:
"""
Convert plain text to simple HTML for email display.
- Escapes HTML special characters
- Converts newlines to <br> tags
- Wraps URLs in clickable links
- Wraps in a basic HTML structure with nice styling
Args:
text: Plain text message
Returns:
HTML formatted message
"""
import html
# Check if already HTML (contains HTML tags)
if re.search(r'<[^>]+>', text):
return text
# Escape HTML special characters
escaped = html.escape(text)
# Convert URLs to clickable links (before converting newlines)
urlPattern = r'(https?://[^\s<>"\']+)'
escaped = re.sub(urlPattern, r'<a href="\1" style="color: #0066cc;">\1</a>', escaped)
# Convert newlines to <br> tags
escaped = escaped.replace('\n', '<br>\n')
# Wrap in a nice HTML structure
htmlContent = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
font-size: 14px;
line-height: 1.6;
color: #333333;
max-width: 600px;
margin: 0 auto;
padding: 20px;
}}
a {{
color: #0066cc;
}}
</style>
</head>
<body>
{escaped}
</body>
</html>"""
return htmlContent
def sendEmailDirect(
self,
recipient: str,
subject: str,
message: str,
userId: Optional[str] = None
) -> bool:
"""
Send email directly without requiring a subscription.
Used for authentication flows (registration, password reset).
Plain text messages are automatically converted to HTML format.
Args:
recipient: Email address of the recipient
subject: Email subject
message: Email body (can be HTML or plain text - plain text is auto-converted)
userId: Optional user ID for logging/audit purposes
Returns:
bool: True if email was sent successfully, False otherwise
"""
try:
# Convert plain text to HTML if needed
htmlMessage = self._textToHtml(message)
messagingInterface = self._getMessagingInterface()
success = messagingInterface.send(
channel=MessagingChannel.EMAIL,
recipient=recipient,
subject=subject,
message=htmlMessage
)
if success:
logger.info(f"Email sent successfully to {recipient} (userId: {userId})")
else:
logger.warning(f"Failed to send email to {recipient} (userId: {userId})")
return success
except Exception as e:
logger.error(f"Error sending email to {recipient}: {str(e)}", exc_info=True)
return False
def executeSubscription(
self,
subscriptionId: str,