import sys
import os
import ast
import inspect
import time
import threading
import gc
import weakref
import linecache
import re
import json
import subprocess
from collections import defaultdict
from datetime import datetime
class MemoryTracker:
    """Best-effort tracker of object allocations and approximate memory usage.

    Sizes come from ``sys.getsizeof``, which is shallow (it does not follow
    references into contained objects), so totals are indicative, not exact.
    """

    def __init__(self):
        self.allocations = defaultdict(list)  # location -> list of allocation records
        self.references = weakref.WeakValueDictionary()  # reserved for weak-ref tracking
        self.peak_memory = 0
        self.current_memory = 0

    def track_object(self, obj, location):
        """Record obj's shallow size against ``location``; never raises.

        Tracking is deliberately best-effort, but the handler is narrowed to
        ``except Exception`` so SystemExit/KeyboardInterrupt still propagate.
        """
        try:
            obj_id = id(obj)
            obj_size = sys.getsizeof(obj)
        except Exception:
            # getsizeof can fail on exotic objects; skip them silently.
            return
        self.allocations[location].append(
            {
                "id": obj_id,
                "type": type(obj).__name__,
                "size": obj_size,
                "timestamp": time.time(),
            }
        )
        self.current_memory += obj_size
        if self.current_memory > self.peak_memory:
            self.peak_memory = self.current_memory

    def analyze_leaks(self):
        """Return up to 20 live objects with suspiciously high refcounts.

        The >10 threshold is a heuristic; many legitimate objects (interned
        strings, module dicts) also exceed it.
        """
        gc.collect()
        leaks = []
        for obj in gc.get_objects():
            if sys.getrefcount(obj) > 10:
                try:
                    leaks.append(
                        {
                            "type": type(obj).__name__,
                            "refcount": sys.getrefcount(obj),
                            "size": sys.getsizeof(obj),
                        }
                    )
                except Exception:
                    # Some objects reject getsizeof; skip them.
                    pass
        return sorted(leaks, key=lambda x: x["refcount"], reverse=True)[:20]

    def get_report(self):
        """Summarize peak/current usage, allocation count, and leak suspects."""
        return {
            "peak_memory": self.peak_memory,
            "current_memory": self.current_memory,
            "allocation_count": sum(len(v) for v in self.allocations.values()),
            "leaks": self.analyze_leaks(),
        }
class PerformanceProfiler:
    """Aggregates per-function call counts, inclusive (total) and exclusive (self) time.

    Keys are "filename:funcname:lineno" strings, so the same function observed
    at different line numbers is tracked under separate keys (preserved from
    the original behavior).
    """

    def __init__(self):
        self.function_times = defaultdict(lambda: {"calls": 0, "total_time": 0.0, "self_time": 0.0})
        self.call_stack = []   # function-name stack (public attribute, as before)
        self.start_times = {}  # frame id -> perf_counter() at entry
        self._frame_stack = []  # frame-id stack mirroring call_stack
        self._child_time = {}   # frame id -> time spent in callees so far

    def enter_function(self, frame):
        """Record entry into ``frame``'s function."""
        fid = id(frame)
        self.call_stack.append(self._get_function_name(frame))
        self._frame_stack.append(fid)
        self.start_times[fid] = time.perf_counter()
        self._child_time[fid] = 0.0

    def exit_function(self, frame):
        """Record exit from ``frame``'s function.

        Fix: the previous implementation added the full elapsed time to both
        ``total_time`` and ``self_time``, making the two columns identical.
        ``self_time`` now excludes time attributed to callees.
        """
        func_name = self._get_function_name(frame)
        fid = id(frame)
        if fid in self.start_times:
            elapsed = time.perf_counter() - self.start_times[fid]
            child = self._child_time.pop(fid, 0.0)
            stats = self.function_times[func_name]
            stats["calls"] += 1
            stats["total_time"] += elapsed
            stats["self_time"] += max(elapsed - child, 0.0)
            del self.start_times[fid]
            # Pop only a matching frame so an unbalanced exit cannot corrupt the stack.
            if self._frame_stack and self._frame_stack[-1] == fid:
                self._frame_stack.pop()
            # Charge this call's elapsed time to the parent's child-time bucket.
            if self._frame_stack:
                parent = self._frame_stack[-1]
                if parent in self._child_time:
                    self._child_time[parent] += elapsed
        if self.call_stack:
            self.call_stack.pop()

    def _get_function_name(self, frame):
        """Key a frame by file, function name, and current line number."""
        return f"{frame.f_code.co_filename}:{frame.f_code.co_name}:{frame.f_lineno}"

    def get_hotspots(self, limit=20):
        """Top ``limit`` entries by inclusive time, descending."""
        sorted_funcs = sorted(
            self.function_times.items(), key=lambda x: x[1]["total_time"], reverse=True
        )
        return sorted_funcs[:limit]
class StaticAnalyzer(ast.NodeVisitor):
    """AST-based static analyzer.

    Collects cyclomatic-style complexity, function/class/import inventories,
    variable usage, and a list of human-readable issue strings.
    """

    def __init__(self):
        self.issues = []
        self.complexity = 0
        self.unused_vars = set()
        self.undefined_vars = set()
        self.defined_vars = set()
        self.used_vars = set()
        self.functions = {}
        self.classes = {}
        self.imports = []

    def visit_FunctionDef(self, node):
        """Record function metadata; flag empty or over-parameterized functions."""
        self.functions[node.name] = {
            "lineno": node.lineno,
            "args": [arg.arg for arg in node.args.args],
            "decorators": [
                d.id if isinstance(d, ast.Name) else "complex" for d in node.decorator_list
            ],
            "complexity": self._calculate_complexity(node),
        }
        if len(node.body) == 0:
            self.issues.append(f"Line {node.lineno}: Empty function '{node.name}'")
        if len(node.args.args) > 7:
            self.issues.append(
                f"Line {node.lineno}: Function '{node.name}' has too many parameters ({len(node.args.args)})"
            )
        self.generic_visit(node)

    def visit_ClassDef(self, node):
        """Record class metadata (bases flattened to names where possible)."""
        self.classes[node.name] = {
            "lineno": node.lineno,
            "bases": [b.id if isinstance(b, ast.Name) else "complex" for b in node.bases],
            "methods": [],
        }
        self.generic_visit(node)

    def visit_Import(self, node):
        for alias in node.names:
            self.imports.append(alias.name)
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        # node.module is None for relative imports like `from . import x`.
        if node.module:
            self.imports.append(node.module)
        self.generic_visit(node)

    def visit_Name(self, node):
        """Track assignments (Store) vs. reads (Load) for unused/undefined checks."""
        if isinstance(node.ctx, ast.Store):
            self.defined_vars.add(node.id)
        elif isinstance(node.ctx, ast.Load):
            self.used_vars.add(node.id)
        self.generic_visit(node)

    def visit_If(self, node):
        self.complexity += 1
        self.generic_visit(node)

    def visit_For(self, node):
        self.complexity += 1
        self.generic_visit(node)

    def visit_While(self, node):
        self.complexity += 1
        self.generic_visit(node)

    def visit_ExceptHandler(self, node):
        self.complexity += 1
        if node.type is None:
            self.issues.append(f"Line {node.lineno}: Bare except clause (catches all exceptions)")
        self.generic_visit(node)

    def visit_BinOp(self, node):
        """Flag string concatenation with '+'.

        Fix: string literals are ``ast.Constant`` nodes with a ``str`` value;
        ``ast.Str`` is deprecated and slated for removal, so it no longer
        reliably matches on modern Python.
        """
        if isinstance(node.op, ast.Add):
            left_is_str = isinstance(node.left, ast.Constant) and isinstance(node.left.value, str)
            right_is_str = isinstance(node.right, ast.Constant) and isinstance(node.right.value, str)
            if left_is_str or right_is_str:
                self.issues.append(
                    f"Line {node.lineno}: String concatenation with '+' (use f-strings or join)"
                )
        self.generic_visit(node)

    def _calculate_complexity(self, node):
        """1 + number of branch points (if/for/while/except) under ``node``."""
        complexity = 1
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.For, ast.While, ast.ExceptHandler)):
                complexity += 1
        return complexity

    def finalize(self):
        """Derive unused/undefined variable sets after the full tree walk."""
        # Fix: ``__builtins__`` is a dict (not a module) outside __main__, so
        # dir(__builtins__) would list dict methods; the builtins module is reliable.
        import builtins
        self.unused_vars = self.defined_vars - self.used_vars
        self.undefined_vars = self.used_vars - self.defined_vars - set(dir(builtins))
        # sorted() makes the issue order deterministic across runs.
        for var in sorted(self.unused_vars):
            self.issues.append(f"Unused variable: '{var}'")

    def analyze_code(self, source_code):
        """Parse and analyze ``source_code``; returns a result dict or an error dict."""
        try:
            tree = ast.parse(source_code)
            self.visit(tree)
            self.finalize()
            return {
                "issues": self.issues,
                "complexity": self.complexity,
                "functions": self.functions,
                "classes": self.classes,
                "imports": self.imports,
                "unused_vars": list(self.unused_vars),
            }
        except SyntaxError as e:
            return {"error": f"Syntax error at line {e.lineno}: {e.msg}"}
class DynamicTracer:
    """Execution tracer built on ``sys.settrace``/``threading.settrace``.

    Records call/line/return/exception events, line coverage, per-variable
    value snapshots, and feeds a MemoryTracker and PerformanceProfiler.
    """

    def __init__(self):
        self.execution_trace = []
        self.exception_trace = []
        self.variable_changes = defaultdict(list)
        self.line_coverage = set()  # set of (filename, lineno) pairs
        self.function_calls = defaultdict(int)
        self.max_trace_length = 10000  # cap to bound memory growth
        self.memory_tracker = MemoryTracker()
        self.profiler = PerformanceProfiler()
        self.trace_active = False

    def trace_calls(self, frame, event, arg):
        """Trace callback; returns itself so line-level events keep arriving."""
        if not self.trace_active:
            return
        if len(self.execution_trace) >= self.max_trace_length:
            # Stop recording new entries but keep tracing installed.
            return self.trace_calls
        co = frame.f_code
        func_name = co.co_name
        filename = co.co_filename
        line_no = frame.f_lineno
        # Skip third-party code and synthetic frames such as "<string>".
        if "site-packages" in filename or filename.startswith("<"):
            return self.trace_calls
        trace_entry = {
            "event": event,
            "function": func_name,
            "filename": filename,
            "lineno": line_no,
            "timestamp": time.time(),
        }
        if event == "call":
            self.function_calls[f"(unknown):{func_name}"] += 1
            self.profiler.enter_function(frame)
            trace_entry["locals"] = {
                k: repr(v)[:100] for k, v in frame.f_locals.items() if not k.startswith("__")
            }
        elif event == "return":
            self.profiler.exit_function(frame)
            # Fix: compare against None so falsy return values (0, "", False, [])
            # are recorded; the previous `if arg` reported them all as None.
            trace_entry["return_value"] = repr(arg)[:100] if arg is not None else None
        elif event == "line":
            self.line_coverage.add((filename, line_no))
            line_code = linecache.getline(filename, line_no).strip()
            trace_entry["code"] = line_code
            for var, value in frame.f_locals.items():
                if not var.startswith("__"):
                    self.variable_changes[var].append(
                        {"line": line_no, "value": repr(value)[:100], "timestamp": time.time()}
                    )
                    self.memory_tracker.track_object(value, f"(unknown):{line_no}")
        elif event == "exception":
            exc_type, exc_value, exc_tb = arg
            self.exception_trace.append(
                {
                    "type": exc_type.__name__,
                    "message": str(exc_value),
                    "filename": filename,
                    "function": func_name,
                    "lineno": line_no,
                    "timestamp": time.time(),
                }
            )
            trace_entry["exception"] = {"type": exc_type.__name__, "message": str(exc_value)}
        self.execution_trace.append(trace_entry)
        return self.trace_calls

    def start_tracing(self):
        """Install the tracer for the current thread and threads started later."""
        self.trace_active = True
        sys.settrace(self.trace_calls)
        threading.settrace(self.trace_calls)

    def stop_tracing(self):
        """Uninstall the tracer."""
        self.trace_active = False
        sys.settrace(None)
        threading.settrace(None)

    def get_trace_report(self):
        """Summarize collected data; large collections are truncated for output."""
        return {
            "execution_trace": self.execution_trace[-100:],
            "exception_trace": self.exception_trace,
            "line_coverage": list(self.line_coverage),
            "function_calls": dict(self.function_calls),
            "variable_changes": {k: v[-10:] for k, v in self.variable_changes.items()},
            "hotspots": self.profiler.get_hotspots(20),
            "memory_report": self.memory_tracker.get_report(),
        }
class GitBisectAutomator:
    """Thin wrapper around git subprocess calls (repo detection, history, blame)."""

    def __init__(self, repo_path="."):
        self.repo_path = repo_path

    def _run_git(self, args):
        """Run ``git <args>`` in repo_path; return CompletedProcess or None on failure.

        Narrowed from bare ``except:``; OSError covers a missing git binary or
        bad cwd, SubprocessError covers timeouts. A 30s timeout prevents a
        stuck git process from hanging the caller indefinitely.
        """
        try:
            return subprocess.run(
                ["git"] + args,
                cwd=self.repo_path,
                capture_output=True,
                text=True,
                timeout=30,
            )
        except (OSError, subprocess.SubprocessError):
            return None

    def is_git_repo(self):
        """True if repo_path is inside a git work tree."""
        result = self._run_git(["rev-parse", "--git-dir"])
        return result is not None and result.returncode == 0

    def get_commit_history(self, limit=50):
        """Return up to ``limit`` commits as {"hash", "message"} dicts (newest first)."""
        result = self._run_git(["log", f"--max-count={limit}", "--oneline"])
        if result is None or result.returncode != 0:
            return []
        commits = []
        for line in result.stdout.strip().split("\n"):
            parts = line.split(" ", 1)
            if len(parts) == 2:
                commits.append({"hash": parts[0], "message": parts[1]})
        return commits

    def blame_file(self, filepath):
        """Return git blame output for the first 50 lines of ``filepath``, or None."""
        result = self._run_git(["blame", "-L", "1,50", filepath])
        if result is not None and result.returncode == 0:
            return result.stdout
        return None
class LogAnalyzer:
    """Pattern-based analyzer for plain-text log content."""

    def __init__(self):
        # Raw strings keep regex escapes intact. The previous timestamp pattern
        # was double-escaped ("\\d" as regex matches a literal backslash then
        # 'd'), so it could never match a real timestamp.
        self.log_patterns = {
            "error": re.compile(r"error|exception|fail|critical", re.IGNORECASE),
            "warning": re.compile(r"warn|caution", re.IGNORECASE),
            "debug": re.compile(r"debug|trace", re.IGNORECASE),
            "timestamp": re.compile(r"\d{4}-\d{2}-\d{2}[\s_T]\d{2}:\d{2}:\d{2}"),
        }
        self.anomalies = []

    def analyze_logs(self, log_content):
        """Classify each line as error/warning and extract ISO-like timestamps.

        A line matching the error pattern is not also counted as a warning
        (elif), mirroring severity precedence. Output lists are truncated.
        """
        lines = log_content.split("\n")
        errors = []
        warnings = []
        timestamps = []
        for i, line in enumerate(lines):
            if self.log_patterns["error"].search(line):
                errors.append({"line": i + 1, "content": line})
            elif self.log_patterns["warning"].search(line):
                warnings.append({"line": i + 1, "content": line})
            ts_match = self.log_patterns["timestamp"].search(line)
            if ts_match:
                timestamps.append(ts_match.group())
        return {
            "total_lines": len(lines),
            "errors": errors[:50],
            "warnings": warnings[:50],
            "error_count": len(errors),
            "warning_count": len(warnings),
            "timestamps": timestamps[:20],
        }
class ExceptionAnalyzer:
    """Collects structured, serializable records of captured exceptions."""

    def __init__(self):
        self.exceptions = []
        self.exception_counts = defaultdict(int)

    def capture_exception(self, exc_type, exc_value, exc_traceback):
        """Walk the traceback and store one record with per-frame local snapshots."""
        captured_at = time.time()
        frames = []
        tb = exc_traceback
        while tb is not None:
            fr = tb.tb_frame
            snapshot = {
                name: repr(val)[:100]
                for name, val in fr.f_locals.items()
                if not name.startswith("__")
            }
            frames.append(
                {
                    "filename": fr.f_code.co_filename,
                    "function": fr.f_code.co_name,
                    "lineno": tb.tb_lineno,
                    "locals": snapshot,
                }
            )
            tb = tb.tb_next
        record = {
            "type": exc_type.__name__,
            "message": str(exc_value),
            "timestamp": captured_at,
            "traceback": frames,
        }
        self.exceptions.append(record)
        self.exception_counts[exc_type.__name__] += 1
        return record

    def get_report(self):
        """Aggregate counts plus the ten most recent exception records."""
        return {
            "total_exceptions": len(self.exceptions),
            "exception_types": dict(self.exception_counts),
            "recent_exceptions": self.exceptions[-10:],
        }
class TestGenerator:
def __init__(self):
self.test_cases = []
def generate_tests_for_function(self, func_name, func_signature):
test_template = f"def test_{func_name}_basic():\n result = {func_name}()\n assert result is not None\n\ndef test_{func_name}_edge_cases():\n pass\n\ndef test_{func_name}_exceptions():\n pass\n"
return test_template
def analyze_function_for_tests(self, func_obj):
sig = inspect.signature(func_obj)
test_inputs = []
for param_name, param in sig.parameters.items():
if param.annotation != inspect.Parameter.empty:
param_type = param.annotation
if param_type == int:
test_inputs.append([0, 1, -1, 100])
elif param_type == str:
test_inputs.append(["", "test", "a" * 100])
elif param_type == list:
test_inputs.append([[], [1], [1, 2, 3]])
else:
test_inputs.append([None])
else:
test_inputs.append([None, 0, "", []])
return test_inputs
class CodeFlowVisualizer:
    """Builds a step-to-step flow graph and a caller->callee hierarchy from a trace."""

    def __init__(self):
        self.flow_graph = defaultdict(list)
        self.call_hierarchy = defaultdict(set)

    def build_flow_from_trace(self, execution_trace):
        """Link each trace entry to its successor; 'call' events feed the hierarchy."""
        for step, successor in zip(execution_trace, execution_trace[1:]):
            src = f"{step['function']}:{step['lineno']}"
            dst = f"{successor['function']}:{successor['lineno']}"
            self.flow_graph[src].append(dst)
            if step["event"] == "call":
                self.call_hierarchy[step["function"]].add(successor["function"])

    def generate_text_visualization(self):
        """Render the call hierarchy as an indented text listing."""
        rendered = ["Call Hierarchy:"]
        for caller, callees in sorted(self.call_hierarchy.items()):
            rendered.append(f" {caller}")
            rendered.extend(f" -> {callee}" for callee in sorted(callees))
        return "\n".join(rendered)
class AutomatedDebugger:
    """Facade wiring together the static, dynamic, log, git, and test tooling."""

    def __init__(self):
        self.static_analyzer = StaticAnalyzer()
        self.dynamic_tracer = DynamicTracer()
        self.exception_analyzer = ExceptionAnalyzer()
        self.log_analyzer = LogAnalyzer()
        self.git_automator = GitBisectAutomator()
        self.test_generator = TestGenerator()
        self.flow_visualizer = CodeFlowVisualizer()
        # Saved so a custom excepthook could be restored later if installed.
        self.original_excepthook = sys.excepthook

    def analyze_source_file(self, filepath):
        """Read ``filepath`` and run static analysis; errors become {"error": ...}."""
        try:
            with open(filepath, "r", encoding="utf-8") as handle:
                code = handle.read()
            return self.static_analyzer.analyze_code(code)
        except Exception as exc:
            return {"error": str(exc)}

    def run_with_tracing(self, target_func, *args, **kwargs):
        """Invoke ``target_func`` under the dynamic tracer; exceptions are captured, not re-raised."""
        self.dynamic_tracer.start_tracing()
        outcome = None
        captured = None
        try:
            outcome = target_func(*args, **kwargs)
        except Exception as exc:
            captured = self.exception_analyzer.capture_exception(
                type(exc), exc, exc.__traceback__
            )
        finally:
            self.dynamic_tracer.stop_tracing()
        self.flow_visualizer.build_flow_from_trace(self.dynamic_tracer.execution_trace)
        return {
            "result": outcome,
            "exception": captured,
            "trace": self.dynamic_tracer.get_trace_report(),
            "flow": self.flow_visualizer.generate_text_visualization(),
        }

    def analyze_logs(self, log_file_or_content):
        """Accept either a path to a log file or raw log text and analyze it."""
        content = log_file_or_content
        if os.path.isfile(log_file_or_content):
            with open(log_file_or_content, "r", encoding="utf-8") as handle:
                content = handle.read()
        return self.log_analyzer.analyze_logs(content)

    def generate_debug_report(self, output_file="debug_report.json"):
        """Assemble a JSON debug report, write it to ``output_file``, and return it."""
        report = {
            "timestamp": datetime.now().isoformat(),
            "static_analysis": self.static_analyzer.issues,
            "dynamic_trace": self.dynamic_tracer.get_trace_report(),
            "exceptions": self.exception_analyzer.get_report(),
            "git_info": (
                self.git_automator.get_commit_history(10)
                if self.git_automator.is_git_repo()
                else None
            ),
            "flow_visualization": self.flow_visualizer.generate_text_visualization(),
        }
        # default=str keeps non-JSON-native values (e.g. tuples' contents) serializable.
        with open(output_file, "w") as sink:
            json.dump(report, sink, indent=2, default=str)
        return report

    def auto_debug_function(self, func, test_inputs=None):
        """Run ``func`` against each input set under tracing and summarize each run."""
        if test_inputs is None:
            test_inputs = self.test_generator.analyze_function_for_tests(func)
        results = []
        for input_set in test_inputs:
            try:
                call_args = input_set if isinstance(input_set, list) else [input_set]
                traced = self.run_with_tracing(func, *call_args)
                results.append(
                    {
                        "input": input_set,
                        "success": traced["exception"] is None,
                        "output": traced["result"],
                        "trace_summary": {
                            "function_calls": len(traced["trace"]["function_calls"]),
                            "exceptions": len(traced["trace"]["exception_trace"]),
                        },
                    }
                )
            except Exception as exc:
                results.append({"input": input_set, "success": False, "error": str(exc)})
        return results
# Module-level singletons shared by the convenience wrapper functions below.
# State accumulated through the wrappers persists for the life of the process.
_memory_tracker = MemoryTracker()
_performance_profiler = PerformanceProfiler()
_static_analyzer = StaticAnalyzer()
_dynamic_tracer = DynamicTracer()
_git_automator = GitBisectAutomator()
_log_analyzer = LogAnalyzer()
_exception_analyzer = ExceptionAnalyzer()
_test_generator = TestGenerator()
_code_flow_visualizer = CodeFlowVisualizer()
_automated_debugger = AutomatedDebugger()
def track_memory_allocation(location: str = "manual") -> dict:
    """Record a marker allocation at ``location`` and return current usage totals."""
    try:
        _memory_tracker.track_object({}, location)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {
        "status": "success",
        "current_memory": _memory_tracker.current_memory,
        "peak_memory": _memory_tracker.peak_memory,
        "location": location,
    }
def analyze_memory_leaks() -> dict:
    """Scan the process for leak suspects and summarize the top offenders."""
    try:
        suspects = _memory_tracker.analyze_leaks()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "leaks_found": len(suspects), "top_leaks": suspects[:10]}
def get_memory_report() -> dict:
    """Return the shared memory tracker's full report."""
    try:
        report = _memory_tracker.get_report()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "report": report}
def start_performance_profiling() -> dict:
    """Start performance profiling by resetting the module-level profiler.

    Fix: the previous implementation constructed a ``PerformanceProfiler()``
    and immediately discarded it, so "starting" had no effect on the profiler
    that ``stop_performance_profiling`` later reads. Reassigning the module
    global gives each start a fresh, empty timing table.
    """
    global _performance_profiler
    try:
        _performance_profiler = PerformanceProfiler()
        return {"status": "success", "message": "Performance profiling started"}
    except Exception as e:
        return {"status": "error", "error": str(e)}
def stop_performance_profiling() -> dict:
    """Return the top 20 hotspots collected by the shared profiler."""
    try:
        hot = _performance_profiler.get_hotspots(20)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "hotspots": hot}
def analyze_source_code(source_code: str) -> dict:
    """Statically analyze a Python source string with a fresh analyzer instance."""
    try:
        findings = StaticAnalyzer().analyze_code(source_code)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "analysis": findings}
def analyze_source_file(filepath: str) -> dict:
    """Statically analyze the Python file at ``filepath``."""
    try:
        findings = _automated_debugger.analyze_source_file(filepath)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "filepath": filepath, "analysis": findings}
def start_dynamic_tracing() -> dict:
    """Install the shared dynamic tracer via sys.settrace/threading.settrace."""
    try:
        _dynamic_tracer.start_tracing()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "message": "Dynamic tracing started"}
def stop_dynamic_tracing() -> dict:
    """Uninstall the shared tracer and return everything it collected."""
    try:
        _dynamic_tracer.stop_tracing()
        collected = _dynamic_tracer.get_trace_report()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "trace_report": collected}
def get_git_commit_history(limit: int = 50) -> dict:
    """Return up to ``limit`` recent commits plus whether cwd is a git repo."""
    try:
        history = _git_automator.get_commit_history(limit)
        in_repo = _git_automator.is_git_repo()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {
        "status": "success",
        "commits": history,
        "is_git_repo": in_repo,
    }
def blame_file(filepath: str) -> dict:
    """Return git blame output (first 50 lines) for ``filepath``."""
    try:
        annotated = _git_automator.blame_file(filepath)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "filepath": filepath, "blame": annotated}
def analyze_log_content(log_content: str) -> dict:
    """Scan raw log text for errors, warnings, and timestamps."""
    try:
        findings = _log_analyzer.analyze_logs(log_content)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "analysis": findings}
def analyze_log_file(filepath: str) -> dict:
    """Read and analyze the log file at ``filepath``."""
    try:
        findings = _automated_debugger.analyze_logs(filepath)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "filepath": filepath, "analysis": findings}
def get_exception_report() -> dict:
    """Return the shared analyzer's summary of captured exceptions."""
    try:
        summary = _exception_analyzer.get_report()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "exception_report": summary}
def generate_tests_for_function(func_name: str, func_signature: str = "") -> dict:
    """Produce a pytest skeleton for ``func_name`` via the shared generator."""
    try:
        skeleton = _test_generator.generate_tests_for_function(func_name, func_signature)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "func_name": func_name, "test_code": skeleton}
def visualize_code_flow_from_trace(execution_trace) -> dict:
    """Build and render a call-flow visualization from a recorded trace."""
    try:
        viz = CodeFlowVisualizer()
        viz.build_flow_from_trace(execution_trace)
        rendering = viz.generate_text_visualization()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "flow_visualization": rendering}
def run_function_with_debugging(func_code: str, *args, **kwargs) -> dict:
    """Compile ``func_code``, pick its first public callable, and trace a call to it.

    NOTE(review): this exec()s caller-supplied source in the module's globals —
    never feed untrusted input here.
    """
    try:
        namespace = {}
        exec(func_code, globals(), namespace)
        target = None
        for label, candidate in namespace.items():
            if callable(candidate) and not label.startswith("_"):
                target = candidate
                break
        if target is None:
            return {"status": "error", "error": "No function found in code"}
        traced = _automated_debugger.run_with_tracing(target, *args, **kwargs)
        return {"status": "success", "debug_result": traced}
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
def generate_comprehensive_debug_report(output_file: str = "debug_report.json") -> dict:
    """Write the full debug report to ``output_file`` and return a small summary."""
    try:
        full = _automated_debugger.generate_debug_report(output_file)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    exc_section = full.get("exceptions", {})
    trace_section = full.get("dynamic_trace", {})
    return {
        "status": "success",
        "output_file": output_file,
        "report_summary": {
            "static_issues": len(full.get("static_analysis", [])),
            "exceptions": exc_section.get("total_exceptions", 0),
            "function_calls": len(trace_section.get("function_calls", {})),
        },
    }
def auto_debug_function(func_code: str, test_inputs: list = None) -> dict:
    """Compile ``func_code`` and exercise its first public callable over test inputs.

    NOTE(review): this exec()s caller-supplied source in the module's globals —
    never feed untrusted input here.
    """
    try:
        namespace = {}
        exec(func_code, globals(), namespace)
        target = None
        for label, candidate in namespace.items():
            if callable(candidate) and not label.startswith("_"):
                target = candidate
                break
        if target is None:
            return {"status": "error", "error": "No function found in code"}
        if test_inputs is None:
            test_inputs = _test_generator.analyze_function_for_tests(target)
        outcomes = _automated_debugger.auto_debug_function(target, test_inputs)
        return {"status": "success", "debug_results": outcomes}
    except Exception as exc:
        return {"status": "error", "error": str(exc)}