# gateway/modules/workflows/methods/methodAi/actions/consolidate.py
# 2026-04-29 23:12:46 +02:00
#
# 89 lines
# 3.1 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
from typing import Any, Dict, List
from modules.datamodels.datamodelAi import AiCallOptions, AiCallRequest, OperationTypeEnum
from modules.datamodels.datamodelChat import ActionResult
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError
# Module-level logger named after this module; consolidate() uses it to
# report failed AI calls.
logger = logging.getLogger(__name__)
def _normalizeItems(parameters: Dict[str, Any]) -> List[Any]:
items = parameters.get("items")
if isinstance(items, list):
return items
agg = parameters.get("aggregateResult")
if isinstance(agg, dict) and isinstance(agg.get("items"), list):
return agg["items"]
return []
async def consolidate(self, parameters: Dict[str, Any]) -> ActionResult:
    """AI-assisted consolidation of aggregated loop / workflow items.

    Builds a prompt from an instruction (chosen by ``mode``), optional extra
    user instructions and the serialized items, then sends it to the AI
    service found at ``self.services.ai``.

    Args:
        parameters: Action parameters. Keys read here:
            ``mode``   - ``"summarize"`` (default), ``"classify"`` or
                         ``"semanticMerge"``; any other value selects a
                         generic "process according to the user
                         instructions" prompt.
            ``prompt`` - optional additional instructions appended to the
                         mode instruction.
            ``items`` / ``aggregateResult`` - the data, resolved through
                         ``_normalizeItems``.
            The whole dict is also passed to ``applyCommonAiParams``.

    Returns:
        A failure ``ActionResult`` when there are no items, the AI service
        is missing, the AI call raises, or the response reports errors.
        Otherwise a success ``ActionResult`` whose data holds ``result``
        (stripped response text), ``mode`` and ``count`` (number of items).

    Raises:
        SubscriptionInactiveException, BillingContextError: re-raised
            unchanged from the AI call so the caller can handle them.
    """
    mode = (parameters.get("mode") or "summarize").strip()
    extra = (parameters.get("prompt") or "").strip()

    items = _normalizeItems(parameters)
    if not items:
        return ActionResult.isFailure(
            error="No items to consolidate. Connect an AggregateResult or pass items.",
        )

    # Upper bound (in characters) on the serialized data embedded in the prompt.
    max_payload_chars = 120000
    try:
        serialized = json.dumps(items, ensure_ascii=False, default=str)
    except (TypeError, ValueError):
        # default=str does not cover everything: unsupported dict key types
        # raise TypeError and circular references raise ValueError. Fall back
        # to the plain repr so the action still produces a prompt.
        serialized = str(items)
    if len(serialized) > max_payload_chars:
        # The cut may land mid-structure; make the data loss visible in logs.
        logger.warning(
            "consolidate: payload truncated from %d to %d characters",
            len(serialized),
            max_payload_chars,
        )
    payload = serialized[:max_payload_chars]

    # Instruction per known mode; unknown modes get the generic instruction.
    mode_instructions = {
        "summarize": "Summarize the following aggregated workflow results clearly and concisely.",
        "classify": (
            "Classify and group the following aggregated items. "
            "Output a structured summary (categories, counts, key labels)."
        ),
        "semanticMerge": (
            "Semantically merge the following items into one coherent result. "
            "Remove duplicates where appropriate."
        ),
    }
    instr = mode_instructions.get(
        mode,
        "Process the following aggregated data according to the user instructions.",
    )
    if extra:
        instr += f"\n\nAdditional instructions: {extra}"
    prompt = f"{instr}\n\n--- DATA ---\n{payload}"

    ai_service = getattr(self.services, "ai", None)
    if not ai_service:
        return ActionResult.isFailure(error="AI service unavailable")

    try:
        req = AiCallRequest(
            prompt=prompt,
            options=AiCallOptions(operationType=OperationTypeEnum.DATA_ANALYSE),
        )
        # Imported at call time, as in the original.
        # NOTE(review): presumably this avoids a circular import - confirm.
        from modules.workflows.methods.methodAi._common import applyCommonAiParams

        applyCommonAiParams(parameters, req)
        resp = await ai_service.callAi(req)
    except (SubscriptionInactiveException, BillingContextError):
        # Subscription / billing problems are not turned into an action failure.
        raise
    except Exception as e:
        logger.exception("consolidate: AI call failed: %s", e)
        return ActionResult.isFailure(error=str(e))

    # A missing or falsy errorCount counts as "no errors".
    if (getattr(resp, "errorCount", 0) or 0) > 0:
        return ActionResult.isFailure(error=resp.content or "AI call failed")

    text = (resp.content or "").strip()
    return ActionResult.isSuccess(
        data={
            "result": text,
            "mode": mode,
            "count": len(items),
        },
    )