# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Audio extractor for common audio formats. Extracts metadata (duration, bitrate, sample rate, channels) and produces an `audiostream` ContentPart. For files under 10 MB the base64 audio data is included; larger files only get metadata. Optional dependency: mutagen (for rich metadata). """ from typing import Any, Dict, List import base64 import logging import struct from modules.datamodels.datamodelExtraction import ContentPart from ..subUtils import makeId from ..subRegistry import Extractor logger = logging.getLogger(__name__) _AUDIO_MIME_TYPES = [ "audio/mpeg", "audio/mp3", "audio/wav", "audio/x-wav", "audio/ogg", "audio/flac", "audio/x-flac", "audio/mp4", "audio/x-m4a", "audio/aac", "audio/webm", ] _AUDIO_EXTENSIONS = [".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac", ".wma", ".webm"] _MAX_INLINE_SIZE = 10 * 1024 * 1024 # 10 MB class AudioExtractor(Extractor): """Extractor for audio files. Produces: - 1 text ContentPart with metadata summary - 1 audiostream ContentPart (base64 data included only if < 10 MB) """ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: if mimeType in _AUDIO_MIME_TYPES: return True lower = (fileName or "").lower() return any(lower.endswith(ext) for ext in _AUDIO_EXTENSIONS) def getSupportedExtensions(self) -> list[str]: return list(_AUDIO_EXTENSIONS) def getSupportedMimeTypes(self) -> list[str]: return list(_AUDIO_MIME_TYPES) def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: fileName = context.get("fileName", "audio") mimeType = context.get("mimeType") or "audio/mpeg" fileSize = len(fileBytes) rootId = makeId() parts: List[ContentPart] = [] meta = _extractMetadata(fileBytes, fileName) meta["size"] = fileSize meta["fileName"] = fileName meta["mimeType"] = mimeType metaLines = [f"Audio file: {fileName}"] if meta.get("duration"): mins = int(meta["duration"] // 60) secs = int(meta["duration"] % 60) metaLines.append(f"Duration: {mins}:{secs:02d}") if meta.get("bitrate"): metaLines.append(f"Bitrate: {meta['bitrate']} kbps") if meta.get("sampleRate"): metaLines.append(f"Sample rate: {meta['sampleRate']} Hz") if meta.get("channels"): metaLines.append(f"Channels: {meta['channels']}") if meta.get("title") or meta.get("artist") or meta.get("album"): metaLines.append(f"Title: {meta.get('title', 'N/A')}") metaLines.append(f"Artist: {meta.get('artist', 'N/A')}") metaLines.append(f"Album: {meta.get('album', 'N/A')}") metaLines.append(f"Size: {fileSize:,} bytes") parts.append(ContentPart( id=rootId, parentId=None, label="metadata", typeGroup="text", mimeType="text/plain", data="\n".join(metaLines), metadata=meta, )) audioData = "" if fileSize <= _MAX_INLINE_SIZE: audioData = base64.b64encode(fileBytes).decode("utf-8") parts.append(ContentPart( id=makeId(), parentId=rootId, label="audiostream", typeGroup="audiostream", mimeType=mimeType, data=audioData, metadata={"size": fileSize, "inlined": fileSize <= _MAX_INLINE_SIZE}, )) return parts def _extractMetadata(fileBytes: bytes, fileName: str) -> Dict[str, Any]: """Extract audio metadata using mutagen (optional) with stdlib fallback.""" meta: Dict[str, Any] = {} try: import mutagen import io audio = mutagen.File(io.BytesIO(fileBytes)) if audio is not None: if audio.info: meta["duration"] = getattr(audio.info, "length", None) meta["bitrate"] = getattr(audio.info, "bitrate", None) if meta["bitrate"]: meta["bitrate"] = meta["bitrate"] // 1000 meta["sampleRate"] = getattr(audio.info, "sample_rate", None) meta["channels"] = getattr(audio.info, "channels", None) tags = audio.tags if tags: meta["title"] = _getTag(tags, ["TIT2", "title", "\xa9nam"]) meta["artist"] = _getTag(tags, ["TPE1", "artist", "\xa9ART"]) meta["album"] = _getTag(tags, ["TALB", "album", "\xa9alb"]) return {k: v for k, v in meta.items() if v is not None} except ImportError: logger.debug("mutagen not installed -- using basic metadata extraction") except Exception as e: logger.debug(f"mutagen metadata extraction failed: {e}") lower = fileName.lower() if lower.endswith(".wav"): meta.update(_parseWavHeader(fileBytes)) return {k: v for k, v in meta.items() if v is not None} def _getTag(tags, keys: list) -> Any: """Try multiple tag keys and return the first found value.""" for key in keys: val = tags.get(key) if val is not None: return str(val) if not isinstance(val, str) else val return None def _parseWavHeader(fileBytes: bytes) -> Dict[str, Any]: """Minimal WAV header parser for basic metadata.""" meta: Dict[str, Any] = {} if len(fileBytes) < 44: return meta try: if fileBytes[:4] != b"RIFF" or fileBytes[8:12] != b"WAVE": return meta channels = struct.unpack_from("