import hashlib import os import time from pr.editor import RPEditor from ..tools.patch import display_content_diff from ..ui.diff_display import get_diff_stats from ..ui.edit_feedback import track_edit, tracker _id = 0 def get_uid(): global _id _id += 3 return _id def read_file(filepath, db_conn=None): try: path = os.path.expanduser(filepath) with open(path) as f: content = f.read() if db_conn: from pr.tools.database import db_set db_set("read:" + path, "true", db_conn) return {"status": "success", "content": content} except Exception as e: return {"status": "error", "error": str(e)} def write_file(filepath, content, db_conn=None, show_diff=True): try: path = os.path.expanduser(filepath) old_content = "" is_new_file = not os.path.exists(path) if not is_new_file and db_conn: from pr.tools.database import db_get read_status = db_get("read:" + path, db_conn) if read_status.get("status") != "success" or read_status.get("value") != "true": return { "status": "error", "error": "File must be read before writing. Please read the file first.", } if not is_new_file: with open(path) as f: old_content = f.read() operation = track_edit("WRITE", filepath, content=content, old_content=old_content) tracker.mark_in_progress(operation) if show_diff and not is_new_file: diff_result = display_content_diff(old_content, content, filepath) if diff_result["status"] == "success": print(diff_result["visual_diff"]) editor = RPEditor(path) editor.set_text(content) editor.save_file() if os.path.exists(path) and db_conn: try: cursor = db_conn.cursor() file_hash = hashlib.md5(old_content.encode()).hexdigest() cursor.execute( "SELECT MAX(version) FROM file_versions WHERE filepath = ?", (filepath,), ) result = cursor.fetchone() version = (result[0] + 1) if result[0] else 1 cursor.execute( """INSERT INTO file_versions (filepath, content, hash, timestamp, version) VALUES (?, ?, ?, ?, ?)""", (filepath, old_content, file_hash, time.time(), version), ) db_conn.commit() except Exception: pass tracker.mark_completed(operation) message = f"File written to {path}" if show_diff and not is_new_file: stats = get_diff_stats(old_content, content) message += f" ({stats['insertions']}+ {stats['deletions']}-)" return {"status": "success", "message": message} except Exception as e: if "operation" in locals(): tracker.mark_failed(operation) return {"status": "error", "error": str(e)} def list_directory(path=".", recursive=False): try: path = os.path.expanduser(path) items = [] if recursive: for root, dirs, files in os.walk(path): for name in files: item_path = os.path.join(root, name) items.append( { "path": item_path, "type": "file", "size": os.path.getsize(item_path), } ) for name in dirs: items.append({"path": os.path.join(root, name), "type": "directory"}) else: for item in os.listdir(path): item_path = os.path.join(path, item) items.append( { "name": item, "type": "directory" if os.path.isdir(item_path) else "file", "size": (os.path.getsize(item_path) if os.path.isfile(item_path) else None), } ) return {"status": "success", "items": items} except Exception as e: return {"status": "error", "error": str(e)} def mkdir(path): try: os.makedirs(os.path.expanduser(path), exist_ok=True) return {"status": "success", "message": f"Directory created at {path}"} except Exception as e: return {"status": "error", "error": str(e)} def chdir(path): try: os.chdir(os.path.expanduser(path)) return {"status": "success", "new_path": os.getcwd()} except Exception as e: return {"status": "error", "error": str(e)} def getpwd(): try: return {"status": "success", "path": os.getcwd()} except Exception as e: return {"status": "error", "error": str(e)} def index_source_directory(path): extensions = [ ".py", ".js", ".ts", ".java", ".cpp", ".c", ".h", ".hpp", ".html", ".css", ".json", ".xml", ".md", ".sh", ".rb", ".go", ] source_files = [] try: for root, _, files in os.walk(os.path.expanduser(path)): for file in files: if any(file.endswith(ext) for ext in extensions): filepath = os.path.join(root, file) try: with open(filepath, encoding="utf-8") as f: content = f.read() source_files.append({"path": filepath, "content": content}) except Exception: continue return {"status": "success", "indexed_files": source_files} except Exception as e: return {"status": "error", "error": str(e)} def search_replace(filepath, old_string, new_string, db_conn=None): try: path = os.path.expanduser(filepath) if not os.path.exists(path): return {"status": "error", "error": "File does not exist"} if db_conn: from pr.tools.database import db_get read_status = db_get("read:" + path, db_conn) if read_status.get("status") != "success" or read_status.get("value") != "true": return { "status": "error", "error": "File must be read before writing. Please read the file first.", } with open(path) as f: content = f.read() content = content.replace(old_string, new_string) with open(path, "w") as f: f.write(content) return { "status": "success", "message": f"Replaced '{old_string}' with '{new_string}' in {path}", } except Exception as e: return {"status": "error", "error": str(e)} _editors = {} def get_editor(filepath): if filepath not in _editors: _editors[filepath] = RPEditor(filepath) return _editors[filepath] def close_editor(filepath): try: path = os.path.expanduser(filepath) editor = get_editor(path) editor.close() return {"status": "success", "message": f"Editor closed for {path}"} except Exception as e: return {"status": "error", "error": str(e)} def open_editor(filepath): try: path = os.path.expanduser(filepath) editor = RPEditor(path) editor.start() return {"status": "success", "message": f"Editor opened for {path}"} except Exception as e: return {"status": "error", "error": str(e)} def editor_insert_text(filepath, text, line=None, col=None, show_diff=True, db_conn=None): try: path = os.path.expanduser(filepath) if db_conn: from pr.tools.database import db_get read_status = db_get("read:" + path, db_conn) if read_status.get("status") != "success" or read_status.get("value") != "true": return { "status": "error", "error": "File must be read before writing. Please read the file first.", } old_content = "" if os.path.exists(path): with open(path) as f: old_content = f.read() position = (line if line is not None else 0) * 1000 + (col if col is not None else 0) operation = track_edit("INSERT", filepath, start_pos=position, content=text) tracker.mark_in_progress(operation) editor = get_editor(path) if line is not None and col is not None: editor.move_cursor_to(line, col) editor.insert_text(text) editor.save_file() if show_diff and old_content: with open(path) as f: new_content = f.read() diff_result = display_content_diff(old_content, new_content, filepath) if diff_result["status"] == "success": print(diff_result["visual_diff"]) tracker.mark_completed(operation) return {"status": "success", "message": f"Inserted text in {path}"} except Exception as e: if "operation" in locals(): tracker.mark_failed(operation) return {"status": "error", "error": str(e)} def editor_replace_text( filepath, start_line, start_col, end_line, end_col, new_text, show_diff=True, db_conn=None, ): try: path = os.path.expanduser(filepath) if db_conn: from pr.tools.database import db_get read_status = db_get("read:" + path, db_conn) if read_status.get("status") != "success" or read_status.get("value") != "true": return { "status": "error", "error": "File must be read before writing. Please read the file first.", } old_content = "" if os.path.exists(path): with open(path) as f: old_content = f.read() start_pos = start_line * 1000 + start_col end_pos = end_line * 1000 + end_col operation = track_edit( "REPLACE", filepath, start_pos=start_pos, end_pos=end_pos, content=new_text, old_content=old_content, ) tracker.mark_in_progress(operation) editor = get_editor(path) editor.replace_text(start_line, start_col, end_line, end_col, new_text) editor.save_file() if show_diff and old_content: with open(path) as f: new_content = f.read() diff_result = display_content_diff(old_content, new_content, filepath) if diff_result["status"] == "success": print(diff_result["visual_diff"]) tracker.mark_completed(operation) return {"status": "success", "message": f"Replaced text in {path}"} except Exception as e: if "operation" in locals(): tracker.mark_failed(operation) return {"status": "error", "error": str(e)} def display_edit_summary(): from ..ui.edit_feedback import display_edit_summary return display_edit_summary() def display_edit_timeline(show_content=False): from ..ui.edit_feedback import display_edit_timeline return display_edit_timeline(show_content) def clear_edit_tracker(): from ..ui.edit_feedback import clear_tracker clear_tracker() return {"status": "success", "message": "Edit tracker cleared"}