gateway/modules/routes/routeDataFiles.py
2026-03-24 14:16:46 +01:00

1004 lines
39 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from fastapi import APIRouter, HTTPException, Depends, File, UploadFile, Form, Path, Request, status, Query, Response, Body
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Optional
import logging
import json
# Import auth module
from modules.auth import limiter, getCurrentUser, getRequestContext, RequestContext
# Import interfaces
import modules.interfaces.interfaceDbManagement as interfaceDbManagement
from modules.datamodels.datamodelFiles import FileItem, FilePreview
from modules.datamodels.datamodelFileFolder import FileFolder
from modules.shared.attributeUtils import getModelAttributeDefinitions
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
# Configure logger
logger = logging.getLogger(__name__)
async def _autoIndexFile(fileId: str, fileName: str, mimeType: str, user):
"""Background task: pre-scan + extraction + knowledge indexing.
Step 1: Structure Pre-Scan (AI-free) -> FileContentIndex (persisted)
Step 2: Content extraction via runExtraction -> ContentParts
Step 3: KnowledgeService.indexFile -> chunking + embedding -> Knowledge Store"""
userId = user.id if hasattr(user, "id") else str(user)
try:
mgmtInterface = interfaceDbManagement.getInterface(user)
mgmtInterface.updateFile(fileId, {"status": "processing"})
rawBytes = mgmtInterface.getFileData(fileId)
if not rawBytes:
logger.warning(f"Auto-index: no file data for {fileId}, skipping")
mgmtInterface.updateFile(fileId, {"status": "active"})
return
file_meta = mgmtInterface.getFile(fileId)
feature_instance_id = ""
mandate_id = ""
if file_meta:
if isinstance(file_meta, dict):
feature_instance_id = file_meta.get("featureInstanceId") or ""
mandate_id = file_meta.get("mandateId") or ""
else:
feature_instance_id = getattr(file_meta, "featureInstanceId", None) or ""
mandate_id = getattr(file_meta, "mandateId", None) or ""
logger.info(f"Auto-index starting for {fileName} ({len(rawBytes)} bytes, {mimeType})")
# Step 1: Structure Pre-Scan (AI-free)
from modules.serviceCenter.services.serviceKnowledge.subPreScan import preScanDocument
contentIndex = await preScanDocument(
fileData=rawBytes,
mimeType=mimeType,
fileId=fileId,
fileName=fileName,
userId=userId,
featureInstanceId=str(feature_instance_id) if feature_instance_id else "",
mandateId=str(mandate_id) if mandate_id else "",
)
logger.info(
f"Pre-scan complete for {fileName}: "
f"{contentIndex.totalObjects} objects"
)
# Persist FileContentIndex immediately
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
knowledgeDb = getKnowledgeInterface()
knowledgeDb.upsertFileContentIndex(contentIndex)
# Step 2: Content extraction (AI-free, produces ContentParts)
from modules.serviceCenter.services.serviceExtraction.subRegistry import ExtractorRegistry, ChunkerRegistry
from modules.serviceCenter.services.serviceExtraction.subPipeline import runExtraction
from modules.datamodels.datamodelExtraction import ExtractionOptions
extractorRegistry = ExtractorRegistry()
chunkerRegistry = ChunkerRegistry()
options = ExtractionOptions()
extracted = runExtraction(
extractorRegistry, chunkerRegistry,
rawBytes, fileName, mimeType, options,
)
contentObjects = []
for part in extracted.parts:
contentType = "text"
if part.typeGroup == "image":
contentType = "image"
elif part.typeGroup in ("binary", "container"):
contentType = "other"
if not part.data or not part.data.strip():
continue
contentObjects.append({
"contentObjectId": part.id,
"contentType": contentType,
"data": part.data,
"contextRef": {
"containerPath": fileName,
"location": part.label or "file",
**(part.metadata or {}),
},
})
logger.info(f"Extracted {len(contentObjects)} content objects from {fileName}")
if not contentObjects:
knowledgeDb.updateFileStatus(fileId, "indexed")
mgmtInterface.updateFile(fileId, {"status": "active"})
return
# Step 3: Knowledge indexing (chunking + embedding)
from modules.serviceCenter import getService
from modules.serviceCenter.context import ServiceCenterContext
ctx = ServiceCenterContext(
user=user,
mandate_id=str(mandate_id) if mandate_id else "",
feature_instance_id=str(feature_instance_id) if feature_instance_id else "",
)
knowledgeService = getService("knowledge", ctx)
await knowledgeService.indexFile(
fileId=fileId,
fileName=fileName,
mimeType=mimeType,
userId=userId,
featureInstanceId=str(feature_instance_id) if feature_instance_id else "",
mandateId=str(mandate_id) if mandate_id else "",
contentObjects=contentObjects,
structure=contentIndex.structure,
)
mgmtInterface.updateFile(fileId, {"status": "active"})
logger.info(f"Auto-index complete for file {fileId} ({fileName})")
except Exception as e:
logger.error(f"Auto-index failed for file {fileId}: {e}", exc_info=True)
try:
errMgmt = interfaceDbManagement.getInterface(user)
errMgmt.updateFile(fileId, {"status": "active"})
except Exception:
pass
# Model attributes for FileItem
fileAttributes = getModelAttributeDefinitions(FileItem)
# Create router for file endpoints
router = APIRouter(
prefix="/api/files",
tags=["Manage Files"],
responses={
404: {"description": "Not found"},
400: {"description": "Bad request"},
401: {"description": "Unauthorized"},
403: {"description": "Forbidden"},
500: {"description": "Internal server error"}
}
)
@router.get("/list", response_model=PaginatedResponse[FileItem])
@limiter.limit("30/minute")
def get_files(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[FileItem]:
"""
Get files with optional pagination, sorting, and filtering.
Query Parameters:
- pagination: JSON-encoded PaginationParams object, or None for no pagination
Examples:
- GET /api/files/list (no pagination - returns all items)
- GET /api/files/list?pagination={"page":1,"pageSize":10,"sort":[]}
- GET /api/files/list?pagination={"page":2,"pageSize":20,"sort":[{"field":"fileName","direction":"asc"}]}
"""
try:
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
# Normalize pagination dict (handles top-level "search" field)
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
managementInterface = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None
)
result = managementInterface.getAllFiles(pagination=paginationParams)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[FileItem]
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting files: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to get files: {str(e)}"
)
@router.get("/list/filter-values")
@limiter.limit("60/minute")
def get_file_filter_values(
request: Request,
column: str = Query(..., description="Column key"),
pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> list:
"""Return distinct filter values for a column in files."""
try:
managementInterface = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None
)
crossFilterPagination = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
filters = paginationDict.get("filters", {})
filters.pop(column, None)
paginationDict["filters"] = filters
paginationDict.pop("sort", None)
crossFilterPagination = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError):
pass
try:
recordFilter = {"_createdBy": managementInterface.userId}
values = managementInterface.db.getDistinctColumnValues(
FileItem, column, crossFilterPagination, recordFilter
)
return sorted(values, key=lambda v: str(v).lower())
except Exception:
from modules.routes.routeDataUsers import _handleFilterValuesRequest
result = managementInterface.getAllFiles(pagination=None)
items = [r.model_dump() if hasattr(r, 'model_dump') else r for r in result]
return _handleFilterValuesRequest(items, column, pagination)
except Exception as e:
logger.error(f"Error getting filter values for files: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.post("/upload", status_code=status.HTTP_201_CREATED)
@limiter.limit("10/minute")
async def upload_file(
request: Request,
file: UploadFile = File(...),
workflowId: Optional[str] = Form(None),
featureInstanceId: Optional[str] = Form(None),
currentUser: User = Depends(getCurrentUser)
) -> JSONResponse:
# Add fileName property to UploadFile for consistency with backend model
file.fileName = file.filename
"""Upload a file"""
try:
managementInterface = interfaceDbManagement.getInterface(currentUser)
# Read file
fileContent = await file.read()
# Check size limits
maxSize = int(interfaceDbManagement.APP_CONFIG.get("File_Management_MAX_UPLOAD_SIZE_MB")) * 1024 * 1024 # in bytes
if len(fileContent) > maxSize:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"File too large. Maximum size: {interfaceDbManagement.APP_CONFIG.get('File_Management_MAX_UPLOAD_SIZE_MB')}MB"
)
# Save file via LucyDOM interface in the database
fileItem, duplicateType = managementInterface.saveUploadedFile(fileContent, file.filename)
if featureInstanceId and not fileItem.featureInstanceId:
managementInterface.updateFile(fileItem.id, {"featureInstanceId": featureInstanceId})
fileItem.featureInstanceId = featureInstanceId
# Determine response message based on duplicate type
if duplicateType == "exact_duplicate":
message = f"File '{file.filename}' already exists with identical content. Reusing existing file."
elif duplicateType == "name_conflict":
message = f"File '{file.filename}' already exists with different content. Uploaded as '{fileItem.fileName}'."
else: # new_file
message = "File uploaded successfully"
# Convert FileItem to dictionary for JSON response
fileMeta = fileItem.model_dump()
# If workflowId is provided, include it in the response (not stored in FileItem model)
if workflowId:
fileMeta["workflowId"] = workflowId
# Trigger background auto-index pipeline (non-blocking)
# Also runs for duplicates in case the original was never successfully indexed
shouldIndex = duplicateType == "new_file"
if not shouldIndex:
try:
from modules.interfaces.interfaceDbKnowledge import getInterface as _getKnowledgeInterface
_kDb = _getKnowledgeInterface()
_existingIndex = _kDb.getFileContentIndex(fileItem.id)
if not _existingIndex:
shouldIndex = True
logger.info(f"Re-triggering auto-index for duplicate {fileItem.id} (not yet indexed)")
except Exception:
shouldIndex = True
if shouldIndex:
try:
import asyncio
asyncio.ensure_future(_autoIndexFile(
fileId=fileItem.id,
fileName=fileItem.fileName,
mimeType=fileItem.mimeType,
user=currentUser,
))
except Exception as indexErr:
logger.warning(f"Auto-index trigger failed (non-blocking): {indexErr}")
# Response with duplicate information
return JSONResponse({
"message": message,
"file": fileMeta,
"duplicateType": duplicateType,
"originalFileName": file.filename,
"storedFileName": fileItem.fileName,
"isDuplicate": duplicateType != "new_file"
})
except interfaceDbManagement.FileStorageError as e:
logger.error(f"Error during file upload (storage): {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
except Exception as e:
logger.error(f"Error during file upload: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error during file upload: {str(e)}"
)
# ── Folder endpoints (MUST be before /{fileId} catch-all) ─────────────────────
@router.get("/folders", response_model=List[Dict[str, Any]])
@limiter.limit("30/minute")
def list_folders(
request: Request,
parentId: Optional[str] = Query(None, description="Parent folder ID (omit for all folders)"),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
"""List folders for the current user."""
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
if parentId is not None:
return mgmt.listFolders(parentId=parentId)
return mgmt.listFolders()
except Exception as e:
logger.error(f"Error listing folders: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/folders", status_code=status.HTTP_201_CREATED)
@limiter.limit("10/minute")
def create_folder(
request: Request,
body: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Create a new folder."""
name = body.get("name", "")
parentId = body.get("parentId")
if not name:
raise HTTPException(status_code=400, detail="name is required")
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
return mgmt.createFolder(name=name, parentId=parentId)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error creating folder: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.put("/folders/{folderId}")
@limiter.limit("10/minute")
def rename_folder(
request: Request,
folderId: str = Path(...),
body: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Rename a folder."""
newName = body.get("name", "")
if not newName:
raise HTTPException(status_code=400, detail="name is required")
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
mgmt.renameFolder(folderId, newName)
return {"success": True, "folderId": folderId, "name": newName}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error renaming folder: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/folders/{folderId}")
@limiter.limit("10/minute")
def delete_folder(
request: Request,
folderId: str = Path(...),
recursive: bool = Query(False, description="Delete folder contents recursively"),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Delete a folder. Use recursive=true to delete non-empty folders."""
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
return mgmt.deleteFolder(folderId, recursive=recursive)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error deleting folder: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/folders/{folderId}/move")
@limiter.limit("10/minute")
def move_folder(
request: Request,
folderId: str = Path(...),
body: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Move a folder to a new parent."""
targetParentId = body.get("targetParentId")
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
mgmt.moveFolder(folderId, targetParentId)
return {"success": True, "folderId": folderId, "parentId": targetParentId}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error moving folder: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/folders/{folderId}/download")
@limiter.limit("10/minute")
def download_folder(
request: Request,
folderId: str = Path(..., description="ID of the folder to download as ZIP"),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Response:
"""Download a folder (including subfolders) as a ZIP archive."""
import io
import zipfile
import urllib.parse
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
folder = mgmt.getFolder(folderId)
if not folder:
raise HTTPException(status_code=404, detail=f"Folder {folderId} not found")
folderName = folder.get("name", "download")
def _collectFiles(parentId: str, pathPrefix: str):
"""Recursively collect (zipPath, fileId) tuples."""
entries = []
for f in mgmt._getFilesByCurrentUser(recordFilter={"folderId": parentId}):
fname = f.get("fileName") or f.get("name") or f.get("id", "file")
entries.append((f"{pathPrefix}{fname}", f["id"]))
for sub in mgmt.listFolders(parentId=parentId):
subName = sub.get("name", sub["id"])
entries.extend(_collectFiles(sub["id"], f"{pathPrefix}{subName}/"))
return entries
fileEntries = _collectFiles(folderId, "")
if not fileEntries:
raise HTTPException(status_code=404, detail="Folder is empty")
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for zipPath, fileId in fileEntries:
data = mgmt.getFileData(fileId)
if data:
zf.writestr(zipPath, data)
buf.seek(0)
zipBytes = buf.getvalue()
encodedName = urllib.parse.quote(f"{folderName}.zip")
return Response(
content=zipBytes,
media_type="application/zip",
headers={
"Content-Disposition": f"attachment; filename*=UTF-8''{encodedName}"
}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error downloading folder as ZIP: {e}")
raise HTTPException(status_code=500, detail=f"Error downloading folder: {str(e)}")
@router.post("/batch-delete")
@limiter.limit("10/minute")
def batch_delete_items(
request: Request,
body: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Batch delete files/folders with a single SQL-backed operation per type."""
fileIds = body.get("fileIds") or []
folderIds = body.get("folderIds") or []
recursiveFolders = bool(body.get("recursiveFolders", True))
if not isinstance(fileIds, list) or not isinstance(folderIds, list):
raise HTTPException(status_code=400, detail="fileIds and folderIds must be arrays")
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
result = {"deletedFiles": 0, "deletedFolders": 0}
if fileIds:
fileResult = mgmt.deleteFilesBatch(fileIds)
result["deletedFiles"] += fileResult.get("deletedFiles", 0)
if folderIds:
folderResult = mgmt.deleteFoldersBatch(folderIds, recursive=recursiveFolders)
result["deletedFiles"] += folderResult.get("deletedFiles", 0)
result["deletedFolders"] += folderResult.get("deletedFolders", 0)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error in batch delete: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/batch-move")
@limiter.limit("10/minute")
def batch_move_items(
request: Request,
body: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Batch move files/folders with a single SQL-backed operation per type."""
fileIds = body.get("fileIds") or []
folderIds = body.get("folderIds") or []
targetFolderId = body.get("targetFolderId")
targetParentId = body.get("targetParentId")
if not isinstance(fileIds, list) or not isinstance(folderIds, list):
raise HTTPException(status_code=400, detail="fileIds and folderIds must be arrays")
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
result = {"movedFiles": 0, "movedFolders": 0}
if fileIds:
fileResult = mgmt.moveFilesBatch(fileIds, targetFolderId=targetFolderId)
result["movedFiles"] += fileResult.get("movedFiles", 0)
if folderIds:
folderResult = mgmt.moveFoldersBatch(folderIds, targetParentId=targetParentId)
result["movedFolders"] += folderResult.get("movedFolders", 0)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error in batch move: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ── Scope & neutralize tagging endpoints (before /{fileId} catch-all) ─────────
@router.patch("/{fileId}/scope")
@limiter.limit("30/minute")
def updateFileScope(
request: Request,
fileId: str = Path(..., description="ID of the file"),
scope: str = Body(..., embed=True),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
"""Update the scope of a file. Global scope requires sysAdmin."""
try:
validScopes = {"personal", "featureInstance", "mandate", "global"}
if scope not in validScopes:
raise HTTPException(status_code=400, detail=f"Invalid scope: {scope}. Must be one of {validScopes}")
if scope == "global" and not context.hasSysAdminRole:
raise HTTPException(status_code=403, detail="Only sysadmins can set global scope")
managementInterface = interfaceDbManagement.getInterface(
context.user,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
managementInterface.updateFile(fileId, {"scope": scope})
try:
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
from modules.datamodels.datamodelKnowledge import FileContentIndex
knowledgeDb = getKnowledgeInterface()
indices = knowledgeDb.db.getRecordset(FileContentIndex, recordFilter={"id": fileId})
for idx in indices:
idxId = idx.get("id") if isinstance(idx, dict) else getattr(idx, "id", None)
if idxId:
knowledgeDb.db.recordModify(FileContentIndex, idxId, {"scope": scope})
except Exception as e:
logger.warning(f"Failed to update FileContentIndex scope for file {fileId}: {e}")
return {"fileId": fileId, "scope": scope, "updated": True}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating file scope: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.patch("/{fileId}/neutralize")
@limiter.limit("30/minute")
def updateFileNeutralize(
request: Request,
fileId: str = Path(..., description="ID of the file"),
neutralize: bool = Body(..., embed=True),
context: RequestContext = Depends(getRequestContext),
) -> Dict[str, Any]:
"""Toggle neutralization flag on a file."""
try:
managementInterface = interfaceDbManagement.getInterface(
context.user,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
managementInterface.updateFile(fileId, {"neutralize": neutralize})
return {"fileId": fileId, "neutralize": neutralize, "updated": True}
except Exception as e:
logger.error(f"Error updating file neutralize flag: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ── File endpoints with path parameters (catch-all /{fileId}) ─────────────────
@router.get("/{fileId}", response_model=FileItem)
@limiter.limit("30/minute")
def get_file(
request: Request,
fileId: str = Path(..., description="ID of the file"),
currentUser: User = Depends(getCurrentUser)
) -> FileItem:
"""Get a file"""
try:
managementInterface = interfaceDbManagement.getInterface(currentUser)
# Get file via LucyDOM interface from the database
fileData = managementInterface.getFile(fileId)
if not fileData:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found"
)
return fileData
except interfaceDbManagement.FileNotFoundError as e:
logger.warning(f"File not found: {str(e)}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(e)
)
except interfaceDbManagement.FilePermissionError as e:
logger.warning(f"No permission for file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=str(e)
)
except interfaceDbManagement.FileError as e:
logger.error(f"Error retrieving file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
except Exception as e:
logger.error(f"Unexpected error retrieving file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving file: {str(e)}"
)
@router.put("/{fileId}", response_model=FileItem)
@limiter.limit("10/minute")
def update_file(
request: Request,
fileId: str = Path(..., description="ID of the file to update"),
file_info: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser)
) -> FileItem:
"""Update file info"""
try:
managementInterface = interfaceDbManagement.getInterface(currentUser)
# Get the file from the database
file = managementInterface.getFile(fileId)
if not file:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found"
)
# Check if user has access to the file using RBAC
if not managementInterface.checkRbacPermission(FileItem, "update", fileId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to update this file"
)
# Update the file
result = managementInterface.updateFile(fileId, file_info)
if not result:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to update file"
)
# Get updated file
updatedFile = managementInterface.getFile(fileId)
return updatedFile
except HTTPException as he:
raise he
except Exception as e:
logger.error(f"Error updating file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e)
)
@router.delete("/{fileId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def delete_file(
request: Request,
fileId: str = Path(..., description="ID of the file to delete"),
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""Delete a file"""
managementInterface = interfaceDbManagement.getInterface(currentUser)
# Check if the file exists
existingFile = managementInterface.getFile(fileId)
if not existingFile:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found"
)
success = managementInterface.deleteFile(fileId)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Error deleting the file"
)
return {"message": f"File with ID {fileId} successfully deleted"}
@router.get("/stats", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def get_file_stats(
request: Request,
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""Returns statistics about the stored files"""
try:
managementInterface = interfaceDbManagement.getInterface(currentUser)
# Get all files - metadata only
allFiles = managementInterface.getAllFiles()
# Calculate statistics
totalFiles = len(allFiles)
totalSize = sum(file.fileSize for file in allFiles)
# Group by file type
fileTypes = {}
for file in allFiles:
fileType = file.mimeType.split("/")[0]
if fileType not in fileTypes:
fileTypes[fileType] = 0
fileTypes[fileType] += 1
return {
"totalFiles": totalFiles,
"totalSizeBytes": totalSize,
"fileTypes": fileTypes
}
except Exception as e:
logger.error(f"Error retrieving file statistics: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error retrieving file statistics: {str(e)}"
)
@router.get("/{fileId}/download")
@limiter.limit("30/minute")
def download_file(
request: Request,
fileId: str = Path(..., description="ID of the file to download"),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Response:
"""Download a file. Uses mandate/instance context when present (e.g. from feature pages)."""
try:
managementInterface = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None
)
# Get file data
fileData = managementInterface.getFile(fileId)
if not fileData:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found"
)
# Get file content
fileContent = managementInterface.getFileData(fileId)
if not fileContent:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File content not found for ID {fileId}"
)
# Return file as response
# Properly encode filename for Content-Disposition header to handle Unicode characters
import urllib.parse
encoded_filename = urllib.parse.quote(fileData.fileName)
return Response(
content=fileContent,
media_type=fileData.mimeType,
headers={
"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error downloading file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error downloading file: {str(e)}"
)
@router.get("/{fileId}/preview", response_model=FilePreview)
@limiter.limit("30/minute")
def preview_file(
request: Request,
fileId: str = Path(..., description="ID of the file to preview"),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> FilePreview:
"""Preview a file's content. Uses mandate/instance context when present."""
try:
managementInterface = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None
)
# Get file preview using the correct method
preview = managementInterface.getFileContent(fileId)
if not preview:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found or no content available"
)
return preview
except HTTPException:
raise
except Exception as e:
logger.error(f"Error previewing file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error previewing file: {str(e)}"
)
@router.post("/{fileId}/move")
@limiter.limit("10/minute")
def move_file(
request: Request,
fileId: str = Path(...),
body: Dict[str, Any] = Body(...),
currentUser: User = Depends(getCurrentUser),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Move a file to a different folder."""
targetFolderId = body.get("targetFolderId")
try:
mgmt = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None,
)
mgmt.updateFile(fileId, {"folderId": targetFolderId})
return {"success": True, "fileId": fileId, "folderId": targetFolderId}
except Exception as e:
logger.error(f"Error moving file: {e}")
raise HTTPException(status_code=500, detail=str(e))