import glob as glob_module
import json
import logging
import os
import readline
import signal
import sqlite3
import sys
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from rp.commands import handle_command
from rp.config import (
ADVANCED_CONTEXT_ENABLED,
API_CACHE_TTL,
CACHE_ENABLED,
CONVERSATION_SUMMARY_THRESHOLD,
DB_PATH,
DEFAULT_API_KEY,
DEFAULT_API_URL,
DEFAULT_MODEL,
HISTORY_FILE,
KNOWLEDGE_SEARCH_LIMIT,
LOG_FILE,
MODEL_LIST_URL,
TOOL_CACHE_TTL,
WORKFLOW_EXECUTOR_MAX_WORKERS,
)
from rp.core.api import call_api
from rp.core.autonomous_interactions import start_global_autonomous, stop_global_autonomous
from rp.core.background_monitor import get_global_monitor, start_global_monitor, stop_global_monitor
from rp.core.config_validator import ConfigManager, get_config
from rp.core.context import init_system_message, refresh_system_message, truncate_tool_result
from rp.core.database import DatabaseManager, SQLiteBackend, KeyValueStore, FileVersionStore
from rp.core.debug import debug_trace, enable_debug, is_debug_enabled
from rp.core.logging import setup_logging
from rp.core.tool_executor import ToolExecutor, ToolCall, ToolPriority, create_tool_executor_from_assistant
from rp.core.usage_tracker import UsageTracker
from rp.input_handler import get_advanced_input
from rp.tools import get_tools_definition
from rp.tools.agents import (
collaborate_agents,
create_agent,
execute_agent_task,
list_agents,
remove_agent,
)
from rp.tools.command import kill_process, run_command, tail_process
from rp.tools.database import db_get, db_query, db_set
from rp.tools.filesystem import (
chdir,
clear_edit_tracker,
display_edit_summary,
display_edit_timeline,
getpwd,
index_source_directory,
list_directory,
mkdir,
read_file,
search_replace,
write_file,
)
from rp.tools.interactive_control import (
close_interactive_session,
list_active_sessions,
read_session_output,
send_input_to_session,
start_interactive_session,
)
from rp.tools.memory import (
add_knowledge_entry,
delete_knowledge_entry,
get_knowledge_by_category,
get_knowledge_entry,
get_knowledge_statistics,
search_knowledge,
update_knowledge_importance,
)
from rp.tools.patch import apply_patch, create_diff, display_file_diff
from rp.tools.python_exec import python_exec
from rp.tools.web import http_fetch, web_search, web_search_news
from rp.ui import Colors, render_markdown
from rp.ui.progress import ProgressIndicator
from rp.ui.build_formatter import BuildOutputFormatter
from rp.ui.keybindings import ReadlineKeybindingManager
logger = logging.getLogger("rp")
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(file_handler)
class Assistant:
def __init__(self, args):
self.args = args
self.messages = []
self.verbose = args.verbose
self.debug = getattr(args, "debug", False)
self.syntax_highlighting = not args.no_syntax
if self.debug:
enable_debug(verbose_output=True)
logger.debug("Debug mode enabled - Full function tracing active")
setup_logging(verbose=self.verbose, debug=self.debug)
self.api_key = os.environ.get("OPENROUTER_API_KEY", DEFAULT_API_KEY)
self.model = args.model or os.environ.get("AI_MODEL", DEFAULT_MODEL)
self.api_url = args.api_url or os.environ.get("API_URL", DEFAULT_API_URL)
self.model_list_url = args.model_list_url or os.environ.get(
"MODEL_LIST_URL", MODEL_LIST_URL
)
self.use_tools = os.environ.get("USE_TOOLS", "1") == "1"
self.last_interrupt_time = 0
self.python_globals = {}
self.db_conn = None
self.autonomous_mode = False
self.autonomous_iterations = 0
self.background_monitoring = False
self.usage_tracker = UsageTracker()
self.background_tasks = set()
self.last_result = None
self.init_database()
# Memory initialization moved to enhanced features section below
self.messages.append(init_system_message(args))
# Enhanced features initialization
from rp.agents import AgentManager
from rp.cache import APICache, ToolCache
from rp.workflows import WorkflowEngine, WorkflowStorage
from rp.core.advanced_context import AdvancedContextManager
from rp.memory import MemoryManager
from rp.config import (
CACHE_ENABLED, API_CACHE_TTL, TOOL_CACHE_TTL,
WORKFLOW_EXECUTOR_MAX_WORKERS, ADVANCED_CONTEXT_ENABLED,
CONVERSATION_SUMMARY_THRESHOLD, KNOWLEDGE_SEARCH_LIMIT
)
# Initialize caching
if CACHE_ENABLED:
self.api_cache = APICache(DB_PATH, API_CACHE_TTL)
self.tool_cache = ToolCache(DB_PATH, TOOL_CACHE_TTL)
else:
self.api_cache = None
self.tool_cache = None
# Initialize workflows
self.workflow_storage = WorkflowStorage(DB_PATH)
self.workflow_engine = WorkflowEngine(
tool_executor=self._execute_tool_for_workflow,
max_workers=WORKFLOW_EXECUTOR_MAX_WORKERS
)
# Initialize agents
self.agent_manager = AgentManager(DB_PATH, self._api_caller_for_agent)
# Replace basic memory with unified MemoryManager
self.memory_manager = MemoryManager(DB_PATH, db_conn=self.db_conn, enable_auto_extraction=True)
self.knowledge_store = self.memory_manager.knowledge_store
self.conversation_memory = self.memory_manager.conversation_memory
self.graph_memory = self.memory_manager.graph_memory
self.fact_extractor = self.memory_manager.fact_extractor
# Initialize advanced context manager
if ADVANCED_CONTEXT_ENABLED:
self.context_manager = AdvancedContextManager(
knowledge_store=self.memory_manager.knowledge_store,
conversation_memory=self.memory_manager.conversation_memory
)
else:
self.context_manager = None
# Start conversation tracking
import uuid
session_id = str(uuid.uuid4())[:16]
self.current_conversation_id = self.memory_manager.start_conversation(session_id=session_id)
from rp.core.executor import LabsExecutor
from rp.core.planner import ProjectPlanner
from rp.core.artifacts import ArtifactGenerator
self.planner = ProjectPlanner()
self.artifact_generator = ArtifactGenerator(output_dir="/tmp/rp_artifacts")
self.labs_executor = None
self.start_time = time.time()
self.config_manager = get_config()
self.config_manager.load()
self.db_manager = DatabaseManager(SQLiteBackend(DB_PATH, check_same_thread=False))
self.db_manager.connect()
self.kv_store = KeyValueStore(self.db_manager)
self.file_version_store = FileVersionStore(self.db_manager)
self.tool_executor = create_tool_executor_from_assistant(self)
from rp.config import (
BUILD_LIVE_COST_TICKER, BUILD_DEFAULT_VERBOSITY,
BUILD_SHOW_TOKEN_BREAKDOWN, BUILD_SHOW_TIME_ANALYSIS,
BUILD_PROGRESS_WIDTH, BUILD_DEFAULT_BUDGET_EUR,
PRICING_INPUT_EUR, PRICING_OUTPUT_EUR, KEYBINDINGS_ENABLED
)
from decimal import Decimal
self.build_formatter = BuildOutputFormatter(
use_colors=not args.no_syntax,
progress_width=BUILD_PROGRESS_WIDTH
)
self.build_formatter.live_cost_ticker = BUILD_LIVE_COST_TICKER
self.build_formatter.verbose_mode = BUILD_DEFAULT_VERBOSITY
self.build_formatter.show_token_breakdown = BUILD_SHOW_TOKEN_BREAKDOWN
self.build_formatter.show_time_analysis = BUILD_SHOW_TIME_ANALYSIS
self.build_formatter.cost_tracker.set_budget(Decimal(str(BUILD_DEFAULT_BUDGET_EUR)))
self.build_formatter.cost_tracker.pricing_input = Decimal(str(PRICING_INPUT_EUR))
self.build_formatter.cost_tracker.pricing_output = Decimal(str(PRICING_OUTPUT_EUR))
self.keybinding_manager = ReadlineKeybindingManager(formatter=self.build_formatter)
self.keybindings_enabled = KEYBINDINGS_ENABLED
logger.info("Unified Assistant initialized with all features including Labs architecture")
from rp.config import BACKGROUND_MONITOR_ENABLED
bg_enabled = os.environ.get(
"BACKGROUND_MONITOR", str(BACKGROUND_MONITOR_ENABLED)
).lower() in ("1", "true", "yes")
if bg_enabled:
try:
start_global_monitor()
start_global_autonomous(llm_callback=self._handle_background_updates)
self.background_monitoring = True
if self.debug:
logger.debug("Background monitoring initialized")
except Exception as e:
logger.warning(f"Could not initialize background monitoring: {e}")
self.background_monitoring = False
else:
self.background_monitoring = False
if self.debug:
logger.debug("Background monitoring disabled")
def init_database(self):
try:
logger.debug(f"Initializing database at {DB_PATH}")
self.db_conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = self.db_conn.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS kv_store\n (key TEXT PRIMARY KEY, value TEXT, timestamp REAL)"
)
cursor.execute(
"CREATE TABLE IF NOT EXISTS file_versions\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n filepath TEXT, content TEXT, hash TEXT,\n timestamp REAL, version INTEGER)"
)
cursor.execute(
"CREATE TABLE IF NOT EXISTS api_request_logs\n (id INTEGER PRIMARY KEY AUTOINCREMENT,\n timestamp REAL, model TEXT, api_url TEXT,\n request_payload TEXT)"
)
self.db_conn.commit()
logger.debug("Database initialized successfully")
except Exception as e:
logger.error(f"Database initialization error: {e}")
self.db_conn = None
def _handle_background_updates(self, updates):
"""Handle background session updates by injecting them into the conversation."""
if not updates or not updates.get("sessions"):
return
update_message = self._format_background_update_message(updates)
if self.messages and len(self.messages) > 0:
self.messages.append(
{"role": "system", "content": f"Background session updates: {update_message}"}
)
if self.verbose:
print(f"{Colors.CYAN}Background update: {update_message}{Colors.RESET}")
def _format_background_update_message(self, updates):
"""Format background updates for LLM consumption."""
session_summaries = []
for session_name, session_info in updates.get("sessions", {}).items():
summary = session_info.get("summary", f"Session {session_name}")
session_summaries.append(f"{session_name}: {summary}")
if session_summaries:
return "Active background sessions: " + "; ".join(session_summaries)
else:
return "No active background sessions requiring attention."
def _check_background_updates(self):
"""Check for pending background updates and display them."""
if not self.background_monitoring:
return
try:
monitor = get_global_monitor()
events = monitor.get_pending_events()
if events:
print(f"\n{Colors.CYAN}Background Events:{Colors.RESET}")
for event in events:
event_type = event.get("type", "unknown")
session_name = event.get("session_name", "unknown")
if event_type == "session_started":
print(f" {Colors.GREEN}{Colors.RESET} Session '{session_name}' started")
elif event_type == "session_ended":
print(f" {Colors.YELLOW}{Colors.RESET} Session '{session_name}' ended")
elif event_type == "output_received":
lines = len(event.get("new_output", {}).get("stdout", []))
print(
f" {Colors.BLUE}📝{Colors.RESET} Session '{session_name}' produced {lines} lines of output"
)
elif event_type == "possible_input_needed":
print(
f" {Colors.RED}{Colors.RESET} Session '{session_name}' may need input"
)
elif event_type == "high_output_volume":
total = event.get("total_lines", 0)
print(
f" {Colors.YELLOW}📊{Colors.RESET} Session '{session_name}' has high output volume ({total} lines)"
)
elif event_type == "inactive_session":
inactive_time = event.get("inactive_seconds", 0)
print(
f" {Colors.GRAY}{Colors.RESET} Session '{session_name}' inactive for {inactive_time:.0f}s"
)
print()
except Exception as e:
if self.debug:
print(f"{Colors.RED}Error checking background updates: {e}{Colors.RESET}")
def execute_tool_calls(self, tool_calls):
results = []
logger.debug(f"Executing {len(tool_calls)} tool call(s)")
parallel_tool_calls = []
for tool_call in tool_calls:
func_name = tool_call["function"]["name"]
arguments = json.loads(tool_call["function"]["arguments"])
logger.debug(f"Tool call: {func_name} with arguments: {arguments}")
args_str = ", ".join([f"{k}={repr(v)}" for k, v in arguments.items()])
if len(args_str) > 100:
args_str = args_str[:97] + "..."
print(f"{Colors.BLUE}⠋ Executing tools......{func_name}({args_str}){Colors.RESET}")
parallel_tool_calls.append(ToolCall(
tool_id=tool_call["id"],
function_name=func_name,
arguments=arguments,
timeout=self.config_manager.get("TOOL_DEFAULT_TIMEOUT", 30.0),
retries=self.config_manager.get("TOOL_MAX_RETRIES", 3)
))
tool_results = self.tool_executor.execute_parallel(parallel_tool_calls)
for tool_result in tool_results:
if tool_result.success:
result = truncate_tool_result(tool_result.result)
logger.debug(f"Tool result for {tool_result.tool_id}: {str(result)[:200]}...")
results.append({
"tool_call_id": tool_result.tool_id,
"role": "tool",
"content": json.dumps(result)
})
else:
logger.debug(f"Tool error for {tool_result.tool_id}: {tool_result.error}")
error_msg = tool_result.error[:200] if tool_result.error and len(tool_result.error) > 200 else tool_result.error
results.append({
"tool_call_id": tool_result.tool_id,
"role": "tool",
"content": json.dumps({"status": "error", "error": error_msg})
})
return results
def process_response(self, response):
if "error" in response:
return f"Error: {response['error']}"
if "choices" not in response or not response["choices"]:
return "No response from API"
message = response["choices"][0]["message"]
self.messages.append(message)
if "tool_calls" in message and message["tool_calls"]:
tool_count = len(message["tool_calls"])
print(f"{Colors.BLUE}[TOOL] Executing {tool_count} tool call(s)...{Colors.RESET}")
with ProgressIndicator("Executing tools..."):
tool_results = self.execute_tool_calls(message["tool_calls"])
print(f"{Colors.GREEN}[OK] Tool execution completed.{Colors.RESET}")
for result in tool_results:
self.messages.append(result)
with ProgressIndicator("Processing tool results..."):
refresh_system_message(self.messages, self.args)
follow_up = call_api(
self.messages,
self.model,
self.api_url,
self.api_key,
self.use_tools,
get_tools_definition(),
verbose=self.verbose,
db_conn=self.db_conn,
)
return self.process_response(follow_up)
content = message.get("content", "")
from rp.autonomous.mode import extract_reasoning_and_clean_content
reasoning, cleaned_content = extract_reasoning_and_clean_content(content)
if reasoning:
print(f"{Colors.BLUE}💭 Reasoning: {reasoning}{Colors.RESET}")
with ProgressIndicator("Updating memory..."):
self.graph_memory.populate_from_text(cleaned_content)
return cleaned_content
def format_output(self, content):
output_format = getattr(self.args, "output", "text")
if output_format == "json":
return json.dumps({"response": content}, indent=2)
elif output_format == "structured":
# For structured, perhaps parse and format
return f"Response:\n{content}"
elif output_format == "markdown":
return content # Raw markdown
else: # text
return f"\n{Colors.GREEN}r:{Colors.RESET} {render_markdown(content, self.syntax_highlighting)}\n"
def signal_handler(self, signum, frame):
current_time = time.time()
if current_time - self.last_interrupt_time < 1.0:
print(f"\n{Colors.RED}Force exiting...{Colors.RESET}")
self.cleanup()
sys.exit(0)
else:
self.last_interrupt_time = current_time
print(f"\n{Colors.YELLOW}Interrupted{Colors.RESET}")
raise KeyboardInterrupt
def setup_readline(self):
try:
readline.read_history_file(HISTORY_FILE)
except FileNotFoundError:
pass
readline.set_history_length(1000)
import atexit
atexit.register(readline.write_history_file, HISTORY_FILE)
commands = [
"exit",
"quit",
"help",
"reset",
"dump",
"verbose",
"models",
"tools",
"review",
"refactor",
"obfuscate",
"/auto",
"/edit",
"/prompt",
"/shortcuts",
"/cost",
"/budget",
]
def completer(text, state):
options = [cmd for cmd in commands if cmd.startswith(text)]
glob_pattern = os.path.expanduser(text) + "*"
path_options = glob_module.glob(glob_pattern)
path_options = [p + os.sep if os.path.isdir(p) else p for p in path_options]
combined_options = sorted(list(set(options + path_options)))
if state < len(combined_options):
return combined_options[state]
return None
delims = readline.get_completer_delims()
readline.set_completer_delims(delims.replace("/", ""))
readline.set_completer(completer)
readline.parse_and_bind("tab: complete")
if self.keybindings_enabled and hasattr(self, 'keybinding_manager'):
self.keybinding_manager.register_keybindings()
def run_repl(self):
self.setup_readline()
signal.signal(signal.SIGINT, self.signal_handler)
while True:
try:
if self.background_monitoring:
self._check_background_updates()
prompt = f"{Colors.BLUE}You"
if self.background_monitoring:
try:
from rp.multiplexer import get_all_sessions
sessions = get_all_sessions()
active_count = sum(
(1 for s in sessions.values() if s.get("status") == "running")
)
if active_count > 0:
prompt += f"[{active_count}bg]"
except:
pass
prompt += f">{Colors.RESET} "
user_input = get_advanced_input(prompt) or ""
user_input = user_input.strip()
if not user_input:
continue
cmd_result = handle_command(self, user_input)
if cmd_result is False:
break
# If cmd_result is True, the command was handled (e.g., /auto),
# and the blocking operation will complete before the next prompt.
# If cmd_result is None, it's not a special command, process with autonomous mode.
elif cmd_result is None:
from rp.autonomous import run_autonomous_mode
run_autonomous_mode(self, user_input)
except EOFError:
break
except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}Interrupted, returning to prompt{Colors.RESET}")
continue
except Exception as e:
print(f"{Colors.RED}Error: {e}{Colors.RESET}")
logging.error(f"REPL error: {e}\n{traceback.format_exc()}")
def run_single(self):
if self.args.message:
message = self.args.message
else:
message = sys.stdin.read()
from rp.autonomous import run_autonomous_mode
run_autonomous_mode(self, message)
def run_autonomous(self):
if self.args.message:
task = self.args.message
else:
self.setup_readline()
task = input("> ").strip()
if not task:
print("No task provided. Exiting.")
return
from rp.autonomous import run_autonomous_mode
run_autonomous_mode(self, task)
# ===== Enhanced Features Methods =====
def _execute_tool_for_workflow(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
if self.tool_cache:
cached_result = self.tool_cache.get(tool_name, arguments)
if cached_result is not None:
logger.debug(f"Tool cache hit for {tool_name}")
return cached_result
func_map = {
"read_file": lambda **kw: self.execute_tool_calls(
[{"id": "temp", "function": {"name": "read_file", "arguments": json.dumps(kw)}}]
)[0],
"write_file": lambda **kw: self.execute_tool_calls(
[{"id": "temp", "function": {"name": "write_file", "arguments": json.dumps(kw)}}]
)[0],
"list_directory": lambda **kw: self.execute_tool_calls(
[
{
"id": "temp",
"function": {"name": "list_directory", "arguments": json.dumps(kw)},
}
]
)[0],
"run_command": lambda **kw: self.execute_tool_calls(
[{"id": "temp", "function": {"name": "run_command", "arguments": json.dumps(kw)}}]
)[0],
}
if tool_name in func_map:
result = func_map[tool_name](**arguments)
if self.tool_cache:
content = result.get("content", "")
try:
parsed_content = json.loads(content) if isinstance(content, str) else content
self.tool_cache.set(tool_name, arguments, parsed_content)
except Exception:
pass
return result
return {"error": f"Unknown tool: {tool_name}"}
def _api_caller_for_agent(
self, messages: List[Dict[str, Any]], temperature: float, max_tokens: int
) -> Dict[str, Any]:
return call_api(
messages,
self.model,
self.api_url,
self.api_key,
use_tools=False,
tools_definition=[],
verbose=self.verbose,
)
def enhanced_call_api(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
if self.api_cache and CACHE_ENABLED:
cached_response = self.api_cache.get(self.model, messages, 0.7, 4096)
if cached_response:
logger.debug("API cache hit")
return cached_response
from rp.core.context import refresh_system_message
refresh_system_message(messages, self.args)
response = call_api(
messages,
self.model,
self.api_url,
self.api_key,
self.use_tools,
get_tools_definition(),
verbose=self.verbose,
)
if self.api_cache and CACHE_ENABLED and ("error" not in response):
token_count = response.get("usage", {}).get("total_tokens", 0)
self.api_cache.set(self.model, messages, 0.7, 4096, response, token_count)
return response
def print_cost_summary(self):
usage = self.usage_tracker.get_total_usage()
duration = time.time() - self.start_time
print(f"{Colors.CYAN}[COST] Tokens: {usage['total_tokens']:,} | Cost: ${usage['total_cost']:.4f} | Duration: {duration:.1f}s{Colors.RESET}")
if hasattr(self, 'build_formatter') and self.build_formatter.live_cost_ticker:
self.build_formatter.print_cost_panel()
def track_step_cost(self, step_name: str, input_tokens: int, output_tokens: int, duration: float, success: bool = True):
if hasattr(self, 'build_formatter'):
step_cost = self.build_formatter.cost_tracker.add_step_cost(input_tokens, output_tokens)
self.build_formatter.record_step(
name=step_name,
cost=step_cost.cost_eur,
duration=duration,
input_tokens=input_tokens,
output_tokens=output_tokens,
success=success
)
if self.build_formatter.live_cost_ticker:
self.build_formatter.print_cost_display(input_tokens, output_tokens, step_cost.cost_eur)
def show_shortcuts_help(self):
if hasattr(self, 'build_formatter'):
self.build_formatter.print_help()
def reset_build_costs(self):
if hasattr(self, 'build_formatter'):
self.build_formatter.cost_tracker.reset_build()
self.build_formatter.step_history.clear()
def process_with_enhanced_context(self, user_message: str) -> str:
self.messages.append({"role": "user", "content": user_message})
self.memory_manager.process_message(
user_message, role="user", extract_facts=True, update_graph=True
)
if self.context_manager and ADVANCED_CONTEXT_ENABLED:
enhanced_messages, context_info = self.context_manager.create_enhanced_context(
self.messages, user_message, include_knowledge=True
)
if self.verbose:
logger.info(f"Enhanced context: {context_info}")
working_messages = enhanced_messages
else:
working_messages = self.messages
with ProgressIndicator("Querying AI..."):
response = self.enhanced_call_api(working_messages)
result = self.process_response(response)
if len(self.messages) >= CONVERSATION_SUMMARY_THRESHOLD:
summary = (
self.context_manager.advanced_summarize_messages(
self.messages[-CONVERSATION_SUMMARY_THRESHOLD:]
)
if self.context_manager
else "Conversation in progress"
)
topics = self.fact_extractor.categorize_content(summary)
self.memory_manager.update_conversation_summary(summary, topics)
return result
def execute_workflow(
self, workflow_name: str, initial_variables: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
workflow = self.workflow_storage.load_workflow_by_name(workflow_name)
if not workflow:
return {"error": f'Workflow "{workflow_name}" not found'}
context = self.workflow_engine.execute_workflow(workflow, initial_variables)
execution_id = self.workflow_storage.save_execution(
self.workflow_storage.load_workflow_by_name(workflow_name).name, context
)
return {
"success": True,
"execution_id": execution_id,
"results": context.step_results,
"execution_log": context.execution_log,
}
def create_agent(self, role_name: str, agent_id: Optional[str] = None) -> str:
return self.agent_manager.create_agent(role_name, agent_id)
def agent_task(self, agent_id: str, task: str) -> Dict[str, Any]:
return self.agent_manager.execute_agent_task(agent_id, task)
def collaborate_agents(self, task: str, agent_roles: List[str]) -> Dict[str, Any]:
orchestrator_id = self.agent_manager.create_agent("orchestrator")
return self.agent_manager.collaborate_agents(orchestrator_id, task, agent_roles)
def search_knowledge(self, query: str, limit: int = KNOWLEDGE_SEARCH_LIMIT) -> List[Any]:
return self.knowledge_store.search_entries(query, top_k=limit)
def get_cache_statistics(self) -> Dict[str, Any]:
stats = {}
if self.api_cache:
stats["api_cache"] = self.api_cache.get_statistics()
if self.tool_cache:
stats["tool_cache"] = self.tool_cache.get_statistics()
return stats
def get_workflow_list(self) -> List[Dict[str, Any]]:
return self.workflow_storage.list_workflows()
def get_agent_summary(self) -> Dict[str, Any]:
return self.agent_manager.get_session_summary()
def get_knowledge_statistics(self) -> Dict[str, Any]:
return self.knowledge_store.get_statistics()
def get_conversation_history(self, limit: int = 10) -> List[Dict[str, Any]]:
return self.conversation_memory.get_recent_conversations(limit=limit)
def _get_labs_executor(self):
if self.labs_executor is None:
from rp.core.executor import create_labs_executor
self.labs_executor = create_labs_executor(
self,
output_dir="/tmp/rp_artifacts",
verbose=self.verbose
)
return self.labs_executor
def execute_labs_task(
self,
task: str,
initial_context: Optional[Dict[str, Any]] = None,
max_duration: int = 600,
max_cost: float = 1.0
) -> Dict[str, Any]:
executor = self._get_labs_executor()
return executor.execute(task, initial_context, max_duration, max_cost)
def execute_labs_task_simple(self, task: str) -> str:
executor = self._get_labs_executor()
return executor.execute_simple(task)
def plan_task(self, task: str) -> Dict[str, Any]:
intent = self.planner.parse_request(task)
plan = self.planner.create_plan(intent)
return {
"intent": {
"task_type": intent.task_type,
"complexity": intent.complexity,
"objective": intent.objective,
"required_tools": list(intent.required_tools),
"artifact_type": intent.artifact_type.value if intent.artifact_type else None,
"confidence": intent.confidence
},
"plan": {
"plan_id": plan.plan_id,
"objective": plan.objective,
"phases": [
{
"phase_id": p.phase_id,
"name": p.name,
"type": p.phase_type.value,
"tools": list(p.tools)
}
for p in plan.phases
],
"estimated_cost": plan.estimated_cost,
"estimated_duration": plan.estimated_duration
}
}
def generate_artifact(
self,
artifact_type: str,
data: Dict[str, Any],
title: str = "Generated Artifact"
) -> Dict[str, Any]:
from rp.core.models import ArtifactType
type_map = {
"dashboard": ArtifactType.DASHBOARD,
"report": ArtifactType.REPORT,
"spreadsheet": ArtifactType.SPREADSHEET,
"chart": ArtifactType.CHART,
"webapp": ArtifactType.WEBAPP,
"presentation": ArtifactType.PRESENTATION
}
art_type = type_map.get(artifact_type.lower())
if not art_type:
return {"error": f"Unknown artifact type: {artifact_type}. Valid types: {list(type_map.keys())}"}
artifact = self.artifact_generator.generate(art_type, data, title)
return {
"artifact_id": artifact.artifact_id,
"type": artifact.artifact_type.value,
"title": artifact.title,
"file_path": artifact.file_path,
"content_preview": artifact.content[:500] if artifact.content else ""
}
def get_labs_statistics(self) -> Dict[str, Any]:
executor = self._get_labs_executor()
return executor.get_statistics()
def get_tool_execution_statistics(self) -> Dict[str, Any]:
return self.tool_executor.get_statistics()
def get_config_value(self, key: str, default: Any = None) -> Any:
return self.config_manager.get(key, default)
def set_config_value(self, key: str, value: Any) -> bool:
result = self.config_manager.set(key, value)
return result.valid
def get_all_statistics(self) -> Dict[str, Any]:
return {
"tool_execution": self.get_tool_execution_statistics(),
"labs": self.get_labs_statistics() if self.labs_executor else {},
"cache": self.get_cache_statistics(),
"knowledge": self.get_knowledge_statistics(),
"usage": self.usage_tracker.get_summary()
}
def clear_caches(self):
if self.api_cache:
self.api_cache.clear_all()
if self.tool_cache:
self.tool_cache.clear_all()
logger.info("All caches cleared")
def cleanup(self):
if self.api_cache:
self.api_cache.clear_expired()
if self.tool_cache:
self.tool_cache.clear_expired()
self.agent_manager.clear_session()
self.memory_manager.cleanup()
# ===== Cleanup and Shutdown =====
def cleanup(self):
# Cleanup caches
if hasattr(self, "api_cache") and self.api_cache:
try:
self.api_cache.clear_expired()
except Exception as e:
logger.error(f"Error cleaning up API cache: {e}")
if hasattr(self, "tool_cache") and self.tool_cache:
try:
self.tool_cache.clear_expired()
except Exception as e:
logger.error(f"Error cleaning up tool cache: {e}")
# Cleanup agents
if hasattr(self, "agent_manager") and self.agent_manager:
try:
self.agent_manager.clear_session()
except Exception as e:
logger.error(f"Error cleaning up agents: {e}")
# Cleanup memory
if hasattr(self, "memory_manager") and self.memory_manager:
try:
self.memory_manager.cleanup()
except Exception as e:
logger.error(f"Error cleaning up memory: {e}")
if self.background_monitoring:
try:
stop_global_autonomous()
stop_global_monitor()
except Exception as e:
logger.error(f"Error stopping background monitoring: {e}")
try:
from rp.multiplexer import cleanup_all_multiplexers
cleanup_all_multiplexers()
except Exception as e:
logger.error(f"Error cleaning up multiplexers: {e}")
if hasattr(self, "db_manager") and self.db_manager:
try:
self.db_manager.disconnect()
except Exception as e:
logger.error(f"Error disconnecting database manager: {e}")
if self.db_conn:
self.db_conn.close()
def run(self):
try:
if self.args.autonomous:
self.run_autonomous()
elif self.args.interactive or (not self.args.message and sys.stdin.isatty()):
self.run_repl()
else:
self.run_single()
finally:
self.cleanup()
def process_message(assistant, message):
from rp.core.knowledge_context import inject_knowledge_context
# Save the user message as a fact
import time
import uuid
from rp.memory import KnowledgeEntry
categories = assistant.fact_extractor.categorize_content(message)
entry_id = str(uuid.uuid4())[:16]
entry = KnowledgeEntry(
entry_id=entry_id,
category=categories[0] if categories else "user_message",
content=message,
metadata={
"type": "user_message",
"confidence": 1.0,
"source": "user_input",
},
created_at=time.time(),
updated_at=time.time(),
)
assistant.knowledge_store.add_entry(entry)
assistant.messages.append({"role": "user", "content": str(entry)})
inject_knowledge_context(assistant, assistant.messages[-1]["content"], assistant.messages)
with ProgressIndicator("Updating memory..."):
assistant.graph_memory.populate_from_text(message)
logger.debug(f"Processing user message: {message[:100]}...")
logger.debug(f"Current message count: {len(assistant.messages)}")
with ProgressIndicator("Querying AI..."):
refresh_system_message(assistant.messages, assistant.args)
response = call_api(
assistant.messages,
assistant.model,
assistant.api_url,
assistant.api_key,
assistant.use_tools,
get_tools_definition(),
verbose=assistant.verbose,
db_conn=assistant.db_conn,
)
if "usage" in response:
usage = response["usage"]
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
assistant.usage_tracker.track_request(assistant.model, input_tokens, output_tokens)
cost = UsageTracker._calculate_cost(assistant.model, input_tokens, output_tokens)
total_cost = assistant.usage_tracker.session_usage["estimated_cost"]
print(f"{Colors.YELLOW}💰 Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
result = assistant.process_response(response)
if result != assistant.last_result:
formatted_result = assistant.format_output(result)
print(formatted_result)
assistant.last_result = result