243 lines
9.1 KiB
Python
243 lines
9.1 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Test script: verify basic functionality of migrated services.
|
|
Run: pytest gateway/tests/unit/serviceCenter/test_service_center_functionality.py -v
|
|
"""
|
|
|
|
import pytest
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.serviceCenter import getService
|
|
from modules.serviceCenter.context import ServiceCenterContext
|
|
|
|
|
|
def _make_test_context():
    """Build the smallest ServiceCenterContext the service tests need.

    The context carries a fixed test user and mandate id; the feature
    instance, workflow id and workflow are all left as None.
    """
    test_user = User(id="test-user", username="testuser", email="test@example.com")
    context_fields = {
        "user": test_user,
        "mandate_id": "test-mandate",
        "feature_instance_id": None,
        "workflow_id": None,
        "workflow": None,
    }
    return ServiceCenterContext(**context_fields)
|
|
|
|
|
|
# ========== UtilsService ==========
|
|
|
|
class TestUtilsService:
    """Smoke tests for the "utils" service resolved through ``getService``.

    Each test calls one helper on the service and checks only coarse
    properties of the result (substring presence, type, None-ness), so the
    tests do not pin the helpers' exact output format.
    """

    @pytest.fixture
    def utils(self):
        """Return the "utils" service bound to a minimal test context."""
        ctx = _make_test_context()
        return getService("utils", ctx)

    def test_json_strip_code_fences(self, utils):
        """jsonStripCodeFences removes markdown code fences but keeps the JSON."""
        text = '```json\n{"a": 1}\n```'
        result = utils.jsonStripCodeFences(text)
        assert "```" not in result
        assert "{" in result and "}" in result

    def test_json_extract_first_balanced(self, utils):
        """jsonExtractFirstBalanced keeps the JSON object embedded in other text."""
        text = 'prefix {"key": "value"} suffix'
        result = utils.jsonExtractFirstBalanced(text)
        assert "key" in result
        assert "value" in result

    def test_json_normalize_text(self, utils):
        """jsonNormalizeText yields non-blank output for non-blank input."""
        text = ' {"a":1} '
        result = utils.jsonNormalizeText(text)
        # The input is a non-blank constant, so the only meaningful check is
        # that normalization did not reduce it to whitespace.
        assert result.strip() != ""

    def test_json_try_parse_valid(self, utils):
        """jsonTryParse parses valid JSON; it returns (obj, error, cleaned_str)."""
        # The third element is unpacked only to pin the 3-tuple return shape.
        obj, err, _cleaned = utils.jsonTryParse('{"x": 42}')
        assert obj is not None
        assert obj.get("x") == 42
        assert err is None

    def test_json_try_parse_invalid(self, utils):
        """jsonTryParse reports an error for invalid JSON: (None, error, cleaned_str)."""
        # The third element is unpacked only to pin the 3-tuple return shape.
        obj, err, _cleaned = utils.jsonTryParse('{invalid')
        assert obj is None
        assert err is not None

    def test_sanitize_prompt_content_empty(self, utils):
        """sanitizePromptContent returns an empty string for "" and for None."""
        assert utils.sanitizePromptContent("") == ""
        assert utils.sanitizePromptContent(None) == ""

    def test_sanitize_prompt_content_text(self, utils):
        """sanitizePromptContent keeps the text and escapes or quotes it."""
        result = utils.sanitizePromptContent('hello "world"')
        assert "world" in result
        # Either a backslash escape or a single-quote wrapper is accepted.
        assert "\\" in result or "'" in result

    def test_sanitize_prompt_content_userinput(self, utils):
        """sanitizePromptContent wraps userinput content in single quotes."""
        result = utils.sanitizePromptContent("test", contentType="userinput")
        assert result.startswith("'") and result.endswith("'")

    def test_timestamp_get_utc(self, utils):
        """timestampGetUtc returns a positive number."""
        ts = utils.timestampGetUtc()
        assert isinstance(ts, (int, float))
        assert ts > 0

    def test_config_get_default(self, utils):
        """configGet returns the supplied default for a missing key."""
        val = utils.configGet("nonexistent_key_xyz", default="fallback")
        assert val == "fallback"
|
|
|
|
|
|
# ========== TicketService ==========
|
|
|
|
class TestTicketService:
    """Structural checks for the "ticket" service.

    connectTicket itself is never invoked here: exercising it needs async
    execution and mocks, so only its presence is verified.
    """

    @pytest.fixture
    def ticket(self):
        """Provide the "ticket" service resolved against a minimal context."""
        return getService("ticket", _make_test_context())

    def test_ticket_has_connect_method(self, ticket):
        """The service exposes a callable ``connectTicket`` attribute."""
        method_name = "connectTicket"
        assert hasattr(ticket, method_name)
        connect = getattr(ticket, method_name)
        assert callable(connect)
|
|
|
|
|
|
# ========== SharepointService ==========
|
|
|
|
class TestSharepointService:
    """Tests for the "sharepoint" service's synchronous helper methods.

    Only path parsing, validation and filtering helpers are called; nothing
    here talks to the Graph API.
    """

    @pytest.fixture
    def sharepoint(self):
        """Return the "sharepoint" service bound to a minimal test context."""
        ctx = _make_test_context()
        return getService("sharepoint", ctx)

    def test_extract_site_from_standard_path(self, sharepoint):
        """extractSiteFromStandardPath splits /sites/<name>/<inner path>."""
        result = sharepoint.extractSiteFromStandardPath("/sites/company-share/Docs/Work")
        assert result is not None
        assert result["siteName"] == "company-share"
        assert result["innerPath"] == "Docs/Work"

    def test_extract_site_invalid_path(self, sharepoint):
        """extractSiteFromStandardPath returns None for paths outside /sites/."""
        assert sharepoint.extractSiteFromStandardPath("invalid") is None
        assert sharepoint.extractSiteFromStandardPath("/other/prefix") is None

    def test_validate_path_query(self, sharepoint):
        """validatePathQuery accepts a /sites/ path and rejects an empty one."""
        valid, err = sharepoint.validatePathQuery("/sites/mysite/Documents")
        assert valid is True
        assert err is None

        empty_valid, empty_err = sharepoint.validatePathQuery("")
        assert empty_valid is False
        # Only the presence of an error is required; the wording of the
        # message is deliberately not pinned.
        assert empty_err is not None

    def test_filter_sites_by_hint(self, sharepoint):
        """filterSitesByHint keeps the site matching a lower-case hint."""
        sites = [
            {"displayName": "Company", "webUrl": "https://a.com"},
            {"displayName": "Other", "webUrl": "https://b.com"},
        ]
        filtered = sharepoint.filterSitesByHint(sites, "company")
        assert len(filtered) == 1
        assert filtered[0]["displayName"] == "Company"

    def test_detect_folder_type(self, sharepoint):
        """detectFolderType is True for a "folder" item and False for a "file" item."""
        assert sharepoint.detectFolderType({"folder": {}}) is True
        assert sharepoint.detectFolderType({"file": {"mimeType": "pdf"}}) is False
|
|
|
|
|
|
# ========== ChatService ==========
|
|
|
|
class TestChatService:
    """Tests for "chat" service methods that run without a DB or workflow.

    The test context has no workflow attached, so these tests cover the
    no-workflow behaviour of the service.
    """

    @pytest.fixture
    def chat(self):
        """Return the "chat" service bound to a minimal test context."""
        ctx = _make_test_context()
        return getService("chat", ctx)

    def test_calculate_object_size(self, chat):
        """calculateObjectSize is positive for a dict and 0 for None."""
        assert chat.calculateObjectSize({"x": 1}) > 0
        assert chat.calculateObjectSize(None) == 0

    def test_get_workflow_context_no_workflow(self, chat):
        """getWorkflowContext returns defaults when no workflow is set."""
        ctx = chat.getWorkflowContext()
        assert ctx["currentRound"] == 0
        assert "currentTask" in ctx

    def test_get_document_reference_from_chat_document(self, chat):
        """getDocumentReferenceFromChatDocument produces a ``docItem:`` reference."""
        # Ad-hoc stand-in exposing only the two attributes set below.
        mock_doc = type("Doc", (), {"id": "id-1", "fileName": "f.pdf"})()
        ref = chat.getDocumentReferenceFromChatDocument(mock_doc)
        assert ref.startswith("docItem:")
        assert "id-1" in ref and "f.pdf" in ref

    def test_get_document_count_no_workflow(self, chat):
        """getDocumentCount returns a message mentioning documents."""
        msg = chat.getDocumentCount()
        # Case-insensitive match; it also covers a "No documents" message.
        assert "document" in msg.lower()
|
|
|
|
|
|
# ========== ExtractionService ==========
|
|
|
|
class TestExtractionService:
    """Checks for "extraction" service helpers that need no real documents."""

    @pytest.fixture
    def extraction(self):
        """Provide the "extraction" service resolved against a minimal context."""
        return getService("extraction", _make_test_context())

    def test_merge_part_results_empty(self, extraction):
        """Merging an empty list of part results gives an empty string."""
        no_parts = []
        merged = extraction.mergePartResults(no_parts)
        assert merged == ""

    def test_is_json_extraction_response(self, extraction):
        """A part whose data holds an ``extracted_content`` JSON object is detected."""
        from modules.datamodels.datamodelExtraction import ContentPart

        payload = '{"extracted_content": {"text": "x", "tables": []}}'
        part = ContentPart(
            id="p1",
            label="l1",
            typeGroup="text",
            mimeType="text/plain",
            data=payload,
        )
        detected = extraction._isJsonExtractionResponse([part])
        assert detected is True

    def test_create_error_response(self, extraction):
        """The error response carries the message and an error count of 1."""
        response = extraction._createErrorResponse("msg", 10, 20)
        assert response.content == "msg"
        assert response.errorCount == 1
|
|
|
|
|
|
# ========== WebService ==========
|
|
|
|
class TestWebService:
    """Structural tests for the "web" service.

    performWebResearch is not invoked here because it needs the full stack;
    only its presence and the no-workflow id helper are checked.
    """

    @pytest.fixture
    def web(self):
        """Return the "web" service, skipping the test if it cannot be resolved."""
        # Built outside the try block so that a broken test context surfaces
        # as an error instead of being reported as a skip.
        ctx = _make_test_context()
        try:
            return getService("web", ctx)
        except (KeyError, ImportError):
            # ImportError also covers ModuleNotFoundError, which subclasses it.
            pytest.skip("WebService dependencies not fully migrated")

    def test_web_has_perform_web_research(self, web):
        """WebService exposes a callable performWebResearch attribute."""
        assert hasattr(web, "performWebResearch")
        assert callable(getattr(web, "performWebResearch"))

    def test_web_workflow_id_property(self, web):
        """_workflow_id returns a string mentioning "workflow" when none is set."""
        # No workflow in context -> expected form is no-workflow-<timestamp>.
        wf_id = web._workflow_id()
        assert isinstance(wf_id, str)
        # Case-insensitive match; it also covers the "no-workflow" prefix.
        assert "workflow" in wf_id.lower()
|