fixes doc generation and renderers
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 18s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped

This commit is contained in:
ValueOn AG 2026-06-03 16:45:17 +02:00
parent 67806e5323
commit 2eb1a5589d
27 changed files with 1812 additions and 3293 deletions

23
app.py
View file

@ -61,6 +61,13 @@ class DailyRotatingFileHandler(RotatingFileHandler):
return True
return False
def doRollover(self):
    """Run the inherited size-based rollover, tolerating Windows file locks.

    Delegates to the parent handler's ``doRollover``. A ``PermissionError``
    raised by it is ignored, so the rollover is simply skipped this time
    instead of propagating out of the logging call.
    """
    try:
        super().doRollover()
    except PermissionError:
        # Deliberate best effort: the rollover is skipped, not retried here.
        return
def emit(self, record):
"""Emit a log record, switching files if date has changed"""
# Check if we need to switch to a new file
@ -454,6 +461,20 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.warning(f"Could not shutdown feature containers: {e}")
# 4. Cancel all pending streaming EventManager tasks (cleanup sleeps, agent tasks)
try:
from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager as _getStreamingEM
_getStreamingEM().shutdown()
except Exception as e:
logger.warning(f"Streaming EventManager shutdown failed: {e}")
# 5. Close shared HTTP sessions (ResilientHttp) to avoid TCP keepalive hang
try:
from modules.connectors._httpResilience import closeAllResilientHttp
await closeAllResilientHttp()
except Exception as e:
logger.warning(f"Closing HTTP sessions failed: {e}")
logger.info("Application has been shut down")
except asyncio.CancelledError:
@ -734,7 +755,7 @@ if __name__ == "__main__":
port = int(os.environ.get("PORT", 8000))
try:
from gunicorn.app.wsgiapp import WSGIApplication # noqa: F401
import gunicorn.app.wsgiapp # type: ignore[import-untyped] # noqa: F401
import subprocess
import sys
subprocess.run([

View file

@ -140,11 +140,10 @@ class ModelSelector:
promptFiltered.append(model)
else:
maxAllowedTokens = model.contextLength * 0.8
# Compare prompt tokens (not bytes) with model's token limit
if promptTokens <= maxAllowedTokens:
if totalTokens <= maxAllowedTokens:
promptFiltered.append(model)
else:
logger.debug(f"Model {model.name} filtered out: promptSize={promptTokens:.0f} tokens > maxAllowed={maxAllowedTokens:.0f} tokens (80% of {model.contextLength} tokens)")
logger.debug(f"Model {model.name} filtered out: totalTokens={totalTokens:.0f} > maxAllowed={maxAllowedTokens:.0f} tokens (80% of {model.contextLength} tokens)")
logger.debug(f"After prompt size filtering: {len(promptFiltered)} models")

View file

@ -654,6 +654,7 @@ class AiAnthropic(BaseConnectorAi):
mimeType = parts[0].replace("data:", "")
base64Data = parts[1]
_SUPPORTED = {"image/jpeg", "image/png", "image/gif", "image/webp"}
import base64 as _b64
try:
rawHead = _b64.b64decode(base64Data[:32])
@ -668,6 +669,9 @@ class AiAnthropic(BaseConnectorAi):
except Exception:
pass
if mimeType not in _SUPPORTED:
raise ValueError(f"Unsupported image media_type '{mimeType}' for Anthropic (supported: {', '.join(sorted(_SUPPORTED))})")
# Convert to Anthropic's vision format
anthropicMessages = [{
"role": "user",

View file

@ -0,0 +1,241 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Shared HTTP resilience helpers for provider connectors.
Provides a reusable session pool with concurrency limiter and retry-with-backoff
so that Google, MSFT and Infomaniak connectors do not each re-implement
per-request sessions, unbounded parallelism, and missing retry logic.
"""
import asyncio
import logging
import time
from typing import Any, Dict, Optional, Union
import aiohttp
logger = logging.getLogger(__name__)
_DEFAULT_MAX_CONCURRENT = 8
_DEFAULT_MAX_RETRIES = 3
_DEFAULT_TIMEOUT_S = 30
_RETRYABLE_STATUS = {429, 502, 503, 504}
_instances: list["ResilientHttp"] = []
class ResilientHttp:
    """Managed aiohttp.ClientSession with semaphore + retry.

    Typical usage inside a connector module-level function::

        _http = ResilientHttp("Google", maxConcurrent=8)

        async def _googleGet(token, url):
            return await _http.getJson(url, headers={"Authorization": f"Bearer {token}"})

    The session is created lazily on first call, reused across requests,
    and closed via ``closeAllResilientHttp()`` at app shutdown.

    Retry policy (shared by all request helpers):

    * Statuses in ``_RETRYABLE_STATUS``, timeouts and ``aiohttp.ClientError``
      are retried up to ``maxRetries`` attempts in total.
    * The wait between attempts honours ``Retry-After`` when present,
      otherwise exponential backoff is used.
    * The wait happens *outside* the concurrency semaphore so a throttled
      request does not block a slot that other requests could use.
    * No wait is performed after the final attempt.
    """

    def __init__(
        self,
        providerLabel: str = "HTTP",
        maxConcurrent: int = _DEFAULT_MAX_CONCURRENT,
        maxRetries: int = _DEFAULT_MAX_RETRIES,
        defaultTimeoutS: float = _DEFAULT_TIMEOUT_S,
    ):
        """Configure the client; no session or semaphore is created yet.

        Args:
            providerLabel: Prefix used in log messages.
            maxConcurrent: Maximum number of in-flight requests.
            maxRetries: Total number of attempts per call.
            defaultTimeoutS: Total timeout in seconds when a call passes none.
        """
        self._label = providerLabel
        self._maxConcurrent = maxConcurrent
        self._maxRetries = maxRetries
        self._defaultTimeout = aiohttp.ClientTimeout(total=defaultTimeoutS)
        self._semaphore: Optional[asyncio.Semaphore] = None
        self._session: Optional[aiohttp.ClientSession] = None
        # Registered so closeAllResilientHttp() can find this instance.
        _instances.append(self)

    def _ensureReady(self) -> aiohttp.ClientSession:
        """Lazily create the semaphore and (re)create the session if closed."""
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self._maxConcurrent)
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(timeout=self._defaultTimeout)
        return self._session

    async def close(self):
        """Close the underlying session (if open) and drop the reference."""
        if self._session and not self._session.closed:
            await self._session.close()
            await asyncio.sleep(0.25)
        self._session = None

    @staticmethod
    def _statusRetryDelay(resp: Any, attempt: int) -> float:
        """Return the wait for a retryable status: Retry-After or capped backoff."""
        retryAfter = _parseRetryAfter(resp.headers.get("Retry-After"))
        return retryAfter if retryAfter > 0 else min(2 ** attempt, 30)

    @staticmethod
    async def _successPayload(resp: Any) -> Any:
        """Parse a successful response body, tolerating empty / non-JSON bodies.

        A success status without a JSON body (e.g. 202 Accepted or a 200 on
        DELETE) must be reported as success. Letting the parse error escape
        would be caught as a client error and re-send a request the server
        has already accepted.
        """
        if resp.status == 204:
            return {}
        try:
            payload = await resp.json()
        except (aiohttp.ContentTypeError, ValueError):
            return {}
        # aiohttp returns None for an empty body with a JSON content type.
        return payload if payload is not None else {}

    async def getJson(
        self,
        url: str,
        headers: Dict[str, str],
        timeout: Optional[aiohttp.ClientTimeout] = None,
        allowRedirects: bool = True,
    ) -> Dict[str, Any]:
        """GET request returning parsed JSON with retry + throttle.

        Returns:
            The parsed JSON body on 200/201, otherwise ``{"error": "<text>"}``.
        """
        session = self._ensureReady()
        assert self._semaphore is not None
        effectiveTimeout = timeout or self._defaultTimeout
        lastError: Optional[str] = None
        for attempt in range(1, self._maxRetries + 1):
            isLastAttempt = attempt >= self._maxRetries
            waitS = 0.0
            async with self._semaphore:
                try:
                    async with session.get(
                        url,
                        headers=headers,
                        timeout=effectiveTimeout,
                        allow_redirects=allowRedirects,
                    ) as resp:
                        if resp.status in (200, 201):
                            return await resp.json()
                        if resp.status not in _RETRYABLE_STATUS:
                            errorText = await resp.text()
                            lastError = f"{resp.status}: {errorText[:200]}"
                            logger.warning(f"{self._label} GET {url[:120]} -> {lastError[:300]}")
                            return {"error": lastError}
                        # Remember the status so exhausted retries report it.
                        lastError = f"{resp.status}: retries exhausted"
                        waitS = self._statusRetryDelay(resp, attempt)
                        if isLastAttempt:
                            logger.warning(
                                f"{self._label} GET {resp.status} (attempt {attempt}/{self._maxRetries}), "
                                f"giving up: {url[:120]}"
                            )
                        else:
                            logger.warning(
                                f"{self._label} GET {resp.status} (attempt {attempt}/{self._maxRetries}), "
                                f"retry in {waitS:.1f}s: {url[:120]}"
                            )
                except asyncio.TimeoutError:
                    lastError = f"timeout after {effectiveTimeout.total}s"
                    waitS = min(2 ** attempt, 10)
                    if not isLastAttempt:
                        logger.warning(f"{self._label} GET timeout (attempt {attempt}): {url[:120]}")
                except aiohttp.ClientError as e:
                    lastError = str(e)
                    waitS = min(2 ** attempt, 10)
                    if not isLastAttempt:
                        logger.warning(f"{self._label} GET client error (attempt {attempt}): {e}")
            # Back off only after the semaphore slot has been released.
            if not isLastAttempt:
                await asyncio.sleep(waitS)
        return {"error": lastError or "unknown error"}

    async def getBytes(
        self,
        url: str,
        headers: Dict[str, str],
        timeout: Optional[aiohttp.ClientTimeout] = None,
        allowRedirects: bool = True,
    ) -> Optional[bytes]:
        """GET request returning raw bytes (for file downloads).

        Returns:
            The response body on status 200, otherwise ``None``.
        """
        session = self._ensureReady()
        assert self._semaphore is not None
        effectiveTimeout = timeout or self._defaultTimeout
        for attempt in range(1, self._maxRetries + 1):
            isLastAttempt = attempt >= self._maxRetries
            waitS = 0.0
            async with self._semaphore:
                try:
                    async with session.get(
                        url,
                        headers=headers,
                        timeout=effectiveTimeout,
                        allow_redirects=allowRedirects,
                    ) as resp:
                        if resp.status == 200:
                            return await resp.read()
                        if resp.status not in _RETRYABLE_STATUS:
                            errorText = await resp.text()
                            logger.warning(
                                f"{self._label} download {url[:120]} -> {resp.status}: {errorText[:200]}"
                            )
                            return None
                        waitS = self._statusRetryDelay(resp, attempt)
                        if isLastAttempt:
                            logger.warning(
                                f"{self._label} download {resp.status} (attempt {attempt}), "
                                f"giving up: {url[:120]}"
                            )
                        else:
                            logger.warning(
                                f"{self._label} download {resp.status} (attempt {attempt}), "
                                f"retry in {waitS:.1f}s: {url[:120]}"
                            )
                except asyncio.TimeoutError:
                    waitS = min(2 ** attempt, 10)
                    if not isLastAttempt:
                        logger.warning(f"{self._label} download timeout (attempt {attempt}): {url[:120]}")
                except aiohttp.ClientError as e:
                    waitS = min(2 ** attempt, 10)
                    if not isLastAttempt:
                        logger.warning(f"{self._label} download client error (attempt {attempt}): {e}")
            # Back off only after the semaphore slot has been released.
            if not isLastAttempt:
                await asyncio.sleep(waitS)
        return None

    async def request(
        self,
        method: str,
        url: str,
        headers: Dict[str, str],
        data: Any = None,
        timeout: Optional[aiohttp.ClientTimeout] = None,
    ) -> Dict[str, Any]:
        """Generic HTTP request with retry for retryable status codes.

        Returns:
            The parsed JSON body on 200/201/202, ``{}`` on 204 or when a
            success response carries no JSON body, otherwise
            ``{"error": "<text>"}``.
        """
        session = self._ensureReady()
        assert self._semaphore is not None
        effectiveTimeout = timeout or self._defaultTimeout
        kwargs: Dict[str, Any] = {"headers": headers}
        if data is not None:
            kwargs["data"] = data
        lastError: Optional[str] = None
        for attempt in range(1, self._maxRetries + 1):
            isLastAttempt = attempt >= self._maxRetries
            waitS = 0.0
            async with self._semaphore:
                try:
                    async with session.request(
                        method, url,
                        timeout=effectiveTimeout,
                        **kwargs,
                    ) as resp:
                        if resp.status in (200, 201, 202, 204):
                            return await self._successPayload(resp)
                        if resp.status not in _RETRYABLE_STATUS:
                            errorText = await resp.text()
                            lastError = f"{resp.status}: {errorText[:200]}"
                            logger.warning(f"{self._label} {method} {url[:120]} -> {lastError[:300]}")
                            return {"error": lastError}
                        # Remember the status so exhausted retries report it.
                        lastError = f"{resp.status}: retries exhausted"
                        waitS = self._statusRetryDelay(resp, attempt)
                        if isLastAttempt:
                            logger.warning(
                                f"{self._label} {method} {resp.status} (attempt {attempt}), "
                                f"giving up: {url[:120]}"
                            )
                        else:
                            logger.warning(
                                f"{self._label} {method} {resp.status} (attempt {attempt}), "
                                f"retry in {waitS:.1f}s: {url[:120]}"
                            )
                except asyncio.TimeoutError:
                    lastError = f"timeout after {effectiveTimeout.total}s"
                    waitS = min(2 ** attempt, 10)
                except aiohttp.ClientError as e:
                    lastError = str(e)
                    waitS = min(2 ** attempt, 10)
            # Back off only after the semaphore slot has been released.
            if not isLastAttempt:
                await asyncio.sleep(waitS)
        return {"error": lastError or "unknown error"}
async def closeAllResilientHttp() -> None:
    """Close every registered ResilientHttp session; intended for app shutdown.

    A failure to close one instance is logged at debug level and does not
    stop the remaining instances from being closed.
    """
    for inst in _instances:
        try:
            await inst.close()
        except Exception as e:
            logger.debug(f"Error closing {inst._label} session: {e}")
            continue
    # Reports the number of registered instances, not the number that closed cleanly.
    logger.info(f"Closed {len(_instances)} ResilientHttp session(s)")
def _parseRetryAfter(value: Optional[str]) -> float:
"""Parse Retry-After header (seconds or HTTP-date). Returns 0 if absent/unparseable."""
if not value:
return 0.0
try:
return float(value)
except ValueError:
pass
try:
from email.utils import parsedate_to_datetime
dt = parsedate_to_datetime(value)
delta = (dt.timestamp() - time.time())
return max(delta, 0.5)
except Exception:
return 0.0

View file

@ -10,10 +10,13 @@ from typing import Any, Dict, List, Optional
import aiohttp
from modules.connectors.connectorProviderBase import ProviderConnector, ServiceAdapter, DownloadResult
from modules.connectors._httpResilience import ResilientHttp
from modules.datamodels.datamodelDataSource import ExternalEntry
logger = logging.getLogger(__name__)
_http = ResilientHttp("Google", maxConcurrent=8, defaultTimeoutS=20)
_DRIVE_BASE = "https://www.googleapis.com/drive/v3"
_GMAIL_BASE = "https://gmail.googleapis.com/gmail/v1"
_CALENDAR_BASE = "https://www.googleapis.com/calendar/v3"
@ -57,17 +60,7 @@ def _parseGoogleDateRange(text: Optional[str]) -> tuple:
async def _googleGet(token: str, url: str) -> Dict[str, Any]:
headers = {"Authorization": f"Bearer {token}"}
timeout = aiohttp.ClientTimeout(total=20)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as resp:
if resp.status in (200, 201):
return await resp.json()
errorText = await resp.text()
logger.warning(f"Google API {resp.status}: {errorText[:300]}")
return {"error": f"{resp.status}: {errorText[:200]}"}
except Exception as e:
return {"error": str(e)}
return await _http.getJson(url, headers=headers)
def _raiseGoogleError(result: Dict[str, Any], ctx: str) -> None:
@ -128,23 +121,19 @@ class DriveAdapter(ServiceAdapter):
if not fileId:
return b""
headers = {"Authorization": f"Bearer {self._token}"}
timeout = aiohttp.ClientTimeout(total=60)
dlTimeout = aiohttp.ClientTimeout(total=60)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
# Try direct download first
url = f"{_DRIVE_BASE}/files/{fileId}?alt=media"
async with session.get(url, headers=headers) as resp:
if resp.status == 200:
return await resp.read()
logger.debug(f"Google Drive direct download returned {resp.status} for {fileId}")
data = await _http.getBytes(url, headers=headers, timeout=dlTimeout)
if data is not None:
return data
logger.debug(f"Google Drive direct download returned None for {fileId}")
# If 403/404, check if it's a native Google file that needs export
metaUrl = f"{_DRIVE_BASE}/files/{fileId}?fields=mimeType,name"
async with session.get(metaUrl, headers=headers) as metaResp:
if metaResp.status != 200:
logger.warning(f"Google Drive metadata fetch failed ({metaResp.status}) for {fileId}")
meta = await _http.getJson(metaUrl, headers=headers)
if "error" in meta:
logger.warning(f"Google Drive metadata fetch failed for {fileId}: {meta['error']}")
return b""
meta = await metaResp.json()
fileMime = meta.get("mimeType", "")
fileName = meta.get("name", fileId)
@ -155,10 +144,10 @@ class DriveAdapter(ServiceAdapter):
exportUrl = f"{_DRIVE_BASE}/files/{fileId}/export?mimeType={exportMime}"
logger.info(f"Google Drive: exporting '{fileName}' as {exportMime}")
async with session.get(exportUrl, headers=headers) as exportResp:
if exportResp.status == 200:
return await exportResp.read()
logger.warning(f"Google Drive export failed ({exportResp.status}) for '{fileName}'")
exported = await _http.getBytes(exportUrl, headers=headers, timeout=dlTimeout)
if exported is not None:
return exported
logger.warning(f"Google Drive export failed for '{fileName}'")
except Exception as e:
logger.error(f"Google Drive download failed for {fileId}: {e}")
return b""

View file

@ -44,10 +44,13 @@ from modules.connectors.connectorProviderBase import (
ServiceAdapter,
DownloadResult,
)
from modules.connectors._httpResilience import ResilientHttp
from modules.datamodels.datamodelDataSource import ExternalEntry
logger = logging.getLogger(__name__)
_http = ResilientHttp("Infomaniak", maxConcurrent=6, defaultTimeoutS=20)
_API_BASE = "https://api.infomaniak.com"
_CALENDAR_BASE = "https://calendar.infomaniak.com"
_CONTACTS_BASE = "https://contacts.infomaniak.com"
@ -82,18 +85,7 @@ async def _infomaniakGet(
"""
url = f"{baseUrl.rstrip('/')}/{endpoint.lstrip('/')}"
headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
timeout = aiohttp.ClientTimeout(total=20)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers, allow_redirects=False) as resp:
if resp.status in (200, 201):
return await resp.json()
errorText = await resp.text()
logger.warning(f"Infomaniak GET {url} -> {resp.status}: {errorText[:300]}")
return {"error": f"{resp.status}: {errorText[:200]}"}
except Exception as e:
logger.error(f"Infomaniak GET {url} crashed: {e}")
return {"error": str(e)}
return await _http.getJson(url, headers=headers, allowRedirects=False)
def _raiseInfomaniakError(result: Dict[str, Any], ctx: str) -> None:
@ -124,20 +116,7 @@ async def _infomaniakDownload(
"""
url = f"{baseUrl.rstrip('/')}/{endpoint.lstrip('/')}"
headers = {"Authorization": f"Bearer {token}"}
timeout = aiohttp.ClientTimeout(total=120)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers, allow_redirects=True) as resp:
if resp.status == 200:
return await resp.read()
logger.warning(
f"Infomaniak download {url} -> {resp.status}: "
f"{(await resp.text())[:300]}"
)
return None
except Exception as e:
logger.error(f"Infomaniak download {url} crashed: {e}")
return None
return await _http.getBytes(url, headers=headers, timeout=aiohttp.ClientTimeout(total=120))
def _unwrapData(payload: Any) -> Any:

View file

@ -13,11 +13,13 @@ import urllib.parse
from typing import Dict, Any, List, Optional
from modules.connectors.connectorProviderBase import ProviderConnector, ServiceAdapter, DownloadResult
from modules.connectors._httpResilience import ResilientHttp
from modules.datamodels.datamodelDataSource import ExternalEntry
logger = logging.getLogger(__name__)
_GRAPH_BASE = "https://graph.microsoft.com/v1.0"
_http = ResilientHttp("Graph", maxConcurrent=10, defaultTimeoutS=30)
class _GraphApiMixin:
@ -44,24 +46,14 @@ class _GraphApiMixin:
async def _graphDownload(self, endpoint: str) -> Optional[bytes]:
"""Download binary content from Graph API."""
headers = {"Authorization": f"Bearer {self._accessToken}"}
timeout = aiohttp.ClientTimeout(total=60)
url = f"{_GRAPH_BASE}/{endpoint.lstrip('/')}"
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as resp:
if resp.status == 200:
return await resp.read()
logger.error(f"Download failed {resp.status}: {await resp.text()}")
return None
except Exception as e:
logger.error(f"Graph download error: {e}")
return None
return await _http.getBytes(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60))
async def _makeGraphCall(
token: str, endpoint: str, method: str = "GET", data: Any = None
) -> Dict[str, Any]:
"""Execute a single Microsoft Graph API call."""
"""Execute a single Microsoft Graph API call via shared resilient HTTP client."""
url = f"{_GRAPH_BASE}/{endpoint.lstrip('/')}"
contentType = "application/json; charset=utf-8"
if method == "PUT" and isinstance(data, bytes):
@ -72,37 +64,7 @@ async def _makeGraphCall(
}
if "$count=true" in endpoint:
headers["ConsistencyLevel"] = "eventual"
timeout = aiohttp.ClientTimeout(total=30)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
kwargs: Dict[str, Any] = {"headers": headers}
if data is not None:
kwargs["data"] = data
if method == "GET":
async with session.get(url, **kwargs) as resp:
return await _handleResponse(resp)
elif method == "POST":
async with session.post(url, **kwargs) as resp:
return await _handleResponse(resp)
elif method == "PUT":
async with session.put(url, **kwargs) as resp:
return await _handleResponse(resp)
elif method == "PATCH":
async with session.patch(url, **kwargs) as resp:
return await _handleResponse(resp)
elif method == "DELETE":
async with session.delete(url, **kwargs) as resp:
if resp.status in (200, 204):
return {}
return await _handleResponse(resp)
except asyncio.TimeoutError:
return {"error": f"Graph API timeout: {endpoint}"}
except Exception as e:
return {"error": f"Graph API error: {e}"}
return {"error": f"Unsupported method: {method}"}
return await _http.request(method, url, headers=headers, data=data)
async def _handleResponse(resp: aiohttp.ClientResponse) -> Dict[str, Any]:

View file

@ -494,7 +494,7 @@ TEMPLATE_WORKFLOWS = [
"closingBalance in accountSummary ist bereits der korrekte Ist-Wert.\n\n"
"WICHTIG: Erstelle KEINEN separaten Chart pro Konto. Nur EIN "
"Uebersichts-Chart ueber alle Konten ist gewuenscht.\n\n"
"Hinweis: Das documentTheme ist 'finance'. Wenn du ein Dokument erstellst, "
"Hinweis: Wenn du ein Dokument erstellst, "
"verwende einen professionellen Finanz-Stil:\n"
"- Schriftart: Calibri\n"
"- Primaerfarbe: #1F3864 (Dunkelblau)\n"
@ -504,7 +504,6 @@ TEMPLATE_WORKFLOWS = [
"Nutze den style-Parameter von renderDocument um diese Vorgaben umzusetzen."
),
"resultType": "xlsx",
"documentTheme": "finance",
"requireNeutralization": False,
"documentList": {"type": "ref", "nodeId": "trigger", "path": ["payload", "documentList"]},
"context": {"type": "ref", "nodeId": "refresh", "path": ["data", "accountingData"]},

View file

@ -141,6 +141,18 @@ class AiObjects:
_MAX_SHORT_RETRY = 15.0
# Pre-flight guard: reject obviously oversized payloads before entering the failover loop
estimatedTokens = (len(prompt or "") + len(context or "")) // 3
bestContextLength = max((m.contextLength for m in failoverModelList if m.contextLength > 0), default=0)
if bestContextLength > 0 and estimatedTokens > bestContextLength * 0.95:
errorMsg = (f"Input too large for all available models: ~{estimatedTokens} estimated tokens "
f"vs best model context {bestContextLength}. Use chunking pipeline instead.")
logger.error(errorMsg)
return AiCallResponse(
content=errorMsg, modelName="error", priceCHF=0.0,
processingTime=0.0, bytesSent=0, bytesReceived=0, errorCount=1,
)
lastError = None
for attempt, model in enumerate(failoverModelList):
try:

View file

@ -181,6 +181,19 @@ class EventManager:
task = asyncio.create_task(_cleanup())
self._cleanup_tasks[workflow_id] = task
def shutdown(self) -> None:
    """Cancel every unfinished cleanup and agent task, then drop all state.

    Cleanup tasks are handled first, then agent tasks. Each registry is
    emptied after its tasks were cancelled, and finally the queues are
    cleared.
    """
    for registry in (self._cleanup_tasks, self._agent_tasks):
        # Snapshot the values so cancellation never iterates a changing dict.
        for pending in list(registry.values()):
            if pending.done():
                continue
            pending.cancel()
        registry.clear()
    self._queues.clear()
    logger.info("EventManager shutdown: all tasks cancelled")
# Global event manager instance
_event_manager: Optional[EventManager] = None

View file

@ -224,7 +224,6 @@ def _registerMediaTools(registry: ToolRegistry, services):
title=title,
userPrompt=content,
style=args.get("style"),
documentTheme=args.get("documentTheme"),
imageResolver=_imageBytesResolver if lazyBlockImages else None,
)
@ -315,17 +314,6 @@ def _registerMediaTools(registry: ToolRegistry, services):
"outputFormat": {"type": "string", "description": "Target format: pdf, docx, xlsx, pptx, csv, html, md, json, txt", "default": "pdf"},
"title": {"type": "string", "description": "Document title", "default": "Document"},
"language": {"type": "string", "description": "Document language (ISO 639-1)", "default": "de"},
"documentTheme": {
"type": "string",
"enum": ["general", "finance", "legal", "technical", "hr", "marketing"],
"description": (
"Named style preset applied by the renderer (colors, fonts, spacing). "
"Pick the one that matches the document purpose: 'legal' for serif/justified "
"legal filings, 'marketing' for bold image-friendly layouts, 'finance', "
"'technical', 'hr', or 'general' (default). The explicit 'style' object, if "
"provided, overrides individual preset keys."
),
},
"style": {
"type": "object",
"description": (

View file

@ -112,6 +112,50 @@ def _makeReadFile(services):
return readFile
_MAX_FILE_BYTES = 50_000_000 # 50 MB safety limit
def _makeReadFileBytes(services):
"""Create a readFileBytes(fileId) closure for binary file access in the sandbox."""
def readFileBytes(fileId: str) -> bytes:
mgmt = getattr(services, 'interfaceDbComponent', None) if services else None
if not mgmt:
raise RuntimeError("readFileBytes: no file store available in this session")
data = mgmt.getFileData(str(fileId))
if data is None:
raise FileNotFoundError(f"File '{fileId}' not found in workspace")
if len(data) > _MAX_FILE_BYTES:
raise ValueError(f"File too large for sandbox analysis ({len(data)} bytes, limit {_MAX_FILE_BYTES})")
return data
return readFileBytes
class SafeZipFile:
    """Read-only in-memory ZIP analysis wrapper for the sandbox.

    Does not expose extract/write -- only namelist, infolist, and in-memory
    read. Member reads are bounded so that a small archive cannot expand
    into an arbitrarily large buffer (zip bomb).
    """

    # Default cap for the uncompressed size of a single member (50 MB).
    _DEFAULT_MAX_MEMBER_BYTES = 50_000_000

    def __init__(self, data: bytes, maxMemberBytes: Optional[int] = _DEFAULT_MAX_MEMBER_BYTES):
        """Open the archive from memory.

        Args:
            data: Raw bytes of the ZIP archive.
            maxMemberBytes: Largest uncompressed member ``read`` will return;
                ``None`` disables the check.
        """
        import zipfile as _zf
        self._maxMemberBytes = maxMemberBytes
        self._zf = _zf.ZipFile(io.BytesIO(data), 'r')

    def namelist(self):
        """Return the member names of the archive."""
        return self._zf.namelist()

    def infolist(self):
        """Return one plain dict per member (name, sizes, timestamp)."""
        return [{"filename": i.filename, "file_size": i.file_size,
                 "compress_size": i.compress_size, "date_time": i.date_time}
                for i in self._zf.infolist()]

    def read(self, name: str) -> bytes:
        """Return the uncompressed content of one member.

        Raises:
            KeyError: The archive has no member called ``name``.
            ValueError: The member is larger than ``maxMemberBytes``.
        """
        limit = self._maxMemberBytes
        if limit is None:
            return self._zf.read(name)

        info = self._zf.getinfo(name)
        if info.file_size > limit:
            raise ValueError(
                f"ZIP member '{name}' too large for sandbox analysis "
                f"({info.file_size} bytes, limit {limit})"
            )
        # The declared size comes from the archive itself and may be forged,
        # so the actual decompression is bounded as well.
        with self._zf.open(info) as member:
            payload = member.read(limit + 1)
        if len(payload) > limit:
            raise ValueError(
                f"ZIP member '{name}' too large for sandbox analysis "
                f"(more than {limit} bytes)"
            )
        return payload

    def close(self) -> None:
        """Close the underlying archive."""
        self._zf.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
async def executePython(code: str, *, services=None) -> Dict[str, Any]:
"""Execute Python code in a restricted sandbox. Returns {success, output, error}."""
import asyncio
@ -120,8 +164,10 @@ async def executePython(code: str, *, services=None) -> Dict[str, Any]:
restrictedGlobals = _buildRestrictedGlobals()
vfs = _VirtualFS()
restrictedGlobals["__builtins__"]["open"] = vfs.open
restrictedGlobals["__builtins__"]["SafeZipFile"] = SafeZipFile
if services:
restrictedGlobals["__builtins__"]["readFile"] = _makeReadFile(services)
restrictedGlobals["__builtins__"]["readFileBytes"] = _makeReadFileBytes(services)
capturedOutput = io.StringIO()
oldStdout = sys.stdout
oldStderr = sys.stderr

View file

@ -1680,8 +1680,7 @@ Respond with ONLY a JSON object in this exact format:
language: str,
title: str,
userPrompt: str,
parentOperationId: str,
documentTheme: Optional[str] = None
parentOperationId: str
) -> List[RenderedDocument]:
"""
Phase 5E: Rendert gefüllte Struktur zum Ziel-Format.
@ -1733,8 +1732,7 @@ Respond with ONLY a JSON object in this exact format:
title,
userPrompt,
self,
parentOperationId=renderOperationId, # Parent-Referenz für ChatLog-Hierarchie
documentTheme=documentTheme
parentOperationId=renderOperationId # Parent-Referenz für ChatLog-Hierarchie
)
# ChatLog abschließen
@ -1776,8 +1774,7 @@ Respond with ONLY a JSON object in this exact format:
outputFormat: Optional[str] = None,
title: Optional[str] = None,
parentOperationId: Optional[str] = None,
generationIntent: Optional[str] = None, # NEW: Explicit intent from action (skips detection)
documentTheme: Optional[str] = None # Named style preset for document rendering
generationIntent: Optional[str] = None # NEW: Explicit intent from action (skips detection)
) -> AiResponse:
"""
Unified AI content generation with explicit intent requirement.
@ -1796,8 +1793,6 @@ Respond with ONLY a JSON object in this exact format:
parentOperationId: Optional parent operation ID for hierarchical logging
generationIntent: REQUIRED explicit intent ("document" | "code" | "image") from action.
NO auto-detection - actions must explicitly specify intent.
documentTheme: Optional named style preset (general/finance/legal/technical/
hr/marketing) forwarded to the renderer for document generation.
Returns:
AiResponse with content, metadata, and optional documents
@ -1868,8 +1863,7 @@ Respond with ONLY a JSON object in this exact format:
contentParts=contentParts,
outputFormat=outputFormat,
title=title,
parentOperationId=parentOperationId,
documentTheme=documentTheme
parentOperationId=parentOperationId
)
# DATA_EXTRACT: Extract content from documents and process with AI (no structure generation)
@ -2085,8 +2079,7 @@ Respond with ONLY a JSON object in this exact format:
contentParts: Optional[List[ContentPart]],
outputFormat: str,
title: str,
parentOperationId: Optional[str],
documentTheme: Optional[str] = None
parentOperationId: Optional[str]
) -> AiResponse:
"""Handle document generation using document generation path."""
from modules.serviceCenter.services.serviceGeneration.paths.documentPath import DocumentGenerationPath
@ -2103,8 +2096,7 @@ Respond with ONLY a JSON object in this exact format:
contentParts=contentParts,
outputFormat=outputFormat,
title=title or "Generated Document",
parentOperationId=parentOperationId,
documentTheme=documentTheme
parentOperationId=parentOperationId
)

View file

@ -27,6 +27,21 @@ class _AiResponseFallback:
logger = logging.getLogger(__name__)
def _normalizeImageElement(element: Dict[str, Any]) -> None:
"""Ensure image element has nested content dict.
AI sometimes returns flat keys (base64Data, altText, ...) at the top level.
All renderers expect element['content'] to be a dict with those keys."""
if "content" in element and isinstance(element.get("content"), dict):
return
element["content"] = {
"base64Data": element.pop("base64Data", ""),
"altText": element.pop("altText", ""),
"caption": element.pop("caption", ""),
"mimeType": element.pop("mimeType", "image/png"),
"fileName": element.pop("fileName", element.get("id", "image") + ".png"),
}
def _elements_from_section_content_ai_json(parsed: Any) -> List[Any]:
"""Normalize section_content AI JSON (incl. models that return {\"text\": ...}) into elements."""
from modules.serviceCenter.services.serviceAi.subLoopingUseCases import _normalizeSectionContentJson
@ -494,14 +509,18 @@ class StructureFiller:
try:
jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content))
if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
_normalizeImageElement(jsonContent)
elements.append(jsonContent)
logger.debug("AI returned proper JSON image structure")
base64Data = None # Signal that image was already processed
base64Data = None
elif isinstance(jsonContent, list) and len(jsonContent) > 0:
if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
for item in jsonContent:
if isinstance(item, dict) and item.get("type") == "image":
_normalizeImageElement(item)
elements.extend(jsonContent)
logger.debug("AI returned proper JSON image structure in list")
base64Data = None # Signal that image was already processed
base64Data = None
else:
base64Data = "" # Continue with normal processing
else:

View file

@ -1309,6 +1309,7 @@ class ExtractionService:
Calls aiObjects._callWithModel() for actual AI calls.
"""
lastError = None
_VISION_API_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
# Check if this is an image - Vision models need special handling
isImage = (contentPart.typeGroup == "image") or (contentPart.mimeType and contentPart.mimeType.startswith("image/"))
@ -1316,6 +1317,18 @@ class ExtractionService:
# Determine the correct operation type based on content type
actualOperationType = options.operationType
if isImage:
resolvedMime = contentPart.mimeType or "image/jpeg"
if resolvedMime not in _VISION_API_TYPES and contentPart.data:
resolvedMime = _sniffImageMime(contentPart.data) or resolvedMime
if resolvedMime not in _VISION_API_TYPES:
logger.info(f"Skipping unsupported image type '{resolvedMime}' (supported: {', '.join(sorted(_VISION_API_TYPES))})")
return AiCallResponse(
content=f"[Image skipped: unsupported format {resolvedMime}]",
modelName="skipped", priceCHF=0.0,
processingTime=0.0, bytesSent=0, bytesReceived=0, errorCount=0,
)
contentPart.mimeType = resolvedMime
actualOperationType = OperationTypeEnum.IMAGE_ANALYSE
# Get vision-capable models for images
availableModels = modelRegistry.getAvailableModels()
@ -1805,6 +1818,24 @@ class ExtractionService:
)
def _sniffImageMime(data) -> Optional[str]:
"""Detect image format from magic bytes. Returns None if unrecognised."""
import base64 as _b64
try:
raw = data if isinstance(data, bytes) else _b64.b64decode(data[:32])
if raw[:3] == b"\xff\xd8\xff":
return "image/jpeg"
if raw[:8] == b"\x89PNG\r\n\x1a\n":
return "image/png"
if raw[:4] == b"GIF8":
return "image/gif"
if raw[:4] == b"RIFF" and len(raw) >= 12 and raw[8:12] == b"WEBP":
return "image/webp"
except Exception:
pass
return None
# Module-level function for use by subPipeline and ExtractionService
def applyMerging(parts: List[ContentPart], strategy: MergeStrategy) -> List[ContentPart]:
"""Apply merging strategy to parts with intelligent token-aware merging.

View file

@ -14,7 +14,7 @@ from .subDocumentUtility import (
detectMimeTypeFromData,
convertDocumentDataToString
)
from .styleDefaults import resolveStyle
from .styleDefaults import resolveStyle, deepMerge
logger = logging.getLogger(__name__)
@ -383,7 +383,80 @@ class GenerationService:
'workflowId': 'unknown'
}
async def renderReport(self, extractedContent: Dict[str, Any], outputFormat: str, language: str, title: str, userPrompt: str = None, aiService=None, parentOperationId: Optional[str] = None, style: Optional[Dict[str, Any]] = None, documentTheme: Optional[str] = None, imageResolver=None) -> List[RenderedDocument]:
async def _enhanceStyleWithAi(self, resolvedStyle: Dict[str, Any], extractedContent: Dict[str, Any], userPrompt: str | None, aiService) -> Dict[str, Any]:
    """Ask the AI service for a style delta and merge it onto ``resolvedStyle``.

    The prompt contains the title and document type read from
    ``extractedContent["metadata"]``, the first 300 characters of
    ``userPrompt`` and the current style serialised as JSON. The reply is
    expected to be a JSON object holding only the properties to change.

    Args:
        resolvedStyle: Style dict to refine.
        extractedContent: Document content; only its ``metadata`` dict is read.
        userPrompt: Optional user request, truncated to 300 characters.
        aiService: Object exposing an awaitable ``callAi(request)``.

    Returns:
        The result of ``deepMerge(resolvedStyle, delta)``, or ``resolvedStyle``
        unchanged when the reply is empty, holds no JSON object, decodes to
        something other than a non-empty dict, or any exception is raised
        (the exception is logged as a warning).
    """
    try:
        from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
        import json as _json, re as _re

        # Pull the descriptive fields, tolerating non-dict content/metadata
        meta = extractedContent.get("metadata", {}) if isinstance(extractedContent, dict) else {}
        metaIsDict = isinstance(meta, dict)
        docTitle = meta.get("title", "") if metaIsDict else ""
        docType = meta.get("documentType", "") if metaIsDict else ""
        userHint = (userPrompt or "")[:300]
        styleJson = _json.dumps(resolvedStyle, indent=2, default=str)

        prompt = (
            "You are a document styling expert. Given the document context below, "
            "return a JSON delta object containing ONLY the style properties you want to change "
            "from the current defaults. Return {} if no changes are needed.\n\n"
            f"Document title: {docTitle}\n"
            f"Document type: {docType}\n"
            f"User request (excerpt): {userHint}\n\n"
            f"Current style (full schema):\n{styleJson}\n\n"
            "You may adjust any property: fonts (primary/monospace), colors, "
            "documentTitle (size, alignment), headings (h1-h4 sizes, colors, spacing), "
            "paragraph (size, lineSpacing, alignment e.g. justified), "
            "table (header colors, banding, borders, cell padding), "
            "list (bullet character, indent), image (default width, alignment), "
            "codeBlock (font size, background, border), "
            "coverPage (title/subtitle sizes and colors), "
            "caption (size, color, italic), "
            "page (format, margins, header/footer).\n"
            "Match the document's purpose and tone. Examples: a legal document should use "
            "serif fonts and justified text; a financial report conservative colors; "
            "a marketing brochure bold colors and generous spacing.\n"
            "Return ONLY a valid JSON object (no markdown fences, no explanation)."
        )

        callOptions = AiCallOptions()
        callOptions.operationType = OperationTypeEnum.DATA_GENERATE
        aiRequest = AiCallRequest(prompt=prompt, context="", options=callOptions)
        aiResponse = await aiService.callAi(aiRequest)

        replyText = (aiResponse.content or "").strip() if aiResponse else ""
        if not replyText:
            return resolvedStyle

        # Unwrap markdown fences in case the model added them anyway
        fencedJson = _re.search(r'```json\s*\n(.*?)\n```', replyText, _re.DOTALL)
        if fencedJson:
            replyText = fencedJson.group(1).strip()
        elif replyText.startswith('```'):
            replyText = _re.sub(r'^```\w*\s*', '', replyText)
            replyText = _re.sub(r'\s*```$', '', replyText)

        # Keep only the outermost {...} span; surrounding prose is discarded
        firstBrace = replyText.find('{')
        lastBrace = replyText.rfind('}')
        if -1 in (firstBrace, lastBrace):
            return resolvedStyle

        delta = _json.loads(replyText[firstBrace:lastBrace + 1])
        if not (isinstance(delta, dict) and delta):
            return resolvedStyle

        mergedStyle = deepMerge(resolvedStyle, delta)
        logger.info("AI style enhancement applied %d top-level key(s)", len(delta))
        return mergedStyle
    except Exception as exc:
        logger.warning("AI style enhancement failed, using base style: %s", exc)
        return resolvedStyle
async def renderReport(self, extractedContent: Dict[str, Any], outputFormat: str, language: str, title: str, userPrompt: str = None, aiService=None, parentOperationId: Optional[str] = None, style: Optional[Dict[str, Any]] = None, imageResolver=None) -> List[RenderedDocument]:
"""
Render extracted JSON content to the specified output format.
Processes EACH document separately and calls renderer for each.
@ -401,9 +474,6 @@ class GenerationService:
aiService: AI service instance for generation prompt creation
parentOperationId: Optional parent operation ID for hierarchical logging
style: Optional style overrides (deep-merged with DEFAULT_STYLE)
documentTheme: Optional named theme preset (general/finance/legal/
technical/hr/marketing). Resolved as DEFAULT_STYLE <- preset <- style,
so an explicit ``style`` override always wins.
imageResolver: Optional callable ``fileId -> bytes`` for lazy, on-demand
resolution of block images that carry only a ``fileId`` (no embedded
base64). Lets large documents avoid holding every image's bytes in
@ -414,7 +484,9 @@ class GenerationService:
Each RenderedDocument represents one rendered file (main document or supporting file)
"""
try:
resolvedStyle = resolveStyle(style, documentTheme)
resolvedStyle = resolveStyle(style)
if aiService:
resolvedStyle = await self._enhanceStyleWithAi(resolvedStyle, extractedContent, userPrompt, aiService)
# Validate JSON input
if not isinstance(extractedContent, dict):
raise ValueError("extractedContent must be a JSON dictionary")

View file

@ -34,8 +34,7 @@ class DocumentGenerationPath:
contentParts: Optional[List[ContentPart]] = None,
outputFormat: str = "txt",
title: Optional[str] = None,
parentOperationId: Optional[str] = None,
documentTheme: Optional[str] = None
parentOperationId: Optional[str] = None
) -> AiResponse:
"""
Generate document using existing chapter/section model.
@ -166,8 +165,7 @@ class DocumentGenerationPath:
language, # Global fallback (per-document language extracted from structure in renderReport)
title or "Generated Document",
userPrompt,
docOperationId,
documentTheme=documentTheme
docOperationId
)
# Build response: convert all rendered documents to DocumentData

View file

@ -154,8 +154,12 @@ class BaseRenderer(ABC):
para = style["paragraph"]
lst = style["list"]
cb = style["codeBlock"]
fonts = style.get("fonts") if isinstance(style.get("fonts"), dict) else {}
colors = style.get("colors") if isinstance(style.get("colors"), dict) else {}
primaryColor = colors.get("primary", "#1F3864")
primaryColor = colors.get("primary", "#24292e")
secondaryColor = colors.get("secondary", "#586069")
accentColor = colors.get("accent", "#0366d6")
bgColor = colors.get("background", "#FFFFFF")
rawDocTitle = style.get("documentTitle")
docTitle = rawDocTitle if isinstance(rawDocTitle, dict) else {}
titleSizePt = docTitle.get("sizePt")
@ -168,6 +172,10 @@ class BaseRenderer(ABC):
titleAlign = "center"
titleSpaceBefore = docTitle.get("spaceBeforePt", 0)
titleSpaceAfter = docTitle.get("spaceAfterPt", 18)
img = style.get("image") if isinstance(style.get("image"), dict) else {}
page = style.get("page") if isinstance(style.get("page"), dict) else {}
cover = style.get("coverPage") if isinstance(style.get("coverPage"), dict) else {}
caption = style.get("caption") if isinstance(style.get("caption"), dict) else {}
return {
"title": {
"font_size": titleSizePt,
@ -179,56 +187,169 @@ class BaseRenderer(ABC):
},
"heading1": {
"font_size": h1["sizePt"], "color": h1["color"],
"bold": h1.get("weight") == "bold", "align": "left",
"bold": h1.get("weight") == "bold", "align": h1.get("align", "left"),
"space_before": h1.get("spaceBeforePt", 24),
"space_after": h1.get("spaceAfterPt", 8),
},
"heading2": {
"font_size": h2["sizePt"], "color": h2["color"],
"bold": h2.get("weight") == "bold", "align": "left",
"bold": h2.get("weight") == "bold", "align": h2.get("align", "left"),
"space_before": h2.get("spaceBeforePt", 20),
"space_after": h2.get("spaceAfterPt", 6),
},
"heading3": {
"font_size": h3["sizePt"], "color": h3["color"],
"bold": h3.get("weight") == "bold", "align": "left",
"bold": h3.get("weight") == "bold", "align": h3.get("align", "left"),
"space_before": h3.get("spaceBeforePt", 16),
"space_after": h3.get("spaceAfterPt", 4),
},
"heading4": {
"font_size": h4["sizePt"], "color": h4["color"],
"bold": h4.get("weight") == "bold", "align": "left",
"bold": h4.get("weight") == "bold", "align": h4.get("align", "left"),
"space_before": h4.get("spaceBeforePt", 12),
"space_after": h4.get("spaceAfterPt", 3),
},
"paragraph": {
"font_size": para["sizePt"], "color": para["color"],
"bold": False, "align": "left",
"line_height": para.get("lineSpacing", 1.15),
"bold": False, "align": para.get("align", "left"),
"line_height": para.get("lineSpacing", 1.5),
},
"table_header": {
"background": tbl["headerBg"], "text_color": tbl["headerFg"],
"font_size": tbl.get("headerSizePt", 10),
"bold": True, "align": "center",
},
"table_cell": {
"background": tbl["rowBandingOdd"], "text_color": para["color"],
"font_size": tbl.get("bodySizePt", 10),
"bold": False, "align": "left",
},
"table_border": {
"style": "grid", "color": tbl["borderColor"],
"style": tbl.get("borderStyle", "grid"),
"color": tbl["borderColor"],
"width": tbl.get("borderWidthPt", 0.5),
},
"table_banding": {
"enabled": tbl.get("bandingEnabled", True),
"even": tbl.get("rowBandingEven", "#f6f8fa"),
"odd": tbl.get("rowBandingOdd", "#FFFFFF"),
},
"table_padding": tbl.get("cellPaddingPt", 4),
"bullet_list": {
"font_size": lst["sizePt"], "color": para["color"],
"indent": lst["indentPt"],
"bullet_char": lst.get("bulletChar", "\u2022"),
},
"code_block": {
"font": style["fonts"]["monospace"],
"font": fonts.get("monospace", "Consolas"),
"font_size": cb["fontSizePt"], "color": para["color"],
"background": cb["background"],
"border_color": cb.get("borderColor", "#e1e4e8"),
},
"fonts": {
"primary": fonts.get("primary", "Calibri"),
"monospace": fonts.get("monospace", "Consolas"),
},
"colors": {
"primary": primaryColor,
"secondary": secondaryColor,
"accent": accentColor,
"background": bgColor,
},
"image": {
"default_width": img.get("defaultWidthPt", 480),
"max_width": img.get("maxWidthPt", 800),
"alignment": img.get("alignment", "center"),
},
"page": {
"format": page.get("format", "A4"),
"margins": page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60}),
"show_page_numbers": page.get("showPageNumbers", True),
"header_height": page.get("headerHeight", 30),
"footer_height": page.get("footerHeight", 30),
"header_logo": page.get("headerLogo"),
"header_text": page.get("headerText", ""),
"footer_text": page.get("footerText", ""),
},
"cover_page": {
"title_size": cover.get("titleSizePt", 28),
"subtitle_size": cover.get("subtitleSizePt", 16),
"author_size": cover.get("authorSizePt", 12),
"date_size": cover.get("dateSizePt", 12),
"title_color": cover.get("titleColor", primaryColor),
"subtitle_color": cover.get("subtitleColor", secondaryColor),
},
"caption": {
"font_size": caption.get("sizePt", 10),
"color": caption.get("color", secondaryColor),
"italic": caption.get("italic", True),
"align": caption.get("align", "center"),
},
}
@staticmethod
def _looksNumeric(values: list) -> bool:
"""Return True if most non-empty values in the column look numeric."""
numCount = 0
total = 0
for v in values:
text = str(v).strip() if v is not None else ""
if not text:
continue
total += 1
cleaned = text.replace(",", "").replace("'", "").replace(" ", "")
cleaned = cleaned.lstrip("$€£CHF").rstrip("%")
try:
float(cleaned)
numCount += 1
except ValueError:
pass
return total > 0 and numCount / total >= 0.6
@staticmethod
def _looksDate(values: list) -> bool:
"""Return True if most non-empty values look like dates."""
dateCount = 0
total = 0
datePattern = re.compile(r"^\d{1,4}[./-]\d{1,2}[./-]\d{1,4}$")
for v in values:
text = str(v).strip() if v is not None else ""
if not text:
continue
total += 1
if datePattern.match(text):
dateCount += 1
return total > 0 and dateCount / total >= 0.6
def _inferColumnAlignments(self, headers: list, rows: list, tableStyle: dict | None = None) -> list:
"""Infer per-column alignments from explicit tableStyle or data heuristic.
Returns a list of ``"left"``/``"center"``/``"right"`` strings, one per column.
"""
colCount = len(headers) if headers else (len(rows[0]) if rows else 0)
if not colCount:
return []
if tableStyle and tableStyle.get("columnAlignments"):
explicit = tableStyle["columnAlignments"]
if isinstance(explicit, list) and len(explicit) >= colCount:
return [a if a in ("left", "center", "right") else "left" for a in explicit[:colCount]]
alignments = []
for colIdx in range(colCount):
colValues = []
for row in rows:
if colIdx < len(row):
cell = row[colIdx]
if isinstance(cell, list):
cell = "".join(r.get("value", "") for r in cell if isinstance(r, dict))
colValues.append(cell)
if self._looksNumeric(colValues):
alignments.append("right")
elif self._looksDate(colValues):
alignments.append("center")
else:
alignments.append("left")
return alignments
@staticmethod
def _inlineRunsFromContent(content: Dict[str, Any], *, itemsKey: str = None) -> Any:
"""Extract inline runs from new-format content, falling back to old format.

View file

@ -115,8 +115,10 @@ class RendererHtml(BaseRenderer):
styles = self._convertUnifiedStyleToInternal(style)
self._unifiedStyle = style
else:
styles = await self._getStyleSet(jsonContent, userPrompt, aiService)
self._unifiedStyle = None
from modules.serviceCenter.services.serviceGeneration.styleDefaults import resolveStyle
style = resolveStyle()
styles = self._convertUnifiedStyleToInternal(style)
self._unifiedStyle = style
# Validate JSON structure
if not self._validateJsonStructure(jsonContent):
@ -174,107 +176,6 @@ class RendererHtml(BaseRenderer):
self.logger.error(f"Error generating HTML from JSON: {str(e)}")
raise Exception(f"HTML generation failed: {str(e)}")
async def _getStyleSet(self, extractedContent: Dict[str, Any] = None, userPrompt: str = None, aiService=None, templateName: str = None) -> Dict[str, Any]:
    """Resolve the HTML style set for a document.

    Resolution order:
      1. ``extractedContent["metadata"]["styles"]`` when it is a non-empty
         dict (contrast-validated before being returned).
      2. The default style set enhanced by AI, when both ``userPrompt`` and
         ``aiService`` are given (contrast-validated as well).
      3. The plain default style set.

    IMPORTANT: styling is meant to come from document generation rather than
    being produced separately by each renderer; the AI path is only a fallback
    for documents that carry no styles.

    Args:
        extractedContent: Document content with metadata (may contain styles).
        userPrompt: User's prompt, forwarded to the AI enhancement step.
        aiService: AI service used only for the fallback enhancement.
        templateName: Name of a template style set; not read by this method.

    Returns:
        Dict with style definitions for all document styles.
    """
    baseStyles = self._getDefaultStyleSet()

    # Preferred source: styles delivered together with the generated document
    docMetadata = extractedContent.get("metadata", {}) if extractedContent else None
    if isinstance(docMetadata, dict):
        providedStyles = docMetadata.get("styles")
        if providedStyles and isinstance(providedStyles, dict):
            self.logger.debug("Using styles from document generation metadata")
            return self._validateStylesContrast(providedStyles)

    # Without both a prompt and an AI service only the defaults remain
    if not (userPrompt and aiService):
        return baseStyles

    self.logger.info(f"Styles not in metadata, enhancing with AI based on user prompt...")
    aiStyles = await self._enhanceStylesWithAI(userPrompt, baseStyles, aiService)
    return self._validateStylesContrast(aiStyles)
async def _enhanceStylesWithAI(self, userPrompt: str, defaultStyleSet: Dict[str, Any], aiService) -> Dict[str, Any]:
    """Return AI-enhanced HTML styles, or ``defaultStyleSet`` if that fails.

    Builds an ``"html"`` style template from the prompt and the defaults and
    passes it to ``_getAiStyles``. Any exception is logged as a warning and
    the defaults are returned instead.
    """
    try:
        aiTemplate = self._createAiStyleTemplate("html", userPrompt, defaultStyleSet)
        aiStyles = await self._getAiStyles(aiService, aiTemplate, defaultStyleSet)
    except Exception as exc:
        self.logger.warning(f"AI style enhancement failed: {str(exc)}, using default styles")
        return defaultStyleSet
    return aiStyles
def _validateStylesContrast(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and fix contrast issues in AI-generated styles."""
try:
# Fix table header contrast
if "table_header" in styles:
header = styles["table_header"]
bgColor = header.get("background", "#FFFFFF")
textColor = header.get("color", "#000000")
# If both are white or both are dark, fix it
if bgColor.upper() == "#FFFFFF" and textColor.upper() == "#FFFFFF":
header["background"] = "#4F4F4F"
header["color"] = "#FFFFFF"
elif bgColor.upper() == "#000000" and textColor.upper() == "#000000":
header["background"] = "#4F4F4F"
header["color"] = "#FFFFFF"
# Fix table cell contrast
if "table_cell" in styles:
cell = styles["table_cell"]
bgColor = cell.get("background", "#FFFFFF")
textColor = cell.get("color", "#000000")
# If both are white or both are dark, fix it
if bgColor.upper() == "#FFFFFF" and textColor.upper() == "#FFFFFF":
cell["background"] = "#FFFFFF"
cell["color"] = "#2F2F2F"
elif bgColor.upper() == "#000000" and textColor.upper() == "#000000":
cell["background"] = "#FFFFFF"
cell["color"] = "#2F2F2F"
return styles
except Exception as e:
self.logger.warning(f"Style validation failed: {str(e)}")
return self._getDefaultStyleSet()
def _getDefaultStyleSet(self) -> Dict[str, Any]:
"""Default HTML style set - used when no style instructions present."""
return {
"title": {"font_size": "2.5em", "color": "#1F4E79", "font_weight": "bold", "text_align": "center", "margin": "0 0 1em 0"},
"heading1": {"font_size": "2em", "color": "#2F2F2F", "font_weight": "bold", "text_align": "left", "margin": "1.5em 0 0.5em 0"},
"heading2": {"font_size": "1.5em", "color": "#4F4F4F", "font_weight": "bold", "text_align": "left", "margin": "1em 0 0.5em 0"},
"paragraph": {"font_size": "1em", "color": "#2F2F2F", "font_weight": "normal", "text_align": "left", "margin": "0 0 1em 0", "line_height": "1.6"},
"table": {"border": "1px solid #ddd", "border_collapse": "collapse", "width": "100%", "margin": "1em 0"},
"table_header": {"background": "#4F4F4F", "color": "#FFFFFF", "font_weight": "bold", "text_align": "center", "padding": "12px"},
"table_cell": {"background": "#FFFFFF", "color": "#2F2F2F", "font_weight": "normal", "text_align": "left", "padding": "8px", "border": "1px solid #ddd"},
"bullet_list": {"font_size": "1em", "color": "#2F2F2F", "margin": "0 0 1em 0", "padding_left": "20px"},
"code_block": {"font_family": "Courier New, monospace", "font_size": "0.9em", "color": "#2F2F2F", "background": "#F5F5F5", "padding": "1em", "border": "1px solid #ddd", "border_radius": "4px", "margin": "1em 0"},
"image": {"max_width": "100%", "height": "auto", "margin": "1em 0", "border_radius": "4px"},
"body": {"font_family": "Arial, sans-serif", "background": "#FFFFFF", "color": "#2F2F2F", "margin": "0", "padding": "20px"}
}
def _generateCssStyles(self, styles: Dict[str, Any]) -> str:
"""Generate CSS from style definitions."""
# When unified style is available, generate CSS directly from it
@ -440,7 +341,9 @@ class RendererHtml(BaseRenderer):
css_parts.append(f" font-size: {h.get('sizePt', max(24 - (level-1)*4, 12))}pt;")
css_parts.append(f" color: {h.get('color', primaryColor)};")
css_parts.append(f" font-weight: {h.get('weight', 'bold')};")
css_parts.append(f" margin: 1.2em 0 0.4em 0;")
spBefore = h.get('spaceBeforePt', max(24 - (level - 1) * 4, 12))
spAfter = h.get('spaceAfterPt', max(8 - (level - 1) * 2, 3))
css_parts.append(f" margin: {spBefore}pt 0 {spAfter}pt 0;")
css_parts.append("}")
# Paragraphs
@ -453,11 +356,16 @@ class RendererHtml(BaseRenderer):
# Tables
borderColor = tbl.get("borderColor", "#DEE2E6")
borderStyle = tbl.get("borderStyle", "grid")
css_parts.append("table {")
css_parts.append(f" border-collapse: collapse;")
css_parts.append(f" width: 100%;")
css_parts.append(f" margin: 1em 0;")
css_parts.append(" border-collapse: collapse;")
css_parts.append(" width: 100%;")
css_parts.append(" margin: 1em 0;")
if borderStyle == "grid":
css_parts.append(f" border: 1px solid {borderColor};")
elif borderStyle == "horizontal":
css_parts.append(f" border-top: 1px solid {borderColor};")
css_parts.append(f" border-bottom: 1px solid {borderColor};")
css_parts.append("}")
# Table headers
@ -466,17 +374,30 @@ class RendererHtml(BaseRenderer):
css_parts.append(f" color: {tbl.get('headerFg', '#FFFFFF')};")
css_parts.append(" font-weight: bold;")
css_parts.append(" text-align: center;")
css_parts.append(f" padding: 10px;")
css_parts.append(" padding: 10px;")
if borderStyle == "grid":
css_parts.append(f" border: 1px solid {borderColor};")
elif borderStyle == "horizontal":
css_parts.append(f" border-bottom: 1px solid {borderColor};")
css_parts.append("}")
# Table cells
css_parts.append("td {")
css_parts.append(f" color: {paraColor};")
css_parts.append(" padding: 8px;")
if borderStyle == "grid":
css_parts.append(f" border: 1px solid {borderColor};")
elif borderStyle == "horizontal":
css_parts.append(f" border-bottom: 1px solid {borderColor};")
css_parts.append("}")
# Row banding
if tbl.get("bandingEnabled", True):
evenBg = tbl.get("rowBandingEven", "#f6f8fa")
oddBg = tbl.get("rowBandingOdd", "#FFFFFF")
css_parts.append(f"tbody tr:nth-child(even) {{ background: {evenBg}; }}")
css_parts.append(f"tbody tr:nth-child(odd) {{ background: {oddBg}; }}")
# Lists
css_parts.append("ul {")
css_parts.append(f" font-size: {lst.get('sizePt', paraSizePt)}pt;")
@ -499,13 +420,33 @@ class RendererHtml(BaseRenderer):
css_parts.append("}")
# Images
imgStyle = style.get("image", {})
imgMaxWidth = imgStyle.get("maxWidthPt", 800)
imgAlignment = imgStyle.get("alignment", "center")
css_parts.append("img {")
css_parts.append(" max-width: 100%;")
css_parts.append(f" max-width: min({imgMaxWidth}pt, 100%);")
css_parts.append(" height: auto;")
css_parts.append(" display: block;")
if imgAlignment == "center":
css_parts.append(" margin: 1em auto;")
elif imgAlignment == "right":
css_parts.append(" margin: 1em 0 1em auto;")
else:
css_parts.append(" margin: 1em 0;")
css_parts.append(" border-radius: 4px;")
css_parts.append("}")
# Figcaptions
captionStyle = style.get("caption", {})
css_parts.append("figcaption {")
css_parts.append(f" font-size: {captionStyle.get('sizePt', 10)}pt;")
css_parts.append(f" color: {captionStyle.get('color', '#586069')};")
if captionStyle.get("italic", True):
css_parts.append(" font-style: italic;")
css_parts.append(f" text-align: {captionStyle.get('align', 'center')};")
css_parts.append(" margin-top: 0.5em;")
css_parts.append("}")
# Generated info
css_parts.append(".generated-info {")
css_parts.append(" font-size: 0.9em;")
@ -839,9 +780,7 @@ class RendererHtml(BaseRenderer):
# Use data URI as placeholder - will be replaced with file path in _replaceImageDataUris
# Include a marker so we can find and replace it
imageMarker = f"<!--IMAGE_MARKER:{len(base64Data)}:{altTextEscaped[:50]}-->"
# Add max-width and max-height to ensure image fits within page dimensions
# Typical page width is ~800-1200px, height varies but we limit to 600px for readability
imgTag = f'<img src="data:image/png;base64,{base64Data}" alt="{altTextEscaped}" style="max-width: 100%; max-height: 600px; width: auto; height: auto;">'
imgTag = f'<img src="data:image/png;base64,{base64Data}" alt="{altTextEscaped}">'
if captionEscaped:
return f'{imageMarker}<figure>{imgTag}<figcaption>{captionEscaped}</figcaption></figure>'

View file

@ -20,7 +20,7 @@ try:
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY, TA_RIGHT
REPORTLAB_AVAILABLE = True
except ImportError:
REPORTLAB_AVAILABLE = False
@ -28,12 +28,71 @@ except ImportError:
import re as _re_pdf
from ._pdfFontFallback import wrapEmojiSpansInXml as _wrapEmojiSpansInXml
from modules.serviceCenter.services.serviceGeneration.styleDefaults import deepMerge as _deepMergeStyle
# A4 width in pt; margins must match SimpleDocTemplate(leftMargin/rightMargin)
_PDF_MARGIN_LR_PT = 72.0
_PDF_A4_WIDTH_PT = 595.27
_PDF_CONTENT_WIDTH_PT = _PDF_A4_WIDTH_PT - (2 * _PDF_MARGIN_LR_PT)
# Font resolution: map CSS/system font names to ReportLab built-in equivalents.
# ReportLab core fonts: Helvetica, Times-Roman, Courier, Symbol, ZapfDingbats.
_FONT_FALLBACK_MAP = {
"calibri": "Helvetica",
"arial": "Helvetica",
"verdana": "Helvetica",
"segoe ui": "Helvetica",
"helvetica": "Helvetica",
"times new roman": "Times-Roman",
"times": "Times-Roman",
"georgia": "Times-Roman",
"consolas": "Courier",
"courier new": "Courier",
"courier": "Courier",
"monospace": "Courier",
}
_BOLD_VARIANT = {
"Helvetica": "Helvetica-Bold",
"Times-Roman": "Times-Bold",
"Courier": "Courier-Bold",
}
_registeredTtfFonts: set = set()
def _resolveFontFamily(fontName: str, bold: bool = False) -> str:
"""Resolve a CSS/system font name to a ReportLab-compatible font name.
Tries TTF registration from the system font dir first; on failure falls
back to the closest built-in core font.
"""
if not fontName:
return "Helvetica-Bold" if bold else "Helvetica"
key = fontName.strip().lower()
if key in _registeredTtfFonts:
return f"{fontName}-Bold" if bold else fontName
if key not in _FONT_FALLBACK_MAP:
try:
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
import os
winFontsDir = os.path.join(os.environ.get("WINDIR", r"C:\Windows"), "Fonts")
candidates = [
os.path.join(winFontsDir, f"{fontName}.ttf"),
os.path.join(winFontsDir, f"{fontName.lower()}.ttf"),
f"/usr/share/fonts/truetype/{fontName.lower()}/{fontName.lower()}.ttf",
]
for path in candidates:
if os.path.isfile(path):
pdfmetrics.registerFont(TTFont(fontName, path))
_registeredTtfFonts.add(key)
return fontName
except Exception:
pass
coreFont = _FONT_FALLBACK_MAP.get(key, "Helvetica")
if bold:
return _BOLD_VARIANT.get(coreFont, f"{coreFont}-Bold")
return coreFont
def _boxDrawingCharToAscii(ch: str) -> str:
"""Map one box-drawing character to ASCII (Courier has no glyphs for U+2500U+257F)."""
@ -170,10 +229,11 @@ class RendererPdf(BaseRenderer):
# memory simultaneously. Collected here, deleted after the build.
self._tempImageFiles = []
try:
# Get style set from unified style or legacy approach
self._unifiedStyle = unifiedStyle
if unifiedStyle:
styles = self._convertUnifiedStyleToInternal(unifiedStyle)
self._unifiedStyle = unifiedStyle
else:
styles = self._convertUnifiedStyleToInternal({})
for level in range(1, 7):
hKey = f"heading{level}"
if hKey not in styles:
@ -182,13 +242,10 @@ class RendererPdf(BaseRenderer):
styles[hKey].setdefault("space_after", 12)
styles[hKey].setdefault("space_before", 12)
styles["paragraph"].setdefault("space_after", 6)
styles["paragraph"].setdefault("line_height", unifiedStyle["paragraph"].get("lineSpacing", 1.2))
styles["paragraph"].setdefault("line_height", (unifiedStyle or {}).get("paragraph", {}).get("lineSpacing", 1.5))
styles["bullet_list"].setdefault("space_after", 3)
styles["code_block"].setdefault("space_after", 6)
styles["code_block"].setdefault("align", "left")
else:
styles = await self._getStyleSet(json_content, userPrompt, aiService)
self._unifiedStyle = None
# Validate JSON structure
if not self._validateJsonStructure(json_content):
@ -307,247 +364,6 @@ class RendererPdf(BaseRenderer):
if not removed:
raise
async def _getStyleSet(self, extractedContent: Dict[str, Any] = None, userPrompt: str = None, aiService=None, templateName: str = None) -> Dict[str, Any]:
    """Resolve the PDF style set for a document.

    Resolution order:
      1. ``extractedContent["metadata"]["styles"]`` when it is a non-empty
         dict, passed through ``_convertColorsFormat`` and contrast validation.
      2. The default style set enhanced by AI, when both ``userPrompt`` and
         ``aiService`` are given; the result is colour-converted and
         contrast-validated as well.
      3. The plain default style set (returned without conversion).

    IMPORTANT: styling is meant to come from document generation rather than
    being produced separately by each renderer; the AI path is only a fallback
    for documents that carry no styles.

    Args:
        extractedContent: Document content with metadata (may contain styles).
        userPrompt: User's prompt, forwarded to the AI enhancement step.
        aiService: AI service used only for the fallback enhancement.
        templateName: Name of a template style set; not read by this method.

    Returns:
        Dict with style definitions for all document styles.
    """
    baseStyles = self._getDefaultStyleSet()

    # Preferred source: styles delivered together with the generated document
    docMetadata = extractedContent.get("metadata", {}) if extractedContent else None
    if isinstance(docMetadata, dict):
        providedStyles = docMetadata.get("styles")
        if providedStyles and isinstance(providedStyles, dict):
            self.logger.debug("Using styles from document generation metadata")
            convertedStyles = self._convertColorsFormat(providedStyles)
            return self._validateStylesContrast(convertedStyles)

    # Without both a prompt and an AI service only the defaults remain
    if not (userPrompt and aiService):
        return baseStyles

    self.logger.info(f"Styles not in metadata, enhancing with AI based on user prompt...")
    aiStyles = await self._enhanceStylesWithAI(userPrompt, baseStyles, aiService)
    # Colours are converted to the PDF format only after the styles are obtained
    aiStyles = self._convertColorsFormat(aiStyles)
    return self._validateStylesContrast(aiStyles)
async def _enhanceStylesWithAI(self, userPrompt: str, defaultStyleSet: Dict[str, Any], aiService) -> Dict[str, Any]:
    """Return AI-enhanced PDF styles, or ``defaultStyleSet`` if that fails.

    Builds a ``"pdf"`` style template from the prompt and the defaults and
    passes it to ``_getAiStyles``. Any exception is logged as a warning and
    the defaults are returned instead.
    """
    try:
        aiTemplate = self._createAiStyleTemplate("pdf", userPrompt, defaultStyleSet)
        aiStyles = await self._getAiStyles(aiService, aiTemplate, defaultStyleSet)
    except Exception as exc:
        self.logger.warning(f"AI style enhancement failed: {str(exc)}, using default styles")
        return defaultStyleSet
    return aiStyles
def _validateStylesContrast(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and fix contrast issues in AI-generated styles."""
try:
# Fix table header contrast
if "table_header" in styles:
header = styles["table_header"]
bg_color = header.get("background", "#FFFFFF")
text_color = header.get("text_color", "#000000")
# If both are white or both are dark, fix it
if bg_color.upper() == "#FFFFFF" and text_color.upper() == "#FFFFFF":
header["background"] = "#4F4F4F"
header["text_color"] = "#FFFFFF"
elif bg_color.upper() == "#000000" and text_color.upper() == "#000000":
header["background"] = "#4F4F4F"
header["text_color"] = "#FFFFFF"
# Fix table cell contrast
if "table_cell" in styles:
cell = styles["table_cell"]
bg_color = cell.get("background", "#FFFFFF")
text_color = cell.get("text_color", "#000000")
# If both are white or both are dark, fix it
if bg_color.upper() == "#FFFFFF" and text_color.upper() == "#FFFFFF":
cell["background"] = "#FFFFFF"
cell["text_color"] = "#2F2F2F"
elif bg_color.upper() == "#000000" and text_color.upper() == "#000000":
cell["background"] = "#FFFFFF"
cell["text_color"] = "#2F2F2F"
return styles
except Exception as e:
self.logger.warning(f"Style validation failed: {str(e)}")
return self._getDefaultStyleSet()
def _getDefaultStyleSet(self) -> Dict[str, Any]:
"""Default PDF style set - used when no style instructions present."""
return {
"title": {"font_size": 24, "color": "#1F4E79", "bold": True, "align": "center", "space_after": 30},
# Markdown #..###### — sizes must strictly decrease (H1 largest … H6 smallest).
"heading1": {"font_size": 18, "color": "#2F2F2F", "bold": True, "align": "left", "space_after": 12, "space_before": 12},
"heading2": {"font_size": 15, "color": "#2F2F2F", "bold": True, "align": "left", "space_after": 10, "space_before": 10},
"heading3": {"font_size": 13, "color": "#4F4F4F", "bold": True, "align": "left", "space_after": 8, "space_before": 8},
"heading4": {"font_size": 12, "color": "#4F4F4F", "bold": True, "align": "left", "space_after": 6, "space_before": 6},
"heading5": {"font_size": 11, "color": "#4F4F4F", "bold": True, "align": "left", "space_after": 6, "space_before": 6},
"heading6": {"font_size": 10, "color": "#4F4F4F", "bold": True, "align": "left", "space_after": 4, "space_before": 4},
"paragraph": {"font_size": 11, "color": "#2F2F2F", "bold": False, "align": "left", "space_after": 6, "line_height": 1.2},
"table_header": {"background": "#4F4F4F", "text_color": "#FFFFFF", "bold": True, "align": "left", "font_size": 12},
"table_cell": {"background": "#FFFFFF", "text_color": "#2F2F2F", "bold": False, "align": "left", "font_size": 10},
"bullet_list": {"font_size": 11, "color": "#2F2F2F", "space_after": 3},
"code_block": {"font": "Courier", "font_size": 9, "color": "#2F2F2F", "background": "#F5F5F5", "space_after": 6, "align": "left"}
}
async def _getAiStylesWithPdfColors(self, ai_service, style_template: str, default_styles: Dict[str, Any]) -> Dict[str, Any]:
"""Get AI styles with proper PDF color conversion."""
if not ai_service:
return default_styles
try:
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
request_options = AiCallOptions()
request_options.operationType = OperationTypeEnum.DATA_GENERATE
request = AiCallRequest(prompt=style_template, context="", options=request_options)
# Check if AI service is properly configured
if not hasattr(ai_service, 'aiObjects') or not ai_service.aiObjects:
self.logger.warning("AI service not properly configured, using defaults")
return default_styles
response = await ai_service.callAi(request)
# Check if response is valid
if not response:
self.logger.warning("AI service returned no response, using defaults")
return default_styles
import json
import re
# Clean and parse JSON
result = response.content.strip() if response and response.content else ""
# Check if result is empty
if not result:
self.logger.warning("AI styling returned empty response, using defaults")
return default_styles
# Log the raw response for debugging
self.logger.debug(f"AI styling raw response: {result[:200]}...")
# Extract JSON from various formats
json_match = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL)
if json_match:
result = json_match.group(1).strip()
elif result.startswith('```json'):
result = re.sub(r'^```json\s*', '', result)
result = re.sub(r'\s*```$', '', result)
elif result.startswith('```'):
result = re.sub(r'^```\s*', '', result)
result = re.sub(r'\s*```$', '', result)
# Try to extract JSON from explanatory text
json_patterns = [
r'\{[^{}]*"title"[^{}]*\}', # Simple JSON object
r'\{.*?"title".*?\}', # JSON with title field
r'\{.*?"font_size".*?\}', # JSON with font_size field
]
for pattern in json_patterns:
json_match = re.search(pattern, result, re.DOTALL)
if json_match:
result = json_match.group(0)
break
# Additional cleanup - remove any leading/trailing whitespace and newlines
result = result.strip()
# Check if result is still empty after cleanup
if not result:
self.logger.warning("AI styling returned empty content after cleanup, using defaults")
return default_styles
# Try to parse JSON
try:
styles = json.loads(result)
self.logger.debug(f"Successfully parsed AI styles: {list(styles.keys())}")
except json.JSONDecodeError as json_error:
self.logger.warning(f"AI styling returned invalid JSON: {json_error}")
# Use print instead of logger to avoid truncation
self.services.utils.debugLogToFile(f"FULL AI RESPONSE THAT FAILED TO PARSE: {result}", "PDF_RENDERER")
self.services.utils.debugLogToFile(f"RESPONSE LENGTH: {len(result)} characters", "PDF_RENDERER")
self.logger.warning(f"Raw content that failed to parse: {result}")
# Try to fix incomplete JSON by adding missing closing braces
open_braces = result.count('{')
close_braces = result.count('}')
if open_braces > close_braces:
# JSON is incomplete, add missing closing braces
missing_braces = open_braces - close_braces
result = result + '}' * missing_braces
self.logger.info(f"Added {missing_braces} missing closing brace(s)")
# Try parsing the fixed JSON
try:
styles = json.loads(result)
self.logger.info("Successfully fixed incomplete JSON")
except json.JSONDecodeError as fix_error:
self.logger.warning(f"Fixed JSON still invalid: {fix_error}")
# Try to extract just the JSON part if it's embedded in text
json_start = result.find('{')
json_end = result.rfind('}')
if json_start != -1 and json_end != -1 and json_end > json_start:
json_part = result[json_start:json_end+1]
try:
styles = json.loads(json_part)
self.logger.info("Successfully extracted JSON from explanatory text")
except json.JSONDecodeError:
self.logger.warning("Could not extract valid JSON from response, using defaults")
return default_styles
else:
return default_styles
else:
# Try to extract just the JSON part if it's embedded in text
json_start = result.find('{')
json_end = result.rfind('}')
if json_start != -1 and json_end != -1 and json_end > json_start:
json_part = result[json_start:json_end+1]
try:
styles = json.loads(json_part)
self.logger.info("Successfully extracted JSON from explanatory text")
except json.JSONDecodeError:
self.logger.warning("Could not extract valid JSON from response, using defaults")
return default_styles
else:
return default_styles
# Convert colors to PDF format (keep as hex strings, PDF renderer will convert them)
styles = self._convertColorsFormat(styles)
return styles
except Exception as e:
self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
return default_styles
def _convertColorsFormat(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""Convert colors to proper format for PDF compatibility."""
@ -580,9 +396,13 @@ class RendererPdf(BaseRenderer):
sizes = {1: 18, 2: 15, 3: 13, 4: 12, 5: 11, 6: 10}
fs = sizes.get(level, 10)
sb = max(4, 14 - level)
us = getattr(self, '_unifiedStyle', None) or {}
clrs = us.get("colors", {})
primary = clrs.get("primary", "#24292e")
secondary = clrs.get("secondary", "#586069")
return {
"font_size": fs,
"color": "#2F2F2F" if level <= 2 else "#4F4F4F",
"color": primary if level <= 2 else secondary,
"bold": True,
"align": "left",
"space_after": sb,
@ -594,14 +414,19 @@ class RendererPdf(BaseRenderer):
title_style_def = styles.get("title") or {}
fs = title_style_def.get("font_size", 26)
bold = title_style_def.get("bold", True)
us = getattr(self, '_unifiedStyle', None)
primaryFont = us["fonts"]["primary"] if us else "Calibri"
coverTitleColor = styles.get("cover_page", {}).get("title_color")
colorsFallback = styles.get("colors", {}).get("primary", "#24292e")
titleColor = title_style_def.get("color", coverTitleColor or colorsFallback)
return ParagraphStyle(
"DocumentTitle",
fontName="Helvetica-Bold" if bold else "Helvetica",
fontName=_resolveFontFamily(primaryFont, bold),
fontSize=fs,
spaceAfter=title_style_def.get("space_after", 18),
spaceBefore=title_style_def.get("space_before", 0),
alignment=self._getAlignment(title_style_def.get("align", "center")),
textColor=self._hexToColor(title_style_def.get("color", "#1F3864")),
textColor=self._hexToColor(titleColor),
leading=fs * 1.25,
)
@ -611,28 +436,32 @@ class RendererPdf(BaseRenderer):
heading_style_def = styles.get(heading_key) or self._defaultHeadingStyleDef(level)
fs = heading_style_def.get("font_size", self._defaultHeadingStyleDef(level)["font_size"])
bold = heading_style_def.get("bold", True)
us = getattr(self, '_unifiedStyle', None)
primaryFont = us["fonts"]["primary"] if us else "Calibri"
return ParagraphStyle(
f'CustomHeading{level}',
fontName="Helvetica-Bold" if bold else "Helvetica",
fontName=_resolveFontFamily(primaryFont, bold),
fontSize=fs,
spaceAfter=heading_style_def.get("space_after", 12),
spaceBefore=heading_style_def.get("space_before", 12),
alignment=self._getAlignment(heading_style_def.get("align", "left")),
textColor=self._hexToColor(heading_style_def.get("color", "#2F2F2F")),
textColor=self._hexToColor(heading_style_def.get("color", styles.get("colors", {}).get("primary", "#24292e"))),
leading=fs * 1.35,
)
def _createNormalStyle(self, styles: Dict[str, Any]) -> ParagraphStyle:
    """Create the body-text paragraph style from ``styles["paragraph"]``.

    The font family is taken from ``self._unifiedStyle["fonts"]["primary"]``
    when a unified style is set on the renderer, otherwise "Calibri", and is
    resolved through ``_resolveFontFamily``. Missing paragraph settings fall
    back to 11 pt, 6 pt space after, left alignment, text color ``#24292e``
    and a line-height factor of 1.5.

    Args:
        styles: Style definitions; only the ``"paragraph"`` entry is read.

    Returns:
        A ``ParagraphStyle`` named ``CustomNormal``.
    """
    paragraph_style_def = styles.get("paragraph") or {}
    us = getattr(self, '_unifiedStyle', None)
    primaryFont = us["fonts"]["primary"] if us else "Calibri"
    fontSize = paragraph_style_def.get("font_size", 11)
    return ParagraphStyle(
        'CustomNormal',
        fontName=_resolveFontFamily(primaryFont, False),
        fontSize=fontSize,
        spaceAfter=paragraph_style_def.get("space_after", 6),
        alignment=self._getAlignment(paragraph_style_def.get("align", "left")),
        textColor=self._hexToColor(paragraph_style_def.get("color", "#24292e")),
        # Leading is expressed as a multiple of the font size.
        leading=paragraph_style_def.get("line_height", 1.5) * fontSize,
    )
def _getAlignment(self, align: str) -> int:
@ -644,10 +473,10 @@ class RendererPdf(BaseRenderer):
"center": TA_CENTER,
"left": TA_LEFT,
"justify": TA_JUSTIFY,
"right": TA_LEFT, # ReportLab doesn't have TA_RIGHT, use LEFT as fallback
"0": TA_LEFT, # Handle numeric strings
"right": TA_RIGHT,
"0": TA_LEFT,
"1": TA_CENTER,
"2": TA_JUSTIFY
"2": TA_JUSTIFY,
}
return align_map.get(align.lower().strip(), TA_LEFT)
@ -687,7 +516,7 @@ class RendererPdf(BaseRenderer):
"""Convert inline runs to ReportLab Paragraph XML."""
parts = []
us = getattr(self, '_unifiedStyle', None)
monoFont = us["fonts"]["monospace"] if us else "Courier"
monoFont = _resolveFontFamily(us["fonts"]["monospace"] if us else "Courier")
for run in runs:
runType = run.get("type", "text")
value = self._escapeReportlabXml(run.get("value", ""))
@ -730,13 +559,15 @@ class RendererPdf(BaseRenderer):
if not text:
return ""
text = _normalizePdfMonospaceText(text)
us = getattr(self, '_unifiedStyle', None)
monoFont = _resolveFontFamily(us["fonts"]["monospace"] if us else "Courier")
out: List[str] = []
pos = 0
for m in _re_pdf.finditer(r"`([^`]*)`", text):
before = text[pos:m.start()]
out.append(self._applyInlineMarkdownToEscapedPlain(before))
code = m.group(1)
out.append(f'<font name="Courier">{self._escapeReportlabXml(code)}</font>')
out.append(f'<font name="{monoFont}">{self._escapeReportlabXml(code)}</font>')
pos = m.end()
out.append(self._applyInlineMarkdownToEscapedPlain(text[pos:]))
return _wrapEmojiSpansInXml("".join(out))
@ -750,16 +581,75 @@ class RendererPdf(BaseRenderer):
"""Paragraph style for table cells (word wrap within colWidth)."""
tdef = styles.get(tableStyleKey, {})
fs = tdef.get("font_size", 12 if header else 10)
defaultTc = "#FFFFFF" if header else "#2F2F2F"
defaultTc = "#24292e"
us = getattr(self, '_unifiedStyle', None)
primaryFont = us["fonts"]["primary"] if us else "Calibri"
isBold = header and tdef.get("bold", True)
return ParagraphStyle(
f"TblCell{'H' if header else 'B'}{tableStyleKey}",
fontSize=fs,
leading=fs * 1.25,
alignment=TA_LEFT,
alignment=self._getAlignment(tdef.get("align", "left")),
textColor=self._hexToColor(tdef.get("text_color", defaultTc)),
fontName="Helvetica-Bold" if header and tdef.get("bold", True) else "Helvetica",
fontName=_resolveFontFamily(primaryFont, isBold),
)
def _createCaptionStyle(self, styles: Dict[str, Any]) -> ParagraphStyle:
    """Build the paragraph style used for image and figure captions.

    Settings come from ``styles["caption"]``; the text color falls back to
    ``styles["colors"]["secondary"]`` and finally to ``#586069``. The font is
    the unified style's primary font when one is set, otherwise "Calibri".
    """
    captionCfg = styles.get("caption", {})
    unified = getattr(self, '_unifiedStyle', None)
    if unified:
        fontFamily = unified["fonts"]["primary"]
    else:
        fontFamily = "Calibri"
    pointSize = captionCfg.get("font_size", 10)
    defaultColor = styles.get("colors", {}).get("secondary", "#586069")

    styleArgs = {
        "fontName": _resolveFontFamily(fontFamily, False),
        "fontSize": pointSize,
        "leading": pointSize * 1.25,
        "textColor": self._hexToColor(captionCfg.get("color", defaultColor)),
        "alignment": self._getAlignment(captionCfg.get("align", "center")),
        "spaceAfter": 4,
    }
    return ParagraphStyle("CaptionStyle", **styleArgs)
def _inferColumnAlignments(self, headers: List, rows: List, mergedTableStyle: Dict[str, Any]) -> List[str]:
"""Infer per-column text alignment from explicit style or cell content heuristics.
Numeric-majority columns (>60 %) get right-aligned; everything else left.
An explicit ``columnAlignments`` list in *mergedTableStyle* takes precedence.
"""
numCols = len(headers)
explicit = mergedTableStyle.get("columnAlignments", [])
if explicit and len(explicit) >= numCols:
return list(explicit[:numCols])
alignments = list(explicit) if explicit else []
for colIdx in range(len(alignments), numCols):
numericCount = 0
totalCount = 0
for row in rows:
if colIdx < len(row):
cell = row[colIdx]
if isinstance(cell, list):
val = "".join(
r.get("value", "") if isinstance(r, dict) else str(r) for r in cell
).strip()
elif cell is not None:
val = str(cell).strip()
else:
val = ""
if val:
totalCount += 1
cleaned = val.replace(",", "").replace("%", "").replace("$", "").replace("\u20ac", "").replace("'", "").strip()
try:
float(cleaned)
numericCount += 1
except (ValueError, TypeError):
pass
if totalCount > 0 and numericCount / totalCount > 0.6:
alignments.append("right")
else:
alignments.append("left")
return alignments
def _renderJsonSection(self, section: Dict[str, Any], styles: Dict[str, Any]) -> List[Any]:
"""Render a single JSON section to PDF elements using AI-generated styles.
Supports three content formats: reference, object (base64), extracted_text.
@ -841,7 +731,12 @@ class RendererPdf(BaseRenderer):
return [Paragraph(f"[Error rendering section: {str(e)}]", self._createNormalStyle(styles))]
def _renderJsonTable(self, table_data: Dict[str, Any], styles: Dict[str, Any]) -> List[Any]:
"""Render a JSON table: left-aligned, width capped to printable area, cells wrap."""
"""Render a JSON table: left-aligned, width capped to printable area, cells wrap.
Supports per-table style overrides via ``content["tableStyle"]``, border
style variants (grid / horizontal / none), banding toggle, configurable
cell padding, and auto-inferred column alignments.
"""
try:
content = table_data.get("content", {})
if not isinstance(content, dict):
@ -852,12 +747,30 @@ class RendererPdf(BaseRenderer):
if not headers or not rows:
return []
# Per-table style override merged onto global table style
us = getattr(self, '_unifiedStyle', None) or {}
globalTableStyle = us.get("table", {})
perTableOverride = content.get("tableStyle", {})
mergedTableStyle = _deepMergeStyle(globalTableStyle, perTableOverride) if perTableOverride else dict(globalTableStyle)
numCols = len(headers)
colWidth = _PDF_CONTENT_WIDTH_PT / max(numCols, 1)
colWidths = [colWidth] * numCols
colAligns = self._inferColumnAlignments(headers, rows, mergedTableStyle)
hdrPs = self._createTableCellParagraphStyle(styles, header=True, tableStyleKey="table_header")
cellPs = self._createTableCellParagraphStyle(styles, header=False, tableStyleKey="table_cell")
cellBasePs = self._createTableCellParagraphStyle(styles, header=False, tableStyleKey="table_cell")
colCellStyles: List[ParagraphStyle] = []
for colIdx in range(numCols):
colAlign = colAligns[colIdx] if colIdx < len(colAligns) else "left"
colPs = ParagraphStyle(
f"TblCellB_c{colIdx}",
parent=cellBasePs,
alignment=self._getAlignment(colAlign),
)
colCellStyles.append(colPs)
def _cellPara(cell, ps):
runs = self._inlineRunsForCell(cell)
@ -871,28 +784,45 @@ class RendererPdf(BaseRenderer):
for row in rows:
padded = list(row) + [""] * max(0, numCols - len(row))
padded = padded[:numCols]
bodyRows.append([_cellPara(c, cellPs) for c in padded])
bodyRows.append([_cellPara(padded[i], colCellStyles[i]) for i in range(numCols)])
table_matrix = [headerRow] + bodyRows
table = Table(table_matrix, colWidths=colWidths, repeatRows=1)
table_header_style = styles.get("table_header", {})
table_cell_style = styles.get("table_cell", {})
borderColor = self._hexToColor(mergedTableStyle.get("borderColor", "#e1e4e8"))
borderWidth = mergedTableStyle.get("borderWidthPt", 0.5)
evenBg = self._hexToColor(mergedTableStyle.get("rowBandingEven", "#f6f8fa"))
oddBg = self._hexToColor(mergedTableStyle.get("rowBandingOdd", "#FFFFFF"))
cellPad = mergedTableStyle.get("cellPaddingPt", 4)
table_style = [
("BACKGROUND", (0, 0), (-1, 0), self._hexToColor(table_header_style.get("background", "#4F4F4F"))),
("BACKGROUND", (0, 1), (-1, -1), self._hexToColor(table_cell_style.get("background", "#FFFFFF"))),
tableStyleCmds = [
("BACKGROUND", (0, 0), (-1, 0), self._hexToColor(table_header_style.get("background", "#f6f8fa"))),
("ALIGN", (0, 0), (-1, -1), "LEFT"),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("LEFTPADDING", (0, 0), (-1, -1), 4),
("RIGHTPADDING", (0, 0), (-1, -1), 4),
("TOPPADDING", (0, 0), (-1, 0), 6),
("BOTTOMPADDING", (0, 0), (-1, 0), 8),
("TOPPADDING", (0, 1), (-1, -1), 4),
("BOTTOMPADDING", (0, 1), (-1, -1), 4),
("GRID", (0, 0), (-1, -1), 0.5, colors.black),
("LEFTPADDING", (0, 0), (-1, -1), cellPad),
("RIGHTPADDING", (0, 0), (-1, -1), cellPad),
("TOPPADDING", (0, 0), (-1, 0), cellPad + 2),
("BOTTOMPADDING", (0, 0), (-1, 0), cellPad + 4),
("TOPPADDING", (0, 1), (-1, -1), cellPad),
("BOTTOMPADDING", (0, 1), (-1, -1), cellPad),
]
table.setStyle(TableStyle(table_style))
borderStyleName = mergedTableStyle.get("borderStyle", "grid")
if borderStyleName == "grid":
tableStyleCmds.append(("GRID", (0, 0), (-1, -1), borderWidth, borderColor))
elif borderStyleName == "horizontal":
tableStyleCmds.append(("LINEABOVE", (0, 0), (-1, 0), borderWidth, borderColor))
for rowIdx in range(len(table_matrix)):
tableStyleCmds.append(("LINEBELOW", (0, rowIdx), (-1, rowIdx), borderWidth, borderColor))
bandingEnabled = mergedTableStyle.get("bandingEnabled", True)
if bandingEnabled:
for rowIdx in range(1, len(table_matrix)):
bg = evenBg if rowIdx % 2 == 0 else oddBg
tableStyleCmds.append(("BACKGROUND", (0, rowIdx), (-1, rowIdx), bg))
table.setStyle(TableStyle(tableStyleCmds))
return [table, Spacer(1, 12)]
except Exception as e:
@ -911,7 +841,7 @@ class RendererPdf(BaseRenderer):
bulletStyle = ParagraphStyle(
"BulletItem",
fontSize=bulletStyleDef.get("font_size", 11),
textColor=self._hexToColor(bulletStyleDef.get("color", "#333333")),
textColor=self._hexToColor(bulletStyleDef.get("color", styles.get("colors", {}).get("primary", "#24292e"))),
leftIndent=indent,
firstLineIndent=-indent,
spaceAfter=2,
@ -1006,11 +936,13 @@ class RendererPdf(BaseRenderer):
fs = code_style_def.get("font_size", 9)
mono = code_style_def.get("font", "Courier")
textColorFallback = styles.get("colors", {}).get("primary", "#24292e")
if language:
lang_style = ParagraphStyle(
"CodeLanguage",
fontSize=fs,
textColor=self._hexToColor(code_style_def.get("color", "#2F2F2F")),
textColor=self._hexToColor(code_style_def.get("color", textColorFallback)),
fontName="Helvetica-Bold",
alignment=TA_LEFT,
)
@ -1024,7 +956,7 @@ class RendererPdf(BaseRenderer):
approxCharWPt = max(fs * 0.52, 4.5)
usableWidth = _PDF_CONTENT_WIDTH_PT - 16 # left+right padding
maxLineChars = max(48, int(usableWidth / approxCharWPt))
bg_col = self._hexToColor(code_style_def.get("background", "#F5F5F5"))
bg_col = self._hexToColor(code_style_def.get("background", "#f6f8fa"))
leading = fs * 1.2
spaceAfter = code_style_def.get("space_after", 6)
@ -1054,17 +986,19 @@ class RendererPdf(BaseRenderer):
fontName=mono,
fontSize=fs,
leading=leading,
textColor=self._hexToColor(code_style_def.get("color", "#2F2F2F")),
textColor=self._hexToColor(code_style_def.get("color", textColorFallback)),
alignment=TA_LEFT,
leftIndent=0,
rightIndent=0,
)
pf = Preformatted(chunkText, codePrStyle, dedent=0, maxLineLength=maxLineChars)
borderCol = self._hexToColor(code_style_def.get("border_color", "#e1e4e8"))
tbl = Table([[pf]], colWidths=[_PDF_CONTENT_WIDTH_PT])
tbl.setStyle(
TableStyle(
[
("BACKGROUND", (0, 0), (-1, -1), bg_col),
("BOX", (0, 0), (-1, -1), 0.5, borderCol),
("VALIGN", (0, 0), (-1, -1), "TOP"),
("LEFTPADDING", (0, 0), (-1, -1), 8),
("RIGHTPADDING", (0, 0), (-1, -1), 8),
@ -1103,11 +1037,26 @@ class RendererPdf(BaseRenderer):
if title:
out.append(self._paragraphFromInlineMarkdown(title, self._createDocumentTitleStyle(styles)))
out.append(Spacer(1, 18))
for key, sizePt in (("subtitle", 16), ("author", 12), ("date", 12)):
coverDef = styles.get("cover_page", {})
coverSizes = {
"subtitle": coverDef.get("subtitle_size", 16),
"author": coverDef.get("author_size", 12),
"date": coverDef.get("date_size", 12),
}
coverColors = {
"subtitle": coverDef.get("subtitle_color"),
"author": None,
"date": None,
}
for key in ("subtitle", "author", "date"):
val = (content.get(key) or "").strip()
if not val:
continue
st = ParagraphStyle(f"cover_{key}", parent=self._createNormalStyle(styles), alignment=1, fontSize=sizePt)
sizePt = coverSizes[key]
kwargs: Dict[str, Any] = {"alignment": 1, "fontSize": sizePt}
if coverColors[key]:
kwargs["textColor"] = self._hexToColor(coverColors[key])
st = ParagraphStyle(f"cover_{key}", parent=self._createNormalStyle(styles), **kwargs)
out.append(Paragraph(self._escapeReportlabXml(val), st))
out.append(Spacer(1, 8))
out.append(PageBreak())
@ -1310,26 +1259,27 @@ class RendererPdf(BaseRenderer):
# Add caption if available
if caption:
captionStyle = self._createNormalStyle(styles)
captionStyle.fontSize = 10
captionStyle.textColor = self._hexToColor(styles.get("paragraph", {}).get("color", "#666666"))
elements.append(Paragraph(f"<i>{caption}</i>", captionStyle))
captionDef = styles.get("caption", {})
capStyle = self._createCaptionStyle(styles)
capXml = self._escapeReportlabXml(caption)
if captionDef.get("italic", True):
capXml = f"<i>{capXml}</i>"
elements.append(Paragraph(capXml, capStyle))
elif alt_text and alt_text != "Image":
# Use alt text as caption if no caption provided, but avoid usageHint format
if "Render as visual element:" in alt_text:
# Extract filename from usageHint if possible
parts = alt_text.split("Render as visual element:")
if len(parts) > 1:
filename = parts[1].strip()
caption_text = f"Figure: {filename}"
caption_text = f"Figure: {parts[1].strip()}"
else:
caption_text = alt_text
else:
caption_text = f"Figure: {alt_text}"
captionStyle = self._createNormalStyle(styles)
captionStyle.fontSize = 10
captionStyle.textColor = self._hexToColor(styles.get("paragraph", {}).get("color", "#666666"))
elements.append(Paragraph(f"<i>{caption_text}</i>", captionStyle))
captionDef = styles.get("caption", {})
capStyle = self._createCaptionStyle(styles)
capXml = self._escapeReportlabXml(caption_text)
if captionDef.get("italic", True):
capXml = f"<i>{capXml}</i>"
elements.append(Paragraph(capXml, capStyle))
return elements

View file

@ -6,6 +6,7 @@ Excel renderer for report generation using openpyxl.
from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument
from modules.serviceCenter.services.serviceGeneration.styleDefaults import deepMerge
from typing import Dict, Any, List, Optional
import io
import base64
@ -128,176 +129,6 @@ class RendererXlsx(BaseRenderer):
)
]
def _generateExcel(self, content: str, title: str) -> str:
    """Build a three-sheet workbook (Summary, Data, Analysis) and return it base64-encoded.

    If saving fails, sheets with blank titles are removed and the save is
    attempted once more; a second failure raises an ``Exception`` carrying the
    first save error's message. Every error is logged before it propagates.
    """
    try:
        workbook = Workbook()
        # Drop the sheet openpyxl creates by default.
        workbook.remove(workbook.active)

        summary = workbook.create_sheet("Summary", 0)
        data = workbook.create_sheet("Data", 1)
        analysis = workbook.create_sheet("Analysis", 2)

        self._populateSummarySheet(summary, title, workbook)
        self._populateDataSheet(data, content)
        self._populateAnalysisSheet(analysis, content)

        # A workbook needs at least one sheet.
        if not workbook.worksheets:
            workbook.create_sheet("Sheet1")

        stream = io.BytesIO()
        try:
            workbook.save(stream)
            stream.seek(0)
        except Exception as save_error:
            self.logger.error(f"Error saving Excel workbook: {str(save_error)}")
            try:
                # Remove sheets whose title is missing or blank, then retry.
                for candidate in list(workbook.worksheets):
                    if not (candidate.title and candidate.title.strip()):
                        workbook.remove(candidate)
                if not workbook.worksheets:
                    workbook.create_sheet("Sheet1")
                stream = io.BytesIO()
                workbook.save(stream)
                stream.seek(0)
            except Exception as retry_error:
                self.logger.error(f"Retry save also failed: {str(retry_error)}")
                raise Exception(f"Failed to save Excel workbook: {str(save_error)}")

        return base64.b64encode(stream.getvalue()).decode('utf-8')
    except Exception as e:
        self.logger.error(f"Error generating Excel: {str(e)}")
        raise
def _populateSummarySheet(self, sheet, title: str, wb: Workbook = None):
    """Fill the summary sheet with the title, generation info and a key-metrics stub.

    Cell B7 receives a ``COUNTA`` formula over the "Data" sheet when *wb*
    contains a sheet of that name, otherwise the text "N/A". Failures are
    logged as warnings and not raised.
    """
    try:
        # Title row
        sheet['A1'] = title
        titleCell = sheet['A1']
        titleCell.font = Font(size=16, bold=True)
        titleCell.alignment = Alignment(horizontal='left')

        # Generation info and the key-metrics caption
        sheet['A3'] = "Generated:"
        sheet['B3'] = self._formatTimestamp()
        for ref, text in (
            ('A4', "Status:"),
            ('B4', "Generated Successfully"),
            ('A6', "Key Metrics:"),
        ):
            sheet[ref] = text
        sheet['A6'].font = Font(bold=True)
        sheet['A7'] = "Total Items:"

        # The formula is only valid when the referenced sheet exists.
        hasDataSheet = wb and any(ws.title == "Data" for ws in wb.worksheets)
        sheet['B7'] = "=COUNTA(Data!A:A)-1" if hasDataSheet else "N/A"

        # Fixed column widths
        for letter, width in (('A', 20), ('B', 30)):
            sheet.column_dimensions[letter].width = width
    except Exception as e:
        self.logger.warning(f"Could not populate summary sheet: {str(e)}")
def _populateDataSheet(self, sheet, content: str):
    """Write *content* line by line into the data sheet below a fixed header row.

    Lines containing ``|`` are treated as table rows: the outer pipes are
    dropped, each remaining cell is stripped, and the first five cells are
    written to columns 1-5. Empty cells keep their position, so later values
    are not shifted into the wrong column. Markdown delimiter rows such as
    ``|---|:---:|`` carry no data and are skipped. Any other non-blank line is
    written to column 1. Failures are logged as warnings and not raised.

    Args:
        sheet: Worksheet to fill (row 1 receives the headers).
        content: Text to import; ``None`` or empty leaves only the header row.
    """
    try:
        headers = ["Item/Category", "Value/Amount", "Percentage", "Source Document", "Notes/Comments"]
        maxCols = len(headers)
        for col, header in enumerate(headers, 1):
            cell = sheet.cell(row=1, column=col, value=header)
            cell.font = Font(bold=True)
            cell.fill = PatternFill(start_color="FFCCCCCC", end_color="FFCCCCCC", fill_type="solid")

        row = 2
        for rawLine in (content or "").split('\n'):
            line = rawLine.strip()
            if not line:
                continue

            if '|' not in line:
                # Plain text line
                sheet.cell(row=row, column=1, value=line)
                row += 1
                continue

            cells = [part.strip() for part in line.split('|')]
            # A leading/trailing pipe yields an empty edge element that is
            # not a real cell; interior empties are real and must stay.
            if line.startswith('|'):
                cells = cells[1:]
            if line.endswith('|') and cells:
                cells = cells[:-1]

            isDelimiterRow = bool(cells) and all(
                text and set(text) <= {"-", ":"} and text.count("-") >= 3
                for text in cells
            )
            if isDelimiterRow:
                continue

            for col, cellData in enumerate(cells[:maxCols], 1):
                if cellData:
                    sheet.cell(row=row, column=col, value=cellData)
            row += 1

        # Fixed column widths
        for col in range(1, maxCols + 1):
            sheet.column_dimensions[get_column_letter(col)].width = 20
    except Exception as e:
        self.logger.warning(f"Could not populate data sheet: {str(e)}")
def _populateAnalysisSheet(self, sheet, content: str):
    """Fill the analysis sheet with line statistics and a fixed recommendation list.

    Lines containing ``|`` count as table rows, lines starting with ``- `` or
    ``* `` as list items, and the rest as text lines. Failures are logged as
    warnings and not raised.
    """
    try:
        sheet['A1'] = "Analysis & Insights"
        sheet['A1'].font = Font(size=14, bold=True)

        allLines = content.split('\n')
        sheet['A3'] = "Content Analysis:"
        sheet['A3'].font = Font(bold=True)

        tableCount = sum(1 for entry in allLines if '|' in entry)
        listCount = sum(1 for entry in allLines if entry.startswith(('- ', '* ')))
        textCount = len(allLines) - tableCount - listCount

        # Statistics occupy rows 4-7.
        statistics = (
            f"Total Lines: {len(allLines)}",
            f"Table Rows: {tableCount}",
            f"List Items: {listCount}",
            f"Text Lines: {textCount}",
        )
        for rowNumber, text in enumerate(statistics, 4):
            sheet[f'A{rowNumber}'] = text

        # One blank row, then the recommendations heading in row 9.
        headingRow = 4 + len(statistics) + 1
        sheet[f'A{headingRow}'] = "Recommendations:"
        sheet[f'A{headingRow}'].font = Font(bold=True)
        advice = (
            "1. Review data accuracy",
            "2. Consider additional analysis",
            "3. Update regularly",
        )
        for rowNumber, text in enumerate(advice, headingRow + 1):
            sheet[f'A{rowNumber}'] = text

        sheet.column_dimensions['A'].width = 30
    except Exception as e:
        self.logger.warning(f"Could not populate analysis sheet: {str(e)}")
async def _generateExcelFromJson(self, jsonContent: Dict[str, Any], title: str, userPrompt: str = None, aiService=None, *, style: Dict[str, Any] = None) -> str:
"""Generate Excel content from structured JSON document using AI-generated styling."""
try:
@ -308,12 +139,9 @@ class RendererXlsx(BaseRenderer):
# Store unified style for use by inline-run helpers
self._unifiedStyle = style
# Get style set: prefer unified style, fall back to legacy approach
if style:
# Convert unified style to internal format
styles = self._convertUnifiedStyleToInternal(style)
styles = self._convertColorsFormat(styles)
else:
styles = await self._getStyleSet(jsonContent, userPrompt, aiService)
# Validate JSON structure (standardized schema: {metadata: {...}, documents: [{sections: [...]}]})
if not self._validateJsonStructure(jsonContent):
@ -380,109 +208,6 @@ class RendererXlsx(BaseRenderer):
self.logger.error(f"Error generating Excel from JSON: {str(e)}")
raise Exception(f"Excel generation failed: {str(e)}")
async def _getStyleSet(self, extractedContent: Dict[str, Any] = None, userPrompt: str = None, aiService=None, templateName: str = None) -> Dict[str, Any]:
    """Resolve the style set to render with.

    Order of preference:
      1. ``extractedContent["metadata"]["styles"]`` when it is a non-empty dict.
      2. The default style set enhanced by the AI, when both *userPrompt* and
         *aiService* are given.
      3. The default style set unchanged.

    IMPORTANT: styling is expected to come from document generation; the AI
    enhancement is only a fallback for documents that carry no styles.

    Args:
        extractedContent: Document content whose metadata may contain styles.
        userPrompt: User prompt forwarded to the AI style enhancement.
        aiService: AI service used for the fallback enhancement.
        templateName: Accepted for interface compatibility; not read here.

    Returns:
        Dict with style definitions. Results of the first two paths are
        color-converted and contrast-validated before being returned.
    """
    baseline = self._getDefaultStyleSet()

    # Preferred source: styles shipped in the document metadata.
    meta = extractedContent.get("metadata", {}) if extractedContent else None
    provided = meta.get("styles") if isinstance(meta, dict) else None
    if provided and isinstance(provided, dict):
        self.logger.debug("Using styles from document generation metadata")
        return self._validateStylesContrast(self._convertColorsFormat(provided))

    # Without a prompt and a service there is nothing to enhance with.
    if not (userPrompt and aiService):
        return baseline

    self.logger.info("Styles not in metadata, enhancing with AI based on user prompt...")
    enhanced = await self._enhanceStylesWithAI(userPrompt, baseline, aiService)
    enhanced = self._convertColorsFormat(enhanced)
    return self._validateStylesContrast(enhanced)
async def _enhanceStylesWithAI(self, userPrompt: str, defaultStyleSet: Dict[str, Any], aiService) -> Dict[str, Any]:
    """Let the AI adapt *defaultStyleSet* to *userPrompt*.

    Builds an "xlsx" style template and passes it to the AI styling call.
    Any exception is logged as a warning and *defaultStyleSet* is returned.
    """
    try:
        template = self._createAiStyleTemplate("xlsx", userPrompt, defaultStyleSet)
        adapted = await self._getAiStylesWithExcelColors(aiService, template, defaultStyleSet)
    except Exception as err:
        self.logger.warning(f"AI style enhancement failed: {str(err)}, using default styles")
        return defaultStyleSet
    return adapted
def _validateStylesContrast(self, styles: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize table colors and repair white-on-white / black-on-black pairs.

    For ``table_header`` and ``table_cell`` the background and text colors are
    normalized through ``self._normalizeColor``. When both are ``FFFFFFFF`` or
    both are ``FF000000`` they are replaced by a readable pair; otherwise the
    normalized values are written back. *styles* is modified in place and
    returned; on any exception the default style set is returned instead.
    """
    # (style key, replacement background, replacement text color)
    repairs = (
        ("table_header", "FF4F4F4F", "FFFFFFFF"),
        ("table_cell", "FFFFFFFF", "FF2F2F2F"),
    )
    # Colors that are unreadable when used for background and text alike.
    extremes = ("FFFFFFFF", "FF000000")
    try:
        for key, safeBackground, safeText in repairs:
            if key not in styles:
                continue
            entry = styles[key]
            background = self._normalizeColor(entry.get("background", "FFFFFFFF"))
            text = self._normalizeColor(entry.get("text_color", "FF000000"))

            if background.upper() in extremes and text.upper() == background.upper():
                entry["background"] = safeBackground
                entry["text_color"] = safeText
            else:
                # Keep the colors, but in normalized form.
                entry["background"] = background
                entry["text_color"] = text
        return styles
    except Exception as e:
        self.logger.warning(f"Style validation failed: {str(e)}")
        return self._getDefaultStyleSet()
def _normalizeColor(self, colorValue: str) -> str:
"""Normalize color to aRGB format without # prefix."""
if not isinstance(colorValue, str):
@ -506,77 +231,10 @@ class RendererXlsx(BaseRenderer):
# Unexpected format, return default black
return "FF000000"
def _getDefaultStyleSet(self) -> Dict[str, Any]:
"""Default Excel style set - used when no style instructions present."""
return {
"title": {"font_size": 16, "color": "FF1F4E79", "bold": True, "align": "left"},
"heading": {"font_size": 14, "color": "FF2F2F2F", "bold": True, "align": "left"},
"table_header": {"background": "FF4F4F4F", "text_color": "FFFFFFFF", "bold": True, "align": "center"},
"table_cell": {"background": "FFFFFFFF", "text_color": "FF2F2F2F", "bold": False, "align": "left"},
"bullet_list": {"font_size": 11, "color": "FF2F2F2F", "indent": 2},
"paragraph": {"font_size": 11, "color": "FF2F2F2F", "bold": False, "align": "left"},
"code_block": {"font": "Courier New", "font_size": 10, "color": "FF2F2F2F", "background": "FFF5F5F5"}
}
def _renderInlineRuns(self, runs: list) -> str:
"""Flatten inline runs to plain text for Excel cells."""
return "".join(r.get("value", "") for r in runs)
async def _getAiStylesWithExcelColors(self, aiService, styleTemplate: str, defaultStyles: Dict[str, Any]) -> Dict[str, Any]:
"""Get AI styles with proper Excel color conversion."""
if not aiService:
return defaultStyles
try:
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum
requestOptions = AiCallOptions()
requestOptions.operationType = OperationTypeEnum.DATA_GENERATE
request = AiCallRequest(prompt=styleTemplate, context="", options=requestOptions)
response = await aiService.callAi(request)
import json
import re
# Clean and parse JSON
result = response.content.strip() if response and response.content else ""
# Check if result is empty
if not result:
self.logger.warning("AI styling returned empty response, using defaults")
return defaultStyles
# Extract JSON from markdown if present
json_match = re.search(r'```json\s*\n(.*?)\n```', result, re.DOTALL)
if json_match:
result = json_match.group(1).strip()
self.services.utils.debugLogToFile(f"EXTRACTED JSON FROM MARKDOWN: {result[:100]}...", "EXCEL_RENDERER")
elif result.startswith('```json'):
result = re.sub(r'^```json\s*', '', result)
result = re.sub(r'\s*```$', '', result)
self.services.utils.debugLogToFile(f"CLEANED JSON FROM MARKDOWN: {result[:100]}...", "EXCEL_RENDERER")
elif result.startswith('```'):
result = re.sub(r'^```\s*', '', result)
result = re.sub(r'\s*```$', '', result)
self.services.utils.debugLogToFile(f"CLEANED JSON FROM GENERIC MARKDOWN: {result[:100]}...", "EXCEL_RENDERER")
# Try to parse JSON
try:
styles = json.loads(result)
except json.JSONDecodeError as json_error:
self.logger.warning(f"AI styling returned invalid JSON: {json_error}, using defaults")
return defaultStyles
# Convert colors to Excel aRGB format
styles = self._convertColorsFormat(styles)
return styles
except Exception as e:
self.logger.warning(f"AI styling failed: {str(e)}, using defaults")
return defaultStyles
def _getSafeAlignment(self, alignValue: Any) -> str:
"""Get safe alignment value for openpyxl. Valid values: 'left', 'general', 'distributed', 'fill', 'justify', 'center', 'right', 'centerContinuous'."""
if not alignValue:
@ -627,15 +285,27 @@ class RendererXlsx(BaseRenderer):
except Exception:
return default
@staticmethod
def _looksLikeColor(value: str) -> bool:
"""Return True if *value* looks like a hex color (e.g. ``#e1e4e8`` or ``FF24292E``)."""
raw = value.lstrip('#')
if len(raw) not in (3, 6, 8):
return False
return all(c in '0123456789abcdefABCDEF' for c in raw)
def _convertColorsFormat(self, styles: Dict[str, Any]) -> Dict[str, Any]:
"""Convert hex colors to aRGB format for Excel compatibility (without # prefix)."""
"""Convert hex colors to aRGB format for Excel compatibility (without # prefix).
Only touches values that actually look like hex colors so that non-color
strings (font names, border style keywords, bullet chars, etc.) are
preserved intact.
"""
try:
self.services.utils.debugLogToFile(f"CONVERTING COLORS IN STYLES: {styles}", "EXCEL_RENDERER")
for styleName, styleConfig in styles.items():
if isinstance(styleConfig, dict):
for prop, value in styleConfig.items():
if isinstance(value, str):
# Normalize color to aRGB format without # prefix
if isinstance(value, str) and self._looksLikeColor(value):
styles[styleName][prop] = self._normalizeColor(value)
return styles
except Exception as e:
@ -789,199 +459,6 @@ class RendererXlsx(BaseRenderer):
except Exception as e:
self.logger.warning(f"Could not populate Excel sheets: {str(e)}")
def _populateTableSheet(self, sheet, section: Dict[str, Any], styles: Dict[str, Any], sheetTitle: str):
    """Populate a sheet with a single table section.

    Layout: the sheet title goes to ``A1``, the header row to row 3 and
    the data rows to row 4 onwards. Table data is read from
    ``section["elements"][0]["content"]`` (``headers`` / ``rows``).

    Rows may be plain lists or ``{"cells": [...]}`` objects; individual
    cells may be raw values or ``{"value": ...}`` dicts. Rows of any other
    shape are skipped (their sheet row stays empty). When headers exist,
    each row is padded or truncated to the header count; when there are no
    headers, rows are written at their own length instead of being
    truncated to zero columns. The input rows are never modified.

    Any exception is logged as a warning and swallowed.
    """
    try:
        # Sheet title
        sheet['A1'] = sheetTitle
        titleStyle = styles.get("title", {})
        sheet['A1'].font = Font(size=16, bold=True, color=self._getSafeColor(titleStyle.get("color", "FF1F4E79")))
        sheet['A1'].alignment = Alignment(horizontal=self._getSafeAlignment(titleStyle.get("align", "left")))

        # Get table data from the first element (canonical JSON format)
        headers = []
        rows = []
        elements = section.get("elements", [])
        if elements and isinstance(elements, list):
            content = elements[0].get("content", {})
            if isinstance(content, dict):
                headers = content.get("headers") or []
                rows = content.get("rows") or []
                if not isinstance(headers, list):
                    headers = []
                if not isinstance(rows, list):
                    rows = []

        if not headers and not rows:
            sheet['A3'] = "No table data available"
            return

        # Header row: style objects are built once and reused for every cell
        headerStyle = styles.get("table_header", {})
        headerFont = None
        if headerStyle.get("bold"):
            headerFont = Font(bold=True, color=self._getSafeColor(headerStyle.get("text_color", "FF000000")))
        headerFill = None
        if headerStyle.get("background"):
            headerBg = self._getSafeColor(headerStyle["background"])
            headerFill = PatternFill(start_color=headerBg, end_color=headerBg, fill_type="solid")

        for col, header in enumerate(headers, 1):
            cell = sheet.cell(row=3, column=col, value=header)
            if headerFont:
                cell.font = headerFont
            if headerFill:
                cell.fill = headerFill

        # Data rows
        cellStyle = styles.get("table_cell", {})
        cellFont = None
        if cellStyle.get("text_color"):
            cellFont = Font(color=self._getSafeColor(cellStyle["text_color"]))

        headerCount = len(headers)
        columnCount = headerCount  # widest row written, used for column sizing
        for rowIdx, rowData in enumerate(rows, 4):
            if isinstance(rowData, list):
                # Copy: padding below must not mutate the caller's data
                cellValues = list(rowData)
            elif isinstance(rowData, dict) and "cells" in rowData:
                cellValues = [
                    cellObj.get("value", "") if isinstance(cellObj, dict) else cellObj
                    for cellObj in (rowData.get("cells") or [])
                ]
            else:
                # Unknown format, skip
                continue

            # Only align to the header count when there are headers;
            # otherwise truncation would discard every value.
            if headerCount:
                if len(cellValues) < headerCount:
                    cellValues.extend([""] * (headerCount - len(cellValues)))
                elif len(cellValues) > headerCount:
                    cellValues = cellValues[:headerCount]
            columnCount = max(columnCount, len(cellValues))

            for colIdx, cellValue in enumerate(cellValues, 1):
                # Extract value if it's a dict with "value" key
                actualValue = cellValue.get("value", "") if isinstance(cellValue, dict) else cellValue
                cell = sheet.cell(row=rowIdx, column=colIdx, value=actualValue)
                if cellFont:
                    cell.font = cellFont

        # Fixed column width for every column that received data
        for col in range(1, columnCount + 1):
            sheet.column_dimensions[get_column_letter(col)].width = 20

    except Exception as e:
        self.logger.warning(f"Could not populate table sheet: {str(e)}")
def _populateMainSheet(self, sheet, jsonContent: Dict[str, Any], styles: Dict[str, Any]):
    """Populate the main sheet with document overview and all content.

    Layout: title in ``A1``, generation info in rows 3-4, an optional
    "Document Information" block starting at row 6 (one row per metadata
    entry except ``title``), then a content overview with per-type section
    counts, followed by every section rendered via ``_addSectionToSheet``.

    The title is taken from ``documents[0]["title"]`` when present,
    otherwise from ``metadata["title"]``, otherwise "Generated Report".

    Any exception is logged as a warning and swallowed.
    """
    try:
        metadata = jsonContent.get("metadata") or {}

        # Document title - use documents[].title as primary source, fallback to metadata.title
        documents = jsonContent.get("documents", [])
        if documents and isinstance(documents[0], dict) and documents[0].get("title"):
            documentTitle = documents[0].get("title")
        else:
            documentTitle = metadata.get("title", "Generated Report")

        sheet['A1'] = documentTitle

        # Read each title property with its own default so that a partial
        # style dict (e.g. only "color") cannot raise KeyError.
        titleStyle = styles.get("title") or {}
        titleSize = titleStyle.get("font_size", 16)
        titleBold = titleStyle.get("bold", True)
        try:
            titleFont = Font(size=titleSize, bold=titleBold,
                             color=self._getSafeColor(titleStyle.get("color", "#FF1F4E79")))
        except Exception:
            # Color was rejected: retry with a safe color
            titleFont = Font(size=titleSize, bold=titleBold, color="FF000000")
        sheet['A1'].font = titleFont
        sheet['A1'].alignment = Alignment(horizontal=self._getSafeAlignment(titleStyle.get("align", "left")))

        # Generation info
        sheet['A3'] = "Generated:"
        sheet['B3'] = self._formatTimestamp()
        sheet['A4'] = "Status:"
        sheet['B4'] = "Generated Successfully"

        # `row` must be bound even without metadata; previously it was only
        # assigned inside the metadata branch, so documents without
        # metadata lost the overview and all section content.
        row = 5

        # Document metadata
        if metadata:
            sheet['A6'] = "Document Information:"
            sheet['A6'].font = Font(bold=True)
            row = 7
            for key, value in metadata.items():
                if key != "title":
                    sheet[f'A{row}'] = f"{str(key).title()}:"
                    sheet[f'B{row}'] = str(value)
                    row += 1

        # Content overview
        sections = self._extractSections(jsonContent)
        sheet[f'A{row + 1}'] = "Content Overview:"
        sheet[f'A{row + 1}'].font = Font(bold=True)

        row += 2
        sheet[f'A{row}'] = f"Total Sections: {len(sections)}"

        # Count different content types
        contentTypes = {}
        for section in sections:
            contentType = str(section.get("content_type") or "unknown")
            contentTypes[contentType] = contentTypes.get(contentType, 0) + 1

        for contentType, count in contentTypes.items():
            row += 1
            sheet[f'A{row}'] = f"{contentType.title()} Sections: {count}"

        # Add all content to this sheet
        row += 2
        for section in sections:
            row = self._addSectionToSheet(sheet, section, styles, row)
            row += 1  # Empty row between sections

        # Fixed column widths
        sheet.column_dimensions['A'].width = 20
        sheet.column_dimensions['B'].width = 30

    except Exception as e:
        self.logger.warning(f"Could not populate main sheet: {str(e)}")
def _populateContentTypeSheets(self, sheets: Dict[str, Any], jsonContent: Dict[str, Any], styles: Dict[str, Any], sheetNames: List[str]):
"""Populate additional sheets based on content types."""
try:
sections = self._extractSections(jsonContent)
for sheetName in sheetNames:
if sheetName not in sheets:
continue
sheet = sheets[sheetName]
sheetTitle = sheetName.title()
sheet['A1'] = sheetTitle
sheet['A1'].font = Font(size=16, bold=True)
row = 3
# Filter sections by content type
if sheetName == "tables":
filtered_sections = [s for s in sections if s.get("content_type") == "table"]
elif sheetName == "lists":
filtered_sections = [s for s in sections if s.get("content_type") == "list"]
elif sheetName == "text":
filtered_sections = [s for s in sections if s.get("content_type") in ["paragraph", "heading"]]
else:
filtered_sections = sections
for section in filtered_sections:
row = self._addSectionToSheet(sheet, section, styles, row)
row += 1 # Empty row between sections
# Auto-adjust column widths
for col in range(1, 6):
sheet.column_dimensions[get_column_letter(col)].width = 20
except Exception as e:
self.logger.warning(f"Could not populate content type sheets: {str(e)}")
def _addSectionToSheet(self, sheet, section: Dict[str, Any], styles: Dict[str, Any], startRow: int) -> int:
"""Add a section to a sheet and return the next row."""
try:
@ -1161,20 +638,21 @@ class RendererXlsx(BaseRenderer):
text = text[:32764] + "..."
return text
def _buildTableBorder(self, borderStyle: str, borderColor: str) -> Border:
    """Create the openpyxl ``Border`` for a table cell.

    ``"none"`` yields a border without visible sides, ``"horizontal"``
    draws thin top and bottom lines only, and every other value draws a
    thin line on all four sides. Visible sides use *borderColor*.
    """
    blankSide = Side(style=None)
    if borderStyle == "none":
        horizontalSide = verticalSide = blankSide
    else:
        horizontalSide = Side(style="thin", color=borderColor)
        # "horizontal" suppresses the left/right lines; a full grid reuses the thin side
        verticalSide = blankSide if borderStyle == "horizontal" else horizontalSide
    return Border(left=verticalSide, right=verticalSide, top=horizontalSide, bottom=horizontalSide)
def _addTableToExcel(self, sheet, element: Dict[str, Any], styles: Dict[str, Any], startRow: int) -> int:
"""
Add a table element to Excel sheet with proper formatting and borders.
PERFORMANCE OPTIMIZATIONS:
1. Pre-calculated style objects (Font, PatternFill, Alignment) to avoid repeated creation
2. Optimized _sanitizeCellValue() with regex pre-checks for numbers and dates
3. Batch cell operations where possible
4. Reduced exception handling overhead
Expected performance: 10-30x faster for large tables compared to unoptimized version.
"""
"""Add a table element to Excel sheet with styling, borders, banding and alignment."""
try:
# Extract from nested content structure
content = element.get("content", {})
if not isinstance(content, dict):
return startRow
@ -1182,7 +660,6 @@ class RendererXlsx(BaseRenderer):
headers = content.get("headers", [])
rows = content.get("rows", [])
# Ensure headers and rows are lists
if not isinstance(headers, list):
headers = []
if not isinstance(rows, list):
@ -1191,131 +668,102 @@ class RendererXlsx(BaseRenderer):
if not headers and not rows:
return startRow
# Define border style
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
# --- per-table style override merge ---
tableStyleOverride = content.get("tableStyle", {})
mergedStyles = deepMerge(styles, tableStyleOverride) if tableStyleOverride else styles
# --- border ---
tableBorderCfg = mergedStyles.get("table_border", {})
borderColor = self._getSafeColor(tableBorderCfg.get("color", "FF000000"))
borderStyleName = tableBorderCfg.get("style", "grid")
tableBorder = self._buildTableBorder(borderStyleName, borderColor)
# --- header style ---
headerStyle = mergedStyles.get("table_header", {})
primaryFont = mergedStyles.get("fonts", {}).get("primary")
headerFontColor = self._getSafeColor(headerStyle.get("text_color", "FF000000"))
headerFontSize = headerStyle.get("font_size")
headerFont = Font(name=primaryFont, bold=headerStyle.get("bold", True),
size=headerFontSize, color=headerFontColor)
headerFill = None
if headerStyle.get("background"):
hdrBg = self._getSafeColor(headerStyle["background"])
headerFill = PatternFill(start_color=hdrBg, end_color=hdrBg, fill_type="solid")
headerAlignment = Alignment(
horizontal=self._getSafeAlignment(headerStyle.get("align", "left")),
vertical="center"
)
headerRow = startRow
header_style = styles.get("table_header", {})
# Pre-calculate and cache style objects to avoid repeated parsing
header_font_color = self._getSafeColor(header_style.get("text_color", "FF000000"))
header_font = Font(bold=header_style.get("bold", True), color=header_font_color)
header_bg_color = None
header_fill = None
if header_style.get("background"):
header_bg_color = self._getSafeColor(header_style["background"])
header_fill = PatternFill(start_color=header_bg_color, end_color=header_bg_color, fill_type="solid")
header_alignment = Alignment(
horizontal=self._getSafeAlignment(header_style.get("align", "left")),
vertical="center"
)
# Add headers with formatting - OPTIMIZED: use cached style objects
for col, header in enumerate(headers, 1):
runs = self._inlineRunsForCell(header)
headerText = self._renderInlineRuns(runs)
sanitized_header = self._sanitizeCellValue(headerText)
cell = sheet.cell(row=headerRow, column=col, value=sanitized_header)
# Apply styling with fallbacks - use pre-calculated objects
try:
cell.font = header_font
except Exception:
try:
cell.font = Font(bold=True, color=self._getSafeColor("FF000000"))
except Exception:
pass
try:
if header_fill:
cell.fill = header_fill
except Exception:
pass
try:
cell.alignment = header_alignment
except Exception:
try:
cell.alignment = Alignment(horizontal="left", vertical="center")
except Exception:
pass
try:
cell.border = thin_border
except Exception:
pass
cell = sheet.cell(row=headerRow, column=col, value=self._sanitizeCellValue(headerText))
cell.font = headerFont
if headerFill:
cell.fill = headerFill
cell.alignment = headerAlignment
cell.border = tableBorder
startRow += 1
# Add rows with formatting - OPTIMIZED: pre-calculate style objects
cell_style = styles.get("table_cell", {})
header_count = len(headers)
# --- cell style ---
cellStyle = mergedStyles.get("table_cell", {})
headerCount = len(headers)
cellTextColor = self._getSafeColor(cellStyle.get("text_color")) if cellStyle.get("text_color") else None
cellFontSize = cellStyle.get("font_size")
cellFont = Font(name=primaryFont, size=cellFontSize,
color=cellTextColor) if cellTextColor else None
# Pre-calculate and cache style objects to avoid repeated parsing
cell_text_color = None
cell_font = None
if cell_style.get("text_color"):
cell_text_color = self._getSafeColor(cell_style["text_color"])
cell_font = Font(color=cell_text_color)
cell_alignment = Alignment(
horizontal=self._getSafeAlignment(cell_style.get("align", "left")),
vertical="center"
)
# --- banding ---
bandingCfg = mergedStyles.get("table_banding", {})
bandingEnabled = bandingCfg.get("enabled", True)
evenFill = None
oddFill = None
if bandingEnabled:
evenColor = self._getSafeColor(bandingCfg.get("even", "FFF6F8FA"))
oddColor = self._getSafeColor(bandingCfg.get("odd", "FFFFFFFF"))
evenFill = PatternFill(start_color=evenColor, end_color=evenColor, fill_type="solid")
oddFill = PatternFill(start_color=oddColor, end_color=oddColor, fill_type="solid")
for row_data in rows:
# Handle different row formats
if isinstance(row_data, list):
cell_values = row_data
elif isinstance(row_data, dict) and "cells" in row_data:
cell_values = [cell_obj.get("value", "") for cell_obj in row_data.get("cells", [])]
# --- column alignments ---
colAlignments = self._inferColumnAlignments(headers, rows, mergedStyles)
for dataRowIdx, rowData in enumerate(rows):
if isinstance(rowData, list):
cellValues = rowData
elif isinstance(rowData, dict) and "cells" in rowData:
cellValues = [cellObj.get("value", "") for cellObj in rowData.get("cells", [])]
else:
continue
# Validate row column count matches headers - pad or truncate if needed
if len(cell_values) < header_count:
# Pad with empty strings if row has fewer columns
cell_values.extend([""] * (header_count - len(cell_values)))
elif len(cell_values) > header_count:
# Truncate if row has more columns than headers
cell_values = cell_values[:header_count]
if len(cellValues) < headerCount:
cellValues.extend([""] * (headerCount - len(cellValues)))
elif len(cellValues) > headerCount:
cellValues = cellValues[:headerCount]
for col, cell_value in enumerate(cell_values, 1):
runs = self._inlineRunsForCell(cell_value)
for col, cellValue in enumerate(cellValues, 1):
runs = self._inlineRunsForCell(cellValue)
cellText = self._renderInlineRuns(runs)
sanitized_value = self._sanitizeCellValue(cellText)
cell = sheet.cell(row=startRow, column=col, value=sanitized_value)
cell = sheet.cell(row=startRow, column=col, value=self._sanitizeCellValue(cellText))
# Apply styling with fallbacks - use pre-calculated objects
try:
if cell_font:
cell.font = cell_font
except Exception:
pass
if cellFont:
cell.font = cellFont
try:
cell.alignment = cell_alignment
except Exception:
try:
cell.alignment = Alignment(horizontal="left", vertical="center")
except Exception:
pass
if bandingEnabled:
rowFill = evenFill if dataRowIdx % 2 == 0 else oddFill
if rowFill:
cell.fill = rowFill
try:
cell.border = thin_border
except Exception:
pass
colAlign = colAlignments[col - 1] if col - 1 < len(colAlignments) else "left"
cell.alignment = Alignment(horizontal=colAlign, vertical="center")
cell.border = tableBorder
startRow += 1
# Auto-adjust column widths
for col in range(1, len(headers) + 1):
column_letter = get_column_letter(col)
sheet.column_dimensions[column_letter].width = 20
columnLetter = get_column_letter(col)
sheet.column_dimensions[columnLetter].width = 20
return startRow
@ -1334,12 +782,19 @@ class RendererXlsx(BaseRenderer):
listItems = []
listStyle = styles.get("bullet_list", {})
bulletChar = listStyle.get("bullet_char", "\u2022")
fontSize = listStyle.get("font_size", 11)
primaryFont = styles.get("fonts", {}).get("primary")
cellFont = Font(
name=primaryFont,
size=fontSize,
color=self._getSafeColor(listStyle.get("color"))
)
for item in listItems:
runs = self._inlineRunsForListItem(item)
text = self._renderInlineRuns(runs)
sheet.cell(row=startRow, column=1, value=f"\u2022 {text}")
if listStyle.get("color"):
sheet.cell(row=startRow, column=1).font = Font(color=self._getSafeColor(listStyle["color"]))
cell = sheet.cell(row=startRow, column=1, value=f"{bulletChar} {text}")
cell.font = cellFont
startRow += 1
return startRow
@ -1362,9 +817,10 @@ class RendererXlsx(BaseRenderer):
if text:
sheet.cell(row=startRow, column=1, value=text)
paragraph_style = styles.get("paragraph", {})
if paragraph_style.get("color"):
sheet.cell(row=startRow, column=1).font = Font(color=self._getSafeColor(paragraph_style["color"]))
paragraphStyle = styles.get("paragraph", {})
primaryFont = styles.get("fonts", {}).get("primary")
if paragraphStyle.get("color"):
sheet.cell(row=startRow, column=1).font = Font(name=primaryFont, color=self._getSafeColor(paragraphStyle["color"]))
startRow += 1
@ -1387,15 +843,13 @@ class RendererXlsx(BaseRenderer):
if text:
sheet.cell(row=startRow, column=1, value=text)
heading_style = styles.get("heading", {})
font_size = heading_style.get("font_size", 14)
if level > 1:
font_size = max(10, font_size - (level - 1) * 2)
headingStyle = styles.get(f"heading{level}", styles.get("heading1", {}))
primaryFont = styles.get("fonts", {}).get("primary")
sheet.cell(row=startRow, column=1).font = Font(
size=font_size,
bold=True,
color=self._getSafeColor(heading_style.get("color", "FF000000"))
name=primaryFont,
size=headingStyle.get("font_size", 14),
bold=headingStyle.get("bold", True),
color=self._getSafeColor(headingStyle.get("color"))
)
startRow += 1
@ -1506,37 +960,39 @@ class RendererXlsx(BaseRenderer):
language = content.get("language", "")
if code:
code_style = styles.get("code_block", {})
codeStyle = styles.get("code_block", {})
# Pre-calculate and cache style objects to avoid repeated parsing
code_font_name = code_style.get("font", "Courier New")
code_font_size = code_style.get("font_size", 10)
code_text_color = self._getSafeColor(code_style.get("color", "FF2F2F2F"))
code_font = Font(name=code_font_name, size=code_font_size, color=code_text_color)
codeFontName = codeStyle.get("font", styles.get("fonts", {}).get("monospace", "Consolas"))
codeFontSize = codeStyle.get("font_size", 10)
codeTextColor = self._getSafeColor(codeStyle.get("color", "FF2F2F2F"))
codeFont = Font(name=codeFontName, size=codeFontSize, color=codeTextColor)
code_bg_color = None
code_fill = None
if code_style.get("background"):
code_bg_color = self._getSafeColor(code_style["background"])
code_fill = PatternFill(start_color=code_bg_color, end_color=code_bg_color, fill_type="solid")
codeFill = None
if codeStyle.get("background"):
codeBgColor = self._getSafeColor(codeStyle["background"])
codeFill = PatternFill(start_color=codeBgColor, end_color=codeBgColor, fill_type="solid")
codeBorder = None
if codeStyle.get("border_color"):
codeBorderColor = self._getSafeColor(codeStyle["border_color"])
codeSide = Side(style="thin", color=codeBorderColor)
codeBorder = Border(left=codeSide, right=codeSide, top=codeSide, bottom=codeSide)
# Add language label if present
if language:
langCell = sheet.cell(row=startRow, column=1, value=f"Code ({language}):")
langCell.font = Font(bold=True, color=code_text_color)
langCell.font = Font(bold=True, color=codeTextColor)
startRow += 1
# Split code into lines and add each line - use cached style objects
code_lines = code.split('\n')
for line in code_lines:
codeLines = code.split('\n')
for line in codeLines:
codeCell = sheet.cell(row=startRow, column=1, value=line)
codeCell.font = code_font
# Set background color if specified
if code_fill:
codeCell.fill = code_fill
codeCell.font = codeFont
if codeFill:
codeCell.fill = codeFill
if codeBorder:
codeCell.border = codeBorder
startRow += 1
# Add spacing after code block
startRow += 1
return startRow

View file

@ -11,39 +11,56 @@ DEFAULT_STYLE: Dict[str, Any] = {
"monospace": "Consolas",
},
"colors": {
"primary": "#1F3864",
"secondary": "#2C3E50",
"accent": "#2980B9",
"primary": "#24292e",
"secondary": "#586069",
"accent": "#0366d6",
"background": "#FFFFFF",
},
"documentTitle": {
"sizePt": 28,
"weight": "bold",
"color": "#1F3864",
"color": "#24292e",
"spaceBeforePt": 0,
"spaceAfterPt": 18,
"align": "center",
},
"headings": {
"h1": {"sizePt": 22, "weight": "bold", "color": "#1F3864", "spaceBeforePt": 22, "spaceAfterPt": 8},
"h2": {"sizePt": 18, "weight": "bold", "color": "#1F3864", "spaceBeforePt": 20, "spaceAfterPt": 6},
"h3": {"sizePt": 14, "weight": "bold", "color": "#2C3E50", "spaceBeforePt": 16, "spaceAfterPt": 4},
"h4": {"sizePt": 12, "weight": "bold", "color": "#2C3E50", "spaceBeforePt": 12, "spaceAfterPt": 3},
"h1": {"sizePt": 22, "weight": "bold", "color": "#24292e", "spaceBeforePt": 24, "spaceAfterPt": 8},
"h2": {"sizePt": 18, "weight": "bold", "color": "#24292e", "spaceBeforePt": 20, "spaceAfterPt": 6},
"h3": {"sizePt": 14, "weight": "bold", "color": "#586069", "spaceBeforePt": 16, "spaceAfterPt": 4},
"h4": {"sizePt": 12, "weight": "bold", "color": "#586069", "spaceBeforePt": 12, "spaceAfterPt": 3},
},
"paragraph": {"sizePt": 11, "lineSpacing": 1.15, "color": "#333333"},
"paragraph": {"sizePt": 11, "lineSpacing": 1.5, "color": "#24292e", "align": "left"},
"table": {
"headerBg": "#1F3864",
"headerFg": "#FFFFFF",
"headerBg": "#f6f8fa",
"headerFg": "#24292e",
"headerSizePt": 10,
"bodySizePt": 10,
"rowBandingEven": "#F2F6FC",
"rowBandingEven": "#f6f8fa",
"rowBandingOdd": "#FFFFFF",
"borderColor": "#CBD5E1",
"borderColor": "#e1e4e8",
"borderWidthPt": 0.5,
"borderStyle": "grid",
"bandingEnabled": True,
"cellPaddingPt": 4,
},
"list": {"bulletChar": "\u2022", "indentPt": 18, "sizePt": 11},
"image": {"defaultWidthPt": 480, "maxWidthPt": 800, "alignment": "center"},
"codeBlock": {"fontSizePt": 9, "background": "#F8F9FA", "borderColor": "#E2E8F0"},
"codeBlock": {"fontSizePt": 9, "background": "#f6f8fa", "borderColor": "#e1e4e8"},
"coverPage": {
"titleSizePt": 28,
"subtitleSizePt": 16,
"authorSizePt": 12,
"dateSizePt": 12,
"titleColor": "#24292e",
"subtitleColor": "#586069",
},
"caption": {
"sizePt": 10,
"color": "#586069",
"italic": True,
"align": "center",
},
"page": {
"format": "A4",
"marginsPt": {"top": 60, "bottom": 60, "left": 60, "right": 60},
@ -57,98 +74,9 @@ DEFAULT_STYLE: Dict[str, Any] = {
}
# ------------------------------------------------------------------
# Theme presets (A3): named, purpose-specific style overrides that are
# deep-merged onto DEFAULT_STYLE. A preset only declares the keys it changes;
# everything else inherits the default. Explicit per-call `style` overrides
# always win over the preset.
# ------------------------------------------------------------------
# Maps a lower-case theme name to a PARTIAL style dict. Each preset lists only
# the keys it overrides; color values are "#RRGGBB" strings and sizes use the
# "...Pt" keys.
THEME_PRESETS: Dict[str, Dict[str, Any]] = {
# "general" intentionally empty -> identical to DEFAULT_STYLE.
"general": {},
# "finance": Calibri primary font, left-aligned title, own colors for
# headings and for the table header / even-row banding.
"finance": {
"fonts": {"primary": "Calibri"},
"colors": {"primary": "#0B3D2E", "secondary": "#14532D", "accent": "#047857"},
"documentTitle": {"color": "#0B3D2E", "align": "left"},
"headings": {
"h1": {"color": "#0B3D2E"},
"h2": {"color": "#0B3D2E"},
"h3": {"color": "#14532D"},
"h4": {"color": "#14532D"},
},
"table": {"headerBg": "#0B3D2E", "rowBandingEven": "#ECFDF5"},
},
"legal": {
# Serif, sober, single-column, justified body, no logo banner.
# Also reduces title/heading sizes, overrides the table border color and
# sets showPageNumbers.
"fonts": {"primary": "Times New Roman"},
"colors": {"primary": "#1A1A1A", "secondary": "#333333", "accent": "#5A5A5A"},
"documentTitle": {"color": "#1A1A1A", "align": "center", "sizePt": 20},
"headings": {
"h1": {"color": "#1A1A1A", "sizePt": 16},
"h2": {"color": "#1A1A1A", "sizePt": 14},
"h3": {"color": "#333333", "sizePt": 12},
"h4": {"color": "#333333", "sizePt": 11},
},
"paragraph": {"sizePt": 11, "lineSpacing": 1.5, "color": "#1A1A1A", "align": "justify"},
"table": {"headerBg": "#333333", "rowBandingEven": "#F5F5F5", "borderColor": "#999999"},
"page": {"showPageNumbers": True},
},
# "technical": Arial primary / Consolas monospace fonts, smaller paragraph
# text, and a dark code-block background.
"technical": {
"fonts": {"primary": "Arial", "monospace": "Consolas"},
"colors": {"primary": "#0F172A", "secondary": "#1E293B", "accent": "#2563EB"},
"documentTitle": {"color": "#0F172A", "align": "left"},
"headings": {
"h1": {"color": "#0F172A"},
"h2": {"color": "#1E293B"},
"h3": {"color": "#1E293B"},
"h4": {"color": "#334155"},
},
"paragraph": {"sizePt": 10, "lineSpacing": 1.2},
"codeBlock": {"fontSizePt": 9, "background": "#0F172A"},
"table": {"headerBg": "#1E293B", "rowBandingEven": "#EEF2FF"},
},
# "hr": Calibri primary font, centered title, own colors for headings and
# for the table header / even-row banding.
"hr": {
"fonts": {"primary": "Calibri"},
"colors": {"primary": "#5B21B6", "secondary": "#6D28D9", "accent": "#9333EA"},
"documentTitle": {"color": "#5B21B6", "align": "center"},
"headings": {
"h1": {"color": "#5B21B6"},
"h2": {"color": "#6D28D9"},
"h3": {"color": "#7C3AED"},
"h4": {"color": "#7C3AED"},
},
"table": {"headerBg": "#5B21B6", "rowBandingEven": "#F5F3FF"},
},
"marketing": {
# Bold, image-friendly, generous spacing, larger title.
# Overrides title, heading and paragraph sizes as well as the image
# default/max widths.
"fonts": {"primary": "Verdana"},
"colors": {"primary": "#BE123C", "secondary": "#E11D48", "accent": "#F59E0B"},
"documentTitle": {"color": "#BE123C", "sizePt": 34, "align": "center", "spaceAfterPt": 24},
"headings": {
"h1": {"color": "#BE123C", "sizePt": 24},
"h2": {"color": "#E11D48", "sizePt": 19},
"h3": {"color": "#E11D48", "sizePt": 15},
"h4": {"color": "#9F1239", "sizePt": 13},
},
"paragraph": {"sizePt": 12, "lineSpacing": 1.3},
"image": {"defaultWidthPt": 540, "maxWidthPt": 900, "alignment": "center"},
"table": {"headerBg": "#BE123C", "rowBandingEven": "#FFF1F2"},
},
}
def resolveTheme(themeName: str | None) -> Dict[str, Any]:
"""Return the partial style override for a named theme preset.
Unknown / empty names fall back to ``{}`` (i.e. plain DEFAULT_STYLE).
The lookup is case-insensitive.
"""
if not themeName:
return {}
return dict(THEME_PRESETS.get(str(themeName).strip().lower(), {}))
def _deepMerge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
def deepMerge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
"""Recursively merge override into base. Both dicts left unchanged; returns new dict."""
result = {}
for key in base:
@ -156,7 +84,7 @@ def _deepMerge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]
baseVal = base[key]
overVal = override[key]
if isinstance(baseVal, dict) and isinstance(overVal, dict):
result[key] = _deepMerge(baseVal, overVal)
result[key] = deepMerge(baseVal, overVal)
else:
result[key] = overVal
else:
@ -167,17 +95,15 @@ def _deepMerge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]
return result
def resolveStyle(agentStyle: dict | None, documentTheme: str | None = None) -> Dict[str, Any]:
"""Resolve the effective style: ``DEFAULT_STYLE <- themePreset <- agentStyle``.
def resolveStyle(agentStyle: dict | None = None) -> Dict[str, Any]:
"""Resolve the effective style: ``DEFAULT_STYLE <- agentStyle``.
Precedence (lowest to highest): platform defaults, the named ``documentTheme``
preset, then any explicit per-call ``agentStyle`` override. With no theme and
no override this returns plain :data:`DEFAULT_STYLE`.
Precedence (lowest to highest): platform defaults, then any explicit
per-call ``agentStyle`` override. With no override this returns plain
:data:`DEFAULT_STYLE`. Context-aware styling is handled by the AI
enhancement step in ``mainServiceGeneration.renderReport``.
"""
resolved = dict(DEFAULT_STYLE)
themeOverride = resolveTheme(documentTheme)
if themeOverride:
resolved = _deepMerge(resolved, themeOverride)
if agentStyle:
resolved = _deepMerge(resolved, agentStyle)
resolved = deepMerge(resolved, agentStyle)
return resolved

View file

@ -22,7 +22,6 @@ async def generateDocument(self, parameters: Dict[str, Any]) -> ActionResult:
return ActionResult.isFailure(error="prompt is required")
documentType = parameters.get("documentType")
documentTheme = parameters.get("documentTheme") or None
# Prefer explicit outputFormat (flow UI); resultType remains for legacy / API callers.
resultType = parameters.get("outputFormat") or parameters.get("resultType")
if isinstance(resultType, str):
@ -83,8 +82,7 @@ async def generateDocument(self, parameters: Dict[str, Any]) -> ActionResult:
outputFormat=resultType, # Can be None - AI determines from prompt
title=title,
parentOperationId=parentOperationId,
generationIntent="document", # NEW: Explicit intent, skips detection
documentTheme=documentTheme # Named style preset for the renderer
generationIntent="document" # NEW: Explicit intent, skips detection
)
# Convert AiResponse to ActionResult

View file

@ -252,32 +252,37 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult:
output_format_for_call = output_extension.replace('.', '') if output_extension else (output_format or 'txt')
# Simple mode: fast path without document generation pipeline
# Uses the same extraction + chunking pipeline as full mode to avoid oversized prompts
if simpleMode:
self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI (simple mode)")
context_parts = []
paramContext = parameters.get("context") # already serialized above
if paramContext and isinstance(paramContext, str) and paramContext.strip():
context_parts.append(paramContext.strip())
if documentList and len(documentList.references) > 0:
simpleParts: Optional[List[ContentPart]] = contentParts
if not simpleParts and documentList and len(documentList.references) > 0:
from modules.datamodels.datamodelDocref import DocumentItemReference
fileIdRefs = [r for r in documentList.references if isinstance(r, DocumentItemReference)]
if fileIdRefs:
simpleParts = _resolve_file_refs_to_content_parts(self.services, fileIdRefs)
if not simpleParts:
try:
documents = self.services.chat.getChatDocumentsFromDocumentList(documentList)
for doc in documents:
if hasattr(doc, 'fileId') and doc.fileId:
fileData = self.services.interfaceDbComponent.getFileData(doc.fileId)
if fileData:
if isinstance(fileData, bytes):
doc_text = fileData.decode('utf-8', errors='ignore')
else:
doc_text = str(fileData)
context_parts.append(doc_text)
simpleParts = _action_docs_to_content_parts(self.services, [
{"documentData": self.services.interfaceDbComponent.getFileData(doc.fileId),
"documentName": getattr(doc, 'fileName', ''),
"mimeType": getattr(doc, 'mimeType', 'application/octet-stream')}
for doc in documents if hasattr(doc, 'fileId') and doc.fileId
])
except Exception as e:
logger.warning(f"Error extracting context from documents in simple mode: {e}")
context_text = "\n\n".join(context_parts) if context_parts else ""
logger.warning(f"Error extracting content parts in simple mode: {e}")
paramContext = parameters.get("context")
simplePrompt = aiPrompt
if paramContext and isinstance(paramContext, str) and paramContext.strip():
simplePrompt = f"{aiPrompt}\n\n--- DATA CONTEXT ---\n{paramContext.strip()}"
request = AiCallRequest(
prompt=aiPrompt,
context=context_text if context_text else None,
prompt=simplePrompt,
contentParts=simpleParts if simpleParts else None,
context=None,
options=AiCallOptions(
resultFormat=output_format_for_call,
operationType=OperationTypeEnum.DATA_ANALYSE,

View file

@ -85,15 +85,6 @@ class MethodAi(MethodBase):
default="",
description="Additional context data (string or upstream-bound dict/list, e.g. accounting data) appended to the prompt. Non-string values are JSON-serialized."
),
"documentTheme": WorkflowActionParameter(
name="documentTheme",
type="str",
frontendType=FrontendType.SELECT,
frontendOptions=["general", "finance", "legal", "technical", "hr", "marketing"],
required=False,
default="general",
description="Named style preset for the document renderer (general/finance/legal/technical/hr/marketing). The agent forwards it to the renderDocument tool's documentTheme."
),
"resultType": WorkflowActionParameter(
name="resultType",
type="str",
@ -385,15 +376,6 @@ class MethodAi(MethodBase):
required=False,
description="Type of document (content hint for the model); used as title fallback when title is empty."
),
"documentTheme": WorkflowActionParameter(
name="documentTheme",
type="str",
frontendType=FrontendType.SELECT,
frontendOptions=["general", "finance", "legal", "technical", "hr", "marketing"],
required=False,
default="general",
description="Named style preset applied by the renderer (colors, fonts, spacing): general, finance, legal, technical, hr, marketing."
),
"resultType": WorkflowActionParameter(
name="resultType",
type="str",