|
import hashlib
|
|
import os
|
|
import time
|
|
from typing import Optional, Any
|
|
|
|
from pr.editor import RPEditor
|
|
|
|
from ..tools.patch import display_content_diff
|
|
from ..ui.diff_display import get_diff_stats
|
|
from ..ui.edit_feedback import track_edit, tracker
|
|
|
|
# Module-wide counter backing get_uid(); it only ever grows.
_id = 0


def get_uid():
    """
    Return the next identifier from the module-level counter.

    Each call advances the counter by 3 and returns the new value, so the
    first call on a fresh module yields 3, the next 6, and so on.
    NOTE(review): the reason for a step of 3 rather than 1 is not evident
    from this file.

    Returns:
        int: The counter value after it has been advanced
    """
    global _id
    _id = _id + 3
    return _id
|
|
|
|
|
|
def read_file(filepath: str, db_conn: Optional[Any] = None) -> dict:
    """
    Return the text of a file and optionally record that it was read.

    The path is expanded with ``os.path.expanduser`` and opened in text mode
    with the platform default encoding. When ``db_conn`` is truthy, the key
    ``"read:" + <expanded path>`` is set to ``"true"`` through ``db_set``.

    Args:
        filepath: Path to the file to read (``~`` is expanded)
        db_conn: Optional database connection for tracking

    Returns:
        dict: ``{"status": "success", "content": ...}`` on success, otherwise
        ``{"status": "error", "error": ...}`` carrying the exception text
    """
    try:
        resolved = os.path.expanduser(filepath)
        with open(resolved) as handle:
            text = handle.read()

        if db_conn:
            from pr.tools.database import db_set

            db_set("read:" + resolved, "true", db_conn)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    else:
        return {"status": "success", "content": text}
|
|
|
|
|
|
def write_file(
    filepath: str, content: str, db_conn: Optional[Any] = None, show_diff: bool = True
) -> dict:
    """
    Replace the contents of a file, tracking the edit along the way.

    An existing file may only be overwritten once the tracking store holds
    ``"true"`` under the key ``"read:" + <expanded path>``; that check is
    skipped when ``db_conn`` is falsy or the file does not exist yet. The edit
    is registered with the edit tracker, the text is written through a fresh
    ``RPEditor``, and, when ``db_conn`` is given, the *previous* content is
    stored in the ``file_versions`` table.

    Args:
        filepath: Path to the file to write (``~`` is expanded)
        content: Content to write
        db_conn: Optional database connection for tracking
        show_diff: Whether to print a diff and append diff stats to the message
            (only applies when the file already existed)

    Returns:
        dict: ``{"status": "success", "message": ...}`` on success, otherwise
        ``{"status": "error", "error": ...}``
    """
    operation = None
    try:
        target = os.path.expanduser(filepath)
        already_exists = os.path.exists(target)
        previous = ""

        if already_exists:
            if db_conn:
                from pr.tools.database import db_get

                read_status = db_get("read:" + target, db_conn)
                was_read = (
                    read_status.get("status") == "success"
                    and read_status.get("value") == "true"
                )
                if not was_read:
                    return {
                        "status": "error",
                        "error": "File must be read before writing. Please read the file first.",
                    }

            with open(target) as existing:
                previous = existing.read()

        operation = track_edit("WRITE", filepath, content=content, old_content=previous)
        tracker.mark_in_progress(operation)

        # A diff only makes sense when there was earlier content to compare with.
        wants_diff = show_diff and already_exists
        if wants_diff:
            rendered = display_content_diff(previous, content, filepath)
            if rendered["status"] == "success":
                print(rendered["visual_diff"])

        editor = RPEditor(target)
        editor.set_text(content)
        editor.save_file()

        if os.path.exists(target) and db_conn:
            try:
                cursor = db_conn.cursor()
                # The hash and the stored content both describe the text that
                # was just replaced, not the new text.
                digest = hashlib.md5(previous.encode()).hexdigest()

                cursor.execute(
                    "SELECT MAX(version) FROM file_versions WHERE filepath = ?",
                    (filepath,),
                )
                latest = cursor.fetchone()[0]
                next_version = latest + 1 if latest else 1

                cursor.execute(
                    """INSERT INTO file_versions (filepath, content, hash, timestamp, version)
                    VALUES (?, ?, ?, ?, ?)""",
                    (filepath, previous, digest, time.time(), next_version),
                )
                db_conn.commit()
            except Exception:
                # Best effort: a failure while recording the version is
                # ignored and the write is still reported as successful.
                pass

        tracker.mark_completed(operation)

        message = f"File written to {target}"
        if wants_diff:
            stats = get_diff_stats(previous, content)
            message += f" ({stats['insertions']}+ {stats['deletions']}-)"

        return {"status": "success", "message": message}
    except Exception as exc:
        # Only mark the operation failed if tracking had already started.
        if operation is not None:
            tracker.mark_failed(operation)
        return {"status": "error", "error": str(exc)}
|
|
|
|
|
|
def _entry_size(item_path):
    """Return the size of ``item_path`` in bytes, or None if it cannot be stat'ed."""
    try:
        return os.path.getsize(item_path)
    except OSError:
        # e.g. a dangling symlink or an entry removed while listing
        return None


def list_directory(path=".", recursive=False):
    """
    List files and directories in the specified path.

    Args:
        path: Directory to list (``~`` is expanded); defaults to the current
            directory
        recursive: When true, walk the whole tree with ``os.walk``; otherwise
            list only the immediate entries

    Returns:
        dict: ``{"status": "success", "items": [...]}`` or
        ``{"status": "error", "error": ...}``. In recursive mode file items are
        ``{"path", "type": "file", "size"}`` and directory items are
        ``{"path", "type": "directory"}``. In flat mode every item is
        ``{"name", "type", "size"}`` with ``size`` set to None for anything
        that is not a regular file. ``size`` is also None for a file whose
        size cannot be determined.
    """
    try:
        base = os.path.expanduser(path)
        # os.walk silently yields nothing for a missing or non-directory root,
        # which would report success with no items. os.listdir raises instead,
        # so both modes report the same error for a bad path.
        top_level = os.listdir(base)

        items = []
        if recursive:
            for root, dirs, files in os.walk(base):
                for name in files:
                    item_path = os.path.join(root, name)
                    items.append(
                        {
                            "path": item_path,
                            "type": "file",
                            # Guarded so one unreadable entry (dangling
                            # symlink) does not abort the whole listing.
                            "size": _entry_size(item_path),
                        }
                    )
                for name in dirs:
                    items.append({"path": os.path.join(root, name), "type": "directory"})
        else:
            for name in top_level:
                item_path = os.path.join(base, name)
                items.append(
                    {
                        "name": name,
                        "type": "directory" if os.path.isdir(item_path) else "file",
                        "size": (_entry_size(item_path) if os.path.isfile(item_path) else None),
                    }
                )
        return {"status": "success", "items": items}
    except Exception as e:
        return {"status": "error", "error": str(e)}
|
|
|
|
|
|
def mkdir(path):
    """
    Create a directory, including any missing parents.

    An already existing directory is not an error (``exist_ok=True``).

    Args:
        path: Directory to create (``~`` is expanded)

    Returns:
        dict: ``{"status": "success", "message": ...}`` quoting ``path`` as it
        was passed in, or ``{"status": "error", "error": ...}``
    """
    try:
        target = os.path.expanduser(path)
        os.makedirs(target, exist_ok=True)
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    else:
        return {"status": "success", "message": f"Directory created at {path}"}
|
|
|
|
|
|
def chdir(path):
    """
    Switch the process's working directory.

    Args:
        path: Directory to change into (``~`` is expanded)

    Returns:
        dict: ``{"status": "success", "new_path": <os.getcwd() after the change>}``
        or ``{"status": "error", "error": ...}``
    """
    try:
        destination = os.path.expanduser(path)
        os.chdir(destination)
        outcome = {"status": "success", "new_path": os.getcwd()}
    except Exception as exc:
        outcome = {"status": "error", "error": str(exc)}
    return outcome
|
|
|
|
|
|
def getpwd():
    """
    Report the process's current working directory.

    Returns:
        dict: ``{"status": "success", "path": <os.getcwd()>}`` or
        ``{"status": "error", "error": ...}`` if the lookup raises
    """
    try:
        current = os.getcwd()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    return {"status": "success", "path": current}
|
|
|
|
|
|
def index_source_directory(path: str) -> dict:
    """
    Walk a directory tree and collect the text of every recognised source file.

    A file is recognised by its name ending in one of a fixed set of suffixes
    (``.py``, ``.js``, ``.ts``, ...). Files are read as UTF-8; any file that
    cannot be opened or decoded is skipped without being reported.

    Args:
        path: Root directory to index (``~`` is expanded)

    Returns:
        dict: ``{"status": "success", "indexed_files": [{"path", "content"}, ...]}``
        or ``{"status": "error", "error": ...}``
    """
    # str.endswith accepts a tuple, so one call tests every suffix.
    suffixes = (
        ".py",
        ".js",
        ".ts",
        ".java",
        ".cpp",
        ".c",
        ".h",
        ".hpp",
        ".html",
        ".css",
        ".json",
        ".xml",
        ".md",
        ".sh",
        ".rb",
        ".go",
    )
    collected = []
    try:
        for root, _, names in os.walk(os.path.expanduser(path)):
            for name in names:
                if not name.endswith(suffixes):
                    continue
                full_path = os.path.join(root, name)
                try:
                    with open(full_path, encoding="utf-8") as handle:
                        text = handle.read()
                except Exception:
                    # Unreadable or non-UTF-8 files are skipped.
                    continue
                collected.append({"path": full_path, "content": text})
        return {"status": "success", "indexed_files": collected}
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
|
|
|
|
|
|
def search_replace(
    filepath: str, old_string: str, new_string: str, db_conn: Optional[Any] = None
) -> dict:
    """
    Search and replace text in a file.

    Every occurrence of ``old_string`` is replaced by ``new_string``. When
    ``db_conn`` is truthy the file must have been marked as read first (key
    ``"read:" + <expanded path>`` equal to ``"true"``). The file is rewritten
    only when at least one occurrence was found.

    Args:
        filepath: Path to the file (``~`` is expanded)
        old_string: String to replace; must not be empty
        new_string: Replacement string
        db_conn: Optional database connection for tracking

    Returns:
        dict: ``{"status": "success", "message": ..., "replacements": <count>}``
        on success (``replacements`` is 0 and the file is untouched when
        ``old_string`` does not occur), otherwise
        ``{"status": "error", "error": ...}``
    """
    try:
        path = os.path.expanduser(filepath)
        if not os.path.exists(path):
            return {"status": "error", "error": "File does not exist"}
        if not old_string:
            # str.replace("", x) inserts x between every character, which
            # would silently corrupt the file.
            return {"status": "error", "error": "old_string must not be empty"}
        if db_conn:
            from pr.tools.database import db_get

            read_status = db_get("read:" + path, db_conn)
            if read_status.get("status") != "success" or read_status.get("value") != "true":
                return {
                    "status": "error",
                    "error": "File must be read before writing. Please read the file first.",
                }
        with open(path) as f:
            content = f.read()

        occurrences = content.count(old_string)
        if occurrences == 0:
            # Nothing to do: do not rewrite the file and do not claim a
            # replacement happened.
            return {
                "status": "success",
                "message": f"No occurrences of '{old_string}' found in {path}; file left unchanged",
                "replacements": 0,
            }

        with open(path, "w") as f:
            f.write(content.replace(old_string, new_string))
        return {
            "status": "success",
            "message": f"Replaced '{old_string}' with '{new_string}' in {path}",
            "replacements": occurrences,
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
|
|
|
|
|
|
# Cache of RPEditor instances, keyed by the exact path string handed to
# get_editor().
_editors = {}


def get_editor(filepath):
    """
    Return the cached editor for ``filepath``, creating it on first use.

    The key is ``filepath`` exactly as given; no path expansion or
    normalisation happens here.

    Args:
        filepath: Path used both as the cache key and as the ``RPEditor`` argument

    Returns:
        The ``RPEditor`` instance stored in ``_editors`` for this path
    """
    try:
        return _editors[filepath]
    except KeyError:
        created = RPEditor(filepath)
        _editors[filepath] = created
        return created
|
|
|
|
|
|
def close_editor(filepath):
    """
    Close the cached editor for a file and drop it from the editor cache.

    The editor is removed from ``_editors`` before it is closed, so a later
    ``get_editor`` call for the same path builds a fresh editor instead of
    handing back the closed one. If no editor is cached for the path there is
    nothing to close and the call still succeeds.

    Args:
        filepath: Path whose editor should be closed (``~`` is expanded)

    Returns:
        dict: ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": ...}``
    """
    try:
        path = os.path.expanduser(filepath)
        # pop() rather than get_editor(): avoids constructing an editor only
        # to close it, and guarantees the closed instance is never reused.
        editor = _editors.pop(path, None)
        if editor is not None:
            editor.close()
        return {"status": "success", "message": f"Editor closed for {path}"}
    except Exception as e:
        return {"status": "error", "error": str(e)}
|
|
|
|
|
|
def open_editor(filepath):
    """
    Create an editor for a file and start it.

    A new ``RPEditor`` is constructed on every call; it is not looked up in,
    or added to, the ``_editors`` cache.

    Args:
        filepath: Path of the file to open (``~`` is expanded)

    Returns:
        dict: ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": ...}``
    """
    try:
        target = os.path.expanduser(filepath)
        RPEditor(target).start()
    except Exception as exc:
        return {"status": "error", "error": str(exc)}
    else:
        return {"status": "success", "message": f"Editor opened for {target}"}
|
|
|
|
|
|
def editor_insert_text(filepath, text, line=None, col=None, show_diff=True, db_conn=None):
    """
    Insert text into a file through its cached editor and save the result.

    When ``db_conn`` is truthy the file must have been marked as read first
    (key ``"read:" + <expanded path>`` equal to ``"true"``). The edit is
    registered with the edit tracker using a position encoded as
    ``line * 1000 + col`` (missing values count as 0).

    Args:
        filepath: Path of the file to edit (``~`` is expanded)
        text: Text to insert
        line: Target line; the cursor is moved only if ``col`` is given too
        col: Target column; the cursor is moved only if ``line`` is given too
        show_diff: Whether to print a diff (only when the file had content before)
        db_conn: Optional database connection for the read-before-write check

    Returns:
        dict: ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": ...}``
    """
    operation = None
    try:
        target = os.path.expanduser(filepath)

        if db_conn:
            from pr.tools.database import db_get

            read_status = db_get("read:" + target, db_conn)
            was_read = (
                read_status.get("status") == "success"
                and read_status.get("value") == "true"
            )
            if not was_read:
                return {
                    "status": "error",
                    "error": "File must be read before writing. Please read the file first.",
                }

        before = ""
        if os.path.exists(target):
            with open(target) as existing:
                before = existing.read()

        row = 0 if line is None else line
        column = 0 if col is None else col
        operation = track_edit("INSERT", filepath, start_pos=row * 1000 + column, content=text)
        tracker.mark_in_progress(operation)

        editor = get_editor(target)
        cursor_given = line is not None and col is not None
        if cursor_given:
            editor.move_cursor_to(line, col)
        editor.insert_text(text)
        editor.save_file()

        if show_diff and before:
            # Re-read from disk so the diff reflects what was actually saved.
            with open(target) as updated:
                after = updated.read()
            rendered = display_content_diff(before, after, filepath)
            if rendered["status"] == "success":
                print(rendered["visual_diff"])

        tracker.mark_completed(operation)
        return {"status": "success", "message": f"Inserted text in {target}"}
    except Exception as exc:
        # Only mark the operation failed if tracking had already started.
        if operation is not None:
            tracker.mark_failed(operation)
        return {"status": "error", "error": str(exc)}
|
|
|
|
|
|
def editor_replace_text(
    filepath,
    start_line,
    start_col,
    end_line,
    end_col,
    new_text,
    show_diff=True,
    db_conn=None,
):
    """
    Replace a range of text in a file through its cached editor and save it.

    When ``db_conn`` is truthy the file must have been marked as read first
    (key ``"read:" + <expanded path>`` equal to ``"true"``). The edit is
    registered with the edit tracker using positions encoded as
    ``line * 1000 + col``.

    Args:
        filepath: Path of the file to edit (``~`` is expanded)
        start_line: Line where the replaced range starts
        start_col: Column where the replaced range starts
        end_line: Line where the replaced range ends
        end_col: Column where the replaced range ends
        new_text: Text that takes the place of the range
        show_diff: Whether to print a diff (only when the file had content before)
        db_conn: Optional database connection for the read-before-write check

    Returns:
        dict: ``{"status": "success", "message": ...}`` or
        ``{"status": "error", "error": ...}``
    """
    operation = None
    try:
        target = os.path.expanduser(filepath)

        if db_conn:
            from pr.tools.database import db_get

            read_status = db_get("read:" + target, db_conn)
            was_read = (
                read_status.get("status") == "success"
                and read_status.get("value") == "true"
            )
            if not was_read:
                return {
                    "status": "error",
                    "error": "File must be read before writing. Please read the file first.",
                }

        before = ""
        if os.path.exists(target):
            with open(target) as existing:
                before = existing.read()

        range_start = start_line * 1000 + start_col
        range_end = end_line * 1000 + end_col
        operation = track_edit(
            "REPLACE",
            filepath,
            start_pos=range_start,
            end_pos=range_end,
            content=new_text,
            old_content=before,
        )
        tracker.mark_in_progress(operation)

        editor = get_editor(target)
        editor.replace_text(start_line, start_col, end_line, end_col, new_text)
        editor.save_file()

        if show_diff and before:
            # Re-read from disk so the diff reflects what was actually saved.
            with open(target) as updated:
                after = updated.read()
            rendered = display_content_diff(before, after, filepath)
            if rendered["status"] == "success":
                print(rendered["visual_diff"])

        tracker.mark_completed(operation)
        return {"status": "success", "message": f"Replaced text in {target}"}
    except Exception as exc:
        # Only mark the operation failed if tracking had already started.
        if operation is not None:
            tracker.mark_failed(operation)
        return {"status": "error", "error": str(exc)}
|
|
|
|
|
|
def display_edit_summary():
    """
    Delegate to ``display_edit_summary`` from ``..ui.edit_feedback``.

    The import is done inside the function, under an alias so it does not
    shadow this wrapper's own name.

    Returns:
        Whatever the underlying ``display_edit_summary`` returns
    """
    from ..ui.edit_feedback import display_edit_summary as _show_summary

    summary = _show_summary()
    return summary
|
|
|
|
|
|
def display_edit_timeline(show_content=False):
    """
    Delegate to ``display_edit_timeline`` from ``..ui.edit_feedback``.

    Args:
        show_content: Passed through unchanged as the single positional argument

    Returns:
        Whatever the underlying ``display_edit_timeline`` returns
    """
    from ..ui.edit_feedback import display_edit_timeline as _show_timeline

    timeline = _show_timeline(show_content)
    return timeline
|
|
|
|
|
|
def clear_edit_tracker():
    """
    Reset the edit tracker via ``clear_tracker`` from ``..ui.edit_feedback``.

    Errors raised by ``clear_tracker`` are not caught here.

    Returns:
        dict: ``{"status": "success", "message": "Edit tracker cleared"}``
    """
    from ..ui.edit_feedback import clear_tracker as _reset_tracker

    _reset_tracker()
    outcome = {"status": "success", "message": "Edit tracker cleared"}
    return outcome
|