# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Analyze all imports in the gateway codebase and generate a CSV report.
Helps identify which imports need to be cleaned up after refactoring.
"""

import os
import ast
import csv
import sys
from collections import deque
from pathlib import Path
from typing import Iterator, List, Tuple, Optional

# Gateway root directory
GATEWAY_ROOT = Path(__file__).parent.parent
OUTPUT_FILE = GATEWAY_ROOT / "scripts" / "import_analysis.csv"

# AST node classes that open a function scope / represent an import statement.
_FUNCTION_NODES = (ast.FunctionDef, ast.AsyncFunctionDef)
_IMPORT_NODES = (ast.Import, ast.ImportFrom)


def getModuleName(filePath: Path) -> str:
    """Convert a file path to a dotted module name.

    The path is taken relative to the parent of GATEWAY_ROOT, so the name
    starts with the gateway directory itself. A trailing ".py" is removed.

    Raises:
        ValueError: if filePath is not located under GATEWAY_ROOT.parent.
    """
    relPath = filePath.relative_to(GATEWAY_ROOT.parent)
    # as_posix() normalizes the platform separator to "/" before the swap.
    modulePath = relPath.as_posix().replace("/", ".")
    if modulePath.endswith(".py"):
        modulePath = modulePath[:-3]
    return modulePath


def getImportedModuleName(importNode: ast.AST) -> List[str]:
    """Extract the imported module names from an import node.

    - ``import a, b.c``      -> ["a", "b.c"]
    - ``from a.b import c``  -> ["a.b"]
    - ``from ..a import c``  -> ["..a"]  (leading dots mark a relative import)
    - ``from . import c``    -> ["."]
    - any other node         -> []
    """
    if isinstance(importNode, ast.Import):
        return [alias.name for alias in importNode.names]

    if isinstance(importNode, ast.ImportFrom):
        # level may be None on hand-built nodes; treat that as absolute.
        level = importNode.level or 0
        name = "." * level + (importNode.module or "")
        return [name] if name else []

    return []


def _walkWithOutermostFunction(tree: ast.AST) -> Iterator[Tuple[ast.AST, Optional[str]]]:
    """Yield (node, name of the outermost function containing it, or None).

    Nodes are produced breadth-first, i.e. in the same order as ast.walk().
    A function definition that is not nested in another function is reported
    with its own name.
    """
    pending = deque([(tree, None)])
    while pending:
        node, owner = pending.popleft()
        if owner is None and isinstance(node, _FUNCTION_NODES):
            owner = node.name
        yield node, owner
        for child in ast.iter_child_nodes(node):
            pending.append((child, owner))


def findEnclosingFunction(node: ast.AST, tree: ast.Module) -> Optional[str]:
    """Find the function name that contains this import, if any.

    For nested functions the name of the outermost function is returned.
    Returns None when the node is not inside any function (or is not part
    of ``tree`` at all).
    """
    for candidate, owner in _walkWithOutermostFunction(tree):
        if candidate is node:
            return owner
    return None


def _isPackageOrModule(baseDir: Path, dottedName: str) -> bool:
    """Return True if dottedName resolves to a package or .py file under baseDir."""
    target = baseDir.joinpath(*dottedName.split("."))
    if target.is_dir() and (target / "__init__.py").exists():
        return True
    return target.with_suffix(".py").exists()


def checkModuleExists(moduleName: str, currentFile: Path) -> bool:
    """Check if an imported module exists.

    Args:
        moduleName: Dotted name as produced by getImportedModuleName();
            leading dots denote a relative import.
        currentFile: File containing the import (anchor for relative imports).

    Returns:
        - relative imports: whether the target package/module exists on disk
        - names starting with "modules.": whether they exist under GATEWAY_ROOT
        - anything else: True (external modules can't easily be verified
          without importing them)
    """
    if moduleName.startswith("."):
        dotCount = len(moduleName) - len(moduleName.lstrip("."))
        relativePath = moduleName[dotCount:]

        if not relativePath and dotCount == 1:
            return True  # "from . import x" - current package

        # One dot is the directory of the current file; every additional dot
        # climbs one directory further up.
        baseDir = currentFile.parent
        for _ in range(dotCount - 1):
            baseDir = baseDir.parent

        if not relativePath:
            # Pure relative import like "from .. import x"
            return baseDir.is_dir()

        return _isPackageOrModule(baseDir, relativePath)

    if moduleName.startswith("modules."):
        # Internal module - check in gateway
        return _isPackageOrModule(GATEWAY_ROOT, moduleName)

    # External module - assume it exists
    return True


def analyzeFile(filePath: Path) -> List[Tuple[str, str, str, str]]:
    """Analyze imports in a single file.

    Returns one tuple per imported module:
    (module_name, imported_module_name, position, import_valid) where
    position is "header" or "function <name>" (outermost enclosing function)
    and import_valid is "Yes" or "No".

    Files that cannot be read or parsed are reported on stdout and yield an
    empty list, so a single bad file does not abort a whole scan.
    """
    results: List[Tuple[str, str, str, str]] = []
    moduleName = getModuleName(filePath)

    try:
        with open(filePath, "r", encoding="utf-8") as f:
            content = f.read()
        tree = ast.parse(content, filename=str(filePath))
    except (SyntaxError, ValueError, OSError) as e:
        # ValueError also covers UnicodeDecodeError (its subclass).
        print(f"Error parsing {filePath}: {e}")
        return results

    # Single pass over the tree: every node arrives with its enclosing scope.
    for node, owner in _walkWithOutermostFunction(tree):
        if not isinstance(node, _IMPORT_NODES):
            continue

        position = "header" if owner is None else f"function {owner}"

        for importedModule in getImportedModuleName(node):
            validStr = "Yes" if checkModuleExists(importedModule, filePath) else "No"

            # Flag relative imports explicitly in the report
            if importedModule.startswith("."):
                importedModuleDisplay = f"(relative) {importedModule}"
            else:
                importedModuleDisplay = importedModule

            results.append((moduleName, importedModuleDisplay, position, validStr))

    return results


def main():
    """Analyze every .py file under GATEWAY_ROOT and write the CSV report.

    Writes OUTPUT_FILE and prints a summary, including up to 20 imports
    whose target could not be found.
    """
    allResults = []

    # Find all Python files in gateway
    for root, dirs, files in os.walk(GATEWAY_ROOT):
        # Skip __pycache__ directories (in-place edit prunes the walk)
        dirs[:] = [d for d in dirs if d != "__pycache__"]

        for file in files:
            if file.endswith(".py"):
                allResults.extend(analyzeFile(Path(root) / file))

    # Sort results by importing module, then imported module
    allResults.sort(key=lambda x: (x[0], x[1]))

    # Write CSV
    with open(OUTPUT_FILE, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["module_name", "imported_module_name", "position", "import_valid"])
        writer.writerows(allResults)

    print(f"Analysis complete. Found {len(allResults)} imports.")
    print(f"Output written to: {OUTPUT_FILE}")

    # Print summary of invalid imports
    invalidImports = [r for r in allResults if r[3] == "No"]
    if invalidImports:
        print(f"\nFound {len(invalidImports)} potentially invalid imports:")
        for moduleName, importedModule, position, _ in invalidImports[:20]:
            print(f"  {moduleName} -> {importedModule} ({position})")
        if len(invalidImports) > 20:
            print(f"  ... and {len(invalidImports) - 20} more")


if __name__ == "__main__":
    main()