gateway/modules/features/trustee/accounting/connectors/accountingConnectorAbacus.py
2026-04-26 08:31:35 +02:00

524 lines
23 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Abacus ERP accounting connector.
API docs: https://downloads.abacus.ch/fileadmin/ablage/abaconnect/htmlfiles/docs/restapi/abacus_rest_api.html
Auth: OAuth 2.0 Client Credentials (Service User).
Each Abacus instance has its own host URL; there is no central cloud endpoint.
Entity API uses OData V4 format.
Account balances:
Abacus exposes an ``AccountBalances`` entity (per fiscal year), but its
availability depends on the customer's Abacus license / Profile and is
NOT guaranteed for all instances. The robust default is therefore to
aggregate balances locally from ``GeneralJournalEntries`` (always
present). If a future iteration confirms the entity for a specific
instance, ``getAccountBalances`` can be extended to prefer that source
via a config flag (e.g. ``useAccountBalancesEntity: true``).
"""
import base64
import calendar
import logging
import time
from typing import List, Dict, Any, Optional, Tuple
import aiohttp
from ..accountingConnectorBase import (
BaseAccountingConnector,
AccountingBooking,
AccountingChart,
AccountingPeriodBalance,
ConnectorConfigField,
SyncResult,
)
from modules.shared.i18nRegistry import t
logger = logging.getLogger(__name__)
def _formatLastDayOfMonth(year: int, month: int) -> str:
lastDay = calendar.monthrange(year, month)[1]
return f"{year:04d}-{month:02d}-{lastDay:02d}"
def _isIncomeStatementAccount(accountNumber: str) -> bool:
"""Swiss KMU-Kontenrahmen heuristic: 1xxx + 2xxx -> balance sheet (cumulative);
3xxx..9xxx -> income statement (reset per fiscal year).
"""
a = (accountNumber or "").strip()
if not a or not a[0].isdigit():
return False
return a[0] not in ("1", "2")
class AccountingConnectorAbacus(BaseAccountingConnector):
def __init__(self):
self._tokenCache: Dict[str, Dict[str, Any]] = {}
def getConnectorType(self) -> str:
return "abacus"
def getConnectorLabel(self) -> str:
return "Abacus ERP"
def getRequiredConfigFields(self) -> List[ConnectorConfigField]:
return [
ConnectorConfigField(
key="apiBaseUrl",
label=t("API Base URL"),
fieldType="text",
secret=False,
placeholder="e.g. https://abacus.meinefirma.ch/api/entity/v1/",
),
ConnectorConfigField(
key="clientName",
label=t("Mandantenname"),
fieldType="text",
secret=False,
placeholder="e.g. 7777",
),
ConnectorConfigField(
key="clientId",
label=t("Client-ID"),
fieldType="text",
secret=False,
),
ConnectorConfigField(
key="clientSecret",
label=t("Client-Secret"),
fieldType="password",
secret=True,
),
]
def _buildBaseUrl(self, config: Dict[str, Any]) -> str:
apiBaseUrl = str(config.get("apiBaseUrl") or "").strip()
if not apiBaseUrl:
raise ValueError("Missing required config: apiBaseUrl")
if not apiBaseUrl.startswith("http"):
apiBaseUrl = f"https://{apiBaseUrl}"
return apiBaseUrl.rstrip("/")
def _buildAuthBaseUrl(self, config: Dict[str, Any]) -> str:
apiBaseUrl = str(config.get("apiBaseUrl") or "").strip()
if not apiBaseUrl:
raise ValueError("Missing required config: apiBaseUrl")
if not apiBaseUrl.startswith("http"):
apiBaseUrl = f"https://{apiBaseUrl}"
apiBaseUrl = apiBaseUrl.rstrip("/")
if "/api/entity/v1" in apiBaseUrl:
return apiBaseUrl.split("/api/entity/v1", 1)[0]
if "/api/" in apiBaseUrl:
return apiBaseUrl.split("/api/", 1)[0]
return apiBaseUrl
async def _getAccessToken(self, config: Dict[str, Any]) -> Optional[str]:
"""Obtain an OAuth access token using client_credentials grant.
Tokens are cached and refreshed when expired (default 600s).
"""
cacheKey = f"{config.get('apiBaseUrl')}_{config.get('clientName')}_{config.get('clientId')}"
cached = self._tokenCache.get(cacheKey)
if cached and cached.get("expiresAt", 0) > time.time() + 30:
return cached["accessToken"]
baseUrl = self._buildAuthBaseUrl(config)
try:
async with aiohttp.ClientSession() as session:
# Step 1: discover token endpoint
async with session.get(f"{baseUrl}/.well-known/openid-configuration", timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status != 200:
logger.error(f"Abacus OIDC discovery failed: HTTP {resp.status}")
return None
oidc = await resp.json()
tokenEndpoint = oidc.get("token_endpoint")
if not tokenEndpoint:
logger.error("Abacus OIDC: no token_endpoint found")
return None
# Step 2: request token
credentials = base64.b64encode(f"{config['clientId']}:{config['clientSecret']}".encode()).decode()
headers = {"Authorization": f"Basic {credentials}", "Content-Type": "application/x-www-form-urlencoded"}
async with session.post(tokenEndpoint, headers=headers, data="grant_type=client_credentials", timeout=aiohttp.ClientTimeout(total=15)) as tokenResp:
if tokenResp.status != 200:
body = await tokenResp.text()
logger.error(f"Abacus token request failed: HTTP {tokenResp.status}: {body[:200]}")
return None
tokenData = await tokenResp.json()
accessToken = tokenData.get("access_token")
expiresIn = tokenData.get("expires_in", 600)
self._tokenCache[cacheKey] = {"accessToken": accessToken, "expiresAt": time.time() + expiresIn}
return accessToken
except Exception as e:
logger.error(f"Abacus token acquisition error: {e}")
return None
def _buildEntityUrl(self, config: Dict[str, Any], entity: str) -> str:
baseUrl = self._buildBaseUrl(config)
clientName = config.get("clientName")
if not clientName:
raise ValueError("Missing required config: clientName")
return f"{baseUrl}/{clientName}/{entity}"
async def _buildAuthHeaders(self, config: Dict[str, Any]) -> Optional[Dict[str, str]]:
token = await self._getAccessToken(config)
if not token:
return None
return {"Authorization": f"Bearer {token}", "Accept": "application/json", "Content-Type": "application/json"}
async def testConnection(self, config: Dict[str, Any]) -> SyncResult:
apiBaseUrl = str(config.get("apiBaseUrl") or "")
clientName = str(config.get("clientName") or "")
clientId = str(config.get("clientId") or "")
clientSecret = str(config.get("clientSecret") or "")
if not apiBaseUrl or not clientName or not clientId or not clientSecret:
return SyncResult(
success=False,
errorMessage=(
f"Missing credentials: apiBaseUrl={bool(apiBaseUrl)}, "
f"clientName={bool(clientName)}, clientId={bool(clientId)}, "
f"clientSecret={bool(clientSecret)}"
),
)
headers = await self._buildAuthHeaders(config)
if not headers:
return SyncResult(success=False, errorMessage="Failed to obtain access token")
try:
async with aiohttp.ClientSession() as session:
url = self._buildEntityUrl(config, "Accounts?$top=1")
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp:
if resp.status == 200:
return SyncResult(success=True)
body = await resp.text()
return SyncResult(success=False, errorMessage=f"HTTP {resp.status}: {body[:200]}")
except Exception as e:
return SyncResult(success=False, errorMessage=str(e))
async def getChartOfAccounts(self, config: Dict[str, Any], accountType: Optional[str] = None) -> List[AccountingChart]:
headers = await self._buildAuthHeaders(config)
if not headers:
return []
charts: List[AccountingChart] = []
url: Optional[str] = self._buildEntityUrl(config, "Accounts")
try:
async with aiohttp.ClientSession() as session:
while url:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status != 200:
break
data = await resp.json()
for item in data.get("value", []):
charts.append(AccountingChart(
accountNumber=str(item.get("AccountNumber", item.get("Id", ""))),
label=item.get("Name", item.get("Description", "")),
accountType=item.get("AccountType", None),
))
url = data.get("@odata.nextLink")
except Exception as e:
logger.error(f"Abacus getChartOfAccounts error: {e}")
return charts
async def pushBooking(self, config: Dict[str, Any], booking: AccountingBooking) -> SyncResult:
headers = await self._buildAuthHeaders(config)
if not headers:
return SyncResult(success=False, errorMessage="Failed to obtain access token")
try:
lines = []
for line in booking.lines:
entry: Dict[str, Any] = {
"AccountId": line.accountNumber,
"Text": line.description or booking.description,
}
if line.debitAmount > 0:
entry["DebitAmount"] = line.debitAmount
if line.creditAmount > 0:
entry["CreditAmount"] = line.creditAmount
if line.taxCode:
entry["TaxCode"] = line.taxCode
if line.costCenter:
entry["CostCenterId"] = line.costCenter
lines.append(entry)
payload = {
"JournalDate": booking.bookingDate,
"Reference": booking.reference,
"Text": booking.description,
"Lines": lines,
}
async with aiohttp.ClientSession() as session:
url = self._buildEntityUrl(config, "GeneralJournalEntries")
async with session.post(url, headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as resp:
body = await resp.json() if resp.content_type and "json" in resp.content_type else {"raw": await resp.text()}
if resp.status in (200, 201):
externalId = str(body.get("Id", "")) if isinstance(body, dict) else None
return SyncResult(success=True, externalId=externalId, rawResponse=body)
return SyncResult(success=False, errorMessage=f"HTTP {resp.status}", rawResponse=body)
except Exception as e:
return SyncResult(success=False, errorMessage=str(e))
async def getBookingStatus(self, config: Dict[str, Any], externalId: str) -> SyncResult:
headers = await self._buildAuthHeaders(config)
if not headers:
return SyncResult(success=False, errorMessage="Failed to obtain access token")
try:
async with aiohttp.ClientSession() as session:
url = self._buildEntityUrl(config, f"GeneralJournalEntries({externalId})")
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp:
if resp.status == 200:
return SyncResult(success=True, externalId=externalId)
return SyncResult(success=False, errorMessage=f"HTTP {resp.status}")
except Exception as e:
return SyncResult(success=False, errorMessage=str(e))
async def getJournalEntries(self, config: Dict[str, Any], dateFrom: Optional[str] = None, dateTo: Optional[str] = None, accountNumbers: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""Read GeneralJournalEntries from Abacus (OData V4, paginated)."""
headers = await self._buildAuthHeaders(config)
if not headers:
return []
filterParts = []
if dateFrom:
filterParts.append(f"JournalDate ge {dateFrom}")
if dateTo:
filterParts.append(f"JournalDate le {dateTo}")
queryParams = ""
if filterParts:
queryParams = "?$filter=" + " and ".join(filterParts)
entries: List[Dict[str, Any]] = []
url: Optional[str] = self._buildEntityUrl(config, f"GeneralJournalEntries{queryParams}")
try:
async with aiohttp.ClientSession() as session:
while url:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status != 200:
break
data = await resp.json()
for item in data.get("value", []):
lines = []
totalAmt = 0.0
for line in (item.get("Lines") or []):
debit = float(line.get("DebitAmount", 0))
credit = float(line.get("CreditAmount", 0))
lines.append({
"accountNumber": str(line.get("AccountId", "")),
"debitAmount": debit,
"creditAmount": credit,
"description": line.get("Text", ""),
"taxCode": line.get("TaxCode"),
"costCenter": line.get("CostCenterId"),
})
totalAmt += max(debit, credit)
entries.append({
"externalId": str(item.get("Id", "")),
"bookingDate": str(item.get("JournalDate", "")).split("T")[0],
"reference": item.get("Reference", ""),
"description": item.get("Text", ""),
"currency": "CHF",
"totalAmount": totalAmt,
"lines": lines,
})
url = data.get("@odata.nextLink")
except Exception as e:
logger.error(f"Abacus getJournalEntries error: {e}")
return entries
async def getCustomers(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
headers = await self._buildAuthHeaders(config)
if not headers:
return []
try:
async with aiohttp.ClientSession() as session:
url = self._buildEntityUrl(config, "Debtors")
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status != 200:
return []
data = await resp.json()
return data.get("value", [])
except Exception as e:
logger.error(f"Abacus getCustomers error: {e}")
return []
async def getVendors(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
headers = await self._buildAuthHeaders(config)
if not headers:
return []
try:
async with aiohttp.ClientSession() as session:
url = self._buildEntityUrl(config, "Creditors")
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status != 200:
return []
data = await resp.json()
return data.get("value", [])
except Exception as e:
logger.error(f"Abacus getVendors error: {e}")
return []
async def getAccountBalances(
self,
config: Dict[str, Any],
years: List[int],
accountNumbers: Optional[List[str]] = None,
) -> List[AccountingPeriodBalance]:
"""Aggregate account balances from ``GeneralJournalEntries`` (OData V4).
Strategy:
1. Page through ``GET GeneralJournalEntries?$filter=JournalDate le YYYY-12-31``
until ``@odata.nextLink`` is exhausted. Including ALL prior years
is required to compute the carry-over for balance-sheet accounts.
2. Per (account, year, month) accumulate ``DebitAmount``/``CreditAmount``
from ``Lines``.
3. Income-statement accounts (3xxx-9xxx) reset to 0 per fiscal year;
balance-sheet accounts (1xxx-2xxx) carry their cumulative balance.
Optional optimization (not yet active): if the customer's Abacus
instance ships the ``AccountBalances`` OData entity, it can return
authoritative period balances directly. Detect via a probe GET on
``AccountBalances?$top=1`` and prefer that source. This is intentionally
deferred until we hit a customer where the entity is available --
the local aggregation is always-correct fallback.
"""
if not years:
return []
sortedYears = sorted({int(y) for y in years if y})
minYear = sortedYears[0]
maxYear = sortedYears[-1]
accountNumbersSet = set(accountNumbers) if accountNumbers else None
headers = await self._buildAuthHeaders(config)
if not headers:
logger.warning("Abacus getAccountBalances: no access token, skipping")
return []
rawEntries = await self._fetchAllJournalEntries(config, headers, dateTo=f"{maxYear}-12-31")
movements: Dict[Tuple[str, int, int], Dict[str, float]] = {}
seenAccounts: set = set()
for entry in rawEntries:
dateRaw = str(entry.get("JournalDate") or "")[:10]
if len(dateRaw) < 7:
continue
try:
year = int(dateRaw[:4])
month = int(dateRaw[5:7])
except ValueError:
continue
for line in (entry.get("Lines") or []):
accNo = str(line.get("AccountId") or "").strip()
if not accNo:
continue
seenAccounts.add(accNo)
try:
debit = float(line.get("DebitAmount") or 0)
credit = float(line.get("CreditAmount") or 0)
except (TypeError, ValueError):
continue
if debit == 0 and credit == 0:
continue
bucket = movements.setdefault((accNo, year, month), {"debit": 0.0, "credit": 0.0})
bucket["debit"] += debit
bucket["credit"] += credit
results: List[AccountingPeriodBalance] = []
for accNo in sorted(seenAccounts):
if accountNumbersSet is not None and accNo not in accountNumbersSet:
continue
isER = _isIncomeStatementAccount(accNo)
preMinYearBalance = 0.0
if not isER:
for (a, yr, _mo), m in movements.items():
if a == accNo and yr < minYear:
preMinYearBalance += m["debit"] - m["credit"]
cumulativeOpeningOfYear = preMinYearBalance
for year in sortedYears:
yearOpening = 0.0 if isER else cumulativeOpeningOfYear
running = yearOpening
yearDebit = 0.0
yearCredit = 0.0
for month in range(1, 13):
opening = running
mov = movements.get((accNo, year, month), {"debit": 0.0, "credit": 0.0})
running = opening + mov["debit"] - mov["credit"]
yearDebit += mov["debit"]
yearCredit += mov["credit"]
results.append(AccountingPeriodBalance(
accountNumber=accNo,
periodYear=year,
periodMonth=month,
openingBalance=round(opening, 2),
debitTotal=round(mov["debit"], 2),
creditTotal=round(mov["credit"], 2),
closingBalance=round(running, 2),
currency="CHF",
asOfDate=_formatLastDayOfMonth(year, month),
))
results.append(AccountingPeriodBalance(
accountNumber=accNo,
periodYear=year,
periodMonth=0,
openingBalance=round(yearOpening, 2),
debitTotal=round(yearDebit, 2),
creditTotal=round(yearCredit, 2),
closingBalance=round(running, 2),
currency="CHF",
asOfDate=f"{year}-12-31",
))
cumulativeOpeningOfYear = running
logger.info(
"Abacus getAccountBalances: %s rows from %s journal entries (years=%s)",
len(results), len(rawEntries), sortedYears,
)
return results
async def _fetchAllJournalEntries(
self,
config: Dict[str, Any],
headers: Dict[str, str],
dateTo: str,
) -> List[Dict[str, Any]]:
"""Page through ``GeneralJournalEntries`` (OData V4) following ``@odata.nextLink``.
We filter ``JournalDate le dateTo`` to bound the result, but include
ALL prior years (no lower bound) so cumulative balance-sheet
carry-over is correct.
"""
results: List[Dict[str, Any]] = []
baseUrl = self._buildEntityUrl(config, f"GeneralJournalEntries?$filter=JournalDate le {dateTo}")
nextUrl: Optional[str] = baseUrl
async with aiohttp.ClientSession() as session:
while nextUrl:
try:
async with session.get(nextUrl, headers=headers, timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status != 200:
body = await resp.text()
logger.warning("Abacus GeneralJournalEntries HTTP %s: %s", resp.status, body[:200])
break
data = await resp.json()
except Exception as ex:
logger.warning("Abacus GeneralJournalEntries request failed: %s", ex)
break
page = data.get("value") or []
if not isinstance(page, list):
break
results.extend(page)
nextUrl = data.get("@odata.nextLink")
return results