Compare commits
2 Commits
c61f895135
...
15e3a02506
| Author | SHA1 | Date | |
|---|---|---|---|
| 15e3a02506 | |||
| 25be023b70 |
16
CHANGELOG.md
16
CHANGELOG.md
@ -9,6 +9,22 @@
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
## Version 0.10.0 - 2025-12-13
|
||||
|
||||
This release adds new functions for executing Python code asynchronously (`py_exec_async`) and synchronously (`py_exec`), analyzing images via base64 encoding (`vision_analyze`), and interacting with users (`talk_with_user`). It also introduces file system operations including listing directories (`list_directory`), reading files, and writing files, and improves HTTP request handling to return content as strings.
|
||||
|
||||
**Changes:** 1 files, 571 lines
|
||||
**Languages:** Python (571 lines)
|
||||
|
||||
## Version 0.9.0 - 2025-11-16
|
||||
|
||||
You can now export data as JSON or RSS. A command-line tool is available to extract mentions, and documentation has been added to help you use it.
|
||||
|
||||
**Changes:** 2 files, 209 lines
|
||||
**Languages:** Markdown (8 lines), Python (201 lines)
|
||||
|
||||
## Version 0.8.0 - 2025-11-05
|
||||
|
||||
Users can now connect external tools to automate more complex tasks. Developers can integrate new tools using the updated elon.py file.
|
||||
|
||||
575
elon.py
575
elon.py
@ -6,6 +6,10 @@ import traceback
|
||||
import datetime
|
||||
import readline
|
||||
import os
|
||||
import sys
|
||||
import io
|
||||
import asyncio
|
||||
from typing import Any, Dict
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
@ -31,7 +35,7 @@ class Elon:
|
||||
self.api_url = "https://static.molodetz.nl/rp.cgi/api/v1/chat/completions"
|
||||
self.vision_url = "https://static.molodetz.nl/rp.vision.cgi"
|
||||
self.search_url = "https://static.molodetz.nl/search.cgi"
|
||||
self.api_key = "retoorded"
|
||||
self.api_key = os.getenv('ELON_API_KEY', 'retoorded')
|
||||
self.messages = []
|
||||
self._initialize_conversation()
|
||||
|
||||
@ -44,9 +48,14 @@ Core principles:
|
||||
- If a function fails, analyze the error and try an alternative approach
|
||||
- Chain functions logically: search → fetch → analyze → act
|
||||
- Complete tasks fully without requesting human input
|
||||
- Never truncate any output
|
||||
- Never include explanatory text with function calls
|
||||
- Only provide concrete answers instead of placeholders
|
||||
- Only provide complete valid syntax when providing some code
|
||||
- You can do literally anything using py_exec tool. This is your priority tool making it possible to execute arbitrary Python code.
|
||||
|
||||
Response protocol:
|
||||
- Respond ONLY with valid JSON
|
||||
- Respond ONLY with valid JSON containing valid content
|
||||
- Format: [{"name": "function_name", "parameters": {"param": "value"}}]
|
||||
- Multiple calls: [{"name": "func1", "parameters": {...}}, {"name": "func2", "parameters": {...}}]
|
||||
- Task complete: true
|
||||
@ -54,476 +63,150 @@ Response protocol:
|
||||
|
||||
self.messages = [{"role": "system", "content": system_prompt}]
|
||||
|
||||
def vision_analyze(
|
||||
self,
|
||||
image_path: str,
|
||||
prompt: str = "Describe what you see in this image in detail",
|
||||
) -> dict:
|
||||
def py_exec_async(self, python_source_code: str) -> Any:
|
||||
"""
|
||||
Asynchronously execute Python code and return the result.
|
||||
|
||||
Args:
|
||||
python_source_code (str): The Python code to execute.
|
||||
It will be executed using the `exec` statement using
|
||||
the local and global namespaces meaning that you can
|
||||
access global variables and functions from the current
|
||||
namespace what gives you a lot of power. You could even
|
||||
add new functions to the current namespace that will
|
||||
be available for the next execution using AI.
|
||||
|
||||
Returns:
|
||||
Any: The result of the executed Python code.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
return loop.run_in_executor(None, self.py_exec, python_source_code)
|
||||
|
||||
def py_exec(self, python_source_code: str) -> Any:
|
||||
"""
|
||||
Execute Python code and return the stdout.
|
||||
|
||||
Args:
|
||||
python_source_code (str): The Python code to execute.
|
||||
It will be executed using the `exec` statement using
|
||||
the local and global namespaces meaning that you can
|
||||
access global variables and functions from the current
|
||||
namespace what gives you a lot of power. You could even
|
||||
add new functions to the current namespace that will
|
||||
be available for the next execution using AI.
|
||||
Returns:
|
||||
Any: The stdout of the executed Python code.
|
||||
"""
|
||||
try:
|
||||
old_stdout = sys.stdout
|
||||
sys.stdout = captured_output = io.StringIO()
|
||||
exec(python_source_code, globals(), self.__dict__)
|
||||
output = captured_output.getvalue()
|
||||
sys.stdout = old_stdout
|
||||
if output:
|
||||
return {"success": True, "stdout": output}
|
||||
else:
|
||||
return {"success": True, "stdout": "No output"}
|
||||
except Exception as e:
|
||||
return {"success": False, "stderr": str(e)}
|
||||
|
||||
def talk_with_user(self, message: str, is_question: bool = False) -> dict:
|
||||
"""Send a message to the user. You can also ask questions. Returns answer. This is the only way to interact with the user."""
|
||||
print(f"Bot: {message}")
|
||||
if is_question:
|
||||
result = input("→ ")
|
||||
else:
|
||||
result = "Message received by user."
|
||||
return {"success": True, "message": result}
|
||||
|
||||
def vision_analyze(self, image_path: str, prompt: str = "Describe what you see in this image in detail") -> dict:
|
||||
"""Analyze image content using computer vision. Provide absolute or relative file path and detailed prompt describing what information you need extracted from the image. Returns detailed description of image contents."""
|
||||
try:
|
||||
resolved_path = str(pathlib.Path(image_path).resolve().absolute())
|
||||
|
||||
with open(resolved_path, "rb") as f:
|
||||
image_bytes = f.read()
|
||||
|
||||
encoded_image = base64.b64encode(image_bytes).decode("utf-8")
|
||||
|
||||
payload = json.dumps(
|
||||
{"data": encoded_image, "path": resolved_path, "prompt": prompt}
|
||||
).encode("utf-8")
|
||||
|
||||
url_parts = self.vision_url.split("/")
|
||||
host = url_parts[2]
|
||||
path = "/" + "/".join(url_parts[3:])
|
||||
|
||||
connection = http.client.HTTPSConnection(host)
|
||||
connection.request(
|
||||
"POST",
|
||||
path,
|
||||
payload,
|
||||
{
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": str(len(payload)),
|
||||
"User-Agent": "AutonomousAgent/1.0",
|
||||
},
|
||||
)
|
||||
|
||||
response = connection.getresponse()
|
||||
response_data = response.read().decode("utf-8")
|
||||
connection.close()
|
||||
|
||||
if response.status == 200:
|
||||
return {"status": "success", "analysis": response_data}
|
||||
else:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": f"HTTP {response.status}: {response.reason}",
|
||||
}
|
||||
|
||||
except FileNotFoundError:
|
||||
return {"status": "error", "message": f"Image file not found: {image_path}"}
|
||||
with open(image_path, 'rb') as file:
|
||||
image_data = file.read()
|
||||
encoded_image = base64.b64encode(image_data).decode('utf-8')
|
||||
payload = {
|
||||
"model": "vision-model", # Assuming a model; adjust as needed
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": prompt},
|
||||
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
data = json.dumps(payload).encode('utf-8')
|
||||
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {self.api_key}'}
|
||||
req = urllib.request.Request(self.vision_url, data=data, headers=headers)
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
response = json.loads(resp.read().decode('utf-8'))
|
||||
description = response.get('choices', [{}])[0].get('message', {}).get('content', 'No description available')
|
||||
return {"status": "success", "description": description}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def http_fetch(self, url: str) -> dict:
|
||||
def http_fetch(self, url: str) -> str:
|
||||
"""Fetch and return content from any HTTP/HTTPS URL. Use this to retrieve web pages, APIs, or any online resource. Returns up to 10000 characters of content. Useful for reading documentation, articles, or API responses."""
|
||||
try:
|
||||
request = urllib.request.Request(url)
|
||||
request.add_header("User-Agent", "AutonomousAgent/1.0")
|
||||
|
||||
with urllib.request.urlopen(request, timeout=30) as response:
|
||||
content = response.read().decode("utf-8")
|
||||
return {
|
||||
"status": "success",
|
||||
"url": url,
|
||||
"content": content[:10000],
|
||||
"length": len(content),
|
||||
}
|
||||
except urllib.error.HTTPError as e:
|
||||
return {"status": "error", "message": f"HTTP {e.code}: {e.reason}"}
|
||||
except urllib.error.URLError as e:
|
||||
return {"status": "error", "message": f"URL error: {str(e.reason)}"}
|
||||
req = urllib.request.Request(url)
|
||||
with urllib.request.urlopen(req) as response:
|
||||
content = response.read().decode('utf-8')
|
||||
return {"status": "success", "content": content[:10000]}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def web_search(self, query: str) -> dict:
|
||||
"""Search the web for current information on any topic. Returns list of relevant results with titles, URLs and snippets. Use this when you need to find information, research topics, or discover resources. Query should be clear and specific."""
|
||||
try:
|
||||
encoded_query = urllib.parse.quote(query)
|
||||
full_url = f"{self.search_url}?query={encoded_query}"
|
||||
|
||||
with urllib.request.urlopen(full_url, timeout=30) as response:
|
||||
results = json.loads(response.read().decode("utf-8"))
|
||||
return {"status": "success", "query": query, "results": results}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def web_search_news(self, query: str) -> dict:
|
||||
"""Search for recent news articles and current events related to query. Returns news results with headlines, sources and publication dates. Use when you need latest updates, breaking news, or time-sensitive information. More focused on recent content than general web_search."""
|
||||
try:
|
||||
encoded_query = urllib.parse.quote(query)
|
||||
full_url = f"{self.search_url}?query={encoded_query}"
|
||||
|
||||
with urllib.request.urlopen(full_url, timeout=30) as response:
|
||||
results = json.loads(response.read().decode("utf-8"))
|
||||
return {"status": "success", "query": query, "news_results": results}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
def list_directory(self, path: str = ".") -> list:
|
||||
"""List all files and directories in specified path. Defaults to current directory if no path provided. Returns list of names. Use this to explore filesystem structure, find files, or verify file existence before other file operations."""
|
||||
return os.listdir(path)
|
||||
|
||||
def read_file(self, filepath: str) -> dict:
|
||||
"""Read and return complete contents of a text file from filesystem. Provide absolute or relative path. Use this to access configuration files, data files, logs, or any text-based content stored locally. Returns full file content as string."""
|
||||
try:
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
return {
|
||||
"status": "success",
|
||||
"filepath": filepath,
|
||||
"content": content,
|
||||
"size": len(content),
|
||||
}
|
||||
except FileNotFoundError:
|
||||
return {"status": "error", "message": f"File not found: {filepath}"}
|
||||
except PermissionError:
|
||||
return {"status": "error", "message": f"Permission denied: {filepath}"}
|
||||
with open(filepath, 'r') as file:
|
||||
content = file.read()
|
||||
return {"status": "success", "filepath": filepath, "content": content}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def write_file(self, filepath: str, content: str) -> dict:
|
||||
"""Write content to a file on filesystem. Creates new file or overwrites existing file at specified path. Use this to save results, generate reports, create configuration files, or persist any text data. Provide full content to write."""
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
return {
|
||||
"status": "success",
|
||||
"filepath": filepath,
|
||||
"bytes_written": len(content),
|
||||
}
|
||||
except PermissionError:
|
||||
return {"status": "error", "message": f"Permission denied: {filepath}"}
|
||||
with open(filepath, 'w') as file:
|
||||
file.write(content)
|
||||
return {"status": "success", "filepath": filepath}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def list_directory(self, path: str = ".") -> dict:
|
||||
"""List all files and directories in specified path. Defaults to current directory if no path provided. Returns list of names. Use this to explore filesystem structure, find files, or verify file existence before other file operations."""
|
||||
def web_search(self, query: str) -> list:
|
||||
"""Search the web for current information on any topic. Returns list of relevant results with titles, URLs and snippets. Use this when you need to find information, research topics, or discover resources. Query should be clear and specific."""
|
||||
try:
|
||||
entries = os.listdir(path)
|
||||
return {
|
||||
"status": "success",
|
||||
"path": os.path.abspath(path),
|
||||
"entries": sorted(entries),
|
||||
"count": len(entries),
|
||||
payload = {
|
||||
"query": query,
|
||||
"type": "general"
|
||||
}
|
||||
except FileNotFoundError:
|
||||
return {"status": "error", "message": f"Directory not found: {path}"}
|
||||
except PermissionError:
|
||||
return {"status": "error", "message": f"Permission denied: {path}"}
|
||||
data = json.dumps(payload).encode('utf-8')
|
||||
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {self.api_key}'}
|
||||
req = urllib.request.Request(self.search_url, data=data, headers=headers)
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
result = json.loads(resp.read().decode('utf-8'))
|
||||
return result # Assuming the response is a list of results
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
def _convert_type_to_schema(self, type_hint: Any) -> Dict[str, Any]:
|
||||
origin = get_origin(type_hint)
|
||||
|
||||
if type_hint == str:
|
||||
return {"type": "string"}
|
||||
elif type_hint == int:
|
||||
return {"type": "integer"}
|
||||
elif type_hint == float:
|
||||
return {"type": "number"}
|
||||
elif type_hint == bool:
|
||||
return {"type": "boolean"}
|
||||
elif type_hint == list or origin == list:
|
||||
args = get_args(type_hint)
|
||||
if args:
|
||||
return {"type": "array", "items": self._convert_type_to_schema(args[0])}
|
||||
return {"type": "array"}
|
||||
elif type_hint == dict or origin == dict:
|
||||
return {"type": "object"}
|
||||
elif origin == Union:
|
||||
args = get_args(type_hint)
|
||||
if type(None) in args:
|
||||
non_none_types = [arg for arg in args if arg != type(None)]
|
||||
if len(non_none_types) == 1:
|
||||
schema = self._convert_type_to_schema(non_none_types[0])
|
||||
schema["nullable"] = True
|
||||
return schema
|
||||
return {"type": "string"}
|
||||
return {"type": "string"}
|
||||
|
||||
def _generate_function_schemas(self) -> List[Dict[str, Any]]:
|
||||
schemas = []
|
||||
excluded_methods = {
|
||||
"run",
|
||||
"execute",
|
||||
"_initialize_conversation",
|
||||
"_convert_type_to_schema",
|
||||
"_generate_function_schemas",
|
||||
"_build_system_prompt",
|
||||
"_parse_response",
|
||||
"_execute_functions",
|
||||
"_format_results",
|
||||
}
|
||||
|
||||
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
|
||||
if name.startswith("_") or name in excluded_methods:
|
||||
continue
|
||||
|
||||
signature = inspect.signature(method)
|
||||
|
||||
try:
|
||||
type_hints = get_type_hints(method)
|
||||
except Exception:
|
||||
type_hints = {}
|
||||
|
||||
parameters = {"type": "object", "properties": {}, "required": []}
|
||||
|
||||
for param_name, param in signature.parameters.items():
|
||||
if param_name == "self":
|
||||
continue
|
||||
|
||||
param_type = type_hints.get(param_name, str)
|
||||
parameters["properties"][param_name] = self._convert_type_to_schema(
|
||||
param_type
|
||||
)
|
||||
|
||||
if param.default == inspect.Parameter.empty:
|
||||
parameters["required"].append(param_name)
|
||||
|
||||
docstring = inspect.getdoc(method) or f"Execute {name}"
|
||||
|
||||
schemas.append(
|
||||
{"name": name, "description": docstring, "parameters": parameters}
|
||||
)
|
||||
|
||||
return schemas
|
||||
|
||||
def _build_system_prompt(self) -> str:
|
||||
schemas = self._generate_function_schemas()
|
||||
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
return f"""You are a precise autonomous agent. Execute tasks methodically using available functions.
|
||||
|
||||
Core principles:
|
||||
- Break complex tasks into sequential function calls
|
||||
- Verify each result before proceeding
|
||||
- If a function fails, analyze the error and try an alternative approach
|
||||
- Chain functions logically: search → fetch → analyze → act
|
||||
- Complete tasks fully without requesting human input
|
||||
|
||||
Response protocol:
|
||||
- Respond ONLY with valid JSON
|
||||
- Format: [{{"name": "function_name", "parameters": {{"param": "value"}}}}]
|
||||
- Multiple calls: [{{"name": "func1", "parameters": {{}}}}, {{"name": "func2", "parameters": {{}}}}]
|
||||
- Task complete: true
|
||||
- Never include explanatory text with function calls
|
||||
|
||||
Example multi-step task:
|
||||
User: "Find and summarize the latest article about AI"
|
||||
Response: [{{"name": "web_search", "parameters": {{"query": "latest AI article"}}}}, {{"name": "http_fetch", "parameters": {{"url": "<result_url>"}}}}]
|
||||
|
||||
Timestamp: {timestamp}
|
||||
|
||||
Available functions:
|
||||
{json.dumps(schemas, indent=2)}"""
|
||||
|
||||
def _parse_response(self, response: str) -> Optional[List[Dict[str, Any]]]:
|
||||
response = response.strip()
|
||||
|
||||
if response.startswith("```json\n") and response.endswith("\n```"):
|
||||
response = response[len("```json\n") : -len("\n```")]
|
||||
|
||||
response = response.strip()
|
||||
|
||||
if response.lower() == "true":
|
||||
return None
|
||||
|
||||
def web_search_news(self, query: str) -> list:
|
||||
"""Search for recent news articles and current events related to query. Returns news results with headlines, sources and publication dates. Use when you need latest updates, breaking news, or time-sensitive information. More focused on recent content than general web_search."""
|
||||
try:
|
||||
if response.startswith("[") and response.endswith("]"):
|
||||
parsed = json.loads(response)
|
||||
return parsed if isinstance(parsed, list) else [parsed]
|
||||
elif response.startswith("{") and response.endswith("}"):
|
||||
return [json.loads(response)]
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
def _execute_functions(
|
||||
self, function_calls: List[Dict[str, Any]]
|
||||
) -> List[Dict[str, Any]]:
|
||||
results = []
|
||||
|
||||
for call in function_calls:
|
||||
result_entry = {"function": call.get("name"), "result": None, "error": None}
|
||||
|
||||
try:
|
||||
function_name = call.get("name")
|
||||
parameters = call.get("parameters", {})
|
||||
|
||||
if not function_name:
|
||||
raise ValueError("Function name missing")
|
||||
|
||||
if not hasattr(self, function_name):
|
||||
raise AttributeError(f"Unknown function: {function_name}")
|
||||
|
||||
function = getattr(self, function_name)
|
||||
|
||||
if not callable(function) or function_name.startswith("_"):
|
||||
raise TypeError(f"Cannot call: {function_name}")
|
||||
|
||||
result_entry["result"] = function(**parameters)
|
||||
|
||||
except Exception as e:
|
||||
result_entry["error"] = str(e)
|
||||
result_entry["traceback"] = traceback.format_exc()
|
||||
|
||||
results.append(result_entry)
|
||||
|
||||
return results
|
||||
|
||||
def _format_results(self, results: List[Dict[str, Any]]) -> str:
|
||||
formatted = []
|
||||
|
||||
for result in results:
|
||||
function_name = result["function"]
|
||||
|
||||
if result["error"]:
|
||||
formatted.append(
|
||||
f"Function '{function_name}' failed: {result['error']}"
|
||||
)
|
||||
else:
|
||||
result_str = (
|
||||
json.dumps(result["result"])
|
||||
if isinstance(result["result"], dict)
|
||||
else str(result["result"])
|
||||
)
|
||||
if len(result_str) > 5000:
|
||||
result_str = result_str[:5000] + "... (truncated)"
|
||||
formatted.append(f"Function '{function_name}' returned: {result_str}")
|
||||
|
||||
return "\n\n".join(formatted)
|
||||
|
||||
def execute(self, user_query: str):
|
||||
self.messages.append({"role": "user", "content": user_query})
|
||||
|
||||
print(f"\n{'='*70}")
|
||||
print(f"Query: {user_query}")
|
||||
print(f"{'='*70}\n")
|
||||
|
||||
max_iterations = 50
|
||||
|
||||
for iteration in range(max_iterations):
|
||||
self.messages[0]["content"] = self._build_system_prompt()
|
||||
|
||||
try:
|
||||
payload = json.dumps(
|
||||
{"model": self.model, "messages": self.messages}
|
||||
).encode("utf-8")
|
||||
|
||||
request = urllib.request.Request(
|
||||
self.api_url,
|
||||
data=payload,
|
||||
headers={
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
with urllib.request.urlopen(request, timeout=600) as response:
|
||||
result = json.loads(response.read().decode("utf-8"))
|
||||
llm_response = result["choices"][0]["message"]["content"]
|
||||
|
||||
print(f"[Iteration {iteration + 1}]")
|
||||
preview = (
|
||||
llm_response[:300] + "..."
|
||||
if len(llm_response) > 300
|
||||
else llm_response
|
||||
)
|
||||
print(f"Response: {preview}\n")
|
||||
|
||||
function_calls = self._parse_response(llm_response)
|
||||
|
||||
if function_calls:
|
||||
print(f"Executing {len(function_calls)} function(s):")
|
||||
execution_results = self._execute_functions(function_calls)
|
||||
|
||||
for result in execution_results:
|
||||
status_symbol = "âś“" if not result["error"] else "âś—"
|
||||
output = (
|
||||
result["result"]
|
||||
if not result["error"]
|
||||
else result["error"]
|
||||
)
|
||||
output_preview = str(output)[:100]
|
||||
print(
|
||||
f" {status_symbol} {result['function']}: {output_preview}"
|
||||
)
|
||||
print()
|
||||
|
||||
self.messages.append(
|
||||
{"role": "assistant", "content": llm_response}
|
||||
)
|
||||
self.messages.append(
|
||||
{
|
||||
"role": "user",
|
||||
"content": self._format_results(execution_results),
|
||||
}
|
||||
)
|
||||
else:
|
||||
print(f"{'='*70}")
|
||||
print(f"Completed in {iteration + 1} iteration(s)")
|
||||
print(f"{'='*70}\n")
|
||||
if llm_response.lower() != "true":
|
||||
print(f"Result: {llm_response}\n")
|
||||
break
|
||||
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error: {e.code} - {e.reason}")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
traceback.print_exc()
|
||||
break
|
||||
else:
|
||||
print(f"Maximum iterations ({max_iterations}) reached\n")
|
||||
|
||||
def run(self):
|
||||
def complete_path(text, state):
|
||||
if os.path.isdir(text):
|
||||
directory = text
|
||||
prefix = ""
|
||||
else:
|
||||
directory, prefix = os.path.split(text)
|
||||
if not directory:
|
||||
directory = "."
|
||||
|
||||
try:
|
||||
entries = os.listdir(directory)
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
matches = [e for e in entries if e.startswith(prefix)]
|
||||
|
||||
if state < len(matches):
|
||||
return os.path.join(directory, matches[state])
|
||||
return None
|
||||
|
||||
readline.set_completer(complete_path)
|
||||
readline.parse_and_bind("tab: complete")
|
||||
|
||||
history_path = "/tmp/autonomous_agent.history"
|
||||
try:
|
||||
readline.read_history_file(history_path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
print(f"Autonomous Agent initialized")
|
||||
print(f"Model: {self.model}")
|
||||
print(f"Functions: vision_analyze, http_fetch, web_search, web_search_news")
|
||||
print(f" read_file, write_file, list_directory")
|
||||
print(f"Type 'exit' or 'quit' to stop\n")
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = input("→ ")
|
||||
|
||||
if not user_input.strip():
|
||||
continue
|
||||
|
||||
if user_input.strip().lower() in ["exit", "quit", "q"]:
|
||||
print("Shutting down...")
|
||||
break
|
||||
|
||||
readline.write_history_file(history_path)
|
||||
self.execute(user_input)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted. Type 'exit' to quit.")
|
||||
continue
|
||||
except EOFError:
|
||||
print("\nShutting down...")
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
agent = Elon(model="x-ai/grok-code-fast-1")
|
||||
agent.run()
|
||||
payload = {
|
||||
"query": query,
|
||||
"type": "news"
|
||||
}
|
||||
data = json.dumps(payload).encode('utf-8')
|
||||
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {self.api_key}'}
|
||||
req = urllib.request.Request(self.search_url, data=data, headers=headers)
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
result = json.loads(resp.read().decode('utf-8'))
|
||||
return result # Assuming the response is a list of news items
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
248
rgithook.py
248
rgithook.py
@ -4,7 +4,9 @@ API_URL = "https://static.molodetz.nl/rp.cgi/api/v1/chat/completions"
|
||||
MODEL = "google/gemma-3-12b-it:free"
|
||||
TEMPERATURE = 1.0
|
||||
MAX_TOKENS = None
|
||||
HOOK_PATH = ".git/hooks/prepare-commit-msg"
|
||||
HOOK_PATH_PRECOMMIT = ".git/hooks/pre-commit"
|
||||
HOOK_PATH_PREPAREMSG = ".git/hooks/prepare-commit-msg"
|
||||
TEMP_MSG_FILE = ".git/COMMIT_MSG_GENERATED"
|
||||
|
||||
import sys
|
||||
import subprocess
|
||||
@ -32,23 +34,29 @@ def install_hook():
|
||||
try:
|
||||
if not os.path.exists('.git'):
|
||||
return False
|
||||
|
||||
|
||||
script_path = os.path.abspath(__file__)
|
||||
hook_dir = os.path.dirname(HOOK_PATH)
|
||||
|
||||
hook_dir = os.path.dirname(HOOK_PATH_PRECOMMIT)
|
||||
|
||||
if not os.path.exists(hook_dir):
|
||||
os.makedirs(hook_dir)
|
||||
|
||||
if os.path.exists(HOOK_PATH):
|
||||
with open(HOOK_PATH, 'r') as f:
|
||||
if script_path in f.read():
|
||||
return False
|
||||
|
||||
with open(HOOK_PATH, 'w') as f:
|
||||
f.write(f'#!/bin/bash\n{script_path} "$@"\n')
|
||||
|
||||
os.chmod(HOOK_PATH, 0o755)
|
||||
return True
|
||||
|
||||
installed = False
|
||||
|
||||
for hook_path, mode in [(HOOK_PATH_PRECOMMIT, 'pre-commit'), (HOOK_PATH_PREPAREMSG, 'prepare-commit-msg')]:
|
||||
needs_install = True
|
||||
if os.path.exists(hook_path):
|
||||
with open(hook_path, 'r') as f:
|
||||
if script_path in f.read():
|
||||
needs_install = False
|
||||
|
||||
if needs_install:
|
||||
with open(hook_path, 'w') as f:
|
||||
f.write(f'#!/bin/bash\n{script_path} {mode} "$@"\n')
|
||||
os.chmod(hook_path, 0o755)
|
||||
installed = True
|
||||
|
||||
return installed
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@ -217,52 +225,52 @@ def call_ai(prompt):
|
||||
def generate_commit_message(diff, files):
|
||||
try:
|
||||
files_list = "\n".join([f"- {f}" for f in files[:20]])
|
||||
|
||||
prompt = f"""You write commit messages for code changes.
|
||||
|
||||
Changed files:
|
||||
prompt = f"""You are a senior software engineer writing precise git commit messages following the Conventional Commits specification.
|
||||
|
||||
CONTEXT:
|
||||
Files modified: {len(files)}
|
||||
{files_list}
|
||||
|
||||
Code changes:
|
||||
DIFF:
|
||||
{diff[:12000]}
|
||||
|
||||
Write a commit message with this format:
|
||||
<prefix>: <description>
|
||||
TASK:
|
||||
Analyze the changes and write a commit message that accurately describes the modifications.
|
||||
|
||||
Format rules:
|
||||
- All lowercase for the prefix
|
||||
- Colon and space after prefix
|
||||
- Start description with lowercase letter
|
||||
- No period at the end
|
||||
- Max 72 characters total
|
||||
- Use imperative mood (Add not Added)
|
||||
OUTPUT FORMAT:
|
||||
<type>: <concise description>
|
||||
|
||||
Choose one prefix:
|
||||
- fix: for bug fixes
|
||||
Example: fix: resolve null pointer error in user login
|
||||
Example: fix: correct date format in export function
|
||||
RULES:
|
||||
1. Use exactly one line per logical change
|
||||
2. Type must be lowercase
|
||||
3. Description starts with lowercase verb in imperative mood
|
||||
4. No trailing punctuation
|
||||
5. Maximum 72 characters per line
|
||||
6. Group related changes under a single type when appropriate
|
||||
7. Order by importance (most significant change first)
|
||||
|
||||
- feat: for new features
|
||||
Example: feat: add dark mode toggle to settings
|
||||
Example: feat: implement search filter for products
|
||||
TYPES (select the most appropriate):
|
||||
- feat: new functionality or capability added to the codebase
|
||||
- fix: correction of a bug or erroneous behavior
|
||||
- refactor: code restructuring without changing external behavior
|
||||
- perf: performance optimization or improvement
|
||||
- docs: documentation additions or modifications
|
||||
- test: test additions or modifications
|
||||
- build: build system or dependency changes
|
||||
- ci: continuous integration configuration changes
|
||||
- style: formatting changes (whitespace, semicolons, etc.)
|
||||
- chore: routine maintenance tasks
|
||||
|
||||
- docs: for documentation changes
|
||||
Example: docs: update api endpoint descriptions
|
||||
Example: docs: add setup guide for development
|
||||
ANALYSIS GUIDELINES:
|
||||
- Identify the primary purpose of the change
|
||||
- Distinguish between new features and modifications to existing ones
|
||||
- Recognize bug fixes by error handling, null checks, or condition corrections
|
||||
- Note refactoring by structural changes without behavior modification
|
||||
- Detect performance work by optimization patterns or caching additions
|
||||
|
||||
- perf: for performance improvements
|
||||
Example: perf: reduce database query time by 40%
|
||||
Example: perf: optimize image loading with lazy load
|
||||
|
||||
- refactor: for code restructuring
|
||||
Example: refactor: simplify user validation logic
|
||||
Example: refactor: extract common functions to utils
|
||||
|
||||
- maintenance: for routine updates and maintenance
|
||||
Example: maintenance: update dependencies to latest versions
|
||||
Example: maintenance: clean up unused imports and files
|
||||
|
||||
Reply with ONLY the commit message, nothing else."""
|
||||
OUTPUT:
|
||||
Provide only the commit message lines, no explanations or additional text."""
|
||||
|
||||
message = call_ai(prompt)
|
||||
if not message:
|
||||
@ -270,30 +278,40 @@ Reply with ONLY the commit message, nothing else."""
|
||||
|
||||
message = message.strip().strip('"').strip("'")
|
||||
|
||||
prefixes = ['fix:', 'feat:', 'docs:', 'perf:', 'refactor:', 'maintenance:']
|
||||
if not any(message.startswith(p) for p in prefixes):
|
||||
message = f"feat: {message}"
|
||||
lines = message.split('\n')
|
||||
processed_lines = []
|
||||
prefixes = ['feat:', 'fix:', 'refactor:', 'perf:', 'docs:', 'test:', 'build:', 'ci:', 'style:', 'chore:']
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
if not any(line.startswith(p) for p in prefixes):
|
||||
line = f"chore: {line}"
|
||||
processed_lines.append(line)
|
||||
|
||||
return message
|
||||
if not processed_lines:
|
||||
return generate_fallback_message(files)
|
||||
|
||||
return '\n'.join(processed_lines)
|
||||
except Exception:
|
||||
return generate_fallback_message(files)
|
||||
|
||||
def generate_fallback_message(files):
|
||||
try:
|
||||
if not files:
|
||||
return "feat: update project"
|
||||
|
||||
return "chore: update project"
|
||||
|
||||
exts = set()
|
||||
for f in files:
|
||||
ext = os.path.splitext(f)[1]
|
||||
if ext:
|
||||
exts.add(ext[1:])
|
||||
|
||||
|
||||
if exts:
|
||||
return f"feat: update {', '.join(sorted(exts)[:3])} files"
|
||||
return "feat: update project files"
|
||||
return f"chore: update {', '.join(sorted(exts)[:3])} files"
|
||||
return "chore: update project files"
|
||||
except Exception:
|
||||
return "feat: update project"
|
||||
return "chore: update project"
|
||||
|
||||
def create_git_tag(version):
|
||||
try:
|
||||
@ -334,20 +352,32 @@ def update_changelog(version, commit_message, stats, files):
|
||||
|
||||
lang_summary = ", ".join(lang_stats) if lang_stats else "No code changes"
|
||||
|
||||
functional_desc_prompt = f"""You write changelog entries for software releases.
|
||||
functional_desc_prompt = f"""You are a technical writer creating changelog entries for a software release.
|
||||
|
||||
Commit message: {commit_message}
|
||||
Changed files: {", ".join(files[:10])}
|
||||
COMMIT SUMMARY:
|
||||
{commit_message}
|
||||
|
||||
Write a short functional description of what changed for users or developers.
|
||||
Use simple clear words, be direct, max 2 sentences.
|
||||
Focus on what it does, not how.
|
||||
AFFECTED FILES:
|
||||
{", ".join(files[:10])}
|
||||
|
||||
Reply with ONLY the description text."""
|
||||
TASK:
|
||||
Write a concise changelog entry describing the functional impact of these changes.
|
||||
|
||||
REQUIREMENTS:
|
||||
1. Maximum two sentences
|
||||
2. Focus on user-visible or developer-relevant changes
|
||||
3. Use present tense, active voice
|
||||
4. Be specific about what functionality is added, modified, or fixed
|
||||
5. Avoid implementation details unless architecturally significant
|
||||
6. No marketing language or superlatives
|
||||
|
||||
OUTPUT:
|
||||
Provide only the changelog description text, no formatting or prefixes."""
|
||||
|
||||
functional_desc = call_ai(functional_desc_prompt)
|
||||
if not functional_desc:
|
||||
functional_desc = commit_message.split(':', 1)[1].strip() if ':' in commit_message else commit_message
|
||||
first_line = commit_message.split('\n')[0]
|
||||
functional_desc = first_line.split(':', 1)[1].strip() if ':' in first_line else first_line
|
||||
|
||||
entry_lines = [
|
||||
f"## Version {version} - {today}",
|
||||
@ -382,6 +412,48 @@ Reply with ONLY the description text."""
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def run_pre_commit():
|
||||
diff = get_git_diff()
|
||||
files = get_changed_files()
|
||||
|
||||
if not diff and not files:
|
||||
sys.exit(0)
|
||||
|
||||
stats = analyze_diff_stats(diff)
|
||||
commit_message = generate_commit_message(diff, files)
|
||||
|
||||
current_version, source = get_version()
|
||||
new_version = update_version(current_version, source)
|
||||
|
||||
update_changelog(new_version, commit_message, stats, files)
|
||||
safe_run(['git', 'add', 'CHANGELOG.md'])
|
||||
|
||||
with open(TEMP_MSG_FILE, 'w') as f:
|
||||
f.write(f"{new_version}\n{commit_message}")
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def run_prepare_commit_msg(commit_msg_file):
|
||||
if not os.path.exists(TEMP_MSG_FILE):
|
||||
sys.exit(0)
|
||||
|
||||
with open(TEMP_MSG_FILE, 'r') as f:
|
||||
content = f.read()
|
||||
|
||||
lines = content.split('\n', 1)
|
||||
new_version = lines[0] if lines else "0.1.0"
|
||||
commit_message = lines[1] if len(lines) > 1 else "chore: update project"
|
||||
|
||||
with open(commit_msg_file, 'w') as f:
|
||||
f.write(commit_message + '\n')
|
||||
|
||||
create_git_tag(new_version)
|
||||
|
||||
os.remove(TEMP_MSG_FILE)
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
if len(sys.argv) < 2:
|
||||
@ -390,36 +462,24 @@ def main():
|
||||
else:
|
||||
print("Git hook already installed")
|
||||
sys.exit(0)
|
||||
|
||||
commit_msg_file = sys.argv[1]
|
||||
|
||||
current_version, source = get_version()
|
||||
new_version = update_version(current_version, source)
|
||||
|
||||
diff = get_git_diff()
|
||||
files = get_changed_files()
|
||||
stats = analyze_diff_stats(diff)
|
||||
|
||||
commit_message = generate_commit_message(diff, files)
|
||||
|
||||
update_changelog(new_version, commit_message, stats, files)
|
||||
safe_run(['git', 'add', 'CHANGELOG.md'])
|
||||
|
||||
create_git_tag(new_version)
|
||||
|
||||
with open(commit_msg_file, 'w') as f:
|
||||
f.write(commit_message + '\n')
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
mode = sys.argv[1]
|
||||
|
||||
if mode == 'pre-commit':
|
||||
run_pre_commit()
|
||||
elif mode == 'prepare-commit-msg':
|
||||
if len(sys.argv) >= 3:
|
||||
run_prepare_commit_msg(sys.argv[2])
|
||||
sys.exit(0)
|
||||
else:
|
||||
sys.exit(0)
|
||||
except Exception:
|
||||
try:
|
||||
if len(sys.argv) >= 2:
|
||||
with open(sys.argv[1], 'w') as f:
|
||||
f.write("feat: update project\n")
|
||||
if os.path.exists(TEMP_MSG_FILE):
|
||||
os.remove(TEMP_MSG_FILE)
|
||||
except Exception:
|
||||
pass
|
||||
sys.exit(0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user