From 748093b48e9dd70a3fd5f15d29400e7bc531c975 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 8 Sep 2025 21:55:21 +0200
Subject: [PATCH] database running
---
modules/chat/handling/handlingTasks.py | 60 +++---
modules/connectors/connectorDbPostgre.py | 215 ++++++++++---------
modules/interfaces/interfaceAppObjects.py | 23 +-
modules/interfaces/interfaceChatModel.py | 3 +
modules/interfaces/interfaceChatObjects.py | 114 ++++++++--
modules/interfaces/interfaceTicketObjects.py | 4 +-
modules/routes/routeJira.py | 4 +-
modules/routes/routeWorkflows.py | 67 ++++--
modules/shared/timezoneUtils.py | 7 +-
notes/changelog.txt | 6 +
10 files changed, 302 insertions(+), 201 deletions(-)
diff --git a/modules/chat/handling/handlingTasks.py b/modules/chat/handling/handlingTasks.py
index 20cfbe13..cfc31401 100644
--- a/modules/chat/handling/handlingTasks.py
+++ b/modules/chat/handling/handlingTasks.py
@@ -1098,36 +1098,36 @@ class HandlingTasks:
)
result_label = action.execResultLabel
- # Process documents from the action result
- created_documents = []
- if result.success:
- action.setSuccess()
- # Extract result text from documents if available, otherwise use empty string
- action.result = ""
- if result.documents and len(result.documents) > 0:
- # Try to get text content from the first document
- first_doc = result.documents[0]
- if isinstance(first_doc.documentData, dict):
- action.result = first_doc.documentData.get("result", "")
- elif isinstance(first_doc.documentData, str):
- action.result = first_doc.documentData
- # Preserve the action's execResultLabel for document routing
- # Action methods should NOT return resultLabel - this is managed by the action handler
- if not action.execResultLabel:
- logger.warning(f"Action {action.execMethod}.{action.execAction} has no execResultLabel set")
- # Always use the action's execResultLabel for message creation to ensure proper document routing
- message_result_label = action.execResultLabel
-
- # Create message first to get messageId, then create documents with messageId
- message = await self.createActionMessage(action, result, workflow, message_result_label, [], task_step, task_index)
- if message:
- # Now create documents with the messageId
- created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow, message.id)
- # Update the message with the created documents
- if created_documents:
- message.documents = created_documents
- # Update the message in the database
- self.chatInterface.updateMessage(message.id, {"documents": [doc.to_dict() for doc in created_documents]})
+ # Process documents from the action result
+ created_documents = []
+ if result.success:
+ action.setSuccess()
+ # Extract result text from documents if available, otherwise use empty string
+ action.result = ""
+ if result.documents and len(result.documents) > 0:
+ # Try to get text content from the first document
+ first_doc = result.documents[0]
+ if isinstance(first_doc.documentData, dict):
+ action.result = first_doc.documentData.get("result", "")
+ elif isinstance(first_doc.documentData, str):
+ action.result = first_doc.documentData
+ # Preserve the action's execResultLabel for document routing
+ # Action methods should NOT return resultLabel - this is managed by the action handler
+ if not action.execResultLabel:
+ logger.warning(f"Action {action.execMethod}.{action.execAction} has no execResultLabel set")
+ # Always use the action's execResultLabel for message creation to ensure proper document routing
+ message_result_label = action.execResultLabel
+
+ # Create message first to get messageId, then create documents with messageId
+ message = await self.createActionMessage(action, result, workflow, message_result_label, [], task_step, task_index)
+ if message:
+ # Now create documents with the messageId
+ created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow, message.id)
+ # Update the message with the created documents
+ if created_documents:
+ message.documents = created_documents
+ # Update the message in the database
+ self.chatInterface.updateMessage(message.id, {"documents": [doc.dict() for doc in created_documents]})
# Log action results
logger.info(f"Action completed successfully")
diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py
index eea519e8..dfee166a 100644
--- a/modules/connectors/connectorDbPostgre.py
+++ b/modules/connectors/connectorDbPostgre.py
@@ -40,7 +40,7 @@ def _get_model_fields(model_class) -> Dict[str, str]:
elif field_type == int:
fields[field_name] = 'INTEGER'
elif field_type == float:
- fields[field_name] = 'REAL'
+ fields[field_name] = 'DOUBLE PRECISION'
elif field_type == bool:
fields[field_name] = 'BOOLEAN'
else:
@@ -80,7 +80,6 @@ class DatabaseConnector:
self._systemTableName = "_system"
self._initializeSystemTable()
- logger.debug(f"Context: userId={self.userId}")
def initDbSystem(self):
"""Initialize the database system - creates database and tables."""
@@ -154,8 +153,8 @@ class DatabaseConnector:
id SERIAL PRIMARY KEY,
table_name VARCHAR(255) UNIQUE NOT NULL,
initial_id VARCHAR(255) NOT NULL,
- _createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- _modifiedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ _createdAt DOUBLE PRECISION,
+ _modifiedAt DOUBLE PRECISION
)
""")
@@ -245,8 +244,8 @@ class DatabaseConnector:
for table_name, initial_id in data.items():
cursor.execute("""
INSERT INTO "_system" ("table_name", "initial_id", "_modifiedAt")
- VALUES (%s, %s, CURRENT_TIMESTAMP)
- """, (table_name, initial_id))
+ VALUES (%s, %s, %s)
+ """, (table_name, initial_id, get_utc_timestamp()))
self.connection.commit()
return True
@@ -271,8 +270,8 @@ class DatabaseConnector:
CREATE TABLE "{self._systemTableName}" (
"table_name" VARCHAR(255) PRIMARY KEY,
"initial_id" VARCHAR(255),
- "_createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- "_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ "_createdAt" DOUBLE PRECISION,
+ "_modifiedAt" DOUBLE PRECISION
)
""")
logger.info("System table created successfully")
@@ -285,10 +284,9 @@ class DatabaseConnector:
existing_columns = [row['column_name'] for row in cursor.fetchall()]
if '_modifiedAt' not in existing_columns:
- cursor.execute(f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
+ cursor.execute(f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" DOUBLE PRECISION')
logger.info("Added _modifiedAt column to existing system table")
- logger.debug("System table already exists")
return True
except Exception as e:
@@ -313,11 +311,9 @@ class DatabaseConnector:
WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public'
''', (table,))
exists = cursor.fetchone()['count'] > 0
- logger.debug(f"Table {table} exists check: {exists}")
if not exists:
# Create table from Pydantic model
- logger.debug(f"Creating table {table} with model {model_class}")
self._create_table_from_model(cursor, table, model_class)
logger.info(f"Created table '{table}' with columns from Pydantic model")
@@ -333,7 +329,6 @@ class DatabaseConnector:
def _create_table_from_model(self, cursor, table: str, model_class: type) -> None:
"""Create table with columns matching Pydantic model fields."""
fields = _get_model_fields(model_class)
- logger.debug(f"Creating table {table} with fields: {fields}")
# Build column definitions with quoted identifiers to preserve exact case
columns = ['"id" VARCHAR(255) PRIMARY KEY']
@@ -343,15 +338,14 @@ class DatabaseConnector:
# Add metadata columns
columns.extend([
- '"_createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
- '"_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
+ '"_createdAt" DOUBLE PRECISION',
+ '"_modifiedAt" DOUBLE PRECISION',
'"_createdBy" VARCHAR(255)',
'"_modifiedBy" VARCHAR(255)'
])
# Create table
sql = f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(columns)})'
- logger.debug(f"Executing SQL: {sql}")
cursor.execute(sql)
# Create indexes for foreign keys
@@ -366,8 +360,6 @@ class DatabaseConnector:
fields = _get_model_fields(model_class)
columns = ['id'] + [field for field in fields.keys() if field != 'id'] + ['_createdAt', '_createdBy', '_modifiedAt', '_modifiedBy']
- logger.debug(f"Table {table} columns: {columns}")
- logger.debug(f"Record data: {record}")
if not columns:
logger.error(f"No columns found for table {table}")
@@ -384,17 +376,12 @@ class DatabaseConnector:
for col in columns:
value = filtered_record.get(col)
- # Convert timestamp fields to proper PostgreSQL format
+ # Handle timestamp fields - store as Unix timestamps (floats) for consistency
if col in ['_createdAt', '_modifiedAt'] and value is not None:
- if isinstance(value, (int, float)):
- # Convert Unix timestamp to PostgreSQL timestamp
- from datetime import datetime
- value = datetime.fromtimestamp(value)
- elif isinstance(value, str):
- # If it's already a string, try to parse it
+ if isinstance(value, str):
+ # Try to parse string as timestamp
try:
- from datetime import datetime
- value = datetime.fromtimestamp(float(value))
+ value = float(value)
except:
pass # Keep as string if parsing fails
@@ -424,7 +411,6 @@ class DatabaseConnector:
values.append(value)
- logger.debug(f"Values to insert: {values}")
# Build INSERT/UPDATE with quoted identifiers
col_names = ', '.join([f'"{col}"' for col in columns])
@@ -432,7 +418,6 @@ class DatabaseConnector:
updates = ', '.join([f'"{col}" = EXCLUDED."{col}"' for col in columns[1:] if col not in ['_createdAt', '_createdBy']])
sql = f'INSERT INTO "{table}" ({col_names}) VALUES ({placeholders}) ON CONFLICT ("id") DO UPDATE SET {updates}'
- logger.debug(f"SQL: {sql}")
cursor.execute(sql, values)
@@ -454,6 +439,7 @@ class DatabaseConnector:
record = dict(row)
fields = _get_model_fields(model_class)
+
# Parse JSONB fields back to Python objects
for field_name, field_type in fields.items():
if field_type == 'JSONB' and field_name in record and record[field_name] is not None:
@@ -527,22 +513,31 @@ class DatabaseConnector:
fields = _get_model_fields(model_class)
for record in records:
for field_name, field_type in fields.items():
- if field_type == 'JSONB' and field_name in record and record[field_name] is not None:
- import json
- try:
- if isinstance(record[field_name], str):
- # Parse JSON string back to Python object
- record[field_name] = json.loads(record[field_name])
- elif isinstance(record[field_name], (dict, list)):
- # Already a Python object, keep as is
- pass
+ if field_type == 'JSONB' and field_name in record:
+ if record[field_name] is None:
+ # Convert None to appropriate default based on field name
+ if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']:
+ record[field_name] = []
+ elif field_name in ['execParameters', 'stats']:
+ record[field_name] = {}
else:
- # Try to parse as JSON
- record[field_name] = json.loads(str(record[field_name]))
- except (json.JSONDecodeError, TypeError, ValueError):
- # If parsing fails, keep as string
- logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}")
- pass
+ record[field_name] = None
+ else:
+ import json
+ try:
+ if isinstance(record[field_name], str):
+ # Parse JSON string back to Python object
+ record[field_name] = json.loads(record[field_name])
+ elif isinstance(record[field_name], (dict, list)):
+ # Already a Python object, keep as is
+ pass
+ else:
+ # Try to parse as JSON
+ record[field_name] = json.loads(str(record[field_name]))
+ except (json.JSONDecodeError, TypeError, ValueError):
+ # If parsing fails, keep as string
+ logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}")
+ pass
return records
except Exception as e:
@@ -550,35 +545,6 @@ class DatabaseConnector:
return []
- def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]:
- """Applies a record filter to the records"""
- if not recordFilter:
- return records
-
- filteredRecords = []
-
- for record in records:
- match = True
-
- for field, value in recordFilter.items():
- # Check if the field exists
- if field not in record:
- match = False
- break
-
- # Convert both values to strings for comparison
- recordValue = str(record[field])
- filterValue = str(value)
-
- # Direct string comparison
- if recordValue != filterValue:
- match = False
- break
-
- if match:
- filteredRecords.append(record)
-
- return filteredRecords
def _registerInitialId(self, table: str, initialId: str) -> bool:
"""Registers the initial ID for a table."""
@@ -603,7 +569,6 @@ class DatabaseConnector:
logger.info(f"Initial ID updated from {existingInitialId} to {initialId} for table {table}")
return success
else:
- logger.debug(f"Initial ID {existingInitialId} for table {table} already exists and is valid")
return True
except Exception as e:
logger.error(f"Error registering the initial ID for table {table}: {e}")
@@ -699,34 +664,76 @@ class DatabaseConnector:
"""Returns a list of records from a table, filtered by criteria."""
table = model_class.__name__
- # If we have specific record IDs in the filter, only load those records
- if recordFilter and "id" in recordFilter:
- recordId = recordFilter["id"]
- record = self._loadRecord(model_class, recordId)
- if record:
- records = [record]
- else:
+ try:
+ if not self._ensureTableExists(model_class):
return []
- else:
- # Load all records if no specific ID filter
- records = self._loadTable(model_class)
-
- # Apply recordFilter if available
- if recordFilter:
- records = self._applyRecordFilter(records, recordFilter)
-
- # If fieldFilter is available, reduce the fields
- if fieldFilter and isinstance(fieldFilter, list):
- result = []
- for record in records:
- filteredRecord = {}
- for field in fieldFilter:
- if field in record:
- filteredRecord[field] = record[field]
- result.append(filteredRecord)
- return result
-
- return records
+
+ # Build WHERE clause from recordFilter
+ where_conditions = []
+ where_values = []
+
+ if recordFilter:
+ for field, value in recordFilter.items():
+ where_conditions.append(f'"{field}" = %s')
+ where_values.append(value)
+
+ # Build the query
+ if where_conditions:
+ where_clause = " WHERE " + " AND ".join(where_conditions)
+ else:
+ where_clause = ""
+
+ query = f'SELECT * FROM "{table}"{where_clause} ORDER BY "id"'
+
+ with self.connection.cursor() as cursor:
+ cursor.execute(query, where_values)
+ records = [dict(row) for row in cursor.fetchall()]
+
+ # Handle JSONB fields for all records
+ fields = _get_model_fields(model_class)
+ for record in records:
+ for field_name, field_type in fields.items():
+ if field_type == 'JSONB' and field_name in record:
+ if record[field_name] is None:
+ # Convert None to appropriate default based on field name
+ if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']:
+ record[field_name] = []
+ elif field_name in ['execParameters', 'stats']:
+ record[field_name] = {}
+ else:
+ record[field_name] = None
+ else:
+ import json
+ try:
+ if isinstance(record[field_name], str):
+ # Parse JSON string back to Python object
+ record[field_name] = json.loads(record[field_name])
+ elif isinstance(record[field_name], (dict, list)):
+ # Already a Python object, keep as is
+ pass
+ else:
+ # Try to parse as JSON
+ record[field_name] = json.loads(str(record[field_name]))
+ except (json.JSONDecodeError, TypeError, ValueError):
+ # If parsing fails, keep as string
+ logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}")
+ pass
+
+ # If fieldFilter is available, reduce the fields
+ if fieldFilter and isinstance(fieldFilter, list):
+ result = []
+ for record in records:
+ filteredRecord = {}
+ for field in fieldFilter:
+ if field in record:
+ filteredRecord[field] = record[field]
+ result.append(filteredRecord)
+ return result
+
+ return records
+ except Exception as e:
+ logger.error(f"Error loading records from table {table}: {e}")
+ return []
def recordCreate(self, model_class: type, record: Union[Dict[str, Any], BaseModel]) -> Dict[str, Any]:
"""Creates a new record in a table based on Pydantic model class."""
@@ -793,7 +800,7 @@ class DatabaseConnector:
with self.connection.cursor() as cursor:
# Check if record exists
- cursor.execute(f"SELECT id FROM {table} WHERE id = %s", (recordId,))
+ cursor.execute(f'SELECT "id" FROM "{table}" WHERE "id" = %s', (recordId,))
if not cursor.fetchone():
return False
@@ -804,7 +811,7 @@ class DatabaseConnector:
logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table")
# Delete the record
- cursor.execute(f"DELETE FROM {table} WHERE id = %s", (recordId,))
+ cursor.execute(f'DELETE FROM "{table}" WHERE "id" = %s', (recordId,))
# No cache to update - database handles consistency
@@ -822,14 +829,12 @@ class DatabaseConnector:
table = model_class.__name__
systemData = self._loadSystemTable()
initialId = systemData.get(table)
- logger.debug(f"Initial ID for table '{table}': {initialId}")
return initialId
def close(self):
"""Close the database connection."""
if hasattr(self, 'connection') and self.connection and not self.connection.closed:
self.connection.close()
- logger.debug("Database connection closed")
def __del__(self):
"""Cleanup method to close connection."""
diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py
index 75af8878..c71e0c03 100644
--- a/modules/interfaces/interfaceAppObjects.py
+++ b/modules/interfaces/interfaceAppObjects.py
@@ -728,7 +728,6 @@ class AppObjects:
if old_token["id"] != token.id: # Don't delete the new token if it already exists
self.db.recordDelete(Token, old_token["id"])
deleted_count += 1
- logger.debug(f"Deleted old access token {old_token['id']} for user {self.currentUser.id} and authority {token.authority}")
if deleted_count > 0:
logger.info(f"Replaced {deleted_count} old access tokens for user {self.currentUser.id} and authority {token.authority}")
@@ -781,7 +780,6 @@ class AppObjects:
if old_token["id"] != token.id: # Don't delete the new token if it already exists
self.db.recordDelete(Token, old_token["id"])
deleted_count += 1
- logger.debug(f"Deleted old token {old_token['id']} for connectionId {token.connectionId}")
if deleted_count > 0:
logger.info(f"Replaced {deleted_count} old tokens for connectionId {token.connectionId}")
@@ -864,17 +862,6 @@ class AppObjects:
"connectionId": connectionId
})
- # Debug: Log what we found
- logger.debug(f"getConnectionToken: Found {len(tokens)} tokens for connectionId {connectionId}")
- if tokens:
- for i, token in enumerate(tokens):
- logger.debug(f"getConnectionToken: Token {i}: id={token.get('id')}, expiresAt={token.get('expiresAt')}, createdAt={token.get('createdAt')}")
- else:
- # Debug: Check if there are any tokens at all in the database
- all_tokens = self.db.getRecordset(Token, recordFilter={})
- logger.debug(f"getConnectionToken: No tokens found for connectionId {connectionId}. Total tokens in database: {len(all_tokens)}")
- if all_tokens:
- logger.debug(f"getConnectionToken: Sample tokens: {[{'id': t.get('id'), 'connectionId': t.get('connectionId'), 'authority': t.get('authority')} for t in all_tokens[:3]]}")
if not tokens:
logger.warning(f"No connection token found for connectionId: {connectionId}")
@@ -890,25 +877,21 @@ class AppObjects:
if latest_token.expiresAt and latest_token.expiresAt < (current_time + thirty_minutes):
if auto_refresh:
- logger.debug(f"getConnectionToken: Token expires soon, attempting refresh. expiresAt: {latest_token.expiresAt}, current_time: {current_time}")
-
# Import TokenManager here to avoid circular imports
from modules.security.tokenManager import TokenManager
token_manager = TokenManager()
# Try to refresh the token
- logger.debug(f"getConnectionToken: Calling token_manager.refresh_token for token {latest_token.id}")
refreshed_token = token_manager.refresh_token(latest_token)
if refreshed_token:
- logger.debug(f"getConnectionToken: Token refresh successful, saving new token {refreshed_token.id}")
# Save the new token (which will automatically replace old ones)
self.saveConnectionToken(refreshed_token)
logger.info(f"Proactively refreshed connection token for connectionId {connectionId} (expired in {latest_token.expiresAt - current_time} seconds)")
return refreshed_token
else:
- logger.warning(f"getConnectionToken: Token refresh failed for connectionId {connectionId}")
+ logger.warning(f"Token refresh failed for connectionId {connectionId}")
return None
else:
logger.warning(f"Connection token for connectionId {connectionId} expires soon (expiresAt: {latest_token.expiresAt})")
@@ -1047,13 +1030,9 @@ def getRootUser() -> User:
if not users:
raise ValueError("Initial user not found in database")
- logger.debug(f"Retrieved user data: {users[0]}")
# Convert to User model and return the model instance
user_data = users[0]
- logger.debug(f"User data keys: {list(user_data.keys())}")
- logger.debug(f"User id: {user_data.get('id')}")
- logger.debug(f"User mandateId: {user_data.get('mandateId')}")
return User.parse_obj(user_data)
diff --git a/modules/interfaces/interfaceChatModel.py b/modules/interfaces/interfaceChatModel.py
index 5633e8f8..ed71963a 100644
--- a/modules/interfaces/interfaceChatModel.py
+++ b/modules/interfaces/interfaceChatModel.py
@@ -200,6 +200,9 @@ register_model_labels(
"id": {"en": "ID", "fr": "ID"},
"messageId": {"en": "Message ID", "fr": "ID du message"},
"fileId": {"en": "File ID", "fr": "ID du fichier"},
+ "fileName": {"en": "File Name", "fr": "Nom du fichier"},
+ "fileSize": {"en": "File Size", "fr": "Taille du fichier"},
+ "mimeType": {"en": "MIME Type", "fr": "Type MIME"},
"roundNumber": {"en": "Round Number", "fr": "Numéro de tour"},
"taskNumber": {"en": "Task Number", "fr": "Numéro de tâche"},
"actionNumber": {"en": "Action Number", "fr": "Numéro d'action"},
diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py
index 8aca736a..43ad3f97 100644
--- a/modules/interfaces/interfaceChatObjects.py
+++ b/modules/interfaces/interfaceChatObjects.py
@@ -130,7 +130,6 @@ class ChatObjects:
except Exception as e:
logger.error(f"Error closing database connection: {e}")
- logger.debug(f"User context set: userId={self.userId}, mandateId={self.mandateId}")
def _initializeDatabase(self):
"""Initializes the database connection directly."""
@@ -307,7 +306,6 @@ class ChatObjects:
log_dict = log_data
log_dict["workflowId"] = workflowId
self.createLog(log_dict)
- logger.debug(f"Updated {len(logs_data)} logs for workflow {workflowId}")
except Exception as e:
logger.error(f"Error updating workflow logs: {str(e)}")
if 'messages' in object_fields:
@@ -322,7 +320,6 @@ class ChatObjects:
msg_dict = message_data
msg_dict["workflowId"] = workflowId
self.updateMessage(msg_dict.get("id"), msg_dict)
- logger.debug(f"Updated {len(messages_data)} messages for workflow {workflowId}")
except Exception as e:
logger.error(f"Error updating workflow messages: {str(e)}")
if 'stats' in object_fields:
@@ -331,7 +328,6 @@ class ChatObjects:
if stats_data:
stats_data["workflowId"] = workflowId
self.db.recordCreate(ChatStat, stats_data)
- logger.debug(f"Updated stats for workflow {workflowId}")
except Exception as e:
logger.error(f"Error updating workflow stats: {str(e)}")
@@ -402,7 +398,6 @@ class ChatObjects:
# 4. Finally delete the workflow itself
success = self.db.recordDelete(ChatWorkflow, workflowId)
- logger.debug(f"Successfully deleted workflow {workflowId} and all related data")
return success
except Exception as e:
@@ -461,6 +456,7 @@ class ChatObjects:
chat_messages.append(chat_message)
+
return chat_messages
def createMessage(self, messageData: Dict[str, Any]) -> ChatMessage:
@@ -505,15 +501,12 @@ class ChatObjects:
# This ensures messages have the correct progress context when workflows are continued
if "roundNumber" not in messageData:
messageData["roundNumber"] = workflow.currentRound
- logger.debug(f"Auto-setting roundNumber to {workflow.currentRound} for message {messageData['id']}")
if "taskNumber" not in messageData:
messageData["taskNumber"] = workflow.currentTask
- logger.debug(f"Auto-setting taskNumber to {workflow.currentTask} for message {messageData['id']}")
if "actionNumber" not in messageData:
messageData["actionNumber"] = workflow.currentAction
- logger.debug(f"Auto-setting actionNumber to {workflow.currentAction} for message {messageData['id']}")
# Use generic field separation based on ChatMessage model
simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData)
@@ -571,7 +564,6 @@ class ChatObjects:
def updateMessage(self, messageId: str, messageData: Dict[str, Any]) -> Dict[str, Any]:
"""Updates a workflow message if user has access to the workflow."""
try:
- logger.debug(f"Updating message {messageId} in database")
# Ensure messageId is provided
if not messageId:
@@ -646,7 +638,6 @@ class ChatObjects:
doc_dict = doc_data
doc_dict["messageId"] = messageId
self.createDocument(doc_dict)
- logger.debug(f"Updated {len(documents_data)} documents for message {messageId}")
except Exception as e:
logger.error(f"Error updating message documents: {str(e)}")
if 'stats' in object_fields:
@@ -655,12 +646,9 @@ class ChatObjects:
if stats_data:
stats_data["messageId"] = messageId
self.db.recordCreate(ChatStat, stats_data)
- logger.debug(f"Updated stats for message {messageId}")
except Exception as e:
logger.error(f"Error updating message stats: {str(e)}")
- if updatedMessage:
- logger.debug(f"Message {messageId} updated successfully")
- else:
+ if not updatedMessage:
logger.warning(f"Failed to update message {messageId}")
return updatedMessage
@@ -703,7 +691,6 @@ class ChatObjects:
# 3. Finally delete the message itself
success = self.db.recordDelete(ChatMessage, messageId)
- logger.debug(f"Successfully deleted message {messageId} and all related data")
return success
except Exception as e:
@@ -722,7 +709,6 @@ class ChatObjects:
if not self._canModify(ChatWorkflow, workflowId):
raise PermissionError(f"No permission to modify workflow {workflowId}")
- logger.debug(f"Removing file {fileId} from message {messageId} in workflow {workflowId}")
# Get documents for this message from normalized table
documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
@@ -750,7 +736,6 @@ class ChatObjects:
success = self.db.recordDelete(ChatDocument, docId)
if success:
removed = True
- logger.debug(f"Successfully removed document {docId} (fileId: {fileIdValue})")
else:
logger.warning(f"Failed to delete document {docId}")
@@ -758,7 +743,6 @@ class ChatObjects:
logger.warning(f"No matching file {fileId} found in message {messageId}")
return False
- logger.debug(f"Successfully removed file {fileId} from message {messageId}")
return True
except Exception as e:
@@ -902,6 +886,100 @@ class ChatObjects:
stats.sort(key=lambda x: x.get("created_at", ""), reverse=True)
return ChatStat(**stats[0])
+ def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]:
+ """
+ Returns unified chat data (messages, logs, stats) for a workflow in chronological order.
+ Uses timestamp-based selective data transfer for efficient polling.
+ """
+ # Check workflow access first
+ workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
+ if not workflows:
+ return {"items": []}
+
+ filteredWorkflows = self._uam(ChatWorkflow, workflows)
+ if not filteredWorkflows:
+ return {"items": []}
+
+ # Get all data types and filter in Python (PostgreSQL connector doesn't support $gt operators)
+ items = []
+
+ # Get messages
+ messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
+ for msg in messages:
+ # Apply timestamp filtering in Python
+ msg_timestamp = msg.get("publishedAt", get_utc_timestamp())
+ if afterTimestamp is not None and msg_timestamp <= afterTimestamp:
+ continue
+
+ # Load documents for each message
+ documents = self.getDocuments(msg["id"])
+
+ # Create ChatMessage object with loaded documents
+ chat_message = ChatMessage(
+ id=msg["id"],
+ workflowId=msg["workflowId"],
+ parentMessageId=msg.get("parentMessageId"),
+ documents=documents,
+ documentsLabel=msg.get("documentsLabel"),
+ message=msg.get("message"),
+ role=msg.get("role", "assistant"),
+ status=msg.get("status", "step"),
+ sequenceNr=msg.get("sequenceNr", 0),
+ publishedAt=msg.get("publishedAt", get_utc_timestamp()),
+ stats=self.getMessageStats(msg["id"]),
+ success=msg.get("success"),
+ actionId=msg.get("actionId"),
+ actionMethod=msg.get("actionMethod"),
+ actionName=msg.get("actionName"),
+ roundNumber=msg.get("roundNumber"),
+ taskNumber=msg.get("taskNumber"),
+ actionNumber=msg.get("actionNumber"),
+ taskProgress=msg.get("taskProgress"),
+ actionProgress=msg.get("actionProgress")
+ )
+
+ # Use publishedAt as the timestamp for chronological ordering
+ items.append({
+ "type": "message",
+ "createdAt": msg_timestamp,
+ "item": chat_message.dict()
+ })
+
+ # Get logs
+ logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
+ for log in logs:
+ # Apply timestamp filtering in Python
+ log_timestamp = log.get("timestamp", get_utc_timestamp())
+ if afterTimestamp is not None and log_timestamp <= afterTimestamp:
+ continue
+
+ chat_log = ChatLog(**log)
+ items.append({
+ "type": "log",
+ "createdAt": log_timestamp,
+ "item": chat_log.dict()
+ })
+
+ # Get stats
+ stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId})
+ for stat in stats:
+ # Apply timestamp filtering in Python
+ stat_timestamp = stat.get("_createdAt", get_utc_timestamp())
+ if afterTimestamp is not None and stat_timestamp <= afterTimestamp:
+ continue
+
+ chat_stat = ChatStat(**stat)
+ items.append({
+ "type": "stat",
+ "createdAt": stat_timestamp,
+ "item": chat_stat.dict()
+ })
+
+ # Sort all items by createdAt timestamp for chronological order
+ items.sort(key=lambda x: x["createdAt"])
+
+ return {"items": items}
+
def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool:
"""Updates workflow statistics during execution with incremental values."""
try:
diff --git a/modules/interfaces/interfaceTicketObjects.py b/modules/interfaces/interfaceTicketObjects.py
index 3df6464f..8d46e20f 100644
--- a/modules/interfaces/interfaceTicketObjects.py
+++ b/modules/interfaces/interfaceTicketObjects.py
@@ -504,7 +504,9 @@ class TicketSharepointSyncInterface:
except Exception as e:
# If audit logging fails, we don't want to break the main sync process
# Just log the error (this could be enhanced with fallback logging)
- print(f"Failed to write audit log: {str(e)}")
+ import logging
+ logger = logging.getLogger(__name__)
+ logger.warning(f"Failed to write audit log: {str(e)}")
def _create_csv_content(self, data: list[dict]) -> bytes:
"""Create CSV content with 4-row structure matching reference code."""
diff --git a/modules/routes/routeJira.py b/modules/routes/routeJira.py
index 7874b181..3e4038aa 100644
--- a/modules/routes/routeJira.py
+++ b/modules/routes/routeJira.py
@@ -68,8 +68,8 @@ async def perform_sync_jira_delta_group():
sharepoint_site_url = None
# Jira connection parameters
- jira_username = None
- jira_api_token = None
+ jira_username = "ONHOLD - TASK - p.motsch@valueon.ch"
+ jira_api_token = "ATATT3xFfGF0d973nNb3R1wTDI4lesmJfJAmooS-4cYMJTyLfwYv4himrE6yyCxyX3aSMfl34NHcm2fAXeFXrLHUzJx0RQVUBonCFnlgexjLQTgS5BoCbSO7dwAVjlcHZZkArHbooCUaRwJ15n6AHkm-nwdjLQ3Z74TFnKKUZC4uhuh3Aj-MuX8=2D7124FA"
jira_url = "https://deltasecurity.atlassian.net"
project_code = "DCS"
issue_type = "Task"
diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py
index 81f48205..fe70e347 100644
--- a/modules/routes/routeWorkflows.py
+++ b/modules/routes/routeWorkflows.py
@@ -59,29 +59,16 @@ async def get_workflows(
appInterface = getInterface(currentUser)
workflows_data = appInterface.getWorkflows()
- # Convert raw dictionaries to ChatWorkflow objects
+ # Convert raw dictionaries to ChatWorkflow objects by loading each workflow properly
workflows = []
for workflow_data in workflows_data:
try:
- workflow = ChatWorkflow(
- id=workflow_data["id"],
- status=workflow_data.get("status", "running"),
- name=workflow_data.get("name"),
- currentRound=workflow_data.get("currentRound", 0), # Default value
- currentTask=workflow_data.get("currentTask", 0),
- currentAction=workflow_data.get("currentAction", 0),
- totalTasks=workflow_data.get("totalTasks", 0),
- totalActions=workflow_data.get("totalActions", 0),
- lastActivity=workflow_data.get("lastActivity", get_utc_timestamp()),
- startedAt=workflow_data.get("startedAt", get_utc_timestamp()),
- logs=[ChatLog(**log) for log in workflow_data.get("logs", [])],
- messages=[ChatMessage(**msg) for msg in workflow_data.get("messages", [])],
- stats=ChatStat(**workflow_data.get("stats", {})) if workflow_data.get("stats") else None,
- mandateId=workflow_data.get("mandateId", currentUser.mandateId or "")
- )
- workflows.append(workflow)
+ # Load the workflow properly using the same method as individual workflow endpoint
+ workflow = appInterface.getWorkflow(workflow_data["id"])
+ if workflow:
+ workflows.append(workflow)
except Exception as e:
- logger.warning(f"Error converting workflow data to ChatWorkflow object: {str(e)}")
+ logger.warning(f"Error loading workflow {workflow_data.get('id', 'unknown')}: {str(e)}")
# Skip invalid workflows instead of failing the entire request
continue
@@ -276,7 +263,8 @@ async def get_workflow_messages(
messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
if messageIndex >= 0:
# Return only messages after the specified message
- return allMessages[messageIndex + 1:]
+ filteredMessages = allMessages[messageIndex + 1:]
+ return filteredMessages
return allMessages
except HTTPException:
@@ -395,6 +383,45 @@ async def delete_workflow(
)
+# Unified Chat Data Endpoint for Polling
+@router.get("/{workflowId}/chatData")
+@limiter.limit("120/minute")
+async def get_workflow_chat_data(
+ request: Request,
+ workflowId: str = Path(..., description="ID of the workflow"),
+ afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"),
+ currentUser: User = Depends(getCurrentUser)
+) -> Dict[str, Any]:
+ """
+ Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer.
+ Returns all data types in chronological order based on _createdAt timestamp.
+ """
+ try:
+ # Get service center
+ interfaceChat = getServiceChat(currentUser)
+
+ # Verify workflow exists
+ workflow = interfaceChat.getWorkflow(workflowId)
+ if not workflow:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Workflow with ID {workflowId} not found"
+ )
+
+ # Get unified chat data using the new method
+ chatData = interfaceChat.getUnifiedChatData(workflowId, afterTimestamp)
+
+ return chatData
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True)
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Error getting unified chat data: {str(e)}"
+ )
+
# Document Management Endpoints
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])
diff --git a/modules/shared/timezoneUtils.py b/modules/shared/timezoneUtils.py
index a9d2260d..93011060 100644
--- a/modules/shared/timezoneUtils.py
+++ b/modules/shared/timezoneUtils.py
@@ -5,6 +5,7 @@ Ensures all timestamps are properly handled as UTC.
from datetime import datetime, timezone, timedelta
from typing import Union, Optional
+import time
def get_utc_now() -> datetime:
"""
@@ -17,12 +18,12 @@ def get_utc_now() -> datetime:
def get_utc_timestamp() -> float:
"""
- Get current UTC timestamp (seconds since epoch).
+ Get current UTC timestamp (seconds since epoch with millisecond precision).
Returns:
- float: Current UTC timestamp in seconds
+ float: Current UTC timestamp in seconds with millisecond precision
"""
- return datetime.now(timezone.utc).timestamp()
+ return time.time()
def to_utc_timestamp(dt: datetime) -> float:
"""
diff --git a/notes/changelog.txt b/notes/changelog.txt
index d4574804..e10e683a 100644
--- a/notes/changelog.txt
+++ b/notes/changelog.txt
@@ -24,6 +24,12 @@ TODO
- check zusammenfassung von 10 dokumenten >10 MB
- test case bewerbung
+# Ida changes gateway:
+- Polling endpoint + doku dazu
+- files in documents integriert --> document endpoint for files
+- prompts in chat endpoint
+-
+
# DOCUMENTATION
Design principles
- UI: Module classes for data management (CRUD tables & forms --> formGeneric)