216 lines
7.5 KiB
Python
216 lines
7.5 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Analyze all imports in the gateway codebase and generate a CSV report.
Helps identify which imports need to be cleaned up after refactoring.
"""
|
|
|
|
import os
|
|
import ast
|
|
import csv
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Optional
|
|
|
|
# Gateway root directory: two levels above this script file.
# resolve() makes the path absolute first, so the result does not depend on
# the current working directory when __file__ happens to be a relative path
# (without it, "analyze.py".parent.parent would collapse to ".").
GATEWAY_ROOT = Path(__file__).resolve().parent.parent
# Destination of the generated CSV report.
OUTPUT_FILE = GATEWAY_ROOT / "scripts" / "import_analysis.csv"
|
|
|
|
|
|
def getModuleName(filePath: Path) -> str:
    """Return the dotted module-style name for a Python source file.

    The path is expressed relative to the parent of ``GATEWAY_ROOT``, so for
    files inside the gateway the name begins with the gateway directory's
    own name. Path separators become dots and a trailing ``.py`` is dropped.

    Args:
        filePath: Path of the file; must lie under ``GATEWAY_ROOT.parent``.

    Returns:
        The dotted name, e.g. ``<gateway>.modules.foo`` for
        ``<gateway>/modules/foo.py``.

    Raises:
        ValueError: If ``filePath`` is not under ``GATEWAY_ROOT.parent``
            (propagated from ``Path.relative_to``).
    """
    relative = str(filePath.relative_to(GATEWAY_ROOT.parent))
    # Replace both the native separator and "/" so the result is the same
    # regardless of which one the path string uses.
    dotted = relative.replace(os.sep, ".").replace("/", ".")
    return dotted[:-3] if dotted.endswith(".py") else dotted
|
|
|
|
|
|
def getImportedModuleName(importNode: ast.AST) -> List[str]:
    """Return the module names referenced by an import statement node.

    Args:
        importNode: An AST node; only ``ast.Import`` and ``ast.ImportFrom``
            produce results.

    Returns:
        * ``import a, b.c``       -> ``["a", "b.c"]`` (one entry per alias)
        * ``from a.b import x``   -> ``["a.b"]``
        * ``from ..pkg import x`` -> ``["..pkg"]`` (leading dots = level)
        * ``from . import x``     -> ``["."]`` (dots only; ``x`` is not listed)
        * any other node          -> ``[]``
    """
    if isinstance(importNode, ast.Import):
        return [alias.name for alias in importNode.names]

    if not isinstance(importNode, ast.ImportFrom):
        return []

    # Relative imports are marked by one leading dot per level.
    dots = "." * importNode.level
    target = importNode.module or ""
    if not dots and not target:
        return []
    return [dots + target]
|
|
|
|
|
|
def findEnclosingFunction(node: ast.AST, tree: ast.Module) -> Optional[str]:
    """Return the name of a function whose subtree contains ``node``.

    Every ``def`` / ``async def`` in ``tree`` is visited in ``ast.walk``
    order and the first one whose subtree holds ``node`` (compared by
    identity) wins. NOTE(review): with CPython's breadth-first ``ast.walk``
    this yields the outermost enclosing function for nested definitions.

    Args:
        node: The AST node to locate (identity comparison, not equality).
        tree: The parsed module that ``node`` belongs to.

    Returns:
        The function name, or ``None`` when no function contains ``node``.
    """
    functionTypes = (ast.FunctionDef, ast.AsyncFunctionDef)
    for candidate in ast.walk(tree):
        if not isinstance(candidate, functionTypes):
            continue
        if any(descendant is node for descendant in ast.walk(candidate)):
            return candidate.name
    return None
|
|
|
|
|
|
def _isModuleOrPackage(baseDir: Path, dottedName: str) -> bool:
    """Return True if ``dottedName`` under ``baseDir`` is a package or a .py module."""
    candidate = baseDir.joinpath(*dottedName.split("."))
    # A package is a directory that carries an __init__.py ...
    if candidate.is_dir() and (candidate / "__init__.py").exists():
        return True
    # ... a plain module is a sibling file with a .py suffix.
    return candidate.with_suffix(".py").exists()


def checkModuleExists(moduleName: str, currentFile: Path) -> bool:
    """Check on disk whether an imported module can be located.

    Args:
        moduleName: Module name as produced by ``getImportedModuleName``:
            absolute (``"modules.foo"``, ``"os"``) or relative with leading
            dots (``".foo"``, ``".."``).
        currentFile: Path of the file containing the import; relative
            imports are resolved against its directory.

    Returns:
        * Relative import: ``True`` if the target resolves to a directory
          with ``__init__.py`` or to a ``.py`` file. A bare ``"."`` is always
          ``True``; bare ``".."`` etc. are ``True`` if that directory exists.
        * Absolute import starting with ``"modules."``: the same check,
          resolved against ``GATEWAY_ROOT``.
        * Anything else is treated as external and assumed to exist, since
          it cannot be verified without importing it.
    """
    if moduleName.startswith("."):
        # Split "..pkg.mod" into its level (number of dots) and "pkg.mod".
        relativePath = moduleName.lstrip(".")
        dotCount = len(moduleName) - len(relativePath)

        if not relativePath and dotCount == 1:
            return True  # "from . import x" - current package

        # One dot means the current file's directory; each additional dot
        # climbs one directory further up.
        baseDir = currentFile.parent
        for _ in range(dotCount - 1):
            baseDir = baseDir.parent

        if not relativePath:
            # Pure relative import like "from .. import x"
            return baseDir.is_dir()

        return _isModuleOrPackage(baseDir, relativePath)

    if moduleName.startswith("modules."):
        # Internal module - check in gateway
        return _isModuleOrPackage(GATEWAY_ROOT, moduleName)

    # External module - assume it exists (can't easily verify without importing)
    return True
|
|
|
|
|
|
def analyzeFile(filePath: Path) -> List[Tuple[str, str, str, str]]:
    """Analyze the imports of a single Python file.

    Args:
        filePath: Path of the source file to parse.

    Returns:
        One ``(module_name, imported_module_name, position, import_valid)``
        tuple per imported module, where

        * ``module_name`` is ``getModuleName(filePath)``,
        * ``imported_module_name`` is the imported name, prefixed with
          ``"(relative) "`` for relative imports,
        * ``position`` is ``"header"`` when the import is not inside any
          function, otherwise ``"function <name>"``,
        * ``import_valid`` is ``"Yes"`` or ``"No"`` per ``checkModuleExists``.

        A file that cannot be decoded or parsed is reported on stdout and
        yields an empty list.
    """
    moduleName = getModuleName(filePath)

    try:
        content = filePath.read_text(encoding="utf-8")
        tree = ast.parse(content, filename=str(filePath))
    except (SyntaxError, ValueError) as e:
        # ValueError covers UnicodeDecodeError (its subclass) and the
        # "source code string cannot contain null bytes" error that
        # ast.parse raises on some Python versions; one bad file must not
        # abort the whole analysis.
        print(f"Error parsing {filePath}: {e}")
        return []

    results: List[Tuple[str, str, str, str]] = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.Import, ast.ImportFrom)):
            continue

        # Reuse the shared helper instead of re-walking the tree inline.
        enclosing = findEnclosingFunction(node, tree)
        position = "header" if enclosing is None else f"function {enclosing}"

        for importedModule in getImportedModuleName(node):
            validStr = "Yes" if checkModuleExists(importedModule, filePath) else "No"

            # Flag relative imports explicitly so they stand out in the report.
            if importedModule.startswith("."):
                importedModuleDisplay = f"(relative) {importedModule}"
            else:
                importedModuleDisplay = importedModule

            results.append((moduleName, importedModuleDisplay, position, validStr))

    return results
|
|
|
|
|
|
def main():
    """Scan every .py file under GATEWAY_ROOT and write the import report.

    Collects the rows produced by ``analyzeFile`` for each file, sorts them
    by analyzed module and imported module, writes them with a header row to
    ``OUTPUT_FILE`` as CSV, and prints a summary that lists up to 20 imports
    flagged as invalid.
    """
    collected = []

    for root, dirs, files in os.walk(GATEWAY_ROOT):
        # Assign through the slice so os.walk sees the pruned list and does
        # not descend into __pycache__ directories.
        dirs[:] = [name for name in dirs if name != "__pycache__"]

        for fileName in files:
            if not fileName.endswith(".py"):
                continue
            collected.extend(analyzeFile(Path(root) / fileName))

    # Order rows by (module_name, imported_module_name).
    collected.sort(key=lambda row: (row[0], row[1]))

    with open(OUTPUT_FILE, "w", newline="", encoding="utf-8") as handle:
        reportWriter = csv.writer(handle)
        reportWriter.writerow(["module_name", "imported_module_name", "position", "import_valid"])
        reportWriter.writerows(collected)

    print(f"Analysis complete. Found {len(collected)} imports.")
    print(f"Output written to: {OUTPUT_FILE}")

    # Summarize the imports whose target could not be found.
    flagged = [row for row in collected if row[3] == "No"]
    if not flagged:
        return

    print(f"\nFound {len(flagged)} potentially invalid imports:")
    for moduleName, importedModule, position, _ in flagged[:20]:
        print(f" {moduleName} -> {importedModule} ({position})")

    overflow = len(flagged) - 20
    if overflow > 0:
        print(f" ... and {overflow} more")
|
|
|
|
|
|
# Run the analysis only when this file is executed as a script, so that
# importing it (e.g. to reuse the helper functions) has no side effects.
if __name__ == "__main__":
    main()
|