gateway/modules/features/trustee/accounting/connectors/accountingConnectorAbacus.py
patrick-motsch 6b11d66766 fixes
2026-02-22 01:03:19 +01:00

258 lines
11 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Abacus ERP accounting connector.
API docs: https://downloads.abacus.ch/fileadmin/ablage/abaconnect/htmlfiles/docs/restapi/abacus_rest_api.html
Auth: OAuth 2.0 Client Credentials (Service User).
Each Abacus instance has its own host URL; there is no central cloud endpoint.
Entity API uses OData V4 format.
"""
import base64
import logging
import time
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin

import aiohttp

from ..accountingConnectorBase import (
    BaseAccountingConnector,
    AccountingBooking,
    AccountingChart,
    ConnectorConfigField,
    SyncResult,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class AccountingConnectorAbacus(BaseAccountingConnector):
    """Accounting connector for Abacus ERP instances.

    Authenticates with the OAuth 2.0 client-credentials grant (the token
    endpoint is discovered through ``/.well-known/openid-configuration`` on
    the configured host) and talks to the entity API below
    ``/api/entity/v1/<mandant>/``.

    Expected config keys: ``abacusHost``, ``mandant``, ``clientId`` and
    ``clientSecret`` (see ``getRequiredConfigFields``).
    """

    def __init__(self):
        # Access tokens keyed by "<abacusHost>_<clientId>". Each entry holds
        # "accessToken" and "expiresAt" (absolute epoch seconds).
        self._tokenCache: Dict[str, Dict[str, Any]] = {}

    def getConnectorType(self) -> str:
        """Return the identifier of this connector type."""
        return "abacus"

    def getConnectorLabel(self) -> Dict[str, str]:
        """Return the display label of the connector per language code."""
        return {"en": "Abacus ERP", "de": "Abacus ERP", "fr": "Abacus ERP"}

    def getRequiredConfigFields(self) -> List[ConnectorConfigField]:
        """Describe the configuration fields needed to reach an Abacus instance."""
        return [
            ConnectorConfigField(
                key="abacusHost",
                label={"en": "Abacus Host URL", "de": "Abacus Host-URL", "fr": "URL Hôte Abacus"},
                fieldType="text",
                secret=False,
                placeholder="e.g. abacus.meinefirma.ch",
            ),
            ConnectorConfigField(
                key="mandant",
                label={"en": "Mandant Number", "de": "Mandantennummer", "fr": "Numéro de mandant"},
                fieldType="text",
                secret=False,
                placeholder="e.g. 7777",
            ),
            ConnectorConfigField(
                key="clientId",
                label={"en": "Client ID", "de": "Client-ID", "fr": "ID Client"},
                fieldType="text",
                secret=False,
            ),
            ConnectorConfigField(
                key="clientSecret",
                label={"en": "Client Secret", "de": "Client-Secret", "fr": "Secret Client"},
                fieldType="password",
                secret=True,
            ),
        ]

    def _buildBaseUrl(self, config: Dict[str, Any]) -> str:
        """Return the instance base URL without trailing slash.

        ``https://`` is prepended unless the configured host already carries
        an explicit ``http://`` or ``https://`` scheme. The check is done on
        the full scheme prefix (case-insensitively) so that host names that
        merely start with "http" are not mistaken for complete URLs.

        Raises:
            KeyError: if ``abacusHost`` is missing from ``config``.
        """
        host = config["abacusHost"].strip().rstrip("/")
        if not host.lower().startswith(("http://", "https://")):
            host = f"https://{host}"
        return host

    async def _getAccessToken(self, config: Dict[str, Any]) -> Optional[str]:
        """Obtain an OAuth access token using the client_credentials grant.

        Tokens are cached per host/client id and reused until 30 seconds
        before they expire (lifetime defaults to 600 s when the token
        response carries no usable ``expires_in``).

        Returns:
            The access token, or ``None`` when discovery or the token request
            fails. Failures are logged and never cached.
        """
        cacheKey = f"{config.get('abacusHost')}_{config.get('clientId')}"
        cached = self._tokenCache.get(cacheKey)
        # Treat tokens expiring within the next 30 s as already expired.
        if cached and cached.get("expiresAt", 0) > time.time() + 30:
            return cached["accessToken"]

        baseUrl = self._buildBaseUrl(config)
        try:
            async with aiohttp.ClientSession() as session:
                # Step 1: discover token endpoint
                discoveryUrl = f"{baseUrl}/.well-known/openid-configuration"
                async with session.get(discoveryUrl, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    if resp.status != 200:
                        logger.error("Abacus OIDC discovery failed: HTTP %s", resp.status)
                        return None
                    oidc = await resp.json()

                tokenEndpoint = oidc.get("token_endpoint")
                if not tokenEndpoint:
                    logger.error("Abacus OIDC: no token_endpoint found")
                    return None

                # Step 2: request token
                credentials = base64.b64encode(
                    f"{config['clientId']}:{config['clientSecret']}".encode()
                ).decode()
                headers = {
                    "Authorization": f"Basic {credentials}",
                    "Content-Type": "application/x-www-form-urlencoded",
                }
                async with session.post(
                    tokenEndpoint,
                    headers=headers,
                    data="grant_type=client_credentials",
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as tokenResp:
                    if tokenResp.status != 200:
                        body = await tokenResp.text()
                        logger.error(
                            "Abacus token request failed: HTTP %s: %s", tokenResp.status, body[:200]
                        )
                        return None
                    tokenData = await tokenResp.json()

                accessToken = tokenData.get("access_token")
                if not accessToken:
                    # Do not cache a missing token: that would make every call
                    # fail until the bogus cache entry expires.
                    logger.error("Abacus token response contained no access_token")
                    return None

                try:
                    expiresIn = float(tokenData.get("expires_in", 600))
                except (TypeError, ValueError):
                    expiresIn = 600.0

                self._tokenCache[cacheKey] = {
                    "accessToken": accessToken,
                    "expiresAt": time.time() + expiresIn,
                }
                return accessToken
        except Exception as e:
            logger.error("Abacus token acquisition error: %s", e)
            return None

    def _buildEntityUrl(self, config: Dict[str, Any], entity: str) -> str:
        """Return the entity API URL for ``entity`` within the configured mandant.

        ``entity`` is appended verbatim, so it may include a key predicate or
        a query string.

        Raises:
            KeyError: if ``abacusHost`` or ``mandant`` is missing from ``config``.
        """
        baseUrl = self._buildBaseUrl(config)
        mandant = config["mandant"]
        return f"{baseUrl}/api/entity/v1/{mandant}/{entity}"

    async def _buildAuthHeaders(self, config: Dict[str, Any]) -> Optional[Dict[str, str]]:
        """Return JSON request headers with a bearer token, or ``None`` without a token."""
        token = await self._getAccessToken(config)
        if not token:
            return None
        return {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

    async def _fetchAllEntities(
        self,
        config: Dict[str, Any],
        entity: str,
        headers: Dict[str, str],
        operation: str,
    ) -> List[Dict[str, Any]]:
        """Collect the ``value`` items of an entity set across all result pages.

        Follows ``@odata.nextLink`` until no further page is announced. A
        relative next link is resolved against the URL of the current page.

        Args:
            config: Connector configuration.
            entity: Entity set path appended to the entity API URL.
            headers: Request headers including authorization.
            operation: Name of the calling operation, used in log messages.

        Returns:
            All items collected so far. A non-200 response or an exception
            stops the pagination and is logged, so the result may be partial
            or empty.
        """
        items: List[Dict[str, Any]] = []
        try:
            url: Optional[str] = self._buildEntityUrl(config, entity)
            async with aiohttp.ClientSession() as session:
                while url:
                    async with session.get(
                        url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)
                    ) as resp:
                        if resp.status != 200:
                            logger.error("Abacus %s failed: HTTP %s", operation, resp.status)
                            break
                        data = await resp.json()
                    items.extend(data.get("value", []))
                    nextLink = data.get("@odata.nextLink")
                    # urljoin leaves absolute links untouched and resolves
                    # relative ones against the page that announced them.
                    url = urljoin(url, nextLink) if nextLink else None
        except Exception as e:
            logger.error("Abacus %s error: %s", operation, e)
        return items

    async def testConnection(self, config: Dict[str, Any]) -> SyncResult:
        """Check credentials and reachability by reading a single account.

        Returns:
            A successful ``SyncResult`` on HTTP 200, otherwise a failed one
            whose message carries the HTTP status and the first 200 characters
            of the response body, or the exception text.
        """
        headers = await self._buildAuthHeaders(config)
        if not headers:
            return SyncResult(success=False, errorMessage="Failed to obtain access token")
        try:
            async with aiohttp.ClientSession() as session:
                url = self._buildEntityUrl(config, "Accounts?$top=1")
                async with session.get(
                    url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)
                ) as resp:
                    if resp.status == 200:
                        return SyncResult(success=True)
                    body = await resp.text()
                    return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:200]}")
        except Exception as e:
            return SyncResult(success=False, errorMessage=str(e))

    async def getChartOfAccounts(
        self, config: Dict[str, Any], accountType: Optional[str] = None
    ) -> List[AccountingChart]:
        """Load the chart of accounts from the ``Accounts`` entity set.

        Args:
            config: Connector configuration.
            accountType: Accepted for interface compatibility; currently not
                applied as a filter.
                NOTE(review): confirm the intended filter semantics against
                the base class before implementing.

        Returns:
            The accounts that could be read. Errors are logged and lead to a
            partial or empty list rather than an exception.
        """
        headers = await self._buildAuthHeaders(config)
        if not headers:
            return []

        rows = await self._fetchAllEntities(config, "Accounts", headers, "getChartOfAccounts")
        charts: List[AccountingChart] = []
        try:
            for item in rows:
                charts.append(AccountingChart(
                    accountNumber=str(item.get("AccountNumber", item.get("Id", ""))),
                    label=item.get("Name", item.get("Description", "")),
                    accountType=item.get("AccountType", None),
                ))
        except Exception as e:
            # Keep what was converted so far, matching the best-effort contract.
            logger.error("Abacus getChartOfAccounts error: %s", e)
        return charts

    @staticmethod
    def _buildBookingPayload(booking: AccountingBooking) -> Dict[str, Any]:
        """Translate a booking into the JSON payload posted to ``GeneralJournalEntries``."""
        lines = []
        for line in booking.lines:
            entry: Dict[str, Any] = {
                "AccountId": line.accountNumber,
                "Text": line.description or booking.description,
            }
            # Amounts are only sent when positive; a missing (None) amount is
            # treated like zero instead of raising on the comparison.
            if (line.debitAmount or 0) > 0:
                entry["DebitAmount"] = line.debitAmount
            if (line.creditAmount or 0) > 0:
                entry["CreditAmount"] = line.creditAmount
            if line.taxCode:
                entry["TaxCode"] = line.taxCode
            if line.costCenter:
                entry["CostCenterId"] = line.costCenter
            lines.append(entry)
        return {
            "JournalDate": booking.bookingDate,
            "Reference": booking.reference,
            "Text": booking.description,
            "Lines": lines,
        }

    async def pushBooking(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult:
        """Post a booking as a general journal entry.

        Returns:
            On HTTP 200/201 a successful ``SyncResult`` whose ``externalId`` is
            the ``Id`` of the response body (empty string when absent). On any
            other status or on an exception a failed ``SyncResult``.
        """
        headers = await self._buildAuthHeaders(config)
        if not headers:
            return SyncResult(success=False, errorMessage="Failed to obtain access token")
        try:
            payload = self._buildBookingPayload(booking)
            async with aiohttp.ClientSession() as session:
                url = self._buildEntityUrl(config, "GeneralJournalEntries")
                async with session.post(
                    url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=30)
                ) as resp:
                    # Non-JSON answers are preserved as raw text for diagnostics.
                    if resp.content_type and "json" in resp.content_type:
                        body = await resp.json()
                    else:
                        body = {"raw": await resp.text()}
                    if resp.status in (200, 201):
                        externalId = str(body.get("Id", "")) if isinstance(body, dict) else None
                        return SyncResult(success=True, externalId=externalId, rawResponse=body)
                    return SyncResult(success=False, errorMessage=f"HTTP {resp.status}", rawResponse=body)
        except Exception as e:
            return SyncResult(success=False, errorMessage=str(e))

    async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult:
        """Check whether the journal entry with ``externalId`` can be read.

        Returns:
            A successful ``SyncResult`` on HTTP 200, otherwise a failed one
            carrying the HTTP status or the exception text.
        """
        headers = await self._buildAuthHeaders(config)
        if not headers:
            return SyncResult(success=False, errorMessage="Failed to obtain access token")
        try:
            async with aiohttp.ClientSession() as session:
                url = self._buildEntityUrl(config, f"GeneralJournalEntries({externalId})")
                async with session.get(
                    url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)
                ) as resp:
                    if resp.status == 200:
                        return SyncResult(success=True, externalId=externalId)
                    return SyncResult(success=False, errorMessage=f"HTTP {resp.status}")
        except Exception as e:
            return SyncResult(success=False, errorMessage=str(e))

    async def getCustomers(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the raw items of the ``Debtors`` entity set across all pages.

        Errors are logged and lead to a partial or empty list.
        """
        headers = await self._buildAuthHeaders(config)
        if not headers:
            return []
        return await self._fetchAllEntities(config, "Debtors", headers, "getCustomers")

    async def getVendors(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the raw items of the ``Creditors`` entity set across all pages.

        Errors are logged and lead to a partial or empty list.
        """
        headers = await self._buildAuthHeaders(config)
        if not headers:
            return []
        return await self._fetchAllEntities(config, "Creditors", headers, "getVendors")