gateway/modules/features/trustee/accounting/connectors/accountingConnectorBexio.py
2026-04-13 01:37:29 +02:00

262 lines
12 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Bexio accounting connector.
API docs: https://docs.bexio.com/
Auth: Personal Access Token (PAT) as Bearer token.
Base URL: https://api.bexio.com/
Note: Bexio uses internal account IDs (int), not account numbers.
The connector caches the chart of accounts to resolve accountNumber -> account_id.
"""
import logging
from typing import List, Dict, Any, Optional
import aiohttp
from ..accountingConnectorBase import (
BaseAccountingConnector,
AccountingBooking,
AccountingChart,
ConnectorConfigField,
SyncResult,
)
from modules.shared.i18nRegistry import t
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Root URL of the public Bexio API.
# NOTE(review): not referenced in the visible part of this file; the base URL
# is always taken from the connector config ("apiBaseUrl").
_DEFAULT_API_BASE_URL = "https://api.bexio.com/"
class AccountingConnectorBexio(BaseAccountingConnector):
def __init__(self):
self._chartCache: Dict[str, List[Dict[str, Any]]] = {}
def getConnectorType(self) -> str:
    """Return the connector type identifier, the literal string ``bexio``."""
    connectorType = "bexio"
    return connectorType
def getConnectorLabel(self) -> str:
    """Return the connector label, the literal string ``Bexio``."""
    connectorLabel = "Bexio"
    return connectorLabel
def getRequiredConfigFields(self) -> List[ConnectorConfigField]:
    """Describe the configuration fields of this connector.

    Returns:
        Three field descriptors in this order: ``apiBaseUrl`` and
        ``clientName`` (text inputs, not secret) followed by ``accessToken``
        (password input, flagged as secret). Every label is passed through
        ``t()``.
    """
    # The t() calls keep their literal arguments and their original order.
    apiBaseUrlField = ConnectorConfigField(
        key="apiBaseUrl",
        label=t("API Base URL"),
        fieldType="text",
        secret=False,
        placeholder="https://api.bexio.com/",
    )
    clientNameField = ConnectorConfigField(
        key="clientName",
        label=t("Mandantenname"),
        fieldType="text",
        secret=False,
        placeholder="e.g. poweronag",
    )
    accessTokenField = ConnectorConfigField(
        key="accessToken",
        label=t("Persönlicher Zugriffstoken"),
        fieldType="password",
        secret=True,
        placeholder="PAT from developer.bexio.com",
    )
    return [apiBaseUrlField, clientNameField, accessTokenField]
def _buildUrl(self, config: Dict[str, Any], resource: str) -> str:
apiBaseUrl = str(config.get("apiBaseUrl") or "").strip()
if not apiBaseUrl:
raise ValueError("Missing required config: apiBaseUrl")
apiBaseUrl = apiBaseUrl.rstrip("/")
resourcePath = resource.lstrip("/")
return f"{apiBaseUrl}/{resourcePath}"
def _buildHeaders(self, config: Dict[str, Any]) -> Dict[str, str]:
return {
"Authorization": f"Bearer {config['accessToken']}",
"Accept": "application/json",
"Content-Type": "application/json",
}
async def testConnection(self, config: Dict[str, Any]) -> SyncResult:
    """Check the configuration by requesting ``3.0/users/me``.

    Args:
        config: Connector configuration; ``apiBaseUrl``, ``clientName`` and
            ``accessToken`` must all be non-empty.

    Returns:
        ``SyncResult(success=True)`` when Bexio answers with HTTP 200.
        Otherwise a failed ``SyncResult`` whose ``errorMessage`` is either a
        summary of which settings are present, ``HTTP <status>`` followed by
        the first 200 characters of the response body, or the text of the
        exception that was raised. The request uses a 15 second total timeout.
    """
    hasBaseUrl = bool(str(config.get("apiBaseUrl") or ""))
    hasClientName = bool(str(config.get("clientName") or ""))
    hasToken = bool(str(config.get("accessToken") or ""))
    if not (hasBaseUrl and hasClientName and hasToken):
        missingSummary = (
            f"Missing credentials: apiBaseUrl={hasBaseUrl}, "
            f"clientName={hasClientName}, accessToken={hasToken}"
        )
        return SyncResult(success=False, errorMessage=missingSummary)

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self._buildUrl(config, "3.0/users/me"),
                headers=self._buildHeaders(config),
                timeout=aiohttp.ClientTimeout(total=15),
            ) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:200]}")
                return SyncResult(success=True)
    except Exception as e:
        # Best effort: any failure is reported through the result object.
        return SyncResult(success=False, errorMessage=str(e))
async def _loadRawAccounts(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Load raw account list and cache it for accountNumber -> id mapping."""
cacheKey = config.get("accessToken", "")[:16]
if cacheKey in self._chartCache:
return self._chartCache[cacheKey]
try:
async with aiohttp.ClientSession() as session:
async with session.get(self._buildUrl(config, "2.0/accounts"), headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status != 200:
return []
accounts = await resp.json()
self._chartCache[cacheKey] = accounts
return accounts
except Exception as e:
logger.error(f"Bexio loadRawAccounts error: {e}")
return []
def _resolveAccountId(self, accounts: List[Dict[str, Any]], accountNumber: str) -> Optional[int]:
"""Resolve an account number string to a Bexio internal account_id."""
for acc in accounts:
if str(acc.get("account_no", "")) == accountNumber:
return acc.get("id")
return None
async def getChartOfAccounts(self, config: Dict[str, Any], accountType: Optional[str] = None) -> List[AccountingChart]:
    """Return the Bexio chart of accounts as ``AccountingChart`` items.

    Each raw account is mapped as ``account_no`` -> ``accountNumber`` (as a
    string), ``name`` -> ``label`` and ``account_type`` -> ``accountType``.

    Args:
        config: Connector configuration used to load the accounts.
        accountType: NOTE(review): accepted but not used for filtering in
            this implementation - confirm whether callers expect a filter.

    Returns:
        One ``AccountingChart`` per raw account; empty when no accounts
        could be loaded.
    """
    chart: List[AccountingChart] = []
    for rawAccount in await self._loadRawAccounts(config):
        chart.append(
            AccountingChart(
                accountNumber=str(rawAccount.get("account_no", "")),
                label=rawAccount.get("name", ""),
                accountType=rawAccount.get("account_type", None),
            )
        )
    return chart
async def pushBooking(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult:
    """Push a manual entry to Bexio.

    Bexio requires account_id (int) rather than account numbers, so each
    line's account number is resolved via the cached chart of accounts.

    One entry is built per booking line: a positive ``debitAmount`` puts the
    resolved id into ``debit_account_id``, a positive ``creditAmount`` puts it
    into ``credit_account_id``. The entry amount is the debit amount when
    that is positive, otherwise the credit amount. A single entry is sent as
    ``manual_single_entry``, several as ``manual_group_entry``.

    NOTE(review): a line with only one positive amount yields an entry with
    only one account side - confirm against the Bexio manual-entry
    documentation that the chosen entry type and the resource path accept
    this.

    Args:
        config: Connector configuration (base URL and access token).
        booking: Booking to transfer; ``lines``, ``description``,
            ``bookingDate`` and ``reference`` are read.

    Returns:
        A successful ``SyncResult`` carrying ``externalId`` and
        ``rawResponse`` on HTTP 200/201. Otherwise a failed ``SyncResult``
        whose ``errorMessage`` names the problem. Exceptions are converted
        into a failed result and never propagate.
    """
    try:
        bookingLines = list(booking.lines or [])
        if not bookingLines:
            # Nothing to post; do not send an entry-less payload.
            return SyncResult(success=False, errorMessage="Booking has no lines")

        accounts = await self._loadRawAccounts(config)
        if not accounts:
            # An empty list means the chart request failed or the chart is
            # empty; either way no account number can be resolved.
            return SyncResult(success=False, errorMessage="Bexio chart of accounts could not be loaded")

        entries = []
        for line in bookingLines:
            # Treat a missing (None) amount like zero instead of failing
            # inside the comparison.
            debitAmount = line.debitAmount or 0
            creditAmount = line.creditAmount or 0
            hasDebit = debitAmount > 0
            hasCredit = creditAmount > 0
            if not (hasDebit or hasCredit):
                # Report the real cause instead of "account not found".
                return SyncResult(
                    success=False,
                    errorMessage=f"Line for account {line.accountNumber} has no positive debit or credit amount",
                )

            accountId = self._resolveAccountId(accounts, line.accountNumber)
            if accountId is None:
                return SyncResult(success=False, errorMessage=f"Account {line.accountNumber} not found in Bexio chart")

            amount = debitAmount if hasDebit else creditAmount
            entry: Dict[str, Any] = {"amount": str(amount), "description": line.description or booking.description}
            # The id was checked against None above, so a falsy id such as 0
            # is still transmitted.
            if hasDebit:
                entry["debit_account_id"] = accountId
            if hasCredit:
                entry["credit_account_id"] = accountId
            if line.taxCode:
                entry["tax_id"] = line.taxCode
            entries.append(entry)

        entryType = "manual_single_entry" if len(entries) == 1 else "manual_group_entry"
        payload = {
            "date": booking.bookingDate,
            "reference_nr": booking.reference,
            "type": entryType,
            "entries": entries,
        }

        url = self._buildUrl(config, "3.0/accounting/manual-entries")
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=self._buildHeaders(config), json=payload, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                # Non-JSON answers are kept verbatim for diagnostics.
                if resp.content_type == "application/json":
                    body = await resp.json()
                else:
                    body = {"raw": await resp.text()}
                if resp.status in (200, 201):
                    externalId = str(body.get("id", "")) if isinstance(body, dict) else None
                    return SyncResult(success=True, externalId=externalId, rawResponse=body)
                return SyncResult(success=False, errorMessage=f"HTTP {resp.status}", rawResponse=body)
    except Exception as e:
        # Best effort: callers receive every failure as a SyncResult.
        return SyncResult(success=False, errorMessage=str(e))
async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult:
    """Check whether a manual entry with the given id exists in Bexio.

    Args:
        config: Connector configuration (base URL and access token).
        externalId: Bexio id of the manual entry.

    Returns:
        ``SyncResult(success=True, externalId=externalId)`` when the entry
        resource answers with HTTP 200. A failed ``SyncResult`` when the id
        is empty, for any other status (``HTTP <status>``) and for any
        exception (its text).
    """
    from urllib.parse import quote  # function-scope import keeps this block self-contained

    entryId = str(externalId or "").strip()
    if not entryId:
        # Without an id the URL would address the whole collection, and a
        # 200 from that listing would wrongly be reported as success.
        return SyncResult(success=False, errorMessage="Missing externalId")

    try:
        # Escape the id so it always stays a single path segment.
        url = self._buildUrl(config, f"3.0/accounting/manual-entries/{quote(entryId, safe='')}")
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=self._buildHeaders(config), timeout=aiohttp.ClientTimeout(total=15)) as resp:
                if resp.status == 200:
                    return SyncResult(success=True, externalId=externalId)
                return SyncResult(success=False, errorMessage=f"HTTP {resp.status}")
    except Exception as e:
        return SyncResult(success=False, errorMessage=str(e))
async def getJournalEntries(self, config: Dict[str, Any], dateFrom: Optional[str] = None, dateTo: Optional[str] = None, accountNumbers: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Read manual entries from Bexio. API: GET 3.0/accounting/manual-entries

    Args:
        config: Connector configuration (base URL and access token).
        dateFrom: Sent as ``date_from`` query parameter when set.
        dateTo: Sent as ``date_to`` query parameter when set.
        accountNumbers: NOTE(review): accepted but not applied as a filter
            in this implementation - confirm what callers expect.

    Returns:
        One dict per manual entry with ``externalId``, ``bookingDate``,
        ``reference``, ``description``, ``currency``, ``totalAmount`` and
        ``lines``. An empty list is returned for a non-200 answer, a non-list
        payload or any exception (which is logged).
    """
    try:
        accounts = await self._loadRawAccounts(config)
        # Bexio reports internal ids; map them back to account numbers.
        accMap = {acc.get("id"): str(acc.get("account_no", "")) for acc in accounts}

        params: Dict[str, str] = {}
        if dateFrom:
            params["date_from"] = dateFrom
        if dateTo:
            params["date_to"] = dateTo

        url = self._buildUrl(config, "3.0/accounting/manual-entries")
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=self._buildHeaders(config), params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp:
                if resp.status != 200:
                    logger.error(f"Bexio getJournalEntries failed: HTTP {resp.status}")
                    return []
                items = await resp.json()

        if not isinstance(items, list):
            return []
        return [self._convertManualEntry(item, accMap) for item in items]
    except Exception as e:
        logger.error(f"Bexio getJournalEntries error: {e}")
        return []

def _convertManualEntry(self, item: Dict[str, Any], accMap: Dict[Any, str]) -> Dict[str, Any]:
    """Convert one raw Bexio manual entry into the connector's journal dict.

    Every raw sub-entry yields a debit line when it has a debit account and
    a credit line when it has a credit account that differs from the debit
    account. ``totalAmount`` is the sum of all sub-entry amounts.
    """
    lines: List[Dict[str, Any]] = []
    totalAmt = 0.0
    for rawEntry in (item.get("entries") or []):
        # "or 0" also covers an amount that is present but null.
        amt = float(rawEntry.get("amount") or 0)
        debitAccId = rawEntry.get("debit_account_id")
        creditAccId = rawEntry.get("credit_account_id")
        description = rawEntry.get("description", "")
        taxCode = str(rawEntry.get("tax_id", "")) if rawEntry.get("tax_id") else None

        # Only emit a debit line when there is a debit account; otherwise a
        # credit-only sub-entry would produce a phantom debit line with an
        # empty account number.
        if debitAccId is not None:
            lines.append({
                "accountNumber": accMap.get(debitAccId, str(debitAccId)),
                "debitAmount": amt,
                "creditAmount": 0.0,
                "description": description,
                "taxCode": taxCode,
            })
        if creditAccId is not None and creditAccId != debitAccId:
            creditLine: Dict[str, Any] = {
                "accountNumber": accMap.get(creditAccId, str(creditAccId)),
                "debitAmount": 0.0,
                "creditAmount": amt,
                "description": description,
            }
            if debitAccId is None:
                # No debit line carries the tax code, so keep it here.
                creditLine["taxCode"] = taxCode
            lines.append(creditLine)
        totalAmt += amt

    return {
        "externalId": str(item.get("id", "")),
        "bookingDate": item.get("date", ""),
        "reference": item.get("reference_nr", ""),
        "description": item.get("text", ""),
        "currency": "CHF",
        "totalAmount": totalAmt,
        "lines": lines,
    }
async def getCustomers(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Fetch the contact list from ``2.0/contact``.

    Args:
        config: Connector configuration (base URL and access token).

    Returns:
        The parsed JSON body as delivered by Bexio on HTTP 200; ``[]`` for
        any other status or when an exception occurs (which is logged).
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self._buildUrl(config, "2.0/contact"),
                headers=self._buildHeaders(config),
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status == 200:
                    return await resp.json()
                return []
    except Exception as e:
        logger.error(f"Bexio getCustomers error: {e}")
        return []