175 lines
6 KiB
Python
175 lines
6 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""Audio extractor for common audio formats.
|
|
|
|
Extracts metadata (duration, bitrate, sample rate, channels) and produces
|
|
an `audiostream` ContentPart. For files of up to 10 MB the base64 audio data
|
|
is included; larger files only get metadata.
|
|
|
|
Optional dependency: mutagen (for rich metadata).
|
|
"""
|
|
|
|
from typing import Any, Dict, List
|
|
import base64
|
|
import logging
|
|
import struct
|
|
|
|
from modules.datamodels.datamodelExtraction import ContentPart
|
|
from ..subUtils import makeId
|
|
from ..subRegistry import Extractor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# MIME types (lowercase, without parameters) that this extractor claims.
_AUDIO_MIME_TYPES = [
    "audio/mpeg",
    "audio/mp3",
    "audio/wav",
    "audio/x-wav",
    "audio/ogg",
    "audio/flac",
    "audio/x-flac",
    "audio/mp4",
    "audio/x-m4a",
    "audio/aac",
    "audio/webm",
    # Counterpart of the ".wma" entry in _AUDIO_EXTENSIONS, so WMA files are
    # recognised by MIME type as well as by file name.
    "audio/x-ms-wma",
]

# File-name suffixes (lowercase, including the dot) that this extractor claims.
_AUDIO_EXTENSIONS = [".mp3", ".wav", ".ogg", ".flac", ".m4a", ".aac", ".wma", ".webm"]

# Largest payload (in bytes) that is embedded as base64 in the audiostream part.
_MAX_INLINE_SIZE = 10 * 1024 * 1024  # 10 MB
|
|
|
|
|
|
class AudioExtractor(Extractor):
    """Extractor for audio files.

    Produces:
    - 1 text ContentPart with a metadata summary
    - 1 audiostream ContentPart (base64 data included only if the file is
      at most 10 MB)
    """

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Return True when the MIME type or the file extension is a known audio one.

        `headBytes` is accepted for interface compatibility but not inspected.
        """
        # MIME types are case-insensitive and may carry parameters
        # (e.g. "audio/webm;codecs=opus"), so compare only the normalised
        # "type/subtype" part.
        essence = (mimeType or "").split(";", 1)[0].strip().lower()
        if essence in _AUDIO_MIME_TYPES:
            return True
        lower = (fileName or "").lower()
        return lower.endswith(tuple(_AUDIO_EXTENSIONS))

    def getSupportedExtensions(self) -> list[str]:
        """Return a copy of the supported file extensions."""
        return list(_AUDIO_EXTENSIONS)

    def getSupportedMimeTypes(self) -> list[str]:
        """Return a copy of the supported MIME types."""
        return list(_AUDIO_MIME_TYPES)

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        """Build the metadata part and the audiostream part for one audio file.

        Args:
            fileBytes: Raw content of the audio file.
            context: Reads the optional keys "fileName" and "mimeType";
                missing, None or empty values fall back to "audio" and
                "audio/mpeg" respectively.

        Returns:
            Two ContentParts: a "text" part carrying a human-readable summary
            (with the metadata dict attached) and an "audiostream" child part
            whose data is the base64 payload, or "" when the file exceeds
            _MAX_INLINE_SIZE.
        """
        # `or` instead of a .get() default: a key that is present but None
        # would otherwise reach the metadata helper and crash on .lower().
        fileName = context.get("fileName") or "audio"
        mimeType = context.get("mimeType") or "audio/mpeg"
        fileSize = len(fileBytes)
        rootId = makeId()

        meta = _extractMetadata(fileBytes, fileName)
        meta["size"] = fileSize
        meta["fileName"] = fileName
        meta["mimeType"] = mimeType

        summary = [f"Audio file: {fileName}"]
        duration = meta.get("duration")
        if duration:
            mins, secs = divmod(int(duration), 60)
            summary.append(f"Duration: {mins}:{secs:02d}")
        if meta.get("bitrate"):
            summary.append(f"Bitrate: {meta['bitrate']} kbps")
        if meta.get("sampleRate"):
            summary.append(f"Sample rate: {meta['sampleRate']} Hz")
        if meta.get("channels"):
            summary.append(f"Channels: {meta['channels']}")
        # The three tag lines are emitted together as soon as any one is known.
        if meta.get("title") or meta.get("artist") or meta.get("album"):
            summary.append(f"Title: {meta.get('title', 'N/A')}")
            summary.append(f"Artist: {meta.get('artist', 'N/A')}")
            summary.append(f"Album: {meta.get('album', 'N/A')}")
        summary.append(f"Size: {fileSize:,} bytes")

        # Decide once whether the payload is embedded, so the data and the
        # "inlined" flag cannot disagree.
        inlined = fileSize <= _MAX_INLINE_SIZE
        audioData = base64.b64encode(fileBytes).decode("utf-8") if inlined else ""

        return [
            ContentPart(
                id=rootId, parentId=None, label="metadata",
                typeGroup="text", mimeType="text/plain",
                data="\n".join(summary), metadata=meta,
            ),
            ContentPart(
                id=makeId(), parentId=rootId, label="audiostream",
                typeGroup="audiostream", mimeType=mimeType,
                data=audioData, metadata={"size": fileSize, "inlined": inlined},
            ),
        ]
|
|
|
|
|
|
def _extractMetadata(fileBytes: bytes, fileName: str) -> Dict[str, Any]:
    """Extract audio metadata using mutagen (optional) with a stdlib fallback.

    Args:
        fileBytes: Raw content of the audio file.
        fileName: Original file name; only its extension is inspected. None
            or an empty string is tolerated.

    Returns:
        A dict that may contain "duration" (seconds), "bitrate" (kbps),
        "sampleRate", "channels", "title", "artist" and "album". Keys whose
        value could not be determined are omitted. Never raises because of
        unparseable audio data; failures are logged at debug level.
    """
    meta: Dict[str, Any] = {}

    try:
        import io
        import mutagen
    except ImportError:
        mutagen = None
        logger.debug("mutagen not installed -- using basic metadata extraction")

    if mutagen is not None:
        try:
            audio = mutagen.File(io.BytesIO(fileBytes))
            if audio is not None:
                if audio.info:
                    meta["duration"] = getattr(audio.info, "length", None)
                    bitrate = getattr(audio.info, "bitrate", None)
                    # mutagen's bitrate is divided by 1000 so the summary can
                    # label it as kbps.
                    meta["bitrate"] = bitrate // 1000 if bitrate else bitrate
                    meta["sampleRate"] = getattr(audio.info, "sample_rate", None)
                    meta["channels"] = getattr(audio.info, "channels", None)

                tags = audio.tags
                if tags:
                    # Candidate keys per field: ID3 frame id, Vorbis-style
                    # name, MP4 atom name.
                    meta["title"] = _getTag(tags, ["TIT2", "title", "\xa9nam"])
                    meta["artist"] = _getTag(tags, ["TPE1", "artist", "\xa9ART"])
                    meta["album"] = _getTag(tags, ["TALB", "album", "\xa9alb"])
        except Exception as e:
            # Metadata is best-effort: keep whatever was collected so far.
            logger.debug("mutagen metadata extraction failed: %s", e)

    # Stdlib fallback. It also runs when mutagen is installed but did not
    # recognise the file or yielded no stream info, and it only fills fields
    # that are still unknown. _parseWavHeader validates the RIFF/WAVE magic
    # itself, so probing by magic bytes is safe for misnamed files.
    technicalKeys = ("duration", "bitrate", "sampleRate", "channels")
    if any(meta.get(key) is None for key in technicalKeys):
        lower = (fileName or "").lower()
        if lower.endswith(".wav") or fileBytes[:4] == b"RIFF":
            for key, value in _parseWavHeader(fileBytes).items():
                if meta.get(key) is None:
                    meta[key] = value

    return {k: v for k, v in meta.items() if v is not None}
|
|
|
|
|
|
def _getTag(tags, keys: list) -> Any:
    """Try multiple tag keys and return the first usable value as a string.

    Args:
        tags: Mapping-like tag container exposing ``get(key)``.
        keys: Candidate keys, tried in order.

    Returns:
        The value of the first key that yields something: strings are returned
        unchanged, list/tuple values are joined with ", " (a single-element
        list yields just that element), anything else is passed through
        ``str()``. Returns None when no key matches.
    """
    for key in keys:
        try:
            val = tags.get(key)
        except (KeyError, ValueError, TypeError):
            # Some tag containers reject keys that are not valid for their
            # format instead of returning None; treat that as "not present".
            continue
        if val is None:
            continue
        if isinstance(val, str):
            return val
        if isinstance(val, (list, tuple)):
            # Multi-valued tags: str(list) would leak the Python repr
            # ("['Title']") into the metadata, so join the items instead.
            texts = [item if isinstance(item, str) else str(item)
                     for item in val if item is not None]
            if not texts:
                continue
            return ", ".join(texts)
        return str(val)
    return None
|
|
|
|
|
|
def _parseWavHeader(fileBytes: bytes) -> Dict[str, Any]:
    """Minimal RIFF/WAVE parser for basic metadata.

    Walks the RIFF chunk list instead of assuming the canonical 44-byte
    layout, so files with an extended "fmt " chunk or with extra chunks
    (e.g. "LIST") in front of "data" are read correctly.

    Args:
        fileBytes: Raw file content (at least the header part).

    Returns:
        A dict with "channels", "sampleRate" and "bitrate" (kbps) once a
        "fmt " chunk was found, plus "duration" (seconds, float) when a
        "data" chunk follows and the format fields are non-zero. Returns an
        empty dict for anything that is not a RIFF/WAVE file.
    """
    meta: Dict[str, Any] = {}
    total = len(fileBytes)
    if total < 12 or fileBytes[:4] != b"RIFF" or fileBytes[8:12] != b"WAVE":
        return meta

    fmtFields = None
    dataSize = None
    offset = 12  # first chunk follows the 12-byte "RIFF<size>WAVE" preamble
    while offset + 8 <= total:
        chunkId, chunkSize = struct.unpack_from("<4sI", fileBytes, offset)
        bodyStart = offset + 8
        if chunkId == b"fmt ":
            # The first 16 bytes of the fmt body hold all fields needed here.
            if chunkSize < 16 or bodyStart + 16 > total:
                break
            fmtFields = struct.unpack_from("<HHIIHH", fileBytes, bodyStart)
        elif chunkId == b"data":
            # Only the declared size is needed; the samples are not read.
            dataSize = chunkSize
            break
        # Chunk bodies are padded to an even number of bytes.
        offset = bodyStart + chunkSize + (chunkSize & 1)

    if fmtFields is None:
        return meta

    _, channels, sampleRate, _, _, bitsPerSample = fmtFields
    meta["channels"] = channels
    meta["sampleRate"] = sampleRate
    meta["bitrate"] = (sampleRate * channels * bitsPerSample) // 1000
    if dataSize is not None and sampleRate and channels and bitsPerSample:
        meta["duration"] = dataSize / (sampleRate * channels * (bitsPerSample / 8))
    return meta
|