208 lines
7 KiB
Python
208 lines
7 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""Video extractor for common video formats.
|
|
|
|
Extracts metadata (duration, resolution, codec, bitrate) and produces a text
ContentPart summarizing it plus a `videostream` ContentPart. Video data is
never base64-encoded due to size.

Optional dependency: mutagen (for rich metadata from MP4/WebM containers).
Without it, a minimal built-in parser reads basic MP4/AVI header fields.
|
|
"""
|
|
|
|
from typing import Any, Dict, List
|
|
import logging
|
|
import struct
|
|
|
|
from modules.datamodels.datamodelExtraction import ContentPart
|
|
from ..subUtils import makeId
|
|
from ..subRegistry import Extractor
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# MIME types accepted by VideoExtractor.detect() and reported by
# VideoExtractor.getSupportedMimeTypes().
_VIDEO_MIME_TYPES = [
    "video/mp4",
    "video/webm",
    "video/x-msvideo",
    "video/avi",
    "video/quicktime",
    "video/x-matroska",
    "video/x-ms-wmv",
    "video/mpeg",
    "video/ogg",
]

# Lower-case file suffixes checked when the MIME type is not in the list above.
_VIDEO_EXTENSIONS = [
    ".mp4",
    ".webm",
    ".avi",
    ".mov",
    ".mkv",
    ".wmv",
    ".mpeg",
    ".mpg",
    ".ogv",
]
|
|
|
|
|
|
class VideoExtractor(Extractor):
    """Extractor for video files.

    Produces:
    - 1 text ContentPart with metadata summary
    - 1 videostream ContentPart (no inline data -- too large)
    """

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True when the MIME type or the file extension marks a video.

        The MIME type is compared case-insensitively and without parameters
        (e.g. ``video/mp4; codecs="avc1"`` matches ``video/mp4``). ``headBytes``
        is accepted but not inspected.
        """
        normalizedMime = (mimeType or "").split(";", 1)[0].strip().lower()
        if normalizedMime in _VIDEO_MIME_TYPES:
            return True
        return (fileName or "").lower().endswith(tuple(_VIDEO_EXTENSIONS))

    def getSupportedExtensions(self) -> list[str]:
        """Return a copy of the supported file extensions."""
        return list(_VIDEO_EXTENSIONS)

    def getSupportedMimeTypes(self) -> list[str]:
        """Return a copy of the supported MIME types."""
        return list(_VIDEO_MIME_TYPES)

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Build the metadata and videostream parts for one video file.

        Args:
            fileBytes: Raw content of the video file.
            context: Extraction context; the optional keys ``fileName`` and
                ``mimeType`` are read. Missing, ``None`` or empty values fall
                back to ``"video"`` and ``"video/mp4"``.

        Returns:
            Two parts: a ``text`` part holding a human-readable summary (with
            the raw metadata dict attached) and a child ``videostream`` part
            with empty data.
        """
        # `or` instead of a .get() default: a key that is present but None must
        # not reach _extractMetadata(), which lower-cases the file name.
        fileName = context.get("fileName") or "video"
        mimeType = context.get("mimeType") or "video/mp4"
        fileSize = len(fileBytes)

        rootId = makeId()
        parts: List[ContentPart] = []

        meta = _extractMetadata(fileBytes, fileName)
        meta["size"] = fileSize
        meta["fileName"] = fileName
        meta["mimeType"] = mimeType

        # Only fields that were actually detected appear in the summary.
        metaLines = [f"Video file: {fileName}"]
        if meta.get("duration"):
            mins = int(meta["duration"] // 60)
            secs = int(meta["duration"] % 60)
            metaLines.append(f"Duration: {mins}:{secs:02d}")
        if meta.get("width") and meta.get("height"):
            metaLines.append(f"Resolution: {meta['width']}x{meta['height']}")
        if meta.get("codec"):
            metaLines.append(f"Codec: {meta['codec']}")
        if meta.get("bitrate"):
            metaLines.append(f"Bitrate: {meta['bitrate']} kbps")
        if meta.get("fps"):
            metaLines.append(f"FPS: {meta['fps']}")
        metaLines.append(f"Size: {fileSize:,} bytes")

        parts.append(ContentPart(
            id=rootId, parentId=None, label="metadata",
            typeGroup="text", mimeType="text/plain",
            data="\n".join(metaLines), metadata=meta,
        ))

        # The stream part is a placeholder: it carries no payload, only the size.
        parts.append(ContentPart(
            id=makeId(), parentId=rootId, label="videostream",
            typeGroup="videostream", mimeType=mimeType,
            data="", metadata={"size": fileSize, "inlined": False},
        ))

        return parts
|
|
|
|
|
|
def _extractMetadata(fileBytes: bytes, fileName: str) -> Dict[str, Any]:
    """Extract video metadata using mutagen (optional) with basic fallback.

    Args:
        fileBytes: Raw content of the video file.
        fileName: File name; only its extension is used, to pick the fallback
            header parser. ``None`` or an empty string disables the fallback.

    Returns:
        A dict that may contain ``duration`` (seconds), ``bitrate`` (kbps),
        ``width``, ``height``, ``fps`` and ``codec``. Keys whose value is
        ``None`` are omitted; the dict is empty when nothing could be read.
    """
    meta: Dict[str, Any] = {}

    try:
        import io

        import mutagen

        mediaFile = mutagen.File(io.BytesIO(fileBytes))
        info = getattr(mediaFile, "info", None) if mediaFile is not None else None
        if info:
            meta["duration"] = getattr(info, "length", None)
            meta["bitrate"] = getattr(info, "bitrate", None)
            if meta["bitrate"]:
                # Stored in kbps; the summary line prints it with that unit.
                meta["bitrate"] = meta["bitrate"] // 1000

            if hasattr(info, "video"):
                streams = info.video if isinstance(info.video, list) else [info.video]
                for stream in streams:
                    for key in ("width", "height", "codec"):
                        if hasattr(stream, key):
                            meta[key] = getattr(stream, key)

            # Values directly on the info object take precedence over
            # per-stream values gathered above.
            width = getattr(info, "width", None)
            height = getattr(info, "height", None)
            if width and height:
                meta["width"] = width
                meta["height"] = height

            fps = getattr(info, "fps", None)
            if fps:
                meta["fps"] = round(fps, 2)

            codec = getattr(info, "codec", None)
            if codec:
                meta["codec"] = codec

        # Return mutagen's result only when it found something; otherwise
        # fall through to the built-in header parsers below.
        found = {k: v for k, v in meta.items() if v is not None}
        if found:
            return found
    except ImportError:
        logger.debug("mutagen not installed -- using basic video metadata extraction")
    except Exception as e:
        # Best effort: a malformed file must not abort extraction.
        logger.debug(f"mutagen video metadata extraction failed: {e}")

    lower = (fileName or "").lower()
    if lower.endswith(".mp4"):
        meta.update(_parseMp4Header(fileBytes))
    elif lower.endswith(".avi"):
        meta.update(_parseAviHeader(fileBytes))

    return {k: v for k, v in meta.items() if v is not None}
|
|
|
|
|
|
def _parseMp4Header(fileBytes: bytes) -> Dict[str, Any]:
    """Minimal MP4 parser: locate the top-level moov box and read its mvhd.

    Walks the top-level boxes until ``moov`` is found and hands its payload to
    ``_parseMoovBox``. Box headers with a 64-bit size (size field ``1``) and
    boxes that run to the end of the file (size field ``0``) are understood,
    so a large ``mdat`` placed before ``moov`` does not stop the scan.

    Args:
        fileBytes: Raw content of the MP4 file.

    Returns:
        Whatever ``_parseMoovBox`` extracted, or an empty dict when there is
        no ``moov`` box or the data is malformed.
    """
    meta: Dict[str, Any] = {}
    try:
        end = len(fileBytes)
        pos = 0
        while pos + 8 <= end:
            boxSize, boxType = struct.unpack_from(">I4s", fileBytes, pos)
            headerSize = 8
            if boxSize == 1:
                # The real size follows the type as a 64-bit integer.
                if pos + 16 > end:
                    break
                boxSize = struct.unpack_from(">Q", fileBytes, pos + 8)[0]
                headerSize = 16
            elif boxSize == 0:
                # The box extends to the end of the data.
                boxSize = end - pos
            if boxSize < headerSize:
                # Corrupt size; stopping here also guarantees forward progress.
                break
            if boxType == b"moov":
                meta.update(_parseMoovBox(fileBytes[pos + headerSize:pos + boxSize]))
                break
            pos += boxSize
    except Exception as e:
        # Best effort: metadata is optional, so report and carry on.
        logger.debug("MP4 header parsing failed: %s", e)
    return meta
|
|
|
|
|
|
def _parseMoovBox(data: bytes) -> Dict[str, Any]:
    """Parse a moov box payload and read the duration from its mvhd box.

    Both mvhd versions are handled: version 0 stores 32-bit times, version 1
    stores 64-bit times. A duration of all one-bits (the "unknown" marker) or
    a zero timescale yields no result.

    Args:
        data: Payload of the moov box (without the moov box header).

    Returns:
        ``{"duration": seconds}`` when a usable mvhd was found, otherwise an
        empty dict.
    """
    meta: Dict[str, Any] = {}

    # mvhd version -> (offset of the timescale field inside the payload,
    # struct format of the adjacent timescale + duration fields,
    # value that marks an unknown duration).
    layouts = {
        0: (12, ">II", 0xFFFFFFFF),
        1: (20, ">IQ", 0xFFFFFFFFFFFFFFFF),
    }

    end = len(data)
    pos = 0
    while pos + 8 <= end:
        boxSize, boxType = struct.unpack_from(">I4s", data, pos)
        headerSize = 8
        if boxSize == 1:
            # The real size follows the type as a 64-bit integer.
            if pos + 16 > end:
                break
            boxSize = struct.unpack_from(">Q", data, pos + 8)[0]
            headerSize = 16
        elif boxSize == 0:
            # The box extends to the end of the data.
            boxSize = end - pos
        if boxSize < headerSize:
            # Corrupt size; stopping here also guarantees forward progress.
            break

        if boxType == b"mvhd":
            payload = pos + headerSize
            boxEnd = min(pos + boxSize, end)
            # The first payload byte is the version; unknown versions are skipped.
            layout = layouts.get(data[payload]) if payload < boxEnd else None
            if layout is not None:
                fieldsOffset, fieldsFormat, unknownDuration = layout
                fieldsPos = payload + fieldsOffset
                if fieldsPos + struct.calcsize(fieldsFormat) <= boxEnd:
                    timeScale, duration = struct.unpack_from(fieldsFormat, data, fieldsPos)
                    if timeScale > 0 and duration != unknownDuration:
                        meta["duration"] = duration / timeScale
            break

        pos += boxSize
    return meta
|
|
|
|
|
|
def _parseAviHeader(fileBytes: bytes) -> Dict[str, Any]:
    """Minimal AVI header parser for resolution.

    Locates the ``avih`` (main AVI header) chunk near the start of the file
    and reads the width and height fields from it. In the standard layout the
    chunk tag sits at offset 24, which puts width/height at offsets 64 and 68;
    searching for the tag also covers files with an extra chunk before the
    header list and avoids reading unrelated bytes when the chunk is missing.

    Args:
        fileBytes: Raw content of the AVI file.

    Returns:
        ``{"width": ..., "height": ...}`` when both values are plausible
        (between 1 and 99999), otherwise an empty dict.
    """
    meta: Dict[str, Any] = {}
    # 72 bytes is the smallest prefix that can hold width and height.
    if len(fileBytes) < 72:
        return meta
    if fileBytes[:4] != b"RIFF" or fileBytes[8:12] != b"AVI ":
        return meta

    # The header list comes first in the file, so a small window is enough.
    scanLimit = 4096
    tagPos = fileBytes.find(b"avih", 12, scanLimit)
    if tagPos < 0:
        return meta

    # Skip the 8-byte chunk header (tag + size) and the eight 32-bit fields
    # that precede width/height in the main header.
    dimsPos = tagPos + 8 + 32
    if dimsPos + 8 > len(fileBytes):
        return meta

    width, height = struct.unpack_from("<II", fileBytes, dimsPos)
    if 0 < width < 100000 and 0 < height < 100000:
        meta["width"] = width
        meta["height"] = height
    return meta
|