287 lines
No EOL
11 KiB
Python
287 lines
No EOL
11 KiB
Python
from typing import Dict, Any, Optional
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
import docx
|
|
import PyPDF2
|
|
import json
|
|
import yaml
|
|
import xml.etree.ElementTree as ET
|
|
from datetime import datetime, UTC
|
|
|
|
from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
|
|
|
|
# Module-level logger, named after this module, used by all document operations below.
logger = logging.getLogger(__name__)
|
|
|
|
class MethodDocument(MethodBase):
    """Document method implementation for document operations.

    Exposes three actions -- ``read``, ``write`` and ``convert`` -- operating on
    files addressed by path. Supported formats are ``txt``, ``md``, ``docx``,
    ``pdf`` (read only), ``json``, ``yaml``/``yml`` and ``xml``. When no format
    is given it is derived from the file extension, defaulting to ``txt``.

    Every action returns the object built by ``self._create_result`` (presumably
    provided by ``MethodBase`` -- it is not defined in this class); failures are
    reported as ``success=False`` with an ``"error"`` entry rather than raised.
    """

    # Formats that are read and written as plain text.
    _TEXT_FORMATS = ("txt", "md")
    # Both spellings are accepted so that ``.yml`` files resolve via their suffix.
    _YAML_FORMATS = ("yaml", "yml")
    # Formats ``_write_document`` can produce (pdf is handled separately).
    _WRITABLE_FORMATS = _TEXT_FORMATS + _YAML_FORMATS + ("docx", "json", "xml")
    # Used whenever the caller omits the encoding or passes an empty/None value.
    _DEFAULT_ENCODING = "utf-8"

    def __init__(self):
        """Set the method's name, description and auth source."""
        super().__init__()
        self.name = "document"
        self.description = "Handle document operations like reading, writing, and converting documents"
        self.auth_source = AuthSource.LOCAL  # Document operations typically don't need auth

    @property
    def actions(self) -> Dict[str, Dict[str, Any]]:
        """Available actions and their parameters."""
        return {
            "read": {
                "description": "Read document content",
                "retryMax": 2,
                "timeout": 30,
                "parameters": {
                    "path": {"type": "string", "required": True},
                    "format": {"type": "string", "required": False},
                    "encoding": {"type": "string", "required": False},
                    "includeMetadata": {"type": "boolean", "required": False},
                },
            },
            "write": {
                "description": "Write content to document",
                "retryMax": 2,
                "timeout": 30,
                "parameters": {
                    "path": {"type": "string", "required": True},
                    "content": {"type": "string", "required": True},
                    "format": {"type": "string", "required": False},
                    "encoding": {"type": "string", "required": False},
                    "template": {"type": "string", "required": False},
                },
            },
            "convert": {
                "description": "Convert document between formats",
                "retryMax": 2,
                "timeout": 60,
                "parameters": {
                    "sourcePath": {"type": "string", "required": True},
                    "targetPath": {"type": "string", "required": True},
                    "sourceFormat": {"type": "string", "required": False},
                    "targetFormat": {"type": "string", "required": False},
                    "options": {"type": "object", "required": False},
                },
            },
        }

    async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
        """Execute a document action.

        Args:
            action: One of ``"read"``, ``"write"`` or ``"convert"``.
            parameters: Action parameters as described by ``actions``.
            auth_data: Accepted for interface compatibility; not used here.

        Returns:
            The result of the selected action, or a failed result when the
            parameters are rejected, the action is unknown, or an exception
            escapes the handler.
        """
        try:
            # Validate parameters
            if not await self.validate_parameters(action, parameters):
                return self._create_result(
                    success=False,
                    data={"error": f"Invalid parameters for {action}"}
                )

            # Execute action
            handlers = {
                "read": self._read_document,
                "write": self._write_document,
                "convert": self._convert_document,
            }
            handler = handlers.get(action)
            if handler is None:
                return self._create_result(
                    success=False,
                    data={"error": f"Unknown action: {action}"}
                )
            return await handler(parameters)

        except Exception as e:
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception("Error executing document %s: %s", action, e)
            return self._create_result(
                success=False,
                data={"error": str(e)}
            )

    @staticmethod
    def _resolve_format(path: Path, explicit: Optional[str]) -> str:
        """Return ``explicit`` if given, else the suffix of ``path`` (default ``txt``)."""
        if explicit:
            return explicit
        return path.suffix[1:] if path.suffix else "txt"

    @staticmethod
    def _as_text(content: Any) -> str:
        """Render ``content`` as text for text-based targets.

        Strings pass through unchanged. Parsed JSON/YAML structures (dicts and
        lists) are serialised as indented JSON so that converting a structured
        document into a text format produces readable output instead of a
        ``TypeError``. ``None`` becomes an empty string; anything else uses
        ``str()``.
        """
        if isinstance(content, str):
            return content
        if content is None:
            return ""
        if isinstance(content, (dict, list)):
            # default=str: YAML may yield values (e.g. dates) JSON cannot encode.
            return json.dumps(content, indent=2, ensure_ascii=False, default=str)
        return str(content)

    async def _read_document(self, parameters: Dict[str, Any]) -> MethodResult:
        """Read document content.

        Args:
            parameters: Requires ``path``; optional ``format``, ``encoding``
                (defaults to utf-8) and ``includeMetadata``.

        Returns:
            On success, a result whose data holds ``path``, ``format``,
            ``content`` and, if requested, ``metadata``. ``content`` is a string
            for txt/md/docx/pdf/xml and the parsed object for json/yaml.
            On failure, a result with an ``"error"`` message.
        """
        try:
            path = Path(parameters["path"])
            if not path.exists():
                return self._create_result(
                    success=False,
                    data={"error": f"File not found: {path}"}
                )

            # Determine format if not specified
            fmt = self._resolve_format(path, parameters.get("format"))
            fmt_key = fmt.lower()

            # "or" (not a .get default) so an explicit None/"" also falls back.
            encoding = parameters.get("encoding") or self._DEFAULT_ENCODING
            include_metadata = parameters.get("includeMetadata", False)

            # Read content based on format
            if fmt_key in self._TEXT_FORMATS:
                with open(path, "r", encoding=encoding) as f:
                    content = f.read()
            elif fmt_key == "docx":
                doc = docx.Document(str(path))
                content = "\n".join(paragraph.text for paragraph in doc.paragraphs)
            elif fmt_key == "pdf":
                with open(path, "rb") as f:
                    pdf = PyPDF2.PdfReader(f)
                    # Guard against a page yielding no text so the join cannot fail.
                    content = "\n".join(page.extract_text() or "" for page in pdf.pages)
            elif fmt_key == "json":
                with open(path, "r", encoding=encoding) as f:
                    content = json.load(f)
            elif fmt_key in self._YAML_FORMATS:
                with open(path, "r", encoding=encoding) as f:
                    content = yaml.safe_load(f)
            elif fmt_key == "xml":
                root = ET.parse(path).getroot()
                content = ET.tostring(root, encoding=encoding).decode(encoding)
            else:
                return self._create_result(
                    success=False,
                    data={"error": f"Unsupported format: {fmt}"}
                )

            result = {
                "path": str(path),
                "format": fmt,
                "content": content
            }

            if include_metadata:
                stat = path.stat()  # one stat call so all three values are consistent
                result["metadata"] = {
                    "size": stat.st_size,
                    "modified": datetime.fromtimestamp(stat.st_mtime, UTC).isoformat(),
                    # NOTE: st_ctime is platform dependent; on Unix it is the last
                    # metadata change, not the creation time. Key kept for callers.
                    "created": datetime.fromtimestamp(stat.st_ctime, UTC).isoformat()
                }

            return self._create_result(
                success=True,
                data=result
            )
        except Exception as e:
            logger.error(f"Error reading document: {e}")
            return self._create_result(
                success=False,
                data={"error": f"Read failed: {str(e)}"}
            )

    async def _write_document(self, parameters: Dict[str, Any]) -> MethodResult:
        """Write content to document.

        Args:
            parameters: Requires ``path`` and ``content``; optional ``format``,
                ``encoding`` (defaults to utf-8) and ``template`` (a docx file
                used as the starting document for docx output).

        Returns:
            On success, a result whose data holds ``path``, ``format``, ``size``
            and ``modified`` (the current UTC time). On failure, a result with
            an ``"error"`` message. PDF output is not implemented.
        """
        try:
            path = Path(parameters["path"])

            # Determine format if not specified
            fmt = self._resolve_format(path, parameters.get("format"))
            fmt_key = fmt.lower()

            encoding = parameters.get("encoding") or self._DEFAULT_ENCODING
            content = parameters["content"]
            template = parameters.get("template")

            # Reject anything we cannot write *before* touching the filesystem,
            # so a failed request does not leave empty directories behind.
            if fmt_key == "pdf":
                # TODO: Implement PDF writing
                return self._create_result(
                    success=False,
                    data={"error": "PDF writing not implemented yet"}
                )
            if fmt_key not in self._WRITABLE_FORMATS:
                return self._create_result(
                    success=False,
                    data={"error": f"Unsupported format: {fmt}"}
                )
            if fmt_key == "xml" and not isinstance(content, str):
                # Structured data has no defined XML mapping here; refuse rather
                # than write something that is not XML.
                return self._create_result(
                    success=False,
                    data={"error": "Write failed: XML content must be a string"}
                )

            # Create directory if it doesn't exist
            path.parent.mkdir(parents=True, exist_ok=True)

            # Write content based on format
            if fmt_key in self._TEXT_FORMATS:
                with open(path, "w", encoding=encoding) as f:
                    f.write(self._as_text(content))
            elif fmt_key == "docx":
                doc = docx.Document(template) if template else docx.Document()
                doc.add_paragraph(self._as_text(content))
                doc.save(str(path))
            elif fmt_key == "json":
                with open(path, "w", encoding=encoding) as f:
                    json.dump(content, f, indent=2)
            elif fmt_key in self._YAML_FORMATS:
                with open(path, "w", encoding=encoding) as f:
                    yaml.dump(content, f)
            else:  # xml, content verified to be a string above
                with open(path, "w", encoding=encoding) as f:
                    f.write(content)

            return self._create_result(
                success=True,
                data={
                    "path": str(path),
                    "format": fmt,
                    "size": path.stat().st_size,
                    "modified": datetime.now(UTC).isoformat()
                }
            )
        except Exception as e:
            logger.error(f"Error writing document: {e}")
            return self._create_result(
                success=False,
                data={"error": f"Write failed: {str(e)}"}
            )

    async def _convert_document(self, parameters: Dict[str, Any]) -> MethodResult:
        """Convert document between formats.

        Reads the source with ``_read_document`` and writes what was read with
        ``_write_document``; a failure of either step is returned unchanged.

        Args:
            parameters: Requires ``sourcePath`` and ``targetPath``; optional
                ``sourceFormat`` and ``targetFormat`` (each derived from the
                respective file extension when omitted). ``options`` is
                accepted by the action schema but is not used here.

        Returns:
            On success, a result whose data holds both paths and formats plus
            the target's ``size`` and ``modified`` (the current UTC time).
        """
        try:
            source_path = Path(parameters["sourcePath"])
            target_path = Path(parameters["targetPath"])

            if not source_path.exists():
                return self._create_result(
                    success=False,
                    data={"error": f"Source file not found: {source_path}"}
                )

            # Determine formats if not specified
            source_format = self._resolve_format(source_path, parameters.get("sourceFormat"))
            target_format = self._resolve_format(target_path, parameters.get("targetFormat"))

            # Read source content
            source_content = await self._read_document({
                "path": str(source_path),
                "format": source_format
            })
            if not source_content.success:
                return source_content

            # Write target content
            target_content = await self._write_document({
                "path": str(target_path),
                "content": source_content.data["content"],
                "format": target_format
            })
            if not target_content.success:
                return target_content

            return self._create_result(
                success=True,
                data={
                    "sourcePath": str(source_path),
                    "targetPath": str(target_path),
                    "sourceFormat": source_format,
                    "targetFormat": target_format,
                    "size": target_path.stat().st_size,
                    "modified": datetime.now(UTC).isoformat()
                }
            )
        except Exception as e:
            logger.error(f"Error converting document: {e}")
            return self._create_result(
                success=False,
                data={"error": f"Conversion failed: {str(e)}"}
            )