# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Unit tests for Teamsbot Director Prompts (Plan #5). Covers: * Datamodel limits, defaults, enum-validation * SpeechTeamsResponse needsAgent / agentReason fields * TeamsbotService._buildPersistentDirectorContext rendering * TeamsbotService.submitDirectorPrompt: queues, emits SSE event, returns created * TeamsbotService._processDirectorPrompt lifecycle: queued -> running -> succeeded/consumed (one-shot vs persistent) * TeamsbotService._processDirectorPrompt failure path drops persistent prompt * TeamsbotService.removePersistentPrompt * getActiveService / _activeServices registry * TeamsbotObjects.getActivePersistentPrompts filtering The TeamsbotService constructor instantiates BrowserBotConnector, which is harmless (no network until joinMeeting). All DB / agent / SSE side-effects are stubbed via monkeypatch. """ from __future__ import annotations import asyncio from typing import Any, Dict, List, Optional from unittest.mock import MagicMock import pytest from pydantic import ValidationError from modules.features.teamsbot import service as serviceModule from modules.features.teamsbot.datamodelTeamsbot import ( DIRECTOR_PROMPT_FILE_LIMIT, DIRECTOR_PROMPT_TEXT_LIMIT, SpeechTeamsResponse, TeamsbotConfig, TeamsbotDirectorPrompt, TeamsbotDirectorPromptCreateRequest, TeamsbotDirectorPromptMode, TeamsbotDirectorPromptStatus, ) from modules.features.teamsbot.service import ( TeamsbotService, _activeServices, _sessionEvents, getActiveService, ) # ============================================================================ # Helpers # ============================================================================ class _FakeUser: """Minimal stand-in for modules.datamodels.datamodelUam.User used by the service layer. TeamsbotService only needs ``id`` for logging / interface keying.""" def __init__(self, userId: str = "user-op-1") -> None: self.id = userId class _FakeInterface: """In-memory stand-in for TeamsbotObjects (only the director-prompt API). Behaves like the real DB interface for the calls used by the service: ``createDirectorPrompt``, ``updateDirectorPrompt``, ``getDirectorPrompt``, ``getActivePersistentPrompts``, ``getActiveSystemBot``. """ def __init__(self) -> None: self.prompts: Dict[str, Dict[str, Any]] = {} self.created: List[Dict[str, Any]] = [] self.updates: List[Dict[str, Any]] = [] self.deleted: List[str] = [] def createDirectorPrompt(self, data: Dict[str, Any]) -> Dict[str, Any]: record = dict(data) if "id" not in record: record["id"] = f"prompt-{len(self.prompts)+1}" self.prompts[record["id"]] = record self.created.append(record) return record def updateDirectorPrompt(self, promptId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: if promptId not in self.prompts: return None self.prompts[promptId].update(updates) self.updates.append({"id": promptId, **updates}) return self.prompts[promptId] def getDirectorPrompt(self, promptId: str) -> Optional[Dict[str, Any]]: return self.prompts.get(promptId) def getActivePersistentPrompts(self, sessionId: str) -> List[Dict[str, Any]]: terminal = { TeamsbotDirectorPromptStatus.CONSUMED.value, TeamsbotDirectorPromptStatus.FAILED.value, } return [ p for p in self.prompts.values() if p.get("sessionId") == sessionId and p.get("mode") == TeamsbotDirectorPromptMode.PERSISTENT.value and p.get("status") not in terminal ] def getActiveSystemBot(self, mandateId: str) -> Optional[Dict[str, Any]]: return None class _CapturedEvents(list): """Helper to collect SSE events emitted by ``_emitSessionEvent``.""" async def append_event(self, sessionId: str, eventType: str, data: Any) -> None: self.append({"sessionId": sessionId, "type": eventType, "data": data}) def _patchInterface(monkeypatch: pytest.MonkeyPatch, fakeInterface: _FakeInterface) -> None: """Replace ``getInterface`` in the teamsbot service module so the service talks to our in-memory fake instead of PostgreSQL.""" from modules.features.teamsbot import interfaceFeatureTeamsbot as interfaceDb monkeypatch.setattr(interfaceDb, "getInterface", lambda *args, **kwargs: fakeInterface) def _patchEmit(monkeypatch: pytest.MonkeyPatch) -> _CapturedEvents: captured = _CapturedEvents() async def _stubEmit(sessionId: str, eventType: str, data: Any) -> None: await captured.append_event(sessionId, eventType, data) monkeypatch.setattr(serviceModule, "_emitSessionEvent", _stubEmit) return captured def _buildService() -> TeamsbotService: """Build a TeamsbotService with a minimal config. BrowserBotConnector is instantiated but never reached in these tests.""" config = TeamsbotConfig(botName="UnitTest Bot") svc = TeamsbotService( currentUser=_FakeUser(), mandateId="mandate-x", instanceId="instance-y", config=config, ) svc._activeSessionId = "session-1" return svc @pytest.fixture(autouse=True) def _resetGlobals(): """Avoid cross-test bleed in module-level globals.""" _activeServices.clear() _sessionEvents.clear() yield _activeServices.clear() _sessionEvents.clear() # ============================================================================ # 1) Datamodel # ============================================================================ class TestDirectorPromptDatamodel: def test_directorPromptDefaults(self): prompt = TeamsbotDirectorPrompt( sessionId="s1", instanceId="i1", operatorUserId="u1", text="Hello world", ) assert prompt.mode == TeamsbotDirectorPromptMode.ONE_SHOT assert prompt.status == TeamsbotDirectorPromptStatus.QUEUED assert prompt.fileIds == [] assert prompt.consumedAt is None assert prompt.responseText is None assert prompt.id # uuid auto-filled assert prompt.createdAt # iso timestamp auto-filled def test_directorPromptTextLimitEnforced(self): with pytest.raises(ValidationError): TeamsbotDirectorPrompt( sessionId="s1", instanceId="i1", operatorUserId="u1", text="x" * (DIRECTOR_PROMPT_TEXT_LIMIT + 1), ) def test_directorPromptCreateRequestDefaults(self): body = TeamsbotDirectorPromptCreateRequest(text="quick prompt") assert body.mode == TeamsbotDirectorPromptMode.ONE_SHOT assert body.fileIds == [] def test_directorPromptCreateRequestEmptyTextRejected(self): with pytest.raises(ValidationError): TeamsbotDirectorPromptCreateRequest(text="") def test_directorPromptCreateRequestTooLongRejected(self): with pytest.raises(ValidationError): TeamsbotDirectorPromptCreateRequest(text="x" * (DIRECTOR_PROMPT_TEXT_LIMIT + 1)) def test_directorPromptStatusEnum(self): assert TeamsbotDirectorPromptStatus.QUEUED.value == "queued" assert TeamsbotDirectorPromptStatus.RUNNING.value == "running" assert TeamsbotDirectorPromptStatus.SUCCEEDED.value == "succeeded" assert TeamsbotDirectorPromptStatus.CONSUMED.value == "consumed" assert TeamsbotDirectorPromptStatus.FAILED.value == "failed" def test_directorPromptModeEnum(self): assert TeamsbotDirectorPromptMode.ONE_SHOT.value == "oneShot" assert TeamsbotDirectorPromptMode.PERSISTENT.value == "persistent" def test_fileLimitConstantHasSaneValue(self): assert DIRECTOR_PROMPT_FILE_LIMIT == 10 assert DIRECTOR_PROMPT_TEXT_LIMIT == 8000 class TestSpeechTeamsResponseHybrid: def test_needsAgentDefaultFalse(self): resp = SpeechTeamsResponse(shouldRespond=False) assert resp.needsAgent is False assert resp.agentReason is None def test_needsAgentEscalation(self): resp = SpeechTeamsResponse( shouldRespond=True, responseText="Moment, ich recherchiere.", needsAgent=True, agentReason="webSearch SBB Schweiz", detectedIntent="addressed", ) assert resp.needsAgent is True assert resp.agentReason == "webSearch SBB Schweiz" # ============================================================================ # 2) Persistent Director Context Renderer # ============================================================================ class TestBuildPersistentDirectorContext: def test_emptyWhenNoPrompts(self): svc = _buildService() svc._activePersistentPrompts = [] assert svc._buildPersistentDirectorContext() == "" def test_singlePrompt(self): svc = _buildService() svc._activePersistentPrompts = [ {"id": "p1", "text": "Antworte immer in Englisch."}, ] rendered = svc._buildPersistentDirectorContext() assert "OPERATOR_DIRECTIVES" in rendered assert "- Antworte immer in Englisch." in rendered assert "private" in rendered def test_skipsBlankText(self): svc = _buildService() svc._activePersistentPrompts = [ {"id": "p1", "text": " "}, {"id": "p2", "text": "Sei hoeflich."}, ] rendered = svc._buildPersistentDirectorContext() assert "- Sei hoeflich." in rendered assert "p1" not in rendered # the blank one is filtered out def test_allBlankPromptsResultInEmpty(self): svc = _buildService() svc._activePersistentPrompts = [ {"id": "p1", "text": ""}, {"id": "p2", "text": " "}, ] assert svc._buildPersistentDirectorContext() == "" # ============================================================================ # 3) submitDirectorPrompt # ============================================================================ class TestSubmitDirectorPrompt: @pytest.mark.asyncio async def test_oneShotQueuesAndEmits(self, monkeypatch): fake = _FakeInterface() events = _patchEmit(monkeypatch) _patchInterface(monkeypatch, fake) # Block the auto-process task from running, otherwise it would call # the real agent service. We replace the coroutine factory. async def _noProcess(prompt): return None svc = _buildService() monkeypatch.setattr(svc, "_processDirectorPrompt", _noProcess) created = await svc.submitDirectorPrompt( sessionId="session-1", operatorUserId="user-op-1", text="Recherchier das im Internet.", mode=TeamsbotDirectorPromptMode.ONE_SHOT, fileIds=[], ) assert created["status"] == TeamsbotDirectorPromptStatus.QUEUED.value assert created["mode"] == TeamsbotDirectorPromptMode.ONE_SHOT.value assert created["text"] == "Recherchier das im Internet." assert created["sessionId"] == "session-1" assert created["instanceId"] == "instance-y" assert created["operatorUserId"] == "user-op-1" # SSE event with the queued lifecycle marker assert any( e["type"] == "directorPrompt" and e["data"]["status"] == TeamsbotDirectorPromptStatus.QUEUED.value and e["data"]["mode"] == TeamsbotDirectorPromptMode.ONE_SHOT.value for e in events ) # In-memory persistent registry remains empty for one-shot. assert svc._activePersistentPrompts == [] # Allow the (no-op) background task to settle so the loop is clean. await asyncio.sleep(0) @pytest.mark.asyncio async def test_persistentPromptAppendsToInMemoryRegistry(self, monkeypatch): fake = _FakeInterface() _patchEmit(monkeypatch) _patchInterface(monkeypatch, fake) async def _noProcess(prompt): return None svc = _buildService() monkeypatch.setattr(svc, "_processDirectorPrompt", _noProcess) created = await svc.submitDirectorPrompt( sessionId="session-1", operatorUserId="user-op-1", text="Antworte immer in Englisch.", mode=TeamsbotDirectorPromptMode.PERSISTENT, fileIds=["file-a", "file-b"], ) assert created["mode"] == TeamsbotDirectorPromptMode.PERSISTENT.value assert created["fileIds"] == ["file-a", "file-b"] assert len(svc._activePersistentPrompts) == 1 assert svc._activePersistentPrompts[0]["id"] == created["id"] await asyncio.sleep(0) # ============================================================================ # 4) _processDirectorPrompt lifecycle # ============================================================================ class TestProcessDirectorPromptLifecycle: @pytest.mark.asyncio async def test_oneShotSuccessTransitionsRunningThenConsumed(self, monkeypatch): fake = _FakeInterface() prompt = fake.createDirectorPrompt( TeamsbotDirectorPrompt( id="prompt-success-1", sessionId="session-1", instanceId="instance-y", operatorUserId="user-op-1", text="Was ist die Hauptstadt von Frankreich?", mode=TeamsbotDirectorPromptMode.ONE_SHOT, ).model_dump() ) events = _patchEmit(monkeypatch) _patchInterface(monkeypatch, fake) svc = _buildService() async def _stubAgent(**kwargs): return "Paris." monkeypatch.setattr(svc, "_runAgentForMeeting", _stubAgent) await svc._processDirectorPrompt(prompt) statuses = [u.get("status") for u in fake.updates if u["id"] == prompt["id"]] assert TeamsbotDirectorPromptStatus.RUNNING.value in statuses assert TeamsbotDirectorPromptStatus.CONSUMED.value in statuses final = fake.prompts[prompt["id"]] assert final["status"] == TeamsbotDirectorPromptStatus.CONSUMED.value assert final["responseText"] == "Paris." assert final.get("consumedAt") emittedStatuses = [ e["data"].get("status") for e in events if e["type"] == "directorPrompt" ] assert TeamsbotDirectorPromptStatus.RUNNING.value in emittedStatuses assert TeamsbotDirectorPromptStatus.CONSUMED.value in emittedStatuses @pytest.mark.asyncio async def test_persistentSuccessStaysSucceededNotConsumed(self, monkeypatch): fake = _FakeInterface() prompt = fake.createDirectorPrompt( TeamsbotDirectorPrompt( id="prompt-persist-1", sessionId="session-1", instanceId="instance-y", operatorUserId="user-op-1", text="Antworte immer in Englisch.", mode=TeamsbotDirectorPromptMode.PERSISTENT, ).model_dump() ) _patchEmit(monkeypatch) _patchInterface(monkeypatch, fake) svc = _buildService() async def _stubAgent(**kwargs): return "Acknowledged." monkeypatch.setattr(svc, "_runAgentForMeeting", _stubAgent) await svc._processDirectorPrompt(prompt) final = fake.prompts[prompt["id"]] assert final["status"] == TeamsbotDirectorPromptStatus.SUCCEEDED.value assert final["responseText"] == "Acknowledged." # Persistent prompts must stay alive beyond the run. assert final.get("consumedAt") is None @pytest.mark.asyncio async def test_failureMarksFailedAndDropsFromActivePersistent(self, monkeypatch): fake = _FakeInterface() prompt = fake.createDirectorPrompt( TeamsbotDirectorPrompt( id="prompt-fail-1", sessionId="session-1", instanceId="instance-y", operatorUserId="user-op-1", text="Mach was Komplexes.", mode=TeamsbotDirectorPromptMode.PERSISTENT, ).model_dump() ) events = _patchEmit(monkeypatch) _patchInterface(monkeypatch, fake) svc = _buildService() svc._activePersistentPrompts = [prompt] async def _stubAgentBoom(**kwargs): raise RuntimeError("agent down") monkeypatch.setattr(svc, "_runAgentForMeeting", _stubAgentBoom) await svc._processDirectorPrompt(prompt) final = fake.prompts[prompt["id"]] assert final["status"] == TeamsbotDirectorPromptStatus.FAILED.value assert "RuntimeError" in (final.get("statusMessage") or "") # The failed persistent prompt is removed from the in-memory directives. assert all(p["id"] != prompt["id"] for p in svc._activePersistentPrompts) emittedStatuses = [ e["data"].get("status") for e in events if e["type"] == "directorPrompt" ] assert TeamsbotDirectorPromptStatus.FAILED.value in emittedStatuses # ============================================================================ # 5) removePersistentPrompt # ============================================================================ class TestRemovePersistentPrompt: @pytest.mark.asyncio async def test_removePersistentPromptMarksConsumedAndDrops(self, monkeypatch): fake = _FakeInterface() prompt = fake.createDirectorPrompt( TeamsbotDirectorPrompt( id="prompt-rm-1", sessionId="session-1", instanceId="instance-y", operatorUserId="user-op-1", text="Bleib hoeflich.", mode=TeamsbotDirectorPromptMode.PERSISTENT, status=TeamsbotDirectorPromptStatus.SUCCEEDED, ).model_dump() ) events = _patchEmit(monkeypatch) _patchInterface(monkeypatch, fake) svc = _buildService() svc._activePersistentPrompts = [prompt] ok = await svc.removePersistentPrompt(prompt["id"]) assert ok is True final = fake.prompts[prompt["id"]] assert final["status"] == TeamsbotDirectorPromptStatus.CONSUMED.value assert final.get("consumedAt") assert final.get("statusMessage") == "Removed by operator" assert svc._activePersistentPrompts == [] assert any( e["type"] == "directorPrompt" and e["data"].get("removed") is True and e["data"].get("status") == TeamsbotDirectorPromptStatus.CONSUMED.value for e in events ) @pytest.mark.asyncio async def test_removeUnknownPromptReturnsFalse(self, monkeypatch): fake = _FakeInterface() _patchEmit(monkeypatch) _patchInterface(monkeypatch, fake) svc = _buildService() ok = await svc.removePersistentPrompt("unknown-id") assert ok is False # ============================================================================ # 6) _activeServices Registry # ============================================================================ class TestActiveServicesRegistry: def test_getActiveServiceReturnsNoneByDefault(self): assert getActiveService("not-active") is None def test_getActiveServiceReturnsRegistered(self): svc = _buildService() _activeServices["session-XYZ"] = svc assert getActiveService("session-XYZ") is svc def test_distinctSessionsMapToDistinctServices(self): a = _buildService() b = _buildService() _activeServices["s1"] = a _activeServices["s2"] = b assert getActiveService("s1") is a assert getActiveService("s2") is b assert getActiveService("s1") is not getActiveService("s2") # ============================================================================ # 7) Interface-level filtering for active persistent prompts # ============================================================================ class TestGetActivePersistentPromptsFiltering: """The interface-level helper is the source of truth for what gets re-loaded into _activePersistentPrompts on (re)connect.""" def test_onlyPersistentNonTerminal(self): fake = _FakeInterface() # All four lifecycle states for the same session for status in TeamsbotDirectorPromptStatus: fake.createDirectorPrompt( TeamsbotDirectorPrompt( sessionId="s1", instanceId="i1", operatorUserId="u1", text=f"persist-{status.value}", mode=TeamsbotDirectorPromptMode.PERSISTENT, status=status, ).model_dump() ) # one-shot persistent-failure-irrelevant fake.createDirectorPrompt( TeamsbotDirectorPrompt( sessionId="s1", instanceId="i1", operatorUserId="u1", text="oneShot-running", mode=TeamsbotDirectorPromptMode.ONE_SHOT, status=TeamsbotDirectorPromptStatus.RUNNING, ).model_dump() ) active = fake.getActivePersistentPrompts("s1") statuses = {p.get("status") for p in active} # CONSUMED and FAILED are terminal; ONE_SHOT is not persistent. assert TeamsbotDirectorPromptStatus.CONSUMED.value not in statuses assert TeamsbotDirectorPromptStatus.FAILED.value not in statuses # All returned prompts are persistent assert all( p.get("mode") == TeamsbotDirectorPromptMode.PERSISTENT.value for p in active ) # Non-terminal persistent: QUEUED, RUNNING, SUCCEEDED -> 3 records assert len(active) == 3 def test_filtersBySession(self): fake = _FakeInterface() fake.createDirectorPrompt( TeamsbotDirectorPrompt( sessionId="s1", instanceId="i1", operatorUserId="u1", text="A", mode=TeamsbotDirectorPromptMode.PERSISTENT, ).model_dump() ) fake.createDirectorPrompt( TeamsbotDirectorPrompt( sessionId="s2", instanceId="i1", operatorUserId="u1", text="B", mode=TeamsbotDirectorPromptMode.PERSISTENT, ).model_dump() ) assert len(fake.getActivePersistentPrompts("s1")) == 1 assert len(fake.getActivePersistentPrompts("s2")) == 1 assert fake.getActivePersistentPrompts("ghost") == []