import glob as glob_module
import json
import logging
import os
import readline
import signal
import sqlite3
import sys
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from rp.commands import handle_command
from rp.config import (
DB_PATH,
DEFAULT_API_URL,
DEFAULT_MODEL,
HISTORY_FILE,
LOG_FILE,
MODEL_LIST_URL,
)
from rp.core.api import call_api
from rp.core.autonomous_interactions import start_global_autonomous, stop_global_autonomous
from rp.core.background_monitor import get_global_monitor, start_global_monitor, stop_global_monitor
from rp.core.context import init_system_message, truncate_tool_result
from rp.core.usage_tracker import UsageTracker
from rp.input_handler import get_advanced_input
from rp.tools import get_tools_definition
from rp.tools.agents import (
collaborate_agents,
create_agent,
execute_agent_task,
list_agents,
remove_agent,
)
from rp.tools.command import kill_process, run_command, tail_process
from rp.tools.database import db_get, db_query, db_set
from rp.tools.filesystem import (
chdir,
clear_edit_tracker,
display_edit_summary,
display_edit_timeline,
getpwd,
index_source_directory,
list_directory,
mkdir,
read_file,
search_replace,
write_file,
)
from rp.tools.interactive_control import (
close_interactive_session,
list_active_sessions,
read_session_output,
send_input_to_session,
start_interactive_session,
)
from rp.tools.memory import (
add_knowledge_entry,
delete_knowledge_entry,
get_knowledge_by_category,
get_knowledge_entry,
get_knowledge_statistics,
search_knowledge,
update_knowledge_importance,
)
from rp.tools.patch import apply_patch, create_diff, display_file_diff
from rp.tools.python_exec import python_exec
from rp.tools.web import http_fetch, web_search, web_search_news
from rp.ui import Colors, Spinner, render_markdown
from rp.ui.progress import ProgressIndicator
# Module-wide logger: always captures DEBUG-level records into LOG_FILE.
# A console handler is added later (in Assistant.__init__) only when --debug is set.
logger = logging.getLogger("rp")
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(file_handler)
class Assistant:
    """Interactive AI assistant with concurrent tool execution, SQLite-backed
    persistence, knowledge/graph memory, and optional background session
    monitoring.

    Configuration is resolved with the precedence: CLI ``args`` first, then
    environment variables, then package defaults from ``rp.config``.
    """

    def __init__(self, args):
        self.args = args
        self.messages = []  # full conversation history sent to the API
        self.verbose = args.verbose
        self.debug = getattr(args, "debug", False)
        self.syntax_highlighting = not args.no_syntax
        if self.debug:
            # Mirror debug records to the console on top of the LOG_FILE handler.
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.DEBUG)
            console_handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
            logger.addHandler(console_handler)
            logger.debug("Debug mode enabled")
        self.api_key = os.environ.get("OPENROUTER_API_KEY", "")
        if not self.api_key:
            print("Warning: OPENROUTER_API_KEY environment variable not set. API calls may fail.")
        self.model = args.model or os.environ.get("AI_MODEL", DEFAULT_MODEL)
        self.api_url = args.api_url or os.environ.get("API_URL", DEFAULT_API_URL)
        self.model_list_url = args.model_list_url or os.environ.get(
            "MODEL_LIST_URL", MODEL_LIST_URL
        )
        self.use_tools = os.environ.get("USE_TOOLS", "1") == "1"
        self.interrupt_count = 0  # consecutive Ctrl+C presses; the second one exits
        self.python_globals = {}  # shared globals dict for the python_exec tool
        self.db_conn = None
        self.autonomous_mode = False
        self.autonomous_iterations = 0
        self.background_monitoring = False
        self.usage_tracker = UsageTracker()
        self.background_tasks = set()
        self.last_result = None  # last rendered reply; used to suppress duplicate prints
        self.init_database()
        # Imported here (not at module top) — presumably to avoid an import
        # cycle between rp.core and rp.memory; confirm before moving.
        from rp.memory import KnowledgeStore, FactExtractor, GraphMemory

        self.knowledge_store = KnowledgeStore(DB_PATH, db_conn=self.db_conn)
        self.fact_extractor = FactExtractor()
        self.graph_memory = GraphMemory(DB_PATH, db_conn=self.db_conn)
        self.messages.append(init_system_message(args))
        try:
            from rp.core.enhanced_assistant import EnhancedAssistant

            self.enhanced = EnhancedAssistant(self)
            if self.debug:
                logger.debug("Enhanced assistant features initialized")
        except Exception as e:
            # Enhanced features are optional; continue without them.
            logger.warning(f"Could not initialize enhanced features: {e}")
            self.enhanced = None
        from rp.config import BACKGROUND_MONITOR_ENABLED

        bg_enabled = os.environ.get(
            "BACKGROUND_MONITOR", str(BACKGROUND_MONITOR_ENABLED)
        ).lower() in ("1", "true", "yes")
        if bg_enabled:
            try:
                start_global_monitor()
                start_global_autonomous(llm_callback=self._handle_background_updates)
                self.background_monitoring = True
                if self.debug:
                    logger.debug("Background monitoring initialized")
            except Exception as e:
                logger.warning(f"Could not initialize background monitoring: {e}")
                self.background_monitoring = False
        else:
            self.background_monitoring = False
            if self.debug:
                logger.debug("Background monitoring disabled")

    def init_database(self):
        """Open the SQLite connection at DB_PATH and create required tables.

        On any failure the connection is left as None so database-backed tools
        can degrade gracefully instead of crashing the assistant.
        """
        try:
            logger.debug(f"Initializing database at {DB_PATH}")
            # check_same_thread=False: the connection is shared with tool calls
            # running on ThreadPoolExecutor worker threads.
            self.db_conn = sqlite3.connect(DB_PATH, check_same_thread=False)
            cursor = self.db_conn.cursor()
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS kv_store\n (key TEXT PRIMARY KEY, value TEXT, timestamp REAL)"
            )
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS file_versions\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n filepath TEXT, content TEXT, hash TEXT,\n timestamp REAL, version INTEGER)"
            )
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS api_request_logs\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n timestamp REAL, model TEXT, api_url TEXT,\n request_payload TEXT)"
            )
            self.db_conn.commit()
            logger.debug("Database initialized successfully")
        except Exception as e:
            logger.error(f"Database initialization error: {e}")
            self.db_conn = None

    def _handle_background_updates(self, updates):
        """Handle background session updates by injecting them into the conversation."""
        if not updates or not updates.get("sessions"):
            return
        update_message = self._format_background_update_message(updates)
        # Only inject once the conversation has started (system message present).
        if self.messages:
            self.messages.append(
                {"role": "system", "content": f"Background session updates: {update_message}"}
            )
        if self.verbose:
            print(f"{Colors.CYAN}Background update: {update_message}{Colors.RESET}")

    def _format_background_update_message(self, updates):
        """Format background updates into a single line for LLM consumption."""
        session_summaries = []
        for session_name, session_info in updates.get("sessions", {}).items():
            summary = session_info.get("summary", f"Session {session_name}")
            session_summaries.append(f"{session_name}: {summary}")
        if session_summaries:
            return "Active background sessions: " + "; ".join(session_summaries)
        else:
            return "No active background sessions requiring attention."

    def _check_background_updates(self):
        """Check for pending background monitor events and print them to the console."""
        if not self.background_monitoring:
            return
        try:
            monitor = get_global_monitor()
            events = monitor.get_pending_events()
            if events:
                print(f"\n{Colors.CYAN}Background Events:{Colors.RESET}")
                for event in events:
                    event_type = event.get("type", "unknown")
                    session_name = event.get("session_name", "unknown")
                    if event_type == "session_started":
                        print(f" {Colors.GREEN}{Colors.RESET} Session '{session_name}' started")
                    elif event_type == "session_ended":
                        print(f" {Colors.YELLOW}{Colors.RESET} Session '{session_name}' ended")
                    elif event_type == "output_received":
                        lines = len(event.get("new_output", {}).get("stdout", []))
                        print(
                            f" {Colors.BLUE}📝{Colors.RESET} Session '{session_name}' produced {lines} lines of output"
                        )
                    elif event_type == "possible_input_needed":
                        print(
                            f" {Colors.RED}{Colors.RESET} Session '{session_name}' may need input"
                        )
                    elif event_type == "high_output_volume":
                        total = event.get("total_lines", 0)
                        print(
                            f" {Colors.YELLOW}📊{Colors.RESET} Session '{session_name}' has high output volume ({total} lines)"
                        )
                    elif event_type == "inactive_session":
                        inactive_time = event.get("inactive_seconds", 0)
                        print(
                            f" {Colors.GRAY}{Colors.RESET} Session '{session_name}' inactive for {inactive_time:.0f}s"
                        )
                print()
        except Exception as e:
            if self.debug:
                print(f"{Colors.RED}Error checking background updates: {e}{Colors.RESET}")

    def _build_tool_dispatch(self):
        """Return the tool-name -> callable dispatch table.

        Tools that need per-assistant state (db_conn, python_globals) are
        bound via lambdas; stateless tools are referenced directly.
        """
        return {
            "http_fetch": http_fetch,
            "run_command": run_command,
            "tail_process": tail_process,
            "kill_process": kill_process,
            "start_interactive_session": start_interactive_session,
            "send_input_to_session": send_input_to_session,
            "read_session_output": read_session_output,
            "list_active_sessions": list_active_sessions,
            "close_interactive_session": close_interactive_session,
            "read_file": lambda **kw: read_file(**kw, db_conn=self.db_conn),
            "write_file": lambda **kw: write_file(**kw, db_conn=self.db_conn),
            "list_directory": list_directory,
            "mkdir": mkdir,
            "chdir": chdir,
            "getpwd": getpwd,
            "db_set": lambda **kw: db_set(**kw, db_conn=self.db_conn),
            "db_get": lambda **kw: db_get(**kw, db_conn=self.db_conn),
            "db_query": lambda **kw: db_query(**kw, db_conn=self.db_conn),
            "web_search": web_search,
            "web_search_news": web_search_news,
            "python_exec": lambda **kw: python_exec(
                **kw, python_globals=self.python_globals
            ),
            "index_source_directory": index_source_directory,
            "search_replace": lambda **kw: search_replace(**kw, db_conn=self.db_conn),
            "create_diff": create_diff,
            "apply_patch": lambda **kw: apply_patch(**kw, db_conn=self.db_conn),
            "display_file_diff": display_file_diff,
            # These two deliberately discard any model-supplied arguments.
            "display_edit_summary": lambda **kw: display_edit_summary(),
            "display_edit_timeline": display_edit_timeline,
            "clear_edit_tracker": lambda **kw: clear_edit_tracker(),
            "create_agent": create_agent,
            "list_agents": list_agents,
            "execute_agent_task": execute_agent_task,
            "remove_agent": remove_agent,
            "collaborate_agents": collaborate_agents,
            "add_knowledge_entry": add_knowledge_entry,
            "get_knowledge_entry": get_knowledge_entry,
            "search_knowledge": search_knowledge,
            "get_knowledge_by_category": get_knowledge_by_category,
            "update_knowledge_importance": update_knowledge_importance,
            "delete_knowledge_entry": delete_knowledge_entry,
            "get_knowledge_statistics": get_knowledge_statistics,
        }

    def execute_tool_calls(self, tool_calls):
        """Execute the model's tool calls concurrently.

        Args:
            tool_calls: List of OpenAI-style tool-call dicts with
                ``id`` and ``function.name`` / ``function.arguments`` (JSON).

        Returns:
            A list of ``role: "tool"`` messages, one per executed call;
            failures become ``{"status": "error", "error": ...}`` payloads.
        """
        results = []
        logger.debug(f"Executing {len(tool_calls)} tool call(s)")
        # Build the dispatch table once, not per tool call.
        func_map = self._build_tool_dispatch()
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = []
            for tool_call in tool_calls:
                func_name = tool_call["function"]["name"]
                arguments = json.loads(tool_call["function"]["arguments"])
                logger.debug(f"Tool call: {func_name} with arguments: {arguments}")
                # Unknown tool names are silently skipped (no result message).
                if func_name in func_map:
                    future = executor.submit(func_map[func_name], **arguments)
                    futures.append((tool_call["id"], future))
            for tool_id, future in futures:
                try:
                    result = future.result(timeout=30)
                    result = truncate_tool_result(result)
                    logger.debug(f"Tool result for {tool_id}: {str(result)[:200]}...")
                    results.append(
                        {"tool_call_id": tool_id, "role": "tool", "content": json.dumps(result)}
                    )
                except Exception as e:
                    logger.debug(f"Tool error for {tool_id}: {str(e)}")
                    # Slicing is a no-op for messages shorter than 200 chars.
                    error_msg = str(e)[:200]
                    results.append(
                        {
                            "tool_call_id": tool_id,
                            "role": "tool",
                            "content": json.dumps({"status": "error", "error": error_msg}),
                        }
                    )
        return results

    def process_response(self, response):
        """Consume an API response, running tool calls recursively until the
        model returns plain content; returns the rendered markdown string."""
        if "error" in response:
            return f"Error: {response['error']}"
        if "choices" not in response or not response["choices"]:
            return "No response from API"
        message = response["choices"][0]["message"]
        self.messages.append(message)
        if "tool_calls" in message and message["tool_calls"]:
            tool_count = len(message["tool_calls"])
            print(f"{Colors.BLUE}🔧 Executing {tool_count} tool call(s)...{Colors.RESET}")
            with ProgressIndicator("Executing tools..."):
                tool_results = self.execute_tool_calls(message["tool_calls"])
            print(f"{Colors.GREEN}✅ Tool execution completed.{Colors.RESET}")
            for result in tool_results:
                self.messages.append(result)
            with ProgressIndicator("Processing tool results..."):
                follow_up = call_api(
                    self.messages,
                    self.model,
                    self.api_url,
                    self.api_key,
                    self.use_tools,
                    get_tools_definition(),
                    verbose=self.verbose,
                    db_conn=self.db_conn,
                )
            # Recurse: the follow-up may itself contain more tool calls.
            return self.process_response(follow_up)
        content = message.get("content", "")
        with ProgressIndicator("Updating memory..."):
            self.graph_memory.populate_from_text(content)
        return render_markdown(content, self.syntax_highlighting)

    def signal_handler(self, signum, frame):
        """SIGINT handler: require two consecutive Ctrl+C presses to exit."""
        if self.autonomous_mode:
            self.interrupt_count += 1
            if self.interrupt_count >= 2:
                print(f"\n{Colors.RED}Force exiting autonomous mode...{Colors.RESET}")
                self.autonomous_mode = False
                sys.exit(0)
            else:
                print(f"\n{Colors.YELLOW}Press Ctrl+C again to force exit{Colors.RESET}")
                return
        self.interrupt_count += 1
        if self.interrupt_count >= 2:
            print(f"\n{Colors.RED}Exiting...{Colors.RESET}")
            self.cleanup()
            sys.exit(0)
        else:
            print(f"\n{Colors.YELLOW}Press Ctrl+C again to exit{Colors.RESET}")

    def setup_readline(self):
        """Configure readline history persistence and tab completion for
        built-in commands plus filesystem paths."""
        try:
            readline.read_history_file(HISTORY_FILE)
        except FileNotFoundError:
            pass  # first run: no history yet
        readline.set_history_length(1000)
        import atexit

        atexit.register(readline.write_history_file, HISTORY_FILE)
        commands = [
            "exit",
            "quit",
            "help",
            "reset",
            "dump",
            "verbose",
            "models",
            "tools",
            "review",
            "refactor",
            "obfuscate",
            "/auto",
            "/edit",
        ]

        def completer(text, state):
            # Merge command-name matches with glob-based path matches.
            options = [cmd for cmd in commands if cmd.startswith(text)]
            glob_pattern = os.path.expanduser(text) + "*"
            path_options = glob_module.glob(glob_pattern)
            path_options = [p + os.sep if os.path.isdir(p) else p for p in path_options]
            combined_options = sorted(list(set(options + path_options)))
            if state < len(combined_options):
                return combined_options[state]
            return None

        # Remove "/" from delimiters so "/auto"-style commands complete whole.
        delims = readline.get_completer_delims()
        readline.set_completer_delims(delims.replace("/", ""))
        readline.set_completer(completer)
        readline.parse_and_bind("tab: complete")

    def run_repl(self):
        """Run the interactive read-eval-print loop until exit/EOF."""
        self.setup_readline()
        signal.signal(signal.SIGINT, self.signal_handler)
        while True:
            try:
                if self.background_monitoring:
                    self._check_background_updates()
                prompt = f"{Colors.BLUE}You"
                if self.background_monitoring:
                    try:
                        from rp.multiplexer import get_all_sessions

                        sessions = get_all_sessions()
                        active_count = sum(
                            (1 for s in sessions.values() if s.get("status") == "running")
                        )
                        if active_count > 0:
                            prompt += f"[{active_count}bg]"
                    except Exception:
                        # Prompt decoration is best-effort only.
                        pass
                prompt += f">{Colors.RESET} "
                user_input = get_advanced_input(prompt) or ""
                user_input = user_input.strip()
                if not user_input:
                    continue
                cmd_result = handle_command(self, user_input)
                if cmd_result is False:
                    break
                # If cmd_result is True, the command was handled (e.g., /auto),
                # and the blocking operation will complete before the next prompt.
                # If cmd_result is None, it's not a special command, process with autonomous mode.
                elif cmd_result is None:
                    from rp.autonomous import run_autonomous_mode

                    run_autonomous_mode(self, user_input)
            except EOFError:
                break
            except KeyboardInterrupt:
                self.signal_handler(None, None)
            except Exception as e:
                print(f"{Colors.RED}Error: {e}{Colors.RESET}")
                logger.error(f"REPL error: {e}\n{traceback.format_exc()}")

    def run_single(self):
        """Process one message (from --message or stdin) and exit."""
        if self.args.message:
            message = self.args.message
        else:
            message = sys.stdin.read()
        from rp.autonomous import run_autonomous_mode

        run_autonomous_mode(self, message)

    def run_autonomous(self):
        """Run a single autonomous task (from --message or an interactive prompt)."""
        if self.args.message:
            task = self.args.message
        else:
            self.setup_readline()
            task = input("> ").strip()
            if not task:
                print("No task provided. Exiting.")
                return
        from rp.autonomous import run_autonomous_mode

        run_autonomous_mode(self, task)

    def cleanup(self):
        """Tear down optional subsystems and close the database connection.

        Each step is isolated so one failure does not block the others.
        """
        if hasattr(self, "enhanced") and self.enhanced:
            try:
                self.enhanced.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up enhanced features: {e}")
        if self.background_monitoring:
            try:
                stop_global_autonomous()
                stop_global_monitor()
            except Exception as e:
                logger.error(f"Error stopping background monitoring: {e}")
        try:
            from rp.multiplexer import cleanup_all_multiplexers

            cleanup_all_multiplexers()
        except Exception as e:
            logger.error(f"Error cleaning up multiplexers: {e}")
        if self.db_conn:
            self.db_conn.close()

    def run(self):
        """Entry point: choose autonomous, REPL, or single-shot mode, then clean up."""
        try:
            if self.args.autonomous:
                self.run_autonomous()
            elif self.args.interactive or (not self.args.message and sys.stdin.isatty()):
                self.run_repl()
            else:
                self.run_single()
        finally:
            self.cleanup()
def process_message(assistant, message):
    """Record a user message, query the model, and print the rendered reply.

    The raw message is persisted as a KnowledgeEntry, mined for graph-memory
    facts, and injected with knowledge context before the API round-trip.
    When the API reports token usage, the per-request and running session
    cost estimates are printed.

    Args:
        assistant: An initialized Assistant instance.
        message: The raw user input string.
    """
    from rp.core.knowledge_context import inject_knowledge_context
    from rp.memory import KnowledgeEntry

    # Save the user message as a fact. (time/uuid come from module-level imports.)
    categories = assistant.fact_extractor.categorize_content(message)
    entry_id = str(uuid.uuid4())[:16]
    entry = KnowledgeEntry(
        entry_id=entry_id,
        category=categories[0] if categories else "user_message",
        content=message,
        metadata={
            "type": "user_message",
            "confidence": 1.0,
            "source": "user_input",
        },
        created_at=time.time(),
        updated_at=time.time(),
    )
    assistant.knowledge_store.add_entry(entry)
    # NOTE(review): the stringified entry (not the raw message) is what gets
    # sent to the model — confirm this is intentional.
    assistant.messages.append({"role": "user", "content": str(entry)})
    inject_knowledge_context(assistant, assistant.messages[-1]["content"])
    with ProgressIndicator("Updating memory..."):
        assistant.graph_memory.populate_from_text(message)
    logger.debug(f"Processing user message: {message[:100]}...")
    logger.debug(f"Current message count: {len(assistant.messages)}")
    with ProgressIndicator("Querying AI..."):
        response = call_api(
            assistant.messages,
            assistant.model,
            assistant.api_url,
            assistant.api_key,
            assistant.use_tools,
            get_tools_definition(),
            verbose=assistant.verbose,
            db_conn=assistant.db_conn,
        )
    if "usage" in response:
        usage = response["usage"]
        input_tokens = usage.get("prompt_tokens", 0)
        output_tokens = usage.get("completion_tokens", 0)
        assistant.usage_tracker.track_request(assistant.model, input_tokens, output_tokens)
        # NOTE(review): reaches into UsageTracker's private _calculate_cost;
        # consider exposing a public per-request cost API.
        cost = UsageTracker._calculate_cost(assistant.model, input_tokens, output_tokens)
        total_cost = assistant.usage_tracker.session_usage["estimated_cost"]
        print(f"{Colors.YELLOW}💰 Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
    result = assistant.process_response(response)
    # Suppress printing when the rendered result is identical to the last one.
    if result != assistant.last_result:
        print(f"\n{Colors.GREEN}r:{Colors.RESET} {result}\n")
        assistant.last_result = result