db-export streaming
This commit is contained in:
parent
2b58f7a45d
commit
f24b67ed85
2 changed files with 163 additions and 0 deletions
|
|
@ -36,6 +36,7 @@ from modules.system.databaseMigration import (
|
||||||
_importSingleDb,
|
_importSingleDb,
|
||||||
_prepareImport,
|
_prepareImport,
|
||||||
_validateImportPayload,
|
_validateImportPayload,
|
||||||
|
streamExportGenerator,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -478,6 +479,53 @@ def getMigrationExportSingle(
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/migration/export-stream")
|
||||||
|
@limiter.limit("2/minute")
|
||||||
|
def getMigrationExportStream(
|
||||||
|
request: Request,
|
||||||
|
databases: str = "all",
|
||||||
|
currentUser: User = Depends(requireSysAdmin),
|
||||||
|
):
|
||||||
|
"""Stream a full database export as a single JSON file download.
|
||||||
|
|
||||||
|
Uses server-side cursors and row-by-row serialization so that neither
|
||||||
|
backend memory nor browser JS heap is exhausted — works for any DB size.
|
||||||
|
"""
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from modules.shared.dbRegistry import getRegisteredDatabases
|
||||||
|
|
||||||
|
registeredDbs = getRegisteredDatabases()
|
||||||
|
|
||||||
|
if databases == "all":
|
||||||
|
dbList = sorted(registeredDbs.keys())
|
||||||
|
else:
|
||||||
|
dbList = [db.strip() for db in databases.split(",") if db.strip()]
|
||||||
|
invalid = [db for db in dbList if db not in registeredDbs]
|
||||||
|
if invalid:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=f"Unknown databases: {', '.join(invalid)}",
|
||||||
|
)
|
||||||
|
|
||||||
|
if not dbList:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail="No databases selected for export.",
|
||||||
|
)
|
||||||
|
|
||||||
|
instanceLabel = _getInstanceLabel()
|
||||||
|
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ")
|
||||||
|
filename = f"export_{instanceLabel}_{timestamp}.json" if instanceLabel else f"export_{timestamp}.json"
|
||||||
|
|
||||||
|
logger.info("SysAdmin stream export: user=%s databases=%s", currentUser.username, dbList)
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
streamExportGenerator(dbList, instanceLabel),
|
||||||
|
media_type="application/json",
|
||||||
|
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _processUploadedFile(filePath: str, tmpDir: str, token: str) -> dict:
|
def _processUploadedFile(filePath: str, tmpDir: str, token: str) -> dict:
|
||||||
"""Parse JSON, validate, remap, split into per-DB files.
|
"""Parse JSON, validate, remap, split into per-DB files.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -180,6 +180,121 @@ def _readTableRows(conn, tableName: str) -> List[dict]:
|
||||||
return [{k: _jsonSafe(v) for k, v in dict(row).items()} for row in cur.fetchall()]
|
return [{k: _jsonSafe(v) for k, v in dict(row).items()} for row in cur.fetchall()]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Streaming Export (memory-safe, handles arbitrarily large databases)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def streamExportGenerator(databases: List[str], instanceLabel: str = ""):
|
||||||
|
"""Yield JSON fragments for a streaming database export.
|
||||||
|
|
||||||
|
Writes valid JSON incrementally (row-by-row, table-by-table) so that
|
||||||
|
neither the backend RAM nor the browser JS heap is saturated — works
|
||||||
|
for databases of any size.
|
||||||
|
|
||||||
|
The output format is identical to the non-streaming _exportDatabases():
|
||||||
|
{"meta": {...}, "databases": {"dbName": {"tables": {"tbl": [rows]}, ...}}}
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
|
||||||
|
registeredDbs = getRegisteredDatabases()
|
||||||
|
validDbs = [db for db in databases if db in registeredDbs]
|
||||||
|
|
||||||
|
totalDbs = 0
|
||||||
|
totalTables = 0
|
||||||
|
totalRecords = 0
|
||||||
|
|
||||||
|
yield '{"meta":'
|
||||||
|
metaPlaceholder = json.dumps({
|
||||||
|
"exportedAt": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"version": _EXPORT_FORMAT_VERSION,
|
||||||
|
"instanceLabel": instanceLabel,
|
||||||
|
"databaseCount": "<<PLACEHOLDER>>",
|
||||||
|
}, ensure_ascii=False)
|
||||||
|
yield metaPlaceholder
|
||||||
|
yield ',"databases":{'
|
||||||
|
|
||||||
|
firstDb = True
|
||||||
|
for dbName in validDbs:
|
||||||
|
excluded = _EXCLUDED_TABLES.get(dbName, set())
|
||||||
|
conn = None
|
||||||
|
try:
|
||||||
|
conn = _getConnection(dbName)
|
||||||
|
allTables = _listTables(conn)
|
||||||
|
modelTables = _getModelTablesForDb(dbName, allTables)
|
||||||
|
|
||||||
|
if not firstDb:
|
||||||
|
yield ','
|
||||||
|
firstDb = False
|
||||||
|
|
||||||
|
yield json.dumps(dbName, ensure_ascii=False)
|
||||||
|
yield ':{"tables":{'
|
||||||
|
|
||||||
|
firstTable = True
|
||||||
|
dbTableCount = 0
|
||||||
|
dbRecordCount = 0
|
||||||
|
|
||||||
|
for tbl in modelTables:
|
||||||
|
if tbl in excluded:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not firstTable:
|
||||||
|
yield ','
|
||||||
|
firstTable = False
|
||||||
|
|
||||||
|
yield json.dumps(tbl, ensure_ascii=False)
|
||||||
|
yield ':['
|
||||||
|
|
||||||
|
with conn.cursor(name=f"export_{dbName}_{tbl}") as cur:
|
||||||
|
cur.itersize = 2000
|
||||||
|
cur.execute(f'SELECT * FROM "{tbl}"')
|
||||||
|
|
||||||
|
firstRow = True
|
||||||
|
rowCount = 0
|
||||||
|
for row in cur:
|
||||||
|
if not firstRow:
|
||||||
|
yield ','
|
||||||
|
firstRow = False
|
||||||
|
safeRow = {k: _jsonSafe(v) for k, v in dict(row).items()}
|
||||||
|
yield json.dumps(safeRow, ensure_ascii=False, default=str)
|
||||||
|
rowCount += 1
|
||||||
|
|
||||||
|
yield ']'
|
||||||
|
dbTableCount += 1
|
||||||
|
dbRecordCount += rowCount
|
||||||
|
|
||||||
|
yield '},"summary":{'
|
||||||
|
firstSummaryTable = True
|
||||||
|
for tbl in modelTables:
|
||||||
|
if tbl in excluded:
|
||||||
|
continue
|
||||||
|
if not firstSummaryTable:
|
||||||
|
yield ','
|
||||||
|
firstSummaryTable = False
|
||||||
|
yield json.dumps(tbl, ensure_ascii=False)
|
||||||
|
yield ':{"recordCount":0}'
|
||||||
|
yield '}'
|
||||||
|
yield f',"tableCount":{dbTableCount},"totalRecords":{dbRecordCount}'
|
||||||
|
yield '}'
|
||||||
|
|
||||||
|
totalDbs += 1
|
||||||
|
totalTables += dbTableCount
|
||||||
|
totalRecords += dbRecordCount
|
||||||
|
logger.info("Stream export: %s done (%d tables, %d records)", dbName, dbTableCount, dbRecordCount)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Stream export failed for %s: %s", dbName, e)
|
||||||
|
if not firstDb or not firstTable:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
if conn:
|
||||||
|
try:
|
||||||
|
conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
yield '}}'
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Validate
|
# Validate
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue