gateway/modules/features/commcoach/serviceCommcoachExport.py
2026-04-26 18:11:42 +02:00

260 lines
10 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
CommCoach Export Service.
Generates Markdown and PDF exports for dossiers and sessions.
"""
import logging
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
def _sessionSortKey(session: Dict[str, Any]) -> tuple:
    """Return a chronological sort key for a session that never mixes types.

    Timestamps may be numeric (epoch seconds), strings (ISO 8601) or missing.
    Comparing those directly raises ``TypeError``, so each kind gets its own
    rank: missing first, then numeric, then string values.
    """
    # Same fallback the rendered date uses, so order matches what is shown.
    stamp = session.get("startedAt") or session.get("createdAt")
    if not stamp:
        return (0, 0.0, "")
    if isinstance(stamp, (int, float)):
        return (1, float(stamp), "")
    return (2, 0.0, str(stamp))


def buildDossierMarkdown(context: Dict[str, Any], sessions: List[Dict[str, Any]],
                         tasks: List[Dict[str, Any]], scores: List[Dict[str, Any]]) -> str:
    """Build a Markdown export of a full coaching dossier (context).

    Args:
        context: Dossier record; reads ``title``, ``description``, ``category``,
            ``createdAt``, ``goals`` and ``insights`` (the latter two may be
            lists or JSON-encoded strings).
        sessions: Session records; only those with ``status == "completed"``
            are listed, in chronological order.
        tasks: Task records; grouped into open (``open``/``inProgress``) and
            done (``done``).
        scores: Score records; the latest entry per dimension is shown.

    Returns:
        The complete Markdown document as a single string.
    """
    # ``or`` (not a .get default) so that keys present with None/"" also
    # fall back instead of rendering the literal text "None".
    title = context.get("title") or "Coaching Dossier"
    description = context.get("description") or ""
    category = context.get("category") or "custom"
    createdAt = _formatDate(context.get("createdAt"))

    # Two trailing spaces form a Markdown hard line break; with a single
    # space the metadata lines would be merged into one paragraph.
    lines = [
        f"# {title}",
        "",
        f"**Kategorie:** {category}  ",
        f"**Erstellt:** {createdAt}  ",
    ]
    if description:
        lines.append(f"**Beschreibung:** {description}  ")

    goals = _parseJson(context.get("goals"), [])
    if goals:
        lines += ["", "## Ziele", ""]
        for goal in goals:
            if isinstance(goal, dict):
                text = goal.get("text", goal)
                status = goal.get("status", "open")
            else:
                text, status = str(goal), "open"
            marker = "[x]" if status in ("done", "completed") else "[ ]"
            lines.append(f"- {marker} {text}")

    insights = _parseJson(context.get("insights"), [])
    if insights:
        lines += ["", "## Erkenntnisse", ""]
        for insight in insights:
            text = insight.get("text", insight) if isinstance(insight, dict) else str(insight)
            lines.append(f"- {text}")

    completedSessions = sorted(
        (s for s in sessions if s.get("status") == "completed"),
        key=_sessionSortKey,
    )
    if completedSessions:
        lines += ["", "## Sessions", ""]
        for i, s in enumerate(completedSessions, 1):
            dateStr = _formatDate(s.get("startedAt") or s.get("createdAt"))
            duration = s.get("durationSeconds", 0)
            # int() keeps float durations from rendering as e.g. "5.0 Min."
            durationMin = int(duration // 60) if duration else 0
            score = s.get("competenceScore")
            # Only a missing score becomes "--"; a real score of 0 is shown.
            scoreText = "--" if score in (None, "") else score
            persona = s.get("personaId") or "Coach"
            lines.append(f"### Session {i} -- {dateStr}")
            lines.append("")
            lines.append(f"**Dauer:** {durationMin} Min. | **Score:** {scoreText} | **Persona:** {persona} ")
            summary = s.get("summary")
            if summary:
                lines.append(f"\n{summary}")
            lines.append("")

    if tasks:
        openTasks = [t for t in tasks if t.get("status") in ("open", "inProgress")]
        doneTasks = [t for t in tasks if t.get("status") == "done"]
        # Skip the heading entirely when no task falls into either group.
        if openTasks or doneTasks:
            lines += ["", "## Aufgaben", ""]
        if openTasks:
            lines.append("**Offen:**")
            for t in openTasks:
                lines.append(f"- [ ] {t.get('title')} ({t.get('priority', 'medium')})")
            lines.append("")
        if doneTasks:
            lines.append("**Erledigt:**")
            for t in doneTasks:
                lines.append(f"- [x] {t.get('title')}")
            lines.append("")

    if scores:
        lines += ["", "## Kompetenz-Scores", ""]
        for dim, entries in _groupScoresByDimension(scores).items():
            # Entries are sorted ascending by creation time; the last is newest.
            latest = entries[-1]
            lines.append(f"- **{dim}**: {latest.get('score', '--')} ({latest.get('trend', 'stable')})")

    lines += ["", "---", f"*Exportiert am {_formatDate(None)}*", ""]
    return "\n".join(lines)
def _ensureBlankLine(lines: List[str]) -> None:
    """Append an empty line unless *lines* already ends with one."""
    if lines and lines[-1] != "":
        lines.append("")


def buildSessionMarkdown(session: Dict[str, Any], messages: List[Dict[str, Any]],
                         tasks: List[Dict[str, Any]], scores: List[Dict[str, Any]]) -> str:
    """Build a Markdown export of a single session.

    Args:
        session: Session record; reads ``id``, ``startedAt``/``createdAt``,
            ``durationSeconds``, ``competenceScore``, ``personaId``, ``summary``.
        messages: Chat messages; ``role == "user"`` is labelled "Du", every
            other role "Coach".
        tasks: Task records; only those whose ``sessionId`` equals the
            session's ``id`` are listed.
        scores: Score records; filtered by ``sessionId`` like the tasks.

    Returns:
        The complete Markdown document as a single string.
    """
    sessionId = session.get("id")
    dateStr = _formatDate(session.get("startedAt") or session.get("createdAt"))
    duration = session.get("durationSeconds", 0)
    # int() keeps float durations from rendering as e.g. "5.0 Min."
    durationMin = int(duration // 60) if duration else 0
    score = session.get("competenceScore")
    # Only a missing score becomes "--"; a real score of 0 is shown.
    scoreText = "--" if score in (None, "") else score
    persona = session.get("personaId") or "Coach"

    lines = [
        f"# Coaching Session -- {dateStr}",
        "",
        f"**Dauer:** {durationMin} Min. | **Score:** {scoreText} | **Persona:** {persona} ",
    ]

    summary = session.get("summary")
    if summary:
        lines += ["", "## Zusammenfassung", "", summary]

    if messages:
        lines += ["", "## Gesprächsverlauf", ""]
        for msg in messages:
            role = "Du" if msg.get("role") == "user" else "Coach"
            content = msg.get("content") or ""
            lines.append(f"**{role}:** {content}")
            # Blank line so each message is its own Markdown paragraph.
            lines.append("")

    sessionTasks = [t for t in tasks if t.get("sessionId") == sessionId]
    if sessionTasks:
        # Without messages the previous line is text; separate the heading.
        _ensureBlankLine(lines)
        lines += ["## Aufgaben", ""]
        for t in sessionTasks:
            marker = "[x]" if t.get("status") == "done" else "[ ]"
            lines.append(f"- {marker} {t.get('title')}")
        lines.append("")

    sessionScores = [s for s in scores if s.get("sessionId") == sessionId]
    if sessionScores:
        _ensureBlankLine(lines)
        lines += ["## Scores", ""]
        for s in sessionScores:
            lines.append(f"- **{s.get('dimension')}**: {s.get('score')} ({s.get('trend', 'stable')})")
            if s.get("evidence"):
                lines.append(f" _{s.get('evidence')}_")
        lines.append("")

    # A "---" directly below a text line is a setext heading underline in
    # Markdown and would turn that line into an H2; force a blank line first.
    _ensureBlankLine(lines)
    lines += ["---", f"*Exportiert am {_formatDate(None)}*", ""]
    return "\n".join(lines)
async def renderDossierPdf(context: Dict[str, Any], sessions: List[Dict[str, Any]],
                           tasks: List[Dict[str, Any]], scores: List[Dict[str, Any]],
                           aiService=None) -> bytes:
    """Render a dossier as PDF directly from markdown content via reportlab.

    Args:
        context: Dossier record (see ``buildDossierMarkdown``).
        sessions: Session records of the dossier.
        tasks: Task records of the dossier.
        scores: Score records of the dossier.
        aiService: Not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        The PDF document as raw bytes.
    """
    md = buildDossierMarkdown(context, sessions, tasks, scores)
    # ``or`` instead of a .get default: a title stored as None/"" would
    # otherwise reach the XML escaping step and fail on None.
    title = context.get("title") or "Coaching Dossier"
    return _markdownToPdf(md, title)
async def renderSessionPdf(session: Dict[str, Any], messages: List[Dict[str, Any]],
                           tasks: List[Dict[str, Any]], scores: List[Dict[str, Any]],
                           aiService=None) -> bytes:
    """Render a session as PDF directly from markdown content via reportlab.

    Args:
        session: Session record (see ``buildSessionMarkdown``).
        messages: Chat messages of the session.
        tasks: Task records; filtered to this session by the markdown builder.
        scores: Score records; filtered to this session by the markdown builder.
        aiService: Not used by this function; kept so existing callers that
            pass it keep working.

    Returns:
        The PDF document as raw bytes.
    """
    md = buildSessionMarkdown(session, messages, tasks, scores)
    # Use the same startedAt -> createdAt fallback as the markdown header;
    # the PDF drops the markdown H1, so this title is the only date shown.
    sessionDate = _formatDate(session.get("startedAt") or session.get("createdAt"))
    title = f"Coaching Session — {sessionDate}"
    return _markdownToPdf(md, title)
def _markdownToPdf(markdownText: str, title: str) -> bytes:
    """Convert markdown text to a styled PDF using reportlab.

    The converter is line based: every input line becomes one flowable,
    chosen by its prefix (heading, rule, meta line, checkbox, bullet, body).
    Top-level ``# `` headings are dropped because *title* is rendered as the
    document title instead.

    Args:
        markdownText: Markdown as produced by the ``build*Markdown`` functions.
        title: Document title shown at the top of the first page.

    Returns:
        The PDF document as raw bytes.

    Raises:
        Exception: Whatever reportlab raises while building the document.
            A single line whose inline markup cannot be parsed does not
            raise; it is rendered as plain text instead.
    """
    import io
    from reportlab.lib.pagesizes import A4
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.enums import TA_LEFT
    from reportlab.lib import colors

    buf = io.BytesIO()
    doc = SimpleDocTemplate(buf, pagesize=A4, rightMargin=60, leftMargin=60, topMargin=50, bottomMargin=40)
    baseStyles = getSampleStyleSheet()
    sTitle = ParagraphStyle("DocTitle", parent=baseStyles["Title"], fontSize=18, spaceAfter=20, textColor=colors.HexColor("#1e40af"))
    sH2 = ParagraphStyle("DocH2", parent=baseStyles["Heading2"], fontSize=13, spaceBefore=16, spaceAfter=6, textColor=colors.HexColor("#1f2937"))
    sH3 = ParagraphStyle("DocH3", parent=baseStyles["Heading3"], fontSize=11, spaceBefore=12, spaceAfter=4, textColor=colors.HexColor("#374151"))
    sBody = ParagraphStyle("DocBody", parent=baseStyles["Normal"], fontSize=10, leading=14, spaceAfter=4, alignment=TA_LEFT)
    sBullet = ParagraphStyle("DocBullet", parent=sBody, leftIndent=18, bulletIndent=6, spaceBefore=2, spaceAfter=2)
    sMeta = ParagraphStyle("DocMeta", parent=sBody, fontSize=8, textColor=colors.grey)

    def inlineParagraph(prefix: str, rawText: str, style):
        """Build a paragraph with inline formatting, or plain text if that fails."""
        try:
            return Paragraph(f"{prefix}{_mdToXml(rawText)}", style)
        except ValueError:
            # Overlapping emphasis in free text (e.g. "**a *b** c*") yields
            # misnested tags the paragraph parser rejects; one such line
            # must not abort the whole export.
            return Paragraph(f"{prefix}{_escXml(rawText)}", style)

    story = [Paragraph(_escXml(title), sTitle), Spacer(1, 10)]
    for line in markdownText.split("\n"):
        stripped = line.strip()
        if not stripped:
            story.append(Spacer(1, 4))
        elif stripped.startswith("# "):
            # H1 duplicates the document title rendered above.
            continue
        elif stripped.startswith("### "):
            story.append(Paragraph(_escXml(stripped[4:]), sH3))
        elif stripped.startswith("## "):
            story.append(Paragraph(_escXml(stripped[3:]), sH2))
        elif stripped.startswith("---"):
            # Horizontal rule is rendered as extra vertical space only.
            story.append(Spacer(1, 8))
        elif stripped.startswith("*") and stripped.endswith("*") and not stripped.startswith("**"):
            # Whole-line *italic* (the export footer) uses the small meta style.
            story.append(Paragraph(f"<i>{_escXml(stripped.strip('*'))}</i>", sMeta))
        elif stripped.startswith("- [x] "):
            story.append(inlineParagraph("\u2713 ", stripped[6:], sBullet))
        elif stripped.startswith("- [ ] "):
            story.append(inlineParagraph("\u25CB ", stripped[6:], sBullet))
        elif stripped.startswith("- "):
            story.append(inlineParagraph("\u2022 ", stripped[2:], sBullet))
        else:
            story.append(inlineParagraph("", stripped, sBody))

    doc.build(story)
    buf.seek(0)
    return buf.getvalue()
def _escXml(text: str) -> str:
"""Escape text for reportlab XML paragraphs."""
return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
def _mdToXml(text: str) -> str:
"""Convert markdown inline formatting to reportlab XML. Bold, italic, escape the rest."""
import re as _re
text = text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
text = _re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', text)
text = _re.sub(r'__(.+?)__', r'<b>\1</b>', text)
text = _re.sub(r'\*(.+?)\*', r'<i>\1</i>', text)
text = _re.sub(r'_(.+?)_', r'<i>\1</i>', text)
return text
def _formatDate(val) -> str:
if not val:
return datetime.now(timezone.utc).strftime("%d.%m.%Y")
if isinstance(val, (int, float)):
dt = datetime.fromtimestamp(float(val), tz=timezone.utc)
return dt.strftime("%d.%m.%Y")
dt = datetime.fromisoformat(str(val).replace("Z", "+00:00"))
return dt.strftime("%d.%m.%Y")
def _parseJson(value, fallback):
if not value:
return fallback
if isinstance(value, (list, dict)):
return value
try:
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return fallback
def _groupScoresByDimension(scores: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
groups: Dict[str, List[Dict[str, Any]]] = {}
for s in scores:
dim = s.get("dimension", "unknown")
if dim not in groups:
groups[dim] = []
groups[dim].append(s)
for dim in groups:
groups[dim].sort(key=lambda x: x.get("createdAt") or "")
return groups