# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Video extractor for common video formats. Extracts metadata (duration, resolution, codec, bitrate) and produces a `videostream` ContentPart. Video data is never base64-encoded due to size. Optional dependency: mutagen (for rich metadata from MP4/WebM containers). """ from typing import Any, Dict, List import logging import struct from modules.datamodels.datamodelExtraction import ContentPart from ..subUtils import makeId from ..subRegistry import Extractor logger = logging.getLogger(__name__) _VIDEO_MIME_TYPES = [ "video/mp4", "video/webm", "video/x-msvideo", "video/avi", "video/quicktime", "video/x-matroska", "video/x-ms-wmv", "video/mpeg", "video/ogg", ] _VIDEO_EXTENSIONS = [".mp4", ".webm", ".avi", ".mov", ".mkv", ".wmv", ".mpeg", ".mpg", ".ogv"] class VideoExtractor(Extractor): """Extractor for video files. Produces: - 1 text ContentPart with metadata summary - 1 videostream ContentPart (no inline data -- too large) """ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: if mimeType in _VIDEO_MIME_TYPES: return True lower = (fileName or "").lower() return any(lower.endswith(ext) for ext in _VIDEO_EXTENSIONS) def getSupportedExtensions(self) -> list[str]: return list(_VIDEO_EXTENSIONS) def getSupportedMimeTypes(self) -> list[str]: return list(_VIDEO_MIME_TYPES) def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: fileName = context.get("fileName", "video") mimeType = context.get("mimeType") or "video/mp4" fileSize = len(fileBytes) rootId = makeId() parts: List[ContentPart] = [] meta = _extractMetadata(fileBytes, fileName) meta["size"] = fileSize meta["fileName"] = fileName meta["mimeType"] = mimeType metaLines = [f"Video file: {fileName}"] if meta.get("duration"): mins = int(meta["duration"] // 60) secs = int(meta["duration"] % 60) metaLines.append(f"Duration: {mins}:{secs:02d}") if meta.get("width") and meta.get("height"): metaLines.append(f"Resolution: {meta['width']}x{meta['height']}") if meta.get("codec"): metaLines.append(f"Codec: {meta['codec']}") if meta.get("bitrate"): metaLines.append(f"Bitrate: {meta['bitrate']} kbps") if meta.get("fps"): metaLines.append(f"FPS: {meta['fps']}") metaLines.append(f"Size: {fileSize:,} bytes") parts.append(ContentPart( id=rootId, parentId=None, label="metadata", typeGroup="text", mimeType="text/plain", data="\n".join(metaLines), metadata=meta, )) parts.append(ContentPart( id=makeId(), parentId=rootId, label="videostream", typeGroup="videostream", mimeType=mimeType, data="", metadata={"size": fileSize, "inlined": False}, )) return parts def _extractMetadata(fileBytes: bytes, fileName: str) -> Dict[str, Any]: """Extract video metadata using mutagen (optional) with basic fallback.""" meta: Dict[str, Any] = {} try: import mutagen import io mediaFile = mutagen.File(io.BytesIO(fileBytes)) if mediaFile is not None and mediaFile.info: meta["duration"] = getattr(mediaFile.info, "length", None) meta["bitrate"] = getattr(mediaFile.info, "bitrate", None) if meta["bitrate"]: meta["bitrate"] = meta["bitrate"] // 1000 if hasattr(mediaFile.info, "video"): for stream in (mediaFile.info.video if isinstance(mediaFile.info.video, list) else [mediaFile.info.video]): if hasattr(stream, "width"): meta["width"] = stream.width if hasattr(stream, "height"): meta["height"] = stream.height if hasattr(stream, "codec"): meta["codec"] = stream.codec width = getattr(mediaFile.info, "width", None) height = getattr(mediaFile.info, "height", None) if width and height: meta["width"] = width meta["height"] = height fps = getattr(mediaFile.info, "fps", None) if fps: meta["fps"] = round(fps, 2) codec = getattr(mediaFile.info, "codec", None) if codec: meta["codec"] = codec return {k: v for k, v in meta.items() if v is not None} except ImportError: logger.debug("mutagen not installed -- using basic video metadata extraction") except Exception as e: logger.debug(f"mutagen video metadata extraction failed: {e}") lower = fileName.lower() if lower.endswith(".mp4"): meta.update(_parseMp4Header(fileBytes)) elif lower.endswith(".avi"): meta.update(_parseAviHeader(fileBytes)) return {k: v for k, v in meta.items() if v is not None} def _parseMp4Header(fileBytes: bytes) -> Dict[str, Any]: """Minimal MP4 moov/mvhd parser for duration and timescale.""" meta: Dict[str, Any] = {} try: pos = 0 while pos < len(fileBytes) - 8: boxSize = struct.unpack_from(">I", fileBytes, pos)[0] boxType = fileBytes[pos + 4:pos + 8] if boxSize < 8: break if boxType == b"moov": meta.update(_parseMoovBox(fileBytes[pos + 8:pos + boxSize])) break pos += boxSize except Exception: pass return meta def _parseMoovBox(data: bytes) -> Dict[str, Any]: """Parse moov box to find mvhd with duration.""" meta: Dict[str, Any] = {} pos = 0 while pos < len(data) - 8: try: boxSize = struct.unpack_from(">I", data, pos)[0] boxType = data[pos + 4:pos + 8] if boxSize < 8: break if boxType == b"mvhd": version = data[pos + 8] if version == 0 and pos + 28 < len(data): timeScale = struct.unpack_from(">I", data, pos + 20)[0] duration = struct.unpack_from(">I", data, pos + 24)[0] if timeScale > 0: meta["duration"] = duration / timeScale break pos += boxSize except Exception: break return meta def _parseAviHeader(fileBytes: bytes) -> Dict[str, Any]: """Minimal AVI header parser for resolution.""" meta: Dict[str, Any] = {} if len(fileBytes) < 72: return meta try: if fileBytes[:4] != b"RIFF" or fileBytes[8:12] != b"AVI ": return meta width = struct.unpack_from("