diff --git a/modules/chat/handling/handlingTasks.py b/modules/chat/handling/handlingTasks.py index 20cfbe13..cfc31401 100644 --- a/modules/chat/handling/handlingTasks.py +++ b/modules/chat/handling/handlingTasks.py @@ -1098,36 +1098,36 @@ class HandlingTasks: ) result_label = action.execResultLabel - # Process documents from the action result - created_documents = [] - if result.success: - action.setSuccess() - # Extract result text from documents if available, otherwise use empty string - action.result = "" - if result.documents and len(result.documents) > 0: - # Try to get text content from the first document - first_doc = result.documents[0] - if isinstance(first_doc.documentData, dict): - action.result = first_doc.documentData.get("result", "") - elif isinstance(first_doc.documentData, str): - action.result = first_doc.documentData - # Preserve the action's execResultLabel for document routing - # Action methods should NOT return resultLabel - this is managed by the action handler - if not action.execResultLabel: - logger.warning(f"Action {action.execMethod}.{action.execAction} has no execResultLabel set") - # Always use the action's execResultLabel for message creation to ensure proper document routing - message_result_label = action.execResultLabel - - # Create message first to get messageId, then create documents with messageId - message = await self.createActionMessage(action, result, workflow, message_result_label, [], task_step, task_index) - if message: - # Now create documents with the messageId - created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow, message.id) - # Update the message with the created documents - if created_documents: - message.documents = created_documents - # Update the message in the database - self.chatInterface.updateMessage(message.id, {"documents": [doc.to_dict() for doc in created_documents]}) + # Process documents from the action result + created_documents = [] + if result.success: + action.setSuccess() + # Extract result text from documents if available, otherwise use empty string + action.result = "" + if result.documents and len(result.documents) > 0: + # Try to get text content from the first document + first_doc = result.documents[0] + if isinstance(first_doc.documentData, dict): + action.result = first_doc.documentData.get("result", "") + elif isinstance(first_doc.documentData, str): + action.result = first_doc.documentData + # Preserve the action's execResultLabel for document routing + # Action methods should NOT return resultLabel - this is managed by the action handler + if not action.execResultLabel: + logger.warning(f"Action {action.execMethod}.{action.execAction} has no execResultLabel set") + # Always use the action's execResultLabel for message creation to ensure proper document routing + message_result_label = action.execResultLabel + + # Create message first to get messageId, then create documents with messageId + message = await self.createActionMessage(action, result, workflow, message_result_label, [], task_step, task_index) + if message: + # Now create documents with the messageId + created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow, message.id) + # Update the message with the created documents + if created_documents: + message.documents = created_documents + # Update the message in the database + self.chatInterface.updateMessage(message.id, {"documents": [doc.dict() for doc in created_documents]}) # Log action results logger.info(f"Action completed successfully") diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py index eea519e8..dfee166a 100644 --- a/modules/connectors/connectorDbPostgre.py +++ b/modules/connectors/connectorDbPostgre.py @@ -40,7 +40,7 @@ def _get_model_fields(model_class) -> Dict[str, str]: elif field_type == int: fields[field_name] = 'INTEGER' elif field_type == float: - fields[field_name] = 'REAL' + fields[field_name] = 'DOUBLE PRECISION' elif field_type == bool: fields[field_name] = 'BOOLEAN' else: @@ -80,7 +80,6 @@ class DatabaseConnector: self._systemTableName = "_system" self._initializeSystemTable() - logger.debug(f"Context: userId={self.userId}") def initDbSystem(self): """Initialize the database system - creates database and tables.""" @@ -154,8 +153,8 @@ class DatabaseConnector: id SERIAL PRIMARY KEY, table_name VARCHAR(255) UNIQUE NOT NULL, initial_id VARCHAR(255) NOT NULL, - _createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - _modifiedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP + _createdAt DOUBLE PRECISION, + _modifiedAt DOUBLE PRECISION ) """) @@ -245,8 +244,8 @@ class DatabaseConnector: for table_name, initial_id in data.items(): cursor.execute(""" INSERT INTO "_system" ("table_name", "initial_id", "_modifiedAt") - VALUES (%s, %s, CURRENT_TIMESTAMP) - """, (table_name, initial_id)) + VALUES (%s, %s, %s) + """, (table_name, initial_id, get_utc_timestamp())) self.connection.commit() return True @@ -271,8 +270,8 @@ class DatabaseConnector: CREATE TABLE "{self._systemTableName}" ( "table_name" VARCHAR(255) PRIMARY KEY, "initial_id" VARCHAR(255), - "_createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - "_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP + "_createdAt" DOUBLE PRECISION, + "_modifiedAt" DOUBLE PRECISION ) """) logger.info("System table created successfully") @@ -285,10 +284,9 @@ class DatabaseConnector: existing_columns = [row['column_name'] for row in cursor.fetchall()] if '_modifiedAt' not in existing_columns: - cursor.execute(f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP') + cursor.execute(f'ALTER TABLE "{self._systemTableName}" ADD COLUMN "_modifiedAt" DOUBLE PRECISION') logger.info("Added _modifiedAt column to existing system table") - logger.debug("System table already exists") return True except Exception as e: @@ -313,11 +311,9 @@ class DatabaseConnector: WHERE LOWER(table_name) = LOWER(%s) AND table_schema = 'public' ''', (table,)) exists = cursor.fetchone()['count'] > 0 - logger.debug(f"Table {table} exists check: {exists}") if not exists: # Create table from Pydantic model - logger.debug(f"Creating table {table} with model {model_class}") self._create_table_from_model(cursor, table, model_class) logger.info(f"Created table '{table}' with columns from Pydantic model") @@ -333,7 +329,6 @@ class DatabaseConnector: def _create_table_from_model(self, cursor, table: str, model_class: type) -> None: """Create table with columns matching Pydantic model fields.""" fields = _get_model_fields(model_class) - logger.debug(f"Creating table {table} with fields: {fields}") # Build column definitions with quoted identifiers to preserve exact case columns = ['"id" VARCHAR(255) PRIMARY KEY'] @@ -343,15 +338,14 @@ class DatabaseConnector: # Add metadata columns columns.extend([ - '"_createdAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP', - '"_modifiedAt" TIMESTAMP DEFAULT CURRENT_TIMESTAMP', + '"_createdAt" DOUBLE PRECISION', + '"_modifiedAt" DOUBLE PRECISION', '"_createdBy" VARCHAR(255)', '"_modifiedBy" VARCHAR(255)' ]) # Create table sql = f'CREATE TABLE IF NOT EXISTS "{table}" ({", ".join(columns)})' - logger.debug(f"Executing SQL: {sql}") cursor.execute(sql) # Create indexes for foreign keys @@ -366,8 +360,6 @@ class DatabaseConnector: fields = _get_model_fields(model_class) columns = ['id'] + [field for field in fields.keys() if field != 'id'] + ['_createdAt', '_createdBy', '_modifiedAt', '_modifiedBy'] - logger.debug(f"Table {table} columns: {columns}") - logger.debug(f"Record data: {record}") if not columns: logger.error(f"No columns found for table {table}") @@ -384,17 +376,12 @@ class DatabaseConnector: for col in columns: value = filtered_record.get(col) - # Convert timestamp fields to proper PostgreSQL format + # Handle timestamp fields - store as Unix timestamps (floats) for consistency if col in ['_createdAt', '_modifiedAt'] and value is not None: - if isinstance(value, (int, float)): - # Convert Unix timestamp to PostgreSQL timestamp - from datetime import datetime - value = datetime.fromtimestamp(value) - elif isinstance(value, str): - # If it's already a string, try to parse it + if isinstance(value, str): + # Try to parse string as timestamp try: - from datetime import datetime - value = datetime.fromtimestamp(float(value)) + value = float(value) except: pass # Keep as string if parsing fails @@ -424,7 +411,6 @@ class DatabaseConnector: values.append(value) - logger.debug(f"Values to insert: {values}") # Build INSERT/UPDATE with quoted identifiers col_names = ', '.join([f'"{col}"' for col in columns]) @@ -432,7 +418,6 @@ class DatabaseConnector: updates = ', '.join([f'"{col}" = EXCLUDED."{col}"' for col in columns[1:] if col not in ['_createdAt', '_createdBy']]) sql = f'INSERT INTO "{table}" ({col_names}) VALUES ({placeholders}) ON CONFLICT ("id") DO UPDATE SET {updates}' - logger.debug(f"SQL: {sql}") cursor.execute(sql, values) @@ -454,6 +439,7 @@ class DatabaseConnector: record = dict(row) fields = _get_model_fields(model_class) + # Parse JSONB fields back to Python objects for field_name, field_type in fields.items(): if field_type == 'JSONB' and field_name in record and record[field_name] is not None: @@ -527,22 +513,31 @@ class DatabaseConnector: fields = _get_model_fields(model_class) for record in records: for field_name, field_type in fields.items(): - if field_type == 'JSONB' and field_name in record and record[field_name] is not None: - import json - try: - if isinstance(record[field_name], str): - # Parse JSON string back to Python object - record[field_name] = json.loads(record[field_name]) - elif isinstance(record[field_name], (dict, list)): - # Already a Python object, keep as is - pass + if field_type == 'JSONB' and field_name in record: + if record[field_name] is None: + # Convert None to appropriate default based on field name + if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']: + record[field_name] = [] + elif field_name in ['execParameters', 'stats']: + record[field_name] = {} else: - # Try to parse as JSON - record[field_name] = json.loads(str(record[field_name])) - except (json.JSONDecodeError, TypeError, ValueError): - # If parsing fails, keep as string - logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}") - pass + record[field_name] = None + else: + import json + try: + if isinstance(record[field_name], str): + # Parse JSON string back to Python object + record[field_name] = json.loads(record[field_name]) + elif isinstance(record[field_name], (dict, list)): + # Already a Python object, keep as is + pass + else: + # Try to parse as JSON + record[field_name] = json.loads(str(record[field_name])) + except (json.JSONDecodeError, TypeError, ValueError): + # If parsing fails, keep as string + logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}") + pass return records except Exception as e: @@ -550,35 +545,6 @@ class DatabaseConnector: return [] - def _applyRecordFilter(self, records: List[Dict[str, Any]], recordFilter: Dict[str, Any] = None) -> List[Dict[str, Any]]: - """Applies a record filter to the records""" - if not recordFilter: - return records - - filteredRecords = [] - - for record in records: - match = True - - for field, value in recordFilter.items(): - # Check if the field exists - if field not in record: - match = False - break - - # Convert both values to strings for comparison - recordValue = str(record[field]) - filterValue = str(value) - - # Direct string comparison - if recordValue != filterValue: - match = False - break - - if match: - filteredRecords.append(record) - - return filteredRecords def _registerInitialId(self, table: str, initialId: str) -> bool: """Registers the initial ID for a table.""" @@ -603,7 +569,6 @@ class DatabaseConnector: logger.info(f"Initial ID updated from {existingInitialId} to {initialId} for table {table}") return success else: - logger.debug(f"Initial ID {existingInitialId} for table {table} already exists and is valid") return True except Exception as e: logger.error(f"Error registering the initial ID for table {table}: {e}") @@ -699,34 +664,76 @@ class DatabaseConnector: """Returns a list of records from a table, filtered by criteria.""" table = model_class.__name__ - # If we have specific record IDs in the filter, only load those records - if recordFilter and "id" in recordFilter: - recordId = recordFilter["id"] - record = self._loadRecord(model_class, recordId) - if record: - records = [record] - else: + try: + if not self._ensureTableExists(model_class): return [] - else: - # Load all records if no specific ID filter - records = self._loadTable(model_class) - - # Apply recordFilter if available - if recordFilter: - records = self._applyRecordFilter(records, recordFilter) - - # If fieldFilter is available, reduce the fields - if fieldFilter and isinstance(fieldFilter, list): - result = [] - for record in records: - filteredRecord = {} - for field in fieldFilter: - if field in record: - filteredRecord[field] = record[field] - result.append(filteredRecord) - return result - - return records + + # Build WHERE clause from recordFilter + where_conditions = [] + where_values = [] + + if recordFilter: + for field, value in recordFilter.items(): + where_conditions.append(f'"{field}" = %s') + where_values.append(value) + + # Build the query + if where_conditions: + where_clause = " WHERE " + " AND ".join(where_conditions) + else: + where_clause = "" + + query = f'SELECT * FROM "{table}"{where_clause} ORDER BY "id"' + + with self.connection.cursor() as cursor: + cursor.execute(query, where_values) + records = [dict(row) for row in cursor.fetchall()] + + # Handle JSONB fields for all records + fields = _get_model_fields(model_class) + for record in records: + for field_name, field_type in fields.items(): + if field_type == 'JSONB' and field_name in record: + if record[field_name] is None: + # Convert None to appropriate default based on field name + if field_name in ['logs', 'messages', 'tasks', 'expectedDocumentFormats', 'resultDocuments']: + record[field_name] = [] + elif field_name in ['execParameters', 'stats']: + record[field_name] = {} + else: + record[field_name] = None + else: + import json + try: + if isinstance(record[field_name], str): + # Parse JSON string back to Python object + record[field_name] = json.loads(record[field_name]) + elif isinstance(record[field_name], (dict, list)): + # Already a Python object, keep as is + pass + else: + # Try to parse as JSON + record[field_name] = json.loads(str(record[field_name])) + except (json.JSONDecodeError, TypeError, ValueError): + # If parsing fails, keep as string + logger.warning(f"Could not parse JSONB field {field_name}, keeping as string: {record[field_name]}") + pass + + # If fieldFilter is available, reduce the fields + if fieldFilter and isinstance(fieldFilter, list): + result = [] + for record in records: + filteredRecord = {} + for field in fieldFilter: + if field in record: + filteredRecord[field] = record[field] + result.append(filteredRecord) + return result + + return records + except Exception as e: + logger.error(f"Error loading records from table {table}: {e}") + return [] def recordCreate(self, model_class: type, record: Union[Dict[str, Any], BaseModel]) -> Dict[str, Any]: """Creates a new record in a table based on Pydantic model class.""" @@ -793,7 +800,7 @@ class DatabaseConnector: with self.connection.cursor() as cursor: # Check if record exists - cursor.execute(f"SELECT id FROM {table} WHERE id = %s", (recordId,)) + cursor.execute(f'SELECT "id" FROM "{table}" WHERE "id" = %s', (recordId,)) if not cursor.fetchone(): return False @@ -804,7 +811,7 @@ class DatabaseConnector: logger.info(f"Initial ID {recordId} for table {table} has been removed from the system table") # Delete the record - cursor.execute(f"DELETE FROM {table} WHERE id = %s", (recordId,)) + cursor.execute(f'DELETE FROM "{table}" WHERE "id" = %s', (recordId,)) # No cache to update - database handles consistency @@ -822,14 +829,12 @@ class DatabaseConnector: table = model_class.__name__ systemData = self._loadSystemTable() initialId = systemData.get(table) - logger.debug(f"Initial ID for table '{table}': {initialId}") return initialId def close(self): """Close the database connection.""" if hasattr(self, 'connection') and self.connection and not self.connection.closed: self.connection.close() - logger.debug("Database connection closed") def __del__(self): """Cleanup method to close connection.""" diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py index 75af8878..c71e0c03 100644 --- a/modules/interfaces/interfaceAppObjects.py +++ b/modules/interfaces/interfaceAppObjects.py @@ -728,7 +728,6 @@ class AppObjects: if old_token["id"] != token.id: # Don't delete the new token if it already exists self.db.recordDelete(Token, old_token["id"]) deleted_count += 1 - logger.debug(f"Deleted old access token {old_token['id']} for user {self.currentUser.id} and authority {token.authority}") if deleted_count > 0: logger.info(f"Replaced {deleted_count} old access tokens for user {self.currentUser.id} and authority {token.authority}") @@ -781,7 +780,6 @@ class AppObjects: if old_token["id"] != token.id: # Don't delete the new token if it already exists self.db.recordDelete(Token, old_token["id"]) deleted_count += 1 - logger.debug(f"Deleted old token {old_token['id']} for connectionId {token.connectionId}") if deleted_count > 0: logger.info(f"Replaced {deleted_count} old tokens for connectionId {token.connectionId}") @@ -864,17 +862,6 @@ class AppObjects: "connectionId": connectionId }) - # Debug: Log what we found - logger.debug(f"getConnectionToken: Found {len(tokens)} tokens for connectionId {connectionId}") - if tokens: - for i, token in enumerate(tokens): - logger.debug(f"getConnectionToken: Token {i}: id={token.get('id')}, expiresAt={token.get('expiresAt')}, createdAt={token.get('createdAt')}") - else: - # Debug: Check if there are any tokens at all in the database - all_tokens = self.db.getRecordset(Token, recordFilter={}) - logger.debug(f"getConnectionToken: No tokens found for connectionId {connectionId}. Total tokens in database: {len(all_tokens)}") - if all_tokens: - logger.debug(f"getConnectionToken: Sample tokens: {[{'id': t.get('id'), 'connectionId': t.get('connectionId'), 'authority': t.get('authority')} for t in all_tokens[:3]]}") if not tokens: logger.warning(f"No connection token found for connectionId: {connectionId}") @@ -890,25 +877,21 @@ class AppObjects: if latest_token.expiresAt and latest_token.expiresAt < (current_time + thirty_minutes): if auto_refresh: - logger.debug(f"getConnectionToken: Token expires soon, attempting refresh. expiresAt: {latest_token.expiresAt}, current_time: {current_time}") - # Import TokenManager here to avoid circular imports from modules.security.tokenManager import TokenManager token_manager = TokenManager() # Try to refresh the token - logger.debug(f"getConnectionToken: Calling token_manager.refresh_token for token {latest_token.id}") refreshed_token = token_manager.refresh_token(latest_token) if refreshed_token: - logger.debug(f"getConnectionToken: Token refresh successful, saving new token {refreshed_token.id}") # Save the new token (which will automatically replace old ones) self.saveConnectionToken(refreshed_token) logger.info(f"Proactively refreshed connection token for connectionId {connectionId} (expired in {latest_token.expiresAt - current_time} seconds)") return refreshed_token else: - logger.warning(f"getConnectionToken: Token refresh failed for connectionId {connectionId}") + logger.warning(f"Token refresh failed for connectionId {connectionId}") return None else: logger.warning(f"Connection token for connectionId {connectionId} expires soon (expiresAt: {latest_token.expiresAt})") @@ -1047,13 +1030,9 @@ def getRootUser() -> User: if not users: raise ValueError("Initial user not found in database") - logger.debug(f"Retrieved user data: {users[0]}") # Convert to User model and return the model instance user_data = users[0] - logger.debug(f"User data keys: {list(user_data.keys())}") - logger.debug(f"User id: {user_data.get('id')}") - logger.debug(f"User mandateId: {user_data.get('mandateId')}") return User.parse_obj(user_data) diff --git a/modules/interfaces/interfaceChatModel.py b/modules/interfaces/interfaceChatModel.py index 5633e8f8..ed71963a 100644 --- a/modules/interfaces/interfaceChatModel.py +++ b/modules/interfaces/interfaceChatModel.py @@ -200,6 +200,9 @@ register_model_labels( "id": {"en": "ID", "fr": "ID"}, "messageId": {"en": "Message ID", "fr": "ID du message"}, "fileId": {"en": "File ID", "fr": "ID du fichier"}, + "fileName": {"en": "File Name", "fr": "Nom du fichier"}, + "fileSize": {"en": "File Size", "fr": "Taille du fichier"}, + "mimeType": {"en": "MIME Type", "fr": "Type MIME"}, "roundNumber": {"en": "Round Number", "fr": "Numéro de tour"}, "taskNumber": {"en": "Task Number", "fr": "Numéro de tâche"}, "actionNumber": {"en": "Action Number", "fr": "Numéro d'action"}, diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index 8aca736a..43ad3f97 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -130,7 +130,6 @@ class ChatObjects: except Exception as e: logger.error(f"Error closing database connection: {e}") - logger.debug(f"User context set: userId={self.userId}, mandateId={self.mandateId}") def _initializeDatabase(self): """Initializes the database connection directly.""" @@ -307,7 +306,6 @@ class ChatObjects: log_dict = log_data log_dict["workflowId"] = workflowId self.createLog(log_dict) - logger.debug(f"Updated {len(logs_data)} logs for workflow {workflowId}") except Exception as e: logger.error(f"Error updating workflow logs: {str(e)}") if 'messages' in object_fields: @@ -322,7 +320,6 @@ class ChatObjects: msg_dict = message_data msg_dict["workflowId"] = workflowId self.updateMessage(msg_dict.get("id"), msg_dict) - logger.debug(f"Updated {len(messages_data)} messages for workflow {workflowId}") except Exception as e: logger.error(f"Error updating workflow messages: {str(e)}") if 'stats' in object_fields: @@ -331,7 +328,6 @@ class ChatObjects: if stats_data: stats_data["workflowId"] = workflowId self.db.recordCreate(ChatStat, stats_data) - logger.debug(f"Updated stats for workflow {workflowId}") except Exception as e: logger.error(f"Error updating workflow stats: {str(e)}") @@ -402,7 +398,6 @@ class ChatObjects: # 4. Finally delete the workflow itself success = self.db.recordDelete(ChatWorkflow, workflowId) - logger.debug(f"Successfully deleted workflow {workflowId} and all related data") return success except Exception as e: @@ -461,6 +456,7 @@ class ChatObjects: chat_messages.append(chat_message) + return chat_messages def createMessage(self, messageData: Dict[str, Any]) -> ChatMessage: @@ -505,15 +501,12 @@ class ChatObjects: # This ensures messages have the correct progress context when workflows are continued if "roundNumber" not in messageData: messageData["roundNumber"] = workflow.currentRound - logger.debug(f"Auto-setting roundNumber to {workflow.currentRound} for message {messageData['id']}") if "taskNumber" not in messageData: messageData["taskNumber"] = workflow.currentTask - logger.debug(f"Auto-setting taskNumber to {workflow.currentTask} for message {messageData['id']}") if "actionNumber" not in messageData: messageData["actionNumber"] = workflow.currentAction - logger.debug(f"Auto-setting actionNumber to {workflow.currentAction} for message {messageData['id']}") # Use generic field separation based on ChatMessage model simple_fields, object_fields = self._separate_object_fields(ChatMessage, messageData) @@ -571,7 +564,6 @@ class ChatObjects: def updateMessage(self, messageId: str, messageData: Dict[str, Any]) -> Dict[str, Any]: """Updates a workflow message if user has access to the workflow.""" try: - logger.debug(f"Updating message {messageId} in database") # Ensure messageId is provided if not messageId: @@ -646,7 +638,6 @@ class ChatObjects: doc_dict = doc_data doc_dict["messageId"] = messageId self.createDocument(doc_dict) - logger.debug(f"Updated {len(documents_data)} documents for message {messageId}") except Exception as e: logger.error(f"Error updating message documents: {str(e)}") if 'stats' in object_fields: @@ -655,12 +646,9 @@ class ChatObjects: if stats_data: stats_data["messageId"] = messageId self.db.recordCreate(ChatStat, stats_data) - logger.debug(f"Updated stats for message {messageId}") except Exception as e: logger.error(f"Error updating message stats: {str(e)}") - if updatedMessage: - logger.debug(f"Message {messageId} updated successfully") - else: + if not updatedMessage: logger.warning(f"Failed to update message {messageId}") return updatedMessage @@ -703,7 +691,6 @@ class ChatObjects: # 3. Finally delete the message itself success = self.db.recordDelete(ChatMessage, messageId) - logger.debug(f"Successfully deleted message {messageId} and all related data") return success except Exception as e: @@ -722,7 +709,6 @@ class ChatObjects: if not self._canModify(ChatWorkflow, workflowId): raise PermissionError(f"No permission to modify workflow {workflowId}") - logger.debug(f"Removing file {fileId} from message {messageId} in workflow {workflowId}") # Get documents for this message from normalized table documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId}) @@ -750,7 +736,6 @@ class ChatObjects: success = self.db.recordDelete(ChatDocument, docId) if success: removed = True - logger.debug(f"Successfully removed document {docId} (fileId: {fileIdValue})") else: logger.warning(f"Failed to delete document {docId}") @@ -758,7 +743,6 @@ class ChatObjects: logger.warning(f"No matching file {fileId} found in message {messageId}") return False - logger.debug(f"Successfully removed file {fileId} from message {messageId}") return True except Exception as e: @@ -902,6 +886,100 @@ class ChatObjects: stats.sort(key=lambda x: x.get("created_at", ""), reverse=True) return ChatStat(**stats[0]) + def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]: + """ + Returns unified chat data (messages, logs, stats) for a workflow in chronological order. + Uses timestamp-based selective data transfer for efficient polling. + """ + # Check workflow access first + workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId}) + if not workflows: + return {"items": []} + + filteredWorkflows = self._uam(ChatWorkflow, workflows) + if not filteredWorkflows: + return {"items": []} + + # Get all data types and filter in Python (PostgreSQL connector doesn't support $gt operators) + items = [] + + # Get messages + messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId}) + for msg in messages: + # Apply timestamp filtering in Python + msg_timestamp = msg.get("publishedAt", get_utc_timestamp()) + if afterTimestamp is not None and msg_timestamp <= afterTimestamp: + continue + + # Load documents for each message + documents = self.getDocuments(msg["id"]) + + # Create ChatMessage object with loaded documents + chat_message = ChatMessage( + id=msg["id"], + workflowId=msg["workflowId"], + parentMessageId=msg.get("parentMessageId"), + documents=documents, + documentsLabel=msg.get("documentsLabel"), + message=msg.get("message"), + role=msg.get("role", "assistant"), + status=msg.get("status", "step"), + sequenceNr=msg.get("sequenceNr", 0), + publishedAt=msg.get("publishedAt", get_utc_timestamp()), + stats=self.getMessageStats(msg["id"]), + success=msg.get("success"), + actionId=msg.get("actionId"), + actionMethod=msg.get("actionMethod"), + actionName=msg.get("actionName"), + roundNumber=msg.get("roundNumber"), + taskNumber=msg.get("taskNumber"), + actionNumber=msg.get("actionNumber"), + taskProgress=msg.get("taskProgress"), + actionProgress=msg.get("actionProgress") + ) + + # Use publishedAt as the timestamp for chronological ordering + items.append({ + "type": "message", + "createdAt": msg_timestamp, + "item": chat_message.dict() + }) + + # Get logs + logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId}) + for log in logs: + # Apply timestamp filtering in Python + log_timestamp = log.get("timestamp", get_utc_timestamp()) + if afterTimestamp is not None and log_timestamp <= afterTimestamp: + continue + + chat_log = ChatLog(**log) + items.append({ + "type": "log", + "createdAt": log_timestamp, + "item": chat_log.dict() + }) + + # Get stats + stats = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId}) + for stat in stats: + # Apply timestamp filtering in Python + stat_timestamp = stat.get("_createdAt", get_utc_timestamp()) + if afterTimestamp is not None and stat_timestamp <= afterTimestamp: + continue + + chat_stat = ChatStat(**stat) + items.append({ + "type": "stat", + "createdAt": stat_timestamp, + "item": chat_stat.dict() + }) + + # Sort all items by createdAt timestamp for chronological order + items.sort(key=lambda x: x["createdAt"]) + + return {"items": items} + def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool: """Updates workflow statistics during execution with incremental values.""" try: diff --git a/modules/interfaces/interfaceTicketObjects.py b/modules/interfaces/interfaceTicketObjects.py index 3df6464f..8d46e20f 100644 --- a/modules/interfaces/interfaceTicketObjects.py +++ b/modules/interfaces/interfaceTicketObjects.py @@ -504,7 +504,9 @@ class TicketSharepointSyncInterface: except Exception as e: # If audit logging fails, we don't want to break the main sync process # Just log the error (this could be enhanced with fallback logging) - print(f"Failed to write audit log: {str(e)}") + import logging + logger = logging.getLogger(__name__) + logger.warning(f"Failed to write audit log: {str(e)}") def _create_csv_content(self, data: list[dict]) -> bytes: """Create CSV content with 4-row structure matching reference code.""" diff --git a/modules/routes/routeJira.py b/modules/routes/routeJira.py index 7874b181..3e4038aa 100644 --- a/modules/routes/routeJira.py +++ b/modules/routes/routeJira.py @@ -68,8 +68,8 @@ async def perform_sync_jira_delta_group(): sharepoint_site_url = None # Jira connection parameters - jira_username = None - jira_api_token = None + jira_username = "ONHOLD - TASK - p.motsch@valueon.ch" + jira_api_token = "ATATT3xFfGF0d973nNb3R1wTDI4lesmJfJAmooS-4cYMJTyLfwYv4himrE6yyCxyX3aSMfl34NHcm2fAXeFXrLHUzJx0RQVUBonCFnlgexjLQTgS5BoCbSO7dwAVjlcHZZkArHbooCUaRwJ15n6AHkm-nwdjLQ3Z74TFnKKUZC4uhuh3Aj-MuX8=2D7124FA" jira_url = "https://deltasecurity.atlassian.net" project_code = "DCS" issue_type = "Task" diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index 81f48205..fe70e347 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -59,29 +59,16 @@ async def get_workflows( appInterface = getInterface(currentUser) workflows_data = appInterface.getWorkflows() - # Convert raw dictionaries to ChatWorkflow objects + # Convert raw dictionaries to ChatWorkflow objects by loading each workflow properly workflows = [] for workflow_data in workflows_data: try: - workflow = ChatWorkflow( - id=workflow_data["id"], - status=workflow_data.get("status", "running"), - name=workflow_data.get("name"), - currentRound=workflow_data.get("currentRound", 0), # Default value - currentTask=workflow_data.get("currentTask", 0), - currentAction=workflow_data.get("currentAction", 0), - totalTasks=workflow_data.get("totalTasks", 0), - totalActions=workflow_data.get("totalActions", 0), - lastActivity=workflow_data.get("lastActivity", get_utc_timestamp()), - startedAt=workflow_data.get("startedAt", get_utc_timestamp()), - logs=[ChatLog(**log) for log in workflow_data.get("logs", [])], - messages=[ChatMessage(**msg) for msg in workflow_data.get("messages", [])], - stats=ChatStat(**workflow_data.get("stats", {})) if workflow_data.get("stats") else None, - mandateId=workflow_data.get("mandateId", currentUser.mandateId or "") - ) - workflows.append(workflow) + # Load the workflow properly using the same method as individual workflow endpoint + workflow = appInterface.getWorkflow(workflow_data["id"]) + if workflow: + workflows.append(workflow) except Exception as e: - logger.warning(f"Error converting workflow data to ChatWorkflow object: {str(e)}") + logger.warning(f"Error loading workflow {workflow_data.get('id', 'unknown')}: {str(e)}") # Skip invalid workflows instead of failing the entire request continue @@ -276,7 +263,8 @@ async def get_workflow_messages( messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1) if messageIndex >= 0: # Return only messages after the specified message - return allMessages[messageIndex + 1:] + filteredMessages = allMessages[messageIndex + 1:] + return filteredMessages return allMessages except HTTPException: @@ -395,6 +383,45 @@ async def delete_workflow( ) +# Unified Chat Data Endpoint for Polling +@router.get("/{workflowId}/chatData") +@limiter.limit("120/minute") +async def get_workflow_chat_data( + request: Request, + workflowId: str = Path(..., description="ID of the workflow"), + afterTimestamp: Optional[float] = Query(None, description="Unix timestamp to get data after"), + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """ + Get unified chat data (messages, logs, stats) for a workflow with timestamp-based selective data transfer. + Returns all data types in chronological order based on _createdAt timestamp. + """ + try: + # Get service center + interfaceChat = getServiceChat(currentUser) + + # Verify workflow exists + workflow = interfaceChat.getWorkflow(workflowId) + if not workflow: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow with ID {workflowId} not found" + ) + + # Get unified chat data using the new method + chatData = interfaceChat.getUnifiedChatData(workflowId, afterTimestamp) + + return chatData + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting unified chat data: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error getting unified chat data: {str(e)}" + ) + # Document Management Endpoints @router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any]) diff --git a/modules/shared/timezoneUtils.py b/modules/shared/timezoneUtils.py index a9d2260d..93011060 100644 --- a/modules/shared/timezoneUtils.py +++ b/modules/shared/timezoneUtils.py @@ -5,6 +5,7 @@ Ensures all timestamps are properly handled as UTC. from datetime import datetime, timezone, timedelta from typing import Union, Optional +import time def get_utc_now() -> datetime: """ @@ -17,12 +18,12 @@ def get_utc_now() -> datetime: def get_utc_timestamp() -> float: """ - Get current UTC timestamp (seconds since epoch). + Get current UTC timestamp (seconds since epoch with millisecond precision). Returns: - float: Current UTC timestamp in seconds + float: Current UTC timestamp in seconds with millisecond precision """ - return datetime.now(timezone.utc).timestamp() + return time.time() def to_utc_timestamp(dt: datetime) -> float: """ diff --git a/notes/changelog.txt b/notes/changelog.txt index d4574804..e10e683a 100644 --- a/notes/changelog.txt +++ b/notes/changelog.txt @@ -24,6 +24,12 @@ TODO - check zusammenfassung von 10 dokumenten >10 MB - test case bewerbung +# Ida changes gateway: +- Polling endpoint + doku dazu +- files in documents integriert --> document endpoint for files +- prompts in chat endpoint +- + # DOCUMENTATION Design principles - UI: Module classes for data management (CRUD tables & forms --> formGeneric)