Compare commits

..

No commits in common. "a6c276862ec890f0f185d4a01969ccbe8d7e1e8e" and "543dc44004d9eab342eedadde8f8d4389124dc5c" have entirely different histories.

15 changed files with 42 additions and 452 deletions

3
.gitignore vendored
View File

@ -7,9 +7,6 @@ __pycache__/
ab
# C extensions
*.so
*.png
GEMINI.md
# Distribution / packaging
.Python

View File

@ -19,38 +19,6 @@
## Version 1.25.0 - 2025-11-08
Autonomous mode now has improved error handling and tracks usage and costs. Several internal components were updated to improve reliability and logging.
**Changes:** 10 files, 342 lines
**Languages:** Markdown (8 lines), Python (332 lines), TOML (2 lines)
## Version 1.24.0 - 2025-11-07
Users can now run the tool in autonomous mode using a command-line argument. We've also improved error handling and assistant output for a better experience.
**Changes:** 5 files, 62 lines
**Languages:** Markdown (8 lines), Python (52 lines), TOML (2 lines)
## Version 1.23.0 - 2025-11-07
This release updates project dependencies and improves file handling. The changelog now includes details about the previous version (1.22.0).
**Changes:** 4 files, 23 lines
**Languages:** Markdown (8 lines), Other (3 lines), TOML (2 lines), Text (10 lines)
## Version 1.22.0 - 2025-11-07
This release bumps the project version to 1.22.0 and removes an internal version print statement. Tests have been updated to use more accurate terminology.
**Changes:** 4 files, 29 lines
**Languages:** Markdown (8 lines), Python (19 lines), TOML (2 lines)
## Version 1.21.0 - 2025-11-07
The project has been updated to version 1.21.0, and the release notes for version 1.20.0 are now included in the changelog.

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "rp"
version = "1.25.0"
version = "1.21.0"
description = "R python edition. The ultimate autonomous AI CLI."
readme = "README.md"
requires-python = ">=3.10"

View File

@ -1,10 +0,0 @@
pydantic==2.12.3
jinja2==3.1.4
cryptography==43.0.0
docker==7.1.0
gitpython==3.1.43
websockets==13.0.1
pytest==8.3.2
bcrypt==4.1.3
python-slugify==8.0.4
requests>=2.31.0

View File

@ -44,7 +44,6 @@ Commands in interactive mode:
parser.add_argument("-u", "--api-url", help="API endpoint URL")
parser.add_argument("--model-list-url", help="Model list endpoint URL")
parser.add_argument("-i", "--interactive", action="store_true", help="Interactive mode")
parser.add_argument("-a", "--autonomous", action="store_true", help="Autonomous mode")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument(
"--debug", action="store_true", help="Enable debug mode with detailed logging"

View File

@ -97,14 +97,6 @@ def process_response_autonomous(assistant, response):
get_tools_definition(),
verbose=assistant.verbose,
)
if "usage" in follow_up:
usage = follow_up["usage"]
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
assistant.usage_tracker.track_request(assistant.model, input_tokens, output_tokens)
cost = assistant.usage_tracker._calculate_cost(assistant.model, input_tokens, output_tokens)
total_cost = assistant.usage_tracker.session_usage["estimated_cost"]
print(f"{Colors.YELLOW}đź’° Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
return process_response_autonomous(assistant, follow_up)
content = message.get("content", "")
from rp.ui import render_markdown

View File

@ -7,7 +7,7 @@ from rp.core.http_client import http_client
logger = logging.getLogger("rp")
def call_api(messages, model, api_url, api_key, use_tools, tools_definition, verbose=False, db_conn=None):
def call_api(messages, model, api_url, api_key, use_tools, tools_definition, verbose=False):
try:
messages = auto_slim_messages(messages, verbose=verbose)
logger.debug(f"=== API CALL START ===")
@ -34,15 +34,8 @@ def call_api(messages, model, api_url, api_key, use_tools, tools_definition, ver
logger.debug(f"Tool calling enabled with {len(tools_definition)} tools")
request_json = data
logger.debug(f"Request payload size: {len(request_json)} bytes")
# Log the API request to database if db_conn is provided
if db_conn:
from rp.tools.database import log_api_request
log_result = log_api_request(model, api_url, request_json, db_conn)
if log_result.get("status") != "success":
logger.warning(f"Failed to log API request: {log_result.get('error')}")
logger.debug("Sending HTTP request...")
response = http_client.post(api_url, headers=headers, json_data=request_json, db_conn=db_conn)
response = http_client.post(api_url, headers=headers, json_data=request_json)
if response.get("error"):
if "status" in response:
logger.error(f"API HTTP Error: {response['status']} - {response.get('text', '')}")
@ -89,7 +82,7 @@ def list_models(model_list_url, api_key):
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
response = http_client.get(model_list_url, headers=headers, db_conn=None)
response = http_client.get(model_list_url, headers=headers)
if response.get("error"):
return {"error": response.get("text", "HTTP error")}
data = json.loads(response["text"])

View File

@ -32,7 +32,7 @@ from rp.tools.agents import (
remove_agent,
)
from rp.tools.command import kill_process, run_command, tail_process
from rp.tools.database import db_get, db_query, db_set, log_api_request
from rp.tools.database import db_get, db_query, db_set
from rp.tools.filesystem import (
chdir,
clear_edit_tracker,
@ -105,7 +105,6 @@ class Assistant:
self.background_monitoring = False
self.usage_tracker = UsageTracker()
self.background_tasks = set()
self.last_result = None
self.init_database()
self.messages.append(init_system_message(args))
try:
@ -138,9 +137,6 @@ class Assistant:
cursor.execute(
"CREATE TABLE IF NOT EXISTS file_versions\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n filepath TEXT, content TEXT, hash TEXT,\n timestamp REAL, version INTEGER)"
)
cursor.execute(
"CREATE TABLE IF NOT EXISTS api_request_logs\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n timestamp REAL, model TEXT, api_url TEXT,\n request_payload TEXT)"
)
self.db_conn.commit()
logger.debug("Database initialized successfully")
except Exception as e:
@ -313,7 +309,6 @@ class Assistant:
self.use_tools,
get_tools_definition(),
verbose=self.verbose,
db_conn=self.db_conn,
)
return self.process_response(follow_up)
content = message.get("content", "")
@ -410,9 +405,7 @@ class Assistant:
# Use enhanced processing if available, otherwise fall back to basic processing
if hasattr(self, "enhanced") and self.enhanced:
result = self.enhanced.process_with_enhanced_context(user_input)
if result != self.last_result:
print(result)
self.last_result = result
print(result)
else:
process_message(self, user_input)
except EOFError:
@ -430,19 +423,6 @@ class Assistant:
message = sys.stdin.read()
process_message(self, message)
def run_autonomous(self):
if self.args.message:
task = self.args.message
else:
self.setup_readline()
task = input("> ").strip()
if not task:
print("No task provided. Exiting.")
return
from rp.autonomous import run_autonomous_mode
run_autonomous_mode(self, task)
def cleanup(self):
if hasattr(self, "enhanced") and self.enhanced:
try:
@ -466,9 +446,7 @@ class Assistant:
def run(self):
try:
if self.args.autonomous:
self.run_autonomous()
elif self.args.interactive or (not self.args.message and sys.stdin.isatty()):
if self.args.interactive or (not self.args.message and sys.stdin.isatty()):
self.run_repl()
else:
self.run_single()
@ -493,7 +471,6 @@ def process_message(assistant, message):
assistant.use_tools,
get_tools_definition(),
verbose=assistant.verbose,
db_conn=assistant.db_conn,
)
spinner.stop()
if "usage" in response:
@ -505,6 +482,4 @@ def process_message(assistant, message):
total_cost = assistant.usage_tracker.session_usage["estimated_cost"]
print(f"{Colors.YELLOW}đź’° Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
result = assistant.process_response(response)
if result != assistant.last_result:
print(f"\n{Colors.GREEN}r:{Colors.RESET} {result}\n")
assistant.last_result = result
print(f"\n{Colors.GREEN}r:{Colors.RESET} {result}\n")

View File

@ -1,4 +1,3 @@
import json
import logging
import time
import requests
@ -20,7 +19,6 @@ class SyncHTTPClient:
data: Optional[bytes] = None,
json_data: Optional[Dict[str, Any]] = None,
timeout: float = 30.0,
db_conn=None,
) -> Dict[str, Any]:
"""Make a sync HTTP request using requests with retry logic."""
attempt = 0
@ -37,19 +35,6 @@ class SyncHTTPClient:
timeout=timeout,
)
response.raise_for_status() # Raise an exception for bad status codes
# Prepare request body for logging
if json_data is not None:
request_body = json.dumps(json_data)
elif data is not None:
request_body = data.decode('utf-8') if isinstance(data, bytes) else str(data)
else:
request_body = ""
# Log the request
if db_conn:
from rp.tools.database import log_http_request
log_result = log_http_request(method, url, request_body, response.text, response.status_code, db_conn)
if log_result.get("status") != "success":
logger.warning(f"Failed to log HTTP request: {log_result.get('error')}")
return {
"status": response.status_code,
"headers": dict(response.headers),
@ -73,9 +58,9 @@ class SyncHTTPClient:
return {"error": True, "exception": str(e)}
def get(
self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0, db_conn=None
self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0
) -> Dict[str, Any]:
return self.request("GET", url, headers=headers, timeout=timeout, db_conn=db_conn)
return self.request("GET", url, headers=headers, timeout=timeout)
def post(
self,
@ -84,10 +69,9 @@ class SyncHTTPClient:
data: Optional[bytes] = None,
json_data: Optional[Dict[str, Any]] = None,
timeout: float = 30.0,
db_conn=None,
) -> Dict[str, Any]:
return self.request(
"POST", url, headers=headers, data=data, json_data=json_data, timeout=timeout, db_conn=db_conn
"POST", url, headers=headers, data=data, json_data=json_data, timeout=timeout
)
def set_default_headers(self, headers: Dict[str, str]):

View File

@ -26,7 +26,6 @@ class RPEditor:
self.lines = [""]
self.cursor_y = 0
self.cursor_x = 0
self.scroll_y = 0
self.mode = "normal"
self.command = ""
self.stdscr = None
@ -223,15 +222,14 @@ class RPEditor:
try:
self.stdscr.clear()
height, width = self.stdscr.getmaxyx()
for i in range(height - 1):
line_idx = self.scroll_y + i
if line_idx < len(self.lines):
line = self.lines[line_idx]
try:
display_line = line[: width - 1] if len(line) >= width else line
self.stdscr.addstr(i, 0, display_line)
except curses.error:
pass
for i, line in enumerate(self.lines):
if i >= height - 1:
break
try:
display_line = line[: width - 1] if len(line) >= width else line
self.stdscr.addstr(i, 0, display_line)
except curses.error:
pass
status = f"{self.mode.upper()} | {self.filename or 'untitled'} | {self.cursor_y + 1}:{self.cursor_x + 1}"
if self.mode == "command":
status = self.command[: width - 1]
@ -240,12 +238,11 @@ class RPEditor:
except curses.error:
pass
cursor_x = min(self.cursor_x, width - 1)
cursor_y_display = self.cursor_y - self.scroll_y
if 0 <= cursor_y_display < height - 1:
try:
self.stdscr.move(cursor_y_display, cursor_x)
except curses.error:
pass
cursor_y = min(self.cursor_y, height - 2)
try:
self.stdscr.move(cursor_y, cursor_x)
except curses.error:
pass
self.stdscr.refresh()
except Exception:
pass
@ -324,33 +321,15 @@ class RPEditor:
if self.prev_key == ord("g"):
self.cursor_y = 0
self.cursor_x = 0
self.scroll_y = 0
elif key == ord("G"):
self.cursor_y = max(0, len(self.lines) - 1)
self.cursor_x = 0
# Adjust scroll_y
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
if self.cursor_y >= self.scroll_y + height - 1:
self.scroll_y = self.cursor_y - height + 2
elif key == ord("u"):
self.undo()
elif key == 18:
self.redo()
elif key == 19:
self._save_file()
elif key == curses.KEY_PPAGE: # Page Up
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
page_size = height - 2
self.cursor_y = max(0, self.cursor_y - page_size)
self.scroll_y = max(0, self.scroll_y - page_size)
elif key == curses.KEY_NPAGE: # Page Down
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
page_size = height - 2
self.cursor_y = min(len(self.lines) - 1, self.cursor_y + page_size)
self.scroll_y = min(max(0, len(self.lines) - height + 1), self.scroll_y + page_size)
self.prev_key = key
except Exception:
pass
@ -441,13 +420,6 @@ class RPEditor:
elif new_y >= len(self.lines):
self.cursor_y = max(0, len(self.lines) - 1)
self.cursor_x = len(self.lines[self.cursor_y])
# Adjust scroll_y to keep cursor visible
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
if self.cursor_y < self.scroll_y:
self.scroll_y = self.cursor_y
elif self.cursor_y >= self.scroll_y + height - 1:
self.scroll_y = self.cursor_y - height + 2
def save_state(self):
"""Save current state for undo."""
@ -456,7 +428,6 @@ class RPEditor:
"lines": list(self.lines),
"cursor_y": self.cursor_y,
"cursor_x": self.cursor_x,
"scroll_y": self.scroll_y,
}
self.undo_stack.append(state)
if len(self.undo_stack) > self.max_undo:
@ -471,7 +442,6 @@ class RPEditor:
"lines": list(self.lines),
"cursor_y": self.cursor_y,
"cursor_x": self.cursor_x,
"scroll_y": self.scroll_y,
}
self.redo_stack.append(current_state)
state = self.undo_stack.pop()
@ -480,7 +450,6 @@ class RPEditor:
self.cursor_x = min(
state["cursor_x"], len(self.lines[self.cursor_y]) if self.lines else 0
)
self.scroll_y = state.get("scroll_y", 0)
def redo(self):
"""Redo last undone change."""
@ -490,7 +459,6 @@ class RPEditor:
"lines": list(self.lines),
"cursor_y": self.cursor_y,
"cursor_x": self.cursor_x,
"scroll_y": self.scroll_y,
}
self.undo_stack.append(current_state)
state = self.redo_stack.pop()
@ -499,7 +467,6 @@ class RPEditor:
self.cursor_x = min(
state["cursor_x"], len(self.lines[self.cursor_y]) if self.lines else 0
)
self.scroll_y = state.get("scroll_y", 0)
def _insert_text(self, text):
"""Insert text at cursor position."""
@ -604,7 +571,6 @@ class RPEditor:
self.lines = text.splitlines() if text else [""]
self.cursor_y = 0
self.cursor_x = 0
self.scroll_y = 0
def set_text(self, text):
"""Thread-safe text setting."""
@ -623,13 +589,6 @@ class RPEditor:
line_num = max(0, min(line_num - 1, len(self.lines) - 1))
self.cursor_y = line_num
self.cursor_x = 0
# Adjust scroll_y
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
if self.cursor_y < self.scroll_y:
self.scroll_y = self.cursor_y
elif self.cursor_y >= self.scroll_y + height - 1:
self.scroll_y = self.cursor_y - height + 2
def goto_line(self, line_num):
"""Thread-safe goto line."""

View File

@ -17,7 +17,14 @@ from rp.tools.editor import (
open_editor,
)
from rp.tools.filesystem import (
get_uid, read_specific_lines, replace_specific_line, insert_line_at_position, delete_specific_line, read_file, write_file, list_directory, mkdir, chdir, getpwd, index_source_directory, search_replace, get_editor, close_editor, open_editor, editor_insert_text, editor_replace_text, display_edit_summary, display_edit_timeline, clear_edit_tracker
chdir,
getpwd,
index_source_directory,
list_directory,
mkdir,
read_file,
search_replace,
write_file,
)
from rp.tools.lsp import get_diagnostics
from rp.tools.memory import (
@ -47,11 +54,9 @@ agent = execute_agent_task
__all__ = [
"add_knowledge_entry",
"agent",
"apply_patch",
"bash",
"chdir",
"clear_edit_tracker",
"close_editor",
"collaborate_agents",
"create_agent",
@ -60,28 +65,23 @@ __all__ = [
"db_query",
"db_set",
"delete_knowledge_entry",
"delete_specific_line",
"diagnostics",
"display_edit_summary",
"display_edit_timeline",
"post_image",
"edit",
"editor_insert_text",
"editor_replace_text",
"editor_search",
"execute_agent_task",
"get_editor",
"get_knowledge_by_category",
"get_knowledge_entry",
"get_knowledge_statistics",
"get_tools_definition",
"get_uid",
"getpwd",
"glob",
"glob_files",
"grep",
"http_fetch",
"index_source_directory",
"insert_line_at_position",
"kill_process",
"list_agents",
"list_directory",
@ -89,12 +89,9 @@ __all__ = [
"mkdir",
"open_editor",
"patch",
"post_image",
"python_exec",
"read_file",
"read_specific_lines",
"remove_agent",
"replace_specific_line",
"run_command",
"run_command_interactive",
"search_knowledge",
@ -107,4 +104,3 @@ __all__ = [
"write",
"write_file",
]

View File

@ -23,7 +23,6 @@ def _create_api_wrapper():
use_tools=use_tools,
tools_definition=tools_definition,
verbose=False,
db_conn=None,
)
return api_wrapper

View File

@ -1,4 +1,3 @@
import json
import time
@ -75,55 +74,3 @@ def db_query(query, db_conn):
return {"status": "success", "rows_affected": cursor.rowcount}
except Exception as e:
return {"status": "error", "error": str(e)}
def log_api_request(model, api_url, request_payload, db_conn):
"""Log an API request to the database.
Args:
model: The model used.
api_url: The API URL.
request_payload: The JSON payload sent.
db_conn: Database connection.
Returns:
Dict with status.
"""
if not db_conn:
return {"status": "error", "error": "Database not initialized"}
try:
cursor = db_conn.cursor()
cursor.execute(
"INSERT INTO api_request_logs (timestamp, model, api_url, request_payload) VALUES (?, ?, ?, ?)",
(time.time(), model, api_url, json.dumps(request_payload)),
)
db_conn.commit()
return {"status": "success"}
except Exception as e:
return {"status": "error", "error": str(e)}
def log_http_request(method, url, request_body, response_body, status_code, db_conn):
"""Log an HTTP request to the database.
Args:
method: The HTTP method.
url: The URL.
request_body: The request body.
response_body: The response body.
status_code: The status code.
db_conn: Database connection.
Returns:
Dict with status.
"""
if not db_conn:
return {"status": "error", "error": "Database not initialized"}
try:
cursor = db_conn.cursor()
cursor.execute(
"INSERT INTO request_log (timestamp, method, url, request_body, response_body, status_code) VALUES (?, ?, ?, ?, ?, ?)",
(time.time(), method, url, request_body, response_body, status_code),
)
db_conn.commit()
return {"status": "success"}
except Exception as e:
return {"status": "error", "error": str(e)}

View File

@ -16,207 +16,6 @@ def get_uid():
return _id
def read_specific_lines(filepath: str, start_line: int, end_line: Optional[int] = None, db_conn: Optional[Any] = None) -> dict:
"""
Read specific lines or a range of lines from a file.
This function allows reading a single line or a contiguous range of lines from the specified file.
It supports optional database connection for tracking read operations.
Args:
filepath (str): The path to the file to read from. Supports user home directory expansion (e.g., ~).
start_line (int): The 1-based line number to start reading from.
end_line (Optional[int]): The 1-based line number to end reading at (inclusive). If None, reads only the start_line.
db_conn (Optional[Any]): An optional database connection object for tracking read operations. If provided, marks the file as read in the database.
Returns:
dict: A dictionary containing the status and either the content or an error message.
- On success: {"status": "success", "content": str} where content is the lines joined by newlines.
- On error: {"status": "error", "error": str} with the exception message.
Raises:
None: Exceptions are caught and returned in the response dictionary.
Examples:
# Read line 5 only
result = read_specific_lines("example.txt", 5)
# Read lines 10 to 20
result = read_specific_lines("example.txt", 10, 20)
"""
try:
path = os.path.expanduser(filepath)
with open(path, 'r') as file:
lines = file.readlines()
total_lines = len(lines)
if start_line < 1 or start_line > total_lines:
return {"status": "error", "error": f"Start line {start_line} is out of range. File has {total_lines} lines."}
if end_line is None:
end_line = start_line
if end_line < start_line or end_line > total_lines:
return {"status": "error", "error": f"End line {end_line} is out of range. File has {total_lines} lines."}
selected_lines = lines[start_line - 1:end_line]
content = ''.join(selected_lines)
if db_conn:
from rp.tools.database import db_set
db_set("read:" + path, "true", db_conn)
return {"status": "success", "content": content}
except Exception as e:
return {"status": "error", "error": str(e)}
def replace_specific_line(filepath: str, line_number: int, new_content: str, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
"""
Replace the content of a specific line in a file.
This function replaces the entire content of a single line with new text. It supports optional database tracking
and diff display. The file must be read first if a database connection is provided.
Args:
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
line_number (int): The 1-based line number to replace.
new_content (str): The new content to place on the specified line (should not include trailing newline).
db_conn (Optional[Any]): An optional database connection for tracking. Requires the file to be read first.
show_diff (bool): If True, displays a diff of the changes after replacement.
Returns:
dict: A dictionary with status and message or error.
- On success: {"status": "success", "message": str} describing the operation.
- On error: {"status": "error", "error": str} with the exception or validation message.
Raises:
None: Exceptions are handled internally.
Examples:
# Replace line 3 with new text
result = replace_specific_line("file.txt", 3, "New line content")
"""
try:
path = os.path.expanduser(filepath)
if not os.path.exists(path):
return {"status": "error", "error": "File does not exist"}
if db_conn:
from rp.tools.database import db_get
read_status = db_get("read:" + path, db_conn)
if read_status.get("status") != "success" or read_status.get("value") != "true":
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
with open(path, 'r') as file:
lines = file.readlines()
total_lines = len(lines)
if line_number < 1 or line_number > total_lines:
return {"status": "error", "error": f"Line number {line_number} is out of range. File has {total_lines} lines."}
old_content = ''.join(lines)
lines[line_number - 1] = new_content + '\n' if not new_content.endswith('\n') else new_content
new_full_content = ''.join(lines)
with open(path, 'w') as file:
file.writelines(lines)
if show_diff:
diff_result = display_content_diff(old_content, new_full_content, filepath)
if diff_result["status"] == "success":
print(diff_result["visual_diff"])
return {"status": "success", "message": f"Replaced line {line_number} in {path}"}
except Exception as e:
return {"status": "error", "error": str(e)}
def insert_line_at_position(filepath: str, line_number: int, new_content: str, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
"""
Insert a new line at a specific position in a file.
This function inserts new content as a new line before the specified line number. If line_number is beyond the file's length,
it appends to the end. Supports database tracking and diff display.
Args:
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
line_number (int): The 1-based line number before which to insert the new line. If greater than total lines, appends.
new_content (str): The content for the new line (trailing newline is added if missing).
db_conn (Optional[Any]): Optional database connection for tracking. File must be read first if provided.
show_diff (bool): If True, displays a diff of the changes after insertion.
Returns:
dict: Status and message or error.
- Success: {"status": "success", "message": str}
- Error: {"status": "error", "error": str}
Examples:
# Insert before line 5
result = insert_line_at_position("file.txt", 5, "Inserted line")
"""
try:
path = os.path.expanduser(filepath)
if not os.path.exists(path):
return {"status": "error", "error": "File does not exist"}
if db_conn:
from rp.tools.database import db_get
read_status = db_get("read:" + path, db_conn)
if read_status.get("status") != "success" or read_status.get("value") != "true":
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
with open(path, 'r') as file:
lines = file.readlines()
old_content = ''.join(lines)
insert_index = min(line_number - 1, len(lines))
lines.insert(insert_index, new_content + '\n' if not new_content.endswith('\n') else new_content)
new_full_content = ''.join(lines)
with open(path, 'w') as file:
file.writelines(lines)
if show_diff:
diff_result = display_content_diff(old_content, new_full_content, filepath)
if diff_result["status"] == "success":
print(diff_result["visual_diff"])
return {"status": "success", "message": f"Inserted line at position {line_number} in {path}"}
except Exception as e:
return {"status": "error", "error": str(e)}
def delete_specific_line(filepath: str, line_number: int, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
"""
Delete a specific line from a file.
This function removes the specified line from the file. Supports database tracking and diff display.
Args:
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
line_number (int): The 1-based line number to delete.
db_conn (Optional[Any]): Optional database connection for tracking. File must be read first if provided.
show_diff (bool): If True, displays a diff of the changes after deletion.
Returns:
dict: Status and message or error.
- Success: {"status": "success", "message": str}
- Error: {"status": "error", "error": str}
Examples:
# Delete line 10
result = delete_specific_line("file.txt", 10)
"""
try:
path = os.path.expanduser(filepath)
if not os.path.exists(path):
return {"status": "error", "error": "File does not exist"}
if db_conn:
from rp.tools.database import db_get
read_status = db_get("read:" + path, db_conn)
if read_status.get("status") != "success" or read_status.get("value") != "true":
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
with open(path, 'r') as file:
lines = file.readlines()
total_lines = len(lines)
if line_number < 1 or line_number > total_lines:
return {"status": "error", "error": f"Line number {line_number} is out of range. File has {total_lines} lines."}
old_content = ''.join(lines)
del lines[line_number - 1]
new_full_content = ''.join(lines)
with open(path, 'w') as file:
file.writelines(lines)
if show_diff:
diff_result = display_content_diff(old_content, new_full_content, filepath)
if diff_result["status"] == "success":
print(diff_result["visual_diff"])
return {"status": "success", "message": f"Deleted line {line_number} from {path}"}
except Exception as e:
return {"status": "error", "error": str(e)}
def read_file(filepath: str, db_conn: Optional[Any] = None) -> dict:
"""
Read the contents of a file.
@ -582,4 +381,3 @@ def clear_edit_tracker():
clear_tracker()
return {"status": "success", "message": "Edit tracker cleared"}

View File

@ -4,11 +4,6 @@ import urllib.parse
import urllib.request
import json
import urllib.parse
import urllib.request
def http_fetch(url, headers=None):
"""Fetch content from an HTTP URL.
@ -20,26 +15,25 @@ def http_fetch(url, headers=None):
Dict with status and content.
"""
try:
request = urllib.request.Request(url)
req = urllib.request.Request(url)
if headers:
for header_key, header_value in headers.items():
request.add_header(header_key, header_value)
with urllib.request.urlopen(request) as response:
for key, value in headers.items():
req.add_header(key, value)
with urllib.request.urlopen(req) as response:
content = response.read().decode("utf-8")
return {"status": "success", "content": content[:10000]}
except Exception as exception:
return {"status": "error", "error": str(exception)}
except Exception as e:
return {"status": "error", "error": str(e)}
def _perform_search(base_url, query, params=None):
try:
encoded_query = urllib.parse.quote(query)
full_url = f"{base_url}?query={encoded_query}"
full_url = f"https://static.molodetz.nl/search.cgi?query={query}"
with urllib.request.urlopen(full_url) as response:
content = response.read().decode("utf-8")
return {"status": "success", "content": json.loads(content)}
except Exception as exception:
return {"status": "error", "error": str(exception)}
except Exception as e:
return {"status": "error", "error": str(e)}
def web_search(query):
@ -66,4 +60,3 @@ def web_search_news(query):
"""
base_url = "https://search.molodetz.nl/search"
return _perform_search(base_url, query)