feat: add autonomous mode command-line argument
feat: improve error handling in autonomous mode feat: enhance assistant output for better user experience feat: track usage and cost in autonomous mode refactor: update api call function to accept database connection refactor: update list models function to accept database connection refactor: update assistant class to track api requests refactor: update http client to log requests maintenance: update pyproject.toml version to 1.25.0 docs: update changelog with version 1.24.0 changes
This commit is contained in:
parent
a3d5696c19
commit
55d8814f94
@ -20,6 +20,14 @@
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## Version 1.24.0 - 2025-11-07
|
||||||
|
|
||||||
|
Users can now run the tool in autonomous mode using a command-line argument. We've also improved error handling and assistant output for a better experience.
|
||||||
|
|
||||||
|
**Changes:** 5 files, 62 lines
|
||||||
|
**Languages:** Markdown (8 lines), Python (52 lines), TOML (2 lines)
|
||||||
|
|
||||||
## Version 1.23.0 - 2025-11-07
|
## Version 1.23.0 - 2025-11-07
|
||||||
|
|
||||||
|
|||||||
@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "rp"
|
name = "rp"
|
||||||
version = "1.23.0"
|
version = "1.24.0"
|
||||||
description = "R python edition. The ultimate autonomous AI CLI."
|
description = "R python edition. The ultimate autonomous AI CLI."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.10"
|
requires-python = ">=3.10"
|
||||||
|
|||||||
@ -97,6 +97,14 @@ def process_response_autonomous(assistant, response):
|
|||||||
get_tools_definition(),
|
get_tools_definition(),
|
||||||
verbose=assistant.verbose,
|
verbose=assistant.verbose,
|
||||||
)
|
)
|
||||||
|
if "usage" in follow_up:
|
||||||
|
usage = follow_up["usage"]
|
||||||
|
input_tokens = usage.get("prompt_tokens", 0)
|
||||||
|
output_tokens = usage.get("completion_tokens", 0)
|
||||||
|
assistant.usage_tracker.track_request(assistant.model, input_tokens, output_tokens)
|
||||||
|
cost = assistant.usage_tracker._calculate_cost(assistant.model, input_tokens, output_tokens)
|
||||||
|
total_cost = assistant.usage_tracker.session_usage["estimated_cost"]
|
||||||
|
print(f"{Colors.YELLOW}💰 Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
|
||||||
return process_response_autonomous(assistant, follow_up)
|
return process_response_autonomous(assistant, follow_up)
|
||||||
content = message.get("content", "")
|
content = message.get("content", "")
|
||||||
from rp.ui import render_markdown
|
from rp.ui import render_markdown
|
||||||
|
|||||||
@ -7,7 +7,7 @@ from rp.core.http_client import http_client
|
|||||||
logger = logging.getLogger("rp")
|
logger = logging.getLogger("rp")
|
||||||
|
|
||||||
|
|
||||||
def call_api(messages, model, api_url, api_key, use_tools, tools_definition, verbose=False):
|
def call_api(messages, model, api_url, api_key, use_tools, tools_definition, verbose=False, db_conn=None):
|
||||||
try:
|
try:
|
||||||
messages = auto_slim_messages(messages, verbose=verbose)
|
messages = auto_slim_messages(messages, verbose=verbose)
|
||||||
logger.debug(f"=== API CALL START ===")
|
logger.debug(f"=== API CALL START ===")
|
||||||
@ -34,8 +34,15 @@ def call_api(messages, model, api_url, api_key, use_tools, tools_definition, ver
|
|||||||
logger.debug(f"Tool calling enabled with {len(tools_definition)} tools")
|
logger.debug(f"Tool calling enabled with {len(tools_definition)} tools")
|
||||||
request_json = data
|
request_json = data
|
||||||
logger.debug(f"Request payload size: {len(request_json)} bytes")
|
logger.debug(f"Request payload size: {len(request_json)} bytes")
|
||||||
|
# Log the API request to database if db_conn is provided
|
||||||
|
if db_conn:
|
||||||
|
|
||||||
|
from rp.tools.database import log_api_request
|
||||||
|
log_result = log_api_request(model, api_url, request_json, db_conn)
|
||||||
|
if log_result.get("status") != "success":
|
||||||
|
logger.warning(f"Failed to log API request: {log_result.get('error')}")
|
||||||
logger.debug("Sending HTTP request...")
|
logger.debug("Sending HTTP request...")
|
||||||
response = http_client.post(api_url, headers=headers, json_data=request_json)
|
response = http_client.post(api_url, headers=headers, json_data=request_json, db_conn=db_conn)
|
||||||
if response.get("error"):
|
if response.get("error"):
|
||||||
if "status" in response:
|
if "status" in response:
|
||||||
logger.error(f"API HTTP Error: {response['status']} - {response.get('text', '')}")
|
logger.error(f"API HTTP Error: {response['status']} - {response.get('text', '')}")
|
||||||
@ -82,7 +89,7 @@ def list_models(model_list_url, api_key):
|
|||||||
headers = {}
|
headers = {}
|
||||||
if api_key:
|
if api_key:
|
||||||
headers["Authorization"] = f"Bearer {api_key}"
|
headers["Authorization"] = f"Bearer {api_key}"
|
||||||
response = http_client.get(model_list_url, headers=headers)
|
response = http_client.get(model_list_url, headers=headers, db_conn=None)
|
||||||
if response.get("error"):
|
if response.get("error"):
|
||||||
return {"error": response.get("text", "HTTP error")}
|
return {"error": response.get("text", "HTTP error")}
|
||||||
data = json.loads(response["text"])
|
data = json.loads(response["text"])
|
||||||
|
|||||||
@ -32,7 +32,7 @@ from rp.tools.agents import (
|
|||||||
remove_agent,
|
remove_agent,
|
||||||
)
|
)
|
||||||
from rp.tools.command import kill_process, run_command, tail_process
|
from rp.tools.command import kill_process, run_command, tail_process
|
||||||
from rp.tools.database import db_get, db_query, db_set
|
from rp.tools.database import db_get, db_query, db_set, log_api_request
|
||||||
from rp.tools.filesystem import (
|
from rp.tools.filesystem import (
|
||||||
chdir,
|
chdir,
|
||||||
clear_edit_tracker,
|
clear_edit_tracker,
|
||||||
@ -138,6 +138,9 @@ class Assistant:
|
|||||||
cursor.execute(
|
cursor.execute(
|
||||||
"CREATE TABLE IF NOT EXISTS file_versions\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n filepath TEXT, content TEXT, hash TEXT,\n timestamp REAL, version INTEGER)"
|
"CREATE TABLE IF NOT EXISTS file_versions\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n filepath TEXT, content TEXT, hash TEXT,\n timestamp REAL, version INTEGER)"
|
||||||
)
|
)
|
||||||
|
cursor.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS api_request_logs\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n timestamp REAL, model TEXT, api_url TEXT,\n request_payload TEXT)"
|
||||||
|
)
|
||||||
self.db_conn.commit()
|
self.db_conn.commit()
|
||||||
logger.debug("Database initialized successfully")
|
logger.debug("Database initialized successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -310,6 +313,7 @@ class Assistant:
|
|||||||
self.use_tools,
|
self.use_tools,
|
||||||
get_tools_definition(),
|
get_tools_definition(),
|
||||||
verbose=self.verbose,
|
verbose=self.verbose,
|
||||||
|
db_conn=self.db_conn,
|
||||||
)
|
)
|
||||||
return self.process_response(follow_up)
|
return self.process_response(follow_up)
|
||||||
content = message.get("content", "")
|
content = message.get("content", "")
|
||||||
@ -462,9 +466,9 @@ class Assistant:
|
|||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
if self.args.autonomous or (not self.args.interactive and not self.args.message and sys.stdin.isatty()):
|
if self.args.autonomous:
|
||||||
self.run_autonomous()
|
self.run_autonomous()
|
||||||
elif self.args.interactive:
|
elif self.args.interactive or (not self.args.message and sys.stdin.isatty()):
|
||||||
self.run_repl()
|
self.run_repl()
|
||||||
else:
|
else:
|
||||||
self.run_single()
|
self.run_single()
|
||||||
@ -489,6 +493,7 @@ def process_message(assistant, message):
|
|||||||
assistant.use_tools,
|
assistant.use_tools,
|
||||||
get_tools_definition(),
|
get_tools_definition(),
|
||||||
verbose=assistant.verbose,
|
verbose=assistant.verbose,
|
||||||
|
db_conn=assistant.db_conn,
|
||||||
)
|
)
|
||||||
spinner.stop()
|
spinner.stop()
|
||||||
if "usage" in response:
|
if "usage" in response:
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import requests
|
import requests
|
||||||
@ -19,6 +20,7 @@ class SyncHTTPClient:
|
|||||||
data: Optional[bytes] = None,
|
data: Optional[bytes] = None,
|
||||||
json_data: Optional[Dict[str, Any]] = None,
|
json_data: Optional[Dict[str, Any]] = None,
|
||||||
timeout: float = 30.0,
|
timeout: float = 30.0,
|
||||||
|
db_conn=None,
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Make a sync HTTP request using requests with retry logic."""
|
"""Make a sync HTTP request using requests with retry logic."""
|
||||||
attempt = 0
|
attempt = 0
|
||||||
@ -35,6 +37,19 @@ class SyncHTTPClient:
|
|||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
)
|
)
|
||||||
response.raise_for_status() # Raise an exception for bad status codes
|
response.raise_for_status() # Raise an exception for bad status codes
|
||||||
|
# Prepare request body for logging
|
||||||
|
if json_data is not None:
|
||||||
|
request_body = json.dumps(json_data)
|
||||||
|
elif data is not None:
|
||||||
|
request_body = data.decode('utf-8') if isinstance(data, bytes) else str(data)
|
||||||
|
else:
|
||||||
|
request_body = ""
|
||||||
|
# Log the request
|
||||||
|
if db_conn:
|
||||||
|
from rp.tools.database import log_http_request
|
||||||
|
log_result = log_http_request(method, url, request_body, response.text, response.status_code, db_conn)
|
||||||
|
if log_result.get("status") != "success":
|
||||||
|
logger.warning(f"Failed to log HTTP request: {log_result.get('error')}")
|
||||||
return {
|
return {
|
||||||
"status": response.status_code,
|
"status": response.status_code,
|
||||||
"headers": dict(response.headers),
|
"headers": dict(response.headers),
|
||||||
@ -58,9 +73,9 @@ class SyncHTTPClient:
|
|||||||
return {"error": True, "exception": str(e)}
|
return {"error": True, "exception": str(e)}
|
||||||
|
|
||||||
def get(
|
def get(
|
||||||
self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0
|
self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0, db_conn=None
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
return self.request("GET", url, headers=headers, timeout=timeout)
|
return self.request("GET", url, headers=headers, timeout=timeout, db_conn=db_conn)
|
||||||
|
|
||||||
def post(
|
def post(
|
||||||
self,
|
self,
|
||||||
@ -69,9 +84,10 @@ class SyncHTTPClient:
|
|||||||
data: Optional[bytes] = None,
|
data: Optional[bytes] = None,
|
||||||
json_data: Optional[Dict[str, Any]] = None,
|
json_data: Optional[Dict[str, Any]] = None,
|
||||||
timeout: float = 30.0,
|
timeout: float = 30.0,
|
||||||
|
db_conn=None,
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
return self.request(
|
return self.request(
|
||||||
"POST", url, headers=headers, data=data, json_data=json_data, timeout=timeout
|
"POST", url, headers=headers, data=data, json_data=json_data, timeout=timeout, db_conn=db_conn
|
||||||
)
|
)
|
||||||
|
|
||||||
def set_default_headers(self, headers: Dict[str, str]):
|
def set_default_headers(self, headers: Dict[str, str]):
|
||||||
|
|||||||
@ -17,14 +17,7 @@ from rp.tools.editor import (
|
|||||||
open_editor,
|
open_editor,
|
||||||
)
|
)
|
||||||
from rp.tools.filesystem import (
|
from rp.tools.filesystem import (
|
||||||
chdir,
|
get_uid, read_specific_lines, replace_specific_line, insert_line_at_position, delete_specific_line, read_file, write_file, list_directory, mkdir, chdir, getpwd, index_source_directory, search_replace, get_editor, close_editor, open_editor, editor_insert_text, editor_replace_text, display_edit_summary, display_edit_timeline, clear_edit_tracker
|
||||||
getpwd,
|
|
||||||
index_source_directory,
|
|
||||||
list_directory,
|
|
||||||
mkdir,
|
|
||||||
read_file,
|
|
||||||
search_replace,
|
|
||||||
write_file,
|
|
||||||
)
|
)
|
||||||
from rp.tools.lsp import get_diagnostics
|
from rp.tools.lsp import get_diagnostics
|
||||||
from rp.tools.memory import (
|
from rp.tools.memory import (
|
||||||
@ -54,9 +47,11 @@ agent = execute_agent_task
|
|||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"add_knowledge_entry",
|
"add_knowledge_entry",
|
||||||
|
"agent",
|
||||||
"apply_patch",
|
"apply_patch",
|
||||||
"bash",
|
"bash",
|
||||||
"chdir",
|
"chdir",
|
||||||
|
"clear_edit_tracker",
|
||||||
"close_editor",
|
"close_editor",
|
||||||
"collaborate_agents",
|
"collaborate_agents",
|
||||||
"create_agent",
|
"create_agent",
|
||||||
@ -65,23 +60,28 @@ __all__ = [
|
|||||||
"db_query",
|
"db_query",
|
||||||
"db_set",
|
"db_set",
|
||||||
"delete_knowledge_entry",
|
"delete_knowledge_entry",
|
||||||
|
"delete_specific_line",
|
||||||
"diagnostics",
|
"diagnostics",
|
||||||
"post_image",
|
"display_edit_summary",
|
||||||
|
"display_edit_timeline",
|
||||||
"edit",
|
"edit",
|
||||||
"editor_insert_text",
|
"editor_insert_text",
|
||||||
"editor_replace_text",
|
"editor_replace_text",
|
||||||
"editor_search",
|
"editor_search",
|
||||||
"execute_agent_task",
|
"execute_agent_task",
|
||||||
|
"get_editor",
|
||||||
"get_knowledge_by_category",
|
"get_knowledge_by_category",
|
||||||
"get_knowledge_entry",
|
"get_knowledge_entry",
|
||||||
"get_knowledge_statistics",
|
"get_knowledge_statistics",
|
||||||
"get_tools_definition",
|
"get_tools_definition",
|
||||||
|
"get_uid",
|
||||||
"getpwd",
|
"getpwd",
|
||||||
"glob",
|
"glob",
|
||||||
"glob_files",
|
"glob_files",
|
||||||
"grep",
|
"grep",
|
||||||
"http_fetch",
|
"http_fetch",
|
||||||
"index_source_directory",
|
"index_source_directory",
|
||||||
|
"insert_line_at_position",
|
||||||
"kill_process",
|
"kill_process",
|
||||||
"list_agents",
|
"list_agents",
|
||||||
"list_directory",
|
"list_directory",
|
||||||
@ -89,9 +89,12 @@ __all__ = [
|
|||||||
"mkdir",
|
"mkdir",
|
||||||
"open_editor",
|
"open_editor",
|
||||||
"patch",
|
"patch",
|
||||||
|
"post_image",
|
||||||
"python_exec",
|
"python_exec",
|
||||||
"read_file",
|
"read_file",
|
||||||
|
"read_specific_lines",
|
||||||
"remove_agent",
|
"remove_agent",
|
||||||
|
"replace_specific_line",
|
||||||
"run_command",
|
"run_command",
|
||||||
"run_command_interactive",
|
"run_command_interactive",
|
||||||
"search_knowledge",
|
"search_knowledge",
|
||||||
@ -104,3 +107,4 @@ __all__ = [
|
|||||||
"write",
|
"write",
|
||||||
"write_file",
|
"write_file",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@ -23,6 +23,7 @@ def _create_api_wrapper():
|
|||||||
use_tools=use_tools,
|
use_tools=use_tools,
|
||||||
tools_definition=tools_definition,
|
tools_definition=tools_definition,
|
||||||
verbose=False,
|
verbose=False,
|
||||||
|
db_conn=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
return api_wrapper
|
return api_wrapper
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
import json
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
@ -74,3 +75,55 @@ def db_query(query, db_conn):
|
|||||||
return {"status": "success", "rows_affected": cursor.rowcount}
|
return {"status": "success", "rows_affected": cursor.rowcount}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {"status": "error", "error": str(e)}
|
return {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
def log_api_request(model, api_url, request_payload, db_conn):
|
||||||
|
"""Log an API request to the database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model: The model used.
|
||||||
|
api_url: The API URL.
|
||||||
|
request_payload: The JSON payload sent.
|
||||||
|
db_conn: Database connection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with status.
|
||||||
|
"""
|
||||||
|
if not db_conn:
|
||||||
|
return {"status": "error", "error": "Database not initialized"}
|
||||||
|
try:
|
||||||
|
cursor = db_conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"INSERT INTO api_request_logs (timestamp, model, api_url, request_payload) VALUES (?, ?, ?, ?)",
|
||||||
|
(time.time(), model, api_url, json.dumps(request_payload)),
|
||||||
|
)
|
||||||
|
db_conn.commit()
|
||||||
|
return {"status": "success"}
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
def log_http_request(method, url, request_body, response_body, status_code, db_conn):
|
||||||
|
"""Log an HTTP request to the database.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
method: The HTTP method.
|
||||||
|
url: The URL.
|
||||||
|
request_body: The request body.
|
||||||
|
response_body: The response body.
|
||||||
|
status_code: The status code.
|
||||||
|
db_conn: Database connection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dict with status.
|
||||||
|
"""
|
||||||
|
if not db_conn:
|
||||||
|
return {"status": "error", "error": "Database not initialized"}
|
||||||
|
try:
|
||||||
|
cursor = db_conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
"INSERT INTO request_log (timestamp, method, url, request_body, response_body, status_code) VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
|
(time.time(), method, url, request_body, response_body, status_code),
|
||||||
|
)
|
||||||
|
db_conn.commit()
|
||||||
|
return {"status": "success"}
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": str(e)}
|
||||||
|
|||||||
@ -16,6 +16,207 @@ def get_uid():
|
|||||||
return _id
|
return _id
|
||||||
|
|
||||||
|
|
||||||
|
def read_specific_lines(filepath: str, start_line: int, end_line: Optional[int] = None, db_conn: Optional[Any] = None) -> dict:
|
||||||
|
"""
|
||||||
|
Read specific lines or a range of lines from a file.
|
||||||
|
|
||||||
|
This function allows reading a single line or a contiguous range of lines from the specified file.
|
||||||
|
It supports optional database connection for tracking read operations.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath (str): The path to the file to read from. Supports user home directory expansion (e.g., ~).
|
||||||
|
start_line (int): The 1-based line number to start reading from.
|
||||||
|
end_line (Optional[int]): The 1-based line number to end reading at (inclusive). If None, reads only the start_line.
|
||||||
|
db_conn (Optional[Any]): An optional database connection object for tracking read operations. If provided, marks the file as read in the database.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: A dictionary containing the status and either the content or an error message.
|
||||||
|
- On success: {"status": "success", "content": str} where content is the lines joined by newlines.
|
||||||
|
- On error: {"status": "error", "error": str} with the exception message.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
None: Exceptions are caught and returned in the response dictionary.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
# Read line 5 only
|
||||||
|
result = read_specific_lines("example.txt", 5)
|
||||||
|
|
||||||
|
# Read lines 10 to 20
|
||||||
|
result = read_specific_lines("example.txt", 10, 20)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
path = os.path.expanduser(filepath)
|
||||||
|
with open(path, 'r') as file:
|
||||||
|
lines = file.readlines()
|
||||||
|
total_lines = len(lines)
|
||||||
|
if start_line < 1 or start_line > total_lines:
|
||||||
|
return {"status": "error", "error": f"Start line {start_line} is out of range. File has {total_lines} lines."}
|
||||||
|
if end_line is None:
|
||||||
|
end_line = start_line
|
||||||
|
if end_line < start_line or end_line > total_lines:
|
||||||
|
return {"status": "error", "error": f"End line {end_line} is out of range. File has {total_lines} lines."}
|
||||||
|
selected_lines = lines[start_line - 1:end_line]
|
||||||
|
content = ''.join(selected_lines)
|
||||||
|
if db_conn:
|
||||||
|
from rp.tools.database import db_set
|
||||||
|
db_set("read:" + path, "true", db_conn)
|
||||||
|
return {"status": "success", "content": content}
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
def replace_specific_line(filepath: str, line_number: int, new_content: str, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
|
||||||
|
"""
|
||||||
|
Replace the content of a specific line in a file.
|
||||||
|
|
||||||
|
This function replaces the entire content of a single line with new text. It supports optional database tracking
|
||||||
|
and diff display. The file must be read first if a database connection is provided.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
|
||||||
|
line_number (int): The 1-based line number to replace.
|
||||||
|
new_content (str): The new content to place on the specified line (should not include trailing newline).
|
||||||
|
db_conn (Optional[Any]): An optional database connection for tracking. Requires the file to be read first.
|
||||||
|
show_diff (bool): If True, displays a diff of the changes after replacement.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: A dictionary with status and message or error.
|
||||||
|
- On success: {"status": "success", "message": str} describing the operation.
|
||||||
|
- On error: {"status": "error", "error": str} with the exception or validation message.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
None: Exceptions are handled internally.
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
# Replace line 3 with new text
|
||||||
|
result = replace_specific_line("file.txt", 3, "New line content")
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
path = os.path.expanduser(filepath)
|
||||||
|
if not os.path.exists(path):
|
||||||
|
return {"status": "error", "error": "File does not exist"}
|
||||||
|
if db_conn:
|
||||||
|
from rp.tools.database import db_get
|
||||||
|
read_status = db_get("read:" + path, db_conn)
|
||||||
|
if read_status.get("status") != "success" or read_status.get("value") != "true":
|
||||||
|
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
|
||||||
|
with open(path, 'r') as file:
|
||||||
|
lines = file.readlines()
|
||||||
|
total_lines = len(lines)
|
||||||
|
if line_number < 1 or line_number > total_lines:
|
||||||
|
return {"status": "error", "error": f"Line number {line_number} is out of range. File has {total_lines} lines."}
|
||||||
|
old_content = ''.join(lines)
|
||||||
|
lines[line_number - 1] = new_content + '\n' if not new_content.endswith('\n') else new_content
|
||||||
|
new_full_content = ''.join(lines)
|
||||||
|
with open(path, 'w') as file:
|
||||||
|
file.writelines(lines)
|
||||||
|
if show_diff:
|
||||||
|
diff_result = display_content_diff(old_content, new_full_content, filepath)
|
||||||
|
if diff_result["status"] == "success":
|
||||||
|
print(diff_result["visual_diff"])
|
||||||
|
return {"status": "success", "message": f"Replaced line {line_number} in {path}"}
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
def insert_line_at_position(filepath: str, line_number: int, new_content: str, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
|
||||||
|
"""
|
||||||
|
Insert a new line at a specific position in a file.
|
||||||
|
|
||||||
|
This function inserts new content as a new line before the specified line number. If line_number is beyond the file's length,
|
||||||
|
it appends to the end. Supports database tracking and diff display.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
|
||||||
|
line_number (int): The 1-based line number before which to insert the new line. If greater than total lines, appends.
|
||||||
|
new_content (str): The content for the new line (trailing newline is added if missing).
|
||||||
|
db_conn (Optional[Any]): Optional database connection for tracking. File must be read first if provided.
|
||||||
|
show_diff (bool): If True, displays a diff of the changes after insertion.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: Status and message or error.
|
||||||
|
- Success: {"status": "success", "message": str}
|
||||||
|
- Error: {"status": "error", "error": str}
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
# Insert before line 5
|
||||||
|
result = insert_line_at_position("file.txt", 5, "Inserted line")
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
path = os.path.expanduser(filepath)
|
||||||
|
if not os.path.exists(path):
|
||||||
|
return {"status": "error", "error": "File does not exist"}
|
||||||
|
if db_conn:
|
||||||
|
from rp.tools.database import db_get
|
||||||
|
read_status = db_get("read:" + path, db_conn)
|
||||||
|
if read_status.get("status") != "success" or read_status.get("value") != "true":
|
||||||
|
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
|
||||||
|
with open(path, 'r') as file:
|
||||||
|
lines = file.readlines()
|
||||||
|
old_content = ''.join(lines)
|
||||||
|
insert_index = min(line_number - 1, len(lines))
|
||||||
|
lines.insert(insert_index, new_content + '\n' if not new_content.endswith('\n') else new_content)
|
||||||
|
new_full_content = ''.join(lines)
|
||||||
|
with open(path, 'w') as file:
|
||||||
|
file.writelines(lines)
|
||||||
|
if show_diff:
|
||||||
|
diff_result = display_content_diff(old_content, new_full_content, filepath)
|
||||||
|
if diff_result["status"] == "success":
|
||||||
|
print(diff_result["visual_diff"])
|
||||||
|
return {"status": "success", "message": f"Inserted line at position {line_number} in {path}"}
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
def delete_specific_line(filepath: str, line_number: int, db_conn: Optional[Any] = None, show_diff: bool = True) -> dict:
|
||||||
|
"""
|
||||||
|
Delete a specific line from a file.
|
||||||
|
|
||||||
|
This function removes the specified line from the file. Supports database tracking and diff display.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath (str): The path to the file to modify. Supports user home directory expansion (e.g., ~).
|
||||||
|
line_number (int): The 1-based line number to delete.
|
||||||
|
db_conn (Optional[Any]): Optional database connection for tracking. File must be read first if provided.
|
||||||
|
show_diff (bool): If True, displays a diff of the changes after deletion.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: Status and message or error.
|
||||||
|
- Success: {"status": "success", "message": str}
|
||||||
|
- Error: {"status": "error", "error": str}
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
# Delete line 10
|
||||||
|
result = delete_specific_line("file.txt", 10)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
path = os.path.expanduser(filepath)
|
||||||
|
if not os.path.exists(path):
|
||||||
|
return {"status": "error", "error": "File does not exist"}
|
||||||
|
if db_conn:
|
||||||
|
from rp.tools.database import db_get
|
||||||
|
read_status = db_get("read:" + path, db_conn)
|
||||||
|
if read_status.get("status") != "success" or read_status.get("value") != "true":
|
||||||
|
return {"status": "error", "error": "File must be read before writing. Please read the file first."}
|
||||||
|
with open(path, 'r') as file:
|
||||||
|
lines = file.readlines()
|
||||||
|
total_lines = len(lines)
|
||||||
|
if line_number < 1 or line_number > total_lines:
|
||||||
|
return {"status": "error", "error": f"Line number {line_number} is out of range. File has {total_lines} lines."}
|
||||||
|
old_content = ''.join(lines)
|
||||||
|
del lines[line_number - 1]
|
||||||
|
new_full_content = ''.join(lines)
|
||||||
|
with open(path, 'w') as file:
|
||||||
|
file.writelines(lines)
|
||||||
|
if show_diff:
|
||||||
|
diff_result = display_content_diff(old_content, new_full_content, filepath)
|
||||||
|
if diff_result["status"] == "success":
|
||||||
|
print(diff_result["visual_diff"])
|
||||||
|
return {"status": "success", "message": f"Deleted line {line_number} from {path}"}
|
||||||
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
def read_file(filepath: str, db_conn: Optional[Any] = None) -> dict:
|
def read_file(filepath: str, db_conn: Optional[Any] = None) -> dict:
|
||||||
"""
|
"""
|
||||||
Read the contents of a file.
|
Read the contents of a file.
|
||||||
@ -381,3 +582,4 @@ def clear_edit_tracker():
|
|||||||
|
|
||||||
clear_tracker()
|
clear_tracker()
|
||||||
return {"status": "success", "message": "Edit tracker cleared"}
|
return {"status": "success", "message": "Edit tracker cleared"}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user