feat: implement autonomous agent with function calls
This commit is contained in:
parent
aa167aee07
commit
5f7b37e3aa
@ -6,6 +6,14 @@
|
||||
|
||||
|
||||
|
||||
|
||||
## Version 0.6.0 - 2025-11-05
|
||||
|
||||
The tokenizer and database classes have been improved for better performance and maintainability. This change doesn't affect how users interact with the software, but developers will find the code easier to work with.
|
||||
|
||||
**Changes:** 2 files, 207 lines
|
||||
**Languages:** Markdown (8 lines), Python (199 lines)
|
||||
|
||||
## Version 0.5.0 - 2025-11-05
|
||||
|
||||
Users can now run the application inside a chroot container. Developers can use the new `chroot.py` script to initialize and enter these containers.
|
||||
|
||||
529
elon.py
Normal file
529
elon.py
Normal file
@ -0,0 +1,529 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import inspect
|
||||
import json
|
||||
import traceback
|
||||
import datetime
|
||||
import readline
|
||||
import os
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
import base64
|
||||
import pathlib
|
||||
import http.client
|
||||
from typing import (
|
||||
get_type_hints,
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Union,
|
||||
get_origin,
|
||||
get_args,
|
||||
)
|
||||
|
||||
|
||||
class Elon:
|
||||
|
||||
def __init__(self, model: str = "google/gemma-3-12b-it"):
|
||||
self.model = model
|
||||
self.api_url = "https://static.molodetz.nl/rp.cgi/api/v1/chat/completions"
|
||||
self.vision_url = "https://static.molodetz.nl/rp.vision.cgi"
|
||||
self.search_url = "https://static.molodetz.nl/search.cgi"
|
||||
self.api_key = "retoorded"
|
||||
self.messages = []
|
||||
self._initialize_conversation()
|
||||
|
||||
def _initialize_conversation(self):
|
||||
system_prompt = """You are a precise autonomous agent. Execute tasks methodically using available functions.
|
||||
|
||||
Core principles:
|
||||
- Break complex tasks into sequential function calls
|
||||
- Verify each result before proceeding
|
||||
- If a function fails, analyze the error and try an alternative approach
|
||||
- Chain functions logically: search → fetch → analyze → act
|
||||
- Complete tasks fully without requesting human input
|
||||
|
||||
Response protocol:
|
||||
- Respond ONLY with valid JSON
|
||||
- Format: [{"name": "function_name", "parameters": {"param": "value"}}]
|
||||
- Multiple calls: [{"name": "func1", "parameters": {...}}, {"name": "func2", "parameters": {...}}]
|
||||
- Task complete: true
|
||||
- Never include explanatory text with function calls"""
|
||||
|
||||
self.messages = [{"role": "system", "content": system_prompt}]
|
||||
|
||||
def vision_analyze(
|
||||
self,
|
||||
image_path: str,
|
||||
prompt: str = "Describe what you see in this image in detail",
|
||||
) -> dict:
|
||||
"""Analyze image content using computer vision. Provide absolute or relative file path and detailed prompt describing what information you need extracted from the image. Returns detailed description of image contents."""
|
||||
try:
|
||||
resolved_path = str(pathlib.Path(image_path).resolve().absolute())
|
||||
|
||||
with open(resolved_path, "rb") as f:
|
||||
image_bytes = f.read()
|
||||
|
||||
encoded_image = base64.b64encode(image_bytes).decode("utf-8")
|
||||
|
||||
payload = json.dumps(
|
||||
{"data": encoded_image, "path": resolved_path, "prompt": prompt}
|
||||
).encode("utf-8")
|
||||
|
||||
url_parts = self.vision_url.split("/")
|
||||
host = url_parts[2]
|
||||
path = "/" + "/".join(url_parts[3:])
|
||||
|
||||
connection = http.client.HTTPSConnection(host)
|
||||
connection.request(
|
||||
"POST",
|
||||
path,
|
||||
payload,
|
||||
{
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": str(len(payload)),
|
||||
"User-Agent": "AutonomousAgent/1.0",
|
||||
},
|
||||
)
|
||||
|
||||
response = connection.getresponse()
|
||||
response_data = response.read().decode("utf-8")
|
||||
connection.close()
|
||||
|
||||
if response.status == 200:
|
||||
return {"status": "success", "analysis": response_data}
|
||||
else:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": f"HTTP {response.status}: {response.reason}",
|
||||
}
|
||||
|
||||
except FileNotFoundError:
|
||||
return {"status": "error", "message": f"Image file not found: {image_path}"}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def http_fetch(self, url: str) -> dict:
|
||||
"""Fetch and return content from any HTTP/HTTPS URL. Use this to retrieve web pages, APIs, or any online resource. Returns up to 10000 characters of content. Useful for reading documentation, articles, or API responses."""
|
||||
try:
|
||||
request = urllib.request.Request(url)
|
||||
request.add_header("User-Agent", "AutonomousAgent/1.0")
|
||||
|
||||
with urllib.request.urlopen(request, timeout=30) as response:
|
||||
content = response.read().decode("utf-8")
|
||||
return {
|
||||
"status": "success",
|
||||
"url": url,
|
||||
"content": content[:10000],
|
||||
"length": len(content),
|
||||
}
|
||||
except urllib.error.HTTPError as e:
|
||||
return {"status": "error", "message": f"HTTP {e.code}: {e.reason}"}
|
||||
except urllib.error.URLError as e:
|
||||
return {"status": "error", "message": f"URL error: {str(e.reason)}"}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def web_search(self, query: str) -> dict:
|
||||
"""Search the web for current information on any topic. Returns list of relevant results with titles, URLs and snippets. Use this when you need to find information, research topics, or discover resources. Query should be clear and specific."""
|
||||
try:
|
||||
encoded_query = urllib.parse.quote(query)
|
||||
full_url = f"{self.search_url}?query={encoded_query}"
|
||||
|
||||
with urllib.request.urlopen(full_url, timeout=30) as response:
|
||||
results = json.loads(response.read().decode("utf-8"))
|
||||
return {"status": "success", "query": query, "results": results}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def web_search_news(self, query: str) -> dict:
|
||||
"""Search for recent news articles and current events related to query. Returns news results with headlines, sources and publication dates. Use when you need latest updates, breaking news, or time-sensitive information. More focused on recent content than general web_search."""
|
||||
try:
|
||||
encoded_query = urllib.parse.quote(query)
|
||||
full_url = f"{self.search_url}?query={encoded_query}"
|
||||
|
||||
with urllib.request.urlopen(full_url, timeout=30) as response:
|
||||
results = json.loads(response.read().decode("utf-8"))
|
||||
return {"status": "success", "query": query, "news_results": results}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def read_file(self, filepath: str) -> dict:
|
||||
"""Read and return complete contents of a text file from filesystem. Provide absolute or relative path. Use this to access configuration files, data files, logs, or any text-based content stored locally. Returns full file content as string."""
|
||||
try:
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
return {
|
||||
"status": "success",
|
||||
"filepath": filepath,
|
||||
"content": content,
|
||||
"size": len(content),
|
||||
}
|
||||
except FileNotFoundError:
|
||||
return {"status": "error", "message": f"File not found: {filepath}"}
|
||||
except PermissionError:
|
||||
return {"status": "error", "message": f"Permission denied: {filepath}"}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def write_file(self, filepath: str, content: str) -> dict:
|
||||
"""Write content to a file on filesystem. Creates new file or overwrites existing file at specified path. Use this to save results, generate reports, create configuration files, or persist any text data. Provide full content to write."""
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
return {
|
||||
"status": "success",
|
||||
"filepath": filepath,
|
||||
"bytes_written": len(content),
|
||||
}
|
||||
except PermissionError:
|
||||
return {"status": "error", "message": f"Permission denied: {filepath}"}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def list_directory(self, path: str = ".") -> dict:
|
||||
"""List all files and directories in specified path. Defaults to current directory if no path provided. Returns list of names. Use this to explore filesystem structure, find files, or verify file existence before other file operations."""
|
||||
try:
|
||||
entries = os.listdir(path)
|
||||
return {
|
||||
"status": "success",
|
||||
"path": os.path.abspath(path),
|
||||
"entries": sorted(entries),
|
||||
"count": len(entries),
|
||||
}
|
||||
except FileNotFoundError:
|
||||
return {"status": "error", "message": f"Directory not found: {path}"}
|
||||
except PermissionError:
|
||||
return {"status": "error", "message": f"Permission denied: {path}"}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def _convert_type_to_schema(self, type_hint: Any) -> Dict[str, Any]:
|
||||
origin = get_origin(type_hint)
|
||||
|
||||
if type_hint == str:
|
||||
return {"type": "string"}
|
||||
elif type_hint == int:
|
||||
return {"type": "integer"}
|
||||
elif type_hint == float:
|
||||
return {"type": "number"}
|
||||
elif type_hint == bool:
|
||||
return {"type": "boolean"}
|
||||
elif type_hint == list or origin == list:
|
||||
args = get_args(type_hint)
|
||||
if args:
|
||||
return {"type": "array", "items": self._convert_type_to_schema(args[0])}
|
||||
return {"type": "array"}
|
||||
elif type_hint == dict or origin == dict:
|
||||
return {"type": "object"}
|
||||
elif origin == Union:
|
||||
args = get_args(type_hint)
|
||||
if type(None) in args:
|
||||
non_none_types = [arg for arg in args if arg != type(None)]
|
||||
if len(non_none_types) == 1:
|
||||
schema = self._convert_type_to_schema(non_none_types[0])
|
||||
schema["nullable"] = True
|
||||
return schema
|
||||
return {"type": "string"}
|
||||
return {"type": "string"}
|
||||
|
||||
def _generate_function_schemas(self) -> List[Dict[str, Any]]:
|
||||
schemas = []
|
||||
excluded_methods = {
|
||||
"run",
|
||||
"execute",
|
||||
"_initialize_conversation",
|
||||
"_convert_type_to_schema",
|
||||
"_generate_function_schemas",
|
||||
"_build_system_prompt",
|
||||
"_parse_response",
|
||||
"_execute_functions",
|
||||
"_format_results",
|
||||
}
|
||||
|
||||
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
|
||||
if name.startswith("_") or name in excluded_methods:
|
||||
continue
|
||||
|
||||
signature = inspect.signature(method)
|
||||
|
||||
try:
|
||||
type_hints = get_type_hints(method)
|
||||
except Exception:
|
||||
type_hints = {}
|
||||
|
||||
parameters = {"type": "object", "properties": {}, "required": []}
|
||||
|
||||
for param_name, param in signature.parameters.items():
|
||||
if param_name == "self":
|
||||
continue
|
||||
|
||||
param_type = type_hints.get(param_name, str)
|
||||
parameters["properties"][param_name] = self._convert_type_to_schema(
|
||||
param_type
|
||||
)
|
||||
|
||||
if param.default == inspect.Parameter.empty:
|
||||
parameters["required"].append(param_name)
|
||||
|
||||
docstring = inspect.getdoc(method) or f"Execute {name}"
|
||||
|
||||
schemas.append(
|
||||
{"name": name, "description": docstring, "parameters": parameters}
|
||||
)
|
||||
|
||||
return schemas
|
||||
|
||||
def _build_system_prompt(self) -> str:
|
||||
schemas = self._generate_function_schemas()
|
||||
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
return f"""You are a precise autonomous agent. Execute tasks methodically using available functions.
|
||||
|
||||
Core principles:
|
||||
- Break complex tasks into sequential function calls
|
||||
- Verify each result before proceeding
|
||||
- If a function fails, analyze the error and try an alternative approach
|
||||
- Chain functions logically: search → fetch → analyze → act
|
||||
- Complete tasks fully without requesting human input
|
||||
|
||||
Response protocol:
|
||||
- Respond ONLY with valid JSON
|
||||
- Format: [{{"name": "function_name", "parameters": {{"param": "value"}}}}]
|
||||
- Multiple calls: [{{"name": "func1", "parameters": {{}}}}, {{"name": "func2", "parameters": {{}}}}]
|
||||
- Task complete: true
|
||||
- Never include explanatory text with function calls
|
||||
|
||||
Example multi-step task:
|
||||
User: "Find and summarize the latest article about AI"
|
||||
Response: [{{"name": "web_search", "parameters": {{"query": "latest AI article"}}}}, {{"name": "http_fetch", "parameters": {{"url": "<result_url>"}}}}]
|
||||
|
||||
Timestamp: {timestamp}
|
||||
|
||||
Available functions:
|
||||
{json.dumps(schemas, indent=2)}"""
|
||||
|
||||
def _parse_response(self, response: str) -> Optional[List[Dict[str, Any]]]:
|
||||
response = response.strip()
|
||||
|
||||
if response.startswith("```json\n") and response.endswith("\n```"):
|
||||
response = response[len("```json\n") : -len("\n```")]
|
||||
|
||||
response = response.strip()
|
||||
|
||||
if response.lower() == "true":
|
||||
return None
|
||||
|
||||
try:
|
||||
if response.startswith("[") and response.endswith("]"):
|
||||
parsed = json.loads(response)
|
||||
return parsed if isinstance(parsed, list) else [parsed]
|
||||
elif response.startswith("{") and response.endswith("}"):
|
||||
return [json.loads(response)]
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
def _execute_functions(
|
||||
self, function_calls: List[Dict[str, Any]]
|
||||
) -> List[Dict[str, Any]]:
|
||||
results = []
|
||||
|
||||
for call in function_calls:
|
||||
result_entry = {"function": call.get("name"), "result": None, "error": None}
|
||||
|
||||
try:
|
||||
function_name = call.get("name")
|
||||
parameters = call.get("parameters", {})
|
||||
|
||||
if not function_name:
|
||||
raise ValueError("Function name missing")
|
||||
|
||||
if not hasattr(self, function_name):
|
||||
raise AttributeError(f"Unknown function: {function_name}")
|
||||
|
||||
function = getattr(self, function_name)
|
||||
|
||||
if not callable(function) or function_name.startswith("_"):
|
||||
raise TypeError(f"Cannot call: {function_name}")
|
||||
|
||||
result_entry["result"] = function(**parameters)
|
||||
|
||||
except Exception as e:
|
||||
result_entry["error"] = str(e)
|
||||
result_entry["traceback"] = traceback.format_exc()
|
||||
|
||||
results.append(result_entry)
|
||||
|
||||
return results
|
||||
|
||||
def _format_results(self, results: List[Dict[str, Any]]) -> str:
|
||||
formatted = []
|
||||
|
||||
for result in results:
|
||||
function_name = result["function"]
|
||||
|
||||
if result["error"]:
|
||||
formatted.append(
|
||||
f"Function '{function_name}' failed: {result['error']}"
|
||||
)
|
||||
else:
|
||||
result_str = (
|
||||
json.dumps(result["result"])
|
||||
if isinstance(result["result"], dict)
|
||||
else str(result["result"])
|
||||
)
|
||||
if len(result_str) > 5000:
|
||||
result_str = result_str[:5000] + "... (truncated)"
|
||||
formatted.append(f"Function '{function_name}' returned: {result_str}")
|
||||
|
||||
return "\n\n".join(formatted)
|
||||
|
||||
def execute(self, user_query: str):
|
||||
self.messages.append({"role": "user", "content": user_query})
|
||||
|
||||
print(f"\n{'='*70}")
|
||||
print(f"Query: {user_query}")
|
||||
print(f"{'='*70}\n")
|
||||
|
||||
max_iterations = 50
|
||||
|
||||
for iteration in range(max_iterations):
|
||||
self.messages[0]["content"] = self._build_system_prompt()
|
||||
|
||||
try:
|
||||
payload = json.dumps(
|
||||
{"model": self.model, "messages": self.messages}
|
||||
).encode("utf-8")
|
||||
|
||||
request = urllib.request.Request(
|
||||
self.api_url,
|
||||
data=payload,
|
||||
headers={
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
with urllib.request.urlopen(request, timeout=600) as response:
|
||||
result = json.loads(response.read().decode("utf-8"))
|
||||
llm_response = result["choices"][0]["message"]["content"]
|
||||
|
||||
print(f"[Iteration {iteration + 1}]")
|
||||
preview = (
|
||||
llm_response[:300] + "..."
|
||||
if len(llm_response) > 300
|
||||
else llm_response
|
||||
)
|
||||
print(f"Response: {preview}\n")
|
||||
|
||||
function_calls = self._parse_response(llm_response)
|
||||
|
||||
if function_calls:
|
||||
print(f"Executing {len(function_calls)} function(s):")
|
||||
execution_results = self._execute_functions(function_calls)
|
||||
|
||||
for result in execution_results:
|
||||
status_symbol = "✓" if not result["error"] else "✗"
|
||||
output = (
|
||||
result["result"]
|
||||
if not result["error"]
|
||||
else result["error"]
|
||||
)
|
||||
output_preview = str(output)[:100]
|
||||
print(
|
||||
f" {status_symbol} {result['function']}: {output_preview}"
|
||||
)
|
||||
print()
|
||||
|
||||
self.messages.append(
|
||||
{"role": "assistant", "content": llm_response}
|
||||
)
|
||||
self.messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": self._format_results(execution_results),
|
||||
}
|
||||
)
|
||||
else:
|
||||
print(f"{'='*70}")
|
||||
print(f"Completed in {iteration + 1} iteration(s)")
|
||||
print(f"{'='*70}\n")
|
||||
if llm_response.lower() != "true":
|
||||
print(f"Result: {llm_response}\n")
|
||||
break
|
||||
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error: {e.code} - {e.reason}")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
traceback.print_exc()
|
||||
break
|
||||
else:
|
||||
print(f"Maximum iterations ({max_iterations}) reached\n")
|
||||
|
||||
def run(self):
|
||||
def complete_path(text, state):
|
||||
if os.path.isdir(text):
|
||||
directory = text
|
||||
prefix = ""
|
||||
else:
|
||||
directory, prefix = os.path.split(text)
|
||||
if not directory:
|
||||
directory = "."
|
||||
|
||||
try:
|
||||
entries = os.listdir(directory)
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
matches = [e for e in entries if e.startswith(prefix)]
|
||||
|
||||
if state < len(matches):
|
||||
return os.path.join(directory, matches[state])
|
||||
return None
|
||||
|
||||
readline.set_completer(complete_path)
|
||||
readline.parse_and_bind("tab: complete")
|
||||
|
||||
history_path = "/tmp/autonomous_agent.history"
|
||||
try:
|
||||
readline.read_history_file(history_path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
print(f"Autonomous Agent initialized")
|
||||
print(f"Model: {self.model}")
|
||||
print(f"Functions: vision_analyze, http_fetch, web_search, web_search_news")
|
||||
print(f" read_file, write_file, list_directory")
|
||||
print(f"Type 'exit' or 'quit' to stop\n")
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = input("→ ")
|
||||
|
||||
if not user_input.strip():
|
||||
continue
|
||||
|
||||
if user_input.strip().lower() in ["exit", "quit", "q"]:
|
||||
print("Shutting down...")
|
||||
break
|
||||
|
||||
readline.write_history_file(history_path)
|
||||
self.execute(user_input)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted. Type 'exit' to quit.")
|
||||
continue
|
||||
except EOFError:
|
||||
print("\nShutting down...")
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
agent = Elon(model="x-ai/grok-code-fast-1")
|
||||
agent.run()
|
||||
Loading…
Reference in New Issue
Block a user