"""Manual smoke-test script for the extraction, generation and AI services.

Run directly; it exercises three areas in sequence and prints short summaries:

1. ``ExtractionService.extractContent`` with a 1000-byte size cap.
2. ``DocumentGenerationService.createDocumentsFromActionResult`` with stubbed
   storage that writes the generated file into the ``testdata`` directory.
3. ``AiService`` calls: a planning prompt, an image prompt and a PDF prompt.
"""

import asyncio
import os
import sys
from typing import List, Dict, Any

# Ensure relative imports work when running directly.
# abspath() guards against a relative __file__, for which dirname() would
# yield "" and GATEWAY_DIR would silently become the empty string.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
GATEWAY_DIR = os.path.dirname(os.path.dirname(CURRENT_DIR))
if GATEWAY_DIR not in sys.path:
    sys.path.append(GATEWAY_DIR)

from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.services.serviceGeneration.mainServiceGeneration import DocumentGenerationService
from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, ProcessingMode, Priority
from modules.services.serviceAi.mainServiceAi import AiService


TESTDATA_DIR = os.path.join(GATEWAY_DIR, "testdata")


def _read_test_files() -> List[Dict[str, Any]]:
    """Load every regular file in ``TESTDATA_DIR`` into a list of dicts.

    Returns:
        One dict per readable file with the keys ``id`` and ``fileName``
        (both the file name), ``bytes`` (raw content) and ``mimeType``
        (from ``_guess_mime``). Entries are ordered by file name.

    Files that cannot be read are skipped with a printed notice rather than
    aborting the whole run.
    """
    files = []
    # Sorted so the document order (and the printed doc[i] indices) is stable.
    for name in sorted(os.listdir(TESTDATA_DIR)):
        path = os.path.join(TESTDATA_DIR, name)
        if not os.path.isfile(path):
            continue
        try:
            with open(path, "rb") as f:
                data = f.read()
        except OSError as e:
            # Best effort: report the unreadable file and keep going.
            print(f"[testdata] skipping unreadable file {name}: {e}")
            continue
        files.append({
            "id": name,
            "bytes": data,
            "fileName": name,
            "mimeType": _guess_mime(name),
        })
    return files


def _guess_mime(name: str) -> str:
    """Return a MIME type for ``name`` based on its (case-insensitive) suffix.

    Recognises ``.pdf``, ``.xlsx``, ``.jpg``/``.jpeg`` and ``.png``; anything
    else maps to ``application/octet-stream``.
    """
    lower = name.lower()
    if lower.endswith(".pdf"):
        return "application/pdf"
    if lower.endswith(".xlsx"):
        return "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    if lower.endswith((".jpg", ".jpeg")):
        return "image/jpeg"
    if lower.endswith(".png"):
        return "image/png"
    return "application/octet-stream"


def run_extraction_1000_bytes() -> None:
    """Run ``ExtractionService.extractContent`` on the test files and print sizes.

    For each result the number of parts and the sum of the parts' ``size``
    metadata values is printed.
    """
    svc = ExtractionService()
    docs = _read_test_files()
    options = {
        # cap total pooled size per document set
        "maxSize": 1000,
        # allow chunking to respect the cap across parts
        "chunkAllowed": True,
        # chunk sizes for different content types to help fit under the cap
        "textChunkSize": 500,
        "tableChunkSize": 500,
        "structureChunkSize": 500,
        # simple merge strategy if supported
        "mergeStrategy": {},
    }
    results = svc.extractContent(docs, options)
    print("[extraction] documents:", len(docs), "results:", len(results))
    for i, ec in enumerate(results):
        # Missing or falsy "size" metadata counts as 0.
        total = sum(int(p.metadata.get("size", 0) or 0) for p in ec.parts)
        print(f" - doc[{i}] parts={len(ec.parts)} pooledBytes={total}")


async def main():
    """Run the extraction, generation and AI checks in sequence."""
    print("=== serviceExtraction: compress to 1000 bytes ===")
    run_extraction_1000_bytes()
    print("\n=== serviceGeneration: create ActionResult and write output to testdata ===")
    await run_generation_write_file()
    print("\n=== serviceAi: planning call + image + pdf extraction ===")
    await run_ai_tests()


async def run_generation_write_file() -> None:
    """Create a document via ``DocumentGenerationService`` using local stubs.

    The stubbed component interface keeps file records in memory and writes
    file content to ``TESTDATA_DIR`` as ``output_<fileId>_<fileName>``.
    NOTE: those output files live in the same directory that
    ``_read_test_files`` scans, so later runs will pick them up as inputs.
    """

    # Minimal stubs for interfaces expected by DocumentGenerationService
    class _FileItem:
        """In-memory file record; stores metadata only, not the content."""

        def __init__(self, file_id: str, file_name: str, mime_type: str, content: bytes):
            self.id = file_id
            self.fileName = file_name
            self.mimeType = mime_type
            self.fileSize = len(content)

    class _ComponentInterface:
        """Stub file store: records in a dict, content written to testdata."""

        def __init__(self):
            self._files = {}

        def createFile(self, name: str, mimeType: str, content: bytes):
            # Sequential ids: test_1, test_2, ...
            fid = f"test_{len(self._files)+1}"
            item = _FileItem(fid, name, mimeType, content)
            self._files[fid] = item
            return item

        def createFileData(self, fileId: str, content: bytes):
            # Persist into testdata directory as requested
            item = self._files[fileId]
            out_path = os.path.join(TESTDATA_DIR, f"output_{fileId}_{item.fileName}")
            with open(out_path, "wb") as f:
                f.write(content)

        def getFile(self, fileId: str):
            return self._files.get(fileId)

    class _ServiceCenter:
        """Stub service center exposing the attributes set below."""

        def __init__(self, comp):
            self.interfaceDbComponent = comp
            self.interfaceDbChat = None
            # Ad-hoc workflow object carrying only plain class attributes.
            self.workflow = type("_Wf", (), {"id": "wf_test", "currentRound": 1, "currentTask": 1, "currentAction": 1, "status": "running", "totalTasks": 1, "totalActions": 1})()

    component = _ComponentInterface()
    center = _ServiceCenter(component)
    gen = DocumentGenerationService(center)

    # Build a fake action and ActionResult with a small text document
    class _Action:
        """Stub action carrying the attributes set below."""

        def __init__(self):
            self.id = "action_test"
            self.execMethod = "document"
            self.execAction = "generate"
            self.execParameters = {}
            self.execResultLabel = "round1_task1_action1_results"

    action = _Action()
    content = "This is a generated test file from serviceGeneration test."
    action_doc = ActionDocument(documentName="test_generated.txt", documentData=content, mimeType="text/plain")
    action_result = ActionResult(success=True, documents=[action_doc])

    docs = gen.createDocumentsFromActionResult(action_result, action, center.workflow, message_id="msg_test")
    print("[generation] created documents:", len(docs))


async def run_ai_tests() -> None:
    """Issue three ``AiService`` calls and print the response lengths.

    The calls are a planning prompt, an image prompt for
    ``00Untitled.jpg`` and a document prompt for ``diagramm_komponenten.pdf``.
    The image and PDF calls are skipped when the test file is missing, and
    any exception they raise is printed instead of propagated.
    """
    # Create AiService instance (uses internal default model registry; no external creds required for this test)
    ai = await AiService.create()

    # Planning AI call (like in handlingTasks.generateTaskPlan)
    plan_options = AiCallOptions(
        operationType=OperationType.GENERATE_PLAN,
        priority=Priority.QUALITY,
        compressPrompt=False,
        compressContext=False,
        processingMode=ProcessingMode.DETAILED,
        maxCost=0.05,
        maxProcessingTime=10,
    )
    plan_prompt = """
    You are a planning assistant. Return a compact JSON with fields: tasks:[{id, objective, success_criteria:[]}], languageUserDetected:"en". Create exactly one simple task id:"task_1" objective:"Test planning" success_criteria:["done"].
    """.strip()
    plan_resp = await ai.callAi(prompt=plan_prompt, placeholders=None, options=plan_options)
    print("[ai] planning response length:", len(plan_resp) if plan_resp else 0)

    # Image content extraction prompt using test JPEG
    img_path = os.path.join(TESTDATA_DIR, "00Untitled.jpg")
    img_resp = None
    if os.path.exists(img_path):
        try:
            with open(img_path, "rb") as f:
                img_bytes = f.read()
            img_options = AiCallOptions(
                operationType=OperationType.ANALYSE_CONTENT,
                priority=Priority.BALANCED,
                compressPrompt=True,
                compressContext=False,
                processingMode=ProcessingMode.ADVANCED,
                maxCost=0.02,
                maxProcessingTime=10,
            )
            img_resp = await ai.callAiImage(
                prompt="Describe the content of this image succinctly.",
                imageData=img_bytes,
                mimeType="image/jpeg",
                options=img_options,
            )
            print("[ai] image analysis response length:", len(img_resp) if img_resp else 0)
        except Exception as e:
            # Smoke test: report the failure and continue with the PDF check.
            print("[ai] image analysis error:", str(e))
    else:
        print("[ai] image test file not found; skipping")

    # PDF extraction prompt: emulate text call with document context built via ExtractionService
    pdf_path = os.path.join(TESTDATA_DIR, "diagramm_komponenten.pdf")
    if os.path.exists(pdf_path):
        try:
            # Build a minimal ChatDocument-like shim that AiService._callAiText expects via extraction
            class _Doc:
                """Document shim holding id, file name, MIME type and raw bytes."""

                def __init__(self, file_path: str, mime: str):
                    self.id = "doc_pdf"
                    self.fileName = os.path.basename(file_path)
                    self.mimeType = mime
                    with open(file_path, "rb") as f:
                        self.fileData = f.read()

            pdf_doc = _Doc(pdf_path, "application/pdf")
            pdf_options = AiCallOptions(
                operationType=OperationType.ANALYSE_CONTENT,
                priority=Priority.BALANCED,
                compressPrompt=True,
                compressContext=True,
                processingMode=ProcessingMode.ADVANCED,
                maxContextBytes=1000,
                chunkAllowed=True,
                maxCost=0.02,
                maxProcessingTime=10,
            )
            pdf_prompt = "Extract key information from the attached PDF."
            pdf_resp = await ai.callAi(prompt=pdf_prompt, documents=[pdf_doc], options=pdf_options)
            print("[ai] pdf extraction response length:", len(pdf_resp) if pdf_resp else 0)
        except Exception as e:
            # Smoke test: report the failure instead of aborting the script.
            print("[ai] pdf extraction error:", str(e))
    else:
        print("[ai] pdf test file not found; skipping")


# The entry-point guard must come after every function main() calls: module
# code runs top to bottom, so invoking main() earlier raises NameError for
# run_generation_write_file / run_ai_tests.
if __name__ == "__main__":
    asyncio.run(main())