Compare commits

...

4 Commits

Author SHA1 Message Date
a6c276862e feat: implement scrolling in editor
Some checks failed
Tests / test (macos-latest, 3.10) (push) Waiting to run
Tests / test (macos-latest, 3.11) (push) Waiting to run
Tests / test (macos-latest, 3.12) (push) Waiting to run
Tests / test (windows-latest, 3.10) (push) Waiting to run
Tests / test (windows-latest, 3.11) (push) Waiting to run
Tests / test (windows-latest, 3.12) (push) Waiting to run
Lint / lint (push) Failing after 24s
Tests / test (ubuntu-latest, 3.10) (push) Failing after 40s
Tests / test (ubuntu-latest, 3.11) (push) Failing after 37s
Tests / test (ubuntu-latest, 3.12) (push) Failing after 1m3s
feat: update version to 1.26.0
refactor: adjust cursor position based on scroll_y
maintenance: update changelog with new version information
2025-11-08 00:42:52 +01:00
55d8814f94 feat: add autonomous mode command-line argument
feat: improve error handling in autonomous mode
feat: enhance assistant output for better user experience
feat: track usage and cost in autonomous mode
refactor: update api call function to accept database connection
refactor: update list models function to accept database connection
refactor: update assistant class to track api requests
refactor: update http client to log requests
maintenance: update pyproject.toml version to 1.25.0
docs: update changelog with version 1.24.0 changes
2025-11-08 00:35:41 +01:00
a3d5696c19 maintenance: update project dependencies and improve file handling
feat: add autonomous mode with command-line argument
refactor: improve assistant output and result tracking
refactor: handle autonomous mode in assistant
refactor: improve error handling in web tools
2025-11-07 22:07:32 +01:00
cc4b0e46e1 maintenance: bump project version to 1.23.0
maintenance: add dependencies to requirements.txt
docs: update changelog with version 1.22.0 release notes
refactor: ignore png and gemini files
2025-11-07 21:46:54 +01:00
15 changed files with 452 additions and 42 deletions

3
.gitignore vendored
View File

@ -7,6 +7,9 @@ __pycache__/
ab
# C extensions
*.so
*.png
GEMINI.md
# Distribution / packaging
.Python

View File

@ -19,6 +19,38 @@
## Version 1.25.0 - 2025-11-08
Autonomous mode now has improved error handling and tracks usage and costs. Several internal components were updated to improve reliability and logging.
**Changes:** 10 files, 342 lines
**Languages:** Markdown (8 lines), Python (332 lines), TOML (2 lines)
## Version 1.24.0 - 2025-11-07
Users can now run the tool in autonomous mode using a command-line argument. We've also improved error handling and assistant output for a better experience.
**Changes:** 5 files, 62 lines
**Languages:** Markdown (8 lines), Python (52 lines), TOML (2 lines)
## Version 1.23.0 - 2025-11-07
This release updates project dependencies and improves file handling. The changelog now includes details about the previous version (1.22.0).
**Changes:** 4 files, 23 lines
**Languages:** Markdown (8 lines), Other (3 lines), TOML (2 lines), Text (10 lines)
## Version 1.22.0 - 2025-11-07
This release bumps the project version to 1.22.0 and removes an internal version print statement. Tests have been updated to use more accurate terminology.
**Changes:** 4 files, 29 lines
**Languages:** Markdown (8 lines), Python (19 lines), TOML (2 lines)
## Version 1.21.0 - 2025-11-07
The project has been updated to version 1.21.0, and the release notes for version 1.20.0 are now included in the changelog.

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "rp"
version = "1.21.0"
version = "1.25.0"
description = "R python edition. The ultimate autonomous AI CLI."
readme = "README.md"
requires-python = ">=3.10"

10
requirements.txt Normal file
View File

@ -0,0 +1,10 @@
pydantic==2.12.3
jinja2==3.1.4
cryptography==43.0.0
docker==7.1.0
gitpython==3.1.43
websockets==13.0.1
pytest==8.3.2
bcrypt==4.1.3
python-slugify==8.0.4
requests>=2.31.0

View File

@ -44,6 +44,7 @@ Commands in interactive mode:
parser.add_argument("-u", "--api-url", help="API endpoint URL")
parser.add_argument("--model-list-url", help="Model list endpoint URL")
parser.add_argument("-i", "--interactive", action="store_true", help="Interactive mode")
parser.add_argument("-a", "--autonomous", action="store_true", help="Autonomous mode")
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument(
"--debug", action="store_true", help="Enable debug mode with detailed logging"

View File

@ -97,6 +97,14 @@ def process_response_autonomous(assistant, response):
get_tools_definition(),
verbose=assistant.verbose,
)
if "usage" in follow_up:
usage = follow_up["usage"]
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
assistant.usage_tracker.track_request(assistant.model, input_tokens, output_tokens)
cost = assistant.usage_tracker._calculate_cost(assistant.model, input_tokens, output_tokens)
total_cost = assistant.usage_tracker.session_usage["estimated_cost"]
print(f"{Colors.YELLOW}đź’° Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
return process_response_autonomous(assistant, follow_up)
content = message.get("content", "")
from rp.ui import render_markdown

View File

@ -7,7 +7,7 @@ from rp.core.http_client import http_client
logger = logging.getLogger("rp")
def call_api(messages, model, api_url, api_key, use_tools, tools_definition, verbose=False):
def call_api(messages, model, api_url, api_key, use_tools, tools_definition, verbose=False, db_conn=None):
try:
messages = auto_slim_messages(messages, verbose=verbose)
logger.debug(f"=== API CALL START ===")
@ -34,8 +34,15 @@ def call_api(messages, model, api_url, api_key, use_tools, tools_definition, ver
logger.debug(f"Tool calling enabled with {len(tools_definition)} tools")
request_json = data
logger.debug(f"Request payload size: {len(request_json)} bytes")
# Log the API request to database if db_conn is provided
if db_conn:
from rp.tools.database import log_api_request
log_result = log_api_request(model, api_url, request_json, db_conn)
if log_result.get("status") != "success":
logger.warning(f"Failed to log API request: {log_result.get('error')}")
logger.debug("Sending HTTP request...")
response = http_client.post(api_url, headers=headers, json_data=request_json)
response = http_client.post(api_url, headers=headers, json_data=request_json, db_conn=db_conn)
if response.get("error"):
if "status" in response:
logger.error(f"API HTTP Error: {response['status']} - {response.get('text', '')}")
@ -82,7 +89,7 @@ def list_models(model_list_url, api_key):
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
response = http_client.get(model_list_url, headers=headers)
response = http_client.get(model_list_url, headers=headers, db_conn=None)
if response.get("error"):
return {"error": response.get("text", "HTTP error")}
data = json.loads(response["text"])

View File

@ -32,7 +32,7 @@ from rp.tools.agents import (
remove_agent,
)
from rp.tools.command import kill_process, run_command, tail_process
from rp.tools.database import db_get, db_query, db_set
from rp.tools.database import db_get, db_query, db_set, log_api_request
from rp.tools.filesystem import (
chdir,
clear_edit_tracker,
@ -105,6 +105,7 @@ class Assistant:
self.background_monitoring = False
self.usage_tracker = UsageTracker()
self.background_tasks = set()
self.last_result = None
self.init_database()
self.messages.append(init_system_message(args))
try:
@ -137,6 +138,9 @@ class Assistant:
cursor.execute(
"CREATE TABLE IF NOT EXISTS file_versions\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n filepath TEXT, content TEXT, hash TEXT,\n timestamp REAL, version INTEGER)"
)
cursor.execute(
"CREATE TABLE IF NOT EXISTS api_request_logs\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n timestamp REAL, model TEXT, api_url TEXT,\n request_payload TEXT)"
)
self.db_conn.commit()
logger.debug("Database initialized successfully")
except Exception as e:
@ -309,6 +313,7 @@ class Assistant:
self.use_tools,
get_tools_definition(),
verbose=self.verbose,
db_conn=self.db_conn,
)
return self.process_response(follow_up)
content = message.get("content", "")
@ -405,7 +410,9 @@ class Assistant:
# Use enhanced processing if available, otherwise fall back to basic processing
if hasattr(self, "enhanced") and self.enhanced:
result = self.enhanced.process_with_enhanced_context(user_input)
if result != self.last_result:
print(result)
self.last_result = result
else:
process_message(self, user_input)
except EOFError:
@ -423,6 +430,19 @@ class Assistant:
message = sys.stdin.read()
process_message(self, message)
def run_autonomous(self):
if self.args.message:
task = self.args.message
else:
self.setup_readline()
task = input("> ").strip()
if not task:
print("No task provided. Exiting.")
return
from rp.autonomous import run_autonomous_mode
run_autonomous_mode(self, task)
def cleanup(self):
if hasattr(self, "enhanced") and self.enhanced:
try:
@ -446,7 +466,9 @@ class Assistant:
def run(self):
try:
if self.args.interactive or (not self.args.message and sys.stdin.isatty()):
if self.args.autonomous:
self.run_autonomous()
elif self.args.interactive or (not self.args.message and sys.stdin.isatty()):
self.run_repl()
else:
self.run_single()
@ -471,6 +493,7 @@ def process_message(assistant, message):
assistant.use_tools,
get_tools_definition(),
verbose=assistant.verbose,
db_conn=assistant.db_conn,
)
spinner.stop()
if "usage" in response:
@ -482,4 +505,6 @@ def process_message(assistant, message):
total_cost = assistant.usage_tracker.session_usage["estimated_cost"]
print(f"{Colors.YELLOW}đź’° Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
result = assistant.process_response(response)
if result != assistant.last_result:
print(f"\n{Colors.GREEN}r:{Colors.RESET} {result}\n")
assistant.last_result = result

View File

@ -1,3 +1,4 @@
import json
import logging
import time
import requests
@ -19,6 +20,7 @@ class SyncHTTPClient:
data: Optional[bytes] = None,
json_data: Optional[Dict[str, Any]] = None,
timeout: float = 30.0,
db_conn=None,
) -> Dict[str, Any]:
"""Make a sync HTTP request using requests with retry logic."""
attempt = 0
@ -35,6 +37,19 @@ class SyncHTTPClient:
timeout=timeout,
)
response.raise_for_status() # Raise an exception for bad status codes
# Prepare request body for logging
if json_data is not None:
request_body = json.dumps(json_data)
elif data is not None:
request_body = data.decode('utf-8') if isinstance(data, bytes) else str(data)
else:
request_body = ""
# Log the request
if db_conn:
from rp.tools.database import log_http_request
log_result = log_http_request(method, url, request_body, response.text, response.status_code, db_conn)
if log_result.get("status") != "success":
logger.warning(f"Failed to log HTTP request: {log_result.get('error')}")
return {
"status": response.status_code,
"headers": dict(response.headers),
@ -58,9 +73,9 @@ class SyncHTTPClient:
return {"error": True, "exception": str(e)}
def get(
self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0
self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0, db_conn=None
) -> Dict[str, Any]:
return self.request("GET", url, headers=headers, timeout=timeout)
return self.request("GET", url, headers=headers, timeout=timeout, db_conn=db_conn)
def post(
self,
@ -69,9 +84,10 @@ class SyncHTTPClient:
data: Optional[bytes] = None,
json_data: Optional[Dict[str, Any]] = None,
timeout: float = 30.0,
db_conn=None,
) -> Dict[str, Any]:
return self.request(
"POST", url, headers=headers, data=data, json_data=json_data, timeout=timeout
"POST", url, headers=headers, data=data, json_data=json_data, timeout=timeout, db_conn=db_conn
)
def set_default_headers(self, headers: Dict[str, str]):

View File

@ -26,6 +26,7 @@ class RPEditor:
self.lines = [""]
self.cursor_y = 0
self.cursor_x = 0
self.scroll_y = 0
self.mode = "normal"
self.command = ""
self.stdscr = None
@ -222,9 +223,10 @@ class RPEditor:
try:
self.stdscr.clear()
height, width = self.stdscr.getmaxyx()
for i, line in enumerate(self.lines):
if i >= height - 1:
break
for i in range(height - 1):
line_idx = self.scroll_y + i
if line_idx < len(self.lines):
line = self.lines[line_idx]
try:
display_line = line[: width - 1] if len(line) >= width else line
self.stdscr.addstr(i, 0, display_line)
@ -238,9 +240,10 @@ class RPEditor:
except curses.error:
pass
cursor_x = min(self.cursor_x, width - 1)
cursor_y = min(self.cursor_y, height - 2)
cursor_y_display = self.cursor_y - self.scroll_y
if 0 <= cursor_y_display < height - 1:
try:
self.stdscr.move(cursor_y, cursor_x)
self.stdscr.move(cursor_y_display, cursor_x)
except curses.error:
pass
self.stdscr.refresh()
@ -321,15 +324,33 @@ class RPEditor:
if self.prev_key == ord("g"):
self.cursor_y = 0
self.cursor_x = 0
self.scroll_y = 0
elif key == ord("G"):
self.cursor_y = max(0, len(self.lines) - 1)
self.cursor_x = 0
# Adjust scroll_y
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
if self.cursor_y >= self.scroll_y + height - 1:
self.scroll_y = self.cursor_y - height + 2
elif key == ord("u"):
self.undo()
elif key == 18:
self.redo()
elif key == 19:
self._save_file()
elif key == curses.KEY_PPAGE: # Page Up
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
page_size = height - 2
self.cursor_y = max(0, self.cursor_y - page_size)
self.scroll_y = max(0, self.scroll_y - page_size)
elif key == curses.KEY_NPAGE: # Page Down
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
page_size = height - 2
self.cursor_y = min(len(self.lines) - 1, self.cursor_y + page_size)
self.scroll_y = min(max(0, len(self.lines) - height + 1), self.scroll_y + page_size)
self.prev_key = key
except Exception:
pass
@ -420,6 +441,13 @@ class RPEditor:
elif new_y >= len(self.lines):
self.cursor_y = max(0, len(self.lines) - 1)
self.cursor_x = len(self.lines[self.cursor_y])
# Adjust scroll_y to keep cursor visible
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
if self.cursor_y < self.scroll_y:
self.scroll_y = self.cursor_y
elif self.cursor_y >= self.scroll_y + height - 1:
self.scroll_y = self.cursor_y - height + 2
def save_state(self):
"""Save current state for undo."""
@ -428,6 +456,7 @@ class RPEditor:
"lines": list(self.lines),
"cursor_y": self.cursor_y,
"cursor_x": self.cursor_x,
"scroll_y": self.scroll_y,
}
self.undo_stack.append(state)
if len(self.undo_stack) > self.max_undo:
@ -442,6 +471,7 @@ class RPEditor:
"lines": list(self.lines),
"cursor_y": self.cursor_y,
"cursor_x": self.cursor_x,
"scroll_y": self.scroll_y,
}
self.redo_stack.append(current_state)
state = self.undo_stack.pop()
@ -450,6 +480,7 @@ class RPEditor:
self.cursor_x = min(
state["cursor_x"], len(self.lines[self.cursor_y]) if self.lines else 0
)
self.scroll_y = state.get("scroll_y", 0)
def redo(self):
"""Redo last undone change."""
@ -459,6 +490,7 @@ class RPEditor:
"lines": list(self.lines),
"cursor_y": self.cursor_y,
"cursor_x": self.cursor_x,
"scroll_y": self.scroll_y,
}
self.undo_stack.append(current_state)
state = self.redo_stack.pop()
@ -467,6 +499,7 @@ class RPEditor:
self.cursor_x = min(
state["cursor_x"], len(self.lines[self.cursor_y]) if self.lines else 0
)
self.scroll_y = state.get("scroll_y", 0)
def _insert_text(self, text):
"""Insert text at cursor position."""
@ -571,6 +604,7 @@ class RPEditor:
self.lines = text.splitlines() if text else [""]
self.cursor_y = 0
self.cursor_x = 0
self.scroll_y = 0
def set_text(self, text):
"""Thread-safe text setting."""
@ -589,6 +623,13 @@ class RPEditor:
line_num = max(0, min(line_num - 1, len(self.lines) - 1))
self.cursor_y = line_num
self.cursor_x = 0
# Adjust scroll_y
if self.stdscr:
height, _ = self.stdscr.getmaxyx()
if self.cursor_y < self.scroll_y:
self.scroll_y = self.cursor_y
elif self.cursor_y >= self.scroll_y + height - 1:
self.scroll_y = self.cursor_y - height + 2
def goto_line(self, line_num):
"""Thread-safe goto line."""

View File

@ -17,14 +17,7 @@ from rp.tools.editor import (
open_editor,
)
from rp.tools.filesystem import (
chdir,
getpwd,
index_source_directory,
list_directory,
mkdir,
read_file,
search_replace,
write_file,
get_uid, read_specific_lines, replace_specific_line, insert_line_at_position, delete_specific_line, read_file, write_file, list_directory, mkdir, chdir, getpwd, index_source_directory, search_replace, get_editor, close_editor, open_editor, editor_insert_text, editor_replace_text, display_edit_summary, display_edit_timeline, clear_edit_tracker
)
from rp.tools.lsp import get_diagnostics
from rp.tools.memory import (
@ -54,9 +47,11 @@ agent = execute_agent_task
__all__ = [
"add_knowledge_entry",
"agent",
"apply_patch",
"bash",
"chdir",
"clear_edit_tracker",
"close_editor",
"collaborate_agents",
"create_agent",
@ -65,23 +60,28 @@ __all__ = [
"db_query",
"db_set",
"delete_knowledge_entry",
"delete_specific_line",
"diagnostics",
"post_image",
"display_edit_summary",
"display_edit_timeline",
"edit",
"editor_insert_text",
"editor_replace_text",
"editor_search",
"execute_agent_task",
"get_editor",
"get_knowledge_by_category",
"get_knowledge_entry",
"get_knowledge_statistics",
"get_tools_definition",
"get_uid",
"getpwd",
"glob",
"glob_files",
"grep",
"http_fetch",
"index_source_directory",
"insert_line_at_position",
"kill_process",
"list_agents",
"list_directory",
@ -89,9 +89,12 @@ __all__ = [
"mkdir",
"open_editor",
"patch",
"post_image",
"python_exec",
"read_file",
"read_specific_lines",
"remove_agent",
"replace_specific_line",
"run_command",
"run_command_interactive",
"search_knowledge",
@ -104,3 +107,4 @@ __all__ = [
"write",
"write_file",
]

View File

@ -23,6 +23,7 @@ def _create_api_wrapper():
use_tools=use_tools,
tools_definition=tools_definition,
verbose=False,
db_conn=None,
)
return api_wrapper

View File

@ -1,3 +1,4 @@
import json
import time
@ -74,3 +75,55 @@ def db_query(query, db_conn):
return {"status": "success", "rows_affected": cursor.rowcount}
except Exception as e:
return {"status": "error", "error": str(e)}
def log_api_request(model, api_url, request_payload, db_conn):
"""Log an API request to the database.
Args:
model: The model used.
api_url: The API URL.
request_payload: The JSON payload sent.
db_conn: Database connection.
Returns:
Dict with status.
"""
if not db_conn:
return {"status": "error", "error": "Database not initialized"}
try:
cursor = db_conn.cursor()
cursor.execute(
"INSERT INTO api_request_logs (timestamp, model, api_url, request_payload) VALUES (?, ?, ?, ?)",
(time.time(), model, api_url, json.dumps(request_payload)),
)
db_conn.commit()
return {"status": "success"}
except Exception as e:
return {"status": "error", "error": str(e)}
def log_http_request(method, url, request_body, response_body, status_code, db_conn):
"""Log an HTTP request to the database.
Args:
method: The HTTP method.
url: The URL.
request_body: The request body.
response_body: The response body.
status_code: The status code.
db_conn: Database connection.
Returns:
Dict with status.
"""
if not db_conn:
return {"status": "error", "error": "Database not initialized"}
try:
cursor = db_conn.cursor()
cursor.execute(
"INSERT INTO request_log (timestamp, method, url, request_body, response_body, status_code) VALUES (?, ?, ?, ?, ?, ?)",
(time.time(), method, url, request_body, response_body, status_code),
)
db_conn.commit()
return {"status": "success"}
except Exception as e:
return {"status": "error", "error": str(e)}

View File

@ -16,6 +16,207 @@ def get_uid():
return _id
def read_specific_lines(filepath: str, start_line: int, end_line: Optional[int] = None, db_conn: Optional[Any] = None) -> dict:
"""
Read specific lines or a range of lines from a file.
This function allows reading a single line or a contiguous range of lines from the specified file.
It supports optional database connection for tracking read operations.
Args:
filepath (str): The path to the file to read from. Supports user home directory expansion (e.g., ~).
start_line (int): The 1-based line number to start reading from.
end_line (Optional[int]): The 1-based line number to end reading at (inclusive). If None, reads only the start_line.
db_conn (Optional[Any]): An optional database connection object for tracking read operations. If provided, marks the file as read in the database.
Returns:
dict: A dictionary containing the status and either the content or an error message.
- On success: {"status": "success", "content": str} where content is the lines joined by newlines.
- On error: {"status": "error", "error": str} with the exception message.
Raises:
None: Exceptions are caught and returned in the response dictionary.
Examples:
# Read line 5 only
result = read_specific_lines("example.txt", 5)
# Read lines 10 to 20
result = read_specific_lines("example.txt", 10, 20)
"""
try:
path = os.path.expanduser(filepath)
with open(path, 'r') as file:
lines = file.readlines()
total_lines = len(lines)
if start_line < 1 or start_line > total_lines:
return {"status": "error", "error": f"Start line {start_line} is out of range. File has {total_lines} lines."}
if end_line is None:
end_line = start_line
if end_line < start_line or end_line > total_lines:
return {"status": "error", "error": f"End line {end_line} is out of range. File has {total_lines} lines."}
selected_lines = lines[start_line - 1:end_line]
content = ''.join(selected_lines)
if db_conn:
from rp.tools.database import db_set
db_set("read:" + path, "true", db_conn)
return {"status": "success", "content": content}
except Exception as e:
return {"status": "error", "error": str(e)}
def replace_specific_line(filepath: str, line_number: int, new_content: str, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
"""
Replace the content of a specific line in a file.
This function replaces the entire content of a single line with new text. It supports optional database tracking
and diff display. The file must be read first if a database connection is provided.
Args:
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
line_number (int): The 1-based line number to replace.
new_content (str): The new content to place on the specified line (should not include trailing newline).
db_conn (Optional[Any]): An optional database connection for tracking. Requires the file to be read first.
show_diff (bool): If True, displays a diff of the changes after replacement.
Returns:
dict: A dictionary with status and message or error.
- On success: {"status": "success", "message": str} describing the operation.
- On error: {"status": "error", "error": str} with the exception or validation message.
Raises:
None: Exceptions are handled internally.
Examples:
# Replace line 3 with new text
result = replace_specific_line("file.txt", 3, "New line content")
"""
try:
path = os.path.expanduser(filepath)
if not os.path.exists(path):
return {"status": "error", "error": "File does not exist"}
if db_conn:
from rp.tools.database import db_get
read_status = db_get("read:" + path, db_conn)
if read_status.get("status") != "success" or read_status.get("value") != "true":
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
with open(path, 'r') as file:
lines = file.readlines()
total_lines = len(lines)
if line_number < 1 or line_number > total_lines:
return {"status": "error", "error": f"Line number {line_number} is out of range. File has {total_lines} lines."}
old_content = ''.join(lines)
lines[line_number - 1] = new_content + '\n' if not new_content.endswith('\n') else new_content
new_full_content = ''.join(lines)
with open(path, 'w') as file:
file.writelines(lines)
if show_diff:
diff_result = display_content_diff(old_content, new_full_content, filepath)
if diff_result["status"] == "success":
print(diff_result["visual_diff"])
return {"status": "success", "message": f"Replaced line {line_number} in {path}"}
except Exception as e:
return {"status": "error", "error": str(e)}
def insert_line_at_position(filepath: str, line_number: int, new_content: str, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
"""
Insert a new line at a specific position in a file.
This function inserts new content as a new line before the specified line number. If line_number is beyond the file's length,
it appends to the end. Supports database tracking and diff display.
Args:
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
line_number (int): The 1-based line number before which to insert the new line. If greater than total lines, appends.
new_content (str): The content for the new line (trailing newline is added if missing).
db_conn (Optional[Any]): Optional database connection for tracking. File must be read first if provided.
show_diff (bool): If True, displays a diff of the changes after insertion.
Returns:
dict: Status and message or error.
- Success: {"status": "success", "message": str}
- Error: {"status": "error", "error": str}
Examples:
# Insert before line 5
result = insert_line_at_position("file.txt", 5, "Inserted line")
"""
try:
path = os.path.expanduser(filepath)
if not os.path.exists(path):
return {"status": "error", "error": "File does not exist"}
if db_conn:
from rp.tools.database import db_get
read_status = db_get("read:" + path, db_conn)
if read_status.get("status") != "success" or read_status.get("value") != "true":
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
with open(path, 'r') as file:
lines = file.readlines()
old_content = ''.join(lines)
insert_index = min(line_number - 1, len(lines))
lines.insert(insert_index, new_content + '\n' if not new_content.endswith('\n') else new_content)
new_full_content = ''.join(lines)
with open(path, 'w') as file:
file.writelines(lines)
if show_diff:
diff_result = display_content_diff(old_content, new_full_content, filepath)
if diff_result["status"] == "success":
print(diff_result["visual_diff"])
return {"status": "success", "message": f"Inserted line at position {line_number} in {path}"}
except Exception as e:
return {"status": "error", "error": str(e)}
def delete_specific_line(filepath: str, line_number: int, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
"""
Delete a specific line from a file.
This function removes the specified line from the file. Supports database tracking and diff display.
Args:
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
line_number (int): The 1-based line number to delete.
db_conn (Optional[Any]): Optional database connection for tracking. File must be read first if provided.
show_diff (bool): If True, displays a diff of the changes after deletion.
Returns:
dict: Status and message or error.
- Success: {"status": "success", "message": str}
- Error: {"status": "error", "error": str}
Examples:
# Delete line 10
result = delete_specific_line("file.txt", 10)
"""
try:
path = os.path.expanduser(filepath)
if not os.path.exists(path):
return {"status": "error", "error": "File does not exist"}
if db_conn:
from rp.tools.database import db_get
read_status = db_get("read:" + path, db_conn)
if read_status.get("status") != "success" or read_status.get("value") != "true":
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
with open(path, 'r') as file:
lines = file.readlines()
total_lines = len(lines)
if line_number < 1 or line_number > total_lines:
return {"status": "error", "error": f"Line number {line_number} is out of range. File has {total_lines} lines."}
old_content = ''.join(lines)
del lines[line_number - 1]
new_full_content = ''.join(lines)
with open(path, 'w') as file:
file.writelines(lines)
if show_diff:
diff_result = display_content_diff(old_content, new_full_content, filepath)
if diff_result["status"] == "success":
print(diff_result["visual_diff"])
return {"status": "success", "message": f"Deleted line {line_number} from {path}"}
except Exception as e:
return {"status": "error", "error": str(e)}
def read_file(filepath: str, db_conn: Optional[Any] = None) -> dict:
"""
Read the contents of a file.
@ -381,3 +582,4 @@ def clear_edit_tracker():
clear_tracker()
return {"status": "success", "message": "Edit tracker cleared"}

View File

@ -4,6 +4,11 @@ import urllib.parse
import urllib.request
import json
import urllib.parse
import urllib.request
def http_fetch(url, headers=None):
"""Fetch content from an HTTP URL.
@ -15,25 +20,26 @@ def http_fetch(url, headers=None):
Dict with status and content.
"""
try:
req = urllib.request.Request(url)
request = urllib.request.Request(url)
if headers:
for key, value in headers.items():
req.add_header(key, value)
with urllib.request.urlopen(req) as response:
for header_key, header_value in headers.items():
request.add_header(header_key, header_value)
with urllib.request.urlopen(request) as response:
content = response.read().decode("utf-8")
return {"status": "success", "content": content[:10000]}
except Exception as e:
return {"status": "error", "error": str(e)}
except Exception as exception:
return {"status": "error", "error": str(exception)}
def _perform_search(base_url, query, params=None):
try:
full_url = f"https://static.molodetz.nl/search.cgi?query={query}"
encoded_query = urllib.parse.quote(query)
full_url = f"{base_url}?query={encoded_query}"
with urllib.request.urlopen(full_url) as response:
content = response.read().decode("utf-8")
return {"status": "success", "content": json.loads(content)}
except Exception as e:
return {"status": "error", "error": str(e)}
except Exception as exception:
return {"status": "error", "error": str(exception)}
def web_search(query):
@ -60,3 +66,4 @@ def web_search_news(query):
"""
base_url = "https://search.molodetz.nl/search"
return _perform_search(base_url, query)