From dd48e0f009a76dd212f26943da4c7bd1969dd8c8 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 12 Jun 2025 01:11:33 +0200
Subject: [PATCH] object-structure and references
---
modules/connectors/connectorAiAnthropic.py | 6 +-
modules/connectors/connectorAiOpenai.py | 10 +-
modules/connectors/connectorDbJson.py | 16 +-
modules/interfaces/interfaceAi.py | 38 ++
modules/interfaces/serviceAppAccess.py | 13 +-
modules/interfaces/serviceChatAccess.py | 2 +-
modules/interfaces/serviceChatClass.py | 184 +++++-
modules/interfaces/serviceChatModel.py | 109 +++-
modules/interfaces/serviceManagementAccess.py | 2 +-
modules/interfaces/serviceManagementClass.py | 139 ++++-
modules/methods/methodBase.py | 35 +-
modules/methods/methodCoder.py | 3 +-
modules/methods/methodOperator.py | 178 ++++++
modules/methods/methodOutlook.py | 3 +-
modules/methods/methodPowerpoint.py | 3 +-
modules/methods/methodSharepoint.py | 3 +-
modules/methods/methodWeb.py | 3 +-
modules/shared/attributeUtils.py | 4 +-
modules/workflow/managerChat.py | 308 +++-------
modules/workflow/managerDocument.py | 83 +--
modules/workflow/managerWorkflow.py | 6 +-
modules/workflow/processorDocument.py | 151 ++---
modules/workflow/serviceContainer.py | 548 +++++++-----------
notes/changelog.txt | 81 ++-
test/pytest.ini | 3 -
test/test_chat.py | 71 ---
26 files changed, 1045 insertions(+), 957 deletions(-)
create mode 100644 modules/interfaces/interfaceAi.py
create mode 100644 modules/methods/methodOperator.py
delete mode 100644 test/pytest.ini
delete mode 100644 test/test_chat.py
diff --git a/modules/connectors/connectorAiAnthropic.py b/modules/connectors/connectorAiAnthropic.py
index b1e435ee..9f452790 100644
--- a/modules/connectors/connectorAiAnthropic.py
+++ b/modules/connectors/connectorAiAnthropic.py
@@ -137,7 +137,11 @@ class ChatService:
textParts.append(part)
if textParts:
- textParts[0]["text"] = systemContent + textParts[0].get("text", "")
+ # Create a new text part with combined content
+ textParts[0] = {
+ "type": "text",
+ "text": systemContent + textParts[0].get("text", "")
+ }
# Anthropic only supports "user" and "assistant" roles
if role not in ["user", "assistant"]:
diff --git a/modules/connectors/connectorAiOpenai.py b/modules/connectors/connectorAiOpenai.py
index ad79f445..1014137d 100644
--- a/modules/connectors/connectorAiOpenai.py
+++ b/modules/connectors/connectorAiOpenai.py
@@ -18,7 +18,7 @@ def loadConfigData():
"maxTokens": int(APP_CONFIG.get('Connector_AiOpenai_MAX_TOKENS'))
}
-class ChatService:
+class AiConnector:
"""Connector for communication with the OpenAI API."""
def __init__(self):
@@ -38,7 +38,7 @@ class ChatService:
)
logger.info(f"OpenAI Connector initialized with model: {self.modelName}")
- async def callApi(self, messages: List[Dict[str, Any]], temperature: float = None, maxTokens: int = None) -> str:
+ async def callAiBasic(self, messages: List[Dict[str, Any]], temperature: float = None, maxTokens: int = None) -> str:
"""
Calls the OpenAI API with the given messages.
@@ -85,11 +85,7 @@ class ChatService:
logger.error(f"Error calling OpenAI API: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error calling OpenAI API: {str(e)}")
- async def close(self):
- """Closes the HTTP client when the application exits"""
- await self.httpClient.aclose()
-
- async def analyzeImage(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str:
+ async def callAiImage(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str:
"""
Analyzes an image with the OpenAI Vision API.
diff --git a/modules/connectors/connectorDbJson.py b/modules/connectors/connectorDbJson.py
index fd38ffcc..0d6658ae 100644
--- a/modules/connectors/connectorDbJson.py
+++ b/modules/connectors/connectorDbJson.py
@@ -1,6 +1,6 @@
import json
import os
-from typing import List, Dict, Any, Optional, Union
+from typing import List, Dict, Any, Optional, Union, TypedDict
import logging
from datetime import datetime
import uuid
@@ -10,6 +10,10 @@ from modules.shared.attributeUtils import to_dict
logger = logging.getLogger(__name__)
+class TableCache(TypedDict):
+ """Type definition for table cache entries"""
+ recordIds: List[str]
+
class DatabaseConnector:
"""
A connector for JSON-based data storage.
@@ -31,8 +35,8 @@ class DatabaseConnector:
os.makedirs(self.dbFolder, exist_ok=True)
# Cache for loaded data
- self._tablesCache = {}
- self._tableMetadataCache = {} # Cache for table metadata (record IDs, etc.)
+ self._tablesCache: Dict[str, List[Dict[str, Any]]] = {}
+ self._tableMetadataCache: Dict[str, TableCache] = {} # Cache for table metadata (record IDs, etc.)
# Initialize system table
self._systemTableName = "_system"
@@ -91,7 +95,7 @@ class DatabaseConnector:
"""Returns the full path to a table folder"""
return os.path.join(self.dbFolder, table)
- def _getRecordPath(self, table: str, recordId: Union[str, int]) -> str:
+ def _getRecordPath(self, table: str, recordId: str) -> str:
"""Returns the full path to a record file"""
return os.path.join(self._getTablePath(table), f"{recordId}.json")
@@ -134,7 +138,7 @@ class DatabaseConnector:
return metadata
- def _loadRecord(self, table: str, recordId: Union[str, int]) -> Optional[Dict[str, Any]]:
+ def _loadRecord(self, table: str, recordId: str) -> Optional[Dict[str, Any]]:
"""Loads a single record from the table."""
recordPath = self._getRecordPath(table, recordId)
try:
@@ -521,4 +525,4 @@ class DatabaseConnector:
def getAllInitialIds(self) -> Dict[str, str]:
"""Returns all registered initial IDs."""
systemData = self._loadSystemTable()
- return systemData.copy() # Return a copy to protect the original
+ return systemData.copy() # Return a copy to protect the original
\ No newline at end of file
diff --git a/modules/interfaces/interfaceAi.py b/modules/interfaces/interfaceAi.py
new file mode 100644
index 00000000..5c19ffb0
--- /dev/null
+++ b/modules/interfaces/interfaceAi.py
@@ -0,0 +1,38 @@
+import logging
+from typing import Dict, Any, List, Union
+from modules.connectors.connectorAiOpenai import AiConnector
+
+logger = logging.getLogger(__name__)
+
+class AiInterface:
+ """Interface for AI service interactions"""
+
+ def __init__(self):
+ self.service = AiConnector()
+
+ async def callAiBasic(self, messages: List[Dict[str, str]], produceUserAnswer: bool = False, temperature: float = None) -> str:
+ """Enhanced AI service call with language support."""
+
+ # Add language instruction for user-facing responses
+ if produceUserAnswer and hasattr(self, 'userLanguage') and self.userLanguage:
+ ltext = f"Please respond in '{self.userLanguage}' language."
+ if messages and messages[0]["role"] == "system":
+ if "language" not in messages[0]["content"].lower():
+ messages[0]["content"] = f"{ltext} {messages[0]['content']}"
+ else:
+ # Insert a system message with language instruction
+ messages.insert(0, {
+ "role": "system",
+ "content": ltext
+ })
+
+ # Call the AI service
+ return await self.service.callAiBasic(messages, temperature=temperature)
+
+ async def callAiImage(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str:
+ """Forwards an image analysis request (image data, MIME type, prompt) to the AI connector."""
+ if not self.service:
+ logger.error("AI service not set in AiInterface")
+ return "Error: AI service not available"
+ return await self.service.callAiImage(imageData, mimeType, prompt)
+
diff --git a/modules/interfaces/serviceAppAccess.py b/modules/interfaces/serviceAppAccess.py
index 9ecac84c..a7b301f8 100644
--- a/modules/interfaces/serviceAppAccess.py
+++ b/modules/interfaces/serviceAppAccess.py
@@ -66,7 +66,8 @@ class AppAccess:
filtered_records = recordset
elif self.privilege == UserPrivilege.ADMIN:
# Admin sees connections for users in their mandate
- user_ids = [u["id"] for u in self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})]
+ users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})
+ user_ids: List[str] = [str(u["id"]) for u in users]
filtered_records = [r for r in recordset if r.get("userId") in user_ids]
else:
# Regular users only see their own connections
@@ -114,7 +115,8 @@ class AppAccess:
record["_hideDelete"] = False
# Admin can edit/delete connections for users in their mandate
elif self.privilege == UserPrivilege.ADMIN:
- user_ids = [u["id"] for u in self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})]
+ users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})
+ user_ids: List[str] = [str(u["id"]) for u in users]
record["_hideEdit"] = record.get("userId") not in user_ids
record["_hideDelete"] = record.get("userId") not in user_ids
# Regular users can only edit/delete their own connections
@@ -167,7 +169,7 @@ class AppAccess:
# Check specific record permissions
if recordId is not None:
# Get the record to check ownership
- records = self.db.getRecordset(table, recordFilter={"id": recordId})
+ records: List[Dict[str, Any]] = self.db.getRecordset(table, recordFilter={"id": str(recordId)})
if not records:
return False
@@ -177,7 +179,8 @@ class AppAccess:
if table == "connections":
# Admin can modify connections for users in their mandate
if self.privilege == UserPrivilege.ADMIN:
- user_ids = [u["id"] for u in self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})]
+ users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})
+ user_ids: List[str] = [str(u["id"]) for u in users]
return record.get("userId") in user_ids
# Users can only modify their own connections
return record.get("userId") == self.userId
@@ -213,7 +216,7 @@ class AppAccess:
"""
try:
# Get session
- sessions = self.db.getRecordset("sessions", recordFilter={"id": sessionId})
+ sessions: List[Dict[str, Any]] = self.db.getRecordset("sessions", recordFilter={"id": sessionId})
if not sessions:
return False
diff --git a/modules/interfaces/serviceChatAccess.py b/modules/interfaces/serviceChatAccess.py
index 12e37814..cd9428ff 100644
--- a/modules/interfaces/serviceChatAccess.py
+++ b/modules/interfaces/serviceChatAccess.py
@@ -94,7 +94,7 @@ class ChatAccess:
# For regular users and admins, check specific cases
if recordId is not None:
# Get the record to check ownership
- records = self.db.getRecordset(table, recordFilter={"id": recordId})
+ records: List[Dict[str, Any]] = self.db.getRecordset(table, recordFilter={"id": recordId})
if not records:
return False
diff --git a/modules/interfaces/serviceChatClass.py b/modules/interfaces/serviceChatClass.py
index 0b80150e..eb730e58 100644
--- a/modules/interfaces/serviceChatClass.py
+++ b/modules/interfaces/serviceChatClass.py
@@ -14,9 +14,7 @@ import asyncio
from modules.interfaces.serviceChatAccess import ChatAccess
from modules.interfaces.serviceChatModel import (
- TaskStatus, UserInputRequest, ContentMetadata, ContentItem,
- ChatDocument, TaskDocument, ExtractedContent, TaskItem,
- TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow
+ TaskStatus, UserInputRequest, ChatDocument, TaskItem, ChatStat, ChatLog, ChatMessage, ChatWorkflow, TaskAction
)
from modules.interfaces.serviceAppModel import User
@@ -846,6 +844,186 @@ class ChatInterface:
logger.error(f"Error stopping workflow: {str(e)}")
raise
+ # Task Management
+
+ def getTask(self, taskId: str) -> Optional[TaskItem]:
+ """Returns a task by ID if user has access."""
+ tasks = self.db.getRecordset("tasks", recordFilter={"id": taskId})
+ if not tasks:
+ return None
+
+ filteredTasks = self._uam("tasks", tasks)
+ if not filteredTasks:
+ return None
+
+ task = filteredTasks[0]
+ try:
+ # Validate task data against TaskItem model
+ return TaskItem(
+ id=task["id"],
+ workflowId=task["workflowId"],
+ status=task.get("status", TaskStatus.PENDING),
+ error=task.get("error"),
+ startedAt=task.get("startedAt"),
+ finishedAt=task.get("finishedAt"),
+ actionList=[TaskAction(**action) for action in task.get("actionList", [])],
+ documentsOutput=task.get("documentsOutput", []),
+ retryCount=task.get("retryCount", 0),
+ retryMax=task.get("retryMax", 3),
+ rollbackOnFailure=task.get("rollbackOnFailure", True),
+ dependencies=task.get("dependencies", []),
+ feedback=task.get("feedback")
+ )
+ except Exception as e:
+ logger.error(f"Error validating task data: {str(e)}")
+ return None
+
+ def getWorkflowTasks(self, workflowId: str) -> List[TaskItem]:
+ """Returns tasks for a workflow if user has access to the workflow."""
+ # Check workflow access first
+ workflow = self.getWorkflow(workflowId)
+ if not workflow:
+ return []
+
+ # Get tasks for this workflow
+ tasks = self.db.getRecordset("tasks", recordFilter={"workflowId": workflowId})
+ return [TaskItem(**task) for task in self._uam("tasks", tasks)]
+
+ def createTask(self, taskData: Dict[str, Any]) -> TaskItem:
+ """Creates a new task if user has access to the workflow; returns None on missing workflowId, missing access/permission, or error."""
+ try:
+ # Check workflow access
+ workflowId = taskData.get("workflowId")
+ if not workflowId:
+ logger.error("No workflowId provided for createTask")
+ return None
+
+ workflow = self.getWorkflow(workflowId)
+ if not workflow:
+ logger.warning(f"No access to workflow {workflowId}")
+ return None
+
+ if not self._canModify("workflows", workflowId):
+ logger.warning(f"No permission to modify workflow {workflowId}")
+ return None
+
+ # Ensure required fields
+ if "id" not in taskData:
+ taskData["id"] = f"task_{uuid.uuid4()}"
+
+ if "status" not in taskData:
+ taskData["status"] = TaskStatus.PENDING
+
+ if "startedAt" not in taskData:
+ taskData["startedAt"] = self._getCurrentTimestamp()
+
+ # Create task in database
+ createdTask = self.db.recordCreate("tasks", taskData)
+
+ # Convert to TaskItem model
+ task = TaskItem(
+ id=createdTask["id"],
+ workflowId=createdTask["workflowId"],
+ status=createdTask.get("status", TaskStatus.PENDING),
+ error=createdTask.get("error"),
+ startedAt=createdTask.get("startedAt"),
+ finishedAt=createdTask.get("finishedAt"),
+ actionList=[TaskAction(**action) for action in createdTask.get("actionList", [])],
+ documentsOutput=createdTask.get("documentsOutput", []),
+ retryCount=createdTask.get("retryCount", 0),
+ retryMax=createdTask.get("retryMax", 3),
+ rollbackOnFailure=createdTask.get("rollbackOnFailure", True),
+ dependencies=createdTask.get("dependencies", []),
+ feedback=createdTask.get("feedback")
+ )
+
+ # Update workflow's task list
+ workflowTasks = workflow.get("tasks", [])
+ if task.id not in workflowTasks:
+ workflowTasks.append(task.id)
+ self.updateWorkflow(workflowId, {"tasks": workflowTasks})
+
+ return task
+
+ except Exception as e:
+ logger.error(f"Error creating task: {str(e)}")
+ return None
+
+ def updateTask(self, taskId: str, taskData: Dict[str, Any]) -> TaskItem:
+ """Updates a task if user has access to the workflow; returns None if the task is not found, access/permission is missing, or an error occurs."""
+ try:
+ # Get existing task
+ task = self.getTask(taskId)
+ if not task:
+ logger.warning(f"Task {taskId} not found")
+ return None
+
+ # Check workflow access
+ workflow = self.getWorkflow(task.workflowId)
+ if not workflow:
+ logger.warning(f"No access to workflow {task.workflowId}")
+ return None
+
+ if not self._canModify("workflows", task.workflowId):
+ logger.warning(f"No permission to modify workflow {task.workflowId}")
+ return None
+
+ # Update task in database
+ updatedTask = self.db.recordModify("tasks", taskId, taskData)
+
+ # Convert to TaskItem model
+ return TaskItem(
+ id=updatedTask["id"],
+ workflowId=updatedTask["workflowId"],
+ status=updatedTask.get("status", task.status),
+ error=updatedTask.get("error", task.error),
+ startedAt=updatedTask.get("startedAt", task.startedAt),
+ finishedAt=updatedTask.get("finishedAt", task.finishedAt),
+ actionList=[TaskAction(**action) for action in updatedTask.get("actionList", task.actionList)],
+ documentsOutput=updatedTask.get("documentsOutput", task.documentsOutput),
+ retryCount=updatedTask.get("retryCount", task.retryCount),
+ retryMax=updatedTask.get("retryMax", task.retryMax),
+ rollbackOnFailure=updatedTask.get("rollbackOnFailure", task.rollbackOnFailure),
+ dependencies=updatedTask.get("dependencies", task.dependencies),
+ feedback=updatedTask.get("feedback", task.feedback)
+ )
+
+ except Exception as e:
+ logger.error(f"Error updating task: {str(e)}")
+ return None
+
+ def deleteTask(self, taskId: str) -> bool:
+ """Deletes a task if user has access to the workflow."""
+ try:
+ # Get existing task
+ task = self.getTask(taskId)
+ if not task:
+ logger.warning(f"Task {taskId} not found")
+ return False
+
+ # Check workflow access
+ workflow = self.getWorkflow(task.workflowId)
+ if not workflow:
+ logger.warning(f"No access to workflow {task.workflowId}")
+ return False
+
+ if not self._canModify("workflows", task.workflowId):
+ logger.warning(f"No permission to modify workflow {task.workflowId}")
+ return False
+
+ # Delete task
+ if self.db.recordDelete("tasks", taskId):
+ # Update workflow's task list
+ workflowTasks = workflow.get("tasks", [])
+ if taskId in workflowTasks:
+ workflowTasks.remove(taskId)
+ self.updateWorkflow(task.workflowId, {"tasks": workflowTasks})
+ return True
+ return False
+
+ except Exception as e:
+ logger.error(f"Error deleting task: {str(e)}")
+ return False
def getInterface(currentUser: Optional[User] = None) -> 'ChatInterface':
"""
diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py
index 1d65d26e..f81a882b 100644
--- a/modules/interfaces/serviceChatModel.py
+++ b/modules/interfaces/serviceChatModel.py
@@ -3,13 +3,36 @@ Chat model classes for the chat system.
"""
from pydantic import BaseModel, Field
-from typing import List, Dict, Any, Optional, Union
+from typing import List, Dict, Any, Optional
from datetime import datetime, UTC
import uuid
from enum import Enum
from modules.shared.attributeUtils import register_model_labels, ModelMixin
+# ===== Method Models =====
+
+class MethodResult(BaseModel, ModelMixin):
+ """Model for method results"""
+ success: bool = Field(description="Whether the method execution was successful")
+ data: Dict[str, Any] = Field(description="Result data")
+ metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
+ validation: List[str] = Field(default_factory=list, description="Validation messages")
+ error: Optional[str] = Field(None, description="Error message if any")
+
+# Register labels for MethodResult
+register_model_labels(
+ "MethodResult",
+ {"en": "Method Result", "fr": "Résultat de méthode"},
+ {
+ "success": {"en": "Success", "fr": "Succès"},
+ "data": {"en": "Data", "fr": "Données"},
+ "metadata": {"en": "Metadata", "fr": "Métadonnées"},
+ "validation": {"en": "Validation", "fr": "Validation"},
+ "error": {"en": "Error", "fr": "Erreur"}
+ }
+)
+
# ===== Base Enums and Simple Models =====
class TaskStatus(str, Enum):
@@ -121,31 +144,9 @@ register_model_labels(
}
)
-class TaskDocument(BaseModel, ModelMixin):
- """Data model for a task document"""
- id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
- data: str = Field(description="Base64 encoded file data")
- filename: str = Field(description="Name of the file")
- fileSize: int = Field(description="Size of the file")
- mimeType: str = Field(description="MIME type of the file")
-
-# Register labels for TaskDocument
-register_model_labels(
- "TaskDocument",
- {"en": "Task Document", "fr": "Document de tâche"},
- {
- "id": {"en": "ID", "fr": "ID"},
- "filename": {"en": "Filename", "fr": "Nom de fichier"},
- "fileSize": {"en": "File Size", "fr": "Taille du fichier"},
- "mimeType": {"en": "MIME Type", "fr": "Type MIME"},
- "data": {"en": "Data", "fr": "Données"}
- }
-)
-
class ExtractedContent(BaseModel, ModelMixin):
"""Data model for extracted content"""
- objectId: str = Field(description="Reference to source document")
- objectType: str = Field(description="Type of source object ('ChatDocument' or 'TaskDocument')")
+ id: str = Field(description="Reference to source ChatDocument")
contents: List[ContentItem] = Field(default_factory=list, description="List of content items")
# Register labels for ExtractedContent
@@ -161,6 +162,61 @@ register_model_labels(
# ===== Task Models =====
+class TaskAction(BaseModel, ModelMixin):
+ """Model for task actions"""
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique action identifier")
+ method: str = Field(..., description="Method to execute")
+ action: str = Field(..., description="Action to perform")
+ parameters: Dict[str, Any] = Field(default_factory=dict, description="Action parameters")
+ status: TaskStatus = Field(default=TaskStatus.PENDING, description="Current action status")
+ retryCount: int = Field(default=0, description="Number of retry attempts")
+ retryMax: int = Field(default=3, description="Maximum number of retry attempts")
+ error: Optional[str] = Field(None, description="Error message if action failed")
+ startedAt: Optional[datetime] = Field(None, description="Action start timestamp")
+ finishedAt: Optional[datetime] = Field(None, description="Action completion timestamp")
+
+ def start(self) -> None:
+ """Start the action"""
+ self.status = TaskStatus.RUNNING
+ self.startedAt = datetime.now(UTC)
+
+ def complete(self) -> None:
+ """Mark action as completed"""
+ self.status = TaskStatus.COMPLETED
+ self.finishedAt = datetime.now(UTC)
+
+ def fail(self, error: str) -> None:
+ """Mark action as failed"""
+ self.status = TaskStatus.FAILED
+ self.error = error
+ self.finishedAt = datetime.now(UTC)
+
+ def canRetry(self) -> bool:
+ """Check if action can be retried"""
+ return self.retryCount < self.retryMax
+
+ def incrementRetry(self) -> None:
+ """Increment retry count"""
+ self.retryCount += 1
+
+# Register labels for TaskAction
+register_model_labels(
+ "TaskAction",
+ {"en": "Task Action", "fr": "Action de tâche"},
+ {
+ "id": {"en": "ID", "fr": "ID"},
+ "method": {"en": "Method", "fr": "Méthode"},
+ "action": {"en": "Action", "fr": "Action"},
+ "parameters": {"en": "Parameters", "fr": "Paramètres"},
+ "status": {"en": "Status", "fr": "Statut"},
+ "retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
+ "retryMax": {"en": "Max Retries", "fr": "Tentatives maximales"},
+ "error": {"en": "Error", "fr": "Erreur"},
+ "startedAt": {"en": "Started At", "fr": "Démarré le"},
+ "finishedAt": {"en": "Finished At", "fr": "Terminé le"}
+ }
+)
+
class TaskItem(BaseModel, ModelMixin):
"""Model for tasks"""
id: str = Field(..., description="Unique task identifier")
@@ -169,7 +225,7 @@ class TaskItem(BaseModel, ModelMixin):
error: Optional[str] = Field(None, description="Error message if task failed")
startedAt: Optional[datetime] = Field(None, description="Task start timestamp")
finishedAt: Optional[datetime] = Field(None, description="Task completion timestamp")
- actionList: List[Dict[str, Any]] = Field(default_factory=list, description="List of actions to execute")
+ actionList: List[TaskAction] = Field(default_factory=list, description="List of actions to execute")
documentsOutput: List[Dict[str, Any]] = Field(default_factory=list, description="Output documents")
retryCount: int = Field(default=0, description="Number of retry attempts")
retryMax: int = Field(default=3, description="Maximum number of retry attempts")
@@ -229,7 +285,7 @@ class TaskItem(BaseModel, ModelMixin):
if taskId in self.dependencies:
self.dependencies.remove(taskId)
- def addAction(self, action: Dict[str, Any]) -> None:
+ def addAction(self, action: TaskAction) -> None:
"""Add an action to the task"""
self.actionList.append(action)
@@ -270,6 +326,7 @@ class TaskResult(BaseModel, ModelMixin):
error: Optional[str] = Field(None, description="Error message if task failed")
data: Optional[Dict[str, Any]] = Field(None, description="Result data")
documents: List[ChatDocument] = Field(default_factory=list, description="Output documents")
+ documentsLabel: Optional[str] = Field(None, description="Label for the set of documents")
feedback: Optional[str] = Field(None, description="Task feedback message")
processingTime: Optional[float] = Field(None, description="Processing time in seconds")
timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC), description="When the result was created")
diff --git a/modules/interfaces/serviceManagementAccess.py b/modules/interfaces/serviceManagementAccess.py
index f52a253d..ef842b2f 100644
--- a/modules/interfaces/serviceManagementAccess.py
+++ b/modules/interfaces/serviceManagementAccess.py
@@ -153,7 +153,7 @@ class ManagementAccess:
# For regular users and admins, check specific cases
if recordId is not None:
# Get the record to check ownership
- records = self.db.getRecordset(table, recordFilter={"id": recordId})
+ records: List[Dict[str, Any]] = self.db.getRecordset(table, recordFilter={"id": recordId})
if not records:
return False
diff --git a/modules/interfaces/serviceManagementClass.py b/modules/interfaces/serviceManagementClass.py
index bf97d845..e478ac18 100644
--- a/modules/interfaces/serviceManagementClass.py
+++ b/modules/interfaces/serviceManagementClass.py
@@ -6,14 +6,14 @@ Uses the JSON connector for data access with added language support.
import os
import logging
import uuid
-from datetime import datetime
+from datetime import datetime, UTC
from typing import Dict, Any, List, Optional, Union
import hashlib
from modules.interfaces.serviceManagementAccess import ManagementAccess
from modules.interfaces.serviceManagementModel import (
- Prompt, FileItem, FileData
+ FilePreview, Prompt, FileItem, FileData
)
from modules.interfaces.serviceAppModel import User, Mandate, UserPrivilege
@@ -376,6 +376,47 @@ class ServiceManagement:
}
return extensionToMime.get(ext.lower(), "application/octet-stream")
+ def isTextMimeType(self, mimeType: str) -> bool:
+ """Determines if a MIME type represents a text-based format."""
+ textMimeTypes = {
+ 'text/plain',
+ 'text/html',
+ 'text/css',
+ 'text/javascript',
+ 'text/x-python',
+ 'text/csv',
+ 'text/xml',
+ 'application/json',
+ 'application/xml',
+ 'application/javascript',
+ 'application/x-python',
+ 'application/x-httpd-php',
+ 'application/x-sh',
+ 'application/x-shellscript',
+ 'application/x-yaml',
+ 'application/x-toml',
+ 'application/x-markdown',
+ 'application/x-latex',
+ 'application/x-tex',
+ 'application/x-rst',
+ 'application/x-asciidoc',
+ 'application/x-markdown',
+ 'application/x-httpd-php',
+ 'application/x-httpd-php-source',
+ 'application/x-httpd-php3',
+ 'application/x-httpd-php4',
+ 'application/x-httpd-php5',
+ 'application/x-httpd-php7',
+ 'application/x-httpd-php8',
+ 'application/x-httpd-php-source',
+ 'application/x-httpd-php3-source',
+ 'application/x-httpd-php4-source',
+ 'application/x-httpd-php5-source',
+ 'application/x-httpd-php7-source',
+ 'application/x-httpd-php8-source'
+ }
+ return mimeType.lower() in textMimeTypes
+
# File methods - metadata-based operations
def getAllFiles(self) -> List[FileItem]:
@@ -440,15 +481,47 @@ class ServiceManagement:
logger.error(f"Error converting file record: {str(e)}")
return None
+ def _isFilenameUnique(self, filename: str, excludeFileId: Optional[str] = None) -> bool:
+ """Checks if a filename is unique for the current user."""
+ # Get all files for current user
+ files = self.db.getRecordset("files", recordFilter={
+ "_createdBy": self.currentUser.id
+ })
+
+ # Check if filename exists (excluding the current file if updating)
+ for file in files:
+ if file["filename"] == filename and (excludeFileId is None or file["id"] != excludeFileId):
+ return False
+ return True
+
+ def _generateUniqueFilename(self, filename: str, excludeFileId: Optional[str] = None) -> str:
+ """Generates a unique filename by adding a number if necessary."""
+ if self._isFilenameUnique(filename, excludeFileId):
+ return filename
+
+ # Split filename into name and extension
+ name, ext = os.path.splitext(filename)
+ counter = 1
+
+ # Try filenames with increasing numbers until we find a unique one
+ while True:
+ newFilename = f"{name}_{counter}{ext}"
+ if self._isFilenameUnique(newFilename, excludeFileId):
+ return newFilename
+ counter += 1
+
def createFile(self, name: str, mimeType: str, size: int = None, fileHash: str = None) -> FileItem:
"""Creates a new file entry if user has permission."""
if not self._canModify("files"):
raise PermissionError("No permission to create files")
+ # Ensure filename is unique
+ uniqueName = self._generateUniqueFilename(name)
+
# Create FileItem instance
fileItem = FileItem(
mandateId=self.currentUser.mandateId,
- filename=name,
+ filename=uniqueName,
mimeType=mimeType,
fileSize=size,
fileHash=fileHash
@@ -468,6 +541,10 @@ class ServiceManagement:
if not self._canModify("files", fileId):
raise PermissionError(f"No permission to update file {fileId}")
+ # If filename is being updated, ensure it's unique
+ if "filename" in updateData:
+ updateData["filename"] = self._generateUniqueFilename(updateData["filename"], fileId)
+
# Update file
return self.db.recordModify("files", fileId, updateData)
@@ -525,7 +602,7 @@ class ServiceManagement:
# Determine if this is a text-based format
mimeType = file.mimeType
- isTextFormat = isTextMimeType(mimeType)
+ isTextFormat = self.isTextMimeType(mimeType)
base64Encoded = False
fileData = None
@@ -598,8 +675,8 @@ class ServiceManagement:
logger.error(f"Error processing file data for {fileId}: {str(e)}")
return None
- def getFilePreview(self, fileId: str) -> Optional[Dict[str, Any]]:
- """Returns a preview of the file content if user has access."""
+ def getFileContent(self, fileId: str) -> Optional[FilePreview]:
+ """Returns the full file content if user has access."""
try:
# Get file metadata
file = self.getFile(fileId)
@@ -613,35 +690,39 @@ class ServiceManagement:
logger.warning(f"No content found for file ID {fileId}")
return None
- # Determine if content is text based on MIME type
- isText = file.mimeType.startswith(('text/', 'application/json', 'application/xml', 'application/javascript'))
+ # Process content based on file type
+ contentType = "binary"
+ content = ""
- # For text content, decode to string
- if isText:
+ if file.get("mimeType", "").startswith("text/"):
+ # For text files, return full content
try:
content = fileContent.decode('utf-8')
- encoding = 'utf-8'
+ contentType = "text"
except UnicodeDecodeError:
- try:
- content = fileContent.decode('latin-1')
- encoding = 'latin-1'
- except:
- content = fileContent
- encoding = None
+ content = fileContent.decode('latin-1')
+ contentType = "text"
+ elif file.get("mimeType", "").startswith("image/"):
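+ # For images, return a data URI (NOTE(review): payload is produced with .hex(), i.e. hex-encoded, although the URI is labelled base64)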
+ contentType = "base64"
+ content = f"data:{file['mimeType']};base64,{fileContent.hex()}"
else:
- content = fileContent
- encoding = None
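+ # For other files, return a data URI (NOTE(review): payload is produced with .hex(), i.e. hex-encoded, although the URI is labelled base64)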
+ contentType = "base64"
+ content = f"data:{file['mimeType']};base64,{fileContent.hex()}"
+
+ return FilePreview(
+ id=fileId,
+ name=file.get("name", "Unknown"),
+ mimeType=file.get("mimeType", "application/octet-stream"),
+ size=file.get("size", 0),
+ content=content,
+ contentType=contentType,
+ metadata=file.get("metadata", {})
+ )
- return {
- "content": content,
- "mimeType": file.mimeType,
- "filename": file.filename,
- "isText": isText,
- "encoding": encoding,
- "size": len(fileContent)
- }
except Exception as e:
- logger.error(f"Error getting file preview for {fileId}: {str(e)}")
+ logger.error(f"Error getting file content: {str(e)}")
return None
def updateFileData(self, fileId: str, data: Union[bytes, str]) -> bool:
@@ -661,7 +742,7 @@ class ServiceManagement:
# Determine if this is a text-based format
mimeType = file.mimeType
- isTextFormat = isTextMimeType(mimeType)
+ isTextFormat = self.isTextMimeType(mimeType)
base64Encoded = False
fileData = None
diff --git a/modules/methods/methodBase.py b/modules/methods/methodBase.py
index 2eaa845f..7952b68e 100644
--- a/modules/methods/methodBase.py
+++ b/modules/methods/methodBase.py
@@ -3,32 +3,10 @@ from typing import Dict, List, Optional, Any, Literal
from datetime import datetime, UTC
from pydantic import BaseModel, Field
import logging
+from modules.interfaces.serviceChatModel import MethodResult
logger = logging.getLogger(__name__)
-class AuthSource(str, Enum):
- """Authentication source enumeration"""
- LOCAL = "local"
- MSFT = "msft"
- GOOGLE = "google"
- # Add more auth sources as needed
-
-class MethodParameter(BaseModel):
- """Model for method parameters"""
- name: str
- type: str
- required: bool
- validation: Optional[callable] = None
- description: str
-
-class MethodResult(BaseModel):
- """Model for method results"""
- success: bool
- data: Dict[str, Any]
- metadata: Dict[str, Any] = Field(default_factory=dict)
- validation: List[str] = Field(default_factory=list)
- error: Optional[str] = Field(None, description="Error message if any")
-
class MethodBase:
"""Base class for all methods"""
@@ -37,7 +15,6 @@ class MethodBase:
self.service = serviceContainer
self.name: str
self.description: str
- self.authSource: AuthSource = AuthSource.LOCAL # Default to local auth
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
@property
@@ -123,16 +100,6 @@ class MethodBase:
"""Rollback specific action - to be implemented by subclasses"""
pass
- def _validateAuth(self, authData: Optional[Dict[str, Any]] = None) -> bool:
- """Validate authentication data"""
- try:
- if self.authSource == AuthSource.LOCAL:
- return True
- return bool(authData and authData.get('source') == self.authSource)
- except Exception as e:
- self.logger.error(f"Error validating auth: {str(e)}")
- return False
-
def _createResult(self, success: bool, data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, error: Optional[str] = None) -> MethodResult:
"""Create a method result"""
return MethodResult(
diff --git a/modules/methods/methodCoder.py b/modules/methods/methodCoder.py
index f9b06f21..7b9a1755 100644
--- a/modules/methods/methodCoder.py
+++ b/modules/methods/methodCoder.py
@@ -3,7 +3,7 @@ import logging
import ast
import re
-from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.methods.methodBase import MethodBase, MethodResult
logger = logging.getLogger(__name__)
@@ -14,7 +14,6 @@ class MethodCoder(MethodBase):
super().__init__()
self.name = "coder"
self.description = "Handle code operations like analysis, generation, and refactoring"
- self.authSource = AuthSource.LOCAL # Code operations typically don't need auth
@property
def actions(self) -> Dict[str, Dict[str, Any]]:
diff --git a/modules/methods/methodOperator.py b/modules/methods/methodOperator.py
new file mode 100644
index 00000000..9082ccf5
--- /dev/null
+++ b/modules/methods/methodOperator.py
@@ -0,0 +1,203 @@
+from typing import Dict, List, Any, Optional
+from datetime import datetime, UTC
+import inspect
+import logging
+from .methodBase import MethodBase
+from modules.interfaces.serviceChatModel import MethodResult
+
+logger = logging.getLogger(__name__)
+
+class MethodOperator(MethodBase):
+    """Operator methods for handling collections and AI operations.
+
+    Provides two actions: ``forEach`` runs another method action once per
+    list item, ``aiCall`` sends a prompt together with extracted document
+    content to the AI service of the service container.
+    """
+
+    def __init__(self, serviceContainer: Any):
+        """Set the method name and description."""
+        super().__init__(serviceContainer)
+        self.name = "operator"
+        self.description = "Operator methods for handling collections and AI operations"
+
+    @property
+    def actions(self) -> Dict[str, Dict[str, Any]]:
+        """Available actions and their parameters"""
+        return {
+            "forEach": {
+                "description": "Execute an action for each item in a list",
+                "parameters": {
+                    "items": {
+                        "type": "List[Any]",
+                        "description": "List of items to process",
+                        "required": True
+                    },
+                    "action": {
+                        "type": "Dict[str, Any]",
+                        "description": "Action to execute for each item",
+                        "required": True,
+                        "properties": {
+                            "method": {"type": "str", "required": True},
+                            "action": {"type": "str", "required": True},
+                            "parameters": {"type": "Dict[str, Any]", "required": False}
+                        }
+                    }
+                }
+            },
+            "aiCall": {
+                "description": "Call AI service with document content",
+                "parameters": {
+                    "prompt": {
+                        "type": "str",
+                        "description": "Prompt for AI processing",
+                        "required": True
+                    },
+                    "extractedDocumentContent": {
+                        "type": "List[Dict[str, str]]",
+                        "description": "List of documents and their extraction prompts",
+                        "required": True,
+                        "items": {
+                            "type": "object",
+                            "properties": {
+                                "document": {"type": "str", "required": True},
+                                "promptForContentExtraction": {"type": "str", "required": True}
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+    async def _executeAction(self, action: str, parameters: Dict[str, Any], authData: Optional[Dict[str, Any]] = None) -> MethodResult:
+        """Dispatch ``action`` to its handler.
+
+        Raises:
+            ValueError: If ``action`` is neither ``forEach`` nor ``aiCall``.
+        """
+        if action == "forEach":
+            return await self._executeForEach(parameters)
+        elif action == "aiCall":
+            return await self._executeAiCall(parameters)
+        else:
+            raise ValueError(f"Unsupported action: {action}")
+
+    async def _executeForEach(self, parameters: Dict[str, Any]) -> MethodResult:
+        """Execute the configured action once for every item.
+
+        Every call gets its own parameter dict with the current element
+        stored under ``"item"``. A failing item is recorded as
+        ``{"success": False, "error": ...}`` and does not stop the loop.
+        """
+        try:
+            items = parameters.get("items", [])
+            action = parameters.get("action", {})
+
+            if not items or not action:
+                return self._createResult(
+                    success=False,
+                    data={},
+                    error="Missing required parameters"
+                )
+
+            # Loop-invariant parts of the action description
+            method = action.get("method")
+            action_name = action.get("action")
+            base_params = action.get("parameters") or {}
+
+            results = []
+            for item in items:
+                try:
+                    # Fresh dict per item so calls never share state and the
+                    # caller's "parameters" dict is not mutated
+                    action_params = {**base_params, "item": item}
+
+                    # Execute method action
+                    method_result = await self.service.methods[method][action_name](action_params)
+                    results.append(method_result)
+
+                except Exception as e:
+                    logger.error(f"Error processing item: {str(e)}")
+                    results.append({
+                        "success": False,
+                        "error": str(e)
+                    })
+
+            return self._createResult(
+                success=True,
+                data={"results": results}
+            )
+
+        except Exception as e:
+            logger.error(f"Error in forEach execution: {str(e)}")
+            return self._createResult(
+                success=False,
+                data={},
+                error=str(e)
+            )
+
+    async def _executeAiCall(self, parameters: Dict[str, Any]) -> MethodResult:
+        """Call the AI service with a prompt plus extracted document content.
+
+        Entries without a document reference or extraction prompt are skipped,
+        as are documents whose extraction raises (the error is logged).
+        """
+        try:
+            prompt = parameters.get("prompt")
+            documents = parameters.get("extractedDocumentContent") or []
+
+            if not prompt:
+                return self._createResult(
+                    success=False,
+                    data={},
+                    error="Missing prompt parameter"
+                )
+
+            # Extract content from documents
+            extracted_content = []
+            for doc in documents:
+                try:
+                    doc_ref = doc.get("document")
+                    doc_prompt = doc.get("promptForContentExtraction")
+
+                    if not doc_ref or not doc_prompt:
+                        continue
+
+                    content = self.service.extractContent(doc_prompt, doc_ref)
+                    # NOTE(review): extractContent may be a coroutine function;
+                    # await the result so the prompt gets content, not a coroutine
+                    if inspect.isawaitable(content):
+                        content = await content
+                    extracted_content.append({
+                        "document": doc_ref,
+                        "content": content
+                    })
+
+                except Exception as e:
+                    logger.error(f"Error extracting document content: {str(e)}")
+                    continue
+
+            # Prepare AI prompt with extracted content
+            full_prompt = f"{prompt}\n\nExtracted Content:\n" + "".join(
+                f"\nDocument: {entry['document']}\n{entry['content']}\n"
+                for entry in extracted_content
+            )
+
+            # Call AI service
+            response = await self.service.callAiBasic(full_prompt)
+
+            return self._createResult(
+                success=True,
+                data={
+                    "response": response,
+                    "processedDocuments": len(extracted_content)
+                }
+            )
+
+        except Exception as e:
+            logger.error(f"Error in AI call execution: {str(e)}")
+            return self._createResult(
+                success=False,
+                data={},
+                error=str(e)
+            )
diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py
index 8391ec96..68dd07d0 100644
--- a/modules/methods/methodOutlook.py
+++ b/modules/methods/methodOutlook.py
@@ -3,7 +3,7 @@ import logging
from datetime import datetime, UTC
from O365 import Account, MSGraphProtocol
-from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.methods.methodBase import MethodBase, MethodResult
from modules.models.userConnection import UserConnection
logger = logging.getLogger(__name__)
@@ -15,7 +15,6 @@ class MethodOutlook(MethodBase):
super().__init__()
self.name = "outlook"
self.description = "Handle Outlook email operations like reading and sending emails"
- self.authSource = AuthSource.MICROSOFT
@property
def actions(self) -> Dict[str, Dict[str, Any]]:
diff --git a/modules/methods/methodPowerpoint.py b/modules/methods/methodPowerpoint.py
index f85915fa..26ccf458 100644
--- a/modules/methods/methodPowerpoint.py
+++ b/modules/methods/methodPowerpoint.py
@@ -3,7 +3,7 @@ import logging
import os
from pathlib import Path
-from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.methods.methodBase import MethodBase, MethodResult
from modules.models.userConnection import UserConnection
from modules.models.account import Account
from modules.protocols.msGraphProtocol import MSGraphProtocol
@@ -17,7 +17,6 @@ class MethodPowerpoint(MethodBase):
super().__init__()
self.name = "powerpoint"
self.description = "Handle PowerPoint operations like reading, writing, and converting presentations"
- self.authSource = AuthSource.MICROSOFT # PowerPoint operations need Microsoft auth
@property
def actions(self) -> Dict[str, Dict[str, Any]]:
diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py
index 06cd4ee8..e75fbd6f 100644
--- a/modules/methods/methodSharepoint.py
+++ b/modules/methods/methodSharepoint.py
@@ -7,7 +7,7 @@ from office365.sharepoint.files.file import File
from office365.sharepoint.lists.list import List
from office365.sharepoint.lists.list_creation_information import ListCreationInformation
-from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.methods.methodBase import MethodBase, MethodResult
from modules.models.userConnection import UserConnection
logger = logging.getLogger(__name__)
@@ -19,7 +19,6 @@ class MethodSharepoint(MethodBase):
super().__init__()
self.name = "sharepoint"
self.description = "Handle SharePoint document operations like search, read, and write"
- self.authSource = AuthSource.MICROSOFT
@property
def actions(self) -> Dict[str, Dict[str, Any]]:
diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py
index 437bca37..9398741c 100644
--- a/modules/methods/methodWeb.py
+++ b/modules/methods/methodWeb.py
@@ -10,7 +10,7 @@ import requests
import time
import json
-from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.methods.methodBase import MethodBase, MethodResult
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
@@ -22,7 +22,6 @@ class MethodWeb(MethodBase):
super().__init__()
self.name = "web"
self.description = "Handle web operations like search, crawl, and content extraction"
- self.auth_source = AuthSource.LOCAL # Web operations typically don't need auth
# Web crawling configuration from agentWebcrawler
self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
diff --git a/modules/shared/attributeUtils.py b/modules/shared/attributeUtils.py
index 6382592f..f6b0e291 100644
--- a/modules/shared/attributeUtils.py
+++ b/modules/shared/attributeUtils.py
@@ -23,9 +23,9 @@ class ModelMixin:
"""
# Get the raw dictionary
if hasattr(self, 'model_dump'):
- data = self.model_dump() # Pydantic v2
+ data: Dict[str, Any] = self.model_dump() # Pydantic v2
else:
- data = self.dict() # Pydantic v1
+ data: Dict[str, Any] = self.dict() # Pydantic v1
# Convert datetime fields to ISO format strings
for key, value in data.items():
diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py
index 90d07904..fbc2429e 100644
--- a/modules/workflow/managerChat.py
+++ b/modules/workflow/managerChat.py
@@ -1,22 +1,14 @@
import logging
-import importlib
-import pkgutil
-import inspect
-from typing import Dict, Any, Optional, List, Type, Callable, Awaitable, Union
+from typing import Dict, Any, Optional, List, Union
from datetime import datetime, UTC
import json
-import base64
import uuid
-from modules.interfaces.serviceAppClass import User
-from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
-from modules.workflow.serviceContainer import ServiceContainer
+from modules.interfaces.serviceAppModel import User
from modules.interfaces.serviceChatModel import (
- TaskStatus, UserInputRequest, ContentMetadata, ContentItem,
- ChatDocument, TaskDocument, ExtractedContent, TaskItem,
- TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow
+ TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow
)
-from modules.workflow.processorDocument import DocumentProcessor
+from modules.workflow.serviceContainer import ServiceContainer
logger = logging.getLogger(__name__)
@@ -24,68 +16,21 @@ class ChatManager:
"""Chat manager with improved AI integration and method handling"""
def __init__(self, currentUser: User):
- self._discoverMethods()
- self.workflow: Optional[ChatWorkflow] = None
- self.currentTask: Optional[TaskItem] = None
- self.workflowHistory: List[ChatMessage] = []
- self.documentProcessor = DocumentProcessor()
- self.userLanguage = None
self.currentUser = currentUser
+ self.service: ServiceContainer = None
# ===== Initialization and Setup =====
async def initialize(self, workflow: ChatWorkflow) -> None:
"""Initialize chat manager with workflow"""
- self.service.workflow = workflow
-
- # Initialize AI model
- self.service.model = {
- 'callAiBasic': self._callAiBasic,
- 'callAiAdvanced': self._callAiAdvanced
- }
-
- # Initialize document processor
- self.service.documentProcessor.initialize()
+ self.workflow = workflow
+ self.service = ServiceContainer(self.currentUser, self.workflow)
- def setUserLanguage(self, languageCode: str):
- """Set the user's preferred language"""
- self.userLanguage = languageCode
- logger.debug(f"User language set to: {languageCode}")
-
- def _discoverMethods(self):
- """Dynamically discover all method classes in modules.methods package"""
- try:
- # Import the methods package
- methodsPackage = importlib.import_module('modules.methods')
-
- # Discover all modules in the package
- for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__):
- if not isPkg and name.startswith('method'):
- try:
- # Import the module
- module = importlib.import_module(f'modules.methods.{name}')
-
- # Find all classes in the module that inherit from MethodBase
- for itemName, item in inspect.getmembers(module):
- if (inspect.isclass(item) and
- issubclass(item, MethodBase) and
- item != MethodBase):
- # Instantiate the method and add to service
- methodInstance = item()
- self.service.methods[methodInstance.name] = methodInstance
- logger.info(f"Discovered method: {methodInstance.name}")
-
- except Exception as e:
- logger.error(f"Error loading method module {name}: {str(e)}")
-
- except Exception as e:
- logger.error(f"Error discovering methods: {str(e)}")
-
# ===== Task Creation and Management =====
async def createInitialTask(self, workflow: ChatWorkflow, initialMessage: ChatMessage) -> Optional[TaskItem]:
"""Create the initial task from the first message"""
try:
# Get available methods and their actions
- methodCatalog = self.service.getAvailableMethods()
+ methodCatalog = self.service.getMethodsCatalog()
# Process user input with AI
processedInput = await self._processUserInput(initialMessage.message, methodCatalog)
@@ -93,21 +38,19 @@ class ChatManager:
# Create actions from processed input
actions = await self._createActions(processedInput['actions'])
- # Create task
- task = TaskItem(
- id=f"task_{datetime.now(UTC).timestamp()}",
- workflowId=workflow.id,
- userInput=processedInput['objective'],
- dataList=initialMessage.documents,
- actionList=actions,
- status=TaskStatus.PENDING,
- createdAt=datetime.now(UTC),
- updatedAt=datetime.now(UTC)
- )
+ # Create task data
+ taskData = {
+ "workflowId": workflow.id,
+ "userInput": processedInput['objective'],
+ "dataList": initialMessage.documents,
+ "actionList": [action.dict() for action in actions],
+ "status": TaskStatus.PENDING,
+ "startedAt": datetime.now(UTC).isoformat(),
+ "updatedAt": datetime.now(UTC).isoformat()
+ }
- # Add task to workflow
- workflow.tasks.append(task)
- return task
+ # Create task using ChatInterface
+ return self.service.createTask(taskData)
except Exception as e:
logger.error(f"Error creating initial task: {str(e)}")
@@ -127,20 +70,18 @@ class ChatManager:
logger.error("No task data in previous result")
return None
- # Create next task
- nextTask = TaskItem(
- id=f"task_{datetime.now(UTC).timestamp()}",
- workflowId=workflow.id,
- userInput=taskData.get('objective', ''),
- actionList=await self._createActions(taskData.get('actions', [])),
- status=TaskStatus.PENDING,
- createdAt=datetime.now(UTC),
- updatedAt=datetime.now(UTC)
- )
+ # Create task data
+ taskData = {
+ "workflowId": workflow.id,
+ "userInput": taskData.get('objective', ''),
+ "actionList": [action.dict() for action in await self._createActions(taskData.get('actions', []))],
+ "status": TaskStatus.PENDING,
+ "startedAt": datetime.now(UTC).isoformat(),
+ "updatedAt": datetime.now(UTC).isoformat()
+ }
- # Add task to workflow
- workflow.tasks.append(nextTask)
- return nextTask
+ # Create task using ChatInterface
+ return self.service.createTask(taskData)
except Exception as e:
logger.error(f"Error creating next task: {str(e)}")
@@ -163,7 +104,7 @@ class ChatManager:
"""
# Get AI response
- response = await self._callAiBasic(prompt)
+ response = await self.service.callAiBasic(prompt)
# Parse response
try:
@@ -173,7 +114,8 @@ class ChatManager:
status=TaskStatus.COMPLETED,
success=True,
timestamp=datetime.now(UTC),
- data=result
+ data=result,
+ documentsLabel="Task Results"
)
except json.JSONDecodeError as e:
logger.error(f"Error parsing AI response: {str(e)}")
@@ -182,7 +124,8 @@ class ChatManager:
status=TaskStatus.FAILED,
success=False,
timestamp=datetime.now(UTC),
- error=f"Error parsing AI response: {str(e)}"
+ error=f"Error parsing AI response: {str(e)}",
+ documentsLabel="Task Error"
)
except Exception as e:
@@ -192,58 +135,10 @@ class ChatManager:
status=TaskStatus.FAILED,
success=False,
timestamp=datetime.now(UTC),
- error=f"Error identifying next task: {str(e)}"
+ error=f"Error identifying next task: {str(e)}",
+ documentsLabel="Task Error"
)
- async def callAi(self, messages: List[Dict[str, str]], produceUserAnswer: bool = False, temperature: float = None) -> str:
- """Enhanced AI service call with language support."""
- if not self.service or not self.service.base:
- logger.error("AI service not set in ChatManager")
- return "Error: AI service not available"
-
- # Add language instruction for user-facing responses
- if produceUserAnswer and self.userLanguage:
- ltext = f"Please respond in '{self.userLanguage}' language."
- if messages and messages[0]["role"] == "system":
- if "language" not in messages[0]["content"].lower():
- messages[0]["content"] = f"{ltext} {messages[0]['content']}"
- else:
- # Insert a system message with language instruction
- messages.insert(0, {
- "role": "system",
- "content": ltext
- })
-
- # Call the AI service
- if temperature is not None:
- return await self.service.base.callAi(messages, temperature=temperature)
- else:
- return await self.service.base.callAi(messages)
-
- async def callAi4Image(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str:
- """Enhanced AI service call with language support."""
- if not self.service or not self.service.base:
- logger.error("AI service not set in ChatManager")
- return "Error: AI service not available"
- return await self.service.base.analyzeImage(imageData, mimeType, prompt)
-
- async def _callAiBasic(self, prompt: str) -> str:
- """Call basic AI service"""
- try:
- if not self.service or not self.service.base:
- raise ValueError("Service or base interface not initialized")
- return await self.callAi([
- {"role": "system", "content": "You are an AI assistant that helps process user requests."},
- {"role": "user", "content": prompt}
- ])
- except Exception as e:
- logger.error(f"Error calling AI service: {str(e)}")
- raise
-
- async def _callAiAdvanced(self, prompt: str, context: Dict[str, Any]) -> str:
- """Call advanced AI model with context"""
- # TODO: Implement actual AI call
- return "AI response placeholder"
async def generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
"""
@@ -264,9 +159,6 @@ class ChatManager:
"duration": (datetime.now(UTC) - datetime.fromisoformat(workflow.startedAt)).total_seconds()
}
- # Get user language from workflow mandate
- userLanguage = workflow.mandateId.split('_')[0] if workflow.mandateId else 'en'
- self.setUserLanguage(userLanguage)
# Prepare messages for AI context
messages = [
@@ -377,9 +269,22 @@ Document: {document.filename} ({document.mimeType})
# Create output documents
result.documents = await self._createOutputDocuments(task)
+ # Label the output documents with the fixed value "TaskResult"
+ result.documentsLabel = "TaskResult"
else:
result.feedback = f"Task failed: {result.error}"
+ # Update task in database
+ self.service.updateTask(task.id, {
+ "status": result.status,
+ "error": result.error,
+ "finishedAt": datetime.now(UTC).isoformat(),
+ "actionList": [action.dict() for action in task.actionList],
+ "documentsOutput": result.documents,
+ "feedback": result.feedback,
+ "documentsLabel": result.documentsLabel
+ })
+
return result
except Exception as e:
@@ -390,15 +295,17 @@ Document: {document.filename} ({document.mimeType})
"""Process and store task result in workflow"""
try:
# Find task in workflow
- task = next((t for t in workflow.tasks if t.id == result.taskId), None)
+ task = self.service.getTask(result.taskId)
if not task:
logger.error(f"Task {result.taskId} not found in workflow")
return
# Update task status
- task.status = result.status
- if result.error:
- task.error = result.error
+ self.service.updateTask(task.id, {
+ "status": result.status,
+ "error": result.error,
+ "finishedAt": datetime.now(UTC).isoformat()
+ })
# Create feedback message if available
if result.feedback:
@@ -410,13 +317,14 @@ Document: {document.filename} ({document.mimeType})
status="step",
documents=result.documents
)
- workflow.messages.append(message)
+ self.service.createWorkflowMessage(message.dict())
# Update workflow stats
if result.processingTime:
if not workflow.stats:
workflow.stats = ChatStat()
workflow.stats.processingTime = (workflow.stats.processingTime or 0) + result.processingTime
+ self.service.updateWorkflow(workflow.id, {"stats": workflow.stats.dict()})
except Exception as e:
logger.error(f"Error parsing task result: {str(e)}")
@@ -429,13 +337,16 @@ Document: {document.filename} ({document.mimeType})
if workflow.status in ["completed", "failed", "stopped"]:
return False
+ # Get all tasks for the workflow
+ tasks = self.service.tasks
+
# Check if there are any pending tasks
- hasPendingTasks = any(t.status == TaskStatus.PENDING for t in workflow.tasks)
+ hasPendingTasks = any(t.status == TaskStatus.PENDING for t in tasks)
if not hasPendingTasks:
return False
# Check if any task is currently running
- hasRunningTasks = any(t.status == TaskStatus.RUNNING for t in workflow.tasks)
+ hasRunningTasks = any(t.status == TaskStatus.RUNNING for t in tasks)
if hasRunningTasks:
return True
@@ -444,7 +355,7 @@ Document: {document.filename} ({document.mimeType})
except Exception as e:
logger.error(f"Error checking workflow continuation: {str(e)}")
return False
-
+
async def _summarizeWorkflow(self) -> str:
"""Summarize workflow history"""
if not self.workflow.messages:
@@ -460,7 +371,7 @@ Document: {document.filename} ({document.mimeType})
4. Any issues or blockers
"""
- return await self._callAiBasic(prompt)
+ return await self.service.callAiBasic(prompt)
async def _analyzeTaskResults(self, task: TaskItem) -> Dict[str, Any]:
"""Analyze task results to determine next steps"""
@@ -469,8 +380,8 @@ Document: {document.filename} ({document.mimeType})
# Generate prompt for analysis
prompt = f"""Based on the workflow summary and task results:
- {summary}
-
+{summary}
+
Task: {task.userInput}
Status: {task.status}
Error: {task.error if task.error else 'None'}
@@ -483,7 +394,7 @@ Document: {document.filename} ({document.mimeType})
"""
# Get AI response
- response = await self._callAiBasic(prompt)
+ response = await self.service.callAiBasic(prompt)
# Parse response
return json.loads(response)
@@ -548,10 +459,10 @@ User Input: {userInput.get('message', '')}
{self._promptInstructions(methodCatalog, isInitialTask=True)}"""
# Call AI service
- response = await self._callAiBasic(prompt)
+ response = await self.service.callAiBasic(prompt)
return json.loads(response)
- async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[TaskItem]:
+ async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[TaskAction]:
"""Create action objects from processed input"""
actions = []
for actionData in actionsData:
@@ -561,7 +472,7 @@ User Input: {userInput.get('message', '')}
logger.warning(f"Skipping invalid action data: {actionData}")
continue
- action = TaskItem(
+ action = TaskAction(
id=f"action_{datetime.now(UTC).timestamp()}",
method=actionData['method'],
action=actionData['action'],
@@ -585,9 +496,9 @@ User Input: {userInput.get('message', '')}
docPrompt = self._generateDocumentPrompt(task.userInput)
# Get AI response for document generation
- docResponse = await self._callAiBasic(docPrompt)
+ docResponse = await self.service.callAiBasic(docPrompt)
- # Parse response into TaskDocument objects
+ # Parse response into Task-Document objects
try:
taskDocs = json.loads(docResponse)
task.documentsOutput = taskDocs
@@ -596,7 +507,7 @@ User Input: {userInput.get('message', '')}
return f"Error processing results: {str(e)}"
# Generate feedback
- feedback = await self._callAiBasic(
+ feedback = await self.service.callAiBasic(
f"""Generate feedback for the completed task:
Task: {task.userInput}
Generated Documents: {len(task.documentsOutput)} files
@@ -616,24 +527,15 @@ User Input: {userInput.get('message', '')}
try:
fileIds = []
- # Process each TaskDocument from AI output
+ # Process each Task-Document from AI output
for taskDoc in task.documentsOutput:
# Store file in database
- fileItem = self.service.functions.createFile(
- name=taskDoc.filename,
- mimeType=taskDoc.mimeType
+ fileItem = self.service.createFile(
+ fileName=taskDoc.filename,
+ mimeType=taskDoc.mimeType,
+ content=taskDoc.data,
+ base64Encoded=taskDoc.base64Encoded
)
-
- # Store file content
- if taskDoc.base64Encoded:
- # Decode base64 content
- content = base64.b64decode(taskDoc.data)
- else:
- # Use text content directly
- content = taskDoc.data.encode('utf-8')
-
- # Store file data
- self.service.functions.createFileData(fileItem.id, content)
fileIds.append(fileItem.id)
# Convert all files to ChatDocuments in one call
@@ -650,8 +552,8 @@ User Input: {userInput.get('message', '')}
documents = []
for fileId in fileIds:
# Get file metadata
- fileMetadata = self.service.functions.getFile(fileId)
- if not fileMetadata:
+ fileInfo = self.service.getFileInfo(fileId)
+ if not fileInfo:
logger.warning(f"File metadata not found for {fileId}")
continue
@@ -659,48 +561,14 @@ User Input: {userInput.get('message', '')}
document = ChatDocument(
id=str(uuid.uuid4()),
fileId=fileId,
- filename=fileMetadata.get("name", "Unknown"),
- fileSize=fileMetadata.get("size", 0),
- mimeType=fileMetadata.get("mimeType", "text/plain")
+ filename=fileInfo.get("name", "Unknown"),
+ fileSize=fileInfo.get("size", 0),
+ mimeType=fileInfo.get("mimeType", "text/plain")
)
documents.append(document)
return documents
- async def addTaskResult(self, workflow: ChatWorkflow, result: TaskResult) -> None:
- """Add task result to workflow and update status"""
- try:
- # Find task in workflow
- task = next((t for t in workflow.tasks if t.id == result.taskId), None)
- if not task:
- logger.error(f"Task {result.taskId} not found in workflow")
- return
-
- # Update task status
- task.status = result.status
- if result.error:
- task.error = result.error
-
- # Create feedback message if available
- if result.feedback:
- message = ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=workflow.id,
- role="assistant",
- message=result.feedback,
- status="step",
- documents=result.documents
- )
- workflow.messages.append(message)
-
- # Update workflow stats
- if result.processingTime:
- if not workflow.stats:
- workflow.stats = ChatStat()
- workflow.stats.processingTime = (workflow.stats.processingTime or 0) + result.processingTime
-
- except Exception as e:
- logger.error(f"Error adding task result: {str(e)}")
def _generateDocumentPrompt(self, task: str) -> str:
"""Generate a prompt for document generation"""
@@ -708,7 +576,7 @@ User Input: {userInput.get('message', '')}
Task: {task}
-For each document you need to generate, provide a TaskDocument object with the following structure:
+For each document you need to generate, provide a Document object with the following structure:
{{
"filename": "string", # Filename with extension
"mimeType": "string", # MIME type of the file
@@ -722,5 +590,5 @@ Rules:
3. Use appropriate MIME types (e.g., text/plain, image/jpeg, application/pdf)
4. Include file extensions in filenames
-Return a JSON array of TaskDocument objects.
+Return a JSON array of Document objects.
"""
\ No newline at end of file
diff --git a/modules/workflow/managerDocument.py b/modules/workflow/managerDocument.py
index 30cb8194..1e461c0a 100644
--- a/modules/workflow/managerDocument.py
+++ b/modules/workflow/managerDocument.py
@@ -8,10 +8,7 @@ import uuid
from modules.interfaces.serviceChatModel import (
ChatDocument,
- TaskDocument,
- ExtractedContent,
- ContentItem,
- ContentMetadata
+ ExtractedContent
)
from modules.workflow.serviceContainer import ServiceContainer
from modules.workflow.processorDocument import DocumentProcessor
@@ -23,82 +20,12 @@ class DocumentManager:
def __init__(self, serviceContainer: ServiceContainer):
self.service = serviceContainer
- self._processor = DocumentProcessor()
+ self._processor = DocumentProcessor(serviceContainer)
- async def extractFromChatDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent:
- """
- Extract content from a ChatDocument with AI processing.
-
- Args:
- prompt: Prompt for AI content extraction
- document: The ChatDocument to process
-
- Returns:
- ExtractedContent containing the processed content
- """
- # Convert ChatDocument to TaskDocument
- taskDoc = await self._convertToTaskDocument(document)
-
- # Process document using processor
- extractedContent = await self._processor.processDocument(taskDoc, prompt)
-
- # Update the objectId and objectType to reference the original ChatDocument
- extractedContent.objectId = document.id
- extractedContent.objectType = "ChatDocument"
-
- return extractedContent
-
- async def extractFromTaskDocument(self, prompt: str, document: TaskDocument) -> ExtractedContent:
- """
- Extract content directly from a task document.
-
- Args:
- prompt: The prompt to use for content extraction
- document: The task document to extract content from
-
- Returns:
- ExtractedContent containing the processed content
-
- Raises:
- ValueError: If document is invalid
- IOError: If file cannot be read
- """
+ async def extractContent(self, prompt: str, document: ChatDocument) -> ExtractedContent:
+ """Extract content from document using prompt"""
try:
return await self._processor.processDocument(document, prompt)
except Exception as e:
- logger.error(f"Error extracting from task document: {str(e)}")
- raise
-
- async def _convertToTaskDocument(self, chatDoc: ChatDocument) -> TaskDocument:
- """
- Convert a ChatDocument to a TaskDocument.
-
- Args:
- chatDoc: The chat document to convert
-
- Returns:
- TaskDocument containing the converted data
-
- Raises:
- ValueError: If document is invalid
- IOError: If file cannot be read
- """
- try:
- # Get file content
- fileContent = await self.service.functions.getFileData(chatDoc.fileId)
- if not fileContent:
- raise ValueError(f"Could not get content for file {chatDoc.fileId}")
-
- # Convert to base64
- base64Data = base64.b64encode(fileContent).decode('utf-8')
-
- return TaskDocument(
- id=str(uuid.uuid4()),
- filename=chatDoc.filename,
- fileSize=chatDoc.fileSize,
- mimeType=chatDoc.mimeType,
- data=base64Data
- )
- except Exception as e:
- logger.error(f"Error converting chat document to task document: {str(e)}")
+ logger.error(f"Error extracting from document: {str(e)}")
raise
diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py
index b0dcedf8..40408ab8 100644
--- a/modules/workflow/managerWorkflow.py
+++ b/modules/workflow/managerWorkflow.py
@@ -5,11 +5,7 @@ import uuid
from modules.interfaces.serviceAppClass import User
-from modules.interfaces.serviceChatModel import (
- TaskStatus, UserInputRequest, ContentMetadata, ContentItem,
- ChatDocument, TaskDocument, ExtractedContent, TaskItem,
- TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow
-)
+from modules.interfaces.serviceChatModel import (UserInputRequest, ChatMessage, ChatWorkflow)
from modules.interfaces.serviceChatClass import ChatInterface
from modules.workflow.managerChat import ChatManager
diff --git a/modules/workflow/processorDocument.py b/modules/workflow/processorDocument.py
index 51b02c67..75565874 100644
--- a/modules/workflow/processorDocument.py
+++ b/modules/workflow/processorDocument.py
@@ -11,15 +11,13 @@ from bs4 import BeautifulSoup
from modules.interfaces.serviceChatModel import (
ChatDocument,
- TaskDocument,
ExtractedContent,
ContentItem,
ContentMetadata
)
-from modules.interfaces.serviceManagementClass import ServiceManagement, getInterface
-from modules.interfaces.serviceAppModel import User
from modules.neutralizer.neutralizer import DataAnonymizer
from modules.shared.configuration import APP_CONFIG
+from modules.workflow.serviceContainer import ServiceContainer
logger = logging.getLogger(__name__)
@@ -35,14 +33,12 @@ class FileProcessingError(Exception):
class DocumentProcessor:
"""Processor for handling document operations and content extraction."""
- def __init__(self, currentUser: Optional[User] = None):
+ def __init__(self, serviceContainer: ServiceContainer):
"""Initialize the document processor."""
-
- self.serviceManagement = getInterface(currentUser)
-
+ self.service = serviceContainer
self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None
- self.supportedTypes: Dict[str, Callable[[Union[ChatDocument, TaskDocument]], Awaitable[List[ContentItem]]]] = {
+ self.supportedTypes: Dict[str, Callable[[ChatDocument], Awaitable[List[ContentItem]]]] = {
'text/plain': self._processText,
'text/csv': self._processCsv,
'application/json': self._processJson,
@@ -115,7 +111,18 @@ class DocumentProcessor:
except ImportError as e:
logger.warning(f"Image processing libraries could not be loaded: {e}")
- async def processDocument(self, document: TaskDocument, prompt: str) -> ExtractedContent:
+    async def _getFileData(self, document: ChatDocument) -> bytes:
+        """Return the raw bytes stored for document.fileId; raise FileProcessingError if the lookup fails or yields None."""
+        try:
+            fileData = self.service.getFileData(document.fileId)  # NOTE(review): called without await - confirm it is synchronous
+        except Exception as e:
+            logger.error(f"Error getting file data: {str(e)}")
+            raise FileProcessingError(f"Failed to get file data: {str(e)}") from e
+        if fileData is None:  # checked outside the try so this error is not caught and re-wrapped
+            raise FileProcessingError(f"Could not get file data for {document.fileId}")
+        return fileData
+
+ async def processDocument(self, document: ChatDocument, prompt: str) -> ExtractedContent:
"""
Process a document and extract its contents with AI processing.
@@ -149,15 +156,13 @@ class DocumentProcessor:
try:
# Process each content item with AI
processedItems = await self._aiDataExtraction(contentItems, prompt)
-
contentItems = processedItems
-
except Exception as e:
logger.error(f"Error processing content with AI: {str(e)}")
return ExtractedContent(
objectId=document.id,
- objectType="TaskDocument",
+ objectType="ChatDocument",
contents=contentItems
)
@@ -165,7 +170,7 @@ class DocumentProcessor:
logger.error(f"Error processing document: {str(e)}")
raise FileProcessingError(f"Failed to process document: {str(e)}")
- def _detectContentType(self, document: Union[ChatDocument, TaskDocument]) -> str:
+ def _detectContentType(self, document: ChatDocument) -> str:
"""Detect content type from file content"""
try:
# Check file extension first
@@ -195,32 +200,16 @@ class DocumentProcessor:
if ext in extToMime:
return extToMime[ext]
- # Try to detect if it's text content
- if isinstance(document, TaskDocument):
- try:
- content = base64.b64decode(document.data)
- content.decode('utf-8')
- return 'text/plain'
- except UnicodeDecodeError:
- pass
-
return 'application/octet-stream'
except Exception as e:
logger.error(f"Error detecting content type: {str(e)}")
return 'application/octet-stream'
- async def _processText(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]:
+ async def _processText(self, document: ChatDocument) -> List[ContentItem]:
"""Process text document"""
try:
- if isinstance(document, TaskDocument):
- content = base64.b64decode(document.data).decode('utf-8')
- else:
- content = self.serviceManagement.getFileData(document.fileId)
- if content is None:
- raise FileProcessingError(f"Could not get file data for {document.fileId}")
- content = content.decode('utf-8')
-
+ content = (await self._getFileData(document)).decode('utf-8')
return [ContentItem(
label="main",
data=content,
@@ -235,17 +224,10 @@ class DocumentProcessor:
logger.error(f"Error processing text document: {str(e)}")
raise FileProcessingError(f"Failed to process text document: {str(e)}")
- async def _processCsv(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]:
+ async def _processCsv(self, document: ChatDocument) -> List[ContentItem]:
"""Process CSV document"""
try:
- if isinstance(document, TaskDocument):
- content = base64.b64decode(document.data).decode('utf-8')
- else:
- content = self.serviceManagement.getFileData(document.fileId)
- if content is None:
- raise FileProcessingError(f"Could not get file data for {document.fileId}")
- content = content.decode('utf-8')
-
+ content = (await self._getFileData(document)).decode('utf-8')
return [ContentItem(
label="main",
data=content,
@@ -260,17 +242,10 @@ class DocumentProcessor:
logger.error(f"Error processing CSV document: {str(e)}")
raise FileProcessingError(f"Failed to process CSV document: {str(e)}")
- async def _processJson(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]:
+ async def _processJson(self, document: ChatDocument) -> List[ContentItem]:
"""Process JSON document"""
try:
- if isinstance(document, TaskDocument):
- content = base64.b64decode(document.data).decode('utf-8')
- else:
- content = self.serviceManagement.getFileData(document.fileId)
- if content is None:
- raise FileProcessingError(f"Could not get file data for {document.fileId}")
- content = content.decode('utf-8')
-
+ content = (await self._getFileData(document)).decode('utf-8')
# Parse JSON to validate
jsonData = json.loads(content)
@@ -288,17 +263,10 @@ class DocumentProcessor:
logger.error(f"Error processing JSON document: {str(e)}")
raise FileProcessingError(f"Failed to process JSON document: {str(e)}")
- async def _processXml(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]:
+ async def _processXml(self, document: ChatDocument) -> List[ContentItem]:
"""Process XML document"""
try:
- if isinstance(document, TaskDocument):
- content = base64.b64decode(document.data).decode('utf-8')
- else:
- content = self.serviceManagement.getFileData(document.fileId)
- if content is None:
- raise FileProcessingError(f"Could not get file data for {document.fileId}")
- content = content.decode('utf-8')
-
+ content = (await self._getFileData(document)).decode('utf-8')
return [ContentItem(
label="main",
data=content,
@@ -313,17 +281,10 @@ class DocumentProcessor:
logger.error(f"Error processing XML document: {str(e)}")
raise FileProcessingError(f"Failed to process XML document: {str(e)}")
- async def _processHtml(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]:
+ async def _processHtml(self, document: ChatDocument) -> List[ContentItem]:
"""Process HTML document"""
try:
- if isinstance(document, TaskDocument):
- content = base64.b64decode(document.data).decode('utf-8')
- else:
- content = self.serviceManagement.getFileData(document.fileId)
- if content is None:
- raise FileProcessingError(f"Could not get file data for {document.fileId}")
- content = content.decode('utf-8')
-
+ content = (await self._getFileData(document)).decode('utf-8')
return [ContentItem(
label="main",
data=content,
@@ -338,17 +299,10 @@ class DocumentProcessor:
logger.error(f"Error processing HTML document: {str(e)}")
raise FileProcessingError(f"Failed to process HTML document: {str(e)}")
- async def _processSvg(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]:
+ async def _processSvg(self, document: ChatDocument) -> List[ContentItem]:
"""Process SVG document"""
try:
- if isinstance(document, TaskDocument):
- content = base64.b64decode(document.data).decode('utf-8')
- else:
- content = self.serviceManagement.getFileData(document.fileId)
- if content is None:
- raise FileProcessingError(f"Could not get file data for {document.fileId}")
- content = content.decode('utf-8')
-
+ content = (await self._getFileData(document)).decode('utf-8')
# Check if it's actually SVG
isSvg = "