From 628cca0ed4b1b73a8fb4a61d7c967d0eb0030c39 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 22 May 2025 00:48:56 +0200
Subject: [PATCH] refactory complete to review and test
---
app.py | 3 +
config.ini | 5 +
env_dev.env | 5 +-
env_prod.env | 3 +-
modules/agents/agentAnalyst.py | 226 ++++-------
modules/agents/agentDocumentation.py | 238 +++++------
modules/interfaces/gatewayInterface.py | 264 ++++++++++---
modules/interfaces/gatewayModel.py | 30 +-
modules/interfaces/googleAccess.py | 113 ++++++
modules/interfaces/googleInterface.py | 287 ++++++++++++++
modules/interfaces/googleModel.py | 35 ++
modules/interfaces/msftInterface.py | 525 +++++++++++++++----------
modules/interfaces/msftModel.py | 82 ++--
modules/routes/routeFiles.py | 10 +-
modules/routes/routeGeneral.py | 30 +-
modules/routes/routeGoogle.py | 322 +++++++++++++++
modules/routes/routeMsft.py | 67 ++--
modules/routes/routeUsers.py | 8 +-
modules/routes/routeWorkflows.py | 147 ++++---
modules/security/auth.py | 25 --
modules/workflow/agentBase.py | 141 ++++---
modules/workflow/agentRegistry.py | 19 +-
modules/workflow/documentProcessor.py | 234 +++--------
modules/workflow/workflowManager.py | 276 ++++++-------
notes/changelog.txt | 63 ++-
requirements.txt | 2 +
26 files changed, 1981 insertions(+), 1179 deletions(-)
create mode 100644 modules/interfaces/googleAccess.py
create mode 100644 modules/interfaces/googleInterface.py
create mode 100644 modules/interfaces/googleModel.py
create mode 100644 modules/routes/routeGoogle.py
diff --git a/app.py b/app.py
index 9642cdaa..3e405e65 100644
--- a/app.py
+++ b/app.py
@@ -119,3 +119,6 @@ app.include_router(workflowRouter)
from modules.routes.routeMsft import router as msftRouter
app.include_router(msftRouter)
+
+from modules.routes.routeGoogle import router as googleRouter
+app.include_router(googleRouter)
diff --git a/config.ini b/config.ini
index de1494c6..b59db403 100644
--- a/config.ini
+++ b/config.ini
@@ -50,3 +50,8 @@ Agent_Coder_EXECUTION_RETRY = 5
Service_MSFT_CLIENT_ID = c7e7112d-61dc-4f3a-8cd3-08cc4cd7504c
Service_MSFT_CLIENT_SECRET = Kxf8Q~2lJIteZ~JaI32kMf1lfaWKATqxXiNiFbzV
Service_MSFT_TENANT_ID = common
+
+# Google Service configuration
+Service_GOOGLE_CLIENT_ID = your-google-client-id
+Service_GOOGLE_CLIENT_SECRET = your-google-client-secret
+Service_GOOGLE_REDIRECT_URI = http://localhost:8000/api/google/auth/callback
diff --git a/env_dev.env b/env_dev.env
index fd85ba89..8e01929b 100644
--- a/env_dev.env
+++ b/env_dev.env
@@ -40,5 +40,6 @@ APP_LOGGING_FILE_ENABLED = True
APP_LOGGING_ROTATION_SIZE = 10485760
APP_LOGGING_BACKUP_COUNT = 5
-# Agent Mail
-Service_MSFT_REDIRECT_URI = http://localhost:8000/api/msft/auth/callback
\ No newline at end of file
+# Service Redirects
+Service_MSFT_REDIRECT_URI = http://localhost:8000/api/msft/auth/callback
+Service_GOOGLE_REDIRECT_URI = http://localhost:8000/api/google/auth/callback
\ No newline at end of file
diff --git a/env_prod.env b/env_prod.env
index a3ffc6ed..869452d3 100644
--- a/env_prod.env
+++ b/env_prod.env
@@ -40,5 +40,6 @@ APP_LOGGING_FILE_ENABLED = True
APP_LOGGING_ROTATION_SIZE = 10485760
APP_LOGGING_BACKUP_COUNT = 5
-# Service MSFT
+# Service Redirects
Service_MSFT_REDIRECT_URI = https://gateway.poweron-center.net/api/msft/auth/callback
+Service_GOOGLE_REDIRECT_URI = http://gateway.poweron-center.net/api/google/auth/callback
diff --git a/modules/agents/agentAnalyst.py b/modules/agents/agentAnalyst.py
index 04f5b090..fd0864fb 100644
--- a/modules/agents/agentAnalyst.py
+++ b/modules/agents/agentAnalyst.py
@@ -7,12 +7,15 @@ import logging
import json
import io
import base64
-from typing import Dict, Any, List
+import os
+import time
+from typing import Dict, Any, List, Optional
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from modules.workflow.agentBase import AgentBase
+from modules.interfaces.lucydomModel import ChatContent
logger = logging.getLogger(__name__)
@@ -634,97 +637,33 @@ class AgentAnalyst(AgentBase):
return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}")
async def _createDataDocument(self, datasets: Dict, prompt: str, outputLabel: str,
- analysisPlan: Dict, description: str) -> Dict:
+ analysisPlan: Dict, description: str) -> ChatContent:
"""
- Create a data document (e.g., CSV, JSON) based on analysis.
+ Create a data document (CSV, JSON, Excel) from analysis results.
Args:
datasets: Dictionary of datasets
prompt: Original task prompt
outputLabel: Output filename
- analysisPlan: Analysis plan from AI
+ analysisPlan: Analysis plan
description: Output description
Returns:
- Data document
+ ChatContent object
"""
- # Determine format from filename
- formatType = outputLabel.split('.')[-1].lower()
-
- # If no datasets available, return error message
- if not datasets:
- return {
- "label": outputLabel,
- "content": f"No data available for processing into {formatType} format.",
- "metadata": {
- "contentType": "text/plain"
- }
- }
-
- # Generate data processing instructions
- dataPrompt = f"""
- Create Python code to process datasets and generate a {formatType} file for:
-
- TASK: {prompt}
-
- OUTPUT REQUIREMENTS:
- - Format: {formatType}
- - Filename: {outputLabel}
- - Description: {description}
-
- ANALYSIS CONTEXT:
- {json.dumps(analysisPlan, indent=2)}
-
- AVAILABLE DATASETS:
- """
-
- # Add dataset info
- for name, df in datasets.items():
- dataPrompt += f"\nDataset '{name}':\n"
- dataPrompt += f"- Shape: {df.shape}\n"
- dataPrompt += f"- Columns: {df.columns.tolist()}\n"
- dataPrompt += f"- Sample data: {df.head(3).to_dict(orient='records')}\n"
-
- dataPrompt += """
- Generate Python code that:
- 1. Processes the available dataset(s)
- 2. Performs necessary transformations, aggregations, or calculations
- 3. Outputs the result in the requested format
- 4. Returns the content as a string variable named 'result'
-
- Return ONLY executable Python code, no explanations or markdown.
- """
-
try:
- # Get data processing code from AI
- dataCode = await self.service.base.callAi([
- {"role": "system", "content": "You are a data processing expert. Provide only executable Python code."},
- {"role": "user", "content": dataPrompt}
- ], produceUserAnswer = True)
+ # Determine format from filename
+ formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "csv"
- # Clean code
- dataCode = dataCode.replace("```python", "").replace("```", "").strip()
-
- # Setup execution environment
- localVars = {"pd": pd, "np": __import__('numpy'), "io": io}
-
- # Add datasets to local variables
- for name, df in datasets.items():
- # Create a sanitized variable name
- varName = ''.join(c if c.isalnum() else '_' for c in name)
- localVars[varName] = df
-
- # Also add with standard names for simpler code
- if "df" not in localVars:
- localVars["df"] = df
- elif "df2" not in localVars:
- localVars["df2"] = df
-
- # Execute the code
- exec(dataCode, globals(), localVars)
-
- # Get the result
- result = localVars.get("result", "No output was generated.")
+ # Process data based on format
+ if formatType == "csv":
+ result = self._convertToCsv(datasets)
+ elif formatType == "json":
+ result = json.dumps(datasets, indent=2)
+ elif formatType == "xlsx":
+ result = self._convertToExcel(datasets)
+ else:
+ result = str(datasets)
# Determine content type
contentType = "text/csv" if formatType == "csv" else \
@@ -733,90 +672,72 @@ class AgentAnalyst(AgentBase):
"text/plain"
return self.formatAgentDocumentOutput(outputLabel, result, contentType)
-
except Exception as e:
logger.error(f"Error creating data document: {str(e)}", exc_info=True)
- return {
- "label": outputLabel,
- "content": f"Error generating {formatType} document: {str(e)}",
- "metadata": {
- "contentType": "text/plain"
- }
- }
+ errorContent = f"Error generating {formatType} document: {str(e)}"
+ return self.formatAgentDocumentOutput(outputLabel, errorContent, "text/plain")
async def _createTextDocument(self, datasets: Dict, context: str, prompt: str,
outputLabel: str, formatType: str,
- analysisPlan: Dict, description: str) -> Dict:
+ analysisPlan: Dict, description: str) -> ChatContent:
"""
- Create a text document (report, analysis, etc.) based on analysis.
+ Create a text document (markdown, HTML, text) from analysis results.
Args:
datasets: Dictionary of datasets
- context: Document context text
+ context: Document context
prompt: Original task prompt
outputLabel: Output filename
- formatType: Output format type
- analysisPlan: Analysis plan from AI
+ formatType: Output format
+ analysisPlan: Analysis plan
description: Output description
Returns:
- Text document
+ ChatContent object
"""
- # Create dataset summaries
- datasetSummaries = []
- for name, df in datasets.items():
- summary = f"Dataset: {name}\n"
- summary += f"- Shape: {df.shape[0]} rows, {df.shape[1]} columns\n"
- summary += f"- Columns: {', '.join(df.columns.tolist())}\n"
-
- # Basic statistics for numeric columns
- numericCols = df.select_dtypes(include=['number']).columns
- if len(numericCols) > 0:
- summary += "- Numeric Columns Stats:\n"
- for col in numericCols[:3]: # Limit to first 3
- stats = df[col].describe()
- summary += f" - {col}: min={stats['min']:.2f}, max={stats['max']:.2f}, mean={stats['mean']:.2f}\n"
-
- datasetSummaries.append(summary)
-
- # Determine content type based on format
- contentType = "text/markdown" if formatType in ["md", "markdown"] else \
- "text/html" if formatType == "html" else \
- "text/plain"
-
- # Generate analysis prompt
- analysisPrompt = f"""
- Create a detailed {formatType} document for:
-
- TASK: {prompt}
-
- OUTPUT REQUIREMENTS:
- - Format: {formatType}
- - Filename: {outputLabel}
- - Description: {description}
-
- ANALYSIS CONTEXT:
- {json.dumps(analysisPlan, indent=2)}
-
- DATASET SUMMARIES:
- {"".join(datasetSummaries)}
-
- DOCUMENT CONTEXT:
- {context[:2000]}... (truncated)
-
- Create a comprehensive, professional analysis document that addresses the task requirements.
- The document should:
- 1. Have a clear structure with headings and sections
- 2. Include relevant data findings and insights
- 3. Provide appropriate interpretations and recommendations
- 4. Format the content according to the required output format
-
- Your response should be the complete document content in the specified format.
- """
-
try:
+ # Generate dataset summaries
+ datasetSummaries = []
+ for name, df in datasets.items():
+ summary = f"\nDataset: {name}\n"
+ summary += f"Shape: {df.shape}\n"
+ summary += f"Columns: {', '.join(df.columns)}\n"
+ if not df.empty:
+ summary += f"Sample data:\n{df.head(3).to_string()}\n"
+ datasetSummaries.append(summary)
+
+ # Generate analysis prompt
+ analysisPrompt = f"""
+ Create a detailed {formatType} document for:
+
+ TASK: {prompt}
+
+ OUTPUT REQUIREMENTS:
+ - Format: {formatType}
+ - Filename: {outputLabel}
+ - Description: {description}
+
+ ANALYSIS CONTEXT:
+ {json.dumps(analysisPlan, indent=2)}
+
+ DATASET SUMMARIES:
+ {"".join(datasetSummaries)}
+
+ DOCUMENT CONTEXT:
+ {context[:2000]}... (truncated)
+
+ Create a comprehensive, professional analysis document that addresses the task requirements.
+ The document should:
+ 1. Have a clear structure with headings and sections
+ 2. Include relevant data findings and insights
+ 3. Provide appropriate interpretations and recommendations
+ 4. Format the content according to the required output format
+
+ Your response should be the complete document content in the specified format.
+ """
+
# Get document content from AI
documentContent = await self.service.base.callAi([
{"role": "system", "content": f"You are a data analysis expert creating a {formatType} document."},
@@ -829,6 +750,11 @@ class AgentAnalyst(AgentBase):
elif formatType == "html" and not "{documentContent}"
+ # Determine content type
+ contentType = "text/markdown" if formatType in ["md", "markdown"] else \
+ "text/html" if formatType == "html" else \
+ "text/plain"
+
return self.formatAgentDocumentOutput(outputLabel, documentContent, contentType)
except Exception as e:
@@ -842,13 +768,7 @@ class AgentAnalyst(AgentBase):
else:
content = f"Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"
- return {
- "label": outputLabel,
- "content": content,
- "metadata": {
- "contentType": contentType
- }
- }
+ return self.formatAgentDocumentOutput(outputLabel, content, contentType)
def _getImageBase64(self, formatType: str = 'png') -> str:
"""
diff --git a/modules/agents/agentDocumentation.py b/modules/agents/agentDocumentation.py
index 839c2856..81d3ae43 100644
--- a/modules/agents/agentDocumentation.py
+++ b/modules/agents/agentDocumentation.py
@@ -8,8 +8,10 @@ from typing import Dict, Any, List
import json
import re
from datetime import datetime
+import os
from modules.workflow.agentBase import AgentBase
+from modules.interfaces.lucydomModel import ChatContent
logger = logging.getLogger(__name__)
@@ -292,7 +294,7 @@ class AgentDocumentation(AgentBase):
}
async def _createDocumentMultiStep(self, prompt: str, context: str, outputLabel: str,
- outputDescription: str, documentationPlan: Dict) -> Dict:
+ outputDescription: str, documentationPlan: Dict) -> ChatContent:
"""
Create a document using a multi-step approach with separate AI calls for each section.
@@ -304,155 +306,123 @@ class AgentDocumentation(AgentBase):
documentationPlan: Documentation plan from AI
Returns:
- Document object
+ ChatContent object
"""
- # Determine format from filename
- formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "md"
-
- # Map format to contentType
- contentTypeMap = {
- "md": "text/markdown",
- "markdown": "text/markdown",
- "html": "text/html",
- "txt": "text/plain",
- "text": "text/plain",
- "json": "application/json",
- "csv": "text/csv"
- }
-
- contentType = contentTypeMap.get(formatType, "text/plain")
-
- # Get document information
- title = documentationPlan.get("title", "Documentation")
- documentType = documentationPlan.get("documentType", "document")
- audience = documentationPlan.get("audience", "general")
- tone = documentationPlan.get("tone", "formal")
- keyTopics = documentationPlan.get("keyTopics", [])
- formattingRequirements = documentationPlan.get("formattingRequirements", [])
-
- # Get the detailed structure
- detailedStructure = documentationPlan.get("detailedStructure", [])
- if not detailedStructure:
- # Fallback structure if none provided
- detailedStructure = [
- {
- "title": "Introduction (Default)",
- "keyPoints": ["Purpose", "Scope"],
- "importance": "high"
- },
- {
- "title": "Main Content (Default)",
- "keyPoints": ["Core Information"],
- "importance": "high"
- },
- {
- "title": "Conclusion (Default)",
- "keyPoints": ["Summary", "Next Steps"],
- "importance": "medium"
- }
- ]
-
try:
- # Step 1: Generate document introduction
+ # Determine format from filename
+ formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "md"
+
+ # Map format to contentType
+ contentTypeMap = {
+ "md": "text/markdown",
+ "markdown": "text/markdown",
+ "html": "text/html",
+ "txt": "text/plain",
+ "text": "text/plain",
+ "json": "application/json",
+ "csv": "text/csv"
+ }
+
+ contentType = contentTypeMap.get(formatType, "text/plain")
+
+ # Get document information
+ title = documentationPlan.get("title", "Documentation")
+ documentType = documentationPlan.get("documentType", "document")
+ audience = documentationPlan.get("audience", "general")
+ tone = documentationPlan.get("tone", "formal")
+ keyTopics = documentationPlan.get("keyTopics", [])
+ formattingRequirements = documentationPlan.get("formattingRequirements", [])
+
+ # Get the detailed structure
+ detailedStructure = documentationPlan.get("detailedStructure", [])
+
+ # Step 1: Generate executive summary
+ summaryPrompt = f"""
+ Create an executive summary for a {documentType} titled "{title}".
+
+ DOCUMENT OVERVIEW:
+ - Type: {documentType}
+ - Audience: {audience}
+ - Key Topics: {', '.join(keyTopics)}
+
+ TASK CONTEXT: {prompt}
+
+ The executive summary should:
+ 1. Provide a concise overview of the document's purpose
+ 2. Highlight key points and findings
+ 3. Be clear and engaging for the target audience
+ 4. Set expectations for the document's content
+
+ Keep the summary brief but comprehensive.
+ """
+
+ executiveSummary = await self.service.base.callAi([
+ {"role": "system", "content": f"You are a documentation expert creating an executive summary in {formatType} format."},
+ {"role": "user", "content": summaryPrompt}
+ ], produceUserAnswer = True)
+
+ # Step 2: Generate introduction
introPrompt = f"""
-Create the introduction for a {documentType} titled "{title}".
-
-DOCUMENT OVERVIEW:
-- Type: {documentType}
-- Audience: {audience}
-- Tone: {tone}
-- Key Topics: {', '.join(keyTopics)}
-- Format: {formatType}
-
-TASK CONTEXT: {prompt}
-
-This introduction should:
-1. Clearly state the purpose and scope of the document
-2. Provide context and background information
-3. Outline what the reader will find in the document
-4. Set the appropriate tone for the {audience} audience
-
-The introduction should be professional and engaging, but short and precise, formatted according to {formatType} standards. do not add details, which are not requested by the Task Context.
-"""
+ Create an introduction for a {documentType} titled "{title}".
+
+ DOCUMENT OVERVIEW:
+ - Type: {documentType}
+ - Audience: {audience}
+ - Key Topics: {', '.join(keyTopics)}
+
+ TASK CONTEXT: {prompt}
+
+ The introduction should:
+ 1. Set the context and purpose of the document
+ 2. Outline the scope and objectives
+ 3. Preview the main topics to be covered
+ 4. Engage the reader's interest
+
+ Format the introduction according to {formatType} standards.
+ """
introduction = await self.service.base.callAi([
{"role": "system", "content": f"You are a documentation expert creating an introduction in {formatType} format."},
{"role": "user", "content": introPrompt}
], produceUserAnswer = True)
- # Step 2: Generate executive summary (if applicable)
- if documentType in ["report", "whitepaper", "case study"]:
- summaryPrompt = f"""
- Create an executive summary for a {documentType} titled "{title}".
+ # Step 3: Generate main sections
+ sections = []
+ for section in detailedStructure:
+ sectionTitle = section.get("title", "Section")
+ keyPoints = section.get("keyPoints", [])
+ subsections = section.get("subsections", [])
+ importance = section.get("importance", "medium")
+ estimatedLength = section.get("estimatedLength", "medium")
- DOCUMENT OVERVIEW:
+ sectionPrompt = f"""
+ Create the {sectionTitle} section for a {documentType} titled "{title}".
+
+ SECTION DETAILS:
+ - Title: {sectionTitle}
+ - Key Points: {', '.join(keyPoints)}
+ - Subsections: {', '.join(subsections)}
+ - Importance: {importance}
+ - Estimated Length: {estimatedLength}
+
+ DOCUMENT CONTEXT:
- Type: {documentType}
- Audience: {audience}
- Key Topics: {', '.join(keyTopics)}
TASK CONTEXT: {prompt}
- This executive summary should:
- 1. Provide a concise overview of the entire document
- 2. Highlight key findings, recommendations, or conclusions
- 3. Be suitable for executives or busy readers who may only read this section
- 4. Be professionally formatted according to {formatType} standards
+ The section should:
+ 1. Cover all key points thoroughly
+ 2. Include relevant subsections
+ 3. Maintain appropriate depth based on importance
+ 4. Follow the document's tone and style
- Keep the summary focused and impactful, approximately 200-300 words.
+ Format the section according to {formatType} standards.
"""
- executiveSummary = await self.service.base.callAi([
- {"role": "system", "content": f"You are a documentation expert creating an executive summary in {formatType} format."},
- {"role": "user", "content": summaryPrompt}
- ], produceUserAnswer = True)
- else:
- executiveSummary = ""
-
- # Step 3: Generate each section
- sections = []
-
- for section in detailedStructure:
- sectionTitle = section.get("title", "Section")
- keyPoints = section.get("keyPoints", [])
- subsections = section.get("subsections", [])
- importance = section.get("importance", "medium")
-
- # Adjust depth based on importance
- detailLevel = "high" if importance == "high" else "medium"
-
- sectionPrompt = f"""
-Create the "{sectionTitle}" section for a {documentType} titled "{title}".
-
-SECTION DETAILS:
-- Title: {sectionTitle}
-- Key Points to Cover: {', '.join(keyPoints)}
-- Subsections: {', '.join(subsections)}
-- Detail Level: {detailLevel}
-
-DOCUMENT CONTEXT:
-- Type: {documentType}
-- Audience: {audience}
-- Tone: {tone}
-- Format: {formatType}
-
-TASK CONTEXT: {prompt}
-
-AVAILABLE INFORMATION:
-{context[:500]}... (truncated)
-
-This section should:
-1. Be comprehensive and well-structured
-2. Cover all the key points listed
-3. Include the specified subsections with appropriate headings
-4. Maintain a {tone} tone suitable for the {audience} audience
-5. Be properly formatted according to {formatType} standards
-6. Include specific examples, data, or evidence where appropriate
-
-Be thorough in your coverage of this section, providing substantive content focussing on the Task content.
-"""
-
sectionContent = await self.service.base.callAi([
- {"role": "system", "content": f"You are a documentation expert creating detailed content for the {sectionTitle} section."},
+ {"role": "system", "content": f"You are a documentation expert creating a section in {formatType} format."},
{"role": "user", "content": sectionPrompt}
], produceUserAnswer = True)
@@ -548,13 +518,7 @@ Be thorough in your coverage of this section, providing substantive content focu
else:
content = f"Error in Documentation\n\nThere was an error generating the documentation: {str(e)}"
- return {
- "label": outputLabel,
- "content": content,
- "metadata": {
- "contentType": contentType
- }
- }
+ return self.formatAgentDocumentOutput(outputLabel, content, contentType)
# Factory function for the Documentation agent
diff --git a/modules/interfaces/gatewayInterface.py b/modules/interfaces/gatewayInterface.py
index ea917b98..1d4268eb 100644
--- a/modules/interfaces/gatewayInterface.py
+++ b/modules/interfaces/gatewayInterface.py
@@ -14,7 +14,7 @@ from passlib.context import CryptContext
from modules.connectors.connectorDbJson import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.gatewayAccess import GatewayAccess
-from modules.interfaces.gatewayModel import User, Mandate, UserInDB
+from modules.interfaces.gatewayModel import User, Mandate, UserInDB, UserConnection
logger = logging.getLogger(__name__)
@@ -22,6 +22,9 @@ logger = logging.getLogger(__name__)
# Singleton factory for GatewayInterface instances per context
_gatewayInterfaces = {}
+# Root interface instance
+_rootGatewayInterface = None
+
# Password-Hashing
pwdContext = CryptContext(schemes=["argon2"], deprecated="auto")
@@ -32,18 +35,22 @@ class GatewayInterface:
Manages users and mandates.
"""
- def __init__(self):
+ def __init__(self, currentUser: Dict[str, Any] = None):
"""Initializes the Gateway Interface."""
+ # Initialize variables
+ self.currentUser = currentUser
+ self.userId = currentUser.get("id") if currentUser else None
+ self.access = None # Will be set when user context is provided
+
# Initialize database
self._initializeDatabase()
# Initialize standard records if needed
self._initRecords()
- # Initialize variables
- self.currentUser = None
- self.userId = None
- self.access = None # Will be set when user context is provided
+ # Set user context if provided
+ if currentUser:
+ self.setUserContext(currentUser)
def setUserContext(self, currentUser: Dict[str, Any]):
"""Sets the user context for the interface."""
@@ -338,10 +345,117 @@ class GatewayInterface:
return User(**user)
+ def addUserConnection(self, userId: str, authority: str, externalId: str, externalUsername: str, externalEmail: Optional[str] = None) -> UserConnection:
+ """Add a new connection to an external service for a user"""
+ try:
+ # Get user
+ user = self.getUser(userId)
+ if not user:
+ raise ValueError(f"User {userId} not found")
+
+ # Check if connection already exists
+ for conn in user.connections:
+ if conn.authority == authority and conn.externalId == externalId:
+ raise ValueError(f"Connection to {authority} already exists for user {userId}")
+
+ # Create new connection
+ connection = UserConnection(
+ authority=authority,
+ externalId=externalId,
+ externalUsername=externalUsername,
+ externalEmail=externalEmail
+ )
+
+ # Add connection to user
+ user.connections.append(connection)
+
+ # Update user record
+ self.db.recordModify("users", userId, {"connections": [c.model_dump() for c in user.connections]})
+
+ return connection
+
+ except Exception as e:
+ logger.error(f"Error adding user connection: {str(e)}")
+ raise ValueError(f"Failed to add user connection: {str(e)}")
+
+ def removeUserConnection(self, userId: str, connectionId: str) -> None:
+ """Remove a connection to an external service for a user"""
+ try:
+ # Get user
+ user = self.getUser(userId)
+ if not user:
+ raise ValueError(f"User {userId} not found")
+
+ # Find and remove connection
+ user.connections = [c for c in user.connections if c.id != connectionId]
+
+ # Update user record
+ self.db.recordModify("users", userId, {"connections": [c.model_dump() for c in user.connections]})
+
+ except Exception as e:
+ logger.error(f"Error removing user connection: {str(e)}")
+ raise ValueError(f"Failed to remove user connection: {str(e)}")
+
+ def authenticateUser(self, username: str, password: str = None, authority: str = "local", external_token: str = None) -> Optional[User]:
+ """Authenticates a user by username and password or external authority."""
+ # Clear the users table from cache and reload it
+ if "users" in self.db._tablesCache:
+ del self.db._tablesCache["users"]
+
+ # Get user by username
+ user = self.getUserByUsername(username)
+
+ if not user:
+ raise ValueError("User not found")
+
+ # Check if the user is disabled
+ if user.disabled:
+ raise ValueError("User is disabled")
+
+ # Handle authentication based on authority
+ if authority == "local":
+ if not password:
+ raise ValueError("Password is required for local authentication")
+ # Get the full user record with password hash for verification
+ userWithPassword = UserInDB(**self.db.getRecordset("users", recordFilter={"id": user.id})[0])
+ if not self._verifyPassword(password, userWithPassword.hashedPassword):
+ raise ValueError("Invalid password")
+ elif authority in ["microsoft", "google"]: # Support for multiple external auth providers
+ # Verify that the user has the correct authentication authority
+ if user.authenticationAuthority != authority:
+ raise ValueError(f"User does not have {authority} authentication enabled")
+
+ # Verify that the user has a valid connection for this authority
+ if not any(conn.authority == authority for conn in user.connections):
+ raise ValueError(f"User does not have a valid {authority} connection")
+
+ # Verify the external token
+ if not external_token:
+ raise ValueError(f"External token is required for {authority} authentication")
+
+ # Get the appropriate auth service
+ if authority == "microsoft":
+ from .msftInterface import getInterface as getMsftInterface
+ auth_service = getMsftInterface({"_mandateId": user._mandateId, "id": user.id})
+ elif authority == "google":
+ from .googleInterface import getInterface as getGoogleInterface
+ auth_service = getGoogleInterface({"_mandateId": user._mandateId, "id": user.id})
+ else:
+ raise ValueError(f"Unsupported authentication authority: {authority}")
+
+ # Verify the token
+ if not auth_service.verifyToken(external_token):
+ raise ValueError(f"Invalid or expired {authority} token")
+ else:
+ raise ValueError(f"Unknown authentication authority: {authority}")
+
+ return user
+
def createUser(self, username: str, password: str = None, email: str = None, fullName: str = None,
language: str = "en", disabled: bool = False,
- privilege: str = "user", authenticationAuthority: str = "local") -> User:
- """Create a new user"""
+ privilege: str = "user", authenticationAuthority: str = "local",
+ externalId: str = None, externalUsername: str = None, externalEmail: str = None) -> User:
+ """Create a new user with optional external connection"""
try:
# Validate username
if not username:
@@ -369,7 +483,8 @@ class GatewayInterface:
disabled=disabled,
privilege=privilege,
authenticationAuthority=authenticationAuthority,
- hashedPassword=self._getPasswordHash(password) if authenticationAuthority == "local" else None
+ hashedPassword=self._getPasswordHash(password) if authenticationAuthority == "local" else None,
+ connections=[]
)
# Create user record
@@ -377,6 +492,16 @@ class GatewayInterface:
if not createdRecord or not createdRecord.get("id"):
raise ValueError("Failed to create user record")
+ # Add external connection if provided
+ if externalId and externalUsername:
+ self.addUserConnection(
+ createdRecord["id"],
+ authenticationAuthority,
+ externalId,
+ externalUsername,
+ externalEmail
+ )
+
# Get created user using the returned ID
createdUser = self.db.getRecordset("users", recordFilter={"id": createdRecord["id"]})
if not createdUser or len(createdUser) == 0:
@@ -398,41 +523,6 @@ class GatewayInterface:
logger.error(f"Unexpected error creating user: {str(e)}")
raise ValueError(f"Failed to create user: {str(e)}")
- def authenticateUser(self, username: str, password: str = None) -> Optional[User]:
- """Authenticates a user by username and password."""
- # Clear the users table from cache and reload it
- if "users" in self.db._tablesCache:
- del self.db._tablesCache["users"]
-
- # Get user by username
- user = self.getUserByUsername(username)
-
- if not user:
- raise ValueError("Benutzer nicht gefunden")
-
- # Check if the user is disabled
- if user.disabled:
- raise ValueError("Benutzer ist deaktiviert")
-
- # Handle authentication based on authority
- auth_authority = user.authenticationAuthority
-
- if auth_authority == "local":
- if not password:
- raise ValueError("Passwort ist erforderlich")
- # Get the full user record with password hash for verification
- userWithPassword = UserInDB(**self.db.getRecordset("users", recordFilter={"id": user.id})[0])
- if not self._verifyPassword(password, userWithPassword.hashedPassword):
- raise ValueError("Falsches Passwort")
- elif auth_authority == "microsoft":
- # For Microsoft users, we don't verify the password here
- # The authentication is handled by the Microsoft OAuth flow
- pass
- else:
- raise ValueError(f"Unbekannte Authentifizierungsmethode: {auth_authority}")
-
- return user
-
def updateUser(self, userId: str, userData: Dict[str, Any]) -> User:
"""Updates a user if current user has permission."""
# Check if the user exists and current user has access
@@ -525,21 +615,83 @@ class GatewayInterface:
return success
-def getInterface(currentUser: Dict[str, Any] = None) -> 'GatewayInterface':
+ def setupLocalAuth(self, userId: str, password: str) -> User:
+ """Set up local authentication for a user who registered with Microsoft"""
+ try:
+ # Get user
+ user = self.getUser(userId)
+ if not user:
+ raise ValueError(f"User {userId} not found")
+
+ # Validate password
+ if not password:
+ raise ValueError("Password is required")
+ if len(password) < 8:
+ raise ValueError("Password must be at least 8 characters long")
+
+ # Update user with local password
+ userData = {
+ "hashedPassword": self._getPasswordHash(password),
+ "authenticationAuthority": "local" # Change to local auth
+ }
+
+ return self.updateUser(userId, userData)
+
+ except Exception as e:
+ logger.error(f"Error setting up local authentication: {str(e)}")
+ raise ValueError(f"Failed to set up local authentication: {str(e)}")
+
+
+def getInterface(currentUser: Dict[str, Any]) -> GatewayInterface:
"""
- Returns a GatewayInterface instance.
- If currentUser is provided, initializes with user context.
- Otherwise, returns an instance with only database access.
+ Returns a GatewayInterface instance for the current user.
+ Handles initialization of database and records.
"""
+ mandateId = currentUser.get("mandateId")
+ userId = currentUser.get("id")
+ if not mandateId or not userId:
+ raise ValueError("Invalid user context: mandateId and id are required")
+
+ # Create context key
+ contextKey = f"{mandateId}_{userId}"
+
# Create new instance if not exists
- if "default" not in _gatewayInterfaces:
- _gatewayInterfaces["default"] = GatewayInterface()
+ if contextKey not in _gatewayInterfaces:
+ _gatewayInterfaces[contextKey] = GatewayInterface(currentUser)
- interface = _gatewayInterfaces["default"]
+ return _gatewayInterfaces[contextKey]
+
+def getRootUser() -> Dict[str, Any]:
+ """
+ Returns the root user from the database.
+ This is the user with the initial ID in the users table.
+ """
+ try:
+ readInterface = getInterface()
+ # Get the initial user ID
+ initialUserId = readInterface.db.getInitialId("users")
+ if not initialUserId:
+ raise ValueError("No initial user ID found in database")
+
+ # Get the user record
+ users = readInterface.db.getRecordset("users", recordFilter={"id": initialUserId})
+ if not users:
+ raise ValueError(f"Root user with ID {initialUserId} not found in database")
+
+ return users[0]
+ except Exception as e:
+ logger.error(f"Error getting root user: {str(e)}")
+ raise ValueError(f"Failed to get root user: {str(e)}")
+
+def getRootInterface() -> GatewayInterface:
+ """
+ Returns a GatewayInterface instance with root privileges.
+ This is used for initial setup and user creation.
+ """
+ global _rootGatewayInterface
- if currentUser:
- interface.setUserContext(currentUser)
- else:
- logger.info("Returning interface without user context")
+ if _rootGatewayInterface is None:
+ rootUser = getRootUser()
+ _rootGatewayInterface = GatewayInterface(rootUser)
- return interface
+ return _rootGatewayInterface
diff --git a/modules/interfaces/gatewayModel.py b/modules/interfaces/gatewayModel.py
index f9059d3b..9c9ea178 100644
--- a/modules/interfaces/gatewayModel.py
+++ b/modules/interfaces/gatewayModel.py
@@ -56,6 +56,30 @@ class Mandate(BaseModel):
"language": Label(default="Language", translations={"en": "Language", "fr": "Langue"})
}
+class UserConnection(BaseModel):
+ """Data model for a user's connection to an external service"""
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the connection")
+ authority: str = Field(description="Authentication authority (microsoft, google, etc.)")
+ externalId: str = Field(description="User ID in the external system")
+ externalUsername: str = Field(description="Username in the external system")
+ externalEmail: Optional[str] = Field(None, description="Email in the external system")
+ connectedAt: datetime = Field(default_factory=datetime.now, description="When the connection was established")
+
+ label: Label = Field(
+ default=Label(default="User Connection", translations={"en": "User Connection", "fr": "Connexion utilisateur"}),
+ description="Label for the class"
+ )
+
+ # Labels for attributes
+ fieldLabels: Dict[str, Label] = {
+ "id": Label(default="ID", translations={}),
+ "authority": Label(default="Authority", translations={"en": "Authority", "fr": "Autorité"}),
+ "externalId": Label(default="External ID", translations={"en": "External ID", "fr": "ID externe"}),
+ "externalUsername": Label(default="External Username", translations={"en": "External Username", "fr": "Nom d'utilisateur externe"}),
+ "externalEmail": Label(default="External Email", translations={"en": "External Email", "fr": "Email externe"}),
+ "connectedAt": Label(default="Connected At", translations={"en": "Connected At", "fr": "Connecté le"})
+ }
+
class User(BaseModel):
"""Data model for a user"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the user")
@@ -65,8 +89,9 @@ class User(BaseModel):
language: str = Field(description="Preferred language of the user")
disabled: Optional[bool] = Field(False, description="Indicates whether the user is disabled")
privilege: str = Field(description="Permission level") #sysadmin,admin,user
- authenticationAuthority: str = Field(default="local", description="Authentication authority (local, microsoft)")
+ authenticationAuthority: str = Field(default="local", description="Primary authentication authority (local, microsoft)")
mandateId: str = Field(description="ID of the mandate this user belongs to")
+ connections: List[UserConnection] = Field(default_factory=list, description="List of external service connections")
label: Label = Field(
default=Label(default="User", translations={"en": "User", "fr": "Utilisateur"}),
@@ -83,7 +108,8 @@ class User(BaseModel):
"language": Label(default="Language", translations={"en": "Language", "fr": "Langue"}),
"disabled": Label(default="Disabled", translations={"en": "Disabled", "fr": "Désactivé"}),
"privilege": Label(default="Permission level", translations={"en": "Access level", "fr": "Niveau d'accès"}),
- "authenticationAuthority": Label(default="Authentication Authority", translations={"en": "Authentication Authority", "fr": "Autorité d'authentification"})
+ "authenticationAuthority": Label(default="Authentication Authority", translations={"en": "Authentication Authority", "fr": "Autorité d'authentification"}),
+ "connections": Label(default="External Connections", translations={"en": "External Connections", "fr": "Connexions externes"})
}
diff --git a/modules/interfaces/googleAccess.py b/modules/interfaces/googleAccess.py
new file mode 100644
index 00000000..5ad21087
--- /dev/null
+++ b/modules/interfaces/googleAccess.py
@@ -0,0 +1,113 @@
+"""
+Access control module for Google interface.
+Handles user access management and permission checks for Google tokens.
+"""
+
+from typing import Dict, Any, List, Optional
+
+class GoogleAccess:
+ """
+ Access control class for Google interface.
+ Handles user access management and permission checks for Google tokens.
+ """
+
+ def __init__(self, currentUser: Dict[str, Any], db):
+ """Initialize with user context."""
+ self.currentUser = currentUser
+ self._mandateId = currentUser.get("_mandateId")
+ self._userId = currentUser.get("id")
+
+ if not self._mandateId or not self._userId:
+ raise ValueError("Invalid user context: _mandateId and id are required")
+
+ self.db = db
+
+ def uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """
+ Unified user access management function that filters data based on user privileges
+ and adds access control attributes.
+
+ Args:
+ table: Name of the table
+ recordset: Recordset to filter based on access rules
+
+ Returns:
+ Filtered recordset with access control attributes
+ """
+ userPrivilege = self.currentUser.get("privilege", "user")
+ filtered_records = []
+
+ # Apply filtering based on privilege
+ if userPrivilege == "sysadmin":
+ filtered_records = recordset # System admins see all records
+ elif userPrivilege == "admin":
+ # Admins see records in their mandate
+ filtered_records = [r for r in recordset if r.get("_mandateId") == self._mandateId]
+ else: # Regular users
+ # Users only see their own Google tokens
+ filtered_records = [r for r in recordset
+ if r.get("_mandateId") == self._mandateId and r.get("_userId") == self._userId]
+
+ # Add access control attributes to each record
+ for record in filtered_records:
+ record_id = record.get("id")
+
+ # Set access control flags based on user permissions
+ if table == "googleTokens":
+ record["_hideView"] = False # Everyone can view their own tokens
+ record["_hideEdit"] = not self.canModify("googleTokens", record_id)
+ record["_hideDelete"] = not self.canModify("googleTokens", record_id)
+ else:
+ # Default access control for other tables
+ record["_hideView"] = False
+ record["_hideEdit"] = not self.canModify(table, record_id)
+ record["_hideDelete"] = not self.canModify(table, record_id)
+
+ return filtered_records
+
+ def canModify(self, table: str, recordId: Optional[str] = None) -> bool:
+ """
+ Checks if the current user can modify (create/update/delete) records in a table.
+
+ Args:
+ table: Name of the table
+ recordId: Optional record ID for specific record check
+
+ Returns:
+ Boolean indicating permission
+ """
+ userPrivilege = self.currentUser.get("privilege", "user")
+
+ # System admins can modify anything
+ if userPrivilege == "sysadmin":
+ return True
+
+ # Check specific record permissions
+ if recordId is not None:
+ # Get the record to check ownership
+ records = self.db.getRecordset(table, recordFilter={"id": recordId})
+ if not records:
+ return False
+
+ record = records[0]
+
+ # Admins can modify anything in their mandate
+ if userPrivilege == "admin" and record.get("_mandateId") == self._mandateId:
+ return True
+
+ # Users can only modify their own Google tokens
+ if (record.get("_mandateId") == self._mandateId and
+ record.get("_userId") == self._userId):
+ return True
+
+ return False
+ else:
+ # For general table modify permission (e.g., create)
+ # Admins can create anything in their mandate
+ if userPrivilege == "admin":
+ return True
+
+ # Regular users can create their own Google tokens
+ if table == "googleTokens":
+ return True
+ return False
\ No newline at end of file
diff --git a/modules/interfaces/googleInterface.py b/modules/interfaces/googleInterface.py
new file mode 100644
index 00000000..7f60a53c
--- /dev/null
+++ b/modules/interfaces/googleInterface.py
@@ -0,0 +1,287 @@
+"""
+Google interface for handling Google authentication and API operations.
+"""
+
+import logging
+import requests
+from typing import Dict, Any, Optional, Tuple
+from datetime import datetime
+import secrets
+from google.oauth2.credentials import Credentials
+from google_auth_oauthlib.flow import Flow
+from google.auth.transport.requests import Request
+import os
+
+from modules.shared.configuration import APP_CONFIG
+from modules.interfaces.googleModel import GoogleToken, GoogleUserInfo, GoogleConfig
+from modules.connectors.connectorDbJson import DatabaseConnector
+from modules.interfaces.googleAccess import GoogleAccess
+from modules.interfaces.gatewayInterface import getRootUser
+
+logger = logging.getLogger(__name__)
+
+# Singleton factory for GoogleInterface instances per context
+_googleInterfaces = {}
+
+# Root interface instance
+_rootGoogleInterface = None
+
+class GoogleInterface:
+ """Interface for Google authentication and API operations"""
+
+ def __init__(self, currentUser: Dict[str, Any] = None):
+ """Initialize the Google interface"""
+ # Initialize variables
+ self.currentUser = currentUser
+ self.mandateId = currentUser.get("mandateId") if currentUser else None
+ self.userId = currentUser.get("id") if currentUser else None
+ self.access = None # Will be set when user context is provided
+
+ # Initialize configuration
+ self.clientId = APP_CONFIG.get("Service_GOOGLE_CLIENT_ID")
+ self.clientSecret = APP_CONFIG.get("Service_GOOGLE_CLIENT_SECRET")
+ self.redirectUri = APP_CONFIG.get("Service_GOOGLE_REDIRECT_URI")
+ self.authorityUrl = "https://accounts.google.com"
+ self.tokenUrl = "https://oauth2.googleapis.com/token"
+ self.userInfoUrl = "https://www.googleapis.com/oauth2/v3/userinfo"
+ self.scopes = ["openid", "profile", "email"]
+
+ # Initialize database
+ self._initializeDatabase()
+
+ # Initialize OAuth2 flow
+ self.flow = Flow.from_client_config(
+ {
+ "web": {
+ "client_id": self.clientId,
+ "client_secret": self.clientSecret,
+ "auth_uri": f"{self.authorityUrl}/o/oauth2/auth",
+ "token_uri": self.tokenUrl,
+ "redirect_uris": [self.redirectUri]
+ }
+ },
+ scopes=self.scopes
+ )
+
+ # Set user context if provided
+ if currentUser:
+ self.setUserContext(currentUser)
+
+ def _initializeDatabase(self):
+ """Initializes the database connection."""
+ try:
+ # Get configuration values with defaults
+ dbHost = APP_CONFIG.get("DB_GOOGLE_HOST", "data")
+ dbDatabase = APP_CONFIG.get("DB_GOOGLE_DATABASE", "google")
+ dbUser = APP_CONFIG.get("DB_GOOGLE_USER")
+ dbPassword = APP_CONFIG.get("DB_GOOGLE_PASSWORD_SECRET")
+
+ # Ensure the database directory exists
+ os.makedirs(dbHost, exist_ok=True)
+
+ self.db = DatabaseConnector(
+ dbHost=dbHost,
+ dbDatabase=dbDatabase,
+ dbUser=dbUser,
+ dbPassword=dbPassword,
+ mandateId=self.mandateId,
+ userId=self.userId
+ )
+
+ # Set context
+ self.db.updateContext(self.mandateId, self.userId)
+
+ logger.info("Database initialized successfully")
+ except Exception as e:
+ logger.error(f"Failed to initialize database: {str(e)}")
+ raise
+
+ def initiateLogin(self) -> str:
+ """Initiate Google login flow"""
+ try:
+ # Generate auth URL
+ auth_url, _ = self.flow.authorization_url(
+ access_type="offline",
+ include_granted_scopes="true",
+ state=self._generateState()
+ )
+ return auth_url
+ except Exception as e:
+ logger.error(f"Error initiating Google login: {str(e)}")
+ return None
+
+ def handleAuthCallback(self, code: str) -> Optional[GoogleToken]:
+ """Handle Google OAuth callback"""
+ try:
+ # Exchange code for token
+ self.flow.fetch_token(code=code)
+ credentials = self.flow.credentials
+
+ # Get user info
+ user_info = self.getUserInfoFromToken(credentials.token)
+ if not user_info:
+ return None
+
+ # Create token model
+ token = GoogleToken(
+ access_token=credentials.token,
+ refresh_token=credentials.refresh_token,
+ expires_in=credentials.expiry.timestamp() - datetime.now().timestamp(),
+ token_type=credentials.token_type,
+ expires_at=credentials.expiry.timestamp(),
+ user_info=user_info.model_dump(),
+ mandateId=self.mandateId,
+ userId=self.userId
+ )
+
+ return token
+
+ except Exception as e:
+ logger.error(f"Error handling auth callback: {str(e)}")
+ return None
+
+ def verifyToken(self, token: str) -> bool:
+ """Verify Google token"""
+ try:
+ # Get user info from token
+ user_info = self.getUserInfoFromToken(token)
+ if not user_info:
+ return False
+
+ # Get current user's Google connection
+ user = self.db.getRecordset("users", recordFilter={"id": self.userId})[0]
+ google_connection = next((conn for conn in user.get("connections", [])
+ if conn.get("authority") == "google"), None)
+
+ if not google_connection:
+ return False
+
+ # Verify the token belongs to this user
+ return user_info.id == google_connection.get("externalId")
+
+ except Exception as e:
+ logger.error(f"Error verifying Google token: {str(e)}")
+ return False
+
+ def getUserInfoFromToken(self, token: str) -> Optional[GoogleUserInfo]:
+ """Get user info from Google API"""
+ try:
+ # Call Google API
+ response = requests.get(
+ self.userInfoUrl,
+ headers={"Authorization": f"Bearer {token}"}
+ )
+
+ if response.status_code != 200:
+ logger.error(f"Failed to get user info: {response.text}")
+ return None
+
+ data = response.json()
+
+ # Create user info model
+ return GoogleUserInfo(
+ id=data["sub"], # Google uses 'sub' as the unique identifier
+ email=data["email"],
+ name=data.get("name", ""),
+ picture=data.get("picture") # Google provides profile picture URL
+ )
+
+ except Exception as e:
+ logger.error(f"Error getting user info: {str(e)}")
+ return None
+
+ def refreshToken(self, refresh_token: str) -> Optional[GoogleToken]:
+ """Refresh Google token"""
+ try:
+ # Create credentials object
+ credentials = Credentials(
+ None, # No access token
+ refresh_token=refresh_token,
+ token_uri=self.tokenUrl,
+ client_id=self.clientId,
+ client_secret=self.clientSecret
+ )
+
+ # Refresh token
+ credentials.refresh(Request())
+
+ # Get user info
+ user_info = self.getUserInfoFromToken(credentials.token)
+ if not user_info:
+ return None
+
+ # Create token model
+ token = GoogleToken(
+ access_token=credentials.token,
+ refresh_token=credentials.refresh_token or refresh_token,
+ expires_in=credentials.expiry.timestamp() - datetime.now().timestamp(),
+ token_type=credentials.token_type,
+ expires_at=credentials.expiry.timestamp(),
+ user_info=user_info.model_dump(),
+ mandateId=self.mandateId,
+ userId=self.userId
+ )
+
+ return token
+
+ except Exception as e:
+ logger.error(f"Error refreshing token: {str(e)}")
+ return None
+
+ def _generateState(self) -> str:
+ """Generate secure state token"""
+ return secrets.token_urlsafe(32)
+
+ def setUserContext(self, currentUser: Dict[str, Any]):
+ """Set user context for the interface"""
+ if not currentUser:
+ logger.info("Initializing interface without user context")
+ return
+
+ self.currentUser = currentUser
+ self.mandateId = currentUser.get("mandateId")
+ self.userId = currentUser.get("id")
+
+ if not self.mandateId or not self.userId:
+ raise ValueError("Invalid user context: mandateId and id are required")
+
+ # Initialize access control with user context
+ self.access = GoogleAccess(self.currentUser, self.db)
+
+ # Update database context
+ self.db.updateContext(self.mandateId, self.userId)
+
+ logger.debug(f"User context set: userId={self.userId}")
+
+def getRootInterface() -> GoogleInterface:
+ """
+ Returns a GoogleInterface instance with root privileges.
+ This is used for initial setup and user creation.
+ """
+ global _rootGoogleInterface
+
+ if _rootGoogleInterface is None:
+ # Get root user from gateway
+ rootUser = getRootUser()
+ _rootGoogleInterface = GoogleInterface(rootUser)
+
+ return _rootGoogleInterface
+
+def getInterface(currentUser: Dict[str, Any] = None) -> GoogleInterface:
+ """
+ Returns a GoogleInterface instance.
+ If currentUser is provided, initializes with user context.
+ Otherwise, returns an instance with only database access.
+ """
+ # Create new instance if not exists
+ if "default" not in _googleInterfaces:
+ _googleInterfaces["default"] = GoogleInterface(currentUser or {})
+
+ interface = _googleInterfaces["default"]
+
+ if currentUser:
+ interface.setUserContext(currentUser)
+ else:
+ logger.info("Returning interface without user context")
+
+ return interface
\ No newline at end of file
diff --git a/modules/interfaces/googleModel.py b/modules/interfaces/googleModel.py
new file mode 100644
index 00000000..a85ca644
--- /dev/null
+++ b/modules/interfaces/googleModel.py
@@ -0,0 +1,35 @@
+"""
+Models for Google authentication and API operations.
+"""
+
+from pydantic import BaseModel, Field
+from typing import Optional, Dict, Any
+from datetime import datetime
+
+class GoogleToken(BaseModel):
+ """Model for Google OAuth tokens"""
+ access_token: str
+ refresh_token: Optional[str] = None
+ expires_in: int
+ token_type: str = "bearer"
+ expires_at: float
+ user_info: Dict[str, Any]
+ mandateId: str
+ userId: str
+
+class GoogleUserInfo(BaseModel):
+ """Model for Google user information"""
+ id: str # Google uses 'sub' as the unique identifier
+ email: str
+ name: str
+ picture: Optional[str] = None # Google provides profile picture URL
+
+class GoogleConfig(BaseModel):
+ """Configuration for Google authentication service"""
+ client_id: str
+ client_secret: str
+ redirect_uri: str
+ scopes: list[str]
+ authority_url: str = "https://accounts.google.com"
+ token_url: str = "https://oauth2.googleapis.com/token"
+ user_info_url: str = "https://www.googleapis.com/oauth2/v3/userinfo"
\ No newline at end of file
diff --git a/modules/interfaces/msftInterface.py b/modules/interfaces/msftInterface.py
index c864d483..d6720aae 100644
--- a/modules/interfaces/msftInterface.py
+++ b/modules/interfaces/msftInterface.py
@@ -13,47 +13,51 @@ import secrets
import os
from modules.shared.configuration import APP_CONFIG
-from modules.interfaces.msftModel import MsftToken, MsftUserInfo
+from .msftModel import MsftToken, MsftUserInfo, MsftConfig
from modules.connectors.connectorDbJson import DatabaseConnector
-from modules.interfaces.msftAccess import MsftAccess
+from .msftAccess import MsftAccess
+from modules.interfaces.gatewayInterface import getRootUser
logger = logging.getLogger(__name__)
# Singleton factory for MsftInterface instances per context
_msftInterfaces = {}
+# Root interface instance
+_rootMsftInterface = None
+
class MsftInterface:
"""Interface for Microsoft authentication and Graph API operations"""
- def __init__(self, currentUser: Dict[str, Any]):
+ def __init__(self, currentUser: Dict[str, Any] = None):
"""Initialize the Microsoft interface"""
+ # Initialize variables
self.currentUser = currentUser
- self._mandateId = currentUser.get("_mandateId")
- self._userId = currentUser.get("id")
-
- if not self._mandateId or not self._userId:
- raise ValueError("Invalid user context: _mandateId and id are required")
+ self.mandateId = currentUser.get("mandateId") if currentUser else None
+ self.userId = currentUser.get("id") if currentUser else None
+ self.access = None # Will be set when user context is provided
# Initialize configuration
- self.client_id = APP_CONFIG.get("Service_MSFT_CLIENT_ID")
- self.client_secret = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET")
- self.tenant_id = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")
- self.redirect_uri = APP_CONFIG.get("Service_MSFT_REDIRECT_URI")
- self.authority = f"https://login.microsoftonline.com/{self.tenant_id}"
+ self.clientId = APP_CONFIG.get("Service_MSFT_CLIENT_ID")
+ self.clientSecret = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET")
+ self.tenantId = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")
+ self.redirectUri = APP_CONFIG.get("Service_MSFT_REDIRECT_URI")
+ self.authority = f"https://login.microsoftonline.com/{self.tenantId}"
self.scopes = ["Mail.ReadWrite", "User.Read"]
# Initialize database
self._initializeDatabase()
- # Initialize access control
- self.access = MsftAccess(self.currentUser, self.db)
-
# Initialize MSAL application
self.msal_app = msal.ConfidentialClientApplication(
- self.client_id,
+ self.clientId,
authority=self.authority,
- client_credential=self.client_secret
+ client_credential=self.clientSecret
)
+
+ # Set user context if provided
+ if currentUser:
+ self.setUserContext(currentUser)
def _initializeDatabase(self):
"""Initializes the database connection."""
@@ -72,12 +76,12 @@ class MsftInterface:
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
- _mandateId=self._mandateId,
- _userId=self._userId
+ mandateId=self.mandateId,
+ userId=self.userId
)
# Set context
- self.db.updateContext(self._mandateId, self._userId)
+ self.db.updateContext(self.mandateId, self.userId)
logger.info("Database initialized successfully")
except Exception as e:
@@ -111,156 +115,147 @@ class MsftInterface:
"""
return self.access.canModify(table, recordId)
- def getMsftToken(self) -> Optional[MsftToken]:
- """Get Microsoft token for current user"""
+ def initiateLogin(self) -> str:
+ """Initiate Microsoft login flow"""
try:
- tokens = self.db.getRecordset("msftTokens", recordFilter={
- "_mandateId": self._mandateId,
- "_userId": self._userId
- })
- if not tokens:
- return None
-
- # Apply access control
- filtered_tokens = self._uam("msftTokens", tokens)
- if not filtered_tokens:
- return None
-
- return MsftToken(**filtered_tokens[0])
+ # Generate auth URL
+ auth_url = self.msal_app.get_authorization_request_url(
+ scopes=self.scopes,
+ redirect_uri=self.redirectUri,
+ state=self._generateState()
+ )
+ return auth_url
except Exception as e:
- logger.error(f"Error getting Microsoft token: {str(e)}")
+ logger.error(f"Error initiating Microsoft login: {str(e)}")
return None
- def saveMsftToken(self, token_data: Dict[str, Any]) -> bool:
- """Save Microsoft token data"""
+ def handleAuthCallback(self, code: str) -> Optional[MsftToken]:
+ """Handle Microsoft OAuth callback"""
try:
- # Check if user can modify tokens
- if not self._canModify("msftTokens"):
- raise PermissionError("No permission to save Microsoft token")
-
- # Add user and mandate IDs to token data
- token_data["_mandateId"] = self._mandateId
- token_data["_userId"] = self._userId
-
- # Validate token data using Pydantic model
- try:
- token = MsftToken(**token_data)
- except Exception as e:
- raise ValueError(f"Invalid token data: {str(e)}")
-
- # Check if token already exists
- existing_token = self.getMsftToken()
-
- if existing_token:
- # Update existing token
- return self.db.recordModify("msftTokens", existing_token.id, token.model_dump())
- else:
- # Create new token record
- return self.db.recordCreate("msftTokens", token.model_dump())
-
- except Exception as e:
- logger.error(f"Error saving Microsoft token: {str(e)}")
- return False
-
- def deleteMsftToken(self) -> bool:
- """Delete Microsoft token for current user"""
- try:
- if not self._canModify("msftTokens"):
- raise PermissionError("No permission to delete Microsoft token")
-
- existing_token = self.getMsftToken()
- if existing_token:
- return self.db.recordDelete("msftTokens", existing_token.id)
- return True
- except Exception as e:
- logger.error(f"Error deleting Microsoft token: {str(e)}")
- return False
-
- def getCurrentUserToken(self) -> Tuple[Optional[MsftUserInfo], Optional[str]]:
- """Get current user's Microsoft token and info"""
- try:
- token_data = self.getMsftToken()
- if not token_data:
- return None, None
-
- # Verify token is still valid
- if not self.verifyToken(token_data.access_token):
- if not self.refreshToken(token_data):
- return None, None
- token_data = self.getMsftToken()
-
- user_info = token_data.user_info
- if not user_info:
- user_info = self.getUserInfoFromToken(token_data.access_token)
- if user_info:
- token_data.user_info = user_info
- self.saveMsftToken(token_data.model_dump())
-
- return MsftUserInfo(**user_info) if user_info else None, token_data.access_token
-
- except Exception as e:
- logger.error(f"Error getting current user token: {str(e)}")
- return None, None
-
- def verifyToken(self, token: str) -> bool:
- """Verify the access token is valid"""
- try:
- headers = {
- 'Authorization': f'Bearer {token}',
- 'Content-Type': 'application/json'
- }
- response = requests.get('https://graph.microsoft.com/v1.0/me', headers=headers)
- return response.status_code == 200
- except Exception as e:
- logger.error(f"Error verifying token: {str(e)}")
- return False
-
- def refreshToken(self, token_data: MsftToken) -> bool:
- """Refresh the access token using the stored refresh token"""
- try:
- if not token_data or not token_data.refresh_token:
- return False
-
- result = self.msal_app.acquire_token_by_refresh_token(
- token_data.refresh_token,
- scopes=self.scopes
+ # Get token from code
+ token_response = self.msal_app.acquire_token_by_authorization_code(
+ code,
+ scopes=self.scopes,
+ redirect_uri=self.redirectUri
)
- if "error" in result:
- logger.error(f"Error refreshing token: {result.get('error')}")
- return False
+ if "error" in token_response:
+ logger.error(f"Token acquisition failed: {token_response['error']}")
+ return None
+
+ # Get user info
+ user_info = self.getUserInfoFromToken(token_response["access_token"])
+ if not user_info:
+ return None
+
+ # Create token model
+ token = MsftToken(
+ access_token=token_response["access_token"],
+ refresh_token=token_response.get("refresh_token", ""),
+ expires_in=token_response.get("expires_in", 0),
+ token_type=token_response.get("token_type", "bearer"),
+ expires_at=datetime.now().timestamp() + token_response.get("expires_in", 0),
+ user_info=user_info.model_dump(),
+ mandateId=self.mandateId,
+ userId=self.userId
+ )
- # Update token data
- token_data.access_token = result["access_token"]
- if "refresh_token" in result:
- token_data.refresh_token = result["refresh_token"]
-
- return self.saveMsftToken(token_data.model_dump())
+ return token
except Exception as e:
- logger.error(f"Error refreshing token: {str(e)}")
+ logger.error(f"Error handling auth callback: {str(e)}")
+ return None
+
+ def verifyToken(self, token: str) -> bool:
+ """Verify Microsoft token"""
+ try:
+ # Get user info from token
+ user_info = self.getUserInfoFromToken(token)
+ if not user_info:
+ return False
+
+ # Get current user's Microsoft connection
+ user = self.db.getRecordset("users", recordFilter={"id": self.userId})[0]
+ msft_connection = next((conn for conn in user.get("connections", [])
+ if conn.get("authority") == "microsoft"), None)
+
+ if not msft_connection:
+ return False
+
+ # Verify the token belongs to this user
+ return user_info.id == msft_connection.get("externalId")
+
+ except Exception as e:
+ logger.error(f"Error verifying Microsoft token: {str(e)}")
return False
- def getUserInfoFromToken(self, access_token: str) -> Optional[Dict[str, Any]]:
- """Get user information using the access token"""
+ def getUserInfoFromToken(self, token: str) -> Optional[MsftUserInfo]:
+ """Get user info from Microsoft Graph"""
try:
- headers = {
- 'Authorization': f'Bearer {access_token}',
- 'Content-Type': 'application/json'
- }
- response = requests.get('https://graph.microsoft.com/v1.0/me', headers=headers)
- if response.status_code == 200:
- user_data = response.json()
- return {
- "name": user_data.get("displayName", ""),
- "email": user_data.get("userPrincipalName", ""),
- "id": user_data.get("id", "")
- }
- return None
+ # Call Microsoft Graph API
+ response = requests.get(
+ "https://graph.microsoft.com/v1.0/me",
+ headers={"Authorization": f"Bearer {token}"}
+ )
+
+ if response.status_code != 200:
+ logger.error(f"Failed to get user info: {response.text}")
+ return None
+
+ data = response.json()
+
+ # Create user info model
+ return MsftUserInfo(
+ id=data["id"],
+ email=data.get("mail") or data.get("userPrincipalName"),
+ name=data.get("displayName", ""),
+ picture=None # Microsoft Graph doesn't provide profile picture by default
+ )
+
except Exception as e:
logger.error(f"Error getting user info: {str(e)}")
return None
+ def refreshToken(self, refresh_token: str) -> Optional[MsftToken]:
+ """Refresh Microsoft token"""
+ try:
+ # Refresh token
+ token_response = self.msal_app.acquire_token_by_refresh_token(
+ refresh_token,
+ scopes=self.scopes
+ )
+
+ if "error" in token_response:
+ logger.error(f"Token refresh failed: {token_response['error']}")
+ return None
+
+ # Get user info
+ user_info = self.getUserInfoFromToken(token_response["access_token"])
+ if not user_info:
+ return None
+
+ # Create token model
+ token = MsftToken(
+ access_token=token_response["access_token"],
+ refresh_token=token_response.get("refresh_token", refresh_token),
+ expires_in=token_response.get("expires_in", 0),
+ token_type=token_response.get("token_type", "bearer"),
+ expires_at=datetime.now().timestamp() + token_response.get("expires_in", 0),
+ user_info=user_info.model_dump(),
+ mandateId=self.mandateId,
+ userId=self.userId
+ )
+
+ return token
+
+ except Exception as e:
+ logger.error(f"Error refreshing token: {str(e)}")
+ return None
+
+ def _generateState(self) -> str:
+ """Generate secure state token"""
+ return secrets.token_urlsafe(32)
+
def createDraftEmail(self, recipient: str, subject: str, body: str, attachments: List[Dict[str, Any]] = None) -> bool:
"""Create a draft email using Microsoft Graph API"""
try:
@@ -340,70 +335,186 @@ class MsftInterface:
logger.error(f"Error creating draft email: {str(e)}")
return False
- def initiateLogin(self) -> str:
- """Initiate Microsoft login flow"""
+ def saveMsftToken(self, token_data: Dict[str, Any]) -> bool:
+ """
+ Save Microsoft token data to the database.
+
+ Args:
+ token_data: Token data to save
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
try:
- state = secrets.token_urlsafe(32)
- auth_url = self.msal_app.get_authorization_request_url(
- self.scopes,
- state=state,
- redirect_uri=self.redirect_uri
- )
- return auth_url
- except Exception as e:
- logger.error(f"Error initiating login: {str(e)}")
- return None
-
- def handleAuthCallback(self, code: str) -> Optional[Dict[str, Any]]:
- """Handle Microsoft OAuth callback"""
- try:
- token_response = self.msal_app.acquire_token_by_authorization_code(
- code,
- self.scopes,
- redirect_uri=self.redirect_uri
+ # Get existing token if any
+ existing_tokens = self.db.getRecordset(
+ "msftTokens",
+ recordFilter={
+ "mandateId": self.mandateId,
+ "userId": self.userId
+ }
)
- if "error" in token_response:
- logger.error(f"Token acquisition failed: {token_response['error']}")
- return None
+ if existing_tokens:
+ # Update existing token
+ token_id = existing_tokens[0]["id"]
+ success = self.db.updateRecord(
+ "msftTokens",
+ token_id,
+ token_data
+ )
+ else:
+ # Create new token record
+ success = self.db.createRecord(
+ "msftTokens",
+ token_data
+ )
- user_info = self.getUserInfoFromToken(token_response["access_token"])
- if not user_info:
- return None
-
- # Create MsftToken instance
- token_data = MsftToken(
- access_token=token_response["access_token"],
- refresh_token=token_response.get("refresh_token", ""),
- expires_in=token_response.get("expires_in", 0),
- token_type=token_response.get("token_type", "bearer"),
- expires_at=datetime.now().timestamp() + token_response.get("expires_in", 0),
- user_info=user_info,
- _mandateId=self._mandateId,
- _userId=self._userId
- )
-
- return token_data.model_dump()
+ return success
except Exception as e:
- logger.error(f"Error handling auth callback: {str(e)}")
+ logger.error(f"Error saving Microsoft token: {str(e)}")
+ return False
+
+ def getMsftToken(self) -> Optional[Dict[str, Any]]:
+ """
+ Get Microsoft token data for current user.
+
+ Returns:
+ Optional[Dict[str, Any]]: Token data if found, None otherwise
+ """
+ try:
+ tokens = self.db.getRecordset(
+ "msftTokens",
+ recordFilter={
+ "mandateId": self.mandateId,
+ "userId": self.userId
+ }
+ )
+
+ if not tokens:
+ return None
+
+ return tokens[0]
+
+ except Exception as e:
+ logger.error(f"Error getting Microsoft token: {str(e)}")
return None
+
+ def getCurrentUserToken(self) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
+ """
+ Get current user's Microsoft token and user info.
+
+ Returns:
+ Tuple[Optional[Dict[str, Any]], Optional[str]]: User info and access token
+ """
+ try:
+ token_data = self.getMsftToken()
+ if not token_data:
+ return None, None
+
+ # Check if token needs refresh
+ if datetime.now().timestamp() >= token_data["expires_at"]:
+ if not token_data.get("refresh_token"):
+ return None, None
+
+ # Refresh token
+ new_token = self.refreshToken(token_data["refresh_token"])
+ if not new_token:
+ return None, None
+
+ # Save new token
+ self.saveMsftToken(new_token.model_dump())
+ token_data = new_token.model_dump()
+
+ return token_data["user_info"], token_data["access_token"]
+
+ except Exception as e:
+ logger.error(f"Error getting current user token: {str(e)}")
+ return None, None
-def getInterface(currentUser: Dict[str, Any]) -> MsftInterface:
+ def deleteMsftToken(self) -> bool:
+ """
+ Delete Microsoft token for current user.
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
+ try:
+ # Get existing token
+ existing_tokens = self.db.getRecordset(
+ "msftTokens",
+ recordFilter={
+ "mandateId": self.mandateId,
+ "userId": self.userId
+ }
+ )
+
+ if not existing_tokens:
+ return True # No token to delete
+
+ # Delete token
+ success = self.db.deleteRecord(
+ "msftTokens",
+ existing_tokens[0]["id"]
+ )
+
+ return success
+
+ except Exception as e:
+ logger.error(f"Error deleting Microsoft token: {str(e)}")
+ return False
+
+ def setUserContext(self, currentUser: Dict[str, Any]):
+ """Set user context for the interface"""
+ if not currentUser:
+ logger.info("Initializing interface without user context")
+ return
+
+ self.currentUser = currentUser
+ self.mandateId = currentUser.get("mandateId")
+ self.userId = currentUser.get("id")
+
+ if not self.mandateId or not self.userId:
+ raise ValueError("Invalid user context: mandateId and id are required")
+
+ # Initialize access control with user context
+ self.access = MsftAccess(self.currentUser, self.db)
+
+ # Update database context
+ self.db.updateContext(self.mandateId, self.userId)
+
+ logger.debug(f"User context set: userId={self.userId}")
+
+def getRootInterface() -> MsftInterface:
"""
- Returns a MsftInterface instance for the current user.
- Handles initialization of database and records.
+ Returns a MsftInterface instance with root privileges.
+ This is used for initial setup and user creation.
"""
- mandateId = currentUser.get("_mandateId")
- userId = currentUser.get("id")
- if not mandateId or not userId:
- raise ValueError("Invalid user context: _mandateId and id are required")
+ global _rootMsftInterface
- # Create context key
- contextKey = f"{mandateId}_{userId}"
+ if _rootMsftInterface is None:
+ # Get root user from gateway
+ rootUser = getRootUser()
+ _rootMsftInterface = MsftInterface(rootUser)
+ return _rootMsftInterface
+
+def getInterface(currentUser: Dict[str, Any] = None) -> MsftInterface:
+ """
+ Returns a MsftInterface instance.
+ If currentUser is provided, initializes with user context.
+ Otherwise, returns an instance with only database access.
+ """
# Create new instance if not exists
- if contextKey not in _msftInterfaces:
- _msftInterfaces[contextKey] = MsftInterface(currentUser)
+ if "default" not in _msftInterfaces:
+ _msftInterfaces["default"] = MsftInterface(currentUser or {})
- return _msftInterfaces[contextKey]
\ No newline at end of file
+ interface = _msftInterfaces["default"]
+
+ if currentUser:
+ interface.setUserContext(currentUser)
+ else:
+ logger.info("Returning interface without user context")
+
+ return interface
\ No newline at end of file
diff --git a/modules/interfaces/msftModel.py b/modules/interfaces/msftModel.py
index 7b6f062a..9ea1927d 100644
--- a/modules/interfaces/msftModel.py
+++ b/modules/interfaces/msftModel.py
@@ -1,10 +1,38 @@
"""
-Data models for Microsoft integration.
+Models for Microsoft authentication and Graph API operations.
"""
+
from pydantic import BaseModel, Field
-from typing import List, Dict, Any, Optional
+from typing import Optional, Dict, Any
from datetime import datetime
-import uuid
+
+class MsftToken(BaseModel):
+ """Model for Microsoft OAuth tokens"""
+ access_token: str
+ refresh_token: Optional[str] = None
+ expires_in: int
+ token_type: str = "bearer"
+ expires_at: float
+ user_info: Dict[str, Any]
+ mandateId: str
+ userId: str
+
+class MsftUserInfo(BaseModel):
+ """Model for Microsoft user information"""
+ id: str
+ email: str
+ name: str
+ picture: Optional[str] = None # Microsoft Graph doesn't provide profile picture by default
+
+class MsftConfig(BaseModel):
+ """Configuration for Microsoft authentication service"""
+ client_id: str
+ client_secret: str
+ redirect_uri: str
+ scopes: list[str]
+ authority_url: str = "https://login.microsoftonline.com/common"
+ token_url: str = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
+ user_info_url: str = "https://graph.microsoft.com/v1.0/me"
# Get all attributes of the model
def getModelAttributes(modelClass):
@@ -24,54 +52,6 @@ class Label(BaseModel):
return self.translations[language]
return self.default
-class MsftToken(BaseModel):
- """Data model for Microsoft authentication token"""
- id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique ID of the token")
- access_token: str = Field(description="Microsoft access token")
- refresh_token: str = Field(description="Microsoft refresh token")
- expires_in: int = Field(description="Token expiration time in seconds")
- token_type: str = Field(description="Type of token (usually 'bearer')")
- expires_at: float = Field(description="Timestamp when token expires")
- user_info: Optional[Dict[str, Any]] = Field(None, description="User information from Microsoft")
- _mandateId: str = Field(description="Mandate ID associated with the token")
- _userId: str = Field(description="User ID associated with the token")
-
- label: Label = Field(
- default=Label(default="Microsoft Token", translations={"en": "Microsoft Token", "fr": "Jeton Microsoft"}),
- description="Label for the class"
- )
-
- # Labels for attributes
- fieldLabels: Dict[str, Label] = {
- "id": Label(default="ID", translations={}),
- "access_token": Label(default="Access Token", translations={"en": "Access Token", "fr": "Jeton d'accès"}),
- "refresh_token": Label(default="Refresh Token", translations={"en": "Refresh Token", "fr": "Jeton de rafraîchissement"}),
- "expires_in": Label(default="Expires In", translations={"en": "Expires In", "fr": "Expire dans"}),
- "token_type": Label(default="Token Type", translations={"en": "Token Type", "fr": "Type de jeton"}),
- "expires_at": Label(default="Expires At", translations={"en": "Expires At", "fr": "Expire à"}),
- "user_info": Label(default="User Info", translations={"en": "User Info", "fr": "Info utilisateur"}),
- "_mandateId": Label(default="Mandate ID", translations={"en": "Mandate ID", "fr": "ID de mandat"}),
- "_userId": Label(default="User ID", translations={"en": "User ID", "fr": "ID utilisateur"})
- }
-
-class MsftUserInfo(BaseModel):
- """Data model for Microsoft user information"""
- name: str = Field(description="User's display name")
- email: str = Field(description="User's email address")
- id: str = Field(description="User's Microsoft ID")
-
- label: Label = Field(
- default=Label(default="Microsoft User Info", translations={"en": "Microsoft User Info", "fr": "Info utilisateur Microsoft"}),
- description="Label for the class"
- )
-
- # Labels for attributes
- fieldLabels: Dict[str, Label] = {
- "name": Label(default="Name", translations={"en": "Name", "fr": "Nom"}),
- "email": Label(default="Email", translations={"en": "Email", "fr": "E-mail"}),
- "id": Label(default="ID", translations={})
- }
-
# Response models for Microsoft routes
class MsftAuthStatus(BaseModel):
"""Response model for Microsoft authentication status"""
diff --git a/modules/routes/routeFiles.py b/modules/routes/routeFiles.py
index af42212d..5e9aae14 100644
--- a/modules/routes/routeFiles.py
+++ b/modules/routes/routeFiles.py
@@ -145,13 +145,13 @@ async def get_file(
async def update_file(
file_id: str,
file_data: FileItem,
- current_user: Dict[str, Any] = Depends(auth.getCurrentActiveUser)
+ currentUser: Dict[str, Any] = Depends(auth.getCurrentActiveUser)
):
"""
Update file metadata
"""
try:
- interfaceLucydom = lucydomInterface.getInterface(current_user)
+ interfaceLucydom = lucydomInterface.getInterface(currentUser)
# Get the file from the database
file = interfaceLucydom.getFile(file_id)
@@ -159,7 +159,7 @@ async def update_file(
raise HTTPException(status_code=404, detail="File not found")
# Check if user has access to the file
- if file.get("userId", 0) != current_user.get("id", 0):
+ if file.get("userId", 0) != currentUser.get("id", 0):
raise HTTPException(status_code=403, detail="Not authorized to update this file")
# Convert FileItem to dict for interface
@@ -171,8 +171,8 @@ async def update_file(
raise HTTPException(status_code=500, detail="Failed to update file")
# Get updated file and convert to FileItem
- updated_file = interfaceLucydom.getFile(file_id)
- return FileItem(**updated_file)
+ updatedFile = interfaceLucydom.getFile(file_id)
+ return FileItem(**updatedFile)
except HTTPException as he:
raise he
diff --git a/modules/routes/routeGeneral.py b/modules/routes/routeGeneral.py
index 2268a3b5..7fb669ae 100644
--- a/modules/routes/routeGeneral.py
+++ b/modules/routes/routeGeneral.py
@@ -1,8 +1,8 @@
from fastapi import APIRouter, HTTPException, Depends, Body, status, Response
-from fastapi.responses import FileResponse
+from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse, JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.staticfiles import StaticFiles
-from typing import Dict, Any
+from typing import Dict, Any, Optional
from datetime import timedelta
import pathlib
import os
@@ -11,7 +11,6 @@ from pathlib import Path as FilePath
from modules.shared.configuration import APP_CONFIG
import modules.security.auth as auth
-import modules.interfaces.gatewayModel as gatewayModel
import modules.interfaces.gatewayInterface as gatewayInterface
router = APIRouter(
@@ -54,16 +53,23 @@ async def options_route(fullPath: str):
return Response(status_code=200)
@router.post("/api/token", response_model=gatewayModel.Token, tags=["General"])
-async def login_for_access_token(formData: OAuth2PasswordRequestForm = Depends()):
+async def login_for_access_token(
+ formData: OAuth2PasswordRequestForm = Depends(),
+ authority: str = "local",
+ external_token: Optional[str] = None
+):
+ """Get access token for user authentication"""
# Create a new gateway interface instance with admin context
- interfaceRoot = auth.getRootInterface()
+ interfaceRoot = gatewayInterface.getRootInterface()
try:
- # Authenticate user
- user = interfaceRoot.authenticateUser(formData.username, formData.password)
-
- # Authenticate user and get token
- token = interfaceRoot.authenticateAndGetToken(formData.username, formData.password)
+ # Get token directly
+ token = interfaceRoot.authenticateAndGetToken(
+ username=formData.username,
+ password=formData.password,
+ authority=authority,
+ external_token=external_token
+ )
return token
except ValueError as e:
# Handle authentication errors
@@ -91,7 +97,7 @@ async def read_user_me(currentUser: Dict[str, Any] = Depends(auth.getCurrentActi
async def register_user(userData: gatewayModel.User):
"""Register a new user."""
try:
- interfaceRoot = auth.getRootInterface()
+ interfaceRoot = gatewayInterface.getRootInterface()
return interfaceRoot.registerUser(userData.model_dump())
except ValueError as e:
raise HTTPException(
@@ -112,7 +118,7 @@ async def check_username_availability(
):
"""Check if a username is available for registration"""
try:
- interfaceRoot = auth.getRootInterface()
+ interfaceRoot = gatewayInterface.getRootInterface()
return interfaceRoot.checkUsernameAvailability(username, authenticationAuthority)
except Exception as e:
logger.error(f"Error checking username availability: {str(e)}")
diff --git a/modules/routes/routeGoogle.py b/modules/routes/routeGoogle.py
new file mode 100644
index 00000000..bfd96f2b
--- /dev/null
+++ b/modules/routes/routeGoogle.py
@@ -0,0 +1,322 @@
+"""
+Routes for Google authentication.
+"""
+
+from fastapi import APIRouter, HTTPException, Depends, Request, Response, status, Cookie, Body
+from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
+import logging
+import json
+from typing import Dict, Any, Optional, List
+from datetime import datetime, timedelta
+
+# Import auth module
+import modules.security.auth as auth
+
+# Import interfaces
+import modules.interfaces.googleInterface as googleInterface
+import modules.interfaces.gatewayInterface as gatewayInterface
+from modules.interfaces.googleModel import (
+ GoogleToken,
+ GoogleUserInfo,
+ GoogleAuthStatus,
+ GoogleTokenResponse,
+ GoogleSaveTokenResponse
+)
+
+# Configure logger
+logger = logging.getLogger(__name__)
+
+# Create router for Google Auth endpoints
+router = APIRouter(
+ prefix="/api/google",
+ tags=["Google"],
+ responses={
+ 404: {"description": "Not found"},
+ 400: {"description": "Bad request"},
+ 401: {"description": "Unauthorized"},
+ 403: {"description": "Forbidden"},
+ 500: {"description": "Internal server error"}
+ }
+)
+
+@router.get("/login")
+async def login():
+ """Initiate Google login for the current user"""
+ try:
+ # Get Google interface with root context for initial setup
+ google = googleInterface.getRootInterface()
+
+ # Get login URL
+ auth_url = google.initiateLogin()
+ if not auth_url:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Failed to initiate Google login"
+ )
+
+ logger.info("Redirecting to Google login")
+ return RedirectResponse(auth_url)
+
+ except Exception as e:
+ logger.error(f"Error initiating Google login: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Failed to initiate Google login: {str(e)}"
+ )
+
+@router.get("/auth/callback")
+async def auth_callback(code: str, state: str, request: Request):
+ """Handle Google OAuth callback"""
+ try:
+ # Get Google interface with root context for initial setup
+ google = googleInterface.getRootInterface()
+
+ # Handle auth callback
+ token_response = google.handleAuthCallback(code)
+ if not token_response:
+ return HTMLResponse(
+ content="""
+
+
+ Authentication Failed
+
+
+
+ Authentication Failed
+ Could not acquire access token.
+
+
+
+ """,
+ status_code=400
+ )
+
+ # Get gateway interface for user operations
+ gateway = gatewayInterface.getRootInterface()
+
+ # Check if user exists
+ user = gateway.getUserByUsername(token_response.user_info["email"])
+
+ # If user doesn't exist, create a new user in the default mandate
+ if not user:
+ try:
+ # Get the root mandate ID
+ rootMandateId = gateway.getInitialId("mandates")
+ if not rootMandateId:
+ raise ValueError("Root mandate not found")
+
+ # Create new user with Google authentication
+ user = gateway.createUser(
+ username=token_response.user_info["email"],
+ email=token_response.user_info["email"],
+ fullName=token_response.user_info.get("name", token_response.user_info["email"]),
+ mandateId=rootMandateId,
+ authenticationAuthority="google"
+ )
+ logger.info(f"Created new user for Google account: {token_response.user_info['email']}")
+
+ # Verify user was created by retrieving it
+ user = gateway.getUserByUsername(token_response.user_info["email"])
+ if not user:
+ raise ValueError("Failed to retrieve created user")
+
+ except Exception as e:
+ logger.error(f"Failed to create user for Google account: {str(e)}")
+ return HTMLResponse(
+ content="""
+
+
+ Registration Failed
+
+
+
+ Registration Failed
+ Could not create user account.
+
+
+
+ """,
+ status_code=400
+ )
+
+ # Create backend token
+ access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
+ access_token = auth.createAccessToken(
+ data={
+ "sub": user["username"],
+ "mandateId": str(user["mandateId"]),
+ "userId": str(user["id"]),
+ "authenticationAuthority": "google"
+ },
+ expiresDelta=access_token_expires
+ )
+
+ # Store tokens in session storage for the frontend to pick up
+ response = HTMLResponse(
+ content=f"""
+
+
+ Authentication Successful
+
+
+
+ Authentication Successful
+ Welcome, {token_response.user_info.get('name', 'User')}!
+ This window will close automatically.
+
+
+
+ """
+ )
+
+ return response
+
+ except Exception as e:
+ logger.error(f"Error in auth callback: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Authentication failed: {str(e)}"
+ )
+
+@router.get("/status", response_model=GoogleAuthStatus)
+async def auth_status(currentUser: Dict[str, Any] = Depends(auth.getCurrentActiveUser)):
+ """Check Google authentication status"""
+ try:
+ # For authenticated endpoints, use the current user's context
+ google = googleInterface.getInterface(currentUser)
+
+ # Get current user token and info
+ user_info, access_token = google.getCurrentUserToken()
+
+ if not user_info or not access_token:
+ return GoogleAuthStatus(
+ authenticated=False,
+ message="Not authenticated with Google"
+ )
+
+ # Convert user_info to GoogleUserInfo model
+ user_info_model = GoogleUserInfo(**user_info)
+
+ return GoogleAuthStatus(
+ authenticated=True,
+ user=user_info_model
+ )
+
+ except Exception as e:
+ logger.error(f"Error checking authentication status: {str(e)}")
+ return GoogleAuthStatus(
+ authenticated=False,
+ message=f"Error checking authentication status: {str(e)}"
+ )
+
+@router.get("/token", response_model=GoogleTokenResponse)
+async def get_token(currentUser: Dict[str, Any] = Depends(auth.getCurrentActiveUser)):
+ """Get Google token for current user."""
+ try:
+ # For authenticated endpoints, use the current user's context
+ google = googleInterface.getInterface(currentUser)
+
+ # Get token
+ token_data = google.getGoogleToken()
+ if not token_data:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail="No token found"
+ )
+
+ # Convert to GoogleToken model
+ token = GoogleToken(**token_data)
+ return GoogleTokenResponse(token=token)
+
+ except Exception as e:
+ logger.error(f"Error getting token: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=str(e)
+ )
+
+@router.post("/save-token", response_model=GoogleSaveTokenResponse)
+async def save_token(
+ token_data: GoogleToken,
+ currentUser: Dict[str, Any] = Depends(auth.getCurrentActiveUser)
+):
+ """Save Google token data from frontend"""
+ try:
+ # For authenticated endpoints, use the current user's context
+ google = googleInterface.getInterface(currentUser)
+
+ # Save token
+ success = google.saveGoogleToken(token_data.model_dump())
+
+ if success:
+ return GoogleSaveTokenResponse(
+ success=True,
+ message="Token saved successfully",
+ token=token_data
+ )
+ else:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Failed to save token"
+ )
+
+ except Exception as e:
+ logger.error(f"Error saving token: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error saving token: {str(e)}"
+ )
+
+@router.post("/logout")
+async def logout(currentUser: Dict[str, Any] = Depends(auth.getCurrentActiveUser)):
+ """Logout from Google"""
+ try:
+ # For authenticated endpoints, use the current user's context
+ google = googleInterface.getInterface(currentUser)
+
+ # Delete token
+ success = google.deleteGoogleToken()
+
+ if success:
+ return JSONResponse({
+ "message": "Successfully logged out from Google"
+ })
+ else:
+ return JSONResponse({
+ "message": "Failed to logout from Google"
+ })
+
+ except Exception as e:
+ logger.error(f"Error during logout: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Logout failed: {str(e)}"
+ )
\ No newline at end of file
diff --git a/modules/routes/routeMsft.py b/modules/routes/routeMsft.py
index f34613d3..514e0a1e 100644
--- a/modules/routes/routeMsft.py
+++ b/modules/routes/routeMsft.py
@@ -1,3 +1,7 @@
+"""
+Routes for Microsoft authentication.
+"""
+
from fastapi import APIRouter, HTTPException, Depends, Request, Response, status, Cookie, Body
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import logging
@@ -10,6 +14,7 @@ import modules.security.auth as auth
# Import interfaces
import modules.interfaces.msftInterface as msftInterface
+import modules.interfaces.gatewayInterface as gatewayInterface
from modules.interfaces.msftModel import (
MsftToken,
MsftUserInfo,
@@ -38,8 +43,8 @@ router = APIRouter(
async def login():
"""Initiate Microsoft login for the current user"""
try:
- # Get Microsoft interface
- msft = msftInterface.getInterface({"_mandateId": "root", "id": "root"})
+ # Get Microsoft interface with root context for initial setup
+ msft = msftInterface.getRootInterface()
# Get login URL
auth_url = msft.initiateLogin()
@@ -63,8 +68,8 @@ async def login():
async def auth_callback(code: str, state: str, request: Request):
"""Handle Microsoft OAuth callback"""
try:
- # Get Microsoft interface
- msft = msftInterface.getInterface({"_mandateId": "root", "id": "root"})
+ # Get Microsoft interface with root context for initial setup
+ msft = msftInterface.getRootInterface()
# Handle auth callback
token_response = msft.handleAuthCallback(code)
@@ -92,10 +97,10 @@ async def auth_callback(code: str, state: str, request: Request):
)
# Get gateway interface for user operations
- gateway = auth.getRootInterface()
+ gateway = gatewayInterface.getRootInterface()
# Check if user exists
- user = gateway.getUserByUsername(token_response["user_info"]["email"])
+ user = gateway.getUserByUsername(token_response.user_info["email"])
# If user doesn't exist, create a new user in the default mandate
if not user:
@@ -107,16 +112,16 @@ async def auth_callback(code: str, state: str, request: Request):
# Create new user with Microsoft authentication
user = gateway.createUser(
- username=token_response["user_info"]["email"],
- email=token_response["user_info"]["email"],
- fullName=token_response["user_info"].get("name", token_response["user_info"]["email"]),
- _mandateId=rootMandateId,
+ username=token_response.user_info["email"],
+ email=token_response.user_info["email"],
+ fullName=token_response.user_info.get("name", token_response.user_info["email"]),
+ mandateId=rootMandateId,
authenticationAuthority="microsoft"
)
- logger.info(f"Created new user for Microsoft account: {token_response['user_info']['email']}")
+ logger.info(f"Created new user for Microsoft account: {token_response.user_info['email']}")
# Verify user was created by retrieving it
- user = gateway.getUserByUsername(token_response["user_info"]["email"])
+ user = gateway.getUserByUsername(token_response.user_info["email"])
if not user:
raise ValueError("Failed to retrieve created user")
@@ -149,8 +154,8 @@ async def auth_callback(code: str, state: str, request: Request):
access_token = auth.createAccessToken(
data={
"sub": user["username"],
- "_mandateId": str(user["_mandateId"]),
- "_userId": str(user["id"]),
+ "mandateId": str(user["mandateId"]),
+ "userId": str(user["id"]),
"authenticationAuthority": "microsoft"
},
expiresDelta=access_token_expires
@@ -169,18 +174,18 @@ async def auth_callback(code: str, state: str, request: Request):
Authentication Successful
- Welcome, {token_response['user_info'].get('name', 'User')}!
+ Welcome, {token_response.user_info.get('name', 'User')}!
This window will close automatically.