gateway/modules/connectors/providerInfomaniak/connectorInfomaniak.py
2026-04-29 01:52:47 +02:00

961 lines
38 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Infomaniak ProviderConnector -- kDrive + Calendar + Contacts via PAT.
The PAT carries one or more of these scopes:
- ``drive`` -> kDrive (active here)
- ``workspace:calendar`` -> Calendar (active here)
- ``workspace:contact`` -> Contacts (active here)
- ``workspace:mail`` -> Mail (no public PAT-friendly endpoint yet)
Mail is intentionally NOT in ``_SERVICE_MAP`` until we find a
PAT-authenticated endpoint -- the public ``/1/mail`` and
``mail.infomaniak.com/api/pim/mail*`` routes either don't exist (404
nginx) or 302 to OAuth, so wiring a stub adapter would only confuse
users.
Path conventions (``ServiceAdapter`` paths always start with a leading
``/``):

    kDrive (api.infomaniak.com; drives are enumerated through
    ``/2/drive/init?with=drives``, which needs no ``account_id`` query arg):
        /                            -- list drives in the user's account
        /{driveId}                   -- root folder of a drive
        /{driveId}/{fileId}          -- folder children OR file (download)

    Calendar (calendar.infomaniak.com PIM):
        /                            -- list calendars accessible to the user
        /{calendarId}                -- events of one calendar
        /{calendarId}/{eventId}      -- single event (.ics download)

    Contacts (contacts.infomaniak.com PIM):
        /                            -- list address books
        /{addressBookId}             -- contacts in that address book
        /{addressBookId}/{contactId} -- single contact (.vcf download)
"""
import logging
import re
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, TypedDict
from urllib.parse import quote
import aiohttp
from modules.connectors.connectorProviderBase import (
ProviderConnector,
ServiceAdapter,
DownloadResult,
)
from modules.datamodels.datamodelDataSource import ExternalEntry
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default host for the GET/download helpers; every kDrive call in this file
# relies on that default.
_API_BASE = "https://api.infomaniak.com"
# Hosts of the PIM Calendar and PIM Contacts services.
_CALENDAR_BASE = "https://calendar.infomaniak.com"
_CONTACTS_BASE = "https://contacts.infomaniak.com"
# Path prefix shared by the PIM endpoints on both PIM hosts.
_PIM_PREFIX = "/api/pim"
class InfomaniakOwnerIdentity(TypedDict):
    """Minimal identity payload for the PAT owner.

    Instances are built by :func:`resolveOwnerIdentity` from the first
    owner record found in the PIM Calendar or PIM Contacts listing; both
    fields come from that same record.

    NOTE(review): the original comment stated that ``accountId`` is the
    only field the kDrive adapter needs at runtime. The kDrive adapter in
    this file does not read it -- confirm against callers elsewhere.
    """

    # ``account_id`` of the owner record, converted with ``int()``.
    accountId: int
    # ``name`` of the owner record, or ``None`` when missing/empty;
    # harvested for the connection UI.
    displayName: Optional[str]
class InfomaniakIdentityError(RuntimeError):
    """Raised when no owner identity can be derived from a PAT.

    :func:`listAccessibleDrives` raises the same error when the kDrive
    listing fails or its response has an unexpected shape.
    """
async def _infomaniakGet(
    token: str,
    endpoint: str,
    baseUrl: str = _API_BASE,
) -> Dict[str, Any]:
    """Issue one authenticated JSON GET against an Infomaniak host.

    ``baseUrl`` and ``endpoint`` are joined with exactly one slash between
    them. Redirects are not followed.

    Returns the decoded JSON body for a 200/201 answer. Any other status,
    and any exception raised while performing the request, is reported as
    ``{'error': <text>}`` instead of being raised.
    """
    url = "/".join((baseUrl.rstrip("/"), endpoint.lstrip("/")))
    requestHeaders = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
    }
    requestTimeout = aiohttp.ClientTimeout(total=20)
    try:
        async with aiohttp.ClientSession(timeout=requestTimeout) as http:
            async with http.get(
                url, headers=requestHeaders, allow_redirects=False
            ) as response:
                if response.status not in (200, 201):
                    body = await response.text()
                    logger.warning(f"Infomaniak GET {url} -> {response.status}: {body[:300]}")
                    return {"error": f"{response.status}: {body[:200]}"}
                return await response.json()
    except Exception as exc:
        # Best-effort helper: callers branch on the 'error' key, so
        # failures are logged and turned into an error payload.
        logger.error(f"Infomaniak GET {url} crashed: {exc}")
        return {"error": str(exc)}
async def _infomaniakDownload(
    token: str,
    endpoint: str,
    baseUrl: str = _API_BASE,
) -> Optional[bytes]:
    """Fetch a binary payload from an Infomaniak host.

    Returns the response body as bytes on a final status of 200, otherwise
    ``None`` (other statuses and exceptions are logged, never raised).

    In contrast to :func:`_infomaniakGet` this request follows redirects:
    kDrive's ``/2/drive/{driveId}/files/{fileId}/download`` answers with
    ``302 -> presigned CDN URL``, and the same pattern shows up on the
    Calendar/Contacts export endpoints, so refusing to follow would lose
    every download.

    NOTE(review): the original comment states that aiohttp keeps the
    Authorization header across the redirect because the target is the
    same Infomaniak property -- confirm; this depends on how aiohttp
    treats redirects that change origin.
    """
    url = "/".join((baseUrl.rstrip("/"), endpoint.lstrip("/")))
    requestHeaders = {"Authorization": f"Bearer {token}"}
    requestTimeout = aiohttp.ClientTimeout(total=120)
    try:
        async with aiohttp.ClientSession(timeout=requestTimeout) as http:
            async with http.get(
                url, headers=requestHeaders, allow_redirects=True
            ) as response:
                if response.status != 200:
                    logger.warning(
                        f"Infomaniak download {url} -> {response.status}: "
                        f"{(await response.text())[:300]}"
                    )
                    return None
                return await response.read()
    except Exception as exc:
        logger.error(f"Infomaniak download {url} crashed: {exc}")
        return None
def _unwrapData(payload: Any) -> Any:
"""Infomaniak wraps successful responses as ``{result: 'success', data: ...}``."""
if isinstance(payload, dict) and "data" in payload and "result" in payload:
return payload.get("data")
return payload
def _firstOwnerRecord(payload: Any, listKey: str) -> Optional[Dict[str, Any]]:
"""Pick the first user-owned record from a PIM list response.
Both PIM Calendar (``calendars``) and PIM Contacts (``addressbooks``)
return ``{result, data: {<listKey>: [...]}}``. Owner-records have a
positive numeric ``user_id`` and an integer ``account_id``; shared /
public records (e.g. holiday calendars) carry ``user_id = -1`` and
``account_id = null`` and are skipped.
"""
data = _unwrapData(payload) if payload else None
if not isinstance(data, dict):
return None
records = data.get(listKey)
if not isinstance(records, list):
return None
for rec in records:
if not isinstance(rec, dict):
continue
userId = rec.get("user_id")
accountId = rec.get("account_id")
if isinstance(userId, int) and userId > 0 and isinstance(accountId, int):
return rec
return None
async def resolveOwnerIdentity(token: str) -> InfomaniakOwnerIdentity:
    """Look up the PAT owner's identity via PIM Calendar, then PIM Contacts.

    The result is used for UI display on the connection
    (``externalUsername`` / ``externalId``): the PIM owner records embed
    the kSuite ``account_id`` and the user's display name.

    Calendar is tried first because it is the more universally provisioned
    PIM service; Contacts is the equivalent fallback. A source is skipped
    when its request reports an error or its listing has no owner record.

    Raises:
        InfomaniakIdentityError: when neither source yields an owner record.
    """
    pimSources = (
        (_CALENDAR_BASE, f"{_PIM_PREFIX}/calendar", "calendars"),
        (_CONTACTS_BASE, f"{_PIM_PREFIX}/addressbook", "addressbooks"),
    )
    for host, route, recordsKey in pimSources:
        response = await _infomaniakGet(token, route, baseUrl=host)
        requestFailed = isinstance(response, dict) and response.get("error")
        ownerRecord = None if requestFailed else _firstOwnerRecord(response, recordsKey)
        if ownerRecord is not None:
            return InfomaniakOwnerIdentity(
                accountId=int(ownerRecord["account_id"]),
                displayName=ownerRecord.get("name") or None,
            )
    raise InfomaniakIdentityError(
        "Could not resolve Infomaniak owner identity from PIM Calendar or "
        "Contacts. The PAT must carry 'workspace:calendar' or "
        "'workspace:contact' so we can label the connection."
    )
async def listAccessibleDrives(token: str) -> List[Dict[str, Any]]:
    """List every kDrive the PAT can reach (admin OR user role).

    Calls ``GET /2/drive/init?with=drives`` -- the only PAT-friendly
    endpoint that enumerates a user's drives **independently of the
    Drive-Manager admin role**. The plain ``/2/drive?account_id=...``
    listing is filtered to drives where the caller is an admin and
    therefore returns an empty array for everyone with ``role: user``,
    even though the same user can read/write the drive's files via
    ``/2/drive/{driveId}/...``.

    Only the ``drive`` PAT scope is required -- no ``accounts``, no
    ``user_info``, no admin permission. Each entry matches the shape
    documented for ``GET /2/drive/{drive_id}`` (``id``, ``name``,
    ``account_id``, ``role``, ...). Entries that are not dicts or have no
    truthy ``id`` are dropped.

    Raises:
        InfomaniakIdentityError: when the request reports an error (e.g.
            the PAT lacks the ``drive`` scope) or the response is
            malformed.
    """
    response = await _infomaniakGet(token, "/2/drive/init?with=drives")
    if isinstance(response, dict) and response.get("error"):
        raise InfomaniakIdentityError(
            "Could not list Infomaniak kDrives. The PAT must carry the "
            f"'drive' scope (/2/drive/init said: {response['error']})."
        )

    body = _unwrapData(response)
    if not isinstance(body, dict):
        raise InfomaniakIdentityError(
            "Unexpected /2/drive/init response shape (expected an object)."
        )

    # A missing or null 'drives' key is treated as "no drives".
    driveList = body.get("drives") or []
    if not isinstance(driveList, list):
        raise InfomaniakIdentityError(
            "Unexpected /2/drive/init response: 'drives' is not a list."
        )

    usable: List[Dict[str, Any]] = []
    for entry in driveList:
        if isinstance(entry, dict) and entry.get("id"):
            usable.append(entry)
    return usable
def _lastNumericSegment(segments: List[str]) -> Optional[str]:
"""Return the last all-digit segment (kDrive file/folder IDs are int).
The agent sometimes appends the human-readable filename to a path,
e.g. ``/2980592/12/platform-overview.html``. The kDrive API does
not accept names -- only numeric IDs -- so we strip trailing
non-numeric segments and pick the last integer ID.
Returns ``None`` if no numeric segment exists.
"""
for seg in reversed(segments):
if seg.isdigit():
return seg
return None
class KdriveAdapter(ServiceAdapter):
    """kDrive ServiceAdapter -- browse drives, folders, files.

    Drive enumeration goes through :func:`listAccessibleDrives` which
    calls ``/2/drive/init?with=drives``. That endpoint returns every
    drive the PAT can read regardless of the Drive-Manager admin role
    -- unlike the documented ``/2/drive?account_id=...`` listing which
    silently returns an empty array for users with ``role: 'user'``
    (the most common case for kSuite members).

    The drive list is cached on the adapter instance so each browse
    pays for one ``/2/drive/init`` call at most.

    File-vs-folder handling: a DataSource may point at a single file
    (e.g. ``/{driveId}/{fileId}`` where ``fileId`` is a regular file).
    Calling ``/files/{fileId}/files`` on a file answers
    ``400 destination_not_a_directory`` -- so :meth:`browse` first
    fetches the item's metadata and, if ``type=file``, returns a
    one-element list describing the file itself instead of pretending
    the directory is empty.
    """

    def __init__(self, accessToken: str):
        self._token = accessToken
        # Filled lazily by _ensureDrives(); None means "not fetched yet".
        self._drives: Optional[List[Dict[str, Any]]] = None

    async def _ensureDrives(self) -> List[Dict[str, Any]]:
        """Return the cached drive list, fetching it on first use."""
        if self._drives is None:
            self._drives = await listAccessibleDrives(self._token)
        return self._drives

    async def _fetchItemMeta(self, driveId: str, fileId: str) -> Optional[Dict[str, Any]]:
        """Return the kDrive file/folder metadata dict, or ``None`` on error."""
        meta = await _infomaniakGet(self._token, f"/2/drive/{driveId}/files/{fileId}")
        if not isinstance(meta, dict) or meta.get("error"):
            return None
        data = _unwrapData(meta)
        return data if isinstance(data, dict) else None

    @staticmethod
    def _extractItems(data: Any) -> List[Dict[str, Any]]:
        """Pull the item dicts out of an unwrapped listing/search response.

        Accepts either a bare list or a dict with an ``items`` list. A
        missing or null ``items`` value yields an empty list, and entries
        that are not dicts are dropped so callers can use ``.get`` safely.
        """
        if isinstance(data, dict):
            data = data.get("items")
        if not isinstance(data, list):
            return []
        return [item for item in data if isinstance(item, dict)]

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List what lives at ``path``.

        ``/`` lists the drives, ``/{driveId}`` the drive's root children
        and ``/{driveId}/{fileId}`` either the folder's children or -- when
        the item is a regular file -- a single entry for that file.
        ``filter`` is accepted for interface compatibility and not used.
        """
        cleanPath = (path or "").strip("/")
        segments = [s for s in cleanPath.split("/") if s]
        if not segments:
            return await self._listDrives()
        driveId = segments[0]
        if len(segments) == 1:
            return await self._listChildren(driveId, fileId=None, limit=limit)
        fileId = _lastNumericSegment(segments[1:])
        if fileId is None:
            return []
        meta = await self._fetchItemMeta(driveId, fileId)
        if meta is not None and meta.get("type") == "file":
            return [ExternalEntry(
                name=meta.get("name") or fileId,
                path=f"/{driveId}/{fileId}",
                isFolder=False,
                size=meta.get("size"),
                mimeType=meta.get("mime_type"),
                lastModified=meta.get("last_modified_at"),
                metadata={"id": fileId, "kind": "file"},
            )]
        # Metadata unavailable or not a file: fall back to a folder listing.
        return await self._listChildren(driveId, fileId=fileId, limit=limit)

    async def _listDrives(self) -> List[ExternalEntry]:
        """Return one folder entry per accessible drive."""
        drives = await self._ensureDrives()
        entries: List[ExternalEntry] = []
        for drive in drives:
            driveId = str(drive.get("id", ""))
            if not driveId:
                continue
            entries.append(ExternalEntry(
                name=drive.get("name") or driveId,
                path=f"/{driveId}",
                isFolder=True,
                metadata={
                    "id": driveId,
                    "kind": "drive",
                    "accountId": drive.get("account_id"),
                    "role": drive.get("role"),
                },
            ))
        return entries

    async def _listChildren(
        self,
        driveId: str,
        fileId: Optional[str],
        limit: Optional[int],
    ) -> List[ExternalEntry]:
        """List the children of a folder (``fileId``) or of the drive root.

        ``limit`` becomes the ``per_page`` value, clamped to 1..1000
        (200 when not given). Request errors are logged and yield ``[]``.
        """
        if fileId is None:
            endpoint = f"/2/drive/{driveId}/files"
        else:
            endpoint = f"/2/drive/{driveId}/files/{fileId}/files"
        pageSize = max(1, min(int(limit or 200), 1000))
        endpoint = f"{endpoint}?per_page={pageSize}"
        result = await _infomaniakGet(self._token, endpoint)
        if isinstance(result, dict) and result.get("error"):
            logger.warning(
                f"kDrive list-children {driveId}/{fileId or 'root'} failed: {result['error']}"
            )
            return []
        entries: List[ExternalEntry] = []
        for item in self._extractItems(_unwrapData(result)):
            itemId = str(item.get("id", ""))
            if not itemId:
                continue
            isFolder = item.get("type") == "dir"
            entries.append(ExternalEntry(
                name=item.get("name", itemId),
                path=f"/{driveId}/{itemId}",
                isFolder=isFolder,
                size=item.get("size") if not isFolder else None,
                mimeType=item.get("mime_type") if not isFolder else None,
                lastModified=item.get("last_modified_at"),
                metadata={"id": itemId, "kind": item.get("type", "")},
            ))
        return entries

    async def download(self, path: str) -> DownloadResult:
        """Download the file addressed by ``/{driveId}/{fileId}[/{name}]``.

        Returns an empty ``DownloadResult`` when the path has no numeric
        file id or the download fails. File name and MIME type come from
        the item metadata, with the id and ``application/octet-stream`` as
        fallbacks.
        """
        segments = [s for s in (path or "").strip("/").split("/") if s]
        if len(segments) < 2:
            return DownloadResult()
        driveId = segments[0]
        # Agent may append the filename: ``/{driveId}/{fileId}/{name}``.
        # Pull the last numeric segment instead of trusting segments[-1].
        fileId = _lastNumericSegment(segments[1:])
        if fileId is None:
            return DownloadResult()
        meta = await self._fetchItemMeta(driveId, fileId)
        fileName = (meta or {}).get("name") or fileId
        mimeType = (meta or {}).get("mime_type") or "application/octet-stream"
        content = await _infomaniakDownload(
            self._token, f"/2/drive/{driveId}/files/{fileId}/download"
        )
        if content is None:
            return DownloadResult()
        return DownloadResult(data=content, fileName=fileName, mimeType=mimeType)

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Not implemented; always returns an error payload."""
        return {"error": "kDrive upload not yet implemented"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Search files in one drive.

        The drive is the first segment of ``path``; without a path the
        first accessible drive is searched. ``limit`` becomes ``per_page``,
        clamped to 1..200 (50 when not given). Request errors yield ``[]``.
        """
        segments = [s for s in (path or "").strip("/").split("/") if s]
        if segments:
            driveId = segments[0]
        else:
            drives = await self._listDrives()
            if not drives:
                return []
            driveId = (drives[0].metadata or {}).get("id") or drives[0].path.strip("/")
        pageSize = max(1, min(int(limit or 50), 200))
        # Percent-encode the user-supplied query: raw spaces, '&' or '#'
        # would otherwise corrupt the URL or inject extra parameters.
        encodedQuery = quote(str(query or ""), safe="")
        endpoint = (
            f"/2/drive/{driveId}/files/search"
            f"?query={encodedQuery}&per_page={pageSize}"
        )
        result = await _infomaniakGet(self._token, endpoint)
        if isinstance(result, dict) and result.get("error"):
            return []
        entries: List[ExternalEntry] = []
        for item in self._extractItems(_unwrapData(result)):
            itemId = str(item.get("id", ""))
            if not itemId:
                continue
            isFolder = item.get("type") == "dir"
            entries.append(ExternalEntry(
                name=item.get("name", itemId),
                path=f"/{driveId}/{itemId}",
                isFolder=isFolder,
                size=item.get("size") if not isFolder else None,
                mimeType=item.get("mime_type") if not isFolder else None,
                metadata={"id": itemId},
            ))
        return entries
def _safeFileName(label: str, fallback: str) -> str:
"""Sanitize a string for use as a filename. Trims and caps at 80 chars."""
cleaned = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", str(label or "")).strip(". ")
return cleaned[:80] or fallback
class CalendarAdapter(ServiceAdapter):
    """Infomaniak Calendar adapter -- browse calendars + events, .ics download.

    Uses the public PIM endpoints at ``calendar.infomaniak.com/api/pim``,
    which authenticate with the PAT scope ``workspace:calendar``.

    Path layout:
        ``/``                        -> list calendars
        ``/{calendarId}``            -> list events of that calendar
        ``/{calendarId}/{eventId}``  -> single event (download as .ics)

    Endpoint particulars:
        Listing events runs against ``/api/pim/event`` with the calendar
        id as a query arg (the per-calendar nested route
        ``/calendar/{id}/event`` is **not** PAT-friendly -- it 302s to the
        OAuth login page). Infomaniak enforces a hard ``from``/``to``
        window of less than 3 months, so this adapter queries a fixed
        90-day window around today (30 days back, 60 days forward),
        which covers typical UDB browsing. Date format is ``Y-m-d H:i:s``.

        Event detail and ``.ics`` export are addressed by event id alone
        (``/api/pim/event/{eventId}`` and ``.../export``); the calendar
        id from the path is kept only for tree-navigation continuity.
    """

    # Vendor enforces ``Range must be lower than 3 months``. We stay
    # comfortably below to keep one call per browse.
    _PAST_DAYS = 30
    _FUTURE_DAYS = 60

    def __init__(self, accessToken: str):
        self._token = accessToken

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List calendars (``/``) or the events of one calendar.

        Deeper paths address a single event, which has no children, so
        they yield ``[]``. ``filter`` is accepted for interface
        compatibility and not used.
        """
        segments = [s for s in (path or "").strip("/").split("/") if s]
        if not segments:
            return await self._listCalendars()
        if len(segments) == 1:
            return await self._listEvents(segments[0], limit=limit)
        return []

    async def _listCalendars(self) -> List[ExternalEntry]:
        """Return one folder entry per calendar; ``[]`` on request errors."""
        result = await _infomaniakGet(
            self._token, f"{_PIM_PREFIX}/calendar", baseUrl=_CALENDAR_BASE
        )
        if isinstance(result, dict) and result.get("error"):
            logger.warning(f"Calendar list-calendars failed: {result['error']}")
            return []
        data = _unwrapData(result)
        calendars = data.get("calendars") if isinstance(data, dict) else None
        if not isinstance(calendars, list):
            return []
        entries: List[ExternalEntry] = []
        for cal in calendars:
            if not isinstance(cal, dict):
                continue
            calId = str(cal.get("id", ""))
            if not calId:
                continue
            # Shared/public calendars carry a non-positive user_id or no
            # account_id.
            isShared = (cal.get("user_id") or 0) <= 0 or cal.get("account_id") is None
            entries.append(ExternalEntry(
                name=cal.get("name") or calId,
                path=f"/{calId}",
                isFolder=True,
                metadata={
                    "id": calId,
                    "kind": "calendar",
                    "color": cal.get("color"),
                    "shared": isShared,
                    "default": bool(cal.get("default")),
                },
            ))
        return entries

    def _eventWindow(self) -> tuple:
        """Return ``(from, to)`` strings for the query window, in UTC."""
        now = datetime.now(timezone.utc)
        fromStr = (now - timedelta(days=self._PAST_DAYS)).strftime("%Y-%m-%d %H:%M:%S")
        toStr = (now + timedelta(days=self._FUTURE_DAYS)).strftime("%Y-%m-%d %H:%M:%S")
        return fromStr, toStr

    async def _listEvents(
        self,
        calendarId: str,
        limit: Optional[int],
    ) -> List[ExternalEntry]:
        """List the events of ``calendarId`` inside the default window.

        ``limit`` truncates the result client-side; ``None`` returns every
        event of the window. Request errors are logged and yield ``[]``.
        """
        fromStr, toStr = self._eventWindow()
        endpoint = (
            f"{_PIM_PREFIX}/event"
            # The id comes from a caller-supplied path; encode it so it
            # cannot alter the query string.
            f"?calendar_id={quote(str(calendarId), safe='')}"
            f"&from={quote(fromStr)}"
            f"&to={quote(toStr)}"
        )
        result = await _infomaniakGet(self._token, endpoint, baseUrl=_CALENDAR_BASE)
        if isinstance(result, dict) and result.get("error"):
            logger.warning(f"Calendar list-events {calendarId} failed: {result['error']}")
            return []
        data = _unwrapData(result)
        if isinstance(data, dict):
            data = data.get("events")
        events = data if isinstance(data, list) else []
        entries: List[ExternalEntry] = []
        for ev in events:
            if not isinstance(ev, dict):
                continue
            evId = str(ev.get("id") or ev.get("uid") or "")
            if not evId:
                continue
            title = ev.get("title") or ev.get("summary") or "(no title)"
            entries.append(ExternalEntry(
                name=title,
                path=f"/{calendarId}/{evId}",
                isFolder=False,
                metadata={
                    "id": evId,
                    "kind": "event",
                    "start": ev.get("start"),
                    "end": ev.get("end"),
                    "location": ev.get("location"),
                    "updated": ev.get("updated_at"),
                },
            ))
        if limit is not None:
            # Clamp so a negative limit cannot slice from the end.
            return entries[: max(0, int(limit))]
        return entries

    async def download(self, path: str) -> DownloadResult:
        """Export the event at ``/{calendarId}/{eventId}`` as an ``.ics`` file.

        Returns an empty ``DownloadResult`` when the path has no event id
        or the export fails. The file name is derived from the event title
        when the detail call succeeds, otherwise from the event id.
        """
        segments = [s for s in (path or "").strip("/").split("/") if s]
        if len(segments) < 2:
            return DownloadResult()
        eventId = segments[1]
        encodedEventId = quote(eventId, safe="")
        content = await _infomaniakDownload(
            self._token,
            f"{_PIM_PREFIX}/event/{encodedEventId}/export",
            baseUrl=_CALENDAR_BASE,
        )
        if content is None:
            return DownloadResult()
        title = eventId
        meta = await _infomaniakGet(
            self._token,
            f"{_PIM_PREFIX}/event/{encodedEventId}",
            baseUrl=_CALENDAR_BASE,
        )
        if isinstance(meta, dict) and not meta.get("error"):
            unwrapped = _unwrapData(meta)
            if isinstance(unwrapped, dict):
                # The detail may be nested under an 'event' key or be the
                # unwrapped object itself.
                event = unwrapped.get("event") if "event" in unwrapped else unwrapped
                if isinstance(event, dict):
                    title = event.get("title") or event.get("summary") or eventId
        return DownloadResult(
            data=content,
            fileName=f"{_safeFileName(title, 'event')}.ics",
            mimeType="text/calendar",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Not implemented; always returns an error payload."""
        return {"error": "Calendar upload not yet implemented"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Case-insensitive substring search over event title and location.

        The PIM Calendar API has no public search endpoint we can rely on,
        so events of the calendar named by the first segment of ``path``
        (or of all calendars when ``path`` has no segment) are listed for
        the default window and filtered client-side. An empty query
        matches every event.

        ``limit`` caps the number of *matches*: it is applied after
        filtering, and no further calendars are queried once it is reached.
        """
        pathSegments = [s for s in (path or "").strip("/").split("/") if s]
        if pathSegments:
            # Only the calendar id matters; a trailing event id is ignored.
            calendarIds = [pathSegments[0]]
        else:
            calendarIds = [
                (cal.metadata or {}).get("id") or cal.path.strip("/")
                for cal in await self._listCalendars()
            ]
        if not calendarIds:
            return []
        maxResults = None if limit is None else max(0, int(limit))
        if maxResults == 0:
            return []
        needle = (query or "").strip().lower()
        results: List[ExternalEntry] = []
        for calId in calendarIds:
            # Fetch without a limit: truncating before the filter would
            # hide matches beyond the first ``limit`` events.
            for ev in await self._listEvents(calId, limit=None):
                hay = " ".join(
                    str(v) for v in (
                        ev.name,
                        (ev.metadata or {}).get("location") or "",
                    )
                ).lower()
                if needle and needle not in hay:
                    continue
                results.append(ev)
                if maxResults is not None and len(results) >= maxResults:
                    return results
        return results
def _vcardEscape(value: Any) -> str:
"""Escape a value for vCard 3.0 -- backslash, comma, semicolon, newline."""
text = "" if value is None else str(value)
return (
text.replace("\\", "\\\\")
.replace(";", "\\;")
.replace(",", "\\,")
.replace("\r\n", "\\n")
.replace("\n", "\\n")
)
def _renderInfomaniakVcard(record: Dict[str, Any]) -> str:
    """Build a vCard 3.0 document from an Infomaniak contact record.

    The Contacts PIM ``/contact/{id}/export`` endpoint is not PAT-friendly
    (302s to the OAuth login page), and ``/contact/{id}`` returns 500 with
    a PAT, so the canonical .vcf cannot be fetched. The card is therefore
    synthesized from the listing record fetched with
    ``with=emails,phones,addresses,details``.

    vCard 3.0 is the common-denominator format accepted by Outlook, Google
    Contacts, Apple Contacts and Thunderbird (4.0 still has poor Outlook
    import compatibility).

    Emails, phones and websites may each be plain strings or dicts (keys
    ``address``, ``number`` and ``url`` respectively); empty values are
    skipped. Lines are joined with CRLF and the result ends with CRLF.
    """
    givenName = record.get("firstname") or ""
    familyName = record.get("lastname") or ""
    displayName = (
        record.get("name")
        or " ".join(part for part in (givenName, familyName) if part).strip()
        or "Contact"
    )

    def _multiValued(fieldName: str, dictKey: str, prefix: str) -> List[str]:
        # One property line per non-empty entry of a str-or-dict list.
        rendered: List[str] = []
        for entry in record.get(fieldName) or []:
            if isinstance(entry, str):
                rawValue = entry
            elif isinstance(entry, dict):
                rawValue = entry.get(dictKey)
            else:
                continue
            if rawValue:
                rendered.append(f"{prefix}{_vcardEscape(rawValue)}")
        return rendered

    card: List[str] = [
        "BEGIN:VCARD",
        "VERSION:3.0",
        # N: Last;First;Middle;Prefix;Suffix
        f"N:{_vcardEscape(familyName)};{_vcardEscape(givenName)};;;",
        f"FN:{_vcardEscape(displayName)}",
    ]

    company = record.get("organization") or ""
    if company:
        card.append(f"ORG:{_vcardEscape(company)}")

    card.extend(_multiValued("emails", "address", "EMAIL;TYPE=INTERNET:"))
    card.extend(_multiValued("phones", "number", "TEL:"))

    for place in record.get("addresses") or []:
        if not isinstance(place, dict):
            continue
        # ADR: PO-Box;Extended;Street;City;Region;Postal;Country
        components = (
            place.get("street"),
            place.get("city"),
            place.get("region"),
            place.get("zip") or place.get("postal_code"),
            place.get("country"),
        )
        card.append("ADR:;;" + ";".join(_vcardEscape(c) for c in components))

    card.extend(_multiValued("websites", "url", "URL:"))

    remark = record.get("note") or ""
    if remark:
        card.append(f"NOTE:{_vcardEscape(remark)}")

    card.append("END:VCARD")
    return "\r\n".join(card) + "\r\n"
class ContactAdapter(ServiceAdapter):
    """Infomaniak Contacts adapter -- browse address books + contacts, .vcf download.

    Uses the public PIM endpoint at ``contacts.infomaniak.com/api/pim``,
    which authenticates with the PAT scope ``workspace:contact``.

    Path layout:
        ``/``                             -> list address books
        ``/{addressBookId}``              -> list contacts in that book
        ``/{addressBookId}/{contactId}``  -> single contact (download as .vcf)

    Endpoint particulars:
        Listing both address books and contacts is PAT-friendly. The
        contact-listing call uses ``with=emails,phones,addresses,details``
        so each record arrives with all the fields needed for vCard
        synthesis -- Infomaniak skips them by default. Detail and export
        endpoints (``/contact/{id}``, ``/contact/{id}/export``) are **not**
        PAT-friendly (the former 500s, the latter 302s to OAuth), so the
        ``download`` path re-fetches the listing and renders the vCard
        ourselves via :func:`_renderInfomaniakVcard`.
    """

    _DEFAULT_CONTACT_LIMIT = 200
    _MAX_CONTACT_LIMIT = 1000
    _CONTACT_FIELDS = "emails,phones,addresses,details"

    def __init__(self, accessToken: str):
        self._token = accessToken

    @staticmethod
    def _contactId(record: Dict[str, Any]) -> str:
        """Return the id used in paths: ``id``, else ``uid``, else ``""``."""
        return str(record.get("id") or record.get("uid") or "")

    @staticmethod
    def _firstValue(values: Any, key: str) -> Optional[str]:
        """Return the first non-empty string of a str-or-dict list.

        The vCard renderer accepts emails/phones either as plain strings
        or as dicts; this mirrors that so display names and metadata never
        end up holding a stringified dict.
        """
        if not isinstance(values, list):
            return None
        for value in values:
            if isinstance(value, dict):
                value = value.get(key)
            if isinstance(value, str) and value:
                return value
        return None

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List address books (``/``) or the contacts of one address book.

        Deeper paths address a single contact, which has no children, so
        they yield ``[]``. ``filter`` is accepted for interface
        compatibility and not used.
        """
        segments = [s for s in (path or "").strip("/").split("/") if s]
        if not segments:
            return await self._listAddressBooks()
        if len(segments) == 1:
            return await self._listContacts(segments[0], limit=limit)
        return []

    async def _listAddressBooks(self) -> List[ExternalEntry]:
        """Return one folder entry per address book; ``[]`` on request errors."""
        result = await _infomaniakGet(
            self._token, f"{_PIM_PREFIX}/addressbook", baseUrl=_CONTACTS_BASE
        )
        if isinstance(result, dict) and result.get("error"):
            logger.warning(f"Contacts list-addressbooks failed: {result['error']}")
            return []
        data = _unwrapData(result)
        books = data.get("addressbooks") if isinstance(data, dict) else None
        if not isinstance(books, list):
            return []
        entries: List[ExternalEntry] = []
        for book in books:
            if not isinstance(book, dict):
                continue
            bookId = str(book.get("id", ""))
            if not bookId:
                continue
            isShared = bool(book.get("is_shared")) or (book.get("user_id") or 0) <= 0
            # The shared organisation directory has an empty name -- give it a
            # human label so the UDB tree is not blank.
            name = book.get("name") or (
                "Organisation" if book.get("is_dynamic_organisation_member_directory") else bookId
            )
            entries.append(ExternalEntry(
                name=name,
                path=f"/{bookId}",
                isFolder=True,
                metadata={
                    "id": bookId,
                    "kind": "addressbook",
                    "color": book.get("color"),
                    "shared": isShared,
                    "default": bool(book.get("default")),
                },
            ))
        return entries

    async def _fetchContacts(
        self,
        addressBookId: str,
        perPage: int,
    ) -> List[Dict[str, Any]]:
        """Raw listing call -- shared by browse and download.

        Returns the contact dicts of a single page of ``perPage`` records;
        request errors are logged and yield ``[]``.
        """
        endpoint = (
            # The id comes from a caller-supplied path; encode it so it
            # cannot alter the URL.
            f"{_PIM_PREFIX}/addressbook/{quote(str(addressBookId), safe='')}/contact"
            f"?per_page={perPage}&with={self._CONTACT_FIELDS}"
        )
        result = await _infomaniakGet(self._token, endpoint, baseUrl=_CONTACTS_BASE)
        if isinstance(result, dict) and result.get("error"):
            logger.warning(
                f"Contacts list-contacts {addressBookId} failed: {result['error']}"
            )
            return []
        data = _unwrapData(result)
        if isinstance(data, dict):
            data = data.get("contacts")
        if not isinstance(data, list):
            return []
        return [c for c in data if isinstance(c, dict)]

    async def _listContacts(
        self,
        addressBookId: str,
        limit: Optional[int],
    ) -> List[ExternalEntry]:
        """List the contacts of one address book as file entries.

        ``limit`` becomes the page size, clamped to
        1..``_MAX_CONTACT_LIMIT`` (``_DEFAULT_CONTACT_LIMIT`` when not
        given).
        """
        effectiveLimit = self._DEFAULT_CONTACT_LIMIT if limit is None else max(
            1, min(int(limit), self._MAX_CONTACT_LIMIT),
        )
        contacts = await self._fetchContacts(addressBookId, perPage=effectiveLimit)
        entries: List[ExternalEntry] = []
        for c in contacts:
            cId = self._contactId(c)
            if not cId:
                continue
            firstEmail = self._firstValue(c.get("emails"), "address")
            firstPhone = self._firstValue(c.get("phones"), "number")
            displayName = (
                c.get("name")
                or " ".join(p for p in (c.get("firstname"), c.get("lastname")) if p).strip()
                or firstEmail
                or cId
            )
            entries.append(ExternalEntry(
                name=str(displayName),
                path=f"/{addressBookId}/{cId}",
                isFolder=False,
                metadata={
                    "id": cId,
                    "kind": "contact",
                    "email": firstEmail,
                    "phone": firstPhone,
                    "organization": c.get("organization"),
                },
            ))
        return entries

    async def download(self, path: str) -> DownloadResult:
        """Render the contact at ``/{addressBookId}/{contactId}`` as a ``.vcf``.

        Returns an empty ``DownloadResult`` when the path has no contact
        id or the contact is not part of the fetched listing page.
        """
        segments = [s for s in (path or "").strip("/").split("/") if s]
        if len(segments) < 2:
            return DownloadResult()
        addressBookId, contactId = segments[0], segments[1]
        # The PIM contact-detail endpoint (``/contact/{id}``) returns 500
        # against a PAT, and ``/contact/{id}/export`` 302s to OAuth. We
        # therefore re-fetch the listing (which IS PAT-friendly) with all
        # vCard-relevant fields, then synthesize the .vcf ourselves.
        contacts = await self._fetchContacts(
            addressBookId, perPage=self._MAX_CONTACT_LIMIT
        )
        # Match with the same id-or-uid rule the listing uses to build
        # paths; otherwise uid-only contacts could be listed but never
        # downloaded.
        record = next((c for c in contacts if self._contactId(c) == contactId), None)
        if record is None:
            logger.warning(
                f"Contacts download: contact {contactId} not found in book "
                f"{addressBookId}"
            )
            return DownloadResult()
        firstName = record.get("firstname") or ""
        lastName = record.get("lastname") or ""
        displayName = (
            record.get("name")
            or " ".join(p for p in (firstName, lastName) if p).strip()
            or contactId
        )
        vcardText = _renderInfomaniakVcard(record)
        return DownloadResult(
            data=vcardText.encode("utf-8"),
            fileName=f"{_safeFileName(displayName, 'contact')}.vcf",
            mimeType="text/vcard",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Not implemented; always returns an error payload."""
        return {"error": "Contacts upload not yet implemented"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Case-insensitive substring search over name, email and organization.

        There is no public search endpoint, so contacts of the address
        book named by the first segment of ``path`` (or of all address
        books when ``path`` has no segment) are listed and filtered
        client-side. An empty query matches every contact.

        ``limit`` caps the number of *matches*: it is applied after
        filtering, and no further address books are queried once it is
        reached.
        """
        pathSegments = [s for s in (path or "").strip("/").split("/") if s]
        if pathSegments:
            # Only the address book id matters; a trailing contact id is
            # ignored.
            bookIds = [pathSegments[0]]
        else:
            bookIds = [
                (book.metadata or {}).get("id") or book.path.strip("/")
                for book in await self._listAddressBooks()
            ]
        if not bookIds:
            return []
        maxResults = None if limit is None else max(0, int(limit))
        if maxResults == 0:
            return []
        needle = (query or "").strip().lower()
        results: List[ExternalEntry] = []
        for bookId in bookIds:
            # Fetch the largest page: using ``limit`` as the page size
            # would hide matches beyond the first ``limit`` contacts.
            for c in await self._listContacts(bookId, limit=self._MAX_CONTACT_LIMIT):
                hay = " ".join(
                    str(v) for v in (
                        c.name,
                        (c.metadata or {}).get("email") or "",
                        (c.metadata or {}).get("organization") or "",
                    )
                ).lower()
                if needle and needle not in hay:
                    continue
                results.append(c)
                if maxResults is not None and len(results) >= maxResults:
                    return results
        return results
class InfomaniakConnector(ProviderConnector):
    """Infomaniak ProviderConnector -- kDrive + Calendar + Contacts today.

    Maps service names to their adapter classes and builds adapters with
    the connector's access token.

    Mail is reserved on the PAT (scope ``workspace:mail``) but not wired
    up here yet -- Infomaniak has no public PAT-friendly Mail endpoint
    today (the PIM Mail routes 302 to OAuth, the legacy ``/api/mail`` route
    301-redirects to an internal Cyrus port). Once a working endpoint is
    found, the corresponding adapter can be slotted into ``_SERVICE_MAP``
    without any token rotation on the user side.
    """

    # Service name -> adapter class; each is constructed with the token.
    _SERVICE_MAP = {
        "kdrive": KdriveAdapter,
        "calendar": CalendarAdapter,
        "contact": ContactAdapter,
    }

    def getAvailableServices(self) -> List[str]:
        """Return the names of all wired-up services."""
        return [serviceName for serviceName in self._SERVICE_MAP]

    def getServiceAdapter(self, service: str) -> ServiceAdapter:
        """Build the adapter for ``service``.

        Raises:
            ValueError: when ``service`` is not a key of ``_SERVICE_MAP``.
        """
        adapterFactory = self._SERVICE_MAP.get(service)
        if adapterFactory:
            return adapterFactory(self.accessToken)
        raise ValueError(
            f"Unknown Infomaniak service: {service}. "
            f"Available: {list(self._SERVICE_MAP.keys())}"
        )