platform-core/modules/connectors/connectorProviderMsft.py
ValueOn AG 4a60086c80
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 15s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cp adapted to 2026 poweron
2026-06-09 09:53:31 +02:00

1572 lines
65 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""Microsoft ProviderConnector -- one MSFT connection serves SharePoint, Outlook, Teams, OneDrive.
All ServiceAdapters share the same OAuth access token obtained from the
UserConnection (authority=msft).
"""
import json
import logging
import re
import aiohttp
import asyncio
import urllib.parse
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional
from modules.connectors.connectorProviderBase import ProviderConnector, ServiceAdapter, DownloadResult
from modules.shared.httpResilience import ResilientHttp
from modules.datamodels.datamodelDataSource import ExternalEntry
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL of the Microsoft Graph v1.0 REST API; relative endpoints are appended to it.
_GRAPH_BASE = "https://graph.microsoft.com/v1.0"
# Single HTTP client instance shared by every Graph call in this module.
# NOTE(review): presumably maxConcurrent caps parallel requests and
# defaultTimeoutS is a timeout in seconds -- confirm against ResilientHttp.
_http = ResilientHttp("Graph", maxConcurrent=10, defaultTimeoutS=30)
class _GraphApiMixin:
"""Shared Graph API call logic for all MSFT service adapters."""
def __init__(self, accessToken: str):
self._accessToken = accessToken
async def _graphGet(self, endpoint: str) -> Dict[str, Any]:
return await _makeGraphCall(self._accessToken, endpoint, "GET")
async def _graphPost(self, endpoint: str, data: Any = None) -> Dict[str, Any]:
return await _makeGraphCall(self._accessToken, endpoint, "POST", data)
async def _graphPut(self, endpoint: str, data: bytes = None) -> Dict[str, Any]:
return await _makeGraphCall(self._accessToken, endpoint, "PUT", data)
async def _graphPatch(self, endpoint: str, data: Any = None) -> Dict[str, Any]:
return await _makeGraphCall(self._accessToken, endpoint, "PATCH", data)
async def _graphDelete(self, endpoint: str) -> Dict[str, Any]:
return await _makeGraphCall(self._accessToken, endpoint, "DELETE")
async def _graphDownload(self, endpoint: str) -> Optional[bytes]:
"""Download binary content from Graph API."""
headers = {"Authorization": f"Bearer {self._accessToken}"}
url = f"{_GRAPH_BASE}/{endpoint.lstrip('/')}"
return await _http.getBytes(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60))
async def _makeGraphCall(
    token: str, endpoint: str, method: str = "GET", data: Any = None
) -> Dict[str, Any]:
    """Execute a single Microsoft Graph API call via the shared resilient HTTP client.

    Args:
        token: OAuth bearer token placed in the ``Authorization`` header.
        endpoint: Path relative to the Graph base URL; a leading slash is ignored.
        method: HTTP verb passed through to the HTTP client.
        data: Optional request body. A ``bytes`` body on ``PUT`` is sent as
            ``application/octet-stream``; everything else is declared as JSON.

    Returns:
        Whatever ``_http.request`` returns for the call.
    """
    url = f"{_GRAPH_BASE}/{endpoint.lstrip('/')}"
    isBinaryUpload = method == "PUT" and isinstance(data, bytes)
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": (
            "application/octet-stream" if isBinaryUpload else "application/json; charset=utf-8"
        ),
    }
    # Follow-up pages arrive as @odata.nextLink URLs, which can carry the query
    # option percent-encoded ("%24count=true"). Decode before matching so the
    # header is sent on every page of a counted query, not only the first one.
    if "$count=true" in urllib.parse.unquote(endpoint).lower():
        headers["ConsistencyLevel"] = "eventual"
    return await _http.request(method, url, headers=headers, data=data)
async def _handleResponse(resp: aiohttp.ClientResponse) -> Dict[str, Any]:
    """Map an aiohttp response onto the dict shape used by the Graph helpers.

    200/201 yield the parsed JSON body, 202 yields ``{"accepted": True}``,
    204 yields ``{}``. Any other status is logged and returned as
    ``{"error": "<status>: <body text>"}``.
    """
    status = resp.status
    if status == 204:
        return {}
    if status == 202:
        return {"accepted": True}
    if status in (200, 201):
        return await resp.json()
    bodyText = await resp.text()
    logger.error(f"Graph API {status}: {bodyText}")
    return {"error": f"{status}: {bodyText}"}
def stripGraphBase(url: str) -> str:
    """Turn an absolute Graph URL (as found in ``@odata.nextLink``) into the
    relative endpoint form that ``_makeGraphCall`` expects.

    Empty input yields ``""``; a URL that does not start with the Graph base
    is returned unchanged.
    """
    if not url:
        return ""
    if not url.startswith(_GRAPH_BASE):
        return url
    relative = url[len(_GRAPH_BASE):]
    return relative.lstrip("/")
def _raiseGraphError(result: Dict[str, Any], ctx: str) -> None:
    """Log and raise a ``RuntimeError`` for a failed Graph response.

    Browse/search must NOT turn an API failure into an empty result list,
    because that makes a real error look like an empty directory. Callers
    (data-source tools, tree-builder, sync jobs) already wrap these calls in
    try/except.

    Args:
        result: The failed response; its ``error`` entry is reported when
            present, otherwise the whole object is.
        ctx: Short label naming the operation that failed.

    Raises:
        RuntimeError: Always.
    """
    detail = None
    if isinstance(result, dict):
        detail = result.get("error")
    shown = detail or result
    logger.warning("Graph error (%s): %s", ctx, shown)
    raise RuntimeError(f"Graph error ({ctx}): {shown}")
def _graphItemToExternalEntry(item: Dict[str, Any], basePath: str = "") -> ExternalEntry:
    """Convert a raw Graph driveItem dict into an ``ExternalEntry``.

    Args:
        item: driveItem as returned by Graph. The presence of a ``folder`` key
            marks the item as a folder.
        basePath: Path of the containing folder. When given, the entry path is
            ``<basePath>/<name>``; trailing slashes on ``basePath`` are dropped
            so browsing "/" or "/sites/x/" does not produce "//name".

    Returns:
        The entry, with id, webUrl, child count, revision, modification
        timestamp and parent reference collected under ``metadata``.
    """
    name = item.get("name", "")
    isFolder = "folder" in item
    # ``eTag`` (falling back to ``cTag``) is exposed as a "revision" string so
    # callers can use it as a stable ``contentVersion`` for idempotent
    # ingestion without re-downloading file bytes.
    revision = item.get("eTag") or item.get("cTag")
    if basePath:
        entryPath = f"{basePath.rstrip('/')}/{name}"
    else:
        entryPath = name
    # ``or {}`` also covers facets that are present but null.
    fileFacet = item.get("file") or {}
    folderFacet = item.get("folder") or {}
    return ExternalEntry(
        name=name,
        path=entryPath,
        isFolder=isFolder,
        size=item.get("size"),
        mimeType=None if isFolder else fileFacet.get("mimeType"),
        lastModified=None,
        metadata={
            "id": item.get("id"),
            "webUrl": item.get("webUrl"),
            "childCount": folderFacet.get("childCount") if isFolder else None,
            "revision": revision,
            "lastModifiedDateTime": item.get("lastModifiedDateTime"),
            "parentReference": item.get("parentReference", {}),
        },
    )
# ---------------------------------------------------------------------------
# SharePoint Adapter
# ---------------------------------------------------------------------------
class SharepointAdapter(_GraphApiMixin, ServiceAdapter):
    """ServiceAdapter for SharePoint (files, sites) via Microsoft Graph."""

    async def _collectEntries(
        self,
        endpoint: Optional[str],
        ctx: str,
        hardCap: int,
        basePath: str = "",
        nameFilter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Page through a driveItem listing and return the matching entries.

        Follows ``@odata.nextLink`` until ``limit`` matching entries were
        collected, ``hardCap`` raw items were scanned, or Graph reports no
        further page. The filter is applied while paging so that ``limit``
        counts matching entries, not raw items.

        Raises:
            RuntimeError: via ``_raiseGraphError`` when Graph fails before any
                item was received. A failure on a later page ends paging and
                returns what was collected so far.
        """
        # A limit below 1 still yields one entry.
        effectiveLimit = max(1, int(limit)) if limit is not None else None
        entries: List[ExternalEntry] = []
        scanned = 0
        while endpoint and scanned < hardCap:
            result = await self._graphGet(endpoint)
            if "error" in result:
                if not scanned:
                    _raiseGraphError(result, ctx)
                break
            for raw in result.get("value", []) or []:
                scanned += 1
                entry = _graphItemToExternalEntry(raw, basePath)
                if nameFilter and not _matchFilter(entry, nameFilter):
                    continue
                entries.append(entry)
                if effectiveLimit is not None and len(entries) >= effectiveLimit:
                    return entries
            nextLink = result.get("@odata.nextLink")
            endpoint = stripGraphBase(nextLink) if nextLink else None
        return entries

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List items in a SharePoint folder.

        Path format: /sites/<SiteName>/<FolderPath>
        Root "/" (or a path without a site) lists available sites via discovery.

        Args:
            path: Folder to list.
            filter: Optional filter expression evaluated by ``_matchFilter``.
            limit: Optional maximum number of entries to return; it is applied
                after filtering.
        """
        if not path or path == "/":
            return await self._discoverSites()
        siteId, folderPath = _parseSharepointPath(path)
        if not siteId:
            return await self._discoverSites()
        if not folderPath or folderPath == "/":
            endpoint = f"sites/{siteId}/drive/root/children?$top=200"
        else:
            cleanPath = folderPath.lstrip("/")
            endpoint = f"sites/{siteId}/drive/root:/{cleanPath}:/children?$top=200"
        # Large libraries are paged up to a hard cap of 5000 scanned items so
        # they can be fully enumerated (required for bootstrap).
        return await self._collectEntries(
            endpoint,
            "SharePoint browse",
            hardCap=5000,
            basePath=path,
            nameFilter=filter,
            limit=limit,
        )

    async def _discoverSites(self) -> List[ExternalEntry]:
        """Discover accessible SharePoint sites (first 50 search hits).

        Sites without a ``displayName`` are skipped.
        """
        result = await self._graphGet("sites?search=*&$top=50")
        if "error" in result:
            _raiseGraphError(result, "SharePoint site discovery")
        return [
            ExternalEntry(
                name=s.get("displayName") or s.get("name", ""),
                path=f"/sites/{s.get('id', '')}",
                isFolder=True,
                metadata={
                    "id": s.get("id"),
                    "webUrl": s.get("webUrl"),
                    "description": s.get("description", ""),
                },
            )
            for s in result.get("value", [])
            if s.get("displayName")
        ]

    async def download(self, path: str) -> bytes:
        """Download the file at ``path``; returns ``b""`` when the path has no
        site or file part, or when the download yields nothing."""
        siteId, filePath = _parseSharepointPath(path)
        if not siteId or not filePath:
            return b""
        cleanPath = filePath.strip("/")
        # NOTE(review): the path is sent unencoded -- confirm the HTTP layer
        # copes with characters such as '#' or '?' in file names.
        endpoint = f"sites/{siteId}/drive/root:/{cleanPath}:/content"
        data = await self._graphDownload(endpoint)
        return data or b""

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Upload ``data`` as ``fileName`` into the folder addressed by ``path``.

        Returns the Graph response, or an ``error`` dict for an invalid path.
        """
        siteId, folderPath = _parseSharepointPath(path)
        if not siteId:
            return {"error": "Invalid SharePoint path"}
        cleanFolder = (folderPath or "").strip("/")
        uploadPath = f"{cleanFolder}/{fileName}" if cleanFolder else fileName
        endpoint = f"sites/{siteId}/drive/root:/{uploadPath}:/content"
        return await self._graphPut(endpoint, data)

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Search the site drive for ``query``.

        Returns ``[]`` when ``path`` does not identify a site. Paging stops at
        a hard cap of 1000 scanned items.
        """
        siteId, folderPath = _parseSharepointPath(path or "")
        if not siteId:
            return []
        # Single quotes are doubled because the query sits inside q='...'.
        safeQuery = query.replace("'", "''")
        cleanFolder = (folderPath or "").strip("/")
        # Scope the search to the attached folder when one is given, so the agent
        # does not get hits from unrelated parts of the site drive.
        if cleanFolder:
            endpoint = f"sites/{siteId}/drive/root:/{cleanFolder}:/search(q='{safeQuery}')?$top=200"
        else:
            endpoint = f"sites/{siteId}/drive/root/search(q='{safeQuery}')?$top=200"
        return await self._collectEntries(
            endpoint,
            "SharePoint search",
            hardCap=1000,
            limit=limit,
        )
# ---------------------------------------------------------------------------
# Outlook Adapter
# ---------------------------------------------------------------------------
# Meta tag that _ensureHtmlCharset adds to HTML mail bodies lacking a charset declaration.
_CHARSET_META = '<meta charset="utf-8">'
def _parseDateRange(filterStr: Optional[str]) -> tuple:
"""Parse a date range from a filter/query string.
Supports two ISO dates ("2026-06-01 2026-06-30"), a single ISO date
(treated as a ~31 day window), or a YYYY-MM month pattern. Returns
(startDateTime, endDateTime) ISO strings, or (None, None) if not parseable.
"""
if not filterStr:
return (None, None)
isoMatch = re.findall(r'\d{4}-\d{2}-\d{2}(?:T[\d:]+)?', filterStr)
if len(isoMatch) >= 2:
return (isoMatch[0], isoMatch[1])
if len(isoMatch) == 1:
try:
dt = datetime.fromisoformat(isoMatch[0])
return (isoMatch[0], (dt + timedelta(days=31)).strftime('%Y-%m-%dT00:00:00'))
except ValueError:
pass
monthMatch = re.match(r'^(\d{4})-(\d{2})$', filterStr.strip())
if monthMatch:
year, month = int(monthMatch.group(1)), int(monthMatch.group(2))
start = f"{year}-{month:02d}-01T00:00:00"
if month == 12:
end = f"{year + 1}-01-01T00:00:00"
else:
end = f"{year}-{month + 1:02d}-01T00:00:00"
return (start, end)
return (None, None)
def _toGraphUtc(isoStr: str) -> str:
"""Normalise an ISO date/datetime to a Graph-compatible UTC string
(always 'YYYY-MM-DDTHH:MM:SSZ')."""
if not isoStr:
return isoStr
value = isoStr.strip().rstrip("Z")
if "T" not in value:
value = f"{value}T00:00:00"
return f"{value}Z"
def _ensureHtmlCharset(html: str) -> str:
"""Ensure HTML body has a charset meta tag so Outlook renders UTF-8 correctly."""
if "charset" in html.lower():
return html
if html.strip().lower().startswith("<html"):
return html.replace("<html>", f"<html><head>{_CHARSET_META}</head>", 1)
return f"<html><head>{_CHARSET_META}</head><body>{html}</body></html>"
class OutlookAdapter(_GraphApiMixin, ServiceAdapter):
    """ServiceAdapter for Outlook mail via Microsoft Graph.

    Covers folder and message browsing, EML download, search, sending,
    drafts, reply/forward and mailbox management (move, copy, delete, flags).
    """

    # Default upper bound for messages returned from a single browse() call.
    # The default is kept modest so accidental "browse all" calls don't blow
    # up the LLM context. Callers (e.g. the agent's browseDataSource tool)
    # can override via ``limit``, up to _MAX_MESSAGE_LIMIT.
    _DEFAULT_MESSAGE_LIMIT = 100
    _MAX_MESSAGE_LIMIT = 1000
    _PAGE_SIZE = 100

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _errorText(result: Dict[str, Any]) -> Any:
        """Return a readable description of a failed Graph result.

        The ``error`` entry may be an error object (dict with ``message``) or
        a plain string; both shapes are handled so that reporting an error
        never raises an AttributeError of its own.
        """
        err = result.get("error") or {}
        if isinstance(err, dict):
            return err.get("message") or err
        return err

    @staticmethod
    def _senderAddress(message: Dict[str, Any]) -> Optional[str]:
        """Return the sender address of a message, tolerating missing or null parts."""
        sender = message.get("from") or {}
        return (sender.get("emailAddress") or {}).get("address")

    @staticmethod
    def _encodeJson(payload: Any) -> bytes:
        """Serialise ``payload`` to UTF-8 encoded JSON bytes."""
        return json.dumps(payload).encode("utf-8")

    def _clampLimit(self, limit: Optional[int]) -> int:
        """Map a caller-supplied limit into 1.._MAX_MESSAGE_LIMIT (default when None)."""
        if limit is None:
            return self._DEFAULT_MESSAGE_LIMIT
        return max(1, min(int(limit), self._MAX_MESSAGE_LIMIT))

    async def _fetchTopLevelFolders(self) -> tuple:
        """Page through ``me/mailFolders``.

        Returns:
            ``(folders, failure)``: the de-duplicated raw folder dicts gathered
            so far, and the failed Graph result that ended paging (``None``
            when every page succeeded).
        """
        folders: List[Dict[str, Any]] = []
        seenIds: set = set()
        failure: Optional[Dict[str, Any]] = None
        endpoint: Optional[str] = "me/mailFolders?$top=100"
        while endpoint:
            result = await self._graphGet(endpoint)
            if "error" in result:
                failure = result
                break
            for folder in result.get("value", []):
                fid = folder.get("id")
                if fid and fid not in seenIds:
                    seenIds.add(fid)
                    folders.append(folder)
            nextLink = result.get("@odata.nextLink")
            endpoint = stripGraphBase(nextLink) if nextLink else None
        return folders, failure

    # ------------------------------------------------------------------
    # ServiceAdapter interface
    # ------------------------------------------------------------------
    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List mail folders or messages.

        path = "" or "/"     -> list ALL top-level mail folders (paginated)
        path = "/<folder>"   -> list messages in that folder, newest first,
                                up to ``limit``. ``<folder>`` may be a display
                                name, a well-known name or a folder id.

        ``filter`` may carry a date range (see ``_parseDateRange``) that is
        applied server-side on ``receivedDateTime``.

        Raises:
            ValueError: the folder reference cannot be resolved.
            RuntimeError: Graph fails on the first page of a listing.
        """
        if not path or path == "/":
            return await self._browseFolders()

        # The incoming path segment may be a display name ("MGB-Ablage"), a
        # well-known shortcut ("inbox") or an already-resolved Graph folder id.
        # Resolve it to a real id first; otherwise Graph rejects the URL with
        # 400 ErrorInvalidIdMalformed.
        folderRef = path.strip("/")
        folderId = await self._resolveFolderId(folderRef)
        if not folderId:
            raise ValueError(
                f"Outlook folder not found: '{folderRef}'. Browse the mailbox root "
                f"(path '/') or call listMailFolders to obtain a valid folder id."
            )
        effectiveLimit = self._clampLimit(limit)
        pageSize = min(self._PAGE_SIZE, effectiveLimit)

        # Optional date-range filter (e.g. "2026-06" or "2026-06-01 2026-06-30")
        # so only that period is fetched server-side instead of paging the whole
        # folder. Falls back to a plain newest-first listing otherwise.
        queryString = f"?$top={pageSize}&$orderby=receivedDateTime desc"
        startDateTime, endDateTime = _parseDateRange(filter)
        if startDateTime and endDateTime:
            dateFilter = (
                f"receivedDateTime ge {_toGraphUtc(startDateTime)} and "
                f"receivedDateTime lt {_toGraphUtc(endDateTime)}"
            )
            queryString += f"&$filter={urllib.parse.quote(dateFilter)}"
        endpoint: Optional[str] = f"me/mailFolders/{folderId}/messages{queryString}&$count=true"

        messages: List[Dict[str, Any]] = []
        totalCount: Optional[int] = None
        firstPage = True
        while endpoint and len(messages) < effectiveLimit:
            result = await self._graphGet(endpoint)
            if "error" in result:
                if firstPage:
                    raise RuntimeError(
                        f"Graph error listing messages in folder '{folderRef}': "
                        f"{self._errorText(result)}"
                    )
                # A later page failed: keep what was fetched so far.
                break
            if firstPage and "@odata.count" in result:
                totalCount = result["@odata.count"]
            firstPage = False
            for m in result.get("value", []):
                messages.append(m)
                if len(messages) >= effectiveLimit:
                    break
            nextLink = result.get("@odata.nextLink")
            endpoint = stripGraphBase(nextLink) if nextLink else None

        basePath = path.rstrip("/")
        entries = [
            ExternalEntry(
                # ``or`` also covers a present-but-null/empty subject.
                name=m.get("subject") or "(no subject)",
                path=f"{basePath}/{m.get('id', '')}",
                isFolder=False,
                metadata={
                    "id": m.get("id"),
                    "from": self._senderAddress(m),
                    "receivedDateTime": m.get("receivedDateTime"),
                    "hasAttachments": m.get("hasAttachments", False),
                },
            )
            for m in messages
        ]
        # Tell the caller when the listing is truncated.
        if totalCount is not None and totalCount > len(entries):
            entries.append(ExternalEntry(
                name=f"({totalCount} total messages in folder, {len(entries)} listed)",
                path=f"{basePath}/_count", isFolder=False,
                metadata={"totalCount": totalCount, "listed": len(entries)},
            ))
        return entries

    async def _browseFolders(self) -> List[ExternalEntry]:
        """List all top-level mail folders as folder entries.

        If no folder with the display name "Inbox"/"Posteingang" was listed,
        the well-known ``me/mailFolders/inbox`` shortcut is queried and
        prepended when its id is not already present.

        Raises:
            RuntimeError: the listing failed and no folder at all could be
                obtained, so a failure is not reported as an empty mailbox.
        """
        folders, failure = await self._fetchTopLevelFolders()
        seenIds = {f.get("id") for f in folders}
        hasInbox = any(
            (f.get("displayName") or "").lower() in ("inbox", "posteingang") for f in folders
        )
        if not hasInbox:
            inbox = await self._graphGet("me/mailFolders/inbox")
            if "error" not in inbox and inbox.get("id") and inbox.get("id") not in seenIds:
                folders.insert(0, inbox)
        if failure is not None and not folders:
            _raiseGraphError(failure, "Outlook folder listing")
        return [
            ExternalEntry(
                name=f.get("displayName", ""),
                path=f"/{f.get('id', '')}",
                isFolder=True,
                metadata={
                    "id": f.get("id"),
                    "totalItemCount": f.get("totalItemCount"),
                    "unreadItemCount": f.get("unreadItemCount"),
                    "childFolderCount": f.get("childFolderCount"),
                },
            )
            for f in folders
        ]

    async def download(self, path: str) -> DownloadResult:
        """Download a mail message as RFC 822 EML via the Graph ``$value`` endpoint.

        The last segment of ``path`` is taken as the message id. The file name
        is derived from the subject (sanitised, at most 80 characters), falling
        back to the message id when the subject is unavailable.
        """
        messageId = path.strip("/").split("/")[-1]
        meta = await self._graphGet(f"me/messages/{messageId}?$select=subject")
        subject = None if "error" in meta else meta.get("subject")
        if subject is None:
            # Lookup failed, or the subject is missing/null.
            subject = messageId
        safeName = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", subject)[:80].strip(". ") or "email"
        emlBytes = await self._graphDownload(f"me/messages/{messageId}/$value")
        if not emlBytes:
            return DownloadResult()
        return DownloadResult(
            data=emlBytes,
            fileName=f"{safeName}.eml",
            mimeType="message/rfc822",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Not applicable for Outlook in the file sense."""
        return {"error": "Upload not supported for Outlook"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Search messages with Graph ``$search``.

        Args:
            query: Search text; embedded double quotes are escaped.
            path: Optional folder reference (display name / well-known / id)
                restricting the search to that folder.
            limit: Maximum number of hits requested via ``$top``.

        Raises:
            ValueError: the folder reference cannot be resolved.
            RuntimeError: Graph reports an error.
        """
        safeQuery = query.replace('"', '\\"')
        effectiveLimit = self._clampLimit(limit)
        # Scope the search to the attached folder when one is given, so the agent
        # gets hits only from e.g. the Inbox instead of the whole mailbox. Resolve
        # the folder reference (display name / well-known / id) to a real id first.
        folderRef = (path or "").strip("/")
        base = "me/messages"
        if folderRef:
            folderId = await self._resolveFolderId(folderRef)
            if not folderId:
                raise ValueError(
                    f"Outlook folder not found: '{folderRef}'. Call listMailFolders "
                    f"to obtain a valid folder id, or search without a folder scope."
                )
            base = f"me/mailFolders/{folderId}/messages"
        # Percent-encode the quoted term so characters such as '&' or '#' in the
        # query cannot break out of the $search parameter.
        searchExpr = urllib.parse.quote(f'"{safeQuery}"')
        # NOTE: Graph $search does not support $orderby and may return a single
        # page (no @odata.nextLink). We still pass $top to lift the implicit 25.
        endpoint = f"{base}?$search={searchExpr}&$top={effectiveLimit}"
        result = await self._graphGet(endpoint)
        if "error" in result:
            raise RuntimeError(f"Graph error searching mail: {self._errorText(result)}")
        return [
            ExternalEntry(
                name=m.get("subject") or "(no subject)",
                path=f"/search/{m.get('id', '')}",
                isFolder=False,
                metadata={
                    "id": m.get("id"),
                    "from": self._senderAddress(m),
                    "receivedDateTime": m.get("receivedDateTime"),
                },
            )
            for m in result.get("value", [])
        ]

    # ------------------------------------------------------------------
    # Send / Draft
    # ------------------------------------------------------------------
    def _buildMessage(
        self, to: List[str], subject: str, body: str,
        bodyType: str = "Text",
        cc: Optional[List[str]] = None,
        attachments: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """Build a Graph API message object.

        attachments: list of {"name": str, "contentBytes": str (base64), "contentType": str}
        HTML bodies are passed through ``_ensureHtmlCharset`` first.
        """
        content = _ensureHtmlCharset(body) if bodyType.upper() == "HTML" else body
        message: Dict[str, Any] = {
            "subject": subject,
            "body": {"contentType": bodyType, "content": content},
            "toRecipients": [{"emailAddress": {"address": addr}} for addr in to],
        }
        if cc:
            message["ccRecipients"] = [{"emailAddress": {"address": addr}} for addr in cc]
        if attachments:
            message["attachments"] = [
                {
                    "@odata.type": "#microsoft.graph.fileAttachment",
                    "name": att["name"],
                    "contentBytes": att["contentBytes"],
                    "contentType": att.get("contentType", "application/octet-stream"),
                }
                for att in attachments
            ]
        return message

    async def sendMail(
        self, to: List[str], subject: str, body: str,
        bodyType: str = "Text",
        cc: Optional[List[str]] = None,
        attachments: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """Send an email via Microsoft Graph. bodyType: 'Text' or 'HTML'.

        Returns ``{"success": True}`` or the Graph error result.
        """
        message = self._buildMessage(to, subject, body, bodyType, cc, attachments)
        payload = self._encodeJson({"message": message, "saveToSentItems": True})
        result = await self._graphPost("me/sendMail", payload)
        if "error" in result:
            return result
        return {"success": True}

    async def createDraft(
        self, to: List[str], subject: str, body: str,
        bodyType: str = "Text",
        cc: Optional[List[str]] = None,
        attachments: Optional[List[Dict]] = None,
    ) -> Dict[str, Any]:
        """Create a draft email in the user's Drafts folder via Microsoft Graph."""
        message = self._buildMessage(to, subject, body, bodyType, cc, attachments)
        result = await self._graphPost("me/messages", self._encodeJson(message))
        if "error" in result:
            return result
        return {"success": True, "draft": True, "messageId": result.get("id", "")}

    # ------------------------------------------------------------------
    # Reply / Reply-All / Forward
    # ------------------------------------------------------------------
    # Microsoft Graph distinguishes between "send-immediately" endpoints
    # (``/reply``, ``/replyAll``, ``/forward``) and their "create-draft"
    # counterparts (``/createReply``, ``/createReplyAll``, ``/createForward``).
    # The send-immediately variant accepts a free-text ``comment`` string
    # that Graph prepends to the original conversation; the createReply*
    # variants return a fully-populated draft message that the caller can
    # further edit (e.g. via PATCH /me/messages/{id} with a richer body)
    # before posting via /send. We expose both flavours so the agent can
    # choose between "draft for review" and "send right now".
    async def replyToMail(
        self, messageId: str, comment: str,
        replyAll: bool = False,
    ) -> Dict[str, Any]:
        """Reply (or reply-all) to an existing message immediately.

        Preserves the conversation thread and the ``AW:`` prefix in Outlook --
        unlike sendMail() which creates a brand-new conversation.
        """
        endpointAction = "replyAll" if replyAll else "reply"
        payload = self._encodeJson({"comment": comment})
        result = await self._graphPost(f"me/messages/{messageId}/{endpointAction}", payload)
        if "error" in result:
            return result
        return {"success": True, "messageId": messageId, "action": endpointAction}

    async def forwardMail(
        self, messageId: str, to: List[str], comment: str = "",
    ) -> Dict[str, Any]:
        """Forward an existing message to new recipients."""
        payload = self._encodeJson({
            "comment": comment,
            "toRecipients": [{"emailAddress": {"address": addr}} for addr in to],
        })
        result = await self._graphPost(f"me/messages/{messageId}/forward", payload)
        if "error" in result:
            return result
        return {"success": True, "messageId": messageId, "action": "forward"}

    async def createReplyDraft(
        self, messageId: str, comment: str = "",
        replyAll: bool = False,
    ) -> Dict[str, Any]:
        """Create a reply-draft (in the Drafts folder) that the user can edit before sending."""
        endpointAction = "createReplyAll" if replyAll else "createReply"
        payload = self._encodeJson({"comment": comment}) if comment else b"{}"
        result = await self._graphPost(f"me/messages/{messageId}/{endpointAction}", payload)
        if "error" in result:
            return result
        return {"success": True, "draft": True, "messageId": result.get("id", ""), "originalMessageId": messageId}

    async def createForwardDraft(
        self, messageId: str, to: Optional[List[str]] = None, comment: str = "",
    ) -> Dict[str, Any]:
        """Create a forward-draft (in the Drafts folder) that the user can edit before sending."""
        body: Dict[str, Any] = {}
        if comment:
            body["comment"] = comment
        if to:
            body["toRecipients"] = [{"emailAddress": {"address": addr}} for addr in to]
        payload = self._encodeJson(body) if body else b"{}"
        result = await self._graphPost(f"me/messages/{messageId}/createForward", payload)
        if "error" in result:
            return result
        return {"success": True, "draft": True, "messageId": result.get("id", ""), "originalMessageId": messageId}

    # ------------------------------------------------------------------
    # Folder-Management & Mail-Management
    # ------------------------------------------------------------------
    # Mapping of Microsoft Graph "well-known folder names" plus a few common
    # localized display names (DE) so the LLM can write natural names like
    # "Posteingang", "Archiv", "deletedItems" without having to look up the
    # opaque mailbox folder ID first. Keys are lower-case.
    _WELL_KNOWN_FOLDERS = {
        "inbox": "inbox",
        "posteingang": "inbox",
        "drafts": "drafts",
        "entwürfe": "drafts",
        "entwurf": "drafts",
        "sentitems": "sentitems",
        "gesendet": "sentitems",
        "gesendete elemente": "sentitems",
        "deleteditems": "deleteditems",
        "gelöscht": "deleteditems",
        "gelöschte elemente": "deleteditems",
        "papierkorb": "deleteditems",
        "trash": "deleteditems",
        "junkemail": "junkemail",
        "spam": "junkemail",
        "junk": "junkemail",
        "outbox": "outbox",
        "postausgang": "outbox",
        "archive": "archive",
        "archiv": "archive",
        "msgfolderroot": "msgfolderroot",
        "root": "msgfolderroot",
    }

    async def listMailFolders(self) -> List[Dict[str, Any]]:
        """List all top-level mail folders with id, name and counts.

        Returns a flat list of dicts so the caller (e.g. an LLM tool) does not
        need to know the Graph nesting model. Use ``_resolveFolderId()`` to
        translate a user-provided name into a Graph folder ID.

        A Graph failure ends paging; the folders gathered so far are returned
        and the failure is logged.
        """
        rawFolders, failure = await self._fetchTopLevelFolders()
        if failure is not None:
            logger.warning("Graph error (Outlook folder listing): %s", self._errorText(failure))
        return [
            {
                "id": f.get("id"),
                "displayName": f.get("displayName", ""),
                "totalItemCount": f.get("totalItemCount", 0),
                "unreadItemCount": f.get("unreadItemCount", 0),
                "childFolderCount": f.get("childFolderCount", 0),
            }
            for f in rawFolders
        ]

    async def _resolveFolderId(self, folderRef: str) -> Optional[str]:
        """Resolve any user-supplied folder reference to a Graph folder ID.

        Resolution order:
          1. If it matches a well-known shortcut (locale-aware), return that
             shortcut directly -- Graph accepts ``inbox``, ``drafts`` etc. in
             the URL path.
          2. If it looks like a Graph folder ID (more than 60 characters, no
             spaces), return as-is.
          3. Otherwise fall back to a case-insensitive ``displayName`` match
             against the user's top-level mail folders.

        Returns ``None`` if nothing matches so the caller can surface a clear
        error instead of silently moving mail into the wrong place.
        """
        if not folderRef:
            return None
        ref = folderRef.strip()
        wellKnown = self._WELL_KNOWN_FOLDERS.get(ref.lower())
        if wellKnown:
            return wellKnown
        # Heuristic: folder IDs are long strings that never contain spaces.
        if len(ref) > 60 and " " not in ref:
            return ref
        wanted = ref.lower()
        for f in await self.listMailFolders():
            if (f.get("displayName") or "").strip().lower() == wanted:
                return f.get("id")
        return None

    async def moveMail(
        self, messageId: str, destinationFolder: str,
    ) -> Dict[str, Any]:
        """Move a message to another folder (well-known name, displayName, or folder id)."""
        destId = await self._resolveFolderId(destinationFolder)
        if not destId:
            return {"error": f"Folder not found: '{destinationFolder}'. Use listMailFolders to inspect available folders."}
        payload = self._encodeJson({"destinationId": destId})
        result = await self._graphPost(f"me/messages/{messageId}/move", payload)
        if "error" in result:
            return result
        return {"success": True, "messageId": result.get("id", messageId), "destinationFolder": destinationFolder}

    async def copyMail(
        self, messageId: str, destinationFolder: str,
    ) -> Dict[str, Any]:
        """Copy a message into another folder (original stays in place)."""
        destId = await self._resolveFolderId(destinationFolder)
        if not destId:
            return {"error": f"Folder not found: '{destinationFolder}'. Use listMailFolders to inspect available folders."}
        payload = self._encodeJson({"destinationId": destId})
        result = await self._graphPost(f"me/messages/{messageId}/copy", payload)
        if "error" in result:
            return result
        return {"success": True, "newMessageId": result.get("id", ""), "destinationFolder": destinationFolder}

    async def archiveMail(self, messageId: str) -> Dict[str, Any]:
        """Move a message to the user's Archive folder.

        Outlook's Archive is a regular mail folder, not a flag, so this is a
        thin convenience wrapper around :py:meth:`moveMail`.
        """
        return await self.moveMail(messageId, "archive")

    async def deleteMail(
        self, messageId: str,
        *,
        hardDelete: bool = False,
    ) -> Dict[str, Any]:
        """Delete a message.

        Default behaviour (``hardDelete=False``) moves the message to the
        ``Deleted Items`` folder, which mirrors what users see in the Outlook
        UI when they press Delete. Set ``hardDelete=True`` to perform an
        unrecoverable removal -- agent tools must require an extra
        confirmation before invoking this path.
        """
        if not hardDelete:
            return await self.moveMail(messageId, "deleteditems")
        result = await self._graphDelete(f"me/messages/{messageId}")
        if "error" in result:
            return result
        return {"success": True, "messageId": messageId, "hardDelete": True}

    async def _setReadState(self, messageId: str, isRead: bool) -> Dict[str, Any]:
        """PATCH the ``isRead`` property of a message and report the outcome."""
        result = await self._graphPatch(f"me/messages/{messageId}", self._encodeJson({"isRead": isRead}))
        if "error" in result:
            return result
        return {"success": True, "messageId": messageId, "isRead": isRead}

    async def markMailAsRead(self, messageId: str) -> Dict[str, Any]:
        """Mark a message as read (sets ``isRead=true``)."""
        return await self._setReadState(messageId, True)

    async def markMailAsUnread(self, messageId: str) -> Dict[str, Any]:
        """Mark a message as unread (sets ``isRead=false``)."""
        return await self._setReadState(messageId, False)

    async def flagMail(
        self, messageId: str,
        *,
        flagStatus: str = "flagged",
    ) -> Dict[str, Any]:
        """Set or clear the follow-up flag on a message.

        ``flagStatus`` accepts ``"flagged"`` (default), ``"complete"`` or
        ``"notFlagged"`` -- the three values Microsoft Graph recognises for
        ``followupFlag.flagStatus``. Any other value returns an ``error`` dict
        without calling Graph.
        """
        if flagStatus not in ("flagged", "complete", "notFlagged"):
            return {"error": f"Invalid flagStatus '{flagStatus}'. Use one of: flagged, complete, notFlagged."}
        payload = self._encodeJson({"flag": {"flagStatus": flagStatus}})
        result = await self._graphPatch(f"me/messages/{messageId}", payload)
        if "error" in result:
            return result
        return {"success": True, "messageId": messageId, "flagStatus": flagStatus}
# ---------------------------------------------------------------------------
# Teams Adapter (browse only -- download, upload and search are stubs)
# ---------------------------------------------------------------------------
class TeamsAdapter(_GraphApiMixin, ServiceAdapter):
    """ServiceAdapter for Microsoft Teams -- browse joined teams and channels."""

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> list:
        """List joined teams (root path) or the channels of one team.

        path = "" or "/"      -> the signed-in user's joined teams
        path = "/<teamId>"    -> channels of that team
        anything deeper       -> empty list

        ``filter`` and ``limit`` are accepted but not used.
        """
        trimmed = (path or "").strip("/")
        if not trimmed:
            return await self._listJoinedTeams()
        teamId, separator, _ = trimmed.partition("/")
        if separator:
            # Below channel level there is nothing to browse.
            return []
        return await self._listChannels(teamId)

    async def _listJoinedTeams(self) -> list:
        """Return one folder entry per team the user has joined."""
        result = await self._graphGet("me/joinedTeams")
        if "error" in result:
            _raiseGraphError(result, "Teams browse")
        teams = []
        for team in result.get("value", []):
            teams.append(ExternalEntry(
                name=team.get("displayName", ""),
                path=f"/{team.get('id', '')}",
                isFolder=True,
                metadata={"id": team.get("id"), "description": team.get("description", "")},
            ))
        return teams

    async def _listChannels(self, teamId: str) -> list:
        """Return one folder entry per channel of ``teamId``."""
        result = await self._graphGet(f"teams/{teamId}/channels")
        if "error" in result:
            _raiseGraphError(result, "Teams channels")
        channels = []
        for channel in result.get("value", []):
            channels.append(ExternalEntry(
                name=channel.get("displayName", ""),
                path=f"/{teamId}/{channel.get('id', '')}",
                isFolder=True,
                metadata={"id": channel.get("id"), "membershipType": channel.get("membershipType", "")},
            ))
        return channels

    async def download(self, path: str) -> bytes:
        """Downloads are not implemented for Teams; always returns ``b""``."""
        return b""

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Uploads are not implemented for Teams; always returns an error dict."""
        return {"error": "Teams upload not implemented"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> list:
        """Search is not implemented for Teams; always returns an empty list."""
        return []
# ---------------------------------------------------------------------------
# OneDrive Adapter (personal drive -- mirrors the SharePoint adapter)
# ---------------------------------------------------------------------------
class OneDriveAdapter(_GraphApiMixin, ServiceAdapter):
    """ServiceAdapter for the signed-in user's OneDrive (``me/drive``).

    Paths are drive-root relative (``"/Folder/Sub"``); an empty path or
    ``"/"`` addresses the drive root.
    """

    _BROWSE_HARD_CAP = 5000
    _SEARCH_HARD_CAP = 1000

    async def _collectEntries(
        self,
        endpoint: Optional[str],
        context: str,
        hardCap: int,
        limit: Optional[int],
        toEntry: Any,
        pattern: Optional[str] = None,
    ) -> List[ExternalEntry]:
        """Follow ``@odata.nextLink`` pages and return the converted entries.

        Args:
            endpoint: First Graph endpoint to request.
            context: Label handed to ``_raiseGraphError`` on failure.
            hardCap: Stop requesting further pages once this many raw items
                have been seen (bounds the number of Graph calls).
            limit: Maximum number of entries to return; ``None`` means no
                limit other than ``hardCap``. Values below 1 are treated as 1.
            toEntry: Callable mapping one raw Graph item to an ExternalEntry.
            pattern: Optional glob pattern; non-matching entries are dropped
                *before* they count towards ``limit``.

        An error on the very first item-bearing request is raised via
        ``_raiseGraphError``; an error after items were already received ends
        paging and the partial result is returned.
        """
        wanted = None if limit is None else max(1, int(limit))
        entries: List[ExternalEntry] = []
        seen = 0
        while endpoint and seen < hardCap and (wanted is None or len(entries) < wanted):
            result = await self._graphGet(endpoint)
            if "error" in result:
                if not seen:
                    _raiseGraphError(result, context)
                break
            for raw in result.get("value", []) or []:
                seen += 1
                entry = toEntry(raw)
                if pattern and not _matchFilter(entry, pattern):
                    continue
                entries.append(entry)
                if wanted is not None and len(entries) >= wanted:
                    break
            nextLink = result.get("@odata.nextLink")
            endpoint = stripGraphBase(nextLink) if nextLink else None
        return entries

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List the children of a OneDrive folder.

        ``filter`` is a glob pattern matched against entry names. It is
        applied while paging, so ``limit`` counts matching entries rather
        than raw items (previously the raw list was cut to ``limit`` first,
        which could hide matches further down the folder).
        """
        cleanPath = (path or "").strip("/")
        if cleanPath:
            endpoint = f"me/drive/root:/{cleanPath}:/children?$top=200"
        else:
            endpoint = "me/drive/root/children?$top=200"
        return await self._collectEntries(
            endpoint,
            "OneDrive browse",
            self._BROWSE_HARD_CAP,
            limit,
            lambda item: _graphItemToExternalEntry(item, path),
            pattern=filter,
        )

    async def download(self, path: str) -> bytes:
        """Return the file content at ``path``; empty bytes if unavailable."""
        cleanPath = (path or "").strip("/")
        if not cleanPath:
            return b""
        data = await self._graphDownload(f"me/drive/root:/{cleanPath}:/content")
        return data or b""

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """PUT ``data`` as ``fileName`` into the folder ``path`` (root if empty)."""
        cleanPath = (path or "").strip("/")
        uploadPath = f"{cleanPath}/{fileName}" if cleanPath else fileName
        endpoint = f"me/drive/root:/{uploadPath}:/content"
        return await self._graphPut(endpoint, data)

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Search the drive, or only the folder ``path`` when one is given."""
        # Single quotes delimit the q='...' argument, so double them.
        safeQuery = query.replace("'", "''")
        cleanPath = (path or "").strip("/")
        # Scope to the attached folder if given, otherwise search the whole drive.
        if cleanPath:
            endpoint = f"me/drive/root:/{cleanPath}:/search(q='{safeQuery}')?$top=200"
        else:
            endpoint = f"me/drive/root/search(q='{safeQuery}')?$top=200"
        return await self._collectEntries(
            endpoint,
            "OneDrive search",
            self._SEARCH_HARD_CAP,
            limit,
            lambda item: _graphItemToExternalEntry(item),
        )
# ---------------------------------------------------------------------------
# Calendar Adapter
# ---------------------------------------------------------------------------
class CalendarAdapter(_GraphApiMixin, ServiceAdapter):
    """ServiceAdapter for Outlook Calendar via Microsoft Graph.

    Path conventions:
        ``""`` / ``"/"``                 -> list user calendars
        ``"/<calendarId>"``              -> list events in that calendar
        ``"/<calendarId>/<eventId>"``    -> reserved for future event detail browse

    Downloads return a synthesised ``.ics`` (VCALENDAR/VEVENT) since Microsoft
    Graph does not expose a ``/$value`` endpoint for events.
    """

    _DEFAULT_EVENT_LIMIT = 100
    _MAX_EVENT_LIMIT = 1000
    _PAGE_SIZE = 100
    # Event properties requested from Graph for list/search results.
    _EVENT_SELECT = "id,subject,start,end,location,organizer,isAllDay,webLink"

    def _clampLimit(self, limit: Optional[int]) -> int:
        """Return the default for ``None``, else ``limit`` clamped to 1.._MAX_EVENT_LIMIT."""
        if limit is None:
            return self._DEFAULT_EVENT_LIMIT
        return max(1, min(int(limit), self._MAX_EVENT_LIMIT))

    @staticmethod
    def _eventToEntry(ev: Dict[str, Any], calendarId: str) -> ExternalEntry:
        """Map one Graph event payload to an ExternalEntry under ``calendarId``."""
        # Every nested object may be absent or null, hence the `or {}` guards.
        organizer = ((ev.get("organizer") or {}).get("emailAddress") or {}).get("address")
        return ExternalEntry(
            name=ev.get("subject") or "(no subject)",
            path=f"/{calendarId}/{ev.get('id', '')}",
            isFolder=False,
            mimeType="text/calendar",
            metadata={
                "id": ev.get("id"),
                "start": (ev.get("start") or {}).get("dateTime"),
                "end": (ev.get("end") or {}).get("dateTime"),
                "location": (ev.get("location") or {}).get("displayName"),
                "organizer": organizer,
                "isAllDay": ev.get("isAllDay", False),
                "webLink": ev.get("webLink"),
            },
        )

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List calendars (root) or the events of one calendar.

        At the root, ``filter`` is a case-insensitive substring matched
        against calendar names. Below the root it is handed to
        ``_parseDateRange``; when that yields a start and an end, the
        ``calendarView`` endpoint is used for that window, otherwise the
        calendar's events are listed newest first.

        Raises:
            ValueError: if the first path segment matches no calendar.
        """
        cleanPath = (path or "").strip("/")
        if not cleanPath:
            result = await self._graphGet("me/calendars?$top=100")
            if "error" in result:
                _raiseGraphError(result, "MSFT Calendar list")
            calendars = result.get("value", [])
            if filter:
                needle = filter.lower()
                calendars = [c for c in calendars if needle in (c.get("name") or "").lower()]
            return [
                ExternalEntry(
                    name=c.get("name", ""),
                    path=f"/{c.get('id', '')}",
                    isFolder=True,
                    metadata={
                        "id": c.get("id"),
                        "color": c.get("color"),
                        "owner": (c.get("owner") or {}).get("address"),
                        "isDefaultCalendar": c.get("isDefaultCalendar", False),
                        "canEdit": c.get("canEdit", False),
                    },
                )
                for c in calendars
            ]

        # The path segment may be a calendar display name or an already-resolved
        # calendar id; resolve first so a name does not produce a malformed URL.
        calendarRef = cleanPath.split("/", 1)[0]
        calendarId = await self._resolveCalendarId(calendarRef)
        if not calendarId:
            raise ValueError(
                f"Calendar not found: '{calendarRef}'. Browse the root ('/') to list "
                f"calendars and use the returned id."
            )

        effectiveLimit = self._clampLimit(limit)
        pageSize = min(self._PAGE_SIZE, effectiveLimit)
        startDateTime, endDateTime = self._parseDateRange(filter)
        endpoint: Optional[str]
        if startDateTime and endDateTime:
            endpoint = (
                f"me/calendars/{calendarId}/calendarView"
                f"?startDateTime={startDateTime}&endDateTime={endDateTime}"
                f"&$top={pageSize}&$orderby=start/dateTime"
                f"&$select={self._EVENT_SELECT}"
            )
        else:
            endpoint = (
                f"me/calendars/{calendarId}/events"
                f"?$top={pageSize}&$orderby=start/dateTime desc"
                f"&$select={self._EVENT_SELECT}"
            )

        events: List[Dict[str, Any]] = []
        while endpoint and len(events) < effectiveLimit:
            result = await self._graphGet(endpoint)
            if "error" in result:
                # Fail loudly on the first page; keep partial data afterwards.
                if not events:
                    _raiseGraphError(result, "MSFT Calendar events")
                break
            remaining = effectiveLimit - len(events)
            events.extend((result.get("value") or [])[:remaining])
            nextLink = result.get("@odata.nextLink")
            endpoint = stripGraphBase(nextLink) if nextLink else None

        return [self._eventToEntry(ev, calendarId) for ev in events]

    async def _resolveCalendarId(self, ref: str) -> Optional[str]:
        """Resolve a calendar reference (display name / 'default' / id) to a Graph
        calendar id. Returns None if nothing matches."""
        if not ref:
            return None
        r = ref.strip()
        # Heuristic: Graph ids are long URL-safe strings without spaces.
        if len(r) > 60 and " " not in r:
            return r
        result = await self._graphGet("me/calendars?$top=100")
        if "error" in result:
            _raiseGraphError(result, "MSFT Calendar list")
        cals = result.get("value", [])
        # Precedence: exact id, then default-calendar alias, then display name.
        if any(c.get("id") == r for c in cals):
            return r
        lowered = r.lower()
        if lowered in ("default", "primary", "calendar", "kalender"):
            for c in cals:
                if c.get("isDefaultCalendar"):
                    return c.get("id")
        for c in cals:
            if (c.get("name") or "").strip().lower() == lowered:
                return c.get("id")
        return None

    @staticmethod
    def _parseDateRange(filterStr: Optional[str]) -> tuple:
        """Delegate to the module-level ``_parseDateRange`` helper."""
        return _parseDateRange(filterStr)

    async def download(self, path: str) -> DownloadResult:
        """Fetch one event and return it as a synthesised ``.ics`` file.

        The event id is the last path segment. An empty ``DownloadResult`` is
        returned when the path has no event segment or the fetch fails.
        """
        cleanPath = (path or "").strip("/")
        if "/" not in cleanPath:
            return DownloadResult()
        eventId = cleanPath.split("/")[-1]
        ev = await self._graphGet(f"me/events/{eventId}")
        if "error" in ev:
            logger.warning("MSFT Calendar event fetch failed: %s", ev["error"])
            return DownloadResult()
        icsBytes = _eventToIcs(ev)
        subject = ev.get("subject") or eventId
        safeName = _safeFileName(subject) or "event"
        return DownloadResult(
            data=icsBytes,
            fileName=f"{safeName}.ics",
            mimeType="text/calendar",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Not supported; always returns an error dict."""
        return {"error": "Calendar upload not supported"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Search events by date range or free text.

        If ``_parseDateRange(query)`` yields a start and an end, the
        ``me/calendarView`` window is queried; otherwise ``query`` is sent as
        a ``$search`` term to ``me/events``. ``path`` only determines the
        first segment of the returned entry paths (``search`` when it names
        no calendar).
        """
        effectiveLimit = self._clampLimit(limit)
        startDateTime, endDateTime = self._parseDateRange(query)
        if startDateTime and endDateTime:
            endpoint = (
                "me/calendarView"
                f"?startDateTime={startDateTime}&endDateTime={endDateTime}"
                f"&$top={effectiveLimit}&$orderby=start/dateTime"
                f"&$select={self._EVENT_SELECT}"
            )
        else:
            # NOTE(review): doubling single quotes inside a double-quoted
            # $search term may alter the searched text -- confirm against Graph.
            safeQuery = query.replace("'", "''").replace('"', '\\"')
            endpoint = (
                f'me/events?$search="{safeQuery}"&$top={effectiveLimit}'
                f"&$select={self._EVENT_SELECT}"
            )
        result = await self._graphGet(endpoint)
        if "error" in result:
            _raiseGraphError(result, "MSFT Calendar search")
        # A path such as "/" has no calendar segment; fall back to "search"
        # instead of emitting entry paths that start with "//".
        calendarId = (path or "").strip("/").split("/")[0] or "search"
        return [self._eventToEntry(ev, calendarId) for ev in result.get("value", [])]
# ---------------------------------------------------------------------------
# Contacts Adapter
# ---------------------------------------------------------------------------
class ContactsAdapter(_GraphApiMixin, ServiceAdapter):
    """ServiceAdapter for Outlook Contacts via Microsoft Graph.

    Path conventions:
        ``""``                           -> list contact folders (default + custom)
        ``"/<folderId>"``                -> list contacts in that folder; the
                                            virtual id ``default`` maps to
                                            ``/me/contacts`` (the user's primary
                                            contact list)
        ``"/<folderId>/<contactId>"``    -> reserved for future detail browse

    Downloads return a synthesised vCard 3.0 (.vcf) since Microsoft Graph
    does not expose a ``/$value`` endpoint for contacts.
    """

    _DEFAULT_CONTACT_LIMIT = 200
    _MAX_CONTACT_LIMIT = 1000
    _PAGE_SIZE = 100
    _DEFAULT_FOLDER_ID = "default"

    @staticmethod
    def _contactToEntry(contact: Dict[str, Any], parentSegment: str) -> ExternalEntry:
        """Map one Graph contact payload to an ExternalEntry below ``parentSegment``."""
        addresses = [
            item.get("address")
            for item in (contact.get("emailAddresses") or [])
            if item.get("address")
        ]
        return ExternalEntry(
            name=contact.get("displayName") or _personLabel(contact) or "(no name)",
            path=f"/{parentSegment}/{contact.get('id', '')}",
            isFolder=False,
            mimeType="text/vcard",
            metadata={
                "id": contact.get("id"),
                "givenName": contact.get("givenName"),
                "surname": contact.get("surname"),
                "companyName": contact.get("companyName"),
                "emailAddresses": addresses,
                "businessPhones": contact.get("businessPhones") or [],
                "mobilePhone": contact.get("mobilePhone"),
            },
        )

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List contact folders (root) or the contacts of one folder.

        Raises:
            ValueError: if the first path segment matches no contact folder.
        """
        trimmed = (path or "").strip("/")
        if not trimmed:
            return await self._listFolders()

        # The path segment may be a contact-folder display name or an already-
        # resolved folder id (or the virtual 'default'); resolve first.
        folderRef = trimmed.split("/", 1)[0]
        folderId = await self._resolveContactFolderId(folderRef)
        if not folderId:
            raise ValueError(
                f"Contact folder not found: '{folderRef}'. Browse the root ('/') to "
                f"list folders and use the returned id."
            )

        if limit is None:
            cap = self._DEFAULT_CONTACT_LIMIT
        else:
            cap = max(1, min(int(limit), self._MAX_CONTACT_LIMIT))
        pageSize = min(self._PAGE_SIZE, cap)

        nextEndpoint: Optional[str]
        if folderId == self._DEFAULT_FOLDER_ID:
            nextEndpoint = f"me/contacts?$top={pageSize}&$orderby=displayName"
        else:
            nextEndpoint = f"me/contactFolders/{folderId}/contacts?$top={pageSize}&$orderby=displayName"

        collected: List[Dict[str, Any]] = []
        while nextEndpoint and len(collected) < cap:
            page = await self._graphGet(nextEndpoint)
            if "error" in page:
                # Only the first page failing is fatal; later failures keep
                # whatever was already collected.
                if not collected:
                    _raiseGraphError(page, "MSFT contacts list")
                break
            for record in page.get("value", []):
                collected.append(record)
                if len(collected) >= cap:
                    break
            link = page.get("@odata.nextLink")
            nextEndpoint = stripGraphBase(link) if link else None

        return [self._contactToEntry(record, folderId) for record in collected]

    async def _listFolders(self) -> List[ExternalEntry]:
        """Return the virtual default folder followed by the user's contact folders.

        A failing ``me/contactFolders`` call is logged and tolerated, so the
        default folder is always returned.
        """
        folders: List[ExternalEntry] = [
            ExternalEntry(
                name="Kontakte",
                path=f"/{self._DEFAULT_FOLDER_ID}",
                isFolder=True,
                metadata={"id": self._DEFAULT_FOLDER_ID, "isDefault": True},
            ),
        ]
        response = await self._graphGet("me/contactFolders?$top=100")
        if "error" in response:
            logger.warning(f"MSFT contactFolders list failed: {response['error']}")
            return folders
        folders.extend(
            ExternalEntry(
                name=folder.get("displayName", ""),
                path=f"/{folder.get('id', '')}",
                isFolder=True,
                metadata={"id": folder.get("id"), "parentFolderId": folder.get("parentFolderId")},
            )
            for folder in response.get("value", [])
        )
        return folders

    async def _resolveContactFolderId(self, ref: str) -> Optional[str]:
        """Resolve a contact-folder reference (display name / 'default' / id) to a
        folder id. Returns None if nothing matches."""
        if not ref:
            return None
        candidate = ref.strip()
        if candidate == self._DEFAULT_FOLDER_ID or candidate.lower() in ("kontakte", "contacts", "default"):
            return self._DEFAULT_FOLDER_ID
        # Heuristic: Graph ids are long URL-safe strings without spaces.
        if len(candidate) > 60 and " " not in candidate:
            return candidate
        response = await self._graphGet("me/contactFolders?$top=100")
        if "error" in response:
            _raiseGraphError(response, "MSFT contactFolders list")
        listed = response.get("value", [])
        # An exact id match wins over any display-name match.
        if any(folder.get("id") == candidate for folder in listed):
            return candidate
        for folder in listed:
            if (folder.get("displayName") or "").strip().lower() == candidate.lower():
                return folder.get("id")
        return None

    async def download(self, path: str) -> DownloadResult:
        """Fetch one contact and return it as a synthesised ``.vcf`` file.

        The contact id is the last path segment. An empty ``DownloadResult``
        is returned when the path has no contact segment or the fetch fails.
        """
        trimmed = (path or "").strip("/")
        if "/" not in trimmed:
            return DownloadResult()
        contactId = trimmed.split("/")[-1]
        payload = await self._graphGet(f"me/contacts/{contactId}")
        if "error" in payload:
            logger.warning(f"MSFT contact fetch failed: {payload['error']}")
            return DownloadResult()
        vcfBytes = _contactToVcard(payload)
        label = payload.get("displayName") or _personLabel(payload) or contactId
        baseName = _safeFileName(label) or "contact"
        return DownloadResult(
            data=vcfBytes,
            fileName=f"{baseName}.vcf",
            mimeType="text/vcard",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Not supported; always returns an error dict."""
        return {"error": "Contacts upload not supported"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Run a ``$search`` over ``me/contacts``; result paths live under ``/search``."""
        safeQuery = query.replace('"', '\\"')
        if limit is None:
            cap = self._DEFAULT_CONTACT_LIMIT
        else:
            cap = max(1, min(int(limit), self._MAX_CONTACT_LIMIT))
        response = await self._graphGet(f'me/contacts?$search="{safeQuery}"&$top={cap}')
        if "error" in response:
            _raiseGraphError(response, "MSFT contacts search")
        return [self._contactToEntry(record, "search") for record in response.get("value", [])]
# ---------------------------------------------------------------------------
# MsftConnector (1:n)
# ---------------------------------------------------------------------------
class MsftConnector(ProviderConnector):
    """Microsoft ProviderConnector -- 1 connection → n services."""

    # Service key -> adapter class; every adapter is built from the same token.
    _SERVICE_MAP = {
        "sharepoint": SharepointAdapter,
        "outlook": OutlookAdapter,
        "teams": TeamsAdapter,
        "onedrive": OneDriveAdapter,
        "calendar": CalendarAdapter,
        "contact": ContactsAdapter,
    }

    def getAvailableServices(self) -> List[str]:
        """Return the service keys this connector can hand out adapters for."""
        return [serviceKey for serviceKey in self._SERVICE_MAP]

    def getServiceAdapter(self, service: str) -> ServiceAdapter:
        """Instantiate the adapter registered for ``service``.

        Raises:
            ValueError: if ``service`` is not a registered key.
        """
        adapterType = self._SERVICE_MAP.get(service)
        if adapterType:
            return adapterType(self.accessToken)
        raise ValueError(f"Unknown MSFT service: {service}. Available: {list(self._SERVICE_MAP.keys())}")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parseSharepointPath(path: str) -> tuple:
"""Parse a SharePoint path into (siteId, innerPath).
Expected format: /sites/<siteId>/<innerPath>
Also accepts bare siteId if no /sites/ prefix.
"""
if not path:
return ("", "")
clean = path.strip("/")
if clean.startswith("sites/"):
parts = clean.split("/", 2)
siteId = parts[1] if len(parts) > 1 else ""
innerPath = parts[2] if len(parts) > 2 else ""
return (siteId, innerPath)
parts = clean.split("/", 1)
return (parts[0], parts[1] if len(parts) > 1 else "")
def _matchFilter(entry: ExternalEntry, pattern: str) -> bool:
    """Return True when the entry's name matches the glob ``pattern``.

    Both sides are lower-cased first, so the comparison ignores case.
    """
    from fnmatch import fnmatch as globMatch

    candidate = entry.name.lower()
    wanted = pattern.lower()
    return globMatch(candidate, wanted)
def _safeFileName(name: str) -> str:
"""Strip path-unsafe characters and trim length so the result is a usable file name."""
return re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name or "")[:80].strip(". ")
def _personLabel(contact: Dict[str, Any]) -> str:
given = (contact.get("givenName") or "").strip()
surname = (contact.get("surname") or "").strip()
if given or surname:
return f"{given} {surname}".strip()
company = (contact.get("companyName") or "").strip()
return company
def _icsEscape(value: str) -> str:
"""Escape RFC 5545 reserved characters in TEXT properties."""
if value is None:
return ""
return (
value.replace("\\", "\\\\")
.replace(";", "\\;")
.replace(",", "\\,")
.replace("\r\n", "\\n")
.replace("\n", "\\n")
)
def _icsDateTime(value: Optional[str]) -> Optional[str]:
"""Convert an ISO datetime string to an RFC 5545 DATE-TIME value (UTC)."""
if not value:
return None
try:
normalized = value.replace("Z", "+00:00") if value.endswith("Z") else value
dt = datetime.fromisoformat(normalized)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
except (TypeError, ValueError):
return None
def _eventToIcs(event: Dict[str, Any]) -> bytes:
    """Build a minimal RFC 5545 VCALENDAR/VEVENT for a Graph event payload.

    Emits UID and DTSTAMP always, and DTSTART, DTEND, SUMMARY, LOCATION,
    DESCRIPTION, ORGANIZER and ATTENDEE lines when the corresponding data
    is present. Lines are joined with CRLF and encoded as UTF-8.
    """

    def _address(holder: Any) -> Optional[str]:
        # Both the holder and its "emailAddress" member may be missing or
        # null in the payload; never dereference None.
        return ((holder or {}).get("emailAddress") or {}).get("address")

    uid = event.get("iCalUId") or event.get("id") or "unknown@poweron"
    summary = _icsEscape(event.get("subject") or "")
    location = _icsEscape((event.get("location") or {}).get("displayName") or "")
    description = _icsEscape((event.get("body") or {}).get("content") or "")
    dtstart = _icsDateTime((event.get("start") or {}).get("dateTime"))
    dtend = _icsDateTime((event.get("end") or {}).get("dateTime"))
    # Fall back to "now" when the payload has no usable modification time.
    dtstamp = _icsDateTime(event.get("lastModifiedDateTime")) or datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//PowerOn//MSFT-Calendar-Adapter//EN",
        "CALSCALE:GREGORIAN",
        "BEGIN:VEVENT",
        f"UID:{uid}",
        f"DTSTAMP:{dtstamp}",
    ]
    if dtstart:
        lines.append(f"DTSTART:{dtstart}")
    if dtend:
        lines.append(f"DTEND:{dtend}")
    if summary:
        lines.append(f"SUMMARY:{summary}")
    if location:
        lines.append(f"LOCATION:{location}")
    if description:
        lines.append(f"DESCRIPTION:{description}")

    organizer = _address(event.get("organizer"))
    if organizer:
        lines.append(f"ORGANIZER:mailto:{organizer}")
    for att in (event.get("attendees") or []):
        addr = _address(att)
        if addr:
            lines.append(f"ATTENDEE:mailto:{addr}")

    lines.append("END:VEVENT")
    lines.append("END:VCALENDAR")
    return ("\r\n".join(lines) + "\r\n").encode("utf-8")
def _contactToVcard(contact: Dict[str, Any]) -> bytes:
    """Build a vCard 3.0 from a Graph /me/contacts payload.

    Text values (name parts, FN, ORG, TITLE, address components, NOTE) are
    escaped with ``_icsEscape`` so that ``;``, ``,``, backslashes and line
    breaks inside a value cannot break the structured fields or the line
    layout. Lines are joined with CRLF and encoded as UTF-8.
    """

    def _text(value: Any) -> str:
        # Escape a single text component; missing values become "".
        return _icsEscape(value or "")

    given = _text(contact.get("givenName"))
    surname = _text(contact.get("surname"))
    middle = _text(contact.get("middleName"))
    fn = _text(contact.get("displayName") or _personLabel(contact) or contact.get("companyName"))

    lines = [
        "BEGIN:VCARD",
        "VERSION:3.0",
        f"N:{surname};{given};{middle};;",
        f"FN:{fn}",
    ]

    if contact.get("companyName"):
        org = _text(contact["companyName"])
        if contact.get("department"):
            # ORG components are separated by an unescaped ';'.
            org = f"{org};{_text(contact['department'])}"
        lines.append(f"ORG:{org}")
    if contact.get("jobTitle"):
        lines.append(f"TITLE:{_text(contact['jobTitle'])}")

    for em in (contact.get("emailAddresses") or []):
        addr = em.get("address")
        if addr:
            lines.append(f"EMAIL;TYPE=INTERNET:{addr}")
    for phone in (contact.get("businessPhones") or []):
        if phone:
            lines.append(f"TEL;TYPE=WORK,VOICE:{phone}")
    if contact.get("mobilePhone"):
        lines.append(f"TEL;TYPE=CELL,VOICE:{contact['mobilePhone']}")
    for phone in (contact.get("homePhones") or []):
        if phone:
            lines.append(f"TEL;TYPE=HOME,VOICE:{phone}")

    def _appendAddress(addr: Dict[str, Any], typ: str) -> None:
        # Emit an ADR line only when at least one component is present.
        if not addr:
            return
        parts = [
            _text(addr.get(key))
            for key in ("street", "city", "state", "postalCode", "countryOrRegion")
        ]
        if any(parts):
            # The two leading empty components are the PO box and the
            # extended address.
            lines.append(f"ADR;TYPE={typ}:;;" + ";".join(parts))

    _appendAddress(contact.get("businessAddress") or {}, "WORK")
    _appendAddress(contact.get("homeAddress") or {}, "HOME")
    _appendAddress(contact.get("otherAddress") or {}, "OTHER")

    if contact.get("personalNotes"):
        lines.append(f"NOTE:{_icsEscape(contact['personalNotes'])}")
    # `or ""` keeps a null id from being rendered as the literal "None".
    lines.append(f"UID:{contact.get('id') or ''}")
    lines.append("END:VCARD")
    return ("\r\n".join(lines) + "\r\n").encode("utf-8")