wiki/z-archive/implementation/implementation_extraction.md


PowerON Extraction Service Concept and Architecture

Goals

  • Normalize any document into a small set of processing-ready typeGroups: text, table, structure, image, binary, metadata, container.
  • Decouple extraction (split/normalize) from chunking and merging.
  • Scale to multipart/container formats (pdf, office) using recursive splitting.
  • Control cost/latency by honoring maxSize and chunkAllowed, invoking AI only when needed.
  • Integrate with the AI Prompt Builder entry point and support operationType behavior.

New Service Location

  • Base: gateway/modules/services/serviceExtraction/mainServiceExtraction.py
  • Submodules:
    • gateway/modules/services/serviceExtraction/subRegistry.py (extractor and chunker registries)
    • gateway/modules/services/serviceExtraction/subPipeline.py (3-pass orchestration)
    • gateway/modules/services/serviceExtraction/formats/ (per-format extractors)
    • gateway/modules/services/serviceExtraction/chunking/ (per-typeGroup chunkers)
    • gateway/modules/services/serviceExtraction/merging/ (per-typeGroup mergers)
    • gateway/modules/services/serviceExtraction/utils/ (encoding, mime, helpers)

No backwards compatibility is required; this is a clean introduction.

Core Data Model (standardized outputs)

  • ContentPart

    • id: str
    • parentId: Optional[str] (preserve hierarchy; root has None)
    • label: str (e.g., "page_2", "sheet_Jan", "table_1")
    • typeGroup: Literal["text","table","structure","image","binary","metadata","container"]
    • mimeType: str
    • data: str (UTF-8 text for text|table|structure; base64 for image|binary; empty for container)
    • metadata: Dict[str, Any] (size, pages, width/height, pageIndex, sheetName, sourceRanges, checksum, confidence, warnings)
  • ExtractedContent

    • id: str (document id)
    • parts: List[ContentPart] (flat list; hierarchy via parentId)
    • summary: Optional[Dict[str, Any]]

Notes:

  • metadata.sourceRanges or page/sheet indices allow provenance for merges/summaries.
  • metadata.confidence and metadata.warnings guide downstream AI/UX decisions.
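
The data model above could be expressed with dataclasses. This is a minimal sketch, not the service's actual definitions: the field names follow the list above, while the `TypeGroup` alias, the `childrenOf` helper, and the sample values are assumptions added for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional

TypeGroup = Literal["text", "table", "structure", "image", "binary", "metadata", "container"]


@dataclass
class ContentPart:
    id: str
    parentId: Optional[str]  # None for the root part
    label: str
    typeGroup: TypeGroup
    mimeType: str
    data: str = ""  # UTF-8 text, base64, or empty for containers
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ExtractedContent:
    id: str
    parts: List[ContentPart] = field(default_factory=list)  # flat list; hierarchy via parentId
    summary: Optional[Dict[str, Any]] = None


def childrenOf(content: ExtractedContent, parentId: Optional[str]) -> List[ContentPart]:
    """Return the direct children of a part (hypothetical helper, not part of the design)."""
    return [part for part in content.parts if part.parentId == parentId]


# Sample document: one container root with a single text page below it.
root = ContentPart(id="p0", parentId=None, label="report.pdf",
                   typeGroup="container", mimeType="application/pdf")
page = ContentPart(id="p1", parentId="p0", label="page_1", typeGroup="text",
                   mimeType="text/plain", data="Hello", metadata={"pageIndex": 0})
doc = ExtractedContent(id="doc-1", parts=[root, page])
```

Keeping `parts` flat and encoding the hierarchy only through `parentId` means chunkers and mergers can iterate over a simple list, while the tree can still be rebuilt when needed.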

MIME → typeGroup mapping (deterministic first)

  • text/plain, text/markdown → text
  • text/csv → table
  • application/json, application/xml, text/html, image/svg+xml → structure
  • image/* → image
  • application/pdf, application/vnd.openxmlformats-officedocument.* → container
  • otherwise → binary

Container extractors are responsible for disaggregating into basic typeGroups.
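
A sketch of the deterministic mapping above, assuming a hypothetical function name `resolveTypeGroup`. Exact matches are checked before the `image/*` wildcard so that `image/svg+xml` resolves to structure rather than image. Stripping MIME parameters such as `; charset=utf-8` is an added assumption.

```python
_EXACT_TYPE_GROUPS = {
    "text/plain": "text",
    "text/markdown": "text",
    "text/csv": "table",
    "application/json": "structure",
    "application/xml": "structure",
    "text/html": "structure",
    "image/svg+xml": "structure",
    "application/pdf": "container",
}
_OOXML_PREFIX = "application/vnd.openxmlformats-officedocument."


def resolveTypeGroup(mimeType: str) -> str:
    """Map a MIME type to a typeGroup; the name is hypothetical."""
    # Drop parameters ("; charset=...") and normalize case before matching.
    mime = mimeType.split(";", 1)[0].strip().lower()
    if mime in _EXACT_TYPE_GROUPS:
        return _EXACT_TYPE_GROUPS[mime]
    if mime.startswith("image/"):
        return "image"
    if mime.startswith(_OOXML_PREFIX):
        return "container"
    return "binary"
```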

3-Pass Pipeline

  1. Identify and normalize (Split/Extract)

    • Start with a root container part representing the raw file.
    • Resolve extractor by mimeType/extension via registry.
    • Recursively split container formats into child parts until only basic typeGroups remain (text|table|structure|image|binary|metadata).
    • Output a single ExtractedContent per input document.
  2. Chunk

    • Route each basic typeGroup to its chunker:
      • text → size-bounded, line/paragraph-aware
      • table → row-bounded (CSV lines), optionally schema-aware
      • structure → JSON object/XML subtree/HTML block aware
      • image, binary, metadata, container → no chunking by default
    • Chunkers return chunks: List[Dict] with back-references (partId, order).
  3. Merge

    • Strategy driven by call options and workflow:
      • text → concatenate by logical order (page/section) or keep per part
      • table → keep separate per table/sheet; optional schema merge
      • structure → preserve keys/paths; avoid lossy merges
      • image|binary → usually pass-through
      • metadata|container → excluded by default
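
To make the chunk pass concrete, here is a minimal sketch of a size-bounded, paragraph-aware text chunker that returns dicts with `partId` and `order` back-references. The function name `chunkText`, the `maxChars` parameter, and the `data` key are assumptions; the design only fixes the back-reference fields.

```python
from typing import Any, Dict, List


def chunkText(partId: str, text: str, maxChars: int) -> List[Dict[str, Any]]:
    """Greedily pack paragraphs into chunks of at most maxChars characters."""
    if maxChars <= 0:
        raise ValueError("maxChars must be positive")
    paragraphs = [p for p in text.split("\n\n") if p.strip()]
    chunks: List[str] = []
    current = ""
    for paragraph in paragraphs:
        # Oversized paragraphs are hard-split so that no chunk exceeds the bound.
        pieces = [paragraph[i:i + maxChars] for i in range(0, len(paragraph), maxChars)]
        for piece in pieces:
            candidate = piece if not current else current + "\n\n" + piece
            if len(candidate) <= maxChars:
                current = candidate
            else:
                chunks.append(current)
                current = piece
    if current:
        chunks.append(current)
    # Each chunk points back to its source part and records its position.
    return [{"partId": partId, "order": i, "data": c} for i, c in enumerate(chunks)]
```

The `order` field lets a later merge step restore the original sequence even if chunks are processed out of order.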

Registries

  • ExtractorRegistry (in subRegistry.py)

    • Maps mimeType/extension to an Extractor instance.
    • Fallbacks: content sniffing, default binary extractor.
  • ChunkerRegistry (in subRegistry.py)

    • Maps typeGroup to a Chunker.
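
A registry along these lines might look like the sketch below. The method names `register` and `resolve` are assumptions, content sniffing is omitted, and extractors are represented by plain strings purely to keep the example short.

```python
import os
from typing import Any, Dict, Iterable


class ExtractorRegistry:
    """Resolve an extractor by MIME type, then by file extension, then fall back."""

    def __init__(self, fallback: Any) -> None:
        self._byMime: Dict[str, Any] = {}
        self._byExtension: Dict[str, Any] = {}
        self._fallback = fallback  # e.g. the default binary extractor

    def register(self, extractor: Any, mimeTypes: Iterable[str] = (),
                 extensions: Iterable[str] = ()) -> None:
        for mimeType in mimeTypes:
            self._byMime[mimeType.lower()] = extractor
        for extension in extensions:
            self._byExtension[extension.lower()] = extractor

    def resolve(self, fileName: str, mimeType: str) -> Any:
        extension = os.path.splitext(fileName)[1].lower()
        if mimeType.lower() in self._byMime:
            return self._byMime[mimeType.lower()]
        if extension in self._byExtension:
            return self._byExtension[extension]
        return self._fallback


# Strings stand in for Extractor instances in this illustration.
registry = ExtractorRegistry(fallback="binaryExtractor")
registry.register("csvExtractor", mimeTypes=["text/csv"], extensions=[".csv"])
```

New formats can be added at runtime by calling `register` again, without touching the pipeline.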

Base Interfaces

Use camelCase and prefix internal methods with _.

class Extractor:
    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: ...
    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: ...

class Chunker:
    def chunk(self, part: ContentPart, options: Dict[str, Any]) -> List[Dict[str, Any]]: ...

class Merger:
    def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: ...
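
As an illustration of the Extractor interface, a plain-text extractor could be sketched as follows. The reduced `ContentPart` dataclass, the Latin-1 decoding fallback, and the `context` keys (`parentId`, `fileName`) are assumptions made so the example is self-contained.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class ContentPart:
    id: str
    parentId: Optional[str]
    label: str
    typeGroup: str
    mimeType: str
    data: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)


class TextExtractor:
    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        return (mimeType in ("text/plain", "text/markdown")
                or fileName.lower().endswith((".txt", ".md")))

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
        text, warnings = self._decode(fileBytes)
        return [ContentPart(
            id=uuid.uuid4().hex,
            parentId=context.get("parentId"),
            label=context.get("fileName", "text"),
            typeGroup="text",
            mimeType="text/plain",
            data=text,
            metadata={"size": len(fileBytes), "warnings": warnings},
        )]

    def _decode(self, fileBytes: bytes) -> Tuple[str, List[str]]:
        # Try UTF-8 first; fall back to Latin-1, which never fails, and record a warning.
        try:
            return fileBytes.decode("utf-8"), []
        except UnicodeDecodeError:
            return fileBytes.decode("latin-1"), ["decoded as latin-1 after UTF-8 failure"]
```

The internal helper `_decode` follows the convention of prefixing internal methods with `_`.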

Format Extractors (under formats/)

  • text_extractor.py → emits one text part
  • csv_extractor.py → emits one table part (CSV payload)
  • json_extractor.py, xml_extractor.py, html_extractor.py, svg_extractor.py → emit structure parts
  • image_extractor.py → emits one image part; optional OCR is handled by AI post-processing
  • pdf_extractor.py → emits container root with children:
    • per page: text part if text found
    • per page: extracted images as image parts
    • per page/section metadata as metadata
  • docx_extractor.py → container + children: headings → structure, paragraphs → text, tables → table, comments → metadata
  • xlsx_extractor.py → container + children: each sheet as table (CSV); properties → metadata; charts as image or structure
  • pptx_extractor.py → container + slides: text boxes → text, tables → table, images → image, notes → metadata
  • legacy_*_extractor.py → metadata + binary with clear limitations
  • binary_extractor.py → single binary part

Chunkers (under chunking/)

  • text_chunker.py → size/paragraph aware; configurable sizes
  • table_chunker.py → split by row count/bytes, propagating the header row to each chunk
  • structure_chunker.py → JSON object buckets, XML subtree buckets, HTML block buckets
  • binary_chunker.py → byte slicing when explicitly requested
  • noop_chunker.py → for image/metadata/container
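
Header propagation in the table chunker can be sketched as below, assuming the table part carries a CSV payload whose first row is the header. The function name `chunkCsv` and the `maxRows` parameter are hypothetical, and splitting by bytes is not shown.

```python
import csv
import io
from typing import Any, Dict, List


def chunkCsv(partId: str, csvText: str, maxRows: int) -> List[Dict[str, Any]]:
    """Split a CSV payload into chunks of at most maxRows data rows, repeating the header."""
    if maxRows <= 0:
        raise ValueError("maxRows must be positive")
    rows = list(csv.reader(io.StringIO(csvText)))
    if not rows:
        return []
    header, body = rows[0], rows[1:]
    # A header-only table still yields one chunk so the schema is not lost.
    starts = list(range(0, len(body), maxRows)) or [0]
    chunks = []
    for order, start in enumerate(starts):
        buffer = io.StringIO()
        writer = csv.writer(buffer, lineterminator="\n")
        writer.writerow(header)
        writer.writerows(body[start:start + maxRows])
        chunks.append({"partId": partId, "order": order, "data": buffer.getvalue()})
    return chunks
```

Repeating the header makes every chunk interpretable on its own, at the cost of a few duplicated bytes per chunk.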

Mergers (under merging/)

  • text_merger.py → page/section aware concatenation
  • table_merger.py → per sheet/table; optional schema merge
  • structure_merger.py → key/path preserving grouping
  • default_merger.py → pass-through
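
A page-aware text merge could look like the following sketch. Parts are modelled as plain dicts for brevity, and the function name `mergeTextParts`, the `merged_text` id, and the `sourceParts` metadata key are assumptions rather than part of the design.

```python
from typing import Any, Dict, List

Part = Dict[str, Any]


def mergeTextParts(parts: List[Part], separator: str = "\n\n") -> Part:
    """Concatenate text parts in page order and record which parts were merged."""
    textParts = [p for p in parts if p["typeGroup"] == "text"]
    # sorted() is stable, so parts on the same page keep their input order.
    ordered = sorted(textParts, key=lambda p: p["metadata"].get("pageIndex", 0))
    return {
        "id": "merged_text",
        "parentId": None,
        "label": "merged_text",
        "typeGroup": "text",
        "mimeType": "text/plain",
        "data": separator.join(p["data"] for p in ordered),
        "metadata": {"sourceParts": [p["id"] for p in ordered]},
    }


sampleParts = [
    {"id": "b", "typeGroup": "text", "data": "second", "metadata": {"pageIndex": 1}},
    {"id": "img", "typeGroup": "image", "data": "", "metadata": {"pageIndex": 0}},
    {"id": "a", "typeGroup": "text", "data": "first", "metadata": {"pageIndex": 0}},
]
merged = mergeTextParts(sampleParts)
```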

Orchestration (in subPipeline.py)

High-level flow for one document:

def runExtraction(document: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ExtractedContent:
    # Pass 1: extract/normalize
    parts = _extractAll(document, fileName, mimeType, options)

    # Pass 2: chunk if allowed
    if options.get("chunkAllowed", False):
        chunks = _chunkParts(parts, options)
    else:
        chunks = []

    # Pass 3: merge per strategy
    merged = _merge(parts, chunks, options.get("mergeStrategy", {}))

    return ExtractedContent(id=_makeId(), parts=merged, summary=_buildSummary(parts))

Entry Point and Options (in mainServiceExtraction.py)

The service is invoked by AI Prompt Builder with (documentList, options).

Supported options and effects:

  • prompt: str
    • If present, enables optional AI augmentation on extracted content/chunks based on operationType.
  • operationType: Literal["general","generate_plan","analyse_content","generate_content","web_research"]
    • general/analyse_content: prefer deterministic extraction; AI can summarize or answer over chunks.
    • generate_plan: produce structured outputs (bullet points, tasks) as structure parts from text chunks.
    • generate_content: allow AI synthesis over merged text parts within maxSize.
    • web_research: treat extracted structure and text as context; AI orchestrator may fetch more docs upstream.
  • processDocumentsIndividually: bool
    • True: run the 3-pass pipeline per document; apply maxSize per document; return list of results.
    • False: extract all docs → pool parts → global chunk/merge → apply maxSize across the pool; keep provenance by parentId and documentId.
  • maxSize: int and chunkAllowed: bool
    • Hard cap on total size of content passed to AI.
    • If chunkAllowed=True → prefer chunking to stay under maxSize; process chunks iteratively in priority order (e.g., text before images, or by page order).
    • If chunkAllowed=False → do not chunk; instead summarize down (per part, then hierarchical) until under maxSize.

Size governance policy:

  1. Compute sizes for candidate parts/chunks.
  2. If total ≤ maxSize → pass through.
  3. If total > maxSize and chunkAllowed → progressively include highest-value chunks until the cap; optionally add a final global summary.
  4. If total > maxSize and not chunkAllowed → summarize per part, then merge summaries; ensure final text ≤ cap.
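
The four steps above can be sketched as a single function. This is an illustration under several assumptions: items are dicts with `data`, `order`, and a hypothetical `priority` field; size is measured in UTF-8 bytes; and `truncateSummary` is a placeholder standing in for an LLM summarizer, which the design would call instead.

```python
from typing import Any, Callable, Dict, List

Item = Dict[str, Any]


def _size(item: Item) -> int:
    return len(item["data"].encode("utf-8"))


def truncateSummary(text: str, budget: int) -> str:
    """Placeholder summarizer: cut to the byte budget without breaking a character."""
    return text.encode("utf-8")[:budget].decode("utf-8", errors="ignore")


def applySizeCap(items: List[Item], maxSize: int, chunkAllowed: bool,
                 summarize: Callable[[str, int], str] = truncateSummary) -> List[Item]:
    if not items or sum(_size(i) for i in items) <= maxSize:
        return list(items)  # step 2: under the cap, pass through
    if chunkAllowed:
        # Step 3: take the highest-priority chunks that still fit under the cap.
        selected: List[Item] = []
        used = 0
        for item in sorted(items, key=lambda i: i.get("priority", 0), reverse=True):
            if used + _size(item) <= maxSize:
                selected.append(item)
                used += _size(item)
        return sorted(selected, key=lambda i: i["order"])
    # Step 4: summarize every part down to an equal share of the cap.
    budget = maxSize // len(items)
    return [dict(item, data=summarize(item["data"], budget)) for item in items]


sampleItems = [
    {"order": 0, "priority": 1, "data": "aaaa"},
    {"order": 1, "priority": 5, "data": "bbbbbb"},
    {"order": 2, "priority": 3, "data": "cc"},
]
```

Selected chunks are re-sorted by `order` so that downstream consumers still see content in document order.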

AI Integration

  • AI is optional and strictly after extraction.
  • Recommended placements:
    • OCR/VLM for image parts when requested.
    • LLM summarization for large text|structure|table parts to respect maxSize when chunkAllowed=False.
    • LLM question answering (analyse_content) over selected chunks.
  • All AI calls must respect budget/time guards and the size cap.

Error Handling

  • Every extractor must return either valid parts or a metadata part with warnings/error plus a binary fallback when applicable.
  • Include enough context in metadata to diagnose issues (library missing, parse error details) without leaking sensitive content.
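
One way to enforce this contract is a wrapper around each extractor call, sketched below. The wrapper name `safeExtract`, the part ids, and the dict-based parts are assumptions. Only the exception class name is recorded, on the assumption that exception messages may contain document content.

```python
import base64
from typing import Any, Callable, Dict, List

Part = Dict[str, Any]


def safeExtract(extract: Callable[[bytes], List[Part]], fileBytes: bytes,
                mimeType: str) -> List[Part]:
    """Run an extractor; on failure return a metadata warning plus a binary fallback."""
    try:
        return extract(fileBytes)
    except Exception as error:
        warning = {
            "id": "warn_0", "parentId": None, "label": "extraction_error",
            "typeGroup": "metadata", "mimeType": "application/json", "data": "",
            # Record the error class only, not the message, to avoid leaking content.
            "metadata": {"warnings": [f"extraction failed: {type(error).__name__}"],
                         "size": len(fileBytes)},
        }
        fallback = {
            "id": "bin_0", "parentId": None, "label": "raw",
            "typeGroup": "binary", "mimeType": mimeType,
            "data": base64.b64encode(fileBytes).decode("ascii"),
            "metadata": {"size": len(fileBytes)},
        }
        return [warning, fallback]


def _brokenExtractor(fileBytes: bytes) -> List[Part]:
    raise ValueError("secret content")


failedParts = safeExtract(_brokenExtractor, b"\x00\x01", "application/pdf")
```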

Ordering and Provenance

  • Preserve logical order within a document (page index, slide index, sheet index).
  • Maintain parentId links to reconstruct hierarchy during merge and summarization.
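
Rebuilding the hierarchy from the flat list might look like this sketch. It assumes dict-based parts, a `pageIndex` metadata key as the ordering criterion, and a hypothetical function name `buildOutline`.

```python
from collections import defaultdict
from typing import Any, Dict, Iterator, List, Optional, Tuple

Part = Dict[str, Any]


def buildOutline(parts: List[Part]) -> List[Tuple[int, str]]:
    """Return (depth, label) pairs in logical order, reconstructed from parentId links."""
    children: Dict[Optional[str], List[Part]] = defaultdict(list)
    for part in parts:
        children[part["parentId"]].append(part)
    for siblings in children.values():
        # Stable sort: siblings without a pageIndex keep their input order.
        siblings.sort(key=lambda p: p["metadata"].get("pageIndex", 0))

    def _walk(parentId: Optional[str], depth: int) -> Iterator[Tuple[int, str]]:
        for part in children.get(parentId, []):
            yield depth, part["label"]
            yield from _walk(part["id"], depth + 1)

    return list(_walk(None, 0))


flatParts = [
    {"id": "p0", "parentId": None, "label": "report.pdf", "metadata": {}},
    {"id": "p2", "parentId": "p0", "label": "page_2", "metadata": {"pageIndex": 1}},
    {"id": "p1", "parentId": "p0", "label": "page_1", "metadata": {"pageIndex": 0}},
    {"id": "i1", "parentId": "p1", "label": "image_1", "metadata": {}},
]
outline = buildOutline(flatParts)
```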

Testing Strategy

  • Unit tests per extractor on small fixtures for each format.
  • Contract tests for the 3-pass pipeline (end-to-end) with mixed multipart documents.
  • Size-cap tests validating chunking vs summarization paths.

Migration Notes

  • Existing monolithic logic can be moved into formats/* and utils/* preserving robust decoding and Office/PDF heuristics, while removing AI calls from extractors.
  • ContentItem usage should shift to ContentPart (no backward compatibility required).

Minimal Pseudocode: processDocumentsIndividually

def extractDocuments(documentList: List[Dict], options: Dict[str, Any]):
    # Each document is a dict with "id", "bytes", "fileName" and "mimeType" keys.
    if options.get("processDocumentsIndividually", True):
        results = []
        for doc in documentList:
            ec = runExtraction(doc["bytes"], doc["fileName"], doc["mimeType"], options)
            ec = _applyAiIfRequested(ec, options)  # respects maxSize + chunkAllowed
            results.append(ec)
        return results
    else:
        # global pool: extract every document, then limit across all parts
        parts = []
        for doc in documentList:
            ec = runExtraction(doc["bytes"], doc["fileName"], doc["mimeType"], options)
            parts.extend(_tagWithDocumentId(ec.parts, doc["id"]))
        pooled = _poolAndLimit(parts, options)  # chunk/summarize to cap
        pooled = _applyAiIfRequestedOverPool(pooled, options)
        return pooled

Defaults and Configuration

  • Chunk sizes per typeGroup are centralized and configurable.
  • Merge strategies (text concat policy, table schema inference) are pluggable.
  • Registries support runtime extension (new formats) without touching the pipeline.

Summary

This design introduces a small, stable contract (ContentPart with typeGroup) and a 3-pass pipeline that:

  • normalizes diverse documents into uniform parts,
  • chunks only what benefits from chunking,
  • merges predictably for downstream AI and workflow steps, while strictly enforcing maxSize and honoring chunkAllowed and processDocumentsIndividually.