# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""Google ProviderConnector -- Drive and Gmail via Google OAuth."""
|
|
|
|
import asyncio
|
|
import base64
|
|
import logging
|
|
import re
|
|
import urllib.parse
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import aiohttp
|
|
|
|
from modules.connectors.connectorProviderBase import ProviderConnector, ServiceAdapter, DownloadResult
|
|
from modules.shared.httpResilience import ResilientHttp
|
|
from modules.datamodels.datamodelDataSource import ExternalEntry
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_http = ResilientHttp("Google", maxConcurrent=8, defaultTimeoutS=20)
|
|
|
|
_DRIVE_BASE = "https://www.googleapis.com/drive/v3"
|
|
_GMAIL_BASE = "https://gmail.googleapis.com/gmail/v1"
|
|
_CALENDAR_BASE = "https://www.googleapis.com/calendar/v3"
|
|
_PEOPLE_BASE = "https://people.googleapis.com/v1"
|
|
|
|
|
|
def _parseGoogleDateRange(text: Optional[str]) -> tuple:
|
|
"""Parse a date range from a filter/query string for Calendar timeMin/timeMax.
|
|
|
|
Supports two ISO dates, a single ISO date (~31 day window) or a YYYY-MM
|
|
month pattern. Returns RFC3339 UTC strings (timeMin, timeMax) or (None, None).
|
|
"""
|
|
if not text:
|
|
return (None, None)
|
|
|
|
def _toRfc3339(value: str) -> str:
|
|
value = value.strip().rstrip("Z")
|
|
if "T" not in value:
|
|
value = f"{value}T00:00:00"
|
|
return f"{value}Z"
|
|
|
|
isoMatch = re.findall(r'\d{4}-\d{2}-\d{2}(?:T[\d:]+)?', text)
|
|
if len(isoMatch) >= 2:
|
|
return (_toRfc3339(isoMatch[0]), _toRfc3339(isoMatch[1]))
|
|
if len(isoMatch) == 1:
|
|
try:
|
|
dt = datetime.fromisoformat(isoMatch[0])
|
|
return (_toRfc3339(isoMatch[0]), _toRfc3339((dt + timedelta(days=31)).strftime('%Y-%m-%dT00:00:00')))
|
|
except ValueError:
|
|
pass
|
|
monthMatch = re.match(r'^(\d{4})-(\d{2})$', text.strip())
|
|
if monthMatch:
|
|
year, month = int(monthMatch.group(1)), int(monthMatch.group(2))
|
|
start = f"{year}-{month:02d}-01T00:00:00"
|
|
end = f"{year + 1}-01-01T00:00:00" if month == 12 else f"{year}-{month + 1:02d}-01T00:00:00"
|
|
return (_toRfc3339(start), _toRfc3339(end))
|
|
return (None, None)
|
|
|
|
|
|
async def googleGet(token: str, url: str) -> Dict[str, Any]:
    """GET ``url`` with a Bearer token through the shared resilient HTTP client.

    Returns whatever ``_http.getJson`` yields; callers in this module treat a
    top-level ``"error"`` key in that result as a failed request.
    """
    return await _http.getJson(url, headers={"Authorization": f"Bearer {token}"})
|
|
|
|
|
|
def _raiseGoogleError(result: Dict[str, Any], ctx: str) -> None:
    """Log and raise a RuntimeError describing a failed Google API response.

    Browse/search must not turn an API failure into an empty result list,
    because that would present a real error as "nothing found". Callers are
    expected to wrap these calls in try/except.

    Args:
        result: The response payload; its ``"error"`` entry is reported when
            present and truthy, otherwise the whole payload is reported.
        ctx: Short description of the operation that failed.

    Raises:
        RuntimeError: Always.
    """
    detail = None
    if isinstance(result, dict):
        detail = result.get("error")
    if not detail:
        # No usable error entry -- report the raw payload instead.
        detail = result
    logger.warning("Google error (%s): %s", ctx, detail)
    raise RuntimeError(f"Google error ({ctx}): {detail}")
|
|
|
|
|
|
class DriveAdapter(ServiceAdapter):
    """Google Drive ServiceAdapter -- browse, search and download files.

    Paths have the form ``"/<fileId>"``; an empty path or ``"/"`` refers to
    the Drive root when browsing.
    """

    _FOLDER_MIME = "application/vnd.google-apps.folder"

    # Google-native documents are fetched through the export endpoint in
    # these target formats when the direct media download yields nothing.
    _EXPORT_MIME_MAP = {
        "application/vnd.google-apps.document": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.google-apps.spreadsheet": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.google-apps.presentation": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "application/vnd.google-apps.drawing": "application/pdf",
    }

    def __init__(self, accessToken: str):
        self._token = accessToken

    @staticmethod
    def _escapeQueryLiteral(value: str) -> str:
        """Escape backslashes and single quotes for a quoted Drive ``q`` literal."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def _toEntry(self, f: Dict[str, Any], hideFolderMime: bool) -> ExternalEntry:
        """Convert one Drive ``files`` item into an ExternalEntry.

        ``hideFolderMime`` reproduces the browse behaviour of reporting no
        mimeType for folders; search reports the mimeType for every item.
        """
        isFolder = f.get("mimeType") == self._FOLDER_MIME
        return ExternalEntry(
            name=f.get("name", ""),
            path=f"/{f.get('id', '')}",
            isFolder=isFolder,
            size=int(f.get("size", 0)) if f.get("size") else None,
            mimeType=None if (hideFolderMime and isFolder) else f.get("mimeType"),
            metadata={"id": f.get("id"), "modifiedTime": f.get("modifiedTime")},
        )

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List the non-trashed children of a folder, folders first, then by name.

        Args:
            path: ``"/<folderId>"``; empty or ``"/"`` means ``root``.
            filter: Accepted for interface compatibility; not used here.
            limit: Maximum number of entries (default 100, clamped to 1..1000).

        Raises:
            RuntimeError: When the API response contains an ``error`` entry.
        """
        folderId = (path or "").strip("/") or "root"
        pageSize = max(1, min(int(limit or 100), 1000))
        params = {
            # The id is escaped so a quote in the path cannot alter the query.
            "q": f"'{self._escapeQueryLiteral(folderId)}' in parents and trashed=false",
            "fields": "files(id,name,mimeType,size,modifiedTime,parents)",
            "pageSize": str(pageSize),
            "orderBy": "folder,name",
        }
        # URL-encode the query string (the q expression contains spaces/quotes).
        url = f"{_DRIVE_BASE}/files?{urllib.parse.urlencode(params)}"

        result = await googleGet(self._token, url)
        if "error" in result:
            _raiseGoogleError(result, "Google Drive browse")

        return [self._toEntry(f, hideFolderMime=True) for f in result.get("files", [])]

    async def download(self, path: str) -> bytes:
        """Download a file's content, exporting Google-native documents.

        First tries a direct ``alt=media`` download. When that yields None the
        file's mimeType is looked up and, if listed in ``_EXPORT_MIME_MAP``,
        the file is exported in the mapped format.

        Returns:
            The file bytes, or ``b""`` when the path is empty, the type is
            unsupported, or any step fails (failures are logged, not raised).
        """
        fileId = (path or "").strip("/")
        if not fileId:
            return b""
        headers = {"Authorization": f"Bearer {self._token}"}
        dlTimeout = aiohttp.ClientTimeout(total=60)
        # Quote the id so unexpected characters cannot change the request path.
        fileUrl = f"{_DRIVE_BASE}/files/{urllib.parse.quote(fileId, safe='')}"
        try:
            data = await _http.getBytes(f"{fileUrl}?alt=media", headers=headers, timeout=dlTimeout)
            if data is not None:
                return data
            logger.debug(f"Google Drive direct download returned None for {fileId}")

            meta = await _http.getJson(f"{fileUrl}?fields=mimeType,name", headers=headers)
            if "error" in meta:
                logger.warning(f"Google Drive metadata fetch failed for {fileId}: {meta['error']}")
                return b""
            fileMime = meta.get("mimeType", "")
            fileName = meta.get("name", fileId)

            exportMime = self._EXPORT_MIME_MAP.get(fileMime)
            if not exportMime:
                logger.warning(f"Google Drive: unsupported mimeType '{fileMime}' for file '{fileName}' ({fileId})")
                return b""

            exportUrl = f"{fileUrl}/export?{urllib.parse.urlencode({'mimeType': exportMime})}"
            logger.info(f"Google Drive: exporting '{fileName}' as {exportMime}")
            exported = await _http.getBytes(exportUrl, headers=headers, timeout=dlTimeout)
            if exported is not None:
                return exported
            logger.warning(f"Google Drive export failed for '{fileName}'")
        except Exception as e:
            # Deliberate best-effort boundary: a failed download yields b"".
            logger.error(f"Google Drive download failed for {fileId}: {e}")
        return b""

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Uploading is not implemented; always returns an error dict."""
        return {"error": "Google Drive upload not yet implemented"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Full-text search for non-trashed files, optionally within one folder.

        Args:
            query: Search text, matched with ``fullText contains``.
            path: Optional ``"/<folderId>"`` restricting results to its
                direct children.
            limit: Maximum number of results (minimum 1); when None up to
                1000 results are collected.

        Raises:
            RuntimeError: When the first page fails. A failure on a later
                page ends paging and returns what was collected so far.
        """
        safeQuery = self._escapeQueryLiteral(query)
        folderId = (path or "").strip("/")
        # `fullText contains` matches file name AND content (and some metadata),
        # which is what users expect from a search -- not just the file name.
        qParts = [f"fullText contains '{safeQuery}'", "trashed=false"]
        if folderId:
            qParts.append(f"'{self._escapeQueryLiteral(folderId)}' in parents")
        qStr = " and ".join(qParts)
        effectiveLimit = max(1, int(limit)) if limit is not None else None
        pageSize = min(effectiveLimit or 100, 1000)
        hardCap = effectiveLimit or 1000
        logger.debug(f"Google Drive search: q={qStr}")

        entries: List[ExternalEntry] = []
        pageToken: Optional[str] = None
        while len(entries) < hardCap:
            params = {
                "q": qStr,
                "fields": "nextPageToken,files(id,name,mimeType,size,modifiedTime)",
                "pageSize": str(pageSize),
            }
            if pageToken:
                params["pageToken"] = pageToken
            url = f"{_DRIVE_BASE}/files?{urllib.parse.urlencode(params)}"
            result = await googleGet(self._token, url)
            if "error" in result:
                if not entries:
                    _raiseGoogleError(result, "Google Drive search")
                break
            for f in result.get("files", []):
                entries.append(self._toEntry(f, hideFolderMime=False))
                if len(entries) >= hardCap:
                    break
            pageToken = result.get("nextPageToken")
            if not pageToken:
                break
        if effectiveLimit is not None:
            entries = entries[:effectiveLimit]
        return entries
|
|
|
|
|
|
class GmailAdapter(ServiceAdapter):
    """Gmail ServiceAdapter -- browse labels and messages.

    Paths: ``""`` / ``"/"`` lists labels, ``"/<label>"`` lists messages of a
    label (id or display name), ``"/<label>/<messageId>"`` addresses a message.
    """

    _DEFAULT_MESSAGE_LIMIT = 100
    _MAX_MESSAGE_LIMIT = 1000
    # Upper bound on per-message metadata requests issued for one listing.
    _METADATA_FETCH_CAP = 200
    # System labels shown at the mailbox root; other system labels are hidden.
    _VISIBLE_SYSTEM_LABELS = frozenset(
        {"INBOX", "SENT", "DRAFT", "TRASH", "SPAM", "STARRED", "IMPORTANT"}
    )

    def __init__(self, accessToken: str):
        self._token = accessToken

    def _clampLimit(self, limit: Optional[int]) -> int:
        """Return the default limit for None, else ``limit`` clamped to 1..max."""
        if limit is None:
            return self._DEFAULT_MESSAGE_LIMIT
        return max(1, min(int(limit), self._MAX_MESSAGE_LIMIT))

    def _countEntry(
        self, msgIds: List[str], totalEstimate: Optional[int], scope: str, noun: str,
    ) -> Optional[ExternalEntry]:
        """Build the informational ``_count`` pseudo-entry for a truncated listing.

        Returned when the estimated total exceeds the listed ids, or when more
        ids were listed than metadata was fetched for; otherwise None.
        """
        listed = len(msgIds)
        if totalEstimate and totalEstimate > listed:
            return ExternalEntry(
                name=f"(~{totalEstimate} total {noun} estimated, {listed} listed)",
                path=f"/{scope}/_count", isFolder=False,
                metadata={"totalEstimate": totalEstimate, "listed": listed},
            )
        if listed > self._METADATA_FETCH_CAP:
            return ExternalEntry(
                name=f"({listed} {noun} listed, metadata shown for first {self._METADATA_FETCH_CAP})",
                path=f"/{scope}/_count", isFolder=False,
                metadata={"listed": listed, "metadataShown": self._METADATA_FETCH_CAP},
            )
        return None

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> list:
        """List labels (root path) or the messages carrying a label.

        Args:
            path: Empty / ``"/"`` for labels, otherwise a label id or name.
            filter: Accepted for interface compatibility; not used here.
            limit: Maximum message ids to list (default 100, max 1000).

        Raises:
            RuntimeError: When a Gmail API call reports an error.
            ValueError: When the label reference cannot be resolved.
        """
        cleanPath = (path or "").strip("/")

        if not cleanPath:
            result = await googleGet(self._token, f"{_GMAIL_BASE}/users/me/labels")
            if "error" in result:
                _raiseGoogleError(result, "Gmail labels")
            labels = []
            for lbl in result.get("labels", []):
                labelId = lbl.get("id", "")
                if lbl.get("type") == "system" and labelId not in self._VISIBLE_SYSTEM_LABELS:
                    continue
                labels.append(ExternalEntry(
                    name=lbl.get("name", labelId),
                    path=f"/{labelId}",
                    isFolder=True,
                    metadata={"id": labelId, "type": lbl.get("type", "")},
                ))
            # System labels first, then alphabetical by name.
            labels.sort(key=lambda e: (0 if e.metadata.get("type") == "system" else 1, e.name))
            return labels

        effectiveLimit = self._clampLimit(limit)
        labelId = await self._resolveLabelId(cleanPath)
        if not labelId:
            raise ValueError(
                f"Gmail label not found: '{cleanPath}'. Browse the mailbox root ('/') "
                f"to list available labels."
            )
        msgIds, totalEstimate = await self._listMessageIds(
            params={"labelIds": labelId}, limit=effectiveLimit,
        )
        entries = await self._fetchMessageEntries(
            msgIds[:self._METADATA_FETCH_CAP], labelPath=labelId,
        )
        countEntry = self._countEntry(msgIds, totalEstimate, scope=labelId, noun="messages")
        if countEntry is not None:
            entries.append(countEntry)
        return entries

    async def _resolveLabelId(self, ref: str) -> Optional[str]:
        """Resolve a Gmail label reference (display name / system name / id) to a
        label id. Returns None if nothing matches so the caller can raise a clear
        error instead of querying with an invalid label."""
        if not ref:
            return None
        r = ref.strip()
        result = await googleGet(self._token, f"{_GMAIL_BASE}/users/me/labels")
        if "error" in result:
            _raiseGoogleError(result, "Gmail labels")
        labels = result.get("labels", [])
        # 1) exact id match (already-resolved id passes through)
        if any(lbl.get("id") == r for lbl in labels):
            return r
        # 2) case-insensitive display-name match
        wanted = r.lower()
        for lbl in labels:
            if (lbl.get("name") or "").strip().lower() == wanted:
                return lbl.get("id")
        # 3) system label by uppercased name (INBOX, SENT, ...)
        up = r.upper()
        if any(lbl.get("id") == up for lbl in labels):
            return up
        return None

    async def _listMessageIds(
        self, params: Dict[str, str], limit: int,
    ) -> tuple:
        """Page through ``messages.list`` and return (msgIds, totalEstimate).

        Gmail's ``maxResults`` caps at 500 per page, so we follow
        ``nextPageToken`` until we have ``limit`` ids or there are no more pages.
        ``resultSizeEstimate`` from the first page gives the agent an approximate
        total count without having to download every message.

        Raises:
            RuntimeError: When the first page fails. A failure on a later
                page ends paging and returns the ids collected so far.
        """
        msgIds: List[str] = []
        totalEstimate: Optional[int] = None
        pageToken: Optional[str] = None
        pageSize = min(limit, 500)
        while len(msgIds) < limit:
            p = {**params, "maxResults": str(pageSize)}
            if pageToken:
                p["pageToken"] = pageToken
            url = f"{_GMAIL_BASE}/users/me/messages?{urllib.parse.urlencode(p)}"
            result = await googleGet(self._token, url)
            if "error" in result:
                if not msgIds:
                    _raiseGoogleError(result, "Gmail list messages")
                break
            if totalEstimate is None:
                totalEstimate = result.get("resultSizeEstimate")
            for m in result.get("messages", []):
                mid = m.get("id", "")
                if mid:
                    msgIds.append(mid)
                if len(msgIds) >= limit:
                    break
            pageToken = result.get("nextPageToken")
            if not pageToken:
                break
        return msgIds, totalEstimate

    async def _fetchMessageEntries(self, msgIds: List[str], labelPath: str = "") -> List[ExternalEntry]:
        """Resolve a list of Gmail message ids into ExternalEntries with
        Subject/From/Date metadata. Detail fetches run concurrently to avoid a
        slow sequential N+1 round-trip per message. A message whose detail
        request fails is still returned, as a placeholder entry."""
        if not msgIds:
            return []
        pathPrefix = f"/{labelPath}" if labelPath else ""

        async def _one(msgId: str) -> ExternalEntry:
            detailUrl = (
                f"{_GMAIL_BASE}/users/me/messages/{msgId}"
                f"?format=metadata&metadataHeaders=Subject&metadataHeaders=From&metadataHeaders=Date"
            )
            detail = await googleGet(self._token, detailUrl)
            if "error" in detail:
                return ExternalEntry(name=f"Message {msgId}", path=f"{pathPrefix}/{msgId}", isFolder=False,
                                     metadata={"id": msgId})
            headers = {h.get("name", ""): h.get("value", "") for h in detail.get("payload", {}).get("headers", [])}
            return ExternalEntry(
                name=headers.get("Subject", "(no subject)"),
                path=f"{pathPrefix}/{msgId}",
                isFolder=False,
                metadata={
                    "id": msgId,
                    "from": headers.get("From", ""),
                    "date": headers.get("Date", ""),
                    "snippet": detail.get("snippet", ""),
                },
            )

        return list(await asyncio.gather(*[_one(mid) for mid in msgIds]))

    async def download(self, path: str) -> DownloadResult:
        """Download a Gmail message as RFC 822 EML via format=raw.

        The message id is the last path segment. The file name is derived
        from the Subject header (fetched separately), falling back to the id.

        Returns:
            A populated DownloadResult, or an empty one when the path has no
            id, the API reports an error, or the payload cannot be decoded.
        """
        cleanPath = (path or "").strip("/")
        msgId = cleanPath.split("/")[-1] if cleanPath else ""
        if not msgId:
            return DownloadResult()

        # Quote the caller-supplied id so it cannot alter the request path.
        msgUrl = f"{_GMAIL_BASE}/users/me/messages/{urllib.parse.quote(msgId, safe='')}"
        result = await googleGet(self._token, f"{msgUrl}?format=raw")
        if "error" in result:
            return DownloadResult()

        rawB64 = result.get("raw", "")
        if not rawB64:
            return DownloadResult()

        try:
            # base64url payloads may arrive without '=' padding; restore it so
            # the decoder does not reject an otherwise valid message.
            emlBytes = base64.urlsafe_b64decode(rawB64 + "=" * (-len(rawB64) % 4))
        except ValueError as e:  # binascii.Error is a ValueError subclass
            logger.warning(f"Gmail raw payload could not be decoded for {msgId}: {e}")
            return DownloadResult()

        meta = await googleGet(self._token, f"{msgUrl}?format=metadata&metadataHeaders=Subject")
        subject = msgId
        if "error" not in meta:
            for h in meta.get("payload", {}).get("headers", []):
                if h.get("name", "").lower() == "subject":
                    subject = h.get("value", msgId)
                    break
        safeName = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", subject)[:80].strip(". ") or "email"

        return DownloadResult(
            data=emlBytes,
            fileName=f"{safeName}.eml",
            mimeType="message/rfc822",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Uploading is not applicable to Gmail; always returns an error dict."""
        return {"error": "Gmail upload not applicable"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> list:
        """Search messages with a Gmail query, optionally scoped to a label.

        Args:
            query: Passed to the API as the ``q`` parameter.
            path: Optional label id or name restricting the search.
            limit: Maximum message ids to list (default 100, max 1000).

        Raises:
            RuntimeError: When a Gmail API call reports an error.
            ValueError: When the label reference cannot be resolved.
        """
        effectiveLimit = self._clampLimit(limit)
        params: Dict[str, str] = {"q": query}
        labelPath = (path or "").strip("/")
        if labelPath:
            labelId = await self._resolveLabelId(labelPath)
            if not labelId:
                raise ValueError(
                    f"Gmail label not found: '{labelPath}'. Browse the mailbox root ('/') "
                    f"to list available labels, or search without a label scope."
                )
            labelPath = labelId
            params["labelIds"] = labelId
        msgIds, totalEstimate = await self._listMessageIds(params, limit=effectiveLimit)
        entries = await self._fetchMessageEntries(
            msgIds[:self._METADATA_FETCH_CAP], labelPath=labelPath,
        )
        countEntry = self._countEntry(
            msgIds, totalEstimate, scope=labelPath or "search", noun="results",
        )
        if countEntry is not None:
            entries.append(countEntry)
        return entries
|
|
|
|
|
|
class CalendarAdapter(ServiceAdapter):
    """Google Calendar ServiceAdapter -- browse calendars, list events, .ics download.

    Path conventions:
        ``""`` / ``"/"``             -> list calendars from ``calendarList``
        ``"/<calendarId>"``          -> list upcoming events in that calendar
        ``"/<calendarId>/<eventId>"`` -> reserved for future event detail browse
    """

    _DEFAULT_EVENT_LIMIT = 100
    _MAX_EVENT_LIMIT = 2500

    def __init__(self, accessToken: str):
        self._token = accessToken

    def _clampLimit(self, limit: Optional[int]) -> int:
        """Return the default limit for None, else ``limit`` clamped to 1..max."""
        if limit is None:
            return self._DEFAULT_EVENT_LIMIT
        return max(1, min(int(limit), self._MAX_EVENT_LIMIT))

    @staticmethod
    def _eventsUrl(calendarId: str, params: Dict[str, str]) -> str:
        """Build the ``events`` list URL for a calendar with percent-encoded parts."""
        encoded = urllib.parse.urlencode(params, quote_via=urllib.parse.quote, safe="")
        return f"{_CALENDAR_BASE}/calendars/{urllib.parse.quote(calendarId, safe='')}/events?{encoded}"

    @staticmethod
    def _eventEntry(calendarId: str, ev: Dict[str, Any]) -> ExternalEntry:
        """Convert one event item into an ExternalEntry (shared by browse/search)."""
        start = ev.get("start") or {}
        end = ev.get("end") or {}
        return ExternalEntry(
            name=ev.get("summary", "(no title)"),
            path=f"/{calendarId}/{ev.get('id', '')}",
            isFolder=False,
            mimeType="text/calendar",
            metadata={
                "id": ev.get("id"),
                # Timed events carry dateTime; all-day events carry date.
                "start": start.get("dateTime") or start.get("date"),
                "end": end.get("dateTime") or end.get("date"),
                "location": ev.get("location"),
                "organizer": (ev.get("organizer") or {}).get("email"),
                "htmlLink": ev.get("htmlLink"),
                "status": ev.get("status"),
            },
        )

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List calendars (root path) or the events of one calendar.

        Args:
            path: Empty / ``"/"`` for calendars, else ``"/<calendarId>"``.
            filter: At the root, a case-insensitive substring matched against
                the calendar's displayed name and its summary. For a
                calendar, a date range understood by
                ``_parseGoogleDateRange``; other filter text is ignored.
            limit: Maximum events (default 100, clamped to 1..2500).

        Raises:
            RuntimeError: When the API response contains an ``error`` entry.
        """
        cleanPath = (path or "").strip("/")
        if not cleanPath:
            url = f"{_CALENDAR_BASE}/users/me/calendarList?maxResults=250"
            result = await googleGet(self._token, url)
            if "error" in result:
                _raiseGoogleError(result, "Google Calendar list")
            calendars = result.get("items", [])
            if filter:
                needle = filter.lower()
                # Match the name the user actually sees (summaryOverride) as
                # well as the calendar's own summary.
                calendars = [
                    c for c in calendars
                    if needle in (c.get("summaryOverride") or "").lower()
                    or needle in (c.get("summary") or "").lower()
                ]
            return [
                ExternalEntry(
                    name=c.get("summaryOverride") or c.get("summary", ""),
                    path=f"/{c.get('id', '')}",
                    isFolder=True,
                    metadata={
                        "id": c.get("id"),
                        "primary": c.get("primary", False),
                        "accessRole": c.get("accessRole"),
                        "backgroundColor": c.get("backgroundColor"),
                        "timeZone": c.get("timeZone"),
                    },
                )
                for c in calendars
            ]

        calendarId = cleanPath.split("/", 1)[0]
        params: Dict[str, str] = {
            "maxResults": str(self._clampLimit(limit)),
            "orderBy": "startTime",
            "singleEvents": "true",
        }
        # Restrict to a date window when the filter is a date range, so large
        # multi-year calendars only return the relevant period.
        timeMin, timeMax = _parseGoogleDateRange(filter)
        if timeMin and timeMax:
            params["timeMin"] = timeMin
            params["timeMax"] = timeMax
        result = await googleGet(self._token, self._eventsUrl(calendarId, params))
        if "error" in result:
            _raiseGoogleError(result, "Google Calendar events")
        return [self._eventEntry(calendarId, ev) for ev in result.get("items", [])]

    async def download(self, path: str) -> DownloadResult:
        """Fetch one event (``"/<calendarId>/<eventId>"``) and return it as .ics.

        Returns an empty DownloadResult when the path lacks an event id or the
        API reports an error (the error is logged).
        """
        cleanPath = (path or "").strip("/")
        if "/" not in cleanPath:
            return DownloadResult()
        calendarId, eventId = cleanPath.split("/", 1)
        url = (
            f"{_CALENDAR_BASE}/calendars/{urllib.parse.quote(calendarId, safe='')}"
            f"/events/{urllib.parse.quote(eventId, safe='')}"
        )
        ev = await googleGet(self._token, url)
        if "error" in ev:
            logger.warning(f"Google Calendar event fetch failed: {ev['error']}")
            return DownloadResult()
        summary = ev.get("summary") or eventId
        safeName = _googleSafeFileName(summary) or "event"
        return DownloadResult(
            data=_googleEventToIcs(ev),
            fileName=f"{safeName}.ics",
            mimeType="text/calendar",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Uploading is not supported; always returns an error dict."""
        return {"error": "Google Calendar upload not supported"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Search events in a calendar by date window or free text.

        Args:
            query: A date range understood by ``_parseGoogleDateRange``, or
                free text passed to the API as ``q``.
            path: Optional ``"/<calendarId>"``; defaults to ``primary``.
            limit: Maximum events (default 100, clamped to 1..2500).

        Raises:
            RuntimeError: When the API response contains an ``error`` entry.
        """
        calendarId = (path or "").strip("/").split("/", 1)[0] or "primary"
        maxResults = str(self._clampLimit(limit))
        # A date-range query maps to timeMin/timeMax (efficient window fetch);
        # otherwise fall back to the free-text q parameter.
        timeMin, timeMax = _parseGoogleDateRange(query)
        if timeMin and timeMax:
            params = {
                "timeMin": timeMin,
                "timeMax": timeMax,
                "maxResults": maxResults,
                "orderBy": "startTime",
                "singleEvents": "true",
            }
        else:
            params = {"q": query, "maxResults": maxResults, "singleEvents": "true"}
        result = await googleGet(self._token, self._eventsUrl(calendarId, params))
        if "error" in result:
            _raiseGoogleError(result, "Google Calendar search")
        return [self._eventEntry(calendarId, ev) for ev in result.get("items", [])]
|
|
|
|
|
|
class ContactsAdapter(ServiceAdapter):
    """Google Contacts ServiceAdapter -- People API (read-only).

    Path conventions:
        ``""`` / ``"/"``           -> list contact groups (incl. virtual ``all`` for the user's connections)
        ``"/all"``                 -> list all ``people/me/connections``
        ``"/<groupResourceName>"`` -> list members of that contact group (e.g. ``contactGroups/myFriends``)
        ``"/<group>/<personId>"``  -> reserved for future detail browse;
                                      ``personId`` is the suffix after ``people/``
    """

    _DEFAULT_CONTACT_LIMIT = 200
    _MAX_CONTACT_LIMIT = 1000
    _PERSON_FIELDS = (
        "names,emailAddresses,phoneNumbers,organizations,addresses,biographies,memberships"
    )
    _GROUP_PREFIX = "contactGroups"

    def __init__(self, accessToken: str):
        self._token = accessToken

    def _clampLimit(self, limit: Optional[int]) -> int:
        """Return the default limit for None, else ``limit`` clamped to 1..max."""
        if limit is None:
            return self._DEFAULT_CONTACT_LIMIT
        return max(1, min(int(limit), self._MAX_CONTACT_LIMIT))

    @staticmethod
    def _personEntry(parent: str, p: Dict[str, Any]) -> ExternalEntry:
        """Convert a person payload into an ExternalEntry located under ``parent``."""
        orgs = p.get("organizations") or []
        return ExternalEntry(
            name=_googlePersonLabel(p) or "(no name)",
            # Only the id after "people/" goes into the path.
            path=f"/{parent}/{(p.get('resourceName', '') or '').split('/')[-1]}",
            isFolder=False,
            mimeType="text/vcard",
            metadata={
                "id": p.get("resourceName"),
                "emails": [e.get("value") for e in (p.get("emailAddresses") or []) if e.get("value")],
                "phones": [pn.get("value") for pn in (p.get("phoneNumbers") or []) if pn.get("value")],
                "organization": orgs[0].get("name") if orgs else None,
            },
        )

    async def browse(
        self,
        path: str,
        filter: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """List contact groups (root), all connections, or a group's members.

        Args:
            path: See the class docstring for the path conventions. A group
                may be given as ``contactGroups/<id>`` or as the bare ``<id>``.
            filter: Accepted for interface compatibility; not used here.
            limit: Maximum contacts (default 200, clamped to 1..1000).

        Raises:
            RuntimeError: When listing connections or fetching the group
                fails. A failed group listing at the root and failed member
                batches are only logged.
            ValueError: When the path is ``contactGroups`` without a group id.
        """
        cleanPath = (path or "").strip("/")
        if not cleanPath:
            entries: List[ExternalEntry] = [
                ExternalEntry(
                    name="Alle Kontakte",
                    path="/all",
                    isFolder=True,
                    metadata={"id": "all", "isVirtual": True},
                ),
            ]
            result = await googleGet(self._token, f"{_PEOPLE_BASE}/contactGroups?pageSize=200")
            if "error" in result:
                # Best effort: the virtual "all" folder is still usable.
                logger.warning(f"Google contactGroups list failed: {result['error']}")
                return entries
            for grp in result.get("contactGroups", []):
                name = grp.get("formattedName") or grp.get("name") or ""
                if not name:
                    continue
                entries.append(
                    ExternalEntry(
                        name=name,
                        path=f"/{grp.get('resourceName', '')}",
                        isFolder=True,
                        metadata={
                            "id": grp.get("resourceName"),
                            "memberCount": grp.get("memberCount", 0),
                            "groupType": grp.get("groupType"),
                        },
                    )
                )
            return entries

        effectiveLimit = self._clampLimit(limit)
        segments = cleanPath.split("/")
        if segments[0] == "all":
            groupRef = "all"
            url = (
                f"{_PEOPLE_BASE}/people/me/connections"
                f"?pageSize={min(effectiveLimit, 1000)}&personFields={self._PERSON_FIELDS}"
            )
            result = await googleGet(self._token, url)
            if "error" in result:
                _raiseGoogleError(result, "Google People connections")
            people = result.get("connections", [])
        else:
            # Group folders are published as "/contactGroups/<id>", so the
            # resource name spans TWO path segments. Using only the first one
            # would address the group collection instead of the group.
            if segments[0] == self._GROUP_PREFIX:
                if len(segments) < 2 or not segments[1]:
                    raise ValueError(
                        f"Google contact group id missing in path: '{cleanPath}'. "
                        f"Browse the contacts root ('/') to list available groups."
                    )
                groupRef = f"{self._GROUP_PREFIX}/{segments[1]}"
                groupResource = groupRef
            else:
                groupRef = segments[0]
                groupResource = f"{self._GROUP_PREFIX}/{groupRef}"
            grpUrl = (
                f"{_PEOPLE_BASE}/{urllib.parse.quote(groupResource, safe='/')}"
                f"?maxMembers={min(effectiveLimit, 1000)}"
            )
            grpResult = await googleGet(self._token, grpUrl)
            if "error" in grpResult:
                _raiseGoogleError(grpResult, "Google contactGroup detail")
            memberResourceNames = grpResult.get("memberResourceNames") or []
            if not memberResourceNames:
                return []
            chunkSize = 200
            people: List[Dict[str, Any]] = []
            for i in range(0, min(len(memberResourceNames), effectiveLimit), chunkSize):
                chunk = memberResourceNames[i : i + chunkSize]
                params = "&".join(
                    f"resourceNames={urllib.parse.quote(rn, safe='/')}" for rn in chunk
                )
                batchUrl = f"{_PEOPLE_BASE}/people:batchGet?{params}&personFields={self._PERSON_FIELDS}"
                batchResult = await googleGet(self._token, batchUrl)
                if "error" in batchResult:
                    logger.warning(f"Google People batchGet failed: {batchResult['error']}")
                    continue
                people.extend(
                    resp["person"] for resp in batchResult.get("responses", []) if resp.get("person")
                )
                if len(people) >= effectiveLimit:
                    break

        return [self._personEntry(groupRef, p) for p in people[:effectiveLimit]]

    async def download(self, path: str) -> DownloadResult:
        """Fetch one contact and return it as a vCard (.vcf).

        The person id is the last path segment. Returns an empty
        DownloadResult when the path has no person segment or the API reports
        an error (the error is logged).
        """
        cleanPath = (path or "").strip("/")
        if "/" not in cleanPath:
            return DownloadResult()
        personSuffix = cleanPath.split("/")[-1]
        if not personSuffix:
            return DownloadResult()
        url = (
            f"{_PEOPLE_BASE}/people/{urllib.parse.quote(personSuffix, safe='')}"
            f"?personFields={self._PERSON_FIELDS}"
        )
        person = await googleGet(self._token, url)
        if "error" in person:
            logger.warning(f"Google People fetch failed: {person['error']}")
            return DownloadResult()
        label = _googlePersonLabel(person) or personSuffix
        safeName = _googleSafeFileName(label) or "contact"
        return DownloadResult(
            data=_googlePersonToVcard(person),
            fileName=f"{safeName}.vcf",
            mimeType="text/vcard",
        )

    async def upload(self, path: str, data: bytes, fileName: str) -> dict:
        """Uploading is not supported; always returns an error dict."""
        return {"error": "Google Contacts upload not supported"}

    async def search(
        self,
        query: str,
        path: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[ExternalEntry]:
        """Search the user's contacts via ``people:searchContacts``.

        Args:
            query: Search text.
            path: Accepted for interface compatibility; not used here.
            limit: Requested maximum; the request page size is capped at 30.

        Raises:
            RuntimeError: When the API response contains an ``error`` entry.
        """
        effectiveLimit = self._clampLimit(limit)
        url = (
            f"{_PEOPLE_BASE}/people:searchContacts"
            f"?query={urllib.parse.quote(query, safe='')}&pageSize={min(effectiveLimit, 30)}"
            f"&readMask={self._PERSON_FIELDS}"
        )
        result = await googleGet(self._token, url)
        if "error" in result:
            _raiseGoogleError(result, "Google Contacts search")
        return [
            self._personEntry("search", r.get("person") or {})
            for r in result.get("results", [])
        ]
|
|
|
|
|
|
def _googleSafeFileName(name: str) -> str:
|
|
return re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name or "")[:80].strip(". ")
|
|
|
|
|
|
def _googleIcsEscape(value: str) -> str:
|
|
if value is None:
|
|
return ""
|
|
return (
|
|
value.replace("\\", "\\\\")
|
|
.replace(";", "\\;")
|
|
.replace(",", "\\,")
|
|
.replace("\r\n", "\\n")
|
|
.replace("\n", "\\n")
|
|
)
|
|
|
|
|
|
def _googleIcsDateTime(value: Optional[str]) -> Optional[str]:
|
|
"""Convert a Google Calendar dateTime/date string to RFC 5545 format (UTC)."""
|
|
if not value:
|
|
return None
|
|
try:
|
|
if "T" not in value:
|
|
dt = datetime.strptime(value, "%Y-%m-%d")
|
|
return dt.strftime("%Y%m%d")
|
|
normalized = value.replace("Z", "+00:00") if value.endswith("Z") else value
|
|
dt = datetime.fromisoformat(normalized)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _googleEventToIcs(event: Dict[str, Any]) -> bytes:
    """Build a minimal RFC 5545 VCALENDAR/VEVENT for a Google Calendar event.

    Emits UID and DTSTAMP always, and DTSTART, DTEND, SUMMARY, LOCATION,
    DESCRIPTION, ORGANIZER and ATTENDEE lines when the event provides them.
    Lines are joined with CRLF and encoded as UTF-8.
    """
    startInfo = event.get("start") or {}
    endInfo = event.get("end") or {}
    # An event with only a "date" (no "dateTime") start is an all-day event.
    isAllDay = bool(startInfo.get("date") and not startInfo.get("dateTime"))
    dateParam = ";VALUE=DATE" if isAllDay else ""

    uid = event.get("iCalUID") or event.get("id") or "unknown@poweron"
    dtstamp = _googleIcsDateTime(event.get("updated"))
    if not dtstamp:
        # No usable "updated" timestamp: stamp with the current UTC time.
        dtstamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

    lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//PowerOn//Google-Calendar-Adapter//EN",
        "CALSCALE:GREGORIAN",
        "BEGIN:VEVENT",
        f"UID:{uid}",
        f"DTSTAMP:{dtstamp}",
    ]

    boundaries = (
        ("DTSTART", startInfo.get("dateTime") or startInfo.get("date")),
        ("DTEND", endInfo.get("dateTime") or endInfo.get("date")),
    )
    for prop, raw in boundaries:
        stamp = _googleIcsDateTime(raw)
        if stamp:
            lines.append(f"{prop}{dateParam}:{stamp}")

    for prop, key in (("SUMMARY", "summary"), ("LOCATION", "location"), ("DESCRIPTION", "description")):
        text = _googleIcsEscape(event.get(key) or "")
        if text:
            lines.append(f"{prop}:{text}")

    organizer = (event.get("organizer") or {}).get("email")
    if organizer:
        lines.append(f"ORGANIZER:mailto:{organizer}")

    attendeeMails = (att.get("email") for att in (event.get("attendees") or []))
    lines.extend(f"ATTENDEE:mailto:{mail}" for mail in attendeeMails if mail)

    lines += ["END:VEVENT", "END:VCALENDAR"]
    return ("\r\n".join(lines) + "\r\n").encode("utf-8")
|
|
|
|
|
|
def _googlePersonLabel(person: Dict[str, Any]) -> str:
|
|
names = person.get("names") or []
|
|
if names:
|
|
primary = names[0]
|
|
display = primary.get("displayName") or ""
|
|
if display:
|
|
return display
|
|
given = primary.get("givenName") or ""
|
|
family = primary.get("familyName") or ""
|
|
full = f"{given} {family}".strip()
|
|
if full:
|
|
return full
|
|
orgs = person.get("organizations") or []
|
|
if orgs and orgs[0].get("name"):
|
|
return orgs[0]["name"]
|
|
emails = person.get("emailAddresses") or []
|
|
if emails and emails[0].get("value"):
|
|
return emails[0]["value"]
|
|
return ""
|
|
|
|
|
|
def _googlePersonToVcard(person: Dict[str, Any]) -> bytes:
    """Build a vCard 3.0 from a Google People API person payload.

    Emits N and FN always, and ORG, TITLE, EMAIL, TEL, ADR and NOTE lines
    when the payload provides them, followed by UID (the resource name).
    Every text value and every component of a structured value is escaped,
    so ``;``, ``,``, backslashes and line breaks in the data (for example a
    multi-line street address) cannot corrupt the card.

    Args:
        person: Person payload as returned by the People API.

    Returns:
        The vCard as UTF-8 bytes with CRLF line endings.
    """
    esc = _googleIcsEscape

    def _typeParam(raw: Optional[str], default: str) -> str:
        # Custom labels may contain characters that terminate a parameter
        # (":" ";" "," quotes, control chars); neutralise them.
        cleaned = re.sub(r'[\x00-\x1f";:,]', "-", raw or "").strip().upper()
        return cleaned or default

    names = person.get("names") or []
    primaryName = names[0] if names else {}
    given = primaryName.get("givenName") or ""
    family = primaryName.get("familyName") or ""
    middle = primaryName.get("middleName") or ""
    fn = primaryName.get("displayName") or _googlePersonLabel(person) or ""

    lines = [
        "BEGIN:VCARD",
        "VERSION:3.0",
        # N is structured: family;given;additional;prefixes;suffixes
        f"N:{esc(family)};{esc(given)};{esc(middle)};;",
        f"FN:{esc(fn)}",
    ]

    orgs = person.get("organizations") or []
    if orgs:
        org = orgs[0]
        orgVal = esc(org.get("name") or "")
        if org.get("department"):
            # The separator between name and unit stays a raw ";".
            orgVal = f"{orgVal};{esc(org['department'])}"
        if orgVal:
            lines.append(f"ORG:{orgVal}")
        if org.get("title"):
            lines.append(f"TITLE:{esc(org['title'])}")

    for em in (person.get("emailAddresses") or []):
        mail = em.get("value")
        if not mail:
            continue
        lines.append(f"EMAIL;TYPE={_typeParam(em.get('type'), 'INTERNET')}:{esc(mail)}")

    for ph in (person.get("phoneNumbers") or []):
        number = ph.get("value")
        if not number:
            continue
        lines.append(f"TEL;TYPE={_typeParam(ph.get('type'), 'VOICE')}:{esc(number)}")

    for adr in (person.get("addresses") or []):
        parts = [
            adr.get("streetAddress") or "",
            adr.get("city") or "",
            adr.get("region") or "",
            adr.get("postalCode") or "",
            adr.get("country") or "",
        ]
        if any(parts):
            # ADR is structured: pobox;extended;street;city;region;postal;country
            body = ";".join(esc(part) for part in parts)
            lines.append(f"ADR;TYPE={_typeParam(adr.get('type'), 'OTHER')}:;;{body}")

    bios = person.get("biographies") or []
    if bios and bios[0].get("value"):
        lines.append(f"NOTE:{esc(bios[0]['value'])}")

    lines.append(f"UID:{person.get('resourceName', '')}")
    lines.append("END:VCARD")
    return ("\r\n".join(lines) + "\r\n").encode("utf-8")
|
|
|
|
|
|
class GoogleConnector(ProviderConnector):
    """Google ProviderConnector -- 1 connection -> Drive + Gmail + Calendar + Contacts."""

    # Service key -> adapter class instantiated with the connection's token.
    _SERVICE_MAP = {
        "drive": DriveAdapter,
        "gmail": GmailAdapter,
        "calendar": CalendarAdapter,
        "contact": ContactsAdapter,
    }

    def getAvailableServices(self) -> List[str]:
        """Return the service keys this connector can provide adapters for."""
        return list(self._SERVICE_MAP)

    def getServiceAdapter(self, service: str) -> ServiceAdapter:
        """Create the adapter for ``service`` using this connection's access token.

        Raises:
            ValueError: When ``service`` is not a known service key.
        """
        adapterClass = self._SERVICE_MAP.get(service)
        if adapterClass:
            return adapterClass(self.accessToken)
        raise ValueError(f"Unknown Google service: {service}. Available: {list(self._SERVICE_MAP.keys())}")
|