import hashlib
import os
import time
from pr.editor import RPEditor
from ..tools.patch import display_content_diff
from ..ui.diff_display import get_diff_stats
from ..ui.edit_feedback import track_edit, tracker
# Module-level counter backing get_uid(); it starts at 0 and only grows.
_id = 0


def get_uid():
    """Advance the module counter by 3 and return its new value.

    The first call returns 3, the second 6, and so on.
    """
    global _id
    _id = _id + 3
    return _id
def read_file(filepath, db_conn=None):
    """Read a text file and report the outcome as a status dict.

    Args:
        filepath: Path of the file; ``~`` is expanded with
            ``os.path.expanduser`` before opening.
        db_conn: Optional database connection. When truthy, the key
            ``"read:" + <expanded path>`` is set to ``"true"`` through
            ``db_set`` after the file has been read. The write helpers in
            this module look that key up before modifying a file.

    Returns:
        ``{"status": "success", "content": <text>}`` on success, otherwise
        ``{"status": "error", "error": <message>}``.
    """
    try:
        expanded = os.path.expanduser(filepath)
        with open(expanded) as handle:
            text = handle.read()
        if db_conn:
            # Imported lazily so the database module is only needed when a
            # connection is actually supplied.
            from pr.tools.database import db_set

            db_set("read:" + expanded, "true", db_conn)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "content": text}
def write_file(filepath, content, db_conn=None, show_diff=True):
    """Write ``content`` to ``filepath`` through an ``RPEditor`` and track the edit.

    Sequence of operations:

    1. Expand ``~`` in ``filepath``.
    2. If the file already exists and ``db_conn`` is truthy, require that
       ``db_get("read:" + path)`` reports ``"true"``; otherwise return an
       error without touching the file.
    3. Load the previous content (empty string for a new file) and register
       a ``"WRITE"`` operation with the edit tracker.
    4. Optionally print a visual diff (existing files only).
    5. Save the new content with ``RPEditor``.
    6. With a truthy ``db_conn``, insert the *previous* content as a new row
       in ``file_versions`` (best effort; failures are ignored).

    Args:
        filepath: Target path. The unexpanded value is what gets passed to
            the tracker, the diff display and the ``file_versions`` table.
        content: New text of the file.
        db_conn: Optional database connection used for the read check and
            the version snapshot.
        show_diff: When true and the file existed, print the diff and append
            insertion/deletion counts to the success message.

    Returns:
        ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": ...}``.
    """
    # Sentinel distinguishing "no operation registered yet" from any value
    # track_edit() might return.
    not_tracked = object()
    operation = not_tracked
    try:
        path = os.path.expanduser(filepath)
        existed_before = os.path.exists(path)
        previous = ""
        if existed_before:
            if db_conn:
                from pr.tools.database import db_get

                marker = db_get("read:" + path, db_conn)
                was_read = marker.get("status") == "success" and marker.get("value") == "true"
                if not was_read:
                    return {
                        "status": "error",
                        "error": "File must be read before writing. Please read the file first.",
                    }
            with open(path) as handle:
                previous = handle.read()

        operation = track_edit("WRITE", filepath, content=content, old_content=previous)
        tracker.mark_in_progress(operation)

        diff_wanted = show_diff and existed_before
        if diff_wanted:
            rendered = display_content_diff(previous, content, filepath)
            if rendered["status"] == "success":
                print(rendered["visual_diff"])

        writer = RPEditor(path)
        writer.set_text(content)
        writer.save_file()

        if os.path.exists(path) and db_conn:
            # Version history is best effort: any failure here is ignored so
            # that the write itself still reports success.
            try:
                cursor = db_conn.cursor()
                digest = hashlib.md5(previous.encode()).hexdigest()
                cursor.execute(
                    "SELECT MAX(version) FROM file_versions WHERE filepath = ?",
                    (filepath,),
                )
                row = cursor.fetchone()
                if row[0]:
                    next_version = row[0] + 1
                else:
                    next_version = 1
                cursor.execute(
                    "INSERT INTO file_versions (filepath, content, hash, timestamp, version)\n"
                    "VALUES (?, ?, ?, ?, ?)",
                    (filepath, previous, digest, time.time(), next_version),
                )
                db_conn.commit()
            except Exception:
                pass

        tracker.mark_completed(operation)
        message = f"File written to {path}"
        if diff_wanted:
            stats = get_diff_stats(previous, content)
            message += f" ({stats['insertions']}+ {stats['deletions']}-)"
        return {"status": "success", "message": message}
    except Exception as exc:
        # Only flag the operation as failed if it was registered before the
        # exception occurred.
        if operation is not not_tracked:
            tracker.mark_failed(operation)
        return {"status": "error", "error": str(exc)}
def list_directory(path=".", recursive=False):
    """List the entries of a directory.

    Args:
        path: Directory to list; ``~`` is expanded first.
        recursive: When true, walk the whole tree with ``os.walk``;
            otherwise list only the immediate children.

    Returns:
        ``{"status": "success", "items": [...]}`` or
        ``{"status": "error", "error": <message>}``.

        Recursive items are ``{"path", "type", "size"}`` for files and
        ``{"path", "type"}`` for directories. Non-recursive items are
        ``{"name", "type", "size"}``, with ``size`` set to ``None`` for
        anything that is not a regular file.

        In recursive mode ``size`` is ``None`` when the size cannot be
        determined (for example a dangling symlink, or a file removed while
        the walk is in progress), so one unreadable entry no longer turns
        the whole listing into an error.
    """
    try:
        path = os.path.expanduser(path)
        items = []
        if recursive:
            for root, dirs, files in os.walk(path):
                for name in files:
                    item_path = os.path.join(root, name)
                    try:
                        size = os.path.getsize(item_path)
                    except OSError:
                        # getsize() follows symlinks and raises for dangling
                        # links or entries that disappeared mid-walk; report
                        # an unknown size instead of aborting the listing.
                        size = None
                    items.append({"path": item_path, "type": "file", "size": size})
                for name in dirs:
                    items.append({"path": os.path.join(root, name), "type": "directory"})
        else:
            for item in os.listdir(path):
                item_path = os.path.join(path, item)
                items.append(
                    {
                        "name": item,
                        "type": "directory" if os.path.isdir(item_path) else "file",
                        "size": (os.path.getsize(item_path) if os.path.isfile(item_path) else None),
                    }
                )
        return {"status": "success", "items": items}
    except Exception as e:
        return {"status": "error", "error": str(e)}
def mkdir(path):
    """Create a directory, including any missing parents.

    ``~`` in ``path`` is expanded, and an already existing directory is not
    an error (``exist_ok=True``). The success message echoes ``path`` exactly
    as it was passed in, not the expanded form.

    Returns:
        ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": <message>}``.
    """
    try:
        target = os.path.expanduser(path)
        os.makedirs(target, exist_ok=True)
        outcome = {"status": "success", "message": f"Directory created at {path}"}
    except Exception as exc:
        outcome = {"status": "error", "error": str(exc)}
    return outcome
def chdir(path):
    """Change the process working directory.

    ``~`` in ``path`` is expanded before the change.

    Returns:
        ``{"status": "success", "new_path": <os.getcwd() after the change>}``
        or ``{"status": "error", "error": <message>}``.
    """
    try:
        destination = os.path.expanduser(path)
        os.chdir(destination)
        outcome = {"status": "success", "new_path": os.getcwd()}
    except Exception as exc:
        outcome = {"status": "error", "error": str(exc)}
    return outcome
def getpwd():
    """Report the current working directory.

    Returns:
        ``{"status": "success", "path": <os.getcwd()>}`` or
        ``{"status": "error", "error": <message>}`` if the lookup fails.
    """
    try:
        current = os.getcwd()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "path": current}
def index_source_directory(path):
    """Collect the text of every recognised source file under ``path``.

    The tree is walked with ``os.walk`` (after ``~`` expansion). A file is
    included when its name ends with one of the known source suffixes and
    it can be read as UTF-8; files that fail to open or decode are skipped
    silently.

    Returns:
        ``{"status": "success", "indexed_files": [{"path", "content"}, ...]}``
        or ``{"status": "error", "error": <message>}``.
    """
    # str.endswith accepts a tuple, so one call covers every suffix.
    suffixes = (
        ".py",
        ".js",
        ".ts",
        ".java",
        ".cpp",
        ".c",
        ".h",
        ".hpp",
        ".html",
        ".css",
        ".json",
        ".xml",
        ".md",
        ".sh",
        ".rb",
        ".go",
    )
    collected = []
    try:
        for root, _, names in os.walk(os.path.expanduser(path)):
            for name in names:
                if not name.endswith(suffixes):
                    continue
                full_path = os.path.join(root, name)
                try:
                    with open(full_path, encoding="utf-8") as handle:
                        text = handle.read()
                    collected.append({"path": full_path, "content": text})
                except Exception:
                    # Unreadable or non-UTF-8 file: leave it out of the index.
                    continue
        return {"status": "success", "indexed_files": collected}
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
def search_replace(filepath, old_string, new_string, db_conn=None):
    """Replace every occurrence of ``old_string`` in a file with ``new_string``.

    Args:
        filepath: File to modify; ``~`` is expanded first. The file must
            already exist.
        old_string: Text to search for. Must be non-empty.
        new_string: Replacement text.
        db_conn: Optional database connection. When truthy, the file must
            have been marked as read (``db_get("read:" + path)`` reporting
            ``"true"``) or the call is rejected.

    Returns:
        ``{"status": "success", "message": ...}`` once the file has been
        rewritten, otherwise ``{"status": "error", "error": <message>}``.
        If ``old_string`` does not occur in the file, the file is rewritten
        unchanged and success is still reported.
    """
    try:
        path = os.path.expanduser(filepath)
        if not os.path.exists(path):
            return {"status": "error", "error": "File does not exist"}
        if db_conn:
            from pr.tools.database import db_get

            read_status = db_get("read:" + path, db_conn)
            if read_status.get("status") != "success" or read_status.get("value") != "true":
                return {
                    "status": "error",
                    "error": "File must be read before writing. Please read the file first.",
                }
        if not old_string:
            # str.replace("", x) inserts x before every character and at the
            # end of the text, which would silently corrupt the whole file.
            return {"status": "error", "error": "old_string must not be empty"}
        with open(path) as f:
            content = f.read()
        content = content.replace(old_string, new_string)
        with open(path, "w") as f:
            f.write(content)
        return {
            "status": "success",
            "message": f"Replaced '{old_string}' with '{new_string}' in {path}",
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
# Cache of RPEditor instances, keyed by the path value handed to get_editor().
_editors = {}


def get_editor(filepath):
    """Return the cached ``RPEditor`` for ``filepath``, creating it on first use.

    The key is ``filepath`` exactly as given; no path expansion or
    normalisation happens here.
    """
    editor = _editors.get(filepath)
    if editor is None:
        editor = RPEditor(filepath)
        _editors[filepath] = editor
    return editor
def close_editor(filepath):
    """Close the editor associated with ``filepath``.

    The editor is obtained through ``get_editor`` using the ``~``-expanded
    path, so an editor is created if none is cached yet, and then its
    ``close()`` method is called.

    Returns:
        ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": <message>}``.
    """
    # NOTE(review): the closed editor stays in the get_editor() cache, so
    # later calls for the same path receive this same instance - confirm
    # that RPEditor supports being used again after close().
    try:
        path = os.path.expanduser(filepath)
        get_editor(path).close()
        outcome = {"status": "success", "message": f"Editor closed for {path}"}
    except Exception as exc:
        outcome = {"status": "error", "error": str(exc)}
    return outcome
def open_editor(filepath):
    """Create a fresh ``RPEditor`` for ``filepath`` and start it.

    ``~`` in ``filepath`` is expanded first.

    Returns:
        ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": <message>}``.
    """
    # NOTE(review): the instance is built directly rather than through
    # get_editor(), so it is not placed in the editor cache and
    # close_editor() will operate on a different instance - confirm this
    # is intended.
    try:
        path = os.path.expanduser(filepath)
        RPEditor(path).start()
        outcome = {"status": "success", "message": f"Editor opened for {path}"}
    except Exception as exc:
        outcome = {"status": "error", "error": str(exc)}
    return outcome
def editor_insert_text(filepath, text, line=None, col=None, show_diff=True, db_conn=None):
    """Insert ``text`` into a file through its cached editor and save it.

    Args:
        filepath: Target file; ``~`` is expanded for file access, while the
            unexpanded value is passed to the tracker and diff display.
        text: Text to insert.
        line: Cursor line. The cursor is only moved when both ``line`` and
            ``col`` are given; otherwise the editor's current position is
            used.
        col: Cursor column (see ``line``).
        show_diff: When true and the file had content beforehand, print a
            visual diff of the change.
        db_conn: Optional database connection. When truthy, the file must
            have been marked as read or the call is rejected.

    Returns:
        ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": <message>}``.
    """
    # Sentinel distinguishing "no operation registered yet" from any value
    # track_edit() might return.
    not_tracked = object()
    operation = not_tracked
    try:
        path = os.path.expanduser(filepath)
        if db_conn:
            from pr.tools.database import db_get

            marker = db_get("read:" + path, db_conn)
            was_read = marker.get("status") == "success" and marker.get("value") == "true"
            if not was_read:
                return {
                    "status": "error",
                    "error": "File must be read before writing. Please read the file first.",
                }

        if os.path.exists(path):
            with open(path) as handle:
                old_content = handle.read()
        else:
            old_content = ""

        # The tracker receives a single number: line * 1000 + col, with
        # missing parts counted as 0.
        line_part = 0 if line is None else line
        col_part = 0 if col is None else col
        position = line_part * 1000 + col_part
        operation = track_edit("INSERT", filepath, start_pos=position, content=text)
        tracker.mark_in_progress(operation)

        editor = get_editor(path)
        if not (line is None or col is None):
            editor.move_cursor_to(line, col)
        editor.insert_text(text)
        editor.save_file()

        if show_diff and old_content:
            with open(path) as handle:
                new_content = handle.read()
            rendered = display_content_diff(old_content, new_content, filepath)
            if rendered["status"] == "success":
                print(rendered["visual_diff"])

        tracker.mark_completed(operation)
        return {"status": "success", "message": f"Inserted text in {path}"}
    except Exception as exc:
        # Only flag the operation as failed if it was registered before the
        # exception occurred.
        if operation is not not_tracked:
            tracker.mark_failed(operation)
        return {"status": "error", "error": str(exc)}
def editor_replace_text(
    filepath,
    start_line,
    start_col,
    end_line,
    end_col,
    new_text,
    show_diff=True,
    db_conn=None,
):
    """Replace a line/column range of a file through its cached editor and save it.

    Args:
        filepath: Target file; ``~`` is expanded for file access, while the
            unexpanded value is passed to the tracker and diff display.
        start_line: Line where the replaced range starts.
        start_col: Column where the replaced range starts.
        end_line: Line where the replaced range ends.
        end_col: Column where the replaced range ends.
        new_text: Text that replaces the range.
        show_diff: When true and the file had content beforehand, print a
            visual diff of the change.
        db_conn: Optional database connection. When truthy, the file must
            have been marked as read or the call is rejected.

    Returns:
        ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": <message>}``.
    """
    # Sentinel distinguishing "no operation registered yet" from any value
    # track_edit() might return.
    not_tracked = object()
    operation = not_tracked
    try:
        path = os.path.expanduser(filepath)
        if db_conn:
            from pr.tools.database import db_get

            marker = db_get("read:" + path, db_conn)
            was_read = marker.get("status") == "success" and marker.get("value") == "true"
            if not was_read:
                return {
                    "status": "error",
                    "error": "File must be read before writing. Please read the file first.",
                }

        if os.path.exists(path):
            with open(path) as handle:
                old_content = handle.read()
        else:
            old_content = ""

        # The tracker receives each end of the range as line * 1000 + col.
        operation = track_edit(
            "REPLACE",
            filepath,
            start_pos=start_line * 1000 + start_col,
            end_pos=end_line * 1000 + end_col,
            content=new_text,
            old_content=old_content,
        )
        tracker.mark_in_progress(operation)

        editor = get_editor(path)
        editor.replace_text(start_line, start_col, end_line, end_col, new_text)
        editor.save_file()

        if show_diff and old_content:
            with open(path) as handle:
                new_content = handle.read()
            rendered = display_content_diff(old_content, new_content, filepath)
            if rendered["status"] == "success":
                print(rendered["visual_diff"])

        tracker.mark_completed(operation)
        return {"status": "success", "message": f"Replaced text in {path}"}
    except Exception as exc:
        # Only flag the operation as failed if it was registered before the
        # exception occurred.
        if operation is not not_tracked:
            tracker.mark_failed(operation)
        return {"status": "error", "error": str(exc)}
def display_edit_summary():
    """Delegate to ``display_edit_summary`` from ``..ui.edit_feedback``.

    The import happens at call time and is aliased because this wrapper
    carries the same name as the function it forwards to.
    """
    from ..ui.edit_feedback import display_edit_summary as _render_summary

    return _render_summary()
def display_edit_timeline(show_content=False):
    """Delegate to ``display_edit_timeline`` from ``..ui.edit_feedback``.

    ``show_content`` is forwarded unchanged as the single positional
    argument. The import happens at call time and is aliased because this
    wrapper carries the same name as the function it forwards to.
    """
    from ..ui.edit_feedback import display_edit_timeline as _render_timeline

    return _render_timeline(show_content)
def clear_edit_tracker():
    """Call ``clear_tracker`` from ``..ui.edit_feedback`` and report success.

    Returns:
        ``{"status": "success", "message": "Edit tracker cleared"}``.
    """
    from ..ui.edit_feedback import clear_tracker as _reset_tracker

    _reset_tracker()
    outcome = {"status": "success", "message": "Edit tracker cleared"}
    return outcome