# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
System-level Workflow Dashboard API.

Provides cross-feature, cross-mandate access to workflow runs AND workflows
with RBAC scoping:

- Runs: a user sees their own runs, a mandate admin sees the runs of the
  mandates they administer, a sysadmin sees all runs.
- Workflows: a user sees the workflows of the mandates they are a member of,
  a sysadmin sees all workflows.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import math
|
|
from typing import Optional, List
|
|
from fastapi import APIRouter, Depends, Request, Query, Path, HTTPException
|
|
from fastapi.responses import StreamingResponse
|
|
from slowapi import Limiter
|
|
from slowapi.util import get_remote_address
|
|
|
|
from modules.auth.authentication import getRequestContext, RequestContext
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.datamodels.datamodelPagination import PaginationParams, normalize_pagination_dict
|
|
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
|
|
AutoRun, AutoStepLog, AutoWorkflow, AutoTask,
|
|
)
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
|
|
# Callable used below to wrap HTTPException detail messages for this route
# module (comes from the i18n registry — presumably resolves translations).
routeApiMsg = apiRouteContext("routeWorkflowDashboard")

logger = logging.getLogger(__name__)
# Rate limiter keyed by the client's remote address; applied per route via
# the @limiter.limit(...) decorators.
limiter = Limiter(key_func=get_remote_address)

# All routes in this module are mounted under /api/system/workflow-runs.
router = APIRouter(prefix="/api/system/workflow-runs", tags=["WorkflowDashboard"])

# Name of the database that _getDb() connects to.
_GREENFIELD_DB = "poweron_graphicaleditor"
|
|
|
|
|
|
def _getDb() -> DatabaseConnector:
    """Create a connector for the graphical-editor database.

    Connection settings are read from APP_CONFIG. The password is taken
    from DB_PASSWORD_SECRET and falls back to DB_PASSWORD when that is
    unset or empty. No user id is attached to the connector.
    """
    host = APP_CONFIG.get("DB_HOST", "localhost")
    user = APP_CONFIG.get("DB_USER")
    password = APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD")
    port = int(APP_CONFIG.get("DB_PORT", 5432))
    return DatabaseConnector(
        dbHost=host,
        dbDatabase=_GREENFIELD_DB,
        dbUser=user,
        dbPassword=password,
        dbPort=port,
        userId=None,
    )
|
|
|
|
|
|
def _getUserMandateIds(userId: str) -> list[str]:
    """Return the ids of all enabled mandate memberships of the given user.

    Memberships without a mandate id or with ``enabled`` falsy are skipped.
    """
    enabledMandates: list[str] = []
    for membership in getRootInterface().getUserMandates(userId):
        if membership.mandateId and membership.enabled:
            enabledMandates.append(membership.mandateId)
    return enabledMandates
|
|
|
|
|
|
def _getAdminMandateIds(userId: str, mandateIds: list) -> list:
    """Return the subset of ``mandateIds`` for which the user holds an admin role.

    Works in two batched lookups: first the user's role assignments for the
    given mandates, then the role records themselves to see which of them
    carry ``isAdmin``. The result keeps the order of ``mandateIds``.

    Args:
        userId: Id of the user to check.
        mandateIds: Candidate mandate ids; an empty/None value short-circuits
            to an empty result without touching the database.
    """
    if not mandateIds:
        return []

    iface = getRootInterface()
    from modules.datamodels.datamodelMembership import UserMandateRole
    assignments = iface.db.getRecordset(
        UserMandateRole,
        recordFilter={"userId": userId, "mandateId": mandateIds},
    )
    if not assignments:
        return []

    # roleId -> set of mandate ids in which the user holds that role.
    mandatesByRole: dict = {}
    for assignment in assignments:
        # Records may come back as plain dicts or as model objects.
        fields = assignment if isinstance(assignment, dict) else assignment.__dict__
        roleId = fields.get("roleId")
        if not roleId:
            continue
        mandatesByRole.setdefault(roleId, set()).add(fields.get("mandateId"))

    if not mandatesByRole:
        return []

    from modules.datamodels.datamodelRbac import MandateRole
    roles = iface.db.getRecordset(MandateRole, recordFilter={"id": list(mandatesByRole)})

    granted: set = set()
    for role in roles or []:
        fields = role if isinstance(role, dict) else role.__dict__
        if not fields.get("isAdmin"):
            continue
        roleId = fields.get("id")
        if roleId and roleId in mandatesByRole:
            granted |= mandatesByRole[roleId]

    return [candidate for candidate in mandateIds if candidate in granted]
|
|
|
|
|
|
def _isUserMandateAdmin(userId: str, mandateId: str) -> bool:
    """Tell whether the user holds an admin role in the given mandate.

    Delegates to the batched admin lookup with a single-element list.
    """
    return mandateId in _getAdminMandateIds(userId, [mandateId])
|
|
|
|
|
|
def _scopedRunFilter(context: RequestContext) -> Optional[dict]:
    """
    Derive the RBAC record filter for AutoRun queries.

    Returns:
        - ``None`` for a sysadmin (no restriction),
        - ``{"mandateId": [...]}`` when the user administers at least one of
          their mandates (restricted to exactly those mandates),
        - ``{"ownerId": userId}`` for every other user,
        - ``{"ownerId": "__impossible__"}`` when the context carries no user,
          so the query matches nothing.

    NOTE(review): for a mandate admin the filter is mandate-only, so their
    own runs in mandates they do not administer are not matched — confirm
    this is intended.
    """
    if context.hasSysAdminRole:
        return None

    user = context.user
    callerId = str(user.id) if user else None
    if callerId:
        memberMandates = _getUserMandateIds(callerId)
        adminScope = _getAdminMandateIds(callerId, memberMandates)
        return {"mandateId": adminScope} if adminScope else {"ownerId": callerId}

    return {"ownerId": "__impossible__"}
|
|
|
|
|
|
def _scopedWorkflowFilter(context: RequestContext) -> Optional[dict]:
    """
    Derive the RBAC record filter for AutoWorkflow queries.

    Returns:
        - ``None`` for a sysadmin (sees everything),
        - ``{"mandateId": [...]}`` with the user's enabled mandates otherwise,
        - ``{"mandateId": "__impossible__"}`` when there is no user on the
          context or the user has no mandates, so the query matches nothing.
    """
    if context.hasSysAdminRole:
        return None

    nothingVisible = {"mandateId": "__impossible__"}

    user = context.user
    callerId = str(user.id) if user else None
    if not callerId:
        return nothingVisible

    memberMandates = _getUserMandateIds(callerId)
    if not memberMandates:
        return nothingVisible
    return {"mandateId": memberMandates}
|
|
|
|
|
|
|
|
@router.get("")
@limiter.limit("60/minute")
def get_workflow_runs(
    request: Request,
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
    status: Optional[str] = Query(None, description="Filter by status"),
    mandateId: Optional[str] = Query(None, description="Filter by mandate"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
    context: RequestContext = Depends(getRequestContext),
) -> dict:
    """List workflow runs with RBAC scoping (SQL-paginated).

    Args:
        request: Incoming request (required by the rate limiter).
        limit: Page size used when no ``pagination`` JSON is supplied.
        offset: Row offset used when no ``pagination`` JSON is supplied;
            converted to a 1-based page number via ``offset // limit``.
        status: Optional exact-match filter on the run status.
        mandateId: Optional mandate filter. It can only narrow the caller's
            RBAC scope; a mandate outside that scope yields an empty list.
        pagination: Optional JSON-encoded PaginationParams; takes precedence
            over ``limit``/``offset``. Invalid JSON is logged and ignored.
        context: Authenticated request context.

    Returns:
        ``{"runs": [...], "total": int, "limit": limit, "offset": offset}``
        where each run row is enriched with ``workflowLabel``,
        ``mandateLabel``, ``featureInstanceId`` and ``instanceLabel``.
    """
    db = _getDb()
    if not db._ensureTableExists(AutoRun):
        return {"runs": [], "total": 0, "limit": limit, "offset": offset}

    baseFilter = _scopedRunFilter(context)
    recordFilter = dict(baseFilter) if baseFilter else {}

    if status:
        recordFilter["status"] = status
    if mandateId:
        # The RBAC scope may already restrict mandateId (mandate admins).
        # The query parameter must only narrow that restriction, never
        # replace it — otherwise any mandate could be requested.
        scopedMandates = recordFilter.get("mandateId")
        if scopedMandates is None:
            recordFilter["mandateId"] = mandateId
        else:
            allowed = scopedMandates if isinstance(scopedMandates, (list, tuple, set)) else [scopedMandates]
            recordFilter["mandateId"] = mandateId if mandateId in allowed else "__impossible__"

    paginationParams = None
    if pagination:
        try:
            paginationDict = json.loads(pagination)
            if paginationDict:
                paginationDict = normalize_pagination_dict(paginationDict)
                paginationParams = PaginationParams(**paginationDict)
        except Exception as e:
            # Best effort: fall back to limit/offset paging, but leave a trace.
            logger.warning(f"Ignoring invalid pagination parameter for workflow runs: {e}")

    if not paginationParams:
        page = (offset // limit) + 1 if limit > 0 else 1
        paginationParams = PaginationParams(
            page=page,
            pageSize=limit,
            sort=[{"field": "sysCreatedAt", "direction": "desc"}],
        )

    result = db.getRecordsetPaginated(
        AutoRun,
        pagination=paginationParams,
        recordFilter=recordFilter if recordFilter else None,
    )
    pageRuns = result.get("items", []) if isinstance(result, dict) else result.items
    total = result.get("totalItems", 0) if isinstance(result, dict) else result.totalItems

    # Load the workflows referenced by this page in one query.
    wfIds = list({r.get("workflowId") for r in pageRuns if r.get("workflowId")})
    wfMap: dict = {}
    if wfIds and db._ensureTableExists(AutoWorkflow):
        wfs = db.getRecordset(AutoWorkflow, recordFilter={"id": wfIds})
        for wf in (wfs or []):
            wfMap[wf.get("id")] = wf

    mandateIds = list({r.get("mandateId") for r in pageRuns if r.get("mandateId")})
    instanceIds = list({
        wfMap[r.get("workflowId")].get("featureInstanceId")
        for r in pageRuns
        if r.get("workflowId") in wfMap and wfMap[r.get("workflowId")].get("featureInstanceId")
    })

    # Label enrichment is best effort: on failure rows fall back to raw ids.
    mandateLabelMap: dict = {}
    instanceLabelMap: dict = {}
    try:
        rootIface = getRootInterface()
        if mandateIds:
            mMap = rootIface.getMandatesByIds(mandateIds)
            for mid, m in mMap.items():
                mandateLabelMap[mid] = getattr(m, "label", None) or getattr(m, "name", mid) or mid
        if instanceIds:
            from modules.interfaces.interfaceFeatures import getFeatureInterface
            featureIface = getFeatureInterface(rootIface.db)
            for iid in instanceIds:
                fi = featureIface.getFeatureInstance(iid)
                if fi:
                    instanceLabelMap[iid] = fi.label or iid
    except Exception as e:
        logger.warning(f"Failed to enrich run labels: {e}")

    runs = []
    for r in pageRuns:
        row = dict(r)
        wfId = row.get("workflowId")
        wf = wfMap.get(wfId, {})
        # Prefer the run's own label, then the workflow's label, then the id.
        row["workflowLabel"] = (
            row.get("label")
            or (wf.get("label") if isinstance(wf, dict) else None)
            or wfId
            or "—"
        )
        row["mandateLabel"] = mandateLabelMap.get(row.get("mandateId"), row.get("mandateId") or "—")
        fiid = wf.get("featureInstanceId") if isinstance(wf, dict) else None
        row["featureInstanceId"] = fiid
        row["instanceLabel"] = instanceLabelMap.get(fiid, fiid or "—")
        runs.append(row)

    return {"runs": runs, "total": total, "limit": limit, "offset": offset}
|
|
|
|
|
|
@router.get("/metrics")
@limiter.limit("60/minute")
def get_workflow_metrics(
    request: Request,
    context: RequestContext = Depends(getRequestContext),
) -> dict:
    """Aggregate dashboard metrics over everything the caller may see.

    Workflow counts use the workflow RBAC filter (templates excluded) and
    run counts use the run RBAC filter, i.e. the same scoping as the list
    endpoints, so the metric cards line up with the tables.

    Token and credit totals are summed in Python and only when between 1
    and 10000 runs are in scope; otherwise both are reported as 0.

    Returns:
        Dict with ``totalRuns``, ``runsByStatus``, ``totalTokens``,
        ``totalCredits``, ``workflowCount`` and ``activeWorkflows``.
    """
    db = _getDb()

    def _countMatching(modelClass, recordFilter) -> int:
        # A one-row page is enough: only the reported total is of interest.
        page = db.getRecordsetPaginated(
            modelClass,
            pagination=PaginationParams(page=1, pageSize=1),
            recordFilter=recordFilter,
        )
        if isinstance(page, dict):
            return page.get("totalItems", 0)
        return page.totalItems

    # --- Workflow counts (same filter as /workflows endpoint) ---
    workflowCount = 0
    activeWorkflows = 0
    if db._ensureTableExists(AutoWorkflow):
        scopedWorkflows = _scopedWorkflowFilter(context)
        wfFilter = dict(scopedWorkflows) if scopedWorkflows else {}
        wfFilter["isTemplate"] = False
        workflowCount = _countMatching(AutoWorkflow, wfFilter or None)

        onlyActive = dict(wfFilter)
        onlyActive["active"] = True
        activeWorkflows = _countMatching(AutoWorkflow, onlyActive)

    # --- Run counts (same filter as /runs endpoint) ---
    if not db._ensureTableExists(AutoRun):
        return {
            "totalRuns": 0,
            "runsByStatus": {},
            "totalTokens": 0,
            "totalCredits": 0,
            "workflowCount": workflowCount,
            "activeWorkflows": activeWorkflows,
        }

    runBaseFilter = _scopedRunFilter(context)
    totalRuns = _countMatching(AutoRun, runBaseFilter)

    # One COUNT per distinct status; failures leave the breakdown partial.
    runsByStatus: dict = {}
    try:
        distinctStatuses = db.getDistinctColumnValues(AutoRun, "status", recordFilter=runBaseFilter)
        for statusValue in distinctStatuses or []:
            perStatus = dict(runBaseFilter) if runBaseFilter else {}
            perStatus["status"] = statusValue
            runsByStatus[statusValue] = _countMatching(AutoRun, perStatus)
    except Exception as e:
        logger.warning(f"Failed to compute runsByStatus: {e}")

    totalTokens = 0
    totalCredits = 0.0
    if 0 < totalRuns <= 10000:
        costRows = db.getRecordset(
            AutoRun,
            recordFilter=runBaseFilter,
            fieldFilter=["costTokens", "costCredits"],
        ) or []
        for costRow in costRows:
            totalTokens = totalTokens + (costRow.get("costTokens", 0) or 0)
            totalCredits = totalCredits + (costRow.get("costCredits", 0.0) or 0.0)

    return {
        "totalRuns": totalRuns,
        "runsByStatus": runsByStatus,
        "totalTokens": totalTokens,
        "totalCredits": round(totalCredits, 4),
        "workflowCount": workflowCount,
        "activeWorkflows": activeWorkflows,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# System-level Workflow listing (all workflows the user can see via RBAC)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/workflows")
@limiter.limit("60/minute")
def get_system_workflows(
    request: Request,
    active: Optional[bool] = Query(None, description="Filter by active status"),
    mandateId: Optional[str] = Query(None, description="Filter by mandate"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams"),
    context: RequestContext = Depends(getRequestContext),
) -> dict:
    """List all workflows the user has access to (RBAC-scoped, cross-instance).

    Templates are always excluded.

    Args:
        request: Incoming request (required by the rate limiter).
        active: Optional filter on the workflow's ``active`` flag.
        mandateId: Optional mandate filter. It can only narrow the caller's
            RBAC scope; a mandate outside that scope yields an empty list.
        pagination: Optional JSON-encoded PaginationParams; defaults to
            page 1, 25 rows, newest first. Invalid JSON is logged and ignored.
        context: Authenticated request context.

    Returns:
        ``{"items": [...], "pagination": {...}}``. Each item is the workflow
        row without its ``graph``, enriched with ``mandateLabel``,
        ``instanceLabel``, ``isRunning``, ``activeRunId``, ``runCount``,
        ``lastStartedAt`` and the ``canEdit``/``canDelete``/``canExecute``
        flags (true for sysadmins and admins of the workflow's mandate).
    """
    db = _getDb()
    if not db._ensureTableExists(AutoWorkflow):
        return {"items": [], "pagination": {"currentPage": 1, "pageSize": 25, "totalItems": 0, "totalPages": 0}}

    baseFilter = _scopedWorkflowFilter(context)
    recordFilter = dict(baseFilter) if baseFilter else {}
    recordFilter["isTemplate"] = False

    if active is not None:
        recordFilter["active"] = active
    if mandateId:
        # The RBAC scope already restricts mandateId for non-sysadmins. The
        # query parameter must only narrow that restriction, never replace
        # it — otherwise workflows of foreign mandates become listable.
        scopedMandates = recordFilter.get("mandateId")
        if scopedMandates is None:
            recordFilter["mandateId"] = mandateId
        else:
            allowed = scopedMandates if isinstance(scopedMandates, (list, tuple, set)) else [scopedMandates]
            recordFilter["mandateId"] = mandateId if mandateId in allowed else "__impossible__"

    paginationParams = None
    if pagination:
        try:
            paginationDict = json.loads(pagination)
            if paginationDict:
                paginationDict = normalize_pagination_dict(paginationDict)
                paginationParams = PaginationParams(**paginationDict)
        except Exception as e:
            # Best effort: fall back to the default page, but leave a trace.
            logger.warning(f"Ignoring invalid pagination parameter for workflows: {e}")

    if not paginationParams:
        paginationParams = PaginationParams(
            page=1,
            pageSize=25,
            sort=[{"field": "sysCreatedAt", "direction": "desc"}],
        )

    result = db.getRecordsetPaginated(
        AutoWorkflow,
        pagination=paginationParams,
        recordFilter=recordFilter if recordFilter else None,
    )
    pageItems = result.get("items", []) if isinstance(result, dict) else result.items
    totalItems = result.get("totalItems", 0) if isinstance(result, dict) else result.totalItems
    totalPages = result.get("totalPages", 0) if isinstance(result, dict) else result.totalPages

    mandateIds = list({w.get("mandateId") for w in pageItems if w.get("mandateId")})
    instanceIds = list({w.get("featureInstanceId") for w in pageItems if w.get("featureInstanceId")})

    # Label enrichment is best effort: on failure rows fall back to raw ids.
    mandateLabelMap: dict = {}
    instanceLabelMap: dict = {}
    try:
        rootIface = getRootInterface()
        if mandateIds:
            mandateMap = rootIface.getMandatesByIds(mandateIds)
            for mid, m in mandateMap.items():
                mandateLabelMap[mid] = getattr(m, "label", None) or getattr(m, "name", mid) or mid
        if instanceIds:
            from modules.interfaces.interfaceFeatures import getFeatureInterface
            featureIface = getFeatureInterface(rootIface.db)
            for iid in instanceIds:
                fi = featureIface.getFeatureInstance(iid)
                if fi:
                    instanceLabelMap[iid] = fi.label or iid
    except Exception as e:
        logger.warning(f"Failed to enrich workflow labels: {e}")

    userId = str(context.user.id) if context.user else None
    adminMandateIds = []
    if userId and not context.hasSysAdminRole:
        userMandateIds = _getUserMandateIds(userId)
        adminMandateIds = _getAdminMandateIds(userId, userMandateIds)

    workflowIds = [w.get("id") for w in pageItems if w.get("id")]
    activeRunMap: dict = {}
    runCountMap: dict = {}
    lastStartedMap: dict = {}
    if workflowIds:
        try:
            if db._ensureTableExists(AutoRun):
                # One IN query for the whole page instead of one query per workflow.
                pageRuns = db.getRecordset(AutoRun, recordFilter={"workflowId": workflowIds}) or []
                for r in pageRuns:
                    rDict = dict(r)
                    runWfId = rDict.get("workflowId")
                    runCountMap[runWfId] = runCountMap.get(runWfId, 0) + 1
                    ts = rDict.get("sysCreatedAt")
                    if ts and (lastStartedMap.get(runWfId) is None or ts > lastStartedMap.get(runWfId)):
                        lastStartedMap[runWfId] = ts
                    if rDict.get("status") in ("running", "paused"):
                        activeRunMap[runWfId] = rDict.get("id")
        except Exception as e:
            logger.warning(f"Failed to enrich workflow run info: {e}")

    items = []
    for w in pageItems:
        row = dict(w)
        wMandateId = row.get("mandateId")
        wfId = row.get("id")
        row["mandateLabel"] = mandateLabelMap.get(wMandateId, wMandateId or "—")
        row["instanceLabel"] = instanceLabelMap.get(row.get("featureInstanceId"), row.get("featureInstanceId") or "—")
        row["isRunning"] = wfId in activeRunMap
        row["activeRunId"] = activeRunMap.get(wfId)
        row["runCount"] = runCountMap.get(wfId, 0)
        row["lastStartedAt"] = lastStartedMap.get(wfId)

        # Sysadmins and admins of the workflow's mandate get full permissions.
        hasFullAccess = bool(
            context.hasSysAdminRole or (wMandateId and wMandateId in adminMandateIds)
        )
        row["canEdit"] = hasFullAccess
        row["canDelete"] = hasFullAccess
        row["canExecute"] = hasFullAccess

        # The graph is not part of the list response.
        row.pop("graph", None)

        items.append(row)

    return {
        "items": items,
        "pagination": {
            "currentPage": paginationParams.page,
            "pageSize": paginationParams.pageSize,
            "totalItems": totalItems,
            "totalPages": totalPages,
        },
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Filter-values endpoints (for FormGeneratorTable column filters)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _enrichedFilterValues(
    db, context: RequestContext, modelClass, scopeFilter, column: str,
) -> List[str]:
    """Return distinct filter values for enriched columns (mandateLabel,
    instanceLabel, workflowLabel) or delegate to DB-level DISTINCT for raw columns.

    Args:
        db: Database connector used for all lookups.
        context: Request context handed to ``scopeFilter``.
        modelClass: Model being filtered (AutoRun or AutoWorkflow). For
            AutoWorkflow, templates are excluded (except on the
            ``workflowLabel`` path, which never applied that restriction).
        scopeFilter: Callable returning the RBAC record filter for ``context``.
        column: Column key requested by the table filter.

    Returns:
        Labels sorted case-insensitively for the enriched columns. If the
        label lookup fails, the failure is logged and the sorted raw ids are
        returned instead. For any other column, the DB's distinct values.
    """
    baseFilter = scopeFilter(context)
    recordFilter = dict(baseFilter) if baseFilter else {}

    if column == "workflowLabel":
        rows = db.getRecordset(modelClass, recordFilter=recordFilter or None, fieldFilter=["workflowId", "label"]) or []
        labels = {r["label"] for r in rows if r.get("label")}
        wfIds = {r["workflowId"] for r in rows if r.get("workflowId")}
        # Rows without their own label show the workflow's label, so offer those too.
        if wfIds and db._ensureTableExists(AutoWorkflow):
            wfs = db.getRecordset(AutoWorkflow, recordFilter={"id": list(wfIds)}, fieldFilter=["label"]) or []
            labels.update(wf["label"] for wf in wfs if wf.get("label"))
        return sorted(labels, key=lambda v: v.lower())

    isWorkflowModel = modelClass == AutoWorkflow
    if isWorkflowModel:
        recordFilter["isTemplate"] = False

    if column in ("mandateLabel", "mandateId"):
        rows = db.getRecordset(modelClass, recordFilter=recordFilter or None, fieldFilter=["mandateId"]) or []
        mandateIds = list({r.get("mandateId") for r in rows if r.get("mandateId")})
        if not mandateIds:
            return []
        try:
            mMap = getRootInterface().getMandatesByIds(mandateIds)
            return sorted(
                {
                    getattr(m, "label", None) or getattr(m, "name", mid) or mid
                    for mid, m in mMap.items()
                },
                key=lambda v: v.lower(),
            )
        except Exception as e:
            logger.warning(f"Failed to resolve mandate labels for filter values: {e}")
            return sorted(mandateIds)

    if column in ("instanceLabel", "featureInstanceId"):
        if isWorkflowModel:
            rows = db.getRecordset(modelClass, recordFilter=recordFilter or None, fieldFilter=["featureInstanceId"]) or []
            instanceIds = list({r.get("featureInstanceId") for r in rows if r.get("featureInstanceId")})
        else:
            # Runs carry no instance id themselves; resolve it via their workflow.
            rows = db.getRecordset(modelClass, recordFilter=recordFilter or None, fieldFilter=["workflowId"]) or []
            wfIds = list({r.get("workflowId") for r in rows if r.get("workflowId")})
            instanceIds = []
            if wfIds and db._ensureTableExists(AutoWorkflow):
                wfs = db.getRecordset(AutoWorkflow, recordFilter={"id": wfIds}, fieldFilter=["featureInstanceId"]) or []
                instanceIds = list({w.get("featureInstanceId") for w in wfs if w.get("featureInstanceId")})
        if not instanceIds:
            return []
        try:
            from modules.interfaces.interfaceFeatures import getFeatureInterface
            rootIface = getRootInterface()
            featureIface = getFeatureInterface(rootIface.db)
            labels = []
            for iid in instanceIds:
                fi = featureIface.getFeatureInstance(iid)
                if fi:
                    labels.append(fi.label or iid)
            return sorted(set(labels), key=lambda v: v.lower())
        except Exception as e:
            logger.warning(f"Failed to resolve instance labels for filter values: {e}")
            return sorted(instanceIds)

    return db.getDistinctColumnValues(modelClass, column, recordFilter=recordFilter or None) or []
|
|
|
|
|
|
@router.get("/filter-values")
@limiter.limit("60/minute")
def get_run_filter_values(
    request: Request,
    column: str = Query(..., description="Column key"),
    pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
    context: RequestContext = Depends(getRequestContext),
) -> list:
    """Distinct values of one workflow-run column, limited to the caller's RBAC scope.

    Returns an empty list when the run table does not exist. The
    ``pagination`` parameter is accepted but not evaluated here.
    """
    connector = _getDb()
    if connector._ensureTableExists(AutoRun):
        return _enrichedFilterValues(connector, context, AutoRun, _scopedRunFilter, column)
    return []
|
|
|
|
|
|
@router.get("/workflows/filter-values")
@limiter.limit("60/minute")
def get_workflow_filter_values(
    request: Request,
    column: str = Query(..., description="Column key"),
    pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
    context: RequestContext = Depends(getRequestContext),
) -> list:
    """Distinct values of one workflow column, limited to the caller's RBAC scope.

    Returns an empty list when the workflow table does not exist. The
    ``pagination`` parameter is accepted but not evaluated here.
    """
    connector = _getDb()
    if connector._ensureTableExists(AutoWorkflow):
        return _enrichedFilterValues(connector, context, AutoWorkflow, _scopedWorkflowFilter, column)
    return []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Run-specific endpoints (path-param routes MUST come after static routes)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/{runId}/steps")
@limiter.limit("60/minute")
def get_run_steps(
    request: Request,
    runId: str = Path(..., description="Run ID"),
    context: RequestContext = Depends(getRequestContext),
) -> dict:
    """Get step logs for a specific run (with access check).

    Access is granted to sysadmins, to the run's owner, and to admins of the
    run's mandate.

    Returns:
        ``{"steps": [...]}`` ordered by ``startedAt``; steps without a start
        time come first. Empty when the step-log table does not exist.

    Raises:
        HTTPException: 404 when the run does not exist, 403 when the caller
            may not access it.
    """
    db = _getDb()
    if not db._ensureTableExists(AutoRun):
        raise HTTPException(status_code=404, detail=routeApiMsg("Run not found"))

    runs = db.getRecordset(AutoRun, recordFilter={"id": runId})
    if not runs:
        raise HTTPException(status_code=404, detail=routeApiMsg("Run not found"))
    run = dict(runs[0])

    if not context.hasSysAdminRole:
        userId = str(context.user.id) if context.user else None
        runOwner = run.get("ownerId")
        runMandate = run.get("mandateId")

        # Require a real user id: without it, a run that has no owner would
        # otherwise "match" a context that has no user (None == None).
        isOwner = bool(userId) and runOwner == userId
        if not isOwner and not (runMandate and userId and _isUserMandateAdmin(userId, runMandate)):
            raise HTTPException(status_code=403, detail=routeApiMsg("Access denied"))

    if not db._ensureTableExists(AutoStepLog):
        return {"steps": []}

    records = db.getRecordset(AutoStepLog, recordFilter={"runId": runId})
    steps = [dict(r) for r in records] if records else []

    def _startOrder(step: dict):
        # Group missing start times first, so a missing value is never
        # compared against a real timestamp of a different type.
        startedAt = step.get("startedAt")
        if startedAt is None:
            return (False, 0)
        return (True, startedAt)

    steps.sort(key=_startOrder)
    return {"steps": steps}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSE stream for live run tracing (system-level, no instanceId required)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/{runId}/stream")
async def get_run_stream(
    request: Request,
    runId: str = Path(..., description="Run ID"),
    context: RequestContext = Depends(getRequestContext),
):
    """SSE stream for live step-log updates during a workflow run (system-level).

    Access is granted to sysadmins, to the run's owner, and to admins of the
    run's mandate. The stream emits a keepalive event after 30 seconds
    without traffic and ends after a ``run_complete`` / ``run_failed`` event
    or when ``None`` is received from the queue.

    Raises:
        HTTPException: 404 when the run does not exist, 403 when the caller
            may not access it.

    NOTE(review): the queue id is derived from the run id only, and cleanup
    is reached only when the loop ends normally — confirm how the event
    manager handles several concurrent viewers and client disconnects.
    """
    db = _getDb()
    if not db._ensureTableExists(AutoRun):
        raise HTTPException(status_code=404, detail=routeApiMsg("Run not found"))

    runs = db.getRecordset(AutoRun, recordFilter={"id": runId})
    if not runs:
        raise HTTPException(status_code=404, detail=routeApiMsg("Run not found"))
    run = dict(runs[0])

    if not context.hasSysAdminRole:
        userId = str(context.user.id) if context.user else None
        runOwner = run.get("ownerId")
        runMandate = run.get("mandateId")

        # Require a real user id: without it, a run that has no owner would
        # otherwise "match" a context that has no user (None == None).
        isOwner = bool(userId) and runOwner == userId
        if not isOwner and not (runMandate and userId and _isUserMandateAdmin(userId, runMandate)):
            raise HTTPException(status_code=403, detail=routeApiMsg("Access denied"))

    from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager
    sseEventManager = get_event_manager()
    queueId = f"run-trace-{runId}"
    sseEventManager.create_queue(queueId)

    async def _sseGenerator():
        queue = sseEventManager.get_queue(queueId)
        if not queue:
            return
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=30)
            except asyncio.TimeoutError:
                # Nothing happened for 30s: emit a keepalive so the stream stays active.
                yield "data: {\"type\": \"keepalive\"}\n\n"
                continue
            if event is None:
                # None is treated as the end-of-stream marker.
                break
            payload = event.get("data", event) if isinstance(event, dict) else event
            yield f"data: {json.dumps(payload, default=str)}\n\n"
            eventType = payload.get("type", "") if isinstance(payload, dict) else ""
            if eventType in ("run_complete", "run_failed"):
                break
        await sseEventManager.cleanup(queueId, delay=10)

    return StreamingResponse(
        _sseGenerator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Presumably disables proxy buffering (nginx) so events arrive immediately.
            "X-Accel-Buffering": "no",
        },
    )
|
|
|
|
|
|
@router.post("/{runId}/stop")
@limiter.limit("30/minute")
def stop_workflow_run(
    request: Request,
    runId: str = Path(..., description="Run ID"),
    context: RequestContext = Depends(getRequestContext),
):
    """Stop a running workflow execution (system-level).

    Access is granted to sysadmins, to the run's owner, and to admins of the
    run's mandate.

    Returns:
        A dict with ``status``, ``runId`` and ``message``:
        - ``stopping`` when the stop request was flagged by the engine,
        - the run's current status when it had already finished,
        - ``stopped`` when the stop request was not flagged; the run record
          is then marked as stopped directly.

    Raises:
        HTTPException: 404 when the run does not exist, 403 when the caller
            may not access it.
    """
    db = _getDb()
    if not db._ensureTableExists(AutoRun):
        raise HTTPException(status_code=404, detail=routeApiMsg("Run not found"))

    runs = db.getRecordset(AutoRun, recordFilter={"id": runId})
    if not runs:
        raise HTTPException(status_code=404, detail=routeApiMsg("Run not found"))
    run = dict(runs[0])

    if not context.hasSysAdminRole:
        userId = str(context.user.id) if context.user else None
        runOwner = run.get("ownerId")
        runMandate = run.get("mandateId")

        # Require a real user id: without it, a run that has no owner would
        # otherwise "match" a context that has no user (None == None).
        isOwner = bool(userId) and runOwner == userId
        if not isOwner and not (runMandate and userId and _isUserMandateAdmin(userId, runMandate)):
            raise HTTPException(status_code=403, detail=routeApiMsg("Access denied"))

    from modules.workflows.automation2.executionEngine import requestRunStop
    flagged = requestRunStop(runId)

    if not flagged:
        currentStatus = run.get("status", "")
        if currentStatus in ("completed", "failed", "stopped"):
            return {"status": currentStatus, "runId": runId, "message": "Run already finished"}
        # The stop request was not flagged: correct the stale record directly.
        db.recordModify(AutoRun, runId, {"status": "stopped"})
        return {"status": "stopped", "runId": runId, "message": "Run not active in memory, marked as stopped"}

    return {"status": "stopping", "runId": runId, "message": "Stop signal sent"}
|