database running

This commit is contained in:
ValueOn AG 2025-09-08 21:55:21 +02:00
parent 99c80c67b5
commit 748093b48e
10 changed files with 302 additions and 201 deletions

View file

@ -1098,36 +1098,36 @@ class HandlingTasks:
)
result_label = action.execResultLabel
# Process documents from the action result
created_documents = []
if result.success:
action.setSuccess()
# Extract result text from documents if available, otherwise use empty string
action.result = ""
if result.documents and len(result.documents) > 0:
# Try to get text content from the first document
first_doc = result.documents[0]
if isinstance(first_doc.documentData, dict):
action.result = first_doc.documentData.get("result", "")
elif isinstance(first_doc.documentData, str):
action.result = first_doc.documentData
# Preserve the action's execResultLabel for document routing
# Action methods should NOT return resultLabel - this is managed by the action handler
if not action.execResultLabel:
logger.warning(f"Action {action.execMethod}.{action.execAction} has no execResultLabel set")
# Always use the action's execResultLabel for message creation to ensure proper document routing
message_result_label = action.execResultLabel
# Create message first to get messageId, then create documents with messageId
message = await self.createActionMessage(action, result, workflow, message_result_label, [], task_step, task_index)
if message:
# Now create documents with the messageId
created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow, message.id)
# Update the message with the created documents
if created_documents:
message.documents = created_documents
# Update the message in the database
self.chatInterface.updateMessage(message.id, {"documents": [doc.to_dict() for doc in created_documents]})
# Process documents from the action result
created_documents = []
if result.success:
action.setSuccess()
# Extract result text from documents if available, otherwise use empty string
action.result = ""
if result.documents and len(result.documents) > 0:
# Try to get text content from the first document
first_doc = result.documents[0]
if isinstance(first_doc.documentData, dict):
action.result = first_doc.documentData.get("result", "")
elif isinstance(first_doc.documentData, str):
action.result = first_doc.documentData
# Preserve the action's execResultLabel for document routing
# Action methods should NOT return resultLabel - this is managed by the action handler
if not action.execResultLabel:
logger.warning(f"Action {action.execMethod}.{action.execAction} has no execResultLabel set")
# Always use the action's execResultLabel for message creation to ensure proper document routing
message_result_label = action.execResultLabel
# Create message first to get messageId, then create documents with messageId
message = await self.createActionMessage(action, result, workflow, message_result_label, [], task_step, task_index)
if message:
# Now create documents with the messageId
created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow, message.id)
# Update the message with the created documents
if created_documents:
message.documents = created_documents
# Update the message in the database
self.chatInterface.updateMessage(message.id, {"documents": [doc.dict() for doc in created_documents]})
# Log action results
logger.info(f"Action completed successfully")

View file

@ -40,7 +40,7 @@ def _get_model_fields(model_class) -> Dict[str, str]:
elif field_type == int:
fields[field_name] = 'INTEGER'
elif field_type == float:
fields[field_name] = 'REAL'
fields[field_name] = 'DOUBLE PRECISION'
elif field_type == bool:
fields[field_name] = 'BOOLEAN'
else:
@ -80,7 +80,6 @@ class DatabaseConnector:
self._systemTableName = "_system"
self._initializeSystemTable()
logger.debug(f"Context: userId={self.userId}")
def initDbSystem(self):
"""Initialize the database system - creates database and tables."""
@ -154,8 +153,8 @@ class DatabaseConnector:
id SERIAL PRIMARY KEY,
table_name VARCHAR(255) UNIQUE NOT NULL,
initial_id VARCHAR(255) NOT NULL,
_createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
_modifiedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
_createdAt DOUBLE PRECISION,
_modifiedAt DOUBLE PRECISION
)
""")
@ -245,8 +244,8 @@ class DatabaseConnector:
for table_name, initial_id in data.items():
cursor.execute("""
INSERT INTO "_system" ("table_name", "initial_id", "_modifiedAt")
VALUES (%s, %s, CURRENT_TIMESTAMP)
""", (table_name, initial_id))
VALUES (%s, %s, %s)
""", (table_name, initial_id, get_utc_timestamp()))
self.connection.commit()
return True
@ -271,8 +270,8 @@ class DatabaseConnector:
CREATE TABLE "{self._systemTableName}" (
"table_name" VARCHAR(255) PRIMARY KEY,
"initial_id" VARCHAR(255),
"_createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
"_createdAt" DOUBLE PRECISION,
"_modifiedAt" DOUBLE PRECISION
)
""")
logger.info("System table created successfully")
@ -285,10 +284,9 @@ class DatabaseConnector:
existing_columns = [row['column_name'] for row in cursor.fetchall()]
if '_modifiedAt' not in existing_columns:
cursor.execute(f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
cursor.execute(f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" DOUBLE PRECISION')
logger.info("Added _modifiedAt column to existing system table")
logger.debug("System table already exists")
return True
except Exception as e:
@ -313,11 +311,9 @@ class DatabaseConnector:
WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public'
''', (table,))
exists = cursor.fetchone()['count'] > 0
logger.debug(f"Table {table} exists check: {exists}")
if not exists:
# Create table from Pydantic model
logger.debug(f"Creating table {table} with model {model_class}")
self._create_table_from_model(cursor, table, model_class)
logger.info(f"Created table '{table}' with columns from Pydantic model")
@ -333,7 +329,6 @@ class DatabaseConnector:
def _create_table_from_model(self, cursor, table: str, model_class: type) -> None:
"""Create table with columns matching Pydantic model fields."""
fields = _get_model_fields(model_class)
logger.debug(f"Creating table {table} with fields: {fields}")
# Build column definitions with quoted identifiers to preserve exact case
columns = ['"id" VARCHAR(255) PRIMARY KEY']
@ -343,15 +338,14 @@ class DatabaseConnector:
# Add metadata columns
columns.extend([
'"_createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
'"_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
'"_createdAt" DOUBLE PRECISION',
'"_modifiedAt" DOUBLE PRECISION',
'"_createdBy" VARCHAR(255)',
'"_modifiedBy" VARCHAR(255)'
])
# Create table
sql = f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(columns)})'
logger.debug(f"Executing SQL: {sql}")
cursor.execute(sql)
# Create indexes for foreign keys
@ -366,8 +360,6 @@ class DatabaseConnector:
fields = _get_model_fields(model_class)
columns = ['id'] + [field for field in fields.keys() if field != 'id'] + ['_createdAt', '_createdBy', '_modifiedAt', '_modifiedBy']
logger.debug(f"Table {table} columns: {columns}")
logger.debug(f"Record data: {record}")
if not columns:
logger.error(f"No columns found for table {table}")
@ -384,17 +376,12 @@ class DatabaseConnector:
for col in columns:
value = filtered_record.get(col)
# Convert timestamp fields to proper PostgreSQL format
# Handle timestamp fields - store as Unix timestamps (floats) for consistency
if col in ['_createdAt', '_modifiedAt'] and value is not None:
if isinstance(value, (int, float)):
# Convert Unix timestamp to PostgreSQL timestamp
from datetime import datetime
value = datetime.fromtimestamp(value)
elif isinstance(value, str):
# If it's already a string, try to parse it
if isinstance(value, str):
# Try to parse string as timestamp
try:
from datetime import datetime
value = datetime.fromtimestamp(float(value))
value = float(value)
except:
pass # Keep as string if parsing fails
@ -424,7 +411,6 @@ class DatabaseConnector:
values.append(value)
logger.debug(f"Values to insert: {values}")
# Build INSERT/UPDATE with quoted identifiers
col_names = ', '.join([f'"{col}"' for col in columns])
@ -432,7 +418,6 @@ class DatabaseConnector:
updates = ', '.join([f'"{col}" = EXCLUDED."{col}"' for col in columns[1:] if col not in ['_createdAt', '_createdBy']])
sql = f'INSERT INTO "{table}" ({col_names}) VALUES ({placeholders}) ON CONFLICT ("id") DO UPDATE SET {updates}'
logger.debug(f"SQL: {sql}")
cursor.execute(sql, values)
@ -454,6 +439,7 @@ class DatabaseConnector:
record = dict(row)
fields = _get_model_fields(model_class)
# Parse JSONB fields back to Python objects
for field_name, field_type in fields.items():
if field_type == 'JSONB' and field_name in record and record[field_name] is not None:
@ -527,22 +513,31 @@ class DatabaseConnector:
fields = _get_model_fields(model_class)
for record in records:
for field_name, field_type in fields.items():
if field_type == 'JSONB' and field_name in record and record[field_name] is not None:
import json
try:
if isinstance(record[field_name], str):
# Parse JSON string back to Python object
record[field_name] = json.loads(record[field_name])
elif isinstance(record[field_name], (dict, list)):
# Already a Python object, keep as is
pass
if field_type == 'JSONB' and field_name in record:
if record[field_name] is None:
# Convert None to appropriate default based on field name
if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']:
record[field_name] = []
elif field_name in ['execParameters', 'stats']:
record[field_name] = {}
else:
# Try to parse as JSON
record[field_name] = json.loads(str(record[field_name]))
except (json.JSONDecodeError, TypeError, ValueError):
# If parsing fails, keep as string
logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}")
pass
record[field_name] = None
else:
import json
try:
if isinstance(record[field_name], str):
# Parse JSON string back to Python object
record[field_name] = json.loads(record[field_name])
elif isinstance(record[field_name], (dict, list)):
# Already a Python object, keep as is
pass
else:
# Try to parse as JSON
record[field_name] = json.loads(str(record[field_name]))
except (json.JSONDecodeError, TypeError, ValueError):
# If parsing fails, keep as string
logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}")
pass
return records
except Exception as e:
@ -550,35 +545,6 @@ class DatabaseConnector:
return []
def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
"""Applies a record filter to the records"""
if not recordFilter:
return records
filteredRecords = []
for record in records:
match = True
for field, value in recordFilter.items():
# Check if the field exists
if field not in record:
match = False
break
# Convert both values to strings for comparison
recordValue = str(record[field])
filterValue = str(value)
# Direct string comparison
if recordValue != filterValue:
match = False
break
if match:
filteredRecords.append(record)
return filteredRecords
def _registerInitialId(self, table: str, initialId: str) -> bool:
"""Registers the initial ID for a table."""
@ -603,7 +569,6 @@ class DatabaseConnector:
logger.info(f"Initial ID updated from {existingInitialId} to {initialId} for table {table}")
return success
else:
logger.debug(f"Initial ID {existingInitialId} for table {table} already exists and is valid")
return True
except Exception as e:
logger.error(f"Error registering the initial ID for table {table}: {e}")
@ -699,34 +664,76 @@ class DatabaseConnector:
"""Returns a list of records from a table, filtered by criteria."""
table = model_class.__name__
# If we have specific record IDs in the filter, only load those records
if recordFilter and "id" in recordFilter:
recordId = recordFilter["id"]
record = self._loadRecord(model_class, recordId)
if record:
records = [record]
else:
try:
if not self._ensureTableExists(model_class):
return []
else:
# Load all records if no specific ID filter
records = self._loadTable(model_class)
# Apply recordFilter if available
if recordFilter:
records = self._applyRecordFilter(records, recordFilter)
# If fieldFilter is available, reduce the fields
if fieldFilter and isinstance(fieldFilter, list):
result = []
for record in records:
filteredRecord = {}
for field in fieldFilter:
if field in record:
filteredRecord[field] = record[field]
result.append(filteredRecord)
return result
return records
# Build WHERE clause from recordFilter
where_conditions = []
where_values = []
if recordFilter:
for field, value in recordFilter.items():
where_conditions.append(f'"{field}" = %s')
where_values.append(value)
# Build the query
if where_conditions:
where_clause = " WHERE " + " AND ".join(where_conditions)
else:
where_clause = ""
query = f'SELECT * FROM "{table}"{where_clause} ORDER BY "id"'
with self.connection.cursor() as cursor:
cursor.execute(query, where_values)
records = [dict(row) for row in cursor.fetchall()]
# Handle JSONB fields for all records
fields = _get_model_fields(model_class)
for record in records:
for field_name, field_type in fields.items():
if field_type == 'JSONB' and field_name in record:
if record[field_name] is None:
# Convert None to appropriate default based on field name
if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']:
record[field_name] = []
elif field_name in ['execParameters', 'stats']:
record[field_name] = {}
else:
record[field_name] = None
else:
import json
try:
if isinstance(record[field_name], str):
# Parse JSON string back to Python object
record[field_name] = json.loads(record[field_name])
elif isinstance(record[field_name], (dict, list)):
# Already a Python object, keep as is
pass
else:
# Try to parse as JSON
record[field_name] = json.loads(str(record[field_name]))
except (json.JSONDecodeError, TypeError, ValueError):
# If parsing fails, keep as string
logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}")
pass
# If fieldFilter is available, reduce the fields
if fieldFilter and isinstance(fieldFilter, list):
result = []
for record in records:
filteredRecord = {}
for field in fieldFilter:
if field in record:
filteredRecord[field] = record[field]
result.append(filteredRecord)
return result
return records
except Exception as e:
logger.error(f"Error loading records from table {table}: {e}")
return []
def recordCreate(self, model_class: type, record: Union[Dict[str, Any], BaseModel]) -> Dict[str, Any]:
"""Creates a new record in a table based on Pydantic model class."""
@ -793,7 +800,7 @@ class DatabaseConnector:
with self.connection.cursor() as cursor:
# Check if record exists
cursor.execute(f"SELECT id FROM {table} WHERE id = %s", (recordId,))
cursor.execute(f'SELECT "id" FROM "{table}" WHERE "id" = %s', (recordId,))
if not cursor.fetchone():
return False
@ -804,7 +811,7 @@ class DatabaseConnector:
logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")
# Delete the record
cursor.execute(f"DELETE FROM {table} WHERE id = %s", (recordId,))
cursor.execute(f'DELETE FROM "{table}" WHERE "id" = %s', (recordId,))
# No cache to update - database handles consistency
@ -822,14 +829,12 @@ class DatabaseConnector:
table = model_class.__name__
systemData = self._loadSystemTable()
initialId = systemData.get(table)
logger.debug(f"Initial ID for table '{table}': {initialId}")
return initialId
def close(self):
"""Close the database connection."""
if hasattr(self, 'connection') and self.connection and not self.connection.closed:
self.connection.close()
logger.debug("Database connection closed")
def __del__(self):
"""Cleanup method to close connection."""

View file

@ -728,7 +728,6 @@ class AppObjects:
if old_token["id"] != token.id: # Don't delete the new token if it already exists
self.db.recordDelete(Token, old_token["id"])
deleted_count += 1
logger.debug(f"Deleted old access token {old_token['id']} for user {self.currentUser.id} and authority {token.authority}")
if deleted_count > 0:
logger.info(f"Replaced {deleted_count} old access tokens for user {self.currentUser.id} and authority {token.authority}")
@ -781,7 +780,6 @@ class AppObjects:
if old_token["id"] != token.id: # Don't delete the new token if it already exists
self.db.recordDelete(Token, old_token["id"])
deleted_count += 1
logger.debug(f"Deleted old token {old_token['id']} for connectionId {token.connectionId}")
if deleted_count > 0:
logger.info(f"Replaced {deleted_count} old tokens for connectionId {token.connectionId}")
@ -864,17 +862,6 @@ class AppObjects:
"connectionId": connectionId
})
# Debug: Log what we found
logger.debug(f"getConnectionToken: Found {len(tokens)} tokens for connectionId {connectionId}")
if tokens:
for i, token in enumerate(tokens):
logger.debug(f"getConnectionToken: Token {i}: id={token.get('id')}, expiresAt={token.get('expiresAt')}, createdAt={token.get('createdAt')}")
else:
# Debug: Check if there are any tokens at all in the database
all_tokens = self.db.getRecordset(Token, recordFilter={})
logger.debug(f"getConnectionToken: No tokens found for connectionId {connectionId}. Total tokens in database: {len(all_tokens)}")
if all_tokens:
logger.debug(f"getConnectionToken: Sample tokens: {[{'id': t.get('id'), 'connectionId': t.get('connectionId'), 'authority': t.get('authority')} for t in all_tokens[:3]]}")
if not tokens:
logger.warning(f"No connection token found for connectionId: {connectionId}")
@ -890,25 +877,21 @@ class AppObjects:
if latest_token.expiresAt and latest_token.expiresAt < (current_time + thirty_minutes):
if auto_refresh:
logger.debug(f"getConnectionToken: Token expires soon, attempting refresh. expiresAt: {latest_token.expiresAt}, current_time: {current_time}")
# Import TokenManager here to avoid circular imports
from modules.security.tokenManager import TokenManager
token_manager = TokenManager()
# Try to refresh the token
logger.debug(f"getConnectionToken: Calling token_manager.refresh_token for token {latest_token.id}")
refreshed_token = token_manager.refresh_token(latest_token)
if refreshed_token:
logger.debug(f"getConnectionToken: Token refresh successful, saving new token {refreshed_token.id}")
# Save the new token (which will automatically replace old ones)
self.saveConnectionToken(refreshed_token)
logger.info(f"Proactively refreshed connection token for connectionId {connectionId} (expired in {latest_token.expiresAt - current_time} seconds)")
return refreshed_token
else:
logger.warning(f"getConnectionToken: Token refresh failed for connectionId {connectionId}")
logger.warning(f"Token refresh failed for connectionId {connectionId}")
return None
else:
logger.warning(f"Connection token for connectionId {connectionId} expires soon (expiresAt: {latest_token.expiresAt})")
@ -1047,13 +1030,9 @@ def getRootUser() -> User:
if not users:
raise ValueError("Initial user not found in database")
logger.debug(f"Retrieved user data: {users[0]}")
# Convert to User model and return the model instance
user_data = users[0]
logger.debug(f"User data keys: {list(user_data.keys())}")
logger.debug(f"User id: {user_data.get('id')}")
logger.debug(f"User mandateId: {user_data.get('mandateId')}")
return User.parse_obj(user_data)

View file

@ -200,6 +200,9 @@ register_model_labels(
"id": {"en": "ID", "fr": "ID"},
"messageId": {"en": "Message ID", "fr": "ID du message"},
"fileId": {"en": "File ID", "fr": "ID du fichier"},
"fileName": {"en": "File Name", "fr": "Nom du fichier"},
"fileSize": {"en": "File Size", "fr": "Taille du fichier"},
"mimeType": {"en": "MIME Type", "fr": "Type MIME"},
"roundNumber": {"en": "Round Number", "fr": "Numéro de tour"},
"taskNumber": {"en": "Task Number", "fr": "Numéro de tâche"},
"actionNumber": {"en": "Action Number", "fr": "Numéro d'action"},

View file

@ -130,7 +130,6 @@ class ChatObjects:
except Exception as e:
logger.error(f"Error closing database connection: {e}")
logger.debug(f"User context set: userId={self.userId}, mandateId={self.mandateId}")
def _initializeDatabase(self):
"""Initializes the database connection directly."""
@ -307,7 +306,6 @@ class ChatObjects:
log_dict = log_data
log_dict["workflowId"] = workflowId
self.createLog(log_dict)
logger.debug(f"Updated {len(logs_data)} logs for workflow {workflowId}")
except Exception as e:
logger.error(f"Error updating workflow logs: {str(e)}")
if 'messages' in object_fields:
@ -322,7 +320,6 @@ class ChatObjects:
msg_dict = message_data
msg_dict["workflowId"] = workflowId
self.updateMessage(msg_dict.get("id"), msg_dict)
logger.debug(f"Updated {len(messages_data)} messages for workflow {workflowId}")
except Exception as e:
logger.error(f"Error updating workflow messages: {str(e)}")
if 'stats' in object_fields:
@ -331,7 +328,6 @@ class ChatObjects:
if stats_data:
stats_data["workflowId"] = workflowId
self.db.recordCreate(ChatStat, stats_data)
logger.debug(f"Updated stats for workflow {workflowId}")
except Exception as e:
logger.error(f"Error updating workflow stats: {str(e)}")
@ -402,7 +398,6 @@ class ChatObjects:
# 4. Finally delete the workflow itself
success = self.db.recordDelete(ChatWorkflow, workflowId)
logger.debug(f"Successfully deleted workflow {workflowId} and all related data")
return success
except Exception as e:
@ -461,6 +456,7 @@ class ChatObjects:
chat_messages.append(chat_message)
return chat_messages
def createMessage(self, messageData: Dict[str, Any]) -> ChatMessage:
@ -505,15 +501,12 @@ class ChatObjects:
# This ensures messages have the correct progress context when workflows are continued
if "roundNumber" not in messageData:
messageData["roundNumber"] = workflow.currentRound
logger.debug(f"Auto-setting roundNumber to {workflow.currentRound} for message {messageData['id']}")
if "taskNumber" not in messageData:
messageData["taskNumber"] = workflow.currentTask
logger.debug(f"Auto-setting taskNumber to {workflow.currentTask} for message {messageData['id']}")
if "actionNumber" not in messageData:
messageData["actionNumber"] = workflow.currentAction
logger.debug(f"Auto-setting actionNumber to {workflow.currentAction} for message {messageData['id']}")
# Use generic field separation based on ChatMessage model
simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData)
@ -571,7 +564,6 @@ class ChatObjects:
def updateMessage(self, messageId: str, messageData: Dict[str, Any]) -> Dict[str, Any]:
"""Updates a workflow message if user has access to the workflow."""
try:
logger.debug(f"Updating message {messageId} in database")
# Ensure messageId is provided
if not messageId:
@ -646,7 +638,6 @@ class ChatObjects:
doc_dict = doc_data
doc_dict["messageId"] = messageId
self.createDocument(doc_dict)
logger.debug(f"Updated {len(documents_data)} documents for message {messageId}")
except Exception as e:
logger.error(f"Error updating message documents: {str(e)}")
if 'stats' in object_fields:
@ -655,12 +646,9 @@ class ChatObjects:
if stats_data:
stats_data["messageId"] = messageId
self.db.recordCreate(ChatStat, stats_data)
logger.debug(f"Updated stats for message {messageId}")
except Exception as e:
logger.error(f"Error updating message stats: {str(e)}")
if updatedMessage:
logger.debug(f"Message {messageId} updated successfully")
else:
if not updatedMessage:
logger.warning(f"Failed to update message {messageId}")
return updatedMessage
@ -703,7 +691,6 @@ class ChatObjects:
# 3. Finally delete the message itself
success = self.db.recordDelete(ChatMessage, messageId)
logger.debug(f"Successfully deleted message {messageId} and all related data")
return success
except Exception as e:
@ -722,7 +709,6 @@ class ChatObjects:
if not self._canModify(ChatWorkflow, workflowId):
raise PermissionError(f"No permission to modify workflow {workflowId}")
logger.debug(f"Removing file {fileId} from message {messageId} in workflow {workflowId}")
# Get documents for this message from normalized table
documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
@ -750,7 +736,6 @@ class ChatObjects:
success = self.db.recordDelete(ChatDocument, docId)
if success:
removed = True
logger.debug(f"Successfully removed document {docId} (fileId: {fileIdValue})")
else:
logger.warning(f"Failed to delete document {docId}")
@ -758,7 +743,6 @@ class ChatObjects:
logger.warning(f"No matching file {fileId} found in message {messageId}")
return False
logger.debug(f"Successfully removed file {fileId} from message {messageId}")
return True
except Exception as e:
@ -902,6 +886,100 @@ class ChatObjects:
stats.sort(key=lambda x: x.get("created_at", ""), reverse=True)
return ChatStat(**stats[0])
def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]:
"""
Returns unified chat data (messages, logs, stats) for a workflow in chronological order.
Uses timestamp-based selective data transfer for efficient polling.
"""
# Check workflow access first
workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
if not workflows:
return {"items": []}
filteredWorkflows = self._uam(ChatWorkflow, workflows)
if not filteredWorkflows:
return {"items": []}
# Get all data types and filter in Python (PostgreSQL connector doesn't support $gt operators)
items = []
# Get messages
messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
for msg in messages:
# Apply timestamp filtering in Python
msg_timestamp = msg.get("publishedAt", get_utc_timestamp())
if afterTimestamp is not None and msg_timestamp <= afterTimestamp:
continue
# Load documents for each message
documents = self.getDocuments(msg["id"])
# Create ChatMessage object with loaded documents
chat_message = ChatMessage(
id=msg["id"],
workflowId=msg["workflowId"],
parentMessageId=msg.get("parentMessageId"),
documents=documents,
documentsLabel=msg.get("documentsLabel"),
message=msg.get("message"),
role=msg.get("role", "assistant"),
status=msg.get("status", "step"),
sequenceNr=msg.get("sequenceNr", 0),
publishedAt=msg.get("publishedAt", get_utc_timestamp()),
stats=self.getMessageStats(msg["id"]),
success=msg.get("success"),
actionId=msg.get("actionId"),
actionMethod=msg.get("actionMethod"),
actionName=msg.get("actionName"),
roundNumber=msg.get("roundNumber"),
taskNumber=msg.get("taskNumber"),
actionNumber=msg.get("actionNumber"),
taskProgress=msg.get("taskProgress"),
actionProgress=msg.get("actionProgress")
)
# Use publishedAt as the timestamp for chronological ordering
items.append({
"type": "message",
"createdAt": msg_timestamp,
"item": chat_message.dict()
})
# Get logs
logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
for log in logs:
# Apply timestamp filtering in Python
log_timestamp = log.get("timestamp", get_utc_timestamp())
if afterTimestamp is not None and log_timestamp <= afterTimestamp:
continue
chat_log = ChatLog(**log)
items.append({
"type": "log",
"createdAt": log_timestamp,
"item": chat_log.dict()
})
# Get stats
stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId})
for stat in stats:
# Apply timestamp filtering in Python
stat_timestamp = stat.get("_createdAt", get_utc_timestamp())
if afterTimestamp is not None and stat_timestamp <= afterTimestamp:
continue
chat_stat = ChatStat(**stat)
items.append({
"type": "stat",
"createdAt": stat_timestamp,
"item": chat_stat.dict()
})
# Sort all items by createdAt timestamp for chronological order
items.sort(key=lambda x: x["createdAt"])
return {"items": items}
def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool:
"""Updates workflow statistics during execution with incremental values."""
try:

View file

@ -504,7 +504,9 @@ class TicketSharepointSyncInterface:
except Exception as e:
# If audit logging fails, we don't want to break the main sync process
# Just log the error (this could be enhanced with fallback logging)
print(f"Failed to write audit log: {str(e)}")
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Failed to write audit log: {str(e)}")
def _create_csv_content(self, data: list[dict]) -> bytes:
"""Create CSV content with 4-row structure matching reference code."""

View file

@ -68,8 +68,8 @@ async def perform_sync_jira_delta_group():
sharepoint_site_url = None
# Jira connection parameters
jira_username = None
jira_api_token = None
jira_username = "ONHOLD - TASK - p.motsch@valueon.ch"
jira_api_token = "ATATT3xFfGF0d973nNb3R1wTDI4lesmJfJAmooS-4cYMJTyLfwYv4himrE6yyCxyX3aSMfl34NHcm2fAXeFXrLHUzJx0RQVUBonCFnlgexjLQTgS5BoCbSO7dwAVjlcHZZkArHbooCUaRwJ15n6AHkm-nwdjLQ3Z74TFnKKUZC4uhuh3Aj-MuX8=2D7124FA"
jira_url = "https://deltasecurity.atlassian.net"
project_code = "DCS"
issue_type = "Task"

View file

@ -59,29 +59,16 @@ async def get_workflows(
appInterface = getInterface(currentUser)
workflows_data = appInterface.getWorkflows()
# Convert raw dictionaries to ChatWorkflow objects
# Convert raw dictionaries to ChatWorkflow objects by loading each workflow properly
workflows = []
for workflow_data in workflows_data:
try:
workflow = ChatWorkflow(
id=workflow_data["id"],
status=workflow_data.get("status", "running"),
name=workflow_data.get("name"),
currentRound=workflow_data.get("currentRound", 0), # Default value
currentTask=workflow_data.get("currentTask", 0),
currentAction=workflow_data.get("currentAction", 0),
totalTasks=workflow_data.get("totalTasks", 0),
totalActions=workflow_data.get("totalActions", 0),
lastActivity=workflow_data.get("lastActivity", get_utc_timestamp()),
startedAt=workflow_data.get("startedAt", get_utc_timestamp()),
logs=[ChatLog(**log) for log in workflow_data.get("logs", [])],
messages=[ChatMessage(**msg) for msg in workflow_data.get("messages", [])],
stats=ChatStat(**workflow_data.get("stats", {})) if workflow_data.get("stats") else None,
mandateId=workflow_data.get("mandateId", currentUser.mandateId or "")
)
workflows.append(workflow)
# Load the workflow properly using the same method as individual workflow endpoint
workflow = appInterface.getWorkflow(workflow_data["id"])
if workflow:
workflows.append(workflow)
except Exception as e:
logger.warning(f"Error converting workflow data to ChatWorkflow object: {str(e)}")
logger.warning(f"Error loading workflow {workflow_data.get('id', 'unknown')}: {str(e)}")
# Skip invalid workflows instead of failing the entire request
continue
@ -276,7 +263,8 @@ async def get_workflow_messages(
messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
if messageIndex >= 0:
# Return only messages after the specified message
return allMessages[messageIndex + 1:]
filteredMessages = allMessages[messageIndex + 1:]
return filteredMessages
return allMessages
except HTTPException:
@ -395,6 +383,45 @@ async def delete_workflow(
)
# Unified Chat Data Endpoint for Polling
@router.get("/{workflowId}/chatData")
@limiter.limit("120/minute")
async def get_workflow_chat_data(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer.
Returns all data types in chronological order based on _createdAt timestamp.
"""
try:
# Get service center
interfaceChat = getServiceChat(currentUser)
# Verify workflow exists
workflow = interfaceChat.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
# Get unified chat data using the new method
chatData = interfaceChat.getUnifiedChatData(workflowId, afterTimestamp)
return chatData
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error getting unified chat data: {str(e)}"
)
# Document Management Endpoints
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])

View file

@ -5,6 +5,7 @@ Ensures all timestamps are properly handled as UTC.
from datetime import datetime, timezone, timedelta
from typing import Union, Optional
import time
def get_utc_now() -> datetime:
"""
@ -17,12 +18,12 @@ def get_utc_now() -> datetime:
def get_utc_timestamp() -> float:
"""
Get current UTC timestamp (seconds since epoch).
Get current UTC timestamp (seconds since epoch with millisecond precision).
Returns:
float: Current UTC timestamp in seconds
float: Current UTC timestamp in seconds with millisecond precision
"""
return datetime.now(timezone.utc).timestamp()
return time.time()
def to_utc_timestamp(dt: datetime) -> float:
"""

View file

@ -24,6 +24,12 @@ TODO
- check zusammenfassung von 10 dokumenten >10 MB
- test case bewerbung
# Ida changes gateway:
- Polling endpoint + doku dazu
- files in documents integriert --> document endpoint for files
- prompts in chat endpoint
-
# DOCUMENTATION
Design principles
- UI: Module classes for data management (CRUD tables & forms --> formGeneric)