fix import token lost across workers: persist token metadata to disk instead of in-memory dict
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
parent
51ac15e501
commit
cb6b88aa3c
1 changed files with 30 additions and 8 deletions
|
|
@ -7,6 +7,7 @@ and database migration (backup / restore).
|
|||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile, status
|
||||
|
|
@ -570,13 +571,34 @@ async def postMigrationUploadImport(
|
|||
fileSizeMb = round(totalBytes / (1024 * 1024), 1)
|
||||
logger.info("SysAdmin migration upload-import: %s bytes on disk (%.1f MB)", totalBytes, fileSizeMb)
|
||||
|
||||
_pendingProcessing[token] = {"filePath": filePath, "tmpDir": tmpDir}
|
||||
_writeTokenMeta(token, "processing", {"filePath": filePath, "tmpDir": tmpDir})
|
||||
|
||||
return {"token": token, "fileSizeMb": fileSizeMb}
|
||||
|
||||
|
||||
_pendingProcessing: Dict[str, dict] = {}
|
||||
_pendingImports: Dict[str, dict] = {}
|
||||
def _tokenMetaPath(token: str, kind: str) -> str:
|
||||
import tempfile
|
||||
return os.path.join(tempfile.gettempdir(), f"poweron_{kind}_{token}.meta.json")
|
||||
|
||||
|
||||
def _writeTokenMeta(token: str, kind: str, data: dict):
|
||||
path = _tokenMetaPath(token, kind)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False)
|
||||
|
||||
|
||||
def _readTokenMeta(token: str, kind: str, pop: bool = False) -> dict | None:
|
||||
path = _tokenMetaPath(token, kind)
|
||||
if not os.path.exists(path):
|
||||
return None
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
if pop:
|
||||
try:
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
return data
|
||||
|
||||
|
||||
@router.get("/migration/process-import-stream")
|
||||
|
|
@ -598,7 +620,7 @@ def getProcessImportStream(
|
|||
import queue
|
||||
import threading
|
||||
|
||||
pending = _pendingProcessing.pop(token, None)
|
||||
pending = _readTokenMeta(token, "processing", pop=True)
|
||||
if not pending:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Invalid or expired processing token.")
|
||||
|
|
@ -643,10 +665,10 @@ def getProcessImportStream(
|
|||
except OSError:
|
||||
pass
|
||||
|
||||
_pendingImports[token] = {
|
||||
_writeTokenMeta(token, "import", {
|
||||
"dbFiles": dbFiles,
|
||||
"protectedIds": protectedIds,
|
||||
}
|
||||
})
|
||||
|
||||
q.put({"phase": "done", "result": {
|
||||
"token": token,
|
||||
|
|
@ -704,7 +726,7 @@ def postMigrationImportSingle(
|
|||
if mode not in ("replace", "merge"):
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid mode: '{mode}'.")
|
||||
|
||||
pending = _pendingImports.get(token)
|
||||
pending = _readTokenMeta(token, "import")
|
||||
if not pending:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired import token.")
|
||||
|
||||
|
|
@ -749,7 +771,7 @@ def postMigrationImportDone(
|
|||
import os
|
||||
|
||||
token = body.get("token", "")
|
||||
pending = _pendingImports.pop(token, None)
|
||||
pending = _readTokenMeta(token, "import", pop=True)
|
||||
if pending:
|
||||
for dbEntry in pending.get("dbFiles", {}).values():
|
||||
if isinstance(dbEntry, str):
|
||||
|
|
|
|||
Loading…
Reference in a new issue