Services API Reference
Complete API reference documentation for all services in the Gateway application. This document provides detailed method signatures, parameters, return types, examples, and usage guidelines for each service.
Table of Contents
- Introduction
- AI Service API
- Chat Service API
- Extraction Service API
- Generation Service API
- Neutralization Service API
- SharePoint Service API
- Ticket Service API
- Utils Service API
- Common Patterns
- Error Handling
Introduction
All services are accessed through the Services container, which is initialized with user and workflow context:
from modules.services import Services
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelChat import ChatWorkflow
# Initialize services
services = Services(user=current_user, workflow=current_workflow)
# Access any service
result = await services.ai.someMethod()
Service Access Pattern
graph LR
User[User Code] --> Container[Services Container]
Container --> AI[services.ai]
Container --> Chat[services.chat]
Container --> Extract[services.extraction]
Container --> Gen[services.generation]
Container --> Neut[services.neutralization]
Container --> SP[services.sharepoint]
Container --> Ticket[services.ticket]
Container --> Utils[services.utils]
style Container fill:#e1f5ff,stroke:#01579b,stroke-width:2px
AI Service API
Location: modules/services/serviceAi/mainServiceAi.py
Overview
The AI Service provides methods for interacting with AI models for planning, document processing, text generation, image generation, and web operations.
Class: AiService
Initialization
class AiService:
    def __init__(self, serviceCenter=None) -> None:
        """
        Initialize AI service with service center access.

        Args:
            serviceCenter: Services container instance
        """
The AI Service is automatically initialized by the Services container. Access via:
services.ai.method_name()
Method: callAiPlanning
Planning AI calls for task planning, action planning, and intent analysis.
Signature
async def callAiPlanning(
prompt: str,
placeholders: Optional[List[PromptPlaceholder]] = None,
debugType: Optional[str] = None
) -> str
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| prompt | str | Yes | The planning prompt template |
| placeholders | List[PromptPlaceholder] | No | List of placeholder replacements using {{KEY:name}} format |
| debugType | str | No | Debug file type identifier (e.g., 'taskplan', 'actionplan', 'intentanalysis') |
Returns
- Type: str
- Description: Planning JSON response from the AI model
Behavior
- Always uses static parameters optimized for planning tasks
  - Operation type: PLAN
  - Priority: QUALITY
  - Processing mode: DETAILED
  - No compression applied to prompt or context
- Returns single-shot JSON (no iterative generation)
Example Usage
# Basic planning call
planning_result = await services.ai.callAiPlanning(
prompt="Analyze the user's request and create a task plan",
debugType="taskplan"
)
# With placeholders
from modules.datamodels.datamodelChat import PromptPlaceholder
placeholders = [
PromptPlaceholder(
label="user_request",
content="Create a quarterly sales report"
),
PromptPlaceholder(
label="available_documents",
content="3 documents available: sales_q1.xlsx, sales_q2.xlsx, sales_q3.xlsx"
)
]
planning_result = await services.ai.callAiPlanning(
prompt="""
Analyze this request: {{KEY:user_request}}
Available resources:
{{KEY:available_documents}}
Create a task plan in JSON format.
""",
placeholders=placeholders,
debugType="taskplan"
)
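How the service substitutes placeholders internally is not shown in this reference. As a mental model only, the {{KEY:name}} substitution could be sketched as follows. The helper fill_placeholders is hypothetical (it is not part of AiService) and takes a plain dict instead of PromptPlaceholder objects.

```python
import re
from typing import Dict

# Matches {{KEY:name}} tokens; the name is captured in group 1.
_PLACEHOLDER = re.compile(r"\{\{KEY:([A-Za-z0-9_]+)\}\}")


def fill_placeholders(prompt: str, values: Dict[str, str]) -> str:
    """Replace each {{KEY:name}} token with values[name].

    Hypothetical helper for illustration. Unknown names are left untouched
    so that a missing placeholder stays visible in the final prompt.
    """
    def substitute(match):
        name = match.group(1)
        return values.get(name, match.group(0))

    return _PLACEHOLDER.sub(substitute, prompt)


template = "Analyze this request: {{KEY:user_request}} ({{KEY:missing}})"
filled = fill_placeholders(template, {"user_request": "Create a quarterly sales report"})
print(filled)
```

Leaving unknown tokens in place is a design choice of this sketch; the real service may behave differently.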
Error Handling
try:
    result = await services.ai.callAiPlanning(prompt="...")
except Exception as e:
    logger.error(f"Planning call failed: {str(e)}")
    # Handle error
Method: callAiDocuments
Document generation AI call for all non-planning operations including document processing, generation, and web operations.
Signature
async def callAiDocuments(
prompt: str,
documents: Optional[List[ChatDocument]] = None,
options: Optional[AiCallOptions] = None,
outputFormat: Optional[str] = None,
title: Optional[str] = None
) -> Union[str, Dict[str, Any]]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| prompt | str | Yes | The main prompt for the AI call |
| documents | List[ChatDocument] | No | List of documents to process |
| options | AiCallOptions | No | AI call configuration options (auto-analyzed if not provided) |
| outputFormat | str | No | Output format for document generation (html, pdf, docx, xlsx, csv, json, md, txt, png) |
| title | str | No | Title for generated documents |
Returns
- Type: Union[str, Dict[str, Any]]
- Description:
  - If outputFormat is specified: Dictionary with generated document(s)
  - If no outputFormat is specified: String with processed text
Document Result Format
When outputFormat is specified, returns:
{
"success": True,
"content": {...}, # Structured JSON content
"documents": [
{
"documentName": "report.pdf",
"documentData": "<base64-encoded-content>",
"mimeType": "application/pdf",
"title": "Quarterly Report"
}
],
"is_multi_file": False,
"format": "pdf",
"title": "Quarterly Report",
"split_strategy": "single",
"total_documents": 1,
"processed_documents": 1
}
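A caller will usually want the generated files on disk. The sketch below decodes each entry of the documents list and writes it out. It assumes documentData is standard base64, as the sample result suggests; save_generated_documents is a hypothetical helper, not a method of any service.

```python
import base64
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def save_generated_documents(result: Dict[str, Any], out_dir: str) -> List[Path]:
    """Decode every entry in result["documents"] and write it into out_dir."""
    if not result.get("success"):
        raise ValueError("AI call did not report success")

    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)

    written: List[Path] = []
    for doc in result.get("documents", []):
        # Assumption: documentData holds standard base64 text.
        data = base64.b64decode(doc["documentData"])
        # Keep only the final path component so a name cannot escape out_dir.
        path = target / Path(doc["documentName"]).name
        path.write_bytes(data)
        written.append(path)
    return written


# Usage with a hand-built result in the documented shape.
sample = {
    "success": True,
    "documents": [
        {
            "documentName": "report.pdf",
            "documentData": base64.b64encode(b"%PDF-1.4 demo").decode("ascii"),
            "mimeType": "application/pdf",
        }
    ],
}
with tempfile.TemporaryDirectory() as tmp:
    paths = save_generated_documents(sample, tmp)
    saved_bytes = paths[0].read_bytes()
```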
Behavior
- Auto-analysis: If options is None or options.operationType is None, automatically analyzes the prompt to determine optimal parameters
- Document Processing: If documents are provided, extracts and processes content
- Iterative Generation: Supports multi-iteration generation with automatic continuation
- Progress Tracking: Provides granular progress updates
- Format-Specific Handling: Different behavior for image generation, web operations, and document generation
Example Usage
Text Processing
# Process documents and return text
result = await services.ai.callAiDocuments(
prompt="Summarize the key findings from these documents",
documents=chat_documents,
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.BASIC
)
)
# Returns: str with summary text
Document Generation
# Generate PDF report
result = await services.ai.callAiDocuments(
prompt="Create a comprehensive quarterly sales report",
documents=sales_documents,
outputFormat="pdf",
title="Q4 2024 Sales Report"
)
# Returns: Dict with PDF document data
Image Generation
# Generate image
result = await services.ai.callAiDocuments(
prompt="Generate a professional bar chart showing sales by region",
outputFormat="base64",
options=AiCallOptions(
operationType=OperationTypeEnum.IMAGE_GENERATE,
priority=PriorityEnum.BALANCED
)
)
# Returns: Dict with base64-encoded image
Web Search
# Web search operation (json is the standard-library module)
import json
search_prompt = json.dumps({
"query": "latest AI developments 2024",
"max_results": 5
})
result = await services.ai.callAiDocuments(
prompt=search_prompt,
options=AiCallOptions(
operationType=OperationTypeEnum.WEB_SEARCH,
priority=PriorityEnum.SPEED
)
)
# Returns: str with search results
Auto-Analysis Mode
# Let AI analyze the prompt and determine parameters
result = await services.ai.callAiDocuments(
prompt="Extract key insights from these contracts and create a summary",
documents=contract_documents
)
# AI automatically determines operation type, priority, and processing mode
Advanced Options
from modules.datamodels.datamodelAi import (
AiCallOptions,
OperationTypeEnum,
PriorityEnum,
ProcessingModeEnum
)
options = AiCallOptions(
operationType=OperationTypeEnum.DATA_EXTRACT,
priority=PriorityEnum.QUALITY,
processingMode=ProcessingModeEnum.DETAILED,
compressPrompt=False, # Don't compress prompt (important for JSON templates)
compressContext=False # Don't compress context
)
result = await services.ai.callAiDocuments(
prompt=detailed_extraction_prompt,
documents=documents,
options=options,
outputFormat="json",
title="Extracted Data"
)
Operation Types
| Operation Type | Description | Use Case |
|---|---|---|
| PLAN | Task and action planning | Strategic planning, workflow design |
| DATA_EXTRACT | Extract structured data | Parse documents, extract entities |
| DATA_ANALYSE | Analyze and summarize | Insights, summaries, analysis |
| TEXT_GENERATE | Generate text content | Articles, reports, descriptions |
| IMAGE_GENERATE | Generate images | Charts, diagrams, illustrations |
| WEB_SEARCH | Search the web | Research, fact-checking |
| WEB_CRAWL | Crawl web pages | Deep content extraction |
Priority Levels
| Priority | Description | Performance |
|---|---|---|
| SPEED | Fast responses | Lower quality, faster |
| BALANCED | Balance speed/quality | Good for most cases |
| QUALITY | Best quality | Slower, higher quality |
Processing Modes
| Mode | Description | Detail Level |
|---|---|---|
| BASIC | Simple processing | Quick, basic analysis |
| STANDARD | Normal processing | Standard detail |
| DETAILED | Comprehensive processing | Deep analysis |
Method: callAiText
Process documents with text extraction and AI analysis.
Signature
async def callAiText(
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions,
operationId: Optional[str] = None
) -> str
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| prompt | str | Yes | Processing prompt |
| documents | List[ChatDocument] | No | Documents to process (the signature has no default, so pass None when there are none) |
| options | AiCallOptions | Yes | AI call options |
| operationId | str | No | Operation ID for progress tracking |
Returns
- Type: str
- Description: Processed text result
Behavior
- Automatically extracts content from documents
- Applies model-aware chunking if needed
- Processes chunks in parallel
- Merges results intelligently
- Tracks progress if operationId is provided
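The chunking step can be pictured with a small stand-alone sketch. The service's own chunking is described as model-aware, and its actual limits and units are not given here, so the version below simply uses a character budget as a stand-in. chunk_text and max_chars are illustrative names, not part of the API.

```python
from typing import List


def chunk_text(text: str, max_chars: int) -> List[str]:
    """Split text into chunks of at most max_chars, preferring paragraph breaks."""
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")

    chunks: List[str] = []
    current = ""
    for paragraph in text.split("\n\n"):
        # Hard-split any single paragraph that exceeds the budget on its own.
        pieces = [
            paragraph[i:i + max_chars]
            for i in range(0, len(paragraph), max_chars)
        ] or [""]
        for piece in pieces:
            candidate = piece if not current else current + "\n\n" + piece
            if len(candidate) <= max_chars:
                current = candidate
            else:
                # The piece does not fit: close the current chunk, start a new one.
                chunks.append(current)
                current = piece
    if current:
        chunks.append(current)
    return chunks


example_chunks = chunk_text("aaaa\n\nbbbb\n\ncccccccccc", max_chars=10)
print(example_chunks)
```

A production implementation would more likely budget by tokens for the selected model than by characters.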
Example Usage
# Process documents with text extraction
result = await services.ai.callAiText(
prompt="Extract all dates and amounts from these invoices",
documents=invoice_documents,
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_EXTRACT,
priority=PriorityEnum.BALANCED
),
operationId="extract_invoices_123"
)
Chat Service API
Location: modules/services/serviceChat/mainServiceChat.py
Overview
The Chat Service manages workflow operations, message handling, document resolution, connection management, and progress tracking.
Class: ChatService
Initialization
class ChatService:
    def __init__(self, serviceCenter):
        """
        Initialize Chat service with service center access.

        Args:
            serviceCenter: Services container instance
        """
Access via:
services.chat.method_name()
Document Resolution Methods
Method: getChatDocumentsFromDocumentList
Resolve document references to actual ChatDocument objects.
Signature
def getChatDocumentsFromDocumentList(
documentList: List[str]
) -> List[ChatDocument]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| documentList | List[str] | Yes | List of document references |
Returns
- Type: List[ChatDocument]
- Description: Resolved ChatDocument objects with file access
Document Reference Formats
Three reference formats are supported:
- docItem: Single document by ID
  "docItem:<document-id>:<filename>"
- docList with message ID: All documents from a specific message
  "docList:<message-id>:<label>"
- docList with label only: Documents by label (newest message)
  "docList:<label>"
Example Usage
# Single document reference
doc_refs = ["docItem:123e4567-e89b-12d3-a456:report.pdf"]
documents = services.chat.getChatDocumentsFromDocumentList(doc_refs)
# Multiple document references
doc_refs = [
"docItem:123e4567-e89b-12d3-a456:report.pdf",
"docList:round1_task1_action1_contextinfo",
"docList:456:user_upload"
]
documents = services.chat.getChatDocumentsFromDocumentList(doc_refs)
# Use resolved documents with AI
result = await services.ai.callAiDocuments(
prompt="Analyze these documents",
documents=documents
)
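For logging or validation it can help to split a reference string into its parts before passing it on. The following sketch parses the three documented formats. parse_document_reference and the keys of the returned dict are hypothetical, and the real resolver may accept or reject inputs differently.

```python
from typing import Dict, Optional


def parse_document_reference(ref: str) -> Dict[str, Optional[str]]:
    """Split a docItem/docList reference into its documented components."""
    kind, _, rest = ref.partition(":")

    if kind == "docItem":
        # "docItem:<document-id>:<filename>"; the filename may itself contain ":".
        document_id, sep, filename = rest.partition(":")
        if not sep or not document_id or not filename:
            raise ValueError(f"Malformed docItem reference: {ref!r}")
        return {"kind": "docItem", "documentId": document_id, "filename": filename}

    if kind == "docList":
        if not rest:
            raise ValueError(f"Malformed docList reference: {ref!r}")
        first, sep, second = rest.partition(":")
        if sep:
            # "docList:<message-id>:<label>"
            return {"kind": "docList", "messageId": first, "label": second}
        # "docList:<label>" (label only)
        return {"kind": "docList", "messageId": None, "label": rest}

    raise ValueError(f"Unknown reference type: {ref!r}")


parsed_item = parse_document_reference("docItem:123e4567-e89b-12d3-a456:report.pdf")
parsed_list = parse_document_reference("docList:456:user_upload")
parsed_label = parse_document_reference("docList:round1_task1_action1_contextinfo")
```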
Error Handling
try:
    documents = services.chat.getChatDocumentsFromDocumentList(doc_refs)
    if not documents:
        logger.warning("No documents found")
except Exception as e:
    logger.error(f"Document resolution failed: {str(e)}")
Method: getDocumentReferenceFromChatDocument
Get a document reference string from a ChatDocument object.
Signature
def getDocumentReferenceFromChatDocument(
document: ChatDocument
) -> str
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| document | ChatDocument | Yes | Document to create reference for |
Returns
- Type: str
- Description: Document reference string in format "docItem:<id>:<filename>"
Example Usage
# Create reference for a document
doc_ref = services.chat.getDocumentReferenceFromChatDocument(document)
# Returns: "docItem:123e4567-e89b-12d3:report.pdf"
# Store reference in action result
action_result = {
"documents_used": [
services.chat.getDocumentReferenceFromChatDocument(doc)
for doc in processed_documents
]
}
Workflow Management Methods
Method: createWorkflow
Create a new workflow.
Signature
def createWorkflow(
workflowData: Dict[str, Any]
) -> ChatWorkflow
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflowData | Dict[str, Any] | Yes | Workflow creation data |
Workflow Data Structure
workflow_data = {
"userId": "user-id",
"status": "active",
"currentRound": 1,
"currentTask": 0,
"currentAction": 0,
"totalTasks": 0,
"totalActions": 0,
"metadata": {}
}
Returns
- Type: ChatWorkflow
- Description: Created workflow object
Example Usage
# Create new workflow
workflow = services.chat.createWorkflow({
"userId": current_user.id,
"status": "active",
"currentRound": 1,
"metadata": {
"workflow_type": "real_estate_analysis"
}
})
# Use workflow in services
services_with_workflow = Services(user=current_user, workflow=workflow)
Method: updateWorkflow
Update an existing workflow.
Signature
def updateWorkflow(
workflowId: str,
updateData: Dict[str, Any]
) -> ChatWorkflow
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflowId | str | Yes | ID of workflow to update |
| updateData | Dict[str, Any] | Yes | Fields to update |
Returns
- Type: ChatWorkflow
- Description: Updated workflow object
Example Usage
# Update workflow status
updated_workflow = services.chat.updateWorkflow(
workflowId=workflow.id,
updateData={
"status": "completed",
"totalTasks": 5,
"totalActions": 12
}
)
Method: getWorkflow
Retrieve a workflow by ID.
Signature
def getWorkflow(
workflowId: str
) -> ChatWorkflow
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflowId | str | Yes | ID of workflow to retrieve |
Returns
- Type: ChatWorkflow
- Description: Workflow object with all messages and logs
Example Usage
# Get workflow
workflow = services.chat.getWorkflow(workflow_id)
# Access workflow data
print(f"Status: {workflow.status}")
print(f"Messages: {len(workflow.messages)}")
print(f"Current round: {workflow.currentRound}")
Context Management Methods
Method: getWorkflowContext
Get current workflow context (round, task, action numbers).
Signature
def getWorkflowContext() -> Dict[str, int]
Returns
{
"currentRound": 1,
"currentTask": 2,
"currentAction": 3
}
Example Usage
context = services.chat.getWorkflowContext()
print(f"Processing round {context['currentRound']}, task {context['currentTask']}")
Method: setWorkflowContext
Update workflow context numbers.
Signature
def setWorkflowContext(
roundNumber: Optional[int] = None,
taskNumber: Optional[int] = None,
actionNumber: Optional[int] = None
) -> None
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| roundNumber | int | No | New round number |
| taskNumber | int | No | New task number |
| actionNumber | int | No | New action number |
Example Usage
# Start new round
services.chat.setWorkflowContext(roundNumber=2, taskNumber=0, actionNumber=0)
# Move to next task
services.chat.setWorkflowContext(taskNumber=1)
# Move to next action
services.chat.setWorkflowContext(actionNumber=1)
Method: getWorkflowStats
Get comprehensive workflow statistics.
Signature
def getWorkflowStats() -> Dict[str, Any]
Returns
{
"currentRound": 1,
"currentTask": 2,
"currentAction": 3,
"totalTasks": 5,
"totalActions": 12,
"workflowStatus": "active",
"workflowId": "workflow-123"
}
Example Usage
stats = services.chat.getWorkflowStats()
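# totalTasks can be 0 (for example on a new workflow), so guard the division
progress = (stats['currentTask'] / stats['totalTasks']) * 100 if stats['totalTasks'] else 0.0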
print(f"Workflow progress: {progress:.1f}%")
Message & Document Storage Methods
Method: storeMessageWithDocuments
Persist a message with documents and sync with in-memory workflow.
Signature
def storeMessageWithDocuments(
workflow: Any,
messageData: Dict[str, Any],
documents: List[Any]
) -> ChatMessage
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflow | ChatWorkflow | Yes | Current workflow object |
| messageData | Dict[str, Any] | Yes | Message data |
| documents | List[ChatDocument] | Yes | Documents to attach |
Message Data Structure
message_data = {
"role": "assistant", # or "user", "system"
"content": "Message content",
"status": "published", # or "first", "hidden"
"documentsLabel": "round1_task1_action1_result",
"roundNumber": 1,
"taskNumber": 1,
"actionNumber": 1
}
Returns
- Type: ChatMessage
- Description: Created message with attached documents
Behavior
- Stores message in database
- Stores all documents in database
- Syncs in-memory workflow.messages list
- Ensures workflowId is set on message
Example Usage
# Store result message with generated documents
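# round_number, task_number and action_number hold the current workflow context
# (named this way to avoid shadowing the built-in round())
message = services.chat.storeMessageWithDocuments(
    workflow=services.workflow,
    messageData={
        "role": "assistant",
        "content": "I've generated the quarterly report.",
        "status": "published",
        "documentsLabel": f"round{round_number}_task{task_number}_action{action_number}_result",
        "roundNumber": round_number,
        "taskNumber": task_number,
        "actionNumber": action_number
    },
    documents=generated_documents
)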
# Access stored message
print(f"Message ID: {message.id}")
print(f"Documents attached: {len(message.documents)}")
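The documentsLabel values in these examples all follow a round/task/action pattern. If that convention holds in your code base, a small helper keeps labels and the matching docList references consistent. The helper names below are hypothetical, and the pattern is inferred from the examples in this document rather than from a stated rule.

```python
def build_documents_label(
    round_number: int,
    task_number: int,
    action_number: int,
    suffix: str = "result",
) -> str:
    """Build a label such as 'round1_task1_action1_result'."""
    return f"round{round_number}_task{task_number}_action{action_number}_{suffix}"


def label_to_doc_list_reference(label: str) -> str:
    """Turn a label into the label-only docList reference format."""
    return f"docList:{label}"


label = build_documents_label(1, 1, 1)
reference = label_to_doc_list_reference(build_documents_label(1, 1, 1, "contextinfo"))
print(label, reference)
```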
Method: storeLog
Store a workflow log entry.
Signature
def storeLog(
workflow: Any,
logData: Dict[str, Any]
) -> ChatLog
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflow | ChatWorkflow | Yes | Current workflow object |
| logData | Dict[str, Any] | Yes | Log entry data |
Log Data Structure
log_data = {
"message": "AI Service",
"type": "info", # or "warning", "error", "success"
"status": "Processing documents",
"progress": 0.5 # 0.0 to 1.0
}
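Because logData is a free-form dictionary, typos in the type field or out-of-range progress values are easy to introduce. A caller-side check along the following lines can catch them early. make_log_data is a hypothetical helper; whether storeLog itself validates these fields is not stated here.

```python
from typing import Any, Dict

# Values taken from the comments in the log data structure.
ALLOWED_LOG_TYPES = {"info", "warning", "error", "success"}


def make_log_data(
    message: str,
    status: str,
    log_type: str = "info",
    progress: float = 0.0,
) -> Dict[str, Any]:
    """Build a logData dict, rejecting unknown types and bad progress values."""
    if log_type not in ALLOWED_LOG_TYPES:
        raise ValueError(f"Unknown log type: {log_type!r}")
    if not 0.0 <= progress <= 1.0:
        raise ValueError(f"progress must be between 0.0 and 1.0, got {progress}")
    return {
        "message": message,
        "type": log_type,
        "status": status,
        "progress": progress,
    }


log_data = make_log_data("AI Service", "Processing documents", progress=0.5)
print(log_data)
```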
Returns
- Type: ChatLog
- Description: Created log entry
Example Usage
# Store information log
log = services.chat.storeLog(
workflow=services.workflow,
logData={
"message": "Document Processing",
"type": "info",
"status": "Extracting content from 3 documents",
"progress": 0.3
}
)
# Store error log
error_log = services.chat.storeLog(
workflow=services.workflow,
logData={
"message": "AI Service",
"type": "error",
"status": "Failed to process document",
"progress": 0.5
}
)
Method: storeWorkflowStat
Store workflow statistics from AI operations.
Signature
def storeWorkflowStat(
workflow: Any,
aiResponse: AiCallResponse,
process: str
) -> ChatStat
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| workflow | ChatWorkflow | Yes | Current workflow object |
| aiResponse | AiCallResponse | Yes | AI response with metrics |
| process | str | Yes | Process identifier (e.g., "ai.text_call", "extraction.pdf") |
Returns
- Type: ChatStat
- Description: Created statistics record
Example Usage
# AI service automatically stores stats
response = await services.ai.callAiDocuments(...)
# Internally calls:
services.chat.storeWorkflowStat(
workflow=services.workflow,
aiResponse=response,
process="ai.document_generation"
)
# Manual stat storage
services.chat.storeWorkflowStat(
workflow=services.workflow,
aiResponse=custom_response,
process="custom.process.name"
)
Progress Tracking Methods
Method: progressLogStart
Start tracking progress for an operation.
Signature
def progressLogStart(
operationId: str,
serviceName: str,
actionName: str,
context: str = ""
) -> None
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| operationId | str | Yes | Unique operation identifier |
| serviceName | str | Yes | Name of the service |
| actionName | str | Yes | Name of the action |
| context | str | No | Additional context information |
Example Usage
operation_id = f"process_docs_{workflow_id}_{timestamp}"
services.chat.progressLogStart(
operationId=operation_id,
serviceName="Document Processor",
actionName="Extract Content",
context="Processing 5 PDF documents"
)
Method: progressLogUpdate
Update progress for an ongoing operation.
Signature
def progressLogUpdate(
operationId: str,
progress: float,
statusUpdate: str = ""
) -> None
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| operationId | str | Yes | Operation identifier from progressLogStart |
| progress | float | Yes | Progress value (0.0 to 1.0) |
| statusUpdate | str | No | Status message |
Example Usage
# Update progress through operation stages
services.chat.progressLogUpdate(operation_id, 0.2, "Extracting text")
services.chat.progressLogUpdate(operation_id, 0.5, "Processing with AI")
services.chat.progressLogUpdate(operation_id, 0.8, "Generating output")
Method: progressLogFinish
Mark an operation as complete.
Signature
def progressLogFinish(
operationId: str,
success: bool = True
) -> None
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| operationId | str | Yes | Operation identifier |
| success | bool | No | Whether operation succeeded (default: True) |
Example Usage
try:
    # Start tracking
    services.chat.progressLogStart(op_id, "Service", "Action")
    # Perform work with updates
    services.chat.progressLogUpdate(op_id, 0.5, "Processing")
    result = do_work()
    # Mark success
    services.chat.progressLogFinish(op_id, True)
except Exception as e:
    # Mark failure
    services.chat.progressLogFinish(op_id, False)
    raise
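The start/update/finish pattern can be wrapped in a context manager so that the finish call is never forgotten. This is one possible wrapper, not something the Chat Service provides. tracked_operation is a hypothetical name, and RecordingChat is a stand-in for services.chat that exists only to make the sketch runnable.

```python
from contextlib import contextmanager


@contextmanager
def tracked_operation(chat, operation_id, service_name, action_name, context=""):
    """Start progress tracking, yield an update function, always finish."""
    chat.progressLogStart(operation_id, service_name, action_name, context)
    try:
        yield lambda progress, status="": chat.progressLogUpdate(
            operation_id, progress, status
        )
    except Exception:
        # The body raised: mark the operation as failed and re-raise.
        chat.progressLogFinish(operation_id, False)
        raise
    else:
        chat.progressLogFinish(operation_id, True)


class RecordingChat:
    """Stand-in for services.chat that records calls instead of logging."""

    def __init__(self):
        self.calls = []

    def progressLogStart(self, operationId, serviceName, actionName, context=""):
        self.calls.append(("start", operationId))

    def progressLogUpdate(self, operationId, progress, statusUpdate=""):
        self.calls.append(("update", progress))

    def progressLogFinish(self, operationId, success=True):
        self.calls.append(("finish", success))


chat = RecordingChat()
with tracked_operation(chat, "op-1", "Service", "Action") as update:
    update(0.5, "Processing")
print(chat.calls)
```

With the real service, the first argument would be services.chat instead of the recording stand-in.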
Connection Management Methods
Method: getConnectionReferenceFromUserConnection
Get connection reference string from UserConnection object.
Signature
def getConnectionReferenceFromUserConnection(
connection: UserConnection
) -> str
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| connection | UserConnection | Yes | User connection object |
Returns
- Type: str
- Description: Connection reference with status info
- Format: "connection:{authority}:{username} [status:{status}, token:{token_status}]"
Example Usage
# Get user's connections
connections = services.interfaceDbApp.getUserConnections(user.id)
# Create references
connection_refs = [
services.chat.getConnectionReferenceFromUserConnection(conn)
for conn in connections
]
# Returns: ["connection:msft:user@company.com [status:active, token:valid]"]
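If you need the individual fields of a connection reference, for example to filter on token status, the documented format can be parsed with a regular expression. The sketch below is an illustration only. parse_connection_reference is a hypothetical helper, and it assumes that authority and username contain no whitespace and that the bracketed status suffix is optional.

```python
import re
from typing import Dict, Optional

# "connection:{authority}:{username} [status:{status}, token:{token_status}]"
# The bracketed suffix is treated as optional in this sketch.
_CONNECTION_REF = re.compile(
    r"^connection:(?P<authority>[^:\s]+):(?P<username>\S+)"
    r"(?:\s+\[status:(?P<status>[^,\]]+),\s*token:(?P<token_status>[^\]]+)\])?$"
)


def parse_connection_reference(ref: str) -> Dict[str, Optional[str]]:
    """Return authority, username, status and token_status from a reference."""
    match = _CONNECTION_REF.match(ref.strip())
    if match is None:
        raise ValueError(f"Malformed connection reference: {ref!r}")
    return match.groupdict()


full_ref = parse_connection_reference(
    "connection:msft:user@company.com [status:active, token:valid]"
)
short_ref = parse_connection_reference("connection:msft:user@company.com")
print(full_ref)
```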
Method: getUserConnectionFromConnectionReference
Resolve connection reference to UserConnection object.
Signature
def getUserConnectionFromConnectionReference(
connectionReference: str
) -> Optional[UserConnection]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| connectionReference | str | Yes | Connection reference string |
Returns
- Type: Optional[UserConnection]
- Description: UserConnection object or None if not found
Example Usage
# Resolve connection reference
conn_ref = "connection:msft:user@company.com"
connection = services.chat.getUserConnectionFromConnectionReference(conn_ref)
if connection:
    # Use connection
    token = services.chat.getFreshConnectionToken(connection.id)
Method: getFreshConnectionToken
Get a fresh token for a connection (refreshes if needed).
Signature
def getFreshConnectionToken(
connectionId: str
) -> Optional[Token]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| connectionId | str | Yes | Connection ID |
Returns
- Type: Optional[Token]
- Description: Fresh token or None if unavailable
Example Usage
# Get fresh token for SharePoint access
token = services.chat.getFreshConnectionToken(connection.id)
if token:
    # Use token for API call
    await services.sharepoint.uploadDocument(
        siteUrl=site_url,
        token=token.accessToken,
        ...
    )
File Information Methods
Method: getFileInfo
Get file metadata.
Signature
def getFileInfo(
fileId: str
) -> Dict[str, Any]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| fileId | str | Yes | File ID |
Returns
{
"id": "file-123",
"fileName": "document.pdf",
"size": 1048576,
"mimeType": "application/pdf",
"fileHash": "abc123...",
"creationDate": 1640000000
}
Example Usage
file_info = services.chat.getFileInfo(document.fileId)
print(f"File: {file_info['fileName']} ({file_info['size']} bytes)")
Method: getFileData
Get file raw data bytes.
Signature
def getFileData(
fileId: str
) -> bytes
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| fileId | str | Yes | File ID |
Returns
- Type: bytes
- Description: Raw file data
Example Usage
# Get file data
file_data = services.chat.getFileData(document.fileId)
# Process bytes
with open("output.pdf", "wb") as f:
    f.write(file_data)
Extraction Service API
Location: modules/services/serviceExtraction/mainServiceExtraction.py
Overview
The Extraction Service extracts and processes content from various document formats with intelligent chunking and merging.
Class: ExtractionService
Access via:
services.extraction.method_name()
Method: extractContent
Extract content from documents.
Signature
def extractContent(
documents: List[ChatDocument],
options: ExtractionOptions
) -> List[ContentExtracted]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| documents | List[ChatDocument] | Yes | Documents to extract content from |
| options | ExtractionOptions | Yes | Extraction configuration |
ExtractionOptions Structure
from modules.datamodels.datamodelExtraction import (
ExtractionOptions,
MergeStrategy
)
options = ExtractionOptions(
prompt="Extract all text and tables",
operationType=OperationTypeEnum.DATA_EXTRACT,
processDocumentsIndividually=True,
mergeStrategy=MergeStrategy(
useIntelligentMerging=True,
groupBy="typeGroup",
orderBy="id",
mergeType="concatenate"
)
)
Returns
- Type: List[ContentExtracted]
- Description: List of extracted content objects, one per document
ContentExtracted Structure
{
"id": "extracted-1",
"parts": [
{
"id": "part-1",
"parentId": None,
"label": "text_content",
"typeGroup": "text",
"mimeType": "text/plain",
"data": "Extracted text content...",
"metadata": {
"documentId": "doc-123",
"pageNumber": 1,
"size": 1024
}
},
# More parts...
]
}
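A common next step is to collect the text of all parts by typeGroup. The sketch below works on the dictionary shape shown above. The real ContentExtracted objects may expose attributes instead of keys, so treat text_by_type_group as a hypothetical helper that you would adapt.

```python
from collections import defaultdict
from typing import Any, Dict, List


def text_by_type_group(extracted: Dict[str, Any]) -> Dict[str, str]:
    """Join the string data of all parts, grouped by their typeGroup."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for part in extracted.get("parts", []):
        data = part.get("data")
        # Skip parts whose data is not text (for example binary payloads).
        if isinstance(data, str):
            grouped[part.get("typeGroup", "unknown")].append(data)
    return {group: "\n".join(chunks) for group, chunks in grouped.items()}


sample_extracted = {
    "id": "extracted-1",
    "parts": [
        {"id": "part-1", "typeGroup": "text", "data": "First paragraph."},
        {"id": "part-2", "typeGroup": "table", "data": "a,b\n1,2"},
        {"id": "part-3", "typeGroup": "text", "data": "Second paragraph."},
    ],
}
grouped_text = text_by_type_group(sample_extracted)
print(grouped_text["text"])
```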
Example Usage
from modules.datamodels.datamodelExtraction import (
ExtractionOptions,
MergeStrategy
)
from modules.datamodels.datamodelAi import OperationTypeEnum
# Define extraction options
options = ExtractionOptions(
prompt="Extract content for analysis",
operationType=OperationTypeEnum.DATA_EXTRACT,
processDocumentsIndividually=True,
mergeStrategy=MergeStrategy(
useIntelligentMerging=True,
mergeType="concatenate"
)
)
# Extract content
extracted = services.extraction.extractContent(
documents=chat_documents,
options=options
)
# Process extracted content
for content in extracted:
    print(f"Document {content.id}: {len(content.parts)} parts")
    for part in content.parts:
        print(f" - {part.typeGroup}: {len(part.data)} chars")
Method: processDocumentsPerChunk
Process documents with model-aware chunking and AI analysis.
Signature
async def processDocumentsPerChunk(
documents: List[ChatDocument],
prompt: str,
aiObjects: Any,
options: Optional[AiCallOptions] = None,
operationId: Optional[str] = None
) -> str
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| documents | List[ChatDocument] | Yes | Documents to process |
| prompt | str | Yes | AI processing prompt |
| aiObjects | AiObjects | Yes | AI objects interface |
| options | AiCallOptions | No | AI call options |
| operationId | str | No | Operation ID for progress tracking |
Returns
- Type: str
- Description: Merged text result from all processed chunks
Behavior
- Extracts content from documents
- Automatically chunks content based on model limits
- Processes chunks in parallel (max 5 concurrent)
- Merges results intelligently
- Tracks progress if operation ID provided
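The bounded parallelism described above (at most 5 chunks in flight) is commonly implemented with a semaphore. The following is a generic asyncio sketch of that technique, not the service's actual code. process_chunks and fake_worker are hypothetical, and the merge step is reduced to a plain join.

```python
import asyncio
from typing import Awaitable, Callable, List


async def process_chunks(
    chunks: List[str],
    worker: Callable[[str], Awaitable[str]],
    max_concurrent: int = 5,
) -> str:
    """Run worker on every chunk with at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(chunk: str) -> str:
        async with semaphore:
            return await worker(chunk)

    # gather preserves input order, so results line up with the chunks.
    results = await asyncio.gather(*(run_one(chunk) for chunk in chunks))
    # Simplified merge: the real service is described as merging intelligently.
    return "\n".join(results)


async def fake_worker(chunk: str) -> str:
    """Stand-in for an AI call on one chunk."""
    await asyncio.sleep(0)
    return chunk.upper()


merged = asyncio.run(process_chunks(["a", "b", "c"], fake_worker))
print(merged)
```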
Example Usage
# Process documents with chunking
result = await services.extraction.processDocumentsPerChunk(
documents=large_documents,
prompt="Extract all key information",
aiObjects=services.ai.aiObjects,
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_EXTRACT,
priority=PriorityEnum.BALANCED
),
operationId="extract_large_docs_123"
)
print(f"Extracted text: {len(result)} characters")
Generation Service API
Location: modules/services/serviceGeneration/mainServiceGeneration.py
Overview
The Generation Service renders documents in various formats from structured content.
Class: GenerationService
Access via:
services.generation.method_name()
Method: renderReport
Render structured content to specific output format.
Signature
async def renderReport(
extractedContent: Dict[str, Any],
outputFormat: str,
title: str,
userPrompt: Optional[str] = None,
aiService: Optional[AiService] = None
) -> tuple[str, str]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| extractedContent | Dict[str, Any] | Yes | Structured JSON content |
| outputFormat | str | Yes | Target format (html, pdf, docx, xlsx, pptx, csv, json, md, txt, png) |
| title | str | Yes | Document title |
| userPrompt | str | No | Original user prompt |
| aiService | AiService | No | AI service for prompt building |
Supported Formats
| Format | MIME Type | Description |
|---|---|---|
| html | text/html | HTML document with CSS |
| pdf | application/pdf | PDF document |
| docx | application/vnd.openxmlformats-officedocument.wordprocessingml.document | Word document |
| xlsx | application/vnd.openxmlformats-officedocument.spreadsheetml.sheet | Excel spreadsheet |
| pptx | application/vnd.openxmlformats-officedocument.presentationml.presentation | PowerPoint |
| csv | text/csv | CSV file |
| json | application/json | JSON data |
| md | text/markdown | Markdown document |
| txt | text/plain | Plain text |
| png | image/png | PNG image |
Returns
- Type: tuple[str, str]
- Description: (rendered_content, mime_type)
  - rendered_content: Base64-encoded content for binary formats, or string for text formats
  - mime_type: MIME type of the rendered content
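Since rendered_content is base64 for binary formats and a plain string for text formats, callers need to branch on the MIME type before writing a file. A minimal sketch of that branch follows. Which MIME types count as text, and the use of UTF-8, are assumptions of this sketch, and rendered_to_bytes is a hypothetical helper.

```python
import base64

# Assumption: these MIME types from the Supported Formats table are returned
# as plain strings; everything else is treated as base64-encoded binary.
TEXT_MIME_TYPES = {
    "text/html",
    "text/csv",
    "application/json",
    "text/markdown",
    "text/plain",
}


def rendered_to_bytes(rendered_content: str, mime_type: str) -> bytes:
    """Convert the renderReport result into bytes that can be written to disk."""
    if mime_type in TEXT_MIME_TYPES:
        return rendered_content.encode("utf-8")
    return base64.b64decode(rendered_content)


html_bytes = rendered_to_bytes("<h1>Web Report</h1>", "text/html")
pdf_bytes = rendered_to_bytes(
    base64.b64encode(b"%PDF-1.4 demo").decode("ascii"), "application/pdf"
)
```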
Example Usage
# Render to PDF
pdf_content, mime_type = await services.generation.renderReport(
extractedContent=structured_data,
outputFormat="pdf",
title="Quarterly Report",
userPrompt="Create a comprehensive quarterly report"
)
# Render to DOCX
docx_content, mime_type = await services.generation.renderReport(
extractedContent=structured_data,
outputFormat="docx",
title="Analysis Document"
)
# Render to HTML
html_content, mime_type = await services.generation.renderReport(
extractedContent=structured_data,
outputFormat="html",
title="Web Report"
)
Method: createDocumentsFromActionResult
Create ChatDocument objects from action results.
Signature
def createDocumentsFromActionResult(
actionResult,
action,
workflow,
message_id: Optional[str] = None
) -> List[ChatDocument]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| actionResult | ActionResult | Yes | Action result with documents |
| action | Action | Yes | Action that produced the result |
| workflow | ChatWorkflow | Yes | Current workflow |
| message_id | str | No | Message ID to associate documents with |
Returns
- Type: List[ChatDocument]
- Description: List of created ChatDocument objects with workflow context
Example Usage
# In a feature after AI generates documents
action_result = await services.ai.callAiDocuments(
prompt="Generate report",
outputFormat="pdf"
)
# Create ChatDocument objects
documents = services.generation.createDocumentsFromActionResult(
actionResult=action_result,
action=current_action,
workflow=services.workflow,
message_id=message.id
)
# Store with message
message = services.chat.storeMessageWithDocuments(
workflow=services.workflow,
messageData=message_data,
documents=documents
)
Neutralization Service API
Location: modules/services/serviceNeutralization/mainServiceNeutralization.py
Overview
The Neutralization Service anonymizes sensitive data for GDPR compliance.
Class: NeutralizationService
Access via:
services.neutralization.method_name()
Method: processText
Neutralize text content.
Signature
def processText(
text: str
) -> Dict[str, Any]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| text | str | Yes | Text content to neutralize |
Returns
{
    "neutralized_text": "[firstname.uuid] [lastname.uuid] works at...",
    "mapping": {
        "[firstname.uuid]": "John",
        "[lastname.uuid]": "Smith",
        # More mappings...
    },
    "attributes": [
        {
            "original": "John",
            "placeholder": "[firstname.uuid]"
        },
        # More attributes...
    ],
    "processed_info": {
        "type": "text",
        "patterns_found": 5,
        "replacements_made": 5
    }
}
Example Usage
# Neutralize text
text = "John Smith (john.smith@example.com) lives at 123 Main St."
result = services.neutralization.processText(text)
print(result["neutralized_text"])
# "[firstname.abc] [lastname.def] ([email.ghi]) lives at [address.jkl]"
# Use neutralized text with AI
ai_result = await services.ai.callAiDocuments(
prompt=f"Analyze this text: {result['neutralized_text']}"
)
# Resolve placeholders in result
resolved = services.neutralization.resolveText(ai_result)
Method: processFile
Neutralize file content.
Signature
def processFile(
fileId: str
) -> Dict[str, Any]
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `fileId` | `str` | Yes | File ID to neutralize |
Returns
Same structure as processText, plus:
{
# ... same as processText result
"file_id": "file-123",
"is_binary": False,
"mime_type": "text/plain",
"file_name": "document.txt",
"neutralized_file_name": "neutralized_document.txt"
}
Example Usage
# Neutralize file
result = services.neutralization.processFile(document.fileId)
if result["is_binary"]:
    print("Binary file - neutralization not available")
else:
    neutralized_content = result["neutralized_text"]
    # Process neutralized content
Method: resolveText
Resolve placeholders back to original values.
Signature
def resolveText(
text: str
) -> str
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `text` | `str` | Yes | Text with placeholders |
Returns
- Type: `str`
- Description: Text with resolved values
Example Usage
# Neutralize and process
neutralized_result = services.neutralization.processText(sensitive_text)
ai_analysis = await services.ai.callAiDocuments(
prompt=f"Analyze: {neutralized_result['neutralized_text']}"
)
# Resolve result
final_result = services.neutralization.resolveText(ai_analysis)
# Original names/data restored
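The round trip above relies on the placeholder-to-value `mapping` returned by processText. As a rough illustration of the idea only, and not the service's actual implementation, resolution can be pictured as a plain string substitution over that mapping. The function name `resolve_placeholders` and the sample data are hypothetical.

```python
from typing import Dict


def resolve_placeholders(text: str, mapping: Dict[str, str]) -> str:
    """Replace every placeholder key found in `text` with its original value."""
    # Longest-first ordering is a defensive choice: it keeps a short key
    # from being substituted inside a longer key that contains it.
    for placeholder in sorted(mapping, key=len, reverse=True):
        text = text.replace(placeholder, mapping[placeholder])
    return text


# Hypothetical data shaped like the "mapping" field of a processText result.
mapping = {
    "[firstname.abc]": "John",
    "[lastname.def]": "Smith",
}
neutralized = "[firstname.abc] [lastname.def] signed the contract."
restored = resolve_placeholders(neutralized, mapping)
print(restored)
```

In application code, prefer `services.neutralization.resolveText`, which has access to the stored mappings.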
Method: getConfig
Get neutralization configuration.
Signature
def getConfig() -> Optional[DataNeutraliserConfig]
Returns
{
"id": "config-123",
"mandateId": "mandate-456",
"namesToParse": "John Smith\nJane Doe\nCompany Inc",
"customPatterns": {...},
"enabled": True
}
Example Usage
config = services.neutralization.getConfig()
if config:
    names = config.namesToParse.split('\n')
    print(f"Configured names: {len(names)}")
Method: saveConfig
Save or update neutralization configuration.
Signature
def saveConfig(
config_data: Dict[str, Any]
) -> DataNeutraliserConfig
Parameters
config_data = {
"namesToParse": "John Smith\nJane Doe",
"customPatterns": {},
"enabled": True
}
Returns
- Type: `DataNeutraliserConfig`
- Description: Saved configuration
Example Usage
# Update configuration
config = services.neutralization.saveConfig({
"namesToParse": "Executive Name\nCompany CEO\nProject Manager",
"enabled": True
})
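The examples show `namesToParse` as a single newline-separated string. When the names start out as a list, a small helper can build that string. The sketch below is an assumption about convenient client-side preparation, not part of the service; `build_names_to_parse` is a hypothetical name.

```python
from typing import Iterable, List


def build_names_to_parse(names: Iterable[str]) -> str:
    """Join names into a newline-separated string, dropping blanks and duplicates."""
    seen = set()
    cleaned: List[str] = []
    for name in names:
        name = name.strip()
        # Skip empty entries and names already added, keeping first-seen order.
        if name and name not in seen:
            seen.add(name)
            cleaned.append(name)
    return "\n".join(cleaned)


names_to_parse = build_names_to_parse(["John Smith", " Jane Doe ", "", "John Smith"])
print(repr(names_to_parse))
```

The resulting string can then be passed as the `namesToParse` value in the dictionary given to saveConfig.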
Utils Service API
Location: modules/services/serviceUtils/mainServiceUtils.py
Overview
The Utils Service provides common utility functions across the application.
Class: UtilsService
Access via:
services.utils.method_name()
Configuration Methods
Method: configGet
Get configuration value.
Signature
def configGet(
key: str,
default: Any = None,
user_id: str = "system"
) -> Any
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `key` | `str` | Yes | Configuration key |
| `default` | `Any` | No | Default value if not found |
| `user_id` | `str` | No | User ID for audit (default: "system") |
Example Usage
# Get configuration
max_file_size = services.utils.configGet(
"max_file_size_mb",
default=10
)
api_timeout = services.utils.configGet("api_timeout", 30)
Event Management Methods
Method: eventRegisterCron
Register a cron job.
Signature
def eventRegisterCron(
job_id: str,
func: Callable,
cron_kwargs: Dict[str, Any],
replace_existing: bool = True,
coalesce: bool = True,
max_instances: int = 1,
misfire_grace_time: int = 1800
) -> None
Parameters
| Parameter | Type | Description |
|---|---|---|
| `job_id` | `str` | Unique job identifier |
| `func` | `Callable` | Function to execute |
| `cron_kwargs` | `Dict` | Cron schedule (hour, minute, day_of_week, etc.) |
| `replace_existing` | `bool` | Replace existing job with same ID |
| `coalesce` | `bool` | Coalesce multiple pending executions |
| `max_instances` | `int` | Max concurrent instances |
| `misfire_grace_time` | `int` | Grace time for misfired jobs (seconds) |
Example Usage
# Daily cleanup at 2 AM
services.utils.eventRegisterCron(
job_id="daily_cleanup",
func=cleanup_function,
cron_kwargs={"hour": 2, "minute": 0}
)
# Every Monday at 9 AM
services.utils.eventRegisterCron(
job_id="weekly_report",
func=generate_report,
cron_kwargs={"day_of_week": "mon", "hour": 9, "minute": 0}
)
Method: eventRegisterInterval
Register an interval job.
Signature
def eventRegisterInterval(
job_id: str,
func: Callable,
seconds: Optional[int] = None,
minutes: Optional[int] = None,
hours: Optional[int] = None,
replace_existing: bool = True,
coalesce: bool = True,
max_instances: int = 1,
misfire_grace_time: int = 1800
) -> None
Example Usage
# Every 30 minutes
services.utils.eventRegisterInterval(
job_id="status_check",
func=check_status,
minutes=30
)
# Every 2 hours
services.utils.eventRegisterInterval(
job_id="data_sync",
func=sync_data,
hours=2
)
Method: eventRemove
Remove a scheduled job.
Signature
def eventRemove(
job_id: str
) -> None
Example Usage
# Remove job
services.utils.eventRemove("daily_cleanup")
Time Methods
Method: timestampGetUtc
Get current UTC timestamp.
Signature
def timestampGetUtc() -> float
Returns
- Type: `float`
- Description: UTC timestamp in seconds
Example Usage
timestamp = services.utils.timestampGetUtc()
print(f"Current time: {timestamp}")
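Because the return value is a plain float of seconds, it is usually converted before being shown to users or written to logs. The following sketch uses only the standard library and assumes the value is a Unix epoch timestamp; `format_utc_timestamp` is a hypothetical helper, not a method of the Utils Service.

```python
from datetime import datetime, timezone


def format_utc_timestamp(timestamp: float) -> str:
    """Convert epoch seconds (UTC) into an ISO 8601 string with a UTC offset."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


# The epoch itself, used here as a fixed sample value.
print(format_utc_timestamp(0.0))  # 1970-01-01T00:00:00+00:00
```

Passing `tz=timezone.utc` explicitly avoids an implicit conversion to the server's local time zone.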
Debug Methods
Method: writeDebugFile
Write debug content to file.
Signature
def writeDebugFile(
content: str,
fileType: str,
documents: Optional[List] = None
) -> None
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `content` | `str` | Yes | Content to write |
| `fileType` | `str` | Yes | File type identifier (used in filename) |
| `documents` | `List` | No | Related documents |
Example Usage
# Write debug file
services.utils.writeDebugFile(
content=json.dumps(data, indent=2),
fileType="action_result",
documents=processed_docs
)
# Creates: logs/debug/prompts/action_result_<timestamp>.txt
Method: sanitizePromptContent
Sanitize content for safe prompt insertion.
Signature
def sanitizePromptContent(
content: str,
contentType: str = "text"
) -> str
Parameters
| Parameter | Type | Values | Description |
|---|---|---|---|
| `content` | `str` | - | Content to sanitize |
| `contentType` | `str` | "text", "userinput", "json", "document" | Type of content |
Example Usage
# Sanitize user input
sanitized = services.utils.sanitizePromptContent(
content=user_input,
contentType="userinput"
)
# Use in prompt
prompt = f"Process this: {sanitized}"
JSON Utility Methods
Method: jsonExtractString
Extract JSON from text with fences or other formatting.
Signature
def jsonExtractString(
text: str
) -> str
Example Usage
import json

# Extract JSON from a markdown code fence
text = "Here's the result:\n```json\n{\"key\": \"value\"}\n```"
json_str = services.utils.jsonExtractString(text)
data = json.loads(json_str)
Method: jsonTryParse
Try to parse JSON with error recovery.
Signature
def jsonTryParse(
    text: str
) -> tuple[bool, Union[Dict, List, None]]
Returns
- Type: `tuple[bool, Union[Dict, List, None]]`
- Description: `(success, parsed_data)`
Example Usage
success, data = services.utils.jsonTryParse(potentially_invalid_json)
if success:
    print(f"Parsed: {data}")
else:
    print("Failed to parse JSON")
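To make the behavior of jsonExtractString and jsonTryParse more concrete, here is a minimal standard-library sketch of the same idea: strip an optional code fence, then attempt to parse. It is an assumption about how such helpers could work, not the service's implementation, and it does not attempt any error recovery beyond fence removal. The names `extract_json_string` and `try_parse_json` are hypothetical.

```python
import json
import re
from typing import Any, Tuple

# The fence marker is built programmatically so this example can itself
# live inside a fenced code block.
FENCE = "`" * 3
FENCE_PATTERN = re.compile(
    FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL | re.IGNORECASE
)


def extract_json_string(text: str) -> str:
    """Return the body of the first fenced block, or the stripped text if none."""
    match = FENCE_PATTERN.search(text)
    if match:
        return match.group(1).strip()
    return text.strip()


def try_parse_json(text: str) -> Tuple[bool, Any]:
    """Return (True, parsed_data) on success, (False, None) on failure."""
    try:
        return True, json.loads(extract_json_string(text))
    except json.JSONDecodeError:
        return False, None


sample = "Here's the result:\n" + FENCE + 'json\n{"key": "value"}\n' + FENCE
print(try_parse_json(sample))
print(try_parse_json("not json"))
```

Returning a success flag alongside the data lets callers tell a failed parse apart from a document whose parsed value is legitimately `None`.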
Common Patterns
Pattern 1: Basic Service Usage
# Initialize services
services = Services(user=current_user, workflow=workflow)
# Use service
result = await services.ai.callAiDocuments(
prompt="Analyze these documents",
documents=documents
)
Pattern 2: Service Composition
# Use multiple services together
async def process_and_generate(documents, prompt):
    # Step 1: Extract content
    extracted = services.extraction.extractContent(
        documents=documents,
        options=extraction_options
    )
    # Step 2: Process with AI
    analysis = await services.ai.callAiDocuments(
        prompt=prompt,
        documents=documents
    )
    # Step 3: Generate report
    pdf_content, mime = await services.generation.renderReport(
        extractedContent=analysis,
        outputFormat="pdf",
        title="Analysis Report"
    )
    # Step 4: Store result
    # pdf_document is a placeholder for a document object built from
    # pdf_content; its creation is not shown here.
    message = services.chat.storeMessageWithDocuments(
        workflow=services.workflow,
        messageData={"content": "Report generated"},
        documents=[pdf_document]
    )
    return message
Pattern 3: Progress Tracking
import uuid

async def long_operation_with_progress(data):
    op_id = f"operation_{uuid.uuid4()}"
    results = []  # Collect per-item results to return at the end
    try:
        # Start
        services.chat.progressLogStart(
            op_id, "My Service", "Processing", f"{len(data)} items"
        )
        # Process with updates
        for i, item in enumerate(data):
            results.append(await process_item(item))
            progress = (i + 1) / len(data)
            services.chat.progressLogUpdate(
                op_id, progress, f"Processed {i+1}/{len(data)}"
            )
        # Success
        services.chat.progressLogFinish(op_id, True)
        return results
    except Exception:
        # Mark the operation as failed, then let the caller handle the error
        services.chat.progressLogFinish(op_id, False)
        raise
Pattern 4: Error Handling
import logging
logger = logging.getLogger(__name__)
async def robust_service_call():
    try:
        result = await services.ai.callAiDocuments(
            prompt="Process this",
            documents=documents
        )
        return result
    except ValueError as e:
        # Expected validation errors
        logger.warning(f"Validation error: {str(e)}")
        return {"error": "Invalid input"}
    except Exception as e:
        # Unexpected errors
        logger.error(f"Service call failed: {str(e)}", exc_info=True)
        # Store error log
        services.chat.storeLog(
            services.workflow,
            {
                "message": "Service Error",
                "type": "error",
                "status": str(e)
            }
        )
        raise
Error Handling
Common Exceptions
| Exception | Description | How to Handle |
|---|---|---|
| `ValueError` | Invalid parameters | Validate input before calling |
| `FileNotFoundError` | File not found | Check file existence |
| `PermissionError` | Access denied | Check user permissions |
| `TimeoutError` | Operation timeout | Implement retry logic |
| `ServiceException` | Service-specific error | Check error message |
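The table recommends retry logic for `TimeoutError`. One common way to do this is a bounded retry with exponential backoff, sketched below with the standard library only. The helper name `call_with_retry`, its parameters, and the `flaky_call` stand-in are hypothetical and not part of the Services API.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def call_with_retry(
    func: Callable[..., Awaitable[Any]],
    *args: Any,
    attempts: int = 3,
    base_delay: float = 0.5,
    **kwargs: Any,
) -> Any:
    """Await func, retrying on timeouts with exponentially growing delays."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await func(*args, **kwargs)
        # asyncio.TimeoutError is a separate class before Python 3.11.
        except (TimeoutError, asyncio.TimeoutError):
            if attempt == attempts:
                raise  # Out of attempts: surface the timeout to the caller
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


# Demonstration with a stand-in coroutine that times out twice, then succeeds.
attempts_seen = {"count": 0}


async def flaky_call() -> str:
    attempts_seen["count"] += 1
    if attempts_seen["count"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


result = asyncio.run(call_with_retry(flaky_call, attempts=3, base_delay=0))
print(result, attempts_seen["count"])
```

Only timeouts are retried here; validation and permission errors propagate immediately, since repeating the same call would not change their outcome.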
Best Practices
- Always use try-except for service calls
- Log errors with context
- Use progress tracking for long operations
- Store error logs in workflow for debugging
- Provide meaningful error messages to users
Example Error Handler
import logging

logger = logging.getLogger(__name__)

class ServiceErrorHandler:
    @staticmethod
    async def safe_call(func, *args, **kwargs):
        """Safely call an async service method with error handling"""
        try:
            result = await func(*args, **kwargs)
            return {"success": True, "data": result}
        except ValueError as e:
            logger.warning(f"Validation error: {str(e)}")
            return {"success": False, "error": "Invalid input", "details": str(e)}
        except TimeoutError as e:
            logger.error(f"Timeout: {str(e)}")
            return {"success": False, "error": "Operation timeout", "details": str(e)}
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}", exc_info=True)
            return {"success": False, "error": "Internal error", "details": str(e)}

# Usage
result = await ServiceErrorHandler.safe_call(
    services.ai.callAiDocuments,
    prompt="Process this",
    documents=documents
)
if result["success"]:
    data = result["data"]
else:
    print(f"Error: {result['error']}")
Additional Resources
- Services Component Overview - Architecture and patterns
- Architecture Overview - System architecture
- Security Component - Authentication and security
- Data Models Documentation - Data structures
Document Version: 1.0
Last Updated: 2025-01-25
Status: Complete