gateway/modules/serviceCenter/services/serviceExtraction/extractors/extractorVideo.py
2026-03-15 23:38:21 +01:00

208 lines
7 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Video extractor for common video formats.
Extracts metadata (duration, resolution, codec, bitrate) and produces
a `videostream` ContentPart. Video data is never base64-encoded due to size.
Optional dependency: mutagen (for rich metadata from MP4/WebM containers).
"""
from typing import Any, Dict, List
import logging
import struct
from modules.datamodels.datamodelExtraction import ContentPart
from ..subUtils import makeId
from ..subRegistry import Extractor
logger = logging.getLogger(__name__)
_VIDEO_MIME_TYPES = [
"video/mp4",
"video/webm",
"video/x-msvideo",
"video/avi",
"video/quicktime",
"video/x-matroska",
"video/x-ms-wmv",
"video/mpeg",
"video/ogg",
]
_VIDEO_EXTENSIONS = [".mp4", ".webm", ".avi", ".mov", ".mkv", ".wmv", ".mpeg", ".mpg", ".ogv"]
class VideoExtractor(Extractor):
    """Extractor for video files.

    Produces:
    - 1 text ContentPart with a human-readable metadata summary
    - 1 videostream ContentPart (no inline data -- too large)
    """

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True if the MIME type or the file extension denotes a video.

        ``headBytes`` is accepted but not inspected; detection relies solely
        on ``mimeType`` and the (case-insensitive) extension of ``fileName``.
        """
        if mimeType in _VIDEO_MIME_TYPES:
            return True
        # str.endswith accepts a tuple of suffixes: one call instead of a loop.
        return (fileName or "").lower().endswith(tuple(_VIDEO_EXTENSIONS))

    def getSupportedExtensions(self) -> list[str]:
        """Return a copy of the supported file extensions."""
        return list(_VIDEO_EXTENSIONS)

    def getSupportedMimeTypes(self) -> list[str]:
        """Return a copy of the supported MIME types."""
        return list(_VIDEO_MIME_TYPES)

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Build the metadata and videostream parts for one video file.

        Args:
            fileBytes: Raw content of the video file.
            context: Extraction context; the keys ``fileName`` and
                ``mimeType`` are read. Missing, ``None`` or empty values fall
                back to ``"video"`` and ``"video/mp4"`` respectively.

        Returns:
            Two parts: a ``text`` part (label ``metadata``) whose data is the
            summary and whose metadata dict holds the raw values, followed by
            a child ``videostream`` part with empty data.
        """
        # `or` instead of a .get() default so an explicit None is replaced as
        # well; a None fileName would otherwise break the extension checks in
        # the metadata helper.
        fileName = context.get("fileName") or "video"
        mimeType = context.get("mimeType") or "video/mp4"
        fileSize = len(fileBytes)
        rootId = makeId()

        meta = _extractMetadata(fileBytes, fileName)
        meta["size"] = fileSize
        meta["fileName"] = fileName
        meta["mimeType"] = mimeType

        # Only values that are present and non-zero appear in the summary.
        metaLines = [f"Video file: {fileName}"]
        duration = meta.get("duration")
        if duration:
            mins, secs = divmod(int(duration), 60)
            metaLines.append(f"Duration: {mins}:{secs:02d}")
        if meta.get("width") and meta.get("height"):
            metaLines.append(f"Resolution: {meta['width']}x{meta['height']}")
        if meta.get("codec"):
            metaLines.append(f"Codec: {meta['codec']}")
        if meta.get("bitrate"):
            metaLines.append(f"Bitrate: {meta['bitrate']} kbps")
        if meta.get("fps"):
            metaLines.append(f"FPS: {meta['fps']}")
        metaLines.append(f"Size: {fileSize:,} bytes")

        metadataPart = ContentPart(
            id=rootId, parentId=None, label="metadata",
            typeGroup="text", mimeType="text/plain",
            data="\n".join(metaLines), metadata=meta,
        )
        # The stream part carries no payload; it only records the size and
        # that the data was not inlined.
        streamPart = ContentPart(
            id=makeId(), parentId=rootId, label="videostream",
            typeGroup="videostream", mimeType=mimeType,
            data="", metadata={"size": fileSize, "inlined": False},
        )
        return [metadataPart, streamPart]
def _extractMetadata(fileBytes: bytes, fileName: str) -> Dict[str, Any]:
    """Collect best-effort video metadata from the raw file bytes.

    mutagen (optional dependency) is tried first. Afterwards the built-in
    header parsers run for MP4/QuickTime and AVI files and fill in any key
    mutagen did not deliver, so a format mutagen does not recognise still
    yields what the header exposes.

    Args:
        fileBytes: Raw content of the video file.
        fileName: File name; only its extension is used to choose the
            fallback parser. ``None`` or empty disables the fallback.

    Returns:
        Dict with any of ``duration`` (seconds), ``bitrate`` (kbps),
        ``width``, ``height``, ``fps``, ``codec``. Keys whose value is
        ``None`` are omitted. Never raises for unreadable media.
    """
    meta: Dict[str, Any] = {}
    try:
        import io

        import mutagen

        mediaFile = mutagen.File(io.BytesIO(fileBytes))
        if mediaFile is not None and mediaFile.info:
            info = mediaFile.info
            # NOTE(review): mutagen is audio-oriented; for MP4 its `bitrate`
            # and `codec` presumably describe the audio track -- confirm
            # before presenting them as video properties.
            meta["duration"] = getattr(info, "length", None)
            bitrate = getattr(info, "bitrate", None)
            # mutagen reports bits per second; the summary shows kbps.
            meta["bitrate"] = bitrate // 1000 if bitrate else bitrate

            if hasattr(info, "video"):
                streams = info.video if isinstance(info.video, list) else [info.video]
                for stream in streams:
                    for key in ("width", "height", "codec"):
                        if hasattr(stream, key):
                            meta[key] = getattr(stream, key)

            # Attributes directly on the info object override per-stream values.
            width = getattr(info, "width", None)
            height = getattr(info, "height", None)
            if width and height:
                meta["width"] = width
                meta["height"] = height
            fps = getattr(info, "fps", None)
            if fps:
                meta["fps"] = round(fps, 2)
            codec = getattr(info, "codec", None)
            if codec:
                meta["codec"] = codec
    except ImportError:
        logger.debug("mutagen not installed -- using basic video metadata extraction")
    except Exception as e:
        # Best effort: a broken or unsupported container must not abort extraction.
        logger.debug("mutagen video metadata extraction failed: %s", e)

    meta = {k: v for k, v in meta.items() if v is not None}

    # Header-level fallback. QuickTime (.mov) and .m4v share the MP4 atom
    # layout, so the same moov/mvhd parser applies to them.
    lower = (fileName or "").lower()
    if lower.endswith((".mp4", ".m4v", ".mov")):
        fallback = _parseMp4Header(fileBytes)
    elif lower.endswith(".avi"):
        fallback = _parseAviHeader(fileBytes)
    else:
        fallback = {}

    # Values obtained from mutagen take precedence; the fallback only fills gaps.
    for key, value in fallback.items():
        if value is not None:
            meta.setdefault(key, value)
    return meta
def _parseMp4Header(fileBytes: bytes) -> Dict[str, Any]:
"""Minimal MP4 moov/mvhd parser for duration and timescale."""
meta: Dict[str, Any] = {}
try:
pos = 0
while pos < len(fileBytes) - 8:
boxSize = struct.unpack_from(">I", fileBytes, pos)[0]
boxType = fileBytes[pos + 4:pos + 8]
if boxSize < 8:
break
if boxType == b"moov":
meta.update(_parseMoovBox(fileBytes[pos + 8:pos + boxSize]))
break
pos += boxSize
except Exception:
pass
return meta
def _parseMoovBox(data: bytes) -> Dict[str, Any]:
"""Parse moov box to find mvhd with duration."""
meta: Dict[str, Any] = {}
pos = 0
while pos < len(data) - 8:
try:
boxSize = struct.unpack_from(">I", data, pos)[0]
boxType = data[pos + 4:pos + 8]
if boxSize < 8:
break
if boxType == b"mvhd":
version = data[pos + 8]
if version == 0 and pos + 28 < len(data):
timeScale = struct.unpack_from(">I", data, pos + 20)[0]
duration = struct.unpack_from(">I", data, pos + 24)[0]
if timeScale > 0:
meta["duration"] = duration / timeScale
break
pos += boxSize
except Exception:
break
return meta
def _parseAviHeader(fileBytes: bytes) -> Dict[str, Any]:
"""Minimal AVI header parser for resolution."""
meta: Dict[str, Any] = {}
if len(fileBytes) < 72:
return meta
try:
if fileBytes[:4] != b"RIFF" or fileBytes[8:12] != b"AVI ":
return meta
width = struct.unpack_from("<I", fileBytes, 64)[0]
height = struct.unpack_from("<I", fileBytes, 68)[0]
if 0 < width < 100000 and 0 < height < 100000:
meta["width"] = width
meta["height"] = height
except Exception:
pass
return meta