import inspect
from typing import get_args, get_origin, get_type_hints
import rp.tools
def _type_to_json_schema(py_type):
"""Convert Python type to JSON Schema type."""
if py_type == str:
return {"type": "string"}
elif py_type == int:
return {"type": "integer"}
elif py_type == float:
return {"type": "number"}
elif py_type == bool:
return {"type": "boolean"}
elif get_origin(py_type) == list:
return {"type": "array", "items": _type_to_json_schema(get_args(py_type)[0])}
elif get_origin(py_type) == dict:
return {"type": "object"}
else:
return {"type": "string"}
def _generate_tool_schema(func):
"""Generate JSON Schema for a tool function."""
sig = inspect.signature(func)
docstring = func.__doc__ or ""
description = docstring.strip().split("\n")[0] if docstring else ""
type_hints = get_type_hints(func)
properties = {}
required = []
for param_name, param in sig.parameters.items():
if param_name in ["db_conn", "python_globals"]:
continue
param_type = type_hints.get(param_name, str)
schema = _type_to_json_schema(param_type)
param_doc = ""
if docstring:
lines = docstring.split("\n")
in_args = False
for line in lines:
line = line.strip()
if line.startswith("Args:") or line.startswith("Arguments:"):
in_args = True
continue
elif in_args and line.startswith(param_name + ":"):
param_doc = line.split(":", 1)[1].strip()
break
elif in_args and line == "":
continue
elif in_args and (not line.startswith(" ")):
break
if param_doc:
schema["description"] = param_doc
if param.default != inspect.Parameter.empty:
schema["default"] = param.default
properties[param_name] = schema
if param.default == inspect.Parameter.empty:
required.append(param_name)
return {
"type": "function",
"function": {
"name": func.__name__,
"description": description,
"parameters": {"type": "object", "properties": properties, "required": required},
},
}
def get_tools_definition():
"""Dynamically generate tool definitions from all tool functions."""
tools = []
seen_functions = set()
all_names = getattr(rp.tools, "__all__", [])
for name in all_names:
if name.startswith("_"):
continue
obj = getattr(rp.tools, name, None)
if callable(obj) and hasattr(obj, "__module__") and obj.__module__.startswith("rp.tools."):
# Skip duplicates by checking function identity
if id(obj) in seen_functions:
continue
seen_functions.add(id(obj))
if obj.__doc__:
try:
schema = _generate_tool_schema(obj)
tools.append(schema)
except Exception as e:
print(f"Warning: Could not generate schema for {name}: {e}")
continue
return tools
def get_func_map(db_conn=None, python_globals=None):
"""Dynamically generate function map for tool execution."""
func_map = {}
for name in getattr(rp.tools, "__all__", []):
if name.startswith("_"):
continue
obj = getattr(rp.tools, name, None)
if callable(obj) and hasattr(obj, "__module__") and obj.__module__.startswith("rp.tools."):
sig = inspect.signature(obj)
params = list(sig.parameters.keys())
if "db_conn" in params and "python_globals" in params:
func_map[name] = (
lambda func=obj, db_conn=db_conn, python_globals=python_globals, **kw: func(
**kw, db_conn=db_conn, python_globals=python_globals
)
)
elif "db_conn" in params:
func_map[name] = lambda func=obj, db_conn=db_conn, **kw: func(**kw, db_conn=db_conn)
elif "python_globals" in params:
func_map[name] = lambda func=obj, python_globals=python_globals, **kw: func(
**kw, python_globals=python_globals
)
else:
func_map[name] = lambda func=obj, **kw: func(**kw)
return func_map