gateway/modules/workflows/methods/methodTrustee/methodTrustee.py
2026-02-22 00:07:33 +01:00

120 lines
5.5 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Trustee document workflow method: extract from files, process to positions, sync to accounting.
"""
import logging
from modules.workflows.methods.methodBase import MethodBase
from modules.datamodels.datamodelWorkflowActions import WorkflowActionDefinition, WorkflowActionParameter
from modules.shared.frontendTypes import FrontendType
from .actions.extractFromFiles import extractFromFiles
from .actions.processDocuments import processDocuments
from .actions.syncToAccounting import syncToAccounting
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MethodTrustee(MethodBase):
    """Workflow method grouping the trustee actions: extract, process, sync to accounting.

    The constructor registers three action definitions (``extractFromFiles``,
    ``processDocuments``, ``syncToAccounting``) in ``self._actions`` and also
    exposes each action function as a bound attribute on the instance.
    """

    def __init__(self, services):
        """Configure name, description and the action registry of this method.

        Args:
            services: Forwarded as-is to the base-class initializer.
        """
        super().__init__(services)
        self.name = "trustee"
        self.description = "Trustee document extraction, processing and accounting sync"

        ownerClass = self.__class__

        def bindToInstance(actionFunction):
            # Descriptor-protocol binding: yields the action function bound to this instance.
            return actionFunction.__get__(self, ownerClass)

        def featureInstanceParameter():
            # Every action gets its own parameter object; nothing is shared between definitions.
            return WorkflowActionParameter(
                name="featureInstanceId",
                type="str",
                frontendType=FrontendType.TEXT,
                required=True,
                description="Trustee feature instance ID",
            )

        def documentListParameter(descriptionText):
            # Required document reference; only the description differs between actions.
            return WorkflowActionParameter(
                name="documentList",
                type="list",
                frontendType=FrontendType.DOCUMENT_REFERENCE,
                required=True,
                description=descriptionText,
            )

        # --- extractFromFiles ------------------------------------------------
        extractParameters = {
            "fileIds": WorkflowActionParameter(
                name="fileIds",
                type="list",
                frontendType=FrontendType.JSON,
                required=False,
                description="List of file IDs already in DB (alternative to connectionReference + sharepointFolder)",
            ),
            "connectionReference": WorkflowActionParameter(
                name="connectionReference",
                type="str",
                frontendType=FrontendType.USER_CONNECTION,
                required=False,
                description="Microsoft connection for SharePoint (use with sharepointFolder)",
            ),
            "sharepointFolder": WorkflowActionParameter(
                name="sharepointFolder",
                type="str",
                frontendType=FrontendType.TEXT,
                required=False,
                description="SharePoint folder path (e.g. /sites/MySite/Documents/Expenses)",
            ),
            "featureInstanceId": featureInstanceParameter(),
            "prompt": WorkflowActionParameter(
                name="prompt",
                type="str",
                frontendType=FrontendType.TEXTAREA,
                required=False,
                description="AI prompt for extraction (optional)",
            ),
        }
        extractDefinition = WorkflowActionDefinition(
            actionId="trustee.extractFromFiles",
            description="Extract document type and data from PDF/JPG (fileIds or SharePoint folder)",
            dynamicMode=False,
            parameters=extractParameters,
            execute=bindToInstance(extractFromFiles),
        )

        # --- processDocuments ------------------------------------------------
        processParameters = {
            "documentList": documentListParameter(
                "Reference to extractFromFiles result (e.g. docList:messageId:extract_result)"
            ),
            "featureInstanceId": featureInstanceParameter(),
        }
        processDefinition = WorkflowActionDefinition(
            actionId="trustee.processDocuments",
            description="Create TrusteeDocument + TrusteePosition from extraction result (documentList from previous action)",
            dynamicMode=False,
            parameters=processParameters,
            execute=bindToInstance(processDocuments),
        )

        # --- syncToAccounting ------------------------------------------------
        syncParameters = {
            "documentList": documentListParameter(
                "Reference to processDocuments result message"
            ),
            "featureInstanceId": featureInstanceParameter(),
        }
        syncDefinition = WorkflowActionDefinition(
            actionId="trustee.syncToAccounting",
            description="Push trustee positions to accounting (documentList = processDocuments result)",
            dynamicMode=False,
            parameters=syncParameters,
            execute=bindToInstance(syncToAccounting),
        )

        # Registry keyed by action name, in the order the workflow steps are meant to run.
        self._actions = {
            "extractFromFiles": extractDefinition,
            "processDocuments": processDefinition,
            "syncToAccounting": syncDefinition,
        }

        # Run the _validateActions check (defined outside this class) once the registry
        # is in place and before the actions are exposed as attributes.
        self._validateActions()

        # Expose each action as an instance attribute, bound freshly for that purpose.
        exposedActions = (
            ("extractFromFiles", extractFromFiles),
            ("processDocuments", processDocuments),
            ("syncToAccounting", syncToAccounting),
        )
        for attributeName, actionFunction in exposedActions:
            setattr(self, attributeName, bindToInstance(actionFunction))