fixed db stream upload
This commit is contained in:
parent
c4a9a66c60
commit
e7874d8e38
3 changed files with 86 additions and 46 deletions
|
|
@ -26,13 +26,14 @@ from modules.system.databaseHealth import (
|
||||||
_scanOrphans,
|
_scanOrphans,
|
||||||
)
|
)
|
||||||
from modules.system.databaseMigration import (
|
from modules.system.databaseMigration import (
|
||||||
|
_buildIdRemapFromPayload,
|
||||||
_exportDatabases,
|
_exportDatabases,
|
||||||
_exportSingleDb,
|
_exportSingleDb,
|
||||||
_getAvailableDatabases,
|
_getAvailableDatabases,
|
||||||
_getInstanceLabel,
|
_getInstanceLabel,
|
||||||
_importDatabases,
|
_importDatabases,
|
||||||
_importSingleDb,
|
_importSingleDb,
|
||||||
_prepareImport,
|
_loadLiveSystemObjectIds,
|
||||||
_validateImportPayload,
|
_validateImportPayload,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -479,50 +480,75 @@ def getMigrationExportDownload(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/migration/prepare-import")
|
@router.post("/migration/upload-import")
|
||||||
@limiter.limit("5/minute")
|
@limiter.limit("5/minute")
|
||||||
async def postMigrationPrepareImport(
|
async def postMigrationUploadImport(
|
||||||
request: Request,
|
request: Request,
|
||||||
file: UploadFile = File(...),
|
file: UploadFile = File(...),
|
||||||
currentUser: User = Depends(requireSysAdmin),
|
currentUser: User = Depends(requireSysAdmin),
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Validate + remap system-object IDs and return metadata for per-DB import.
|
"""Upload a backup file to disk (chunked, no full RAM load), validate it,
|
||||||
|
and return a token + metadata for per-DB import.
|
||||||
The remapped payload is stored server-side in memory (returned as opaque token)
|
|
||||||
so the frontend can drive per-DB import calls without re-uploading.
|
|
||||||
"""
|
"""
|
||||||
try:
|
import os
|
||||||
rawBytes = await file.read()
|
import tempfile
|
||||||
payload = json.loads(rawBytes.decode("utf-8"))
|
|
||||||
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
|
||||||
detail=f"Invalid JSON file: {e}",
|
|
||||||
) from e
|
|
||||||
|
|
||||||
logger.info("SysAdmin migration prepare-import: user=%s", currentUser.username)
|
|
||||||
|
|
||||||
result = _prepareImport(payload)
|
|
||||||
if not result.get("valid"):
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
|
||||||
detail={"message": "Payload validation failed", "warnings": result.get("warnings", [])},
|
|
||||||
)
|
|
||||||
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
token = str(uuid.uuid4())
|
token = str(uuid.uuid4())
|
||||||
|
tmpDir = tempfile.gettempdir()
|
||||||
|
filePath = os.path.join(tmpDir, f"poweron_import_{token}.json")
|
||||||
|
|
||||||
|
logger.info("SysAdmin migration upload-import: user=%s streaming to %s", currentUser.username, filePath)
|
||||||
|
|
||||||
|
totalBytes = 0
|
||||||
|
chunkSize = 1024 * 1024
|
||||||
|
try:
|
||||||
|
with open(filePath, "wb") as f:
|
||||||
|
while True:
|
||||||
|
chunk = await file.read(chunkSize)
|
||||||
|
if not chunk:
|
||||||
|
break
|
||||||
|
f.write(chunk)
|
||||||
|
totalBytes += len(chunk)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Upload-import write failed: %s", e)
|
||||||
|
if os.path.exists(filePath):
|
||||||
|
os.remove(filePath)
|
||||||
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Upload failed: {e}") from e
|
||||||
|
|
||||||
|
logger.info("SysAdmin migration upload-import: %s bytes written to disk", totalBytes)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(filePath, "r", encoding="utf-8") as f:
|
||||||
|
payload = json.load(f)
|
||||||
|
except (json.JSONDecodeError, UnicodeDecodeError) as e:
|
||||||
|
os.remove(filePath)
|
||||||
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid JSON file: {e}") from e
|
||||||
|
|
||||||
|
from modules.system.databaseMigration import _prepareImport
|
||||||
|
result = _prepareImport(payload)
|
||||||
|
|
||||||
|
liveIds = _loadLiveSystemObjectIds()
|
||||||
|
remap = _buildIdRemapFromPayload(payload, liveIds)
|
||||||
|
if remap:
|
||||||
|
logger.info("System-object ID remap: %s", remap)
|
||||||
|
from modules.system.databaseMigration import _remapSystemObjectIds
|
||||||
|
_remapSystemObjectIds(payload, remap)
|
||||||
|
|
||||||
|
protectedIds = list(set(liveIds.values()))
|
||||||
|
|
||||||
_pendingImports[token] = {
|
_pendingImports[token] = {
|
||||||
"payload": payload,
|
"payload": payload,
|
||||||
"protectedIds": result["protectedIds"],
|
"protectedIds": protectedIds,
|
||||||
|
"filePath": filePath,
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"valid": True,
|
|
||||||
"token": token,
|
"token": token,
|
||||||
"databases": result["databases"],
|
"valid": result.get("valid", False),
|
||||||
"warnings": result["warnings"],
|
"databases": result.get("databases", []),
|
||||||
"systemObjectsFound": result["systemObjectsFound"],
|
"warnings": result.get("warnings", []),
|
||||||
"protectedIds": result["protectedIds"],
|
"systemObjectsFound": result.get("systemObjectsFound", []),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -536,7 +562,7 @@ def postMigrationImportSingle(
|
||||||
body: dict,
|
body: dict,
|
||||||
currentUser: User = Depends(requireSysAdmin),
|
currentUser: User = Depends(requireSysAdmin),
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Import a single database from a previously prepared payload.
|
"""Import a single database from a previously uploaded + prepared payload.
|
||||||
|
|
||||||
Body: ``{token, database, mode}``
|
Body: ``{token, database, mode}``
|
||||||
"""
|
"""
|
||||||
|
|
@ -545,17 +571,11 @@ def postMigrationImportSingle(
|
||||||
mode = body.get("mode", "merge")
|
mode = body.get("mode", "merge")
|
||||||
|
|
||||||
if mode not in ("replace", "merge"):
|
if mode not in ("replace", "merge"):
|
||||||
raise HTTPException(
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid mode: '{mode}'.")
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
|
||||||
detail=f"Invalid mode: '{mode}'.",
|
|
||||||
)
|
|
||||||
|
|
||||||
pending = _pendingImports.get(token)
|
pending = _pendingImports.get(token)
|
||||||
if not pending:
|
if not pending:
|
||||||
raise HTTPException(
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired import token.")
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
|
||||||
detail="Invalid or expired import token. Please re-upload the file.",
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info("SysAdmin migration import-single: user=%s db=%s mode=%s", currentUser.username, database, mode)
|
logger.info("SysAdmin migration import-single: user=%s db=%s mode=%s", currentUser.username, database, mode)
|
||||||
|
|
||||||
|
|
@ -578,8 +598,14 @@ def postMigrationImportDone(
|
||||||
body: dict,
|
body: dict,
|
||||||
currentUser: User = Depends(requireSysAdmin),
|
currentUser: User = Depends(requireSysAdmin),
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Clean up the server-side payload cache after import is complete."""
|
"""Clean up the server-side payload cache and temp file."""
|
||||||
|
import os
|
||||||
|
|
||||||
token = body.get("token", "")
|
token = body.get("token", "")
|
||||||
if token in _pendingImports:
|
pending = _pendingImports.pop(token, None)
|
||||||
del _pendingImports[token]
|
if pending and pending.get("filePath"):
|
||||||
|
try:
|
||||||
|
os.remove(pending["filePath"])
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
return {"ok": True}
|
return {"ok": True}
|
||||||
|
|
|
||||||
|
|
@ -303,6 +303,18 @@ def _remapSystemObjectIds(payload: dict, remap: Dict[str, str]) -> dict:
|
||||||
return payload
|
return payload
|
||||||
|
|
||||||
|
|
||||||
|
def _remapDbTables(tables: dict, remap: Dict[str, str]) -> None:
|
||||||
|
"""In-place remap system-object IDs in a single DB's tables dict."""
|
||||||
|
if not remap:
|
||||||
|
return
|
||||||
|
remapSet = set(remap.keys())
|
||||||
|
for tableName, rows in tables.items():
|
||||||
|
if not isinstance(rows, list):
|
||||||
|
continue
|
||||||
|
for row in rows:
|
||||||
|
_remapRowValues(row, remap, remapSet)
|
||||||
|
|
||||||
|
|
||||||
def _remapRowValues(row: dict, remap: Dict[str, str], remapSet: Set[str]) -> None:
|
def _remapRowValues(row: dict, remap: Dict[str, str], remapSet: Set[str]) -> None:
|
||||||
"""In-place replace string values in a row dict that match a remap key."""
|
"""In-place replace string values in a row dict that match a remap key."""
|
||||||
for key, val in row.items():
|
for key, val in row.items():
|
||||||
|
|
|
||||||
|
|
@ -31,11 +31,13 @@ openpyxl>=3.1.2 # Für Excel-Dateien
|
||||||
python-pptx>=0.6.21 # Für PowerPoint-Dateien
|
python-pptx>=0.6.21 # Für PowerPoint-Dateien
|
||||||
|
|
||||||
## Data Processing & Analysis
|
## Data Processing & Analysis
|
||||||
numpy==1.26.3 # Version die mit pandas und matplotlib kompatibel ist
|
numpy==1.26.3; python_version < "3.13"
|
||||||
pandas==2.2.3 # Aktuelle Version beibehalten
|
numpy>=2.1.0; python_version >= "3.13"
|
||||||
|
pandas==2.2.3
|
||||||
|
|
||||||
## Data Visualization
|
## Data Visualization
|
||||||
matplotlib==3.8.0 # Aktuelle Version beibehalten
|
matplotlib==3.8.0; python_version < "3.13"
|
||||||
|
matplotlib>=3.9.0; python_version >= "3.13"
|
||||||
seaborn==0.13.0
|
seaborn==0.13.0
|
||||||
markdown
|
markdown
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue