625 lines
24 KiB
Python
625 lines
24 KiB
Python
# Copyright (c) 2026 PowerOn AG
|
|
# All rights reserved.
|
|
"""
|
|
Shared helpers for WorkflowAutomation route files.
|
|
|
|
Extracted from routeWorkflowDashboard.py and routeWorkflowAutomation.py to
|
|
avoid code duplication across route files. Contains DB access, RBAC scoping,
|
|
pagination helpers, and FK label resolver setup.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import math
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, List, Dict, Any
|
|
|
|
from fastapi import HTTPException
|
|
|
|
from modules.auth.authentication import RequestContext
|
|
from modules.connectors.connectorDbPostgre import DatabaseConnector
|
|
from modules.datamodels.datamodelPagination import PaginationParams, normalize_pagination_dict
|
|
from modules.datamodels.datamodelWorkflowAutomation import (
|
|
AutoRun, AutoStepLog, AutoWorkflow, AutoTask, AutoVersion,
|
|
WORKFLOW_AUTOMATION_DATABASE,
|
|
)
|
|
from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIface
|
|
from modules.shared.configuration import APP_CONFIG
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB access
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _getWorkflowAutomationDb() -> DatabaseConnector:
|
|
"""Get a DatabaseConnector for the WorkflowAutomation (graphicaleditor) DB."""
|
|
return DatabaseConnector(
|
|
dbHost=APP_CONFIG.get("DB_HOST", "localhost"),
|
|
dbDatabase=WORKFLOW_AUTOMATION_DATABASE,
|
|
dbUser=APP_CONFIG.get("DB_USER"),
|
|
dbPassword=APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD"),
|
|
dbPort=int(APP_CONFIG.get("DB_PORT", 5432)),
|
|
userId=None,
|
|
)
|
|
|
|
|
|
def _getAppDb() -> DatabaseConnector:
|
|
"""Get the root interface DB (poweron_app) for FK label resolution."""
|
|
return _getRootIface().db
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RBAC helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _getUserMandateIds(userId: str) -> List[str]:
|
|
"""Get mandate IDs the user is a member of."""
|
|
rootIface = _getRootIface()
|
|
memberships = rootIface.getUserMandates(userId)
|
|
return [um.mandateId for um in memberships if um.mandateId and um.enabled]
|
|
|
|
|
|
def _getAdminMandateIds(userId: str, mandateIds: List[str]) -> List[str]:
|
|
"""Batch-check which mandates the user is admin for."""
|
|
if not mandateIds:
|
|
return []
|
|
rootIface = _getRootIface()
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
|
|
|
|
memberships = rootIface.db.getRecordset(
|
|
UserMandate,
|
|
recordFilter={"userId": userId, "mandateId": mandateIds, "enabled": True},
|
|
)
|
|
if not memberships:
|
|
return []
|
|
|
|
umIdToMandateId: Dict[str, str] = {}
|
|
for m in memberships:
|
|
row = m if isinstance(m, dict) else m.__dict__
|
|
um_id = row.get("id")
|
|
mid = row.get("mandateId")
|
|
if um_id and mid:
|
|
umIdToMandateId[str(um_id)] = str(mid)
|
|
|
|
userMandateIds = list(umIdToMandateId.keys())
|
|
allRoles = rootIface.db.getRecordset(
|
|
UserMandateRole,
|
|
recordFilter={"userMandateId": userMandateIds},
|
|
)
|
|
if not allRoles:
|
|
return []
|
|
|
|
roleIds: set = set()
|
|
roleToMandate: Dict[str, set] = {}
|
|
for r in allRoles:
|
|
row = r if isinstance(r, dict) else r.__dict__
|
|
rid = row.get("roleId")
|
|
um_id = row.get("userMandateId")
|
|
mid = umIdToMandateId.get(str(um_id)) if um_id else None
|
|
if rid and mid:
|
|
roleIds.add(rid)
|
|
roleToMandate.setdefault(rid, set()).add(mid)
|
|
|
|
if not roleIds:
|
|
return []
|
|
|
|
from modules.datamodels.datamodelRbac import Role
|
|
roleRecords = rootIface.db.getRecordset(Role, recordFilter={"id": list(roleIds)})
|
|
adminMandates: set = set()
|
|
for role in (roleRecords or []):
|
|
row = role if isinstance(role, dict) else role.__dict__
|
|
rid = row.get("id")
|
|
if not rid or rid not in roleToMandate:
|
|
continue
|
|
if row.get("roleLabel") == "admin" and not row.get("featureInstanceId"):
|
|
adminMandates.update(roleToMandate[rid])
|
|
|
|
return [mid for mid in mandateIds if mid in adminMandates]
|
|
|
|
|
|
def _isUserMandateAdmin(userId: str, mandateId: str) -> bool:
|
|
"""Check if user is admin for a specific mandate."""
|
|
return mandateId in _getAdminMandateIds(userId, [mandateId])
|
|
|
|
|
|
def _scopedWorkflowFilter(context: RequestContext) -> Optional[Dict[str, Any]]:
|
|
"""Build DB filter for listing workflows: mandate-scoped for members, None for sysadmin."""
|
|
if context.isPlatformAdmin:
|
|
return None
|
|
userId = str(context.user.id) if context.user else None
|
|
if not userId:
|
|
return {"mandateId": "__impossible__"}
|
|
mandateIds = _getUserMandateIds(userId)
|
|
if mandateIds:
|
|
return {"mandateId": mandateIds}
|
|
return {"mandateId": "__impossible__"}
|
|
|
|
|
|
def _scopedRunFilter(context: RequestContext) -> Optional[Dict[str, Any]]:
|
|
"""Build DB filter for listing runs: admin sees mandate runs, user sees own."""
|
|
if context.isPlatformAdmin:
|
|
return None
|
|
userId = str(context.user.id) if context.user else None
|
|
if not userId:
|
|
return {"ownerId": "__impossible__"}
|
|
mandateIds = _getUserMandateIds(userId)
|
|
adminMandateIds = _getAdminMandateIds(userId, mandateIds)
|
|
if adminMandateIds:
|
|
return {"mandateId": adminMandateIds}
|
|
return {"ownerId": userId}
|
|
|
|
|
|
def _userMayDeleteWorkflow(context: RequestContext, wfMandateId: Optional[str]) -> bool:
|
|
"""Check if user may delete a workflow in the given mandate."""
|
|
if context.isPlatformAdmin:
|
|
return True
|
|
userId = str(context.user.id) if context.user else None
|
|
if not userId or not wfMandateId:
|
|
return False
|
|
userMandateIds = _getUserMandateIds(userId)
|
|
adminMandateIds = _getAdminMandateIds(userId, userMandateIds)
|
|
return wfMandateId in adminMandateIds
|
|
|
|
|
|
def _validateWorkflowAccess(
|
|
context: RequestContext,
|
|
workflow: Optional[Dict[str, Any]],
|
|
action: str = "read",
|
|
) -> None:
|
|
"""Validate access to a workflow. Raises HTTPException(403) on denial.
|
|
|
|
Actions:
|
|
- 'read': mandate membership
|
|
- 'write'/'delete': mandate admin
|
|
- 'execute': mandate membership + FeatureAccess on targetInstanceId
|
|
"""
|
|
if context.isPlatformAdmin:
|
|
return
|
|
|
|
userId = str(context.user.id) if context.user else None
|
|
if not userId:
|
|
raise HTTPException(status_code=403, detail="Authentication required")
|
|
|
|
if workflow is None:
|
|
raise HTTPException(status_code=404, detail="Workflow not found")
|
|
|
|
wfMandateId = workflow.get("mandateId") or ""
|
|
if not wfMandateId:
|
|
if action == "read":
|
|
return
|
|
raise HTTPException(status_code=403, detail="Workflow has no mandate — admin only")
|
|
|
|
userMandateIds = _getUserMandateIds(userId)
|
|
if wfMandateId not in userMandateIds:
|
|
raise HTTPException(status_code=403, detail="Not a member of the workflow's mandate")
|
|
|
|
if action == "read":
|
|
return
|
|
|
|
if action == "execute":
|
|
targetInstanceId = workflow.get("targetFeatureInstanceId")
|
|
if targetInstanceId:
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
access = getRootInterface().getFeatureAccess(userId, targetInstanceId)
|
|
if access and access.get("enabled"):
|
|
return
|
|
|
|
adminMandateIds = _getAdminMandateIds(userId, [wfMandateId])
|
|
if wfMandateId not in adminMandateIds:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail=f"Mandate admin required for '{action}' on workflows",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pagination
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _parsePaginationOr400(pagination: Optional[str]) -> Optional[PaginationParams]:
|
|
"""Parse a JSON pagination query string. Raises 400 on parse errors."""
|
|
if not pagination:
|
|
return None
|
|
try:
|
|
paginationDict = json.loads(pagination)
|
|
except json.JSONDecodeError as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid 'pagination' query: not valid JSON ({e.msg})",
|
|
)
|
|
if not paginationDict:
|
|
return None
|
|
try:
|
|
paginationDict = normalize_pagination_dict(paginationDict)
|
|
return PaginationParams(**paginationDict)
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid 'pagination' payload: {e}",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# FK label resolver setup (cross-DB: poweron_app vs poweron_graphicaleditor)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _resolveFkLabels(rows: list, model, labelResolvers: Optional[dict] = None) -> list:
|
|
"""Resolve FK labels for a list of rows using the app DB for user/mandate/instance lookups."""
|
|
if not rows:
|
|
return rows
|
|
from modules.dbHelpers.fkLabelResolver import enrichRowsWithFkLabels
|
|
appDb = _getAppDb()
|
|
enrichRowsWithFkLabels(rows, model, db=appDb, labelResolvers=labelResolvers)
|
|
return rows
|
|
|
|
|
|
def _buildStandardLabelResolvers() -> dict:
|
|
"""Standard FK label resolvers for mandateId, featureInstanceId, ownerId."""
|
|
from modules.dbHelpers.fkLabelResolver import (
|
|
resolveMandateLabels,
|
|
resolveInstanceLabels,
|
|
resolveUserLabels,
|
|
)
|
|
appDb = _getAppDb()
|
|
return {
|
|
"mandateId": lambda ids: resolveMandateLabels(ids, db=appDb),
|
|
"featureInstanceId": lambda ids: resolveInstanceLabels(ids, db=appDb),
|
|
"ownerId": lambda ids: resolveUserLabels(ids, db=appDb),
|
|
"sysCreatedBy": lambda ids: resolveUserLabels(ids, db=appDb),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cascade delete
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _cascadeDeleteWorkflow(db: DatabaseConnector, workflowId: str) -> None:
|
|
"""Delete AutoWorkflow and all dependent rows (versions, runs, step logs, tasks)."""
|
|
for v in db.getRecordset(AutoVersion, recordFilter={"workflowId": workflowId}) or []:
|
|
vid = v.get("id")
|
|
if vid:
|
|
db.recordDelete(AutoVersion, vid)
|
|
for run in db.getRecordset(AutoRun, recordFilter={"workflowId": workflowId}) or []:
|
|
runId = run.get("id")
|
|
if not runId:
|
|
continue
|
|
for sl in db.getRecordset(AutoStepLog, recordFilter={"runId": runId}) or []:
|
|
slid = sl.get("id")
|
|
if slid:
|
|
db.recordDelete(AutoStepLog, slid)
|
|
db.recordDelete(AutoRun, runId)
|
|
for task in db.getRecordset(AutoTask, recordFilter={"workflowId": workflowId}) or []:
|
|
tid = task.get("id")
|
|
if tid:
|
|
db.recordDelete(AutoTask, tid)
|
|
db.recordDelete(AutoWorkflow, workflowId)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQL join helpers for workflow listing with run stats
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_RUN_STATS_SUBQUERY = """
|
|
(
|
|
SELECT s."workflowId" AS "workflowId",
|
|
MAX(COALESCE(s."startedAt", s."sysCreatedAt")) AS "lastStartedAt",
|
|
COUNT(s."id")::bigint AS "runCount",
|
|
MAX(CASE WHEN s."status" IN ('running', 'paused') THEN s."id" END) AS "activeRunId"
|
|
FROM "AutoRun" s
|
|
GROUP BY s."workflowId"
|
|
) rs
|
|
"""
|
|
|
|
|
|
def _firstFkSortFieldForWorkflows(pagination) -> Optional[str]:
|
|
"""First sort field that requires FK label resolution (cross-DB), or None."""
|
|
from modules.dbHelpers.fkLabelResolver import buildLabelResolversFromModel
|
|
if not pagination or not pagination.sort:
|
|
return None
|
|
resolvers = buildLabelResolversFromModel(AutoWorkflow)
|
|
if not resolvers:
|
|
return None
|
|
for sf in pagination.sort:
|
|
sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None)
|
|
if sfField and sfField in resolvers:
|
|
return sfField
|
|
return None
|
|
|
|
|
|
def _batchRunStatsForWorkflowIds(db: DatabaseConnector, workflowIds: List[str]) -> dict:
|
|
"""One grouped query: lastStartedAt, runCount, activeRunId per workflow."""
|
|
if not workflowIds or not db._ensureTableExists(AutoRun):
|
|
return {}
|
|
db._ensure_connection()
|
|
sql = """
|
|
SELECT "workflowId",
|
|
MAX(COALESCE("startedAt", "sysCreatedAt")) AS "lastStartedAt",
|
|
COUNT("id")::bigint AS "runCount",
|
|
MAX(CASE WHEN "status" IN ('running', 'paused') THEN "id" END) AS "activeRunId"
|
|
FROM "AutoRun"
|
|
WHERE "workflowId" = ANY(%s)
|
|
GROUP BY "workflowId"
|
|
"""
|
|
out: dict = {}
|
|
with db.borrowCursor() as cursor:
|
|
cursor.execute(sql, (workflowIds,))
|
|
for row in cursor.fetchall():
|
|
r = dict(row)
|
|
wid = r.get("workflowId")
|
|
if wid:
|
|
out[str(wid)] = r
|
|
return out
|
|
|
|
|
|
def _listingColSql(key: str, wfFieldNames: set) -> Optional[str]:
|
|
if key == "lastStartedAt":
|
|
return 'rs."lastStartedAt"'
|
|
if key == "runCount":
|
|
return 'COALESCE(rs."runCount", 0::bigint)'
|
|
if key == "isRunning":
|
|
return '(rs."activeRunId" IS NOT NULL)'
|
|
if key in wfFieldNames:
|
|
return f'w."{key}"'
|
|
return None
|
|
|
|
|
|
def _listingOrderExpr(key: str, wfFieldNames: set, wfFields: dict) -> Optional[str]:
|
|
if key == "lastStartedAt":
|
|
return 'rs."lastStartedAt"'
|
|
if key == "runCount":
|
|
return 'COALESCE(rs."runCount", 0::bigint)'
|
|
if key == "isRunning":
|
|
return 'CASE WHEN rs."activeRunId" IS NOT NULL THEN 1 ELSE 0 END'
|
|
if key in wfFieldNames:
|
|
colType = wfFields.get(key, "TEXT")
|
|
if colType == "BOOLEAN":
|
|
return f'COALESCE(w."{key}", FALSE)'
|
|
return f'w."{key}"'
|
|
return None
|
|
|
|
|
|
def _appendJoinedListingFilters(whereParts: list, values: list, pagination, wfFields: dict) -> None:
|
|
"""Append WHERE fragments for joined workflow listing (w + rs)."""
|
|
wfFieldNames = set(wfFields.keys())
|
|
validCols = wfFieldNames | {"lastStartedAt", "runCount", "isRunning"}
|
|
|
|
if not pagination or not pagination.filters:
|
|
return
|
|
|
|
for key, val in pagination.filters.items():
|
|
if key == "search" and isinstance(val, str) and val.strip():
|
|
term = f"%{val.strip()}%"
|
|
textCols = [c for c, t in wfFields.items() if t == "TEXT"]
|
|
if textCols:
|
|
orParts = [f'COALESCE(w."{c}"::TEXT, \'\') ILIKE %s' for c in textCols]
|
|
whereParts.append(f"({' OR '.join(orParts)})")
|
|
values.extend([term] * len(textCols))
|
|
continue
|
|
|
|
if key not in validCols:
|
|
continue
|
|
|
|
if key == "isRunning":
|
|
if isinstance(val, dict):
|
|
op = val.get("operator", "equals")
|
|
v = val.get("value", "")
|
|
isTrue = str(v).lower() == "true"
|
|
if op in ("equals", "eq"):
|
|
whereParts.append('(rs."activeRunId" IS NOT NULL)' if isTrue else '(rs."activeRunId" IS NULL)')
|
|
elif val is None:
|
|
whereParts.append('(rs."activeRunId" IS NULL)')
|
|
else:
|
|
whereParts.append(
|
|
'(rs."activeRunId" IS NOT NULL)' if str(val).lower() == "true" else '(rs."activeRunId" IS NULL)'
|
|
)
|
|
continue
|
|
|
|
colRef = _listingColSql(key, wfFieldNames)
|
|
if not colRef:
|
|
continue
|
|
|
|
colType = wfFields.get(key, "TEXT") if key in wfFieldNames else (
|
|
"DOUBLE PRECISION" if key == "lastStartedAt" else "BIGINT" if key == "runCount" else "TEXT"
|
|
)
|
|
|
|
if val is None:
|
|
if key == "lastStartedAt":
|
|
whereParts.append(f'({colRef} IS NULL)')
|
|
elif key == "runCount":
|
|
whereParts.append(f'({colRef} = 0)')
|
|
else:
|
|
whereParts.append(f'({colRef} IS NULL OR {colRef}::TEXT = \'\')')
|
|
continue
|
|
|
|
if not isinstance(val, dict):
|
|
if colType == "BOOLEAN" or key == "isRunning":
|
|
whereParts.append(f'COALESCE({colRef}, FALSE) = %s')
|
|
values.append(str(val).lower() == "true")
|
|
else:
|
|
whereParts.append(f'{colRef}::TEXT ILIKE %s')
|
|
values.append(str(val))
|
|
continue
|
|
|
|
op = val.get("operator", "equals")
|
|
v = val.get("value", "")
|
|
if op in ("equals", "eq"):
|
|
if colType == "BOOLEAN":
|
|
whereParts.append(f'COALESCE({colRef}, FALSE) = %s')
|
|
values.append(str(v).lower() == "true")
|
|
else:
|
|
whereParts.append(f'{colRef}::TEXT = %s')
|
|
values.append(str(v))
|
|
elif op == "contains":
|
|
whereParts.append(f'{colRef}::TEXT ILIKE %s')
|
|
values.append(f"%{v}%")
|
|
elif op == "startsWith":
|
|
whereParts.append(f'{colRef}::TEXT ILIKE %s')
|
|
values.append(f"{v}%")
|
|
elif op == "endsWith":
|
|
whereParts.append(f'{colRef}::TEXT ILIKE %s')
|
|
values.append(f"%{v}")
|
|
elif op in ("gt", "gte", "lt", "lte"):
|
|
sqlOp = {"gt": ">", "gte": ">=", "lt": "<", "lte": "<="}[op]
|
|
if colType in ("INTEGER", "DOUBLE PRECISION", "BIGINT") or key in ("lastStartedAt", "runCount"):
|
|
try:
|
|
whereParts.append(f'{colRef}::double precision {sqlOp} %s')
|
|
values.append(float(v))
|
|
except (ValueError, TypeError):
|
|
continue
|
|
else:
|
|
whereParts.append(f'{colRef}::TEXT {sqlOp} %s')
|
|
values.append(str(v))
|
|
elif op == "between":
|
|
fromVal = v.get("from", "") if isinstance(v, dict) else ""
|
|
toVal = v.get("to", "") if isinstance(v, dict) else ""
|
|
if not fromVal and not toVal:
|
|
continue
|
|
isNumericCol = colType in ("INTEGER", "DOUBLE PRECISION", "BIGINT") or key in ("lastStartedAt", "runCount")
|
|
isDateVal = bool(fromVal and re.match(r"^\d{4}-\d{2}-\d{2}$", str(fromVal))) or bool(
|
|
toVal and re.match(r"^\d{4}-\d{2}-\d{2}$", str(toVal))
|
|
)
|
|
if isNumericCol and isDateVal:
|
|
if fromVal and toVal:
|
|
fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
|
|
toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace(
|
|
hour=23, minute=59, second=59, tzinfo=timezone.utc
|
|
).timestamp()
|
|
whereParts.append(f"({colRef} >= %s AND {colRef} <= %s)")
|
|
values.extend([fromTs, toTs])
|
|
elif fromVal:
|
|
fromTs = datetime.strptime(str(fromVal), "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
|
|
whereParts.append(f"({colRef} >= %s)")
|
|
values.append(fromTs)
|
|
else:
|
|
toTs = datetime.strptime(str(toVal), "%Y-%m-%d").replace(
|
|
hour=23, minute=59, second=59, tzinfo=timezone.utc
|
|
).timestamp()
|
|
whereParts.append(f"({colRef} <= %s)")
|
|
values.append(toTs)
|
|
elif isNumericCol:
|
|
try:
|
|
if fromVal and toVal:
|
|
whereParts.append(
|
|
f"({colRef}::double precision >= %s AND {colRef}::double precision <= %s)"
|
|
)
|
|
values.extend([float(fromVal), float(toVal)])
|
|
elif fromVal:
|
|
whereParts.append(f"{colRef}::double precision >= %s")
|
|
values.append(float(fromVal))
|
|
elif toVal:
|
|
whereParts.append(f"{colRef}::double precision <= %s")
|
|
values.append(float(toVal))
|
|
except (ValueError, TypeError):
|
|
continue
|
|
else:
|
|
if fromVal and toVal:
|
|
whereParts.append(f"({colRef}::TEXT >= %s AND {colRef}::TEXT <= %s)")
|
|
values.extend([str(fromVal), str(toVal)])
|
|
elif fromVal:
|
|
whereParts.append(f"{colRef}::TEXT >= %s")
|
|
values.append(str(fromVal))
|
|
elif toVal:
|
|
whereParts.append(f"{colRef}::TEXT <= %s")
|
|
values.append(str(toVal))
|
|
|
|
|
|
def _buildJoinedWorkflowWhereOrderLimit(
|
|
recordFilter: dict,
|
|
pagination,
|
|
wfFields: dict,
|
|
) -> tuple:
|
|
"""WHERE / ORDER BY / LIMIT for joined AutoWorkflow + run stats listing."""
|
|
wfFieldNames = set(wfFields.keys())
|
|
whereParts: list = []
|
|
values: list = []
|
|
|
|
for field, value in (recordFilter or {}).items():
|
|
if value is None:
|
|
whereParts.append(f'w."{field}" IS NULL')
|
|
elif isinstance(value, list):
|
|
whereParts.append(f'w."{field}" = ANY(%s)')
|
|
values.append(value)
|
|
else:
|
|
whereParts.append(f'w."{field}" = %s')
|
|
values.append(value)
|
|
|
|
_appendJoinedListingFilters(whereParts, values, pagination, wfFields)
|
|
|
|
whereClause = " WHERE " + " AND ".join(whereParts) if whereParts else ""
|
|
|
|
orderParts: list = []
|
|
if pagination and pagination.sort:
|
|
for sf in pagination.sort:
|
|
sfField = sf.get("field") if isinstance(sf, dict) else getattr(sf, "field", None)
|
|
sfDir = sf.get("direction", "asc") if isinstance(sf, dict) else getattr(sf, "direction", "asc")
|
|
if not sfField:
|
|
continue
|
|
expr = _listingOrderExpr(sfField, wfFieldNames, wfFields)
|
|
if not expr:
|
|
continue
|
|
direction = "DESC" if str(sfDir).lower() == "desc" else "ASC"
|
|
orderParts.append(f"{expr} {direction} NULLS LAST")
|
|
if not orderParts:
|
|
orderParts.append('w."sysCreatedAt" DESC NULLS LAST')
|
|
|
|
orderClause = " ORDER BY " + ", ".join(orderParts)
|
|
|
|
limitClause = ""
|
|
if pagination:
|
|
offset = (pagination.page - 1) * pagination.pageSize
|
|
limitClause = f" LIMIT {pagination.pageSize} OFFSET {offset}"
|
|
|
|
return whereClause, orderClause, limitClause, values
|
|
|
|
|
|
def _getWorkflowsJoinedPaginated(
|
|
db: DatabaseConnector,
|
|
recordFilter: dict,
|
|
paginationParams: PaginationParams,
|
|
) -> dict:
|
|
"""SQL listing: AutoWorkflow LEFT JOIN aggregated AutoRun stats (one query + count)."""
|
|
from modules.connectors.connectorDbPostgre import getModelFields, parseRecordFields
|
|
|
|
wfFields = getModelFields(AutoWorkflow)
|
|
whereClause, orderClause, limitClause, values = _buildJoinedWorkflowWhereOrderLimit(
|
|
recordFilter, paginationParams, wfFields,
|
|
)
|
|
countValues = list(values)
|
|
|
|
fromSql = f'"AutoWorkflow" w LEFT JOIN {_RUN_STATS_SUBQUERY.strip()} ON rs."workflowId" = w."id"'
|
|
|
|
countSql = f"SELECT COUNT(*) AS cnt FROM {fromSql}{whereClause}"
|
|
dataSql = f"SELECT w.*, rs.\"lastStartedAt\", rs.\"runCount\", rs.\"activeRunId\" FROM {fromSql}{whereClause}{orderClause}{limitClause}"
|
|
|
|
db._ensure_connection()
|
|
with db.borrowCursor() as cursor:
|
|
cursor.execute(countSql, countValues)
|
|
totalItems = int(cursor.fetchone()["cnt"])
|
|
|
|
cursor.execute(dataSql, values)
|
|
rawRows = [dict(row) for row in cursor.fetchall()]
|
|
|
|
pageSize = paginationParams.pageSize if paginationParams else max(totalItems, 1)
|
|
totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0
|
|
|
|
modelFields = AutoWorkflow.model_fields
|
|
for record in rawRows:
|
|
parseRecordFields(record, wfFields, "table AutoWorkflow joined listing")
|
|
for fieldName, fieldType in wfFields.items():
|
|
if fieldType == "JSONB" and fieldName in record and record[fieldName] is None:
|
|
fieldInfo = modelFields.get(fieldName)
|
|
if fieldInfo:
|
|
fieldAnnotation = fieldInfo.annotation
|
|
if fieldAnnotation == list or (
|
|
hasattr(fieldAnnotation, "__origin__") and fieldAnnotation.__origin__ is list
|
|
):
|
|
record[fieldName] = []
|
|
elif fieldAnnotation == dict or (
|
|
hasattr(fieldAnnotation, "__origin__") and fieldAnnotation.__origin__ is dict
|
|
):
|
|
record[fieldName] = {}
|
|
|
|
return {"items": rawRows, "totalItems": totalItems, "totalPages": totalPages}
|