gateway/modules/features/trustee/accounting/connectors/accountingConnectorBexio.py
2026-03-17 23:23:40 +01:00

205 lines
9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Bexio accounting connector.
API docs: https://docs.bexio.com/
Auth: Personal Access Token (PAT) as Bearer token.
Base URL: https://api.bexio.com/
Note: Bexio uses internal account IDs (int), not account numbers.
The connector caches the chart of accounts to resolve accountNumber -> account_id.
"""
import hashlib
import logging
from typing import List, Dict, Any, Optional

import aiohttp

from ..accountingConnectorBase import (
    BaseAccountingConnector,
    AccountingBooking,
    AccountingChart,
    ConnectorConfigField,
    SyncResult,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Bexio API root URL; same value as the placeholder offered for the "apiBaseUrl" config field.
_DEFAULT_API_BASE_URL = "https://api.bexio.com/"
class AccountingConnectorBexio(BaseAccountingConnector):
    """Accounting connector for the Bexio REST API.

    Requests are authenticated with a Personal Access Token sent as a Bearer
    token. Bexio addresses accounts by an internal ``id`` rather than by
    account number, so the raw chart of accounts is cached per connection and
    used to translate account numbers into ids when pushing bookings.
    """

    def __init__(self):
        # Raw account payloads returned by ``2.0/accounts``, keyed by the
        # digest produced by _chartCacheKey (one entry per base URL + token).
        self._chartCache: Dict[str, List[Dict[str, Any]]] = {}

    def getConnectorType(self) -> str:
        """Return the identifier of this connector type."""
        return "bexio"

    def getConnectorLabel(self) -> Dict[str, str]:
        """Return the display label of the connector per language code."""
        return {"en": "Bexio", "de": "Bexio", "fr": "Bexio"}

    def getRequiredConfigFields(self) -> List[ConnectorConfigField]:
        """Describe the config fields this connector reads.

        The keys declared here (``apiBaseUrl``, ``clientName``,
        ``accessToken``) are the ones looked up in the ``config`` dict by the
        other methods of this class.
        """
        return [
            ConnectorConfigField(
                key="apiBaseUrl",
                label={"en": "API Base URL", "de": "API Base URL", "fr": "URL de base API"},
                fieldType="text",
                secret=False,
                placeholder=_DEFAULT_API_BASE_URL,
            ),
            ConnectorConfigField(
                key="clientName",
                label={"en": "Client Name", "de": "Mandantenname", "fr": "Nom du client"},
                fieldType="text",
                secret=False,
                placeholder="e.g. poweronag",
            ),
            ConnectorConfigField(
                key="accessToken",
                label={"en": "Personal Access Token", "de": "Persönlicher Zugriffstoken", "fr": "Jeton d'accès personnel"},
                fieldType="password",
                secret=True,
                placeholder="PAT from developer.bexio.com",
            ),
        ]

    @staticmethod
    def _describeError(error: BaseException) -> str:
        """Return a non-empty description of ``error``.

        Some exceptions (for example timeouts) carry no message, so ``str()``
        alone would yield an empty error message; fall back to the class name.
        """
        return str(error) or type(error).__name__

    @staticmethod
    def _chartCacheKey(config: Dict[str, Any]) -> str:
        """Build the chart-cache key for a connection.

        The key is a SHA-256 digest over the normalised base URL and the
        complete access token. Using the full token (instead of a prefix)
        keeps connections whose tokens share a common prefix apart, and
        hashing avoids storing the secret itself as a dictionary key.
        """
        apiBaseUrl = str(config.get("apiBaseUrl") or "").strip().rstrip("/")
        accessToken = str(config.get("accessToken") or "").strip()
        material = f"{apiBaseUrl}\n{accessToken}".encode("utf-8")
        return hashlib.sha256(material).hexdigest()

    def _buildUrl(self, config: Dict[str, Any], resource: str) -> str:
        """Join the configured base URL and ``resource`` with exactly one slash.

        Raises:
            ValueError: if ``apiBaseUrl`` is missing or blank.
        """
        apiBaseUrl = str(config.get("apiBaseUrl") or "").strip()
        if not apiBaseUrl:
            raise ValueError("Missing required config: apiBaseUrl")
        apiBaseUrl = apiBaseUrl.rstrip("/")
        resourcePath = resource.lstrip("/")
        return f"{apiBaseUrl}/{resourcePath}"

    def _buildHeaders(self, config: Dict[str, Any]) -> Dict[str, str]:
        """Build the JSON request headers including the Bearer token.

        Raises:
            ValueError: if ``accessToken`` is missing or blank (mirrors the
                handling of ``apiBaseUrl`` in ``_buildUrl``).
        """
        accessToken = str(config.get("accessToken") or "").strip()
        if not accessToken:
            raise ValueError("Missing required config: accessToken")
        return {
            "Authorization": f"Bearer {accessToken}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

    async def testConnection(self, config: Dict[str, Any]) -> SyncResult:
        """Check the credentials by requesting ``3.0/users/me``.

        Returns a successful ``SyncResult`` on HTTP 200. Missing config
        values, any other HTTP status and any raised exception are reported
        as a failed ``SyncResult`` with an error message; nothing is raised.
        """
        apiBaseUrl = str(config.get("apiBaseUrl") or "").strip()
        clientName = str(config.get("clientName") or "").strip()
        accessToken = str(config.get("accessToken") or "").strip()
        if not apiBaseUrl or not clientName or not accessToken:
            return SyncResult(
                success=False,
                errorMessage=(
                    f"Missing credentials: apiBaseUrl={bool(apiBaseUrl)}, "
                    f"clientName={bool(clientName)}, accessToken={bool(accessToken)}"
                ),
            )
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self._buildUrl(config, "3.0/users/me"),
                    headers=self._buildHeaders(config),
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    if resp.status == 200:
                        return SyncResult(success=True)
                    body = await resp.text()
                    # Only the first 200 characters of the body are reported.
                    return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:200]}")
        except Exception as e:
            return SyncResult(success=False, errorMessage=self._describeError(e))

    async def _loadRawAccounts(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Load the raw account list and cache it for accountNumber -> id mapping.

        A cached list is returned without a new request. Failed requests
        (non-200 status, exception, or a payload that is not a list) return an
        empty list and are NOT cached, so a later call retries.
        """
        cacheKey = self._chartCacheKey(config)
        cached = self._chartCache.get(cacheKey)
        if cached is not None:
            return cached
        try:
            async with aiohttp.ClientSession() as session:
                # NOTE(review): only a single request is made; if this list
                # endpoint is paginated, accounts beyond the first page would
                # be missing -- confirm against the Bexio API docs.
                async with session.get(
                    self._buildUrl(config, "2.0/accounts"),
                    headers=self._buildHeaders(config),
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as resp:
                    if resp.status != 200:
                        logger.warning("Bexio loadRawAccounts failed: HTTP %s", resp.status)
                        return []
                    accounts = await resp.json()
        except Exception as e:
            logger.error("Bexio loadRawAccounts error: %s", self._describeError(e))
            return []
        if not isinstance(accounts, list):
            # Do not cache an unexpected payload shape (e.g. an error object).
            logger.error("Bexio loadRawAccounts error: unexpected payload type %s", type(accounts).__name__)
            return []
        self._chartCache[cacheKey] = accounts
        return accounts

    def _resolveAccountId(self, accounts: List[Dict[str, Any]], accountNumber: str) -> Optional[int]:
        """Resolve an account number to the ``id`` of the matching raw account.

        Both sides are compared as stripped strings. Returns ``None`` when the
        number is empty or no account carries that ``account_no``.
        """
        wanted = str(accountNumber if accountNumber is not None else "").strip()
        if not wanted:
            # An empty number must not match accounts that lack "account_no".
            return None
        for acc in accounts:
            if not isinstance(acc, dict):
                continue
            if str(acc.get("account_no", "")).strip() == wanted:
                return acc.get("id")
        return None

    async def getChartOfAccounts(self, config: Dict[str, Any], accountType: Optional[str] = None) -> List[AccountingChart]:
        """Return the chart of accounts as ``AccountingChart`` items.

        Maps ``account_no`` -> ``accountNumber``, ``name`` -> ``label`` and
        ``account_type`` -> ``accountType`` from the raw account payload.
        Returns an empty list when the accounts could not be loaded.

        NOTE(review): the ``accountType`` argument is accepted but not used
        for filtering -- confirm whether callers expect a filtered result.
        """
        accounts = await self._loadRawAccounts(config)
        return [
            AccountingChart(
                accountNumber=str(acc.get("account_no", "")),
                label=acc.get("name", ""),
                accountType=acc.get("account_type", None),
            )
            for acc in accounts
        ]

    async def pushBooking(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult:
        """Push a manual entry to Bexio.

        Bexio requires account_id (int) rather than account numbers, so each
        booking line's account number is resolved via the cached chart of
        accounts. One entry is built per booking line; a line with a positive
        debit amount sets ``debit_account_id``, a line with a positive credit
        amount sets ``credit_account_id``.

        Returns a successful ``SyncResult`` (with ``externalId`` taken from the
        response ``id``) on HTTP 200/201, otherwise a failed ``SyncResult``.
        Exceptions are converted into a failed result; nothing is raised.
        """
        try:
            accounts = await self._loadRawAccounts(config)
            entries: List[Dict[str, Any]] = []
            for line in booking.lines:
                debitAmount = line.debitAmount or 0
                creditAmount = line.creditAmount or 0
                isDebit = debitAmount > 0
                isCredit = creditAmount > 0
                if not isDebit and not isCredit:
                    return SyncResult(
                        success=False,
                        errorMessage=f"Booking line for account {line.accountNumber} has no positive debit or credit amount",
                    )
                # Both sides of a line refer to the same account number, so a
                # single lookup is sufficient.
                accountId = self._resolveAccountId(accounts, line.accountNumber)
                if accountId is None:
                    message = f"Account {line.accountNumber} not found in Bexio chart"
                    if not accounts:
                        message += " (chart of accounts is empty or could not be loaded)"
                    return SyncResult(success=False, errorMessage=message)
                # NOTE(review): an entry usually carries only one side (debit
                # or credit account) -- confirm Bexio accepts such entries.
                entry: Dict[str, Any] = {
                    "amount": str(debitAmount if isDebit else creditAmount),
                    "description": line.description or booking.description,
                }
                if isDebit:
                    entry["debit_account_id"] = accountId
                if isCredit:
                    entry["credit_account_id"] = accountId
                if line.taxCode:
                    # NOTE(review): taxCode is passed through unchanged as
                    # tax_id -- confirm it already holds a Bexio tax id.
                    entry["tax_id"] = line.taxCode
                entries.append(entry)
            entryType = "manual_single_entry" if len(entries) == 1 else "manual_group_entry"
            payload = {
                "date": booking.bookingDate,
                "reference_nr": booking.reference,
                "type": entryType,
                "entries": entries,
            }
            async with aiohttp.ClientSession() as session:
                # NOTE(review): verify the resource path spelling
                # ("manual-entries" vs. "manual_entries") against the Bexio docs.
                url = self._buildUrl(config, "3.0/accounting/manual-entries")
                async with session.post(
                    url,
                    headers=self._buildHeaders(config),
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as resp:
                    # Non-JSON responses are preserved as text under "raw".
                    if resp.content_type == "application/json":
                        body = await resp.json()
                    else:
                        body = {"raw": await resp.text()}
                    if resp.status in (200, 201):
                        externalId = str(body.get("id", "")) if isinstance(body, dict) else None
                        return SyncResult(success=True, externalId=externalId, rawResponse=body)
                    return SyncResult(success=False, errorMessage=f"HTTP {resp.status}", rawResponse=body)
        except Exception as e:
            return SyncResult(success=False, errorMessage=self._describeError(e))

    async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult:
        """Check whether the manual entry ``externalId`` can be fetched.

        Returns a successful ``SyncResult`` on HTTP 200 and a failed one for an
        empty id, any other status, or any exception.
        """
        entryId = str(externalId if externalId is not None else "").strip()
        if not entryId:
            # Without an id the URL would address the whole collection and a
            # 200 response would be misreported as "booking exists".
            return SyncResult(success=False, errorMessage="Missing externalId")
        try:
            async with aiohttp.ClientSession() as session:
                url = self._buildUrl(config, f"3.0/accounting/manual-entries/{entryId}")
                async with session.get(
                    url,
                    headers=self._buildHeaders(config),
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    if resp.status == 200:
                        return SyncResult(success=True, externalId=externalId)
                    return SyncResult(success=False, errorMessage=f"HTTP {resp.status}")
        except Exception as e:
            return SyncResult(success=False, errorMessage=self._describeError(e))

    async def getCustomers(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the raw contact list from ``2.0/contact``.

        Returns an empty list on a non-200 status, an exception, or a payload
        that is not a list; failures are logged instead of raised.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self._buildUrl(config, "2.0/contact"),
                    headers=self._buildHeaders(config),
                    timeout=aiohttp.ClientTimeout(total=30),
                ) as resp:
                    if resp.status != 200:
                        logger.warning("Bexio getCustomers failed: HTTP %s", resp.status)
                        return []
                    customers = await resp.json()
        except Exception as e:
            logger.error("Bexio getCustomers error: %s", self._describeError(e))
            return []
        if not isinstance(customers, list):
            logger.error("Bexio getCustomers error: unexpected payload type %s", type(customers).__name__)
            return []
        return customers