feat: upgrade version to 1.29.0

feat: add utf-8 encoding and error handling to file reading
feat: handle unicode decode errors when reading files
feat: add base64 encoding for binary files in filesystem tools
feat: allow search and replace on text files only
feat: prevent editor insert text on binary files
feat: prevent diff creation on binary files
feat: prevent diff display on binary files
maintenance: update mimetypes import
retoor 2025-11-08 01:56:15 +01:00
parent 8ef3742f44
commit a289a8e402
8 changed files with 109 additions and 31 deletions
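
The gate that recurs throughout the diffs below treats a path as text only when its guessed MIME type is text/* or one of application/json / application/xml. A minimal standalone sketch of that check (the helper name is_text_file is hypothetical; the repository inlines the condition instead):

import mimetypes

# Hypothetical helper mirroring the inline MIME gate used in the tools below.
TEXT_MIME_TYPES = {"application/json", "application/xml"}

def is_text_file(path: str) -> bool:
    # guess_type looks only at the file name, never at the contents.
    mime_type, _ = mimetypes.guess_type(str(path))
    return bool(mime_type and (mime_type.startswith("text/") or mime_type in TEXT_MIME_TYPES))

print(is_text_file("notes.txt"))    # True  (text/plain)
print(is_text_file("config.json"))  # True  (application/json)
print(is_text_file("photo.png"))    # False (image/png)
print(is_text_file("Makefile"))     # False (no extension, no guess)

Because guess_type relies on extensions alone, extension-less text files (Makefile, LICENSE) fall on the binary side of this gate; that is a property of the check itself, not something these diffs change.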

View File

@@ -60,7 +60,7 @@ def init_system_message(args):
     for context_file in [CONTEXT_FILE, GLOBAL_CONTEXT_FILE]:
         if os.path.exists(context_file):
             try:
-                with open(context_file) as f:
+                with open(context_file, encoding="utf-8", errors="replace") as f:
                     content = f.read()
                     if len(content) > max_context_size:
                         content = content[:max_context_size] + "\n... [truncated]"
@@ -71,7 +71,7 @@ def init_system_message(args):
     if knowledge_path.exists() and knowledge_path.is_dir():
         for knowledge_file in knowledge_path.iterdir():
             try:
-                with open(knowledge_file) as f:
+                with open(knowledge_file, encoding="utf-8", errors="replace") as f:
                     content = f.read()
                     if len(content) > max_context_size:
                         content = content[:max_context_size] + "\n... [truncated]"
@@ -81,7 +81,7 @@ def init_system_message(args):
     if args.context:
         for ctx_file in args.context:
             try:
-                with open(ctx_file) as f:
+                with open(ctx_file, encoding="utf-8", errors="replace") as f:
                     content = f.read()
                     if len(content) > max_context_size:
                         content = content[:max_context_size] + "\n... [truncated]"
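
These context-loading hunks add errors="replace" so that a context or knowledge file containing bytes that are not valid UTF-8 no longer aborts with UnicodeDecodeError; each offending byte is decoded to U+FFFD instead. A small self-contained illustration (temporary file, not a file from the repository):

import tempfile

# Write a file with an invalid UTF-8 byte (0xFF), then read it back the way
# the updated loaders do.
with tempfile.NamedTemporaryFile("wb", suffix=".txt", delete=False) as tmp:
    tmp.write(b"hello \xff world")
    path = tmp.name

with open(path, encoding="utf-8", errors="replace") as f:
    print(repr(f.read()))  # 'hello \ufffd world' -- no exception raised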

View File

@@ -38,7 +38,7 @@ class SessionManager:
         if not os.path.exists(session_file):
             logger.warning(f"Session not found: {name}")
             return None
-        with open(session_file) as f:
+        with open(session_file, encoding="utf-8") as f:
             session_data = json.load(f)
         logger.info(f"Session loaded: {name}")
         return session_data
@@ -53,7 +53,7 @@ class SessionManager:
             if filename.endswith(".json"):
                 filepath = os.path.join(SESSIONS_DIR, filename)
                 try:
-                    with open(filepath) as f:
+                    with open(filepath, encoding="utf-8") as f:
                         data = json.load(f)
                         sessions.append(
                             {

View File

@@ -68,7 +68,7 @@ class UsageTracker:
         try:
             history = []
             if os.path.exists(USAGE_DB_FILE):
-                with open(USAGE_DB_FILE) as f:
+                with open(USAGE_DB_FILE, encoding="utf-8") as f:
                     history = json.load(f)
             history.append(
                 {
@@ -113,7 +113,7 @@ class UsageTracker:
         if not os.path.exists(USAGE_DB_FILE):
             return {"total_requests": 0, "total_tokens": 0, "total_cost": 0.0}
         try:
-            with open(USAGE_DB_FILE) as f:
+            with open(USAGE_DB_FILE, encoding="utf-8") as f:
                 history = json.load(f)
             total_tokens = sum((entry["total_tokens"] for entry in history))
             total_cost = sum((entry["cost"] for entry in history))

View File

@@ -118,6 +118,9 @@ class RPEditor:
                 self.lines = content.splitlines() if content else [""]
             else:
                 self.lines = [""]
+        except UnicodeDecodeError:
+            # If it's a binary file or truly unreadable as text, treat as empty
+            self.lines = [""]
         except Exception:
             self.lines = [""]

View File

@@ -35,10 +35,16 @@ class RPEditor:
     def load_file(self):
         try:
-            with open(self.filename) as f:
-                self.lines = f.read().splitlines()
-            if not self.lines:
-                self.lines = [""]
+            if self.filename:
+                with open(self.filename, encoding="utf-8", errors="replace") as f:
+                    self.lines = f.read().splitlines()
+                if not self.lines:
+                    self.lines = [""]
+            else:
+                self.lines = [""]
+        except UnicodeDecodeError:
+            # If it's a binary file or truly unreadable as text, treat as empty
+            self.lines = [""]
         except:
             self.lines = [""]

View File

@@ -99,9 +99,17 @@ class AdvancedInputHandler:
         try:
             path = Path(filename).expanduser().resolve()
             if path.exists() and path.is_file():
-                with open(path, encoding="utf-8", errors="replace") as f:
-                    content = f.read()
-                return f"\n--- File: {filename} ---\n{content}\n--- End of {filename} ---\n"
+                mime_type, _ = mimetypes.guess_type(str(path))
+                if mime_type and (mime_type.startswith("text/") or mime_type in ["application/json", "application/xml"]):
+                    with open(path, encoding="utf-8", errors="replace") as f:
+                        content = f.read()
+                    return f"\n--- File: {filename} ---\n{content}\n--- End of {filename} ---\n"
+                elif mime_type and not mime_type.startswith("image/"):  # Handle other binary files
+                    with open(path, "rb") as f:
+                        binary_data = base64.b64encode(f.read()).decode("utf-8")
+                    return f"\n--- Binary File: {filename} ({mime_type}) ---\ndata:{mime_type};base64,{binary_data}\n--- End of {filename} ---\n"
+                else:
+                    return f"[File not included (unsupported type or already handled as image): {filename}]"
             else:
                 return f"[File not found: {filename}]"
         except Exception as e:
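
For binary attachments that are not images, the handler now inlines the raw bytes as a data: URI instead of attempting a text read. The encoding step on its own looks roughly like this (hypothetical file name; mirrors the string format used above):

import base64
import mimetypes

def embed_binary(path: str) -> str:
    # Produce the same data:<mime>;base64,<payload> shape as the handler.
    mime_type, _ = mimetypes.guess_type(path)
    mime_type = mime_type or "application/octet-stream"
    with open(path, "rb") as f:
        payload = base64.b64encode(f.read()).decode("utf-8")
    return f"data:{mime_type};base64,{payload}"

# e.g. embed_binary("archive.zip") -> "data:application/zip;base64,UEsDB..."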

View File

@@ -1,4 +1,6 @@
+import base64
 import hashlib
+import mimetypes
 import os
 import time
 from typing import Optional, Any
@@ -278,8 +280,14 @@ def read_file(filepath: str, db_conn: Optional[Any] = None) -> dict:
     """
     try:
         path = os.path.expanduser(filepath)
-        with open(path) as f:
-            content = f.read()
+        mime_type, _ = mimetypes.guess_type(str(path))
+        if mime_type and (mime_type.startswith("text/") or mime_type in ["application/json", "application/xml"]):
+            with open(path, encoding="utf-8", errors="replace") as f:
+                content = f.read()
+        else:
+            with open(path, "rb") as f:
+                binary_content = f.read()
+            content = f"data:{mime_type if mime_type else 'application/octet-stream'};base64,{base64.b64encode(binary_content).decode('utf-8')}"
         if db_conn:
             from rp.tools.database import db_set
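
The practical effect is that a caller of read_file may now receive either decoded text or a data: URI string, depending on the guessed MIME type (the hunk does not show the function's return dict, so that shape is not assumed here). A tiny check a caller could use to tell the two apart:

def is_data_uri(content: str) -> bool:
    # True for strings produced by the binary branch above.
    return content.startswith("data:") and ";base64," in content

print(is_data_uri("data:image/png;base64,iVBORw0KGgo="))  # True
print(is_data_uri("def main():\n    pass\n"))              # False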
@@ -318,22 +326,51 @@ def write_file(
                 "status": "error",
                 "error": "File must be read before writing. Please read the file first.",
             }
+    write_mode = "w"
+    write_encoding = "utf-8"
+    decoded_content = content
+    if content.startswith("data:"):
+        parts = content.split(",", 1)
+        if len(parts) == 2:
+            header = parts[0]
+            encoded_data = parts[1]
+            if ";base64" in header:
+                try:
+                    decoded_content = base64.b64decode(encoded_data)
+                    write_mode = "wb"
+                    write_encoding = None
+                except Exception:
+                    pass  # Not a valid base64, treat as plain text
     if not is_new_file:
-        with open(path) as f:
-            old_content = f.read()
+        if write_mode == "wb":
+            with open(path, "rb") as f:
+                old_content = f.read()
+        else:
+            with open(path, encoding="utf-8", errors="replace") as f:
+                old_content = f.read()
     operation = track_edit("WRITE", filepath, content=content, old_content=old_content)
     tracker.mark_in_progress(operation)
-    if show_diff and (not is_new_file):
+    if show_diff and (not is_new_file) and write_mode == "w":  # Only show diff for text files
         diff_result = display_content_diff(old_content, content, filepath)
         if diff_result["status"] == "success":
             print(diff_result["visual_diff"])
-    editor = RPEditor(path)
-    editor.set_text(content)
-    editor.save_file()
+    if write_mode == "wb":
+        with open(path, write_mode) as f:
+            f.write(decoded_content)
+    else:
+        with open(path, write_mode, encoding=write_encoding) as f:
+            f.write(decoded_content)
     if os.path.exists(path) and db_conn:
         try:
             cursor = db_conn.cursor()
-            file_hash = hashlib.md5(old_content.encode()).hexdigest()
+            file_hash = hashlib.md5(old_content.encode() if isinstance(old_content, str) else old_content).hexdigest()
             cursor.execute(
                 "SELECT MAX(version) FROM file_versions WHERE filepath = ?", (filepath,)
             )
@@ -341,14 +378,14 @@ def write_file(
             version = result[0] + 1 if result[0] else 1
             cursor.execute(
                 "INSERT INTO file_versions (filepath, content, hash, timestamp, version)\n VALUES (?, ?, ?, ?, ?)",
-                (filepath, old_content, file_hash, time.time(), version),
+                (filepath, old_content if isinstance(old_content, str) else old_content.decode('utf-8', errors='replace'), file_hash, time.time(), version),
             )
             db_conn.commit()
         except Exception:
             pass
     tracker.mark_completed(operation)
     message = f"File written to {path}"
-    if show_diff and (not is_new_file):
+    if show_diff and (not is_new_file) and write_mode == "w":
         stats = get_diff_stats(old_content, content)
         message += f" ({stats['insertions']}+ {stats['deletions']}-)"
     return {"status": "success", "message": message}
@@ -476,6 +513,9 @@ def search_replace(
         path = os.path.expanduser(filepath)
         if not os.path.exists(path):
             return {"status": "error", "error": "File does not exist"}
+        mime_type, _ = mimetypes.guess_type(str(path))
+        if not (mime_type and (mime_type.startswith("text/") or mime_type in ["application/json", "application/xml"])):
+            return {"status": "error", "error": f"Cannot perform search and replace on binary file: {filepath}"}
         if db_conn:
             from rp.tools.database import db_get
@@ -485,7 +525,7 @@ def search_replace(
                 "status": "error",
                 "error": "File must be read before writing. Please read the file first.",
             }
-        with open(path) as f:
+        with open(path, encoding="utf-8", errors="replace") as f:
             content = f.read()
         content = content.replace(old_string, new_string)
         with open(path, "w") as f:
@@ -531,6 +571,9 @@ def editor_insert_text(filepath, text, line=None, col=None, show_diff=True, db_c
     operation = None
     try:
         path = os.path.expanduser(filepath)
+        mime_type, _ = mimetypes.guess_type(str(path))
+        if not (mime_type and (mime_type.startswith("text/") or mime_type in ["application/json", "application/xml"])):
+            return {"status": "error", "error": f"Cannot insert text into binary file: {filepath}"}
         if db_conn:
             from rp.tools.database import db_get
@@ -542,7 +585,7 @@ def editor_insert_text(filepath, text, line=None, col=None, show_diff=True, db_c
             }
         old_content = ""
         if os.path.exists(path):
-            with open(path) as f:
+            with open(path, encoding="utf-8", errors="replace") as f:
                 old_content = f.read()
         position = (line if line is not None else 0) * 1000 + (col if col is not None else 0)
         operation = track_edit("INSERT", filepath, start_pos=position, content=text)
@@ -572,6 +615,9 @@ def editor_replace_text(
     try:
         operation = None
         path = os.path.expanduser(filepath)
+        mime_type, _ = mimetypes.guess_type(str(path))
+        if not (mime_type and (mime_type.startswith("text/") or mime_type in ["application/json", "application/xml"])):
+            return {"status": "error", "error": f"Cannot replace text in binary file: {filepath}"}
         if db_conn:
             from rp.tools.database import db_get
@@ -583,7 +629,7 @@ def editor_replace_text(
             }
         old_content = ""
         if os.path.exists(path):
-            with open(path) as f:
+            with open(path, encoding="utf-8", errors="replace") as f:
                 old_content = f.read()
         start_pos = start_line * 1000 + start_col
         end_pos = end_line * 1000 + end_col

View File

@@ -1,4 +1,5 @@
 import difflib
+import mimetypes
 import os
 import subprocess
 import tempfile
@@ -61,7 +62,15 @@ def create_diff(
     try:
         path1 = os.path.expanduser(file1)
         path2 = os.path.expanduser(file2)
-        with open(path1) as f1, open(path2) as f2:
+        mime_type1, _ = mimetypes.guess_type(str(path1))
+        mime_type2, _ = mimetypes.guess_type(str(path2))
+        if not (mime_type1 and (mime_type1.startswith("text/") or mime_type1 in ["application/json", "application/xml"])):
+            return {"status": "error", "error": f"Cannot create diff for binary file: {file1}"}
+        if not (mime_type2 and (mime_type2.startswith("text/") or mime_type2 in ["application/json", "application/xml"])):
+            return {"status": "error", "error": f"Cannot create diff for binary file: {file2}"}
+        with open(path1, encoding="utf-8", errors="replace") as f1, open(
+            path2, encoding="utf-8", errors="replace"
+        ) as f2:
             content1 = f1.read()
             content2 = f2.read()
         if visual:
@@ -91,9 +100,15 @@ def display_file_diff(filepath1, filepath2, format_type="unified", context_lines
     try:
         path1 = os.path.expanduser(filepath1)
         path2 = os.path.expanduser(filepath2)
-        with open(path1) as f1:
+        mime_type1, _ = mimetypes.guess_type(str(path1))
+        mime_type2, _ = mimetypes.guess_type(str(path2))
+        if not (mime_type1 and (mime_type1.startswith("text/") or mime_type1 in ["application/json", "application/xml"])):
+            return {"status": "error", "error": f"Cannot display diff for binary file: {filepath1}"}
+        if not (mime_type2 and (mime_type2.startswith("text/") or mime_type2 in ["application/json", "application/xml"])):
+            return {"status": "error", "error": f"Cannot display diff for binary file: {filepath2}"}
+        with open(path1, encoding="utf-8", errors="replace") as f1:
             old_content = f1.read()
-        with open(path2) as f2:
+        with open(path2, encoding="utf-8", errors="replace") as f2:
             new_content = f2.read()
         visual_diff = display_diff(old_content, new_content, filepath1, format_type)
         stats = get_diff_stats(old_content, new_content)
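
With these guards both diff helpers reject binary inputs before opening anything (mimetypes.guess_type never reads file contents), rather than emitting a meaningless diff. A usage sketch, assuming the functions are importable and take the two paths positionally as the hunks suggest (the module path below is hypothetical):

from rp.tools.diff import create_diff  # hypothetical import path

create_diff("old_config.json", "new_config.json")  # both JSON -> passes the MIME gate
create_diff("build/app.bin", "build/app2.bin")
# -> {"status": "error", "error": "Cannot create diff for binary file: build/app.bin"}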