"""Build JSON-Schema tool definitions and a dispatch map from ``pr.tools``."""

import inspect
import types
from typing import Union, get_args, get_origin, get_type_hints

import pr.tools

# Parameters supplied by get_func_map() rather than by the tool caller; they
# are therefore left out of the generated schemas.
_INTERNAL_PARAMS = ("db_conn", "python_globals")

# Python annotation -> JSON Schema type name.
_PRIMITIVE_JSON_TYPES = (
    (str, "string"),
    (int, "integer"),
    (float, "number"),
    (bool, "boolean"),
)

# Origin reported by get_origin() for ``X | Y`` annotations; the attribute
# only exists on interpreters that support that syntax.
_UNION_TYPE = getattr(types, "UnionType", None)


def _type_to_json_schema(py_type):
    """Convert a Python type annotation to a JSON Schema fragment.

    Args:
        py_type: An annotation as returned by ``typing.get_type_hints``.

    Returns:
        A new dict with a ``"type"`` key (plus ``"items"`` for parameterised
        lists). Unrecognised annotations map to ``{"type": "string"}``.
    """
    for primitive, json_name in _PRIMITIVE_JSON_TYPES:
        if py_type is primitive:
            return {"type": json_name}

    origin = get_origin(py_type)

    # Optional[X] / X | None: describe X. Unions with several non-None
    # members keep the generic string fallback.
    if origin is not None and (origin is Union or origin is _UNION_TYPE):
        members = [arg for arg in get_args(py_type) if arg is not type(None)]
        if len(members) == 1:
            return _type_to_json_schema(members[0])
        return {"type": "string"}

    if py_type is list or origin is list:
        item_types = get_args(py_type)
        # An unparameterised list has no item type to describe.
        if item_types:
            return {"type": "array", "items": _type_to_json_schema(item_types[0])}
        return {"type": "array"}

    if py_type is dict or origin is dict:
        return {"type": "object"}

    # Default to string for unknown types
    return {"type": "string"}


def _parse_arg_docs(docstring):
    """Map parameter names to their one-line descriptions.

    Only the first ``Args:`` / ``Arguments:`` section is read. Entries are
    the lines sharing the indentation of the first non-blank line after the
    header; deeper lines are treated as continuations and ignored, and a
    shallower line ends the section. Both ``name: text`` and
    ``name (type): text`` are recognised.

    Args:
        docstring: Raw docstring text; may be empty.

    Returns:
        Dict of parameter name -> description text (first occurrence wins).
    """
    docs = {}
    in_args = False
    entry_indent = None
    for raw_line in docstring.expandtabs().split("\n"):
        stripped = raw_line.strip()
        if not in_args:
            if stripped.startswith(("Args:", "Arguments:")):
                in_args = True
            continue
        if not stripped:
            continue
        indent = len(raw_line) - len(raw_line.lstrip())
        if entry_indent is None:
            entry_indent = indent
        if indent < entry_indent:
            # Dedent: the Args section is over (e.g. "Returns:").
            break
        if indent > entry_indent:
            # Continuation line of the previous entry.
            continue
        name, sep, text = stripped.partition(":")
        if not sep:
            continue
        # Drop an optional "(type)" suffix after the parameter name.
        name = name.split("(", 1)[0].strip()
        if name:
            docs.setdefault(name, text.strip())
    return docs


def _generate_tool_schema(func):
    """Generate a function-calling JSON Schema for a tool function.

    The tool description is the first line of the docstring. Each parameter
    becomes a property typed from its annotation (``str`` when missing),
    with a description taken from the docstring's Args section and a
    ``default`` when the signature provides one. Parameters named in
    ``_INTERNAL_PARAMS`` and ``*args`` / ``**kwargs`` are omitted.

    Args:
        func: The tool callable to describe.

    Returns:
        Dict of the form ``{"type": "function", "function": {...}}``.

    Raises:
        Whatever ``inspect.signature`` or ``typing.get_type_hints`` raise
        for ``func`` (for example on unresolvable annotations).
    """
    sig = inspect.signature(func)
    docstring = func.__doc__ or ""

    # Extract description from docstring
    description = docstring.strip().split("\n")[0].strip()

    type_hints = get_type_hints(func)
    # Parse the Args section once instead of rescanning it per parameter.
    arg_docs = _parse_arg_docs(docstring)

    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )

    properties = {}
    required = []
    for param_name, param in sig.parameters.items():
        if param_name in _INTERNAL_PARAMS:
            # Skip internal parameters
            continue
        if param.kind in variadic_kinds:
            # *args / **kwargs are not single named arguments.
            continue

        schema = _type_to_json_schema(type_hints.get(param_name, str))

        param_doc = arg_docs.get(param_name)
        if param_doc:
            schema["description"] = param_doc

        # Identity check: a default value may overload comparison operators.
        if param.default is inspect.Parameter.empty:
            required.append(param_name)
        else:
            schema["default"] = param.default

        properties[param_name] = schema

    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


def _is_tool_callable(obj):
    """Return True when ``obj`` is callable and defined under ``pr.tools.``."""
    if not callable(obj):
        return False
    module_name = getattr(obj, "__module__", None)
    # __module__ can be None or missing on some callables.
    return isinstance(module_name, str) and module_name.startswith("pr.tools.")


def get_tools_definition():
    """Dynamically generate tool definitions from all tool functions.

    Scans the public names of ``pr.tools`` (via ``dir``), keeps callables
    defined in a ``pr.tools.`` submodule that have a docstring, and builds a
    schema for each. A tool whose schema cannot be generated is reported
    with a printed warning and skipped.

    Returns:
        List of schema dicts as produced by ``_generate_tool_schema``.
    """
    tools = []

    for name in dir(pr.tools):
        if name.startswith("_"):
            continue

        obj = getattr(pr.tools, name)
        # Only documented callables are treated as tools.
        if not (_is_tool_callable(obj) and obj.__doc__):
            continue

        try:
            tools.append(_generate_tool_schema(obj))
        except Exception as e:
            # Best effort: one bad tool must not hide the others.
            print(f"Warning: Could not generate schema for {name}: {e}")

    return tools


def _bind_tool(func, injected):
    """Wrap ``func`` so that ``injected`` keyword arguments are supplied.

    ``func`` and ``injected`` live in the closure rather than in the
    wrapper's parameter list, so no tool argument name can shadow them.
    Keyword arguments passed by the caller take precedence over injected
    ones with the same name.
    """

    def call_tool(**kw):
        return func(**{**injected, **kw})

    return call_tool


def get_func_map(db_conn=None, python_globals=None):
    """Dynamically generate function map for tool execution.

    Every public name listed in ``pr.tools.__all__`` that resolves to a
    callable defined in a ``pr.tools.`` submodule is wrapped in a
    keyword-only callable. ``db_conn`` and/or ``python_globals`` are passed
    through to tools whose signature declares parameters with those names.

    Args:
        db_conn: Value passed to tools that declare a ``db_conn`` parameter.
        python_globals: Value passed to tools that declare a
            ``python_globals`` parameter.

    Returns:
        Dict mapping tool name -> wrapper accepting ``**kw``.
    """
    available = {"db_conn": db_conn, "python_globals": python_globals}
    func_map = {}

    # Include all functions from __all__ in pr.tools
    for name in getattr(pr.tools, "__all__", []):
        if name.startswith("_"):
            continue

        obj = getattr(pr.tools, name, None)
        if not _is_tool_callable(obj):
            continue

        declared = inspect.signature(obj).parameters
        injected = {key: value for key, value in available.items() if key in declared}
        func_map[name] = _bind_tool(obj, injected)

    return func_map