gateway/modules/services/test_all_services.py
2025-09-30 18:30:33 +02:00

226 lines
8.8 KiB
Python

import asyncio
import os
import sys
from typing import List, Dict, Any
# Ensure the gateway root is importable when this file is run directly.
# __file__ can be a relative path (e.g. "test_all_services.py" when the script
# is launched from its own directory on older interpreters); resolve it first
# so the dirname() chain never collapses to "" and every derived path is absolute.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Two levels up from this file's directory is the gateway root.
GATEWAY_DIR = os.path.dirname(os.path.dirname(CURRENT_DIR))
if GATEWAY_DIR not in sys.path:
    sys.path.append(GATEWAY_DIR)
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.services.serviceGeneration.mainServiceGeneration import DocumentGenerationService
from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, ProcessingMode, Priority
from modules.services.serviceAi.mainServiceAi import AiService
# Fixture directory under the gateway root: test inputs are read from here and
# the generation test writes its output files back into the same directory.
TESTDATA_DIR = os.path.join(GATEWAY_DIR, "testdata")
def _read_test_files() -> List[Dict[str, Any]]:
    """Load every regular file in TESTDATA_DIR as an in-memory document dict.

    Returns:
        One dict per readable file with the keys "id", "bytes", "fileName" and
        "mimeType", ordered by file name. The list is empty when the directory
        cannot be listed.
    """
    try:
        # os.listdir() returns entries in arbitrary order; sort for reproducible runs.
        names = sorted(os.listdir(TESTDATA_DIR))
    except OSError as exc:
        print(f"[testdata] cannot list {TESTDATA_DIR}: {exc}")
        return []

    files: List[Dict[str, Any]] = []
    for name in names:
        path = os.path.join(TESTDATA_DIR, name)
        if not os.path.isfile(path):
            # Sub-directories and other non-regular entries are not test inputs.
            continue
        try:
            with open(path, "rb") as f:
                data = f.read()
        except OSError as exc:
            # Best-effort loading: skip the unreadable file, but report it
            # instead of hiding the failure.
            print(f"[testdata] skipping {name}: {exc}")
            continue
        files.append({
            "id": name,
            "bytes": data,
            "fileName": name,
            "mimeType": _guess_mime(name),
        })
    return files
def _guess_mime(name: str) -> str:
    """Return a MIME type for *name* based on its extension, ignoring case.

    Recognises .pdf, .xlsx, .jpg/.jpeg and .png; every other name maps to
    "application/octet-stream".
    """
    # Checked top to bottom; each entry pairs the accepted suffixes with a type.
    known_types = (
        ((".pdf",), "application/pdf"),
        ((".xlsx",), "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"),
        ((".jpg", ".jpeg"), "image/jpeg"),
        ((".png",), "image/png"),
    )
    normalized = name.lower()
    for suffixes, mime_type in known_types:
        if normalized.endswith(suffixes):
            return mime_type
    return "application/octet-stream"
def run_extraction_1000_bytes() -> None:
    """Run ExtractionService over all test files with a 1000-byte size cap.

    Prints the number of input documents and results, then for each result
    its part count and the sum of the "size" metadata of its parts.
    """
    extractor = ExtractionService()
    documents = _read_test_files()

    extraction_options = {
        # cap total pooled size per document set
        "maxSize": 1000,
        # allow chunking to respect the cap across parts
        "chunkAllowed": True,
        # chunk sizes for different content types to help fit under the cap
        "textChunkSize": 500,
        "tableChunkSize": 500,
        "structureChunkSize": 500,
        # simple merge strategy if supported
        "mergeStrategy": {},
    }

    extracted_items = extractor.extractContent(documents, extraction_options)
    print("[extraction] documents:", len(documents), "results:", len(extracted_items))

    for index, extracted in enumerate(extracted_items):
        pooled_bytes = 0
        for part in extracted.parts:
            # A missing or falsy "size" entry counts as zero bytes.
            pooled_bytes += int(part.metadata.get("size", 0) or 0)
        print(f" - doc[{index}] parts={len(extracted.parts)} pooledBytes={pooled_bytes}")
async def main():
    """Run the extraction, generation and AI checks in order, with a banner before each."""
    extraction_banner = "=== serviceExtraction: compress to 1000 bytes ==="
    generation_banner = "\n=== serviceGeneration: create ActionResult and write output to testdata ==="
    ai_banner = "\n=== serviceAi: planning call + image + pdf extraction ==="

    # The extraction check is synchronous; the other two are coroutines.
    print(extraction_banner)
    run_extraction_1000_bytes()

    print(generation_banner)
    await run_generation_write_file()

    print(ai_banner)
    await run_ai_tests()
async def run_generation_write_file() -> None:
    """Drive DocumentGenerationService with in-memory stubs and one text document.

    Builds stub component/service-center objects, wraps a single plain-text
    ActionDocument in an ActionResult and passes it to
    ``createDocumentsFromActionResult``. Any payload handed to the stub's
    ``createFileData`` is written to TESTDATA_DIR as
    ``output_<fileId>_<fileName>``. Prints the number of created documents.
    """

    # Minimal stubs for interfaces expected by DocumentGenerationService
    class _FileItem:
        """File record carrying id, name, MIME type and content length."""

        def __init__(self, file_id: str, file_name: str, mime_type: str, content: bytes):
            self.id = file_id
            self.fileName = file_name
            self.mimeType = mime_type
            self.fileSize = len(content)

    class _ComponentInterface:
        """Stub file store: keeps records in a dict and writes payloads to disk."""

        def __init__(self):
            self._files = {}

        def createFile(self, name: str, mimeType: str, content: bytes):
            # Ids are sequential: test_1, test_2, ...
            fid = f"test_{len(self._files)+1}"
            item = _FileItem(fid, name, mimeType, content)
            self._files[fid] = item
            return item

        def createFileData(self, fileId: str, content: bytes):
            # Persist into testdata directory as requested
            item = self._files[fileId]
            out_path = os.path.join(TESTDATA_DIR, f"output_{fileId}_{item.fileName}")
            with open(out_path, "wb") as f:
                f.write(content)

        def getFile(self, fileId: str):
            return self._files.get(fileId)

    class _ServiceCenter:
        """Stub service center exposing the component stub and a fake workflow."""

        def __init__(self, comp):
            self.interfaceDbComponent = comp
            self.interfaceDbChat = None
            # Anonymous object that only carries the workflow attributes below.
            self.workflow = type("_Wf", (), {"id": "wf_test", "currentRound": 1, "currentTask": 1, "currentAction": 1, "status": "running", "totalTasks": 1, "totalActions": 1})()

    component = _ComponentInterface()
    center = _ServiceCenter(component)
    gen = DocumentGenerationService(center)

    # Build a fake action and ActionResult with a small text document
    class _Action:
        """Stub action describing a "document"/"generate" execution."""

        def __init__(self):
            self.id = "action_test"
            self.execMethod = "document"
            self.execAction = "generate"
            self.execParameters = {}
            self.execResultLabel = "round1_task1_action1_results"

    action = _Action()
    content = "This is a generated test file from serviceGeneration test."
    action_doc = ActionDocument(documentName="test_generated.txt", documentData=content, mimeType="text/plain")
    action_result = ActionResult(success=True, documents=[action_doc])
    docs = gen.createDocumentsFromActionResult(action_result, action, center.workflow, message_id="msg_test")
    print("[generation] created documents:", len(docs))


async def run_ai_tests() -> None:
    """Run three AiService calls: planning, image analysis and PDF extraction.

    The planning call always runs. The image and PDF calls run only when their
    fixture file exists in TESTDATA_DIR; failures inside either are caught and
    printed so the remaining checks still execute. Each step prints the length
    of the response it received (0 when the response is empty).
    """
    # Create AiService instance (uses internal default model registry; no external creds required for this test)
    ai = await AiService.create()

    # Planning AI call (like in handlingTasks.generateTaskPlan)
    plan_options = AiCallOptions(
        operationType=OperationType.GENERATE_PLAN,
        priority=Priority.QUALITY,
        compressPrompt=False,
        compressContext=False,
        processingMode=ProcessingMode.DETAILED,
        maxCost=0.05,
        maxProcessingTime=10,
    )
    # Two lines joined by a newline, with no leading or trailing whitespace.
    plan_prompt = (
        'You are a planning assistant. Return a compact JSON with fields: tasks:[{id, objective, success_criteria:[]}], languageUserDetected:"en".\n'
        'Create exactly one simple task id:"task_1" objective:"Test planning" success_criteria:["done"].'
    )
    plan_resp = await ai.callAi(prompt=plan_prompt, placeholders=None, options=plan_options)
    print("[ai] planning response length:", len(plan_resp) if plan_resp else 0)

    # Image content extraction prompt using test JPEG
    img_path = os.path.join(TESTDATA_DIR, "00Untitled.jpg")
    if os.path.exists(img_path):
        try:
            with open(img_path, "rb") as f:
                img_bytes = f.read()
            img_options = AiCallOptions(
                operationType=OperationType.ANALYSE_CONTENT,
                priority=Priority.BALANCED,
                compressPrompt=True,
                compressContext=False,
                processingMode=ProcessingMode.ADVANCED,
                maxCost=0.02,
                maxProcessingTime=10,
            )
            img_resp = await ai.callAiImage(
                prompt="Describe the content of this image succinctly.",
                imageData=img_bytes,
                mimeType="image/jpeg",
                options=img_options,
            )
            print("[ai] image analysis response length:", len(img_resp) if img_resp else 0)
        except Exception as e:
            # Script-level boundary: report and continue with the PDF check.
            print("[ai] image analysis error:", str(e))
    else:
        print("[ai] image test file not found; skipping")

    # PDF extraction prompt: emulate text call with document context built via ExtractionService
    pdf_path = os.path.join(TESTDATA_DIR, "diagramm_komponenten.pdf")
    if os.path.exists(pdf_path):
        try:
            # Build a minimal ChatDocument-like shim that AiService._callAiText expects via extraction
            class _Doc:
                """Document shim exposing id, fileName, mimeType and raw fileData."""

                def __init__(self, file_path: str, mime: str):
                    self.id = "doc_pdf"
                    self.fileName = os.path.basename(file_path)
                    self.mimeType = mime
                    with open(file_path, "rb") as f:
                        self.fileData = f.read()

            pdf_doc = _Doc(pdf_path, "application/pdf")
            pdf_options = AiCallOptions(
                operationType=OperationType.ANALYSE_CONTENT,
                priority=Priority.BALANCED,
                compressPrompt=True,
                compressContext=True,
                processingMode=ProcessingMode.ADVANCED,
                maxContextBytes=1000,
                chunkAllowed=True,
                maxCost=0.02,
                maxProcessingTime=10,
            )
            pdf_prompt = "Extract key information from the attached PDF."
            pdf_resp = await ai.callAi(prompt=pdf_prompt, documents=[pdf_doc], options=pdf_options)
            print("[ai] pdf extraction response length:", len(pdf_resp) if pdf_resp else 0)
        except Exception as e:
            # Script-level boundary: report the failure instead of aborting the run.
            print("[ai] pdf extraction error:", str(e))
    else:
        print("[ai] pdf test file not found; skipping")


# Entry point. This guard runs main() immediately when the file is executed as
# a script, so it has to come after every function main() calls; placed any
# earlier, main() fails with NameError on the not-yet-defined test functions.
if __name__ == "__main__":
    asyncio.run(main())