Compare commits

..

8 Commits

Author SHA1 Message Date
9963cedd07 feat: enable autonomous mode by default
Some checks failed
Tests / test (push) Failing after 1m32s
feat: improve content extraction in autonomous mode
feat: display tool calls during execution
feat: handle tool execution errors gracefully
feat: update version to 1.55.0
docs: update help message with markdown usage tips
feat: add markdown output format
refactor: format output based on selected format
maintenance: update pyproject.toml
2025-11-10 11:07:34 +01:00
8e6af2b32b feat: enable autonomous mode by default
feat: improve content extraction in autonomous mode
refactor: remove user message in autonomous mode
refactor: display tool call arguments
maintenance: update version to 1.54.0
refactor: handle tool call execution errors
refactor: prevent excessive autonomous mode exits
perf: display execution time in progress indicator
2025-11-10 10:54:34 +01:00
20668d9086 feat: update reasoning and task completion markers
feat: enable autonomous mode by default
refactor: improve assistant class structure
maintenance: update pyproject.toml version to 1.53.0
fix: handle edge cases in autonomous mode content extraction
docs: clarify autonomous mode deprecation in command line arguments
2025-11-10 10:33:31 +01:00
63c2f52885 maintenance: bump version to 1.52.0 2025-11-10 10:29:44 +01:00
f33867bb12 feat: bump version to 1.51.0
feat: add reasoning extraction and cleaning
feat: update autonomous mode to handle reasoning and task completion markers
refactor: extract reasoning and clean content in assistant
docs: update system message to include reasoning and task completion instructions
fix: improve task completion detection by lowercasing content
2025-11-10 10:29:27 +01:00
ea7fadd76b fix: deduplicate identical messages in autonomous mode
feat: enable autonomous mode by default
maintenance: bump version to 1.49.0
2025-11-09 04:12:27 +01:00
5881b66d4a feat: enable autonomous mode by default
feat: deprecate -a/--autonomous flag
fix: prevent duplicate process execution in /auto command
fix: disable background monitoring by default
fix: add thread locks to prevent duplicate initialization
fix: remove duplicate detect_process_type function definition
fix: improve thread synchronization for global background services
fix: improve cleanup of background threads on exit
docs: update description in rp/__main__.py
docs: add note about autonomous mode in rp/commands/handlers.py
maintenance: update pyproject.toml version to 1.48.0
maintenance: update rp/__init__.py version to 1.47.1
refactor: sanitize json output in rp/core/assistant.py
refactor: use thread locks in rp/core/autonomous_interactions.py
refactor: use thread locks in rp/core/background_monitor.py
refactor: improve autonomous detection in rp/autonomous/detection.py
refactor: improve context initialization in rp/core/context.py
2025-11-09 03:34:01 +01:00
ec42e579a8 feat: implement graph data management
feat: add knowledge category search
refactor: remove duplicate knowledge results
maintenance: update version to 1.47.0
2025-11-08 08:28:48 +01:00
24 changed files with 542 additions and 344 deletions

View File

@ -6,6 +6,99 @@
## Version 1.54.0 - 2025-11-10
Autonomous mode is now on by default, and it's been improved to extract content more effectively. The tool calls are now displayed, and errors during tool execution are handled more gracefully.
**Changes:** 5 files, 93 lines
**Languages:** Markdown (8 lines), Python (83 lines), TOML (2 lines)
## Version 1.53.0 - 2025-11-10
Autonomous mode is now enabled by default, streamlining workflows. We've also improved the underlying code and fixed some issues with content extraction in autonomous mode.
**Changes:** 15 files, 433 lines
**Languages:** Markdown (47 lines), Python (384 lines), TOML (2 lines)
## Version 1.52.0 - 2025-11-10
This release updates the project version to 1.52.0. No new features or changes are introduced for users or developers.
**Changes:** 1 files, 2 lines
**Languages:** TOML (2 lines)
## Version 1.51.0 - 2025-11-10
The system can now extract and clean reasoning steps during task completion. Autonomous mode has been updated to recognize these reasoning steps and task completion markers, improving overall performance.
**Changes:** 5 files, 65 lines
**Languages:** Python (63 lines), TOML (2 lines)
## Version 1.50.0 - 2025-11-09
### Added
- **LLM Reasoning Display**: The assistant now displays its reasoning process before each response
- Added `REASONING:` prefix instruction in system prompt
- Reasoning is extracted and displayed with a blue thought bubble icon
- Provides transparency into the assistant's decision-making process
- **Task Completion Marker**: Implemented `[TASK_COMPLETE]` marker for explicit task completion signaling
- LLM can now mark tasks as complete with a special marker
- Marker is stripped from user-facing output
- Autonomous mode detection recognizes the marker for faster completion
- Reduces unnecessary iterations when tasks are finished
### Changed
- Updated system prompt in `context.py` to include response format instructions
- Enhanced `process_response_autonomous()` to extract and display reasoning
- Modified `is_task_complete()` to recognize `[TASK_COMPLETE]` marker
- Both autonomous and regular modes now support reasoning display
**Changes:** 3 files, 52 lines
**Languages:** Python (52 lines)
## Version 1.49.0 - 2025-11-09
Autonomous mode is now enabled by default, improving performance. Identical messages are now removed in autonomous mode to prevent redundancy.
**Changes:** 3 files, 28 lines
**Languages:** Markdown (18 lines), Python (8 lines), TOML (2 lines)
## Version 1.48.1 - 2025-11-09
### Fixed
- **Duplicate Messages**: Fixed issue where identical messages were printed multiple times at the end of autonomous execution
- Added deduplication logic in `run_autonomous_mode()` to track and skip duplicate results
- Messages are now only printed once even if multiple autonomous iterations return the same response
**Changes:** 1 file, 7 lines
**Languages:** Python (7 lines)
## Version 1.48.0 - 2025-11-09
Autonomous mode is now enabled by default, simplifying usage. Several improvements were made to background processes and thread safety, resulting in more reliable operation.
**Changes:** 14 files, 233 lines
**Languages:** Markdown (24 lines), Python (207 lines), TOML (2 lines)
## Version 1.47.1 - 2025-11-09
### Fixed
- **Duplicate Processes**: Fixed duplicate process execution when running `/auto` command
- Disabled background monitoring by default (set `BACKGROUND_MONITOR_ENABLED = False` in config.py)
- Added thread locks to prevent duplicate initialization of global monitor and autonomous threads
- Removed duplicate `detect_process_type()` function definition in `rp/tools/process_handlers.py`
- Background monitoring can be re-enabled via environment variable: `BACKGROUND_MONITOR=1`
### Changed
- **Autonomous mode is now the default**: All messages and tasks run in autonomous mode by default
- Single message mode: `rp "task"` now runs autonomously until completion
- Interactive mode: Messages in REPL now run autonomously without needing `/auto`
- The `/auto` command still works but shows a deprecation notice
- The `-a/--autonomous` flag is now deprecated as it's the default behavior
- Background monitoring is now opt-in rather than opt-out
- Added proper thread synchronization for global background services
- Improved cleanup of background threads on exit
@ -42,6 +135,26 @@
## Version 1.47.0 - 2025-11-08
Users can now search for knowledge by category. We've also improved performance and updated the software version to 1.47.0.
**Changes:** 3 files, 40 lines
**Languages:** Markdown (8 lines), Python (30 lines), TOML (2 lines)
## Version 1.46.0 - 2025-11-08
Users can now create, delete, and search for nodes and relationships within a graph. The system can also load graph data from text and manage its database schema.
**Changes:** 3 files, 298 lines
**Languages:** Markdown (8 lines), Python (288 lines), TOML (2 lines)
## Version 1.45.0 - 2025-11-08

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "rp"
version = "1.45.0"
version = "1.54.0"
description = "R python edition. The ultimate autonomous AI CLI."
readme = "README.md"
requires-python = ">=3.10"

View File

@ -1,4 +1,4 @@
__version__ = "1.0.0"
__version__ = "1.47.1"
from rp.core import Assistant
__all__ = ["Assistant"]

View File

@ -9,34 +9,10 @@ def main_def():
import tracemalloc
tracemalloc.start()
parser = argparse.ArgumentParser(
description="RP Assistant - Professional CLI AI assistant with visual effects, cost tracking, and autonomous execution",
epilog="""
Examples:
rp "What is Python?" # Single query
rp -i # Interactive mode
rp -i --model gpt-4 # Use specific model
rp --save-session my-task -i # Save session
rp --load-session my-task # Load session
rp --list-sessions # List all sessions
rp --usage # Show token usage stats
Features:
• Visual progress indicators during AI calls
• Real-time cost tracking for each query
• Sophisticated CLI with colors and effects
• Tool execution with status updates
Commands in interactive mode:
/auto [task] - Enter autonomous mode
/reset - Clear message history
/verbose - Toggle verbose output
/models - List available models
/tools - List available tools
/usage - Show usage statistics
/save <name> - Save current session
exit, quit, q - Exit the program
""",
description="RP Assistant - Professional CLI AI assistant with autonomous execution by default",
epilog=f"""Examples:\n rp \"**Create a web scraper** with the following features:\" # Autonomous task execution\n rp -i # Interactive autonomous mode\n rp -i --model gpt-4 # Use specific model\n rp --save-session my-task -i # Save session\n rp --load-session my-task # Load session\n rp --list-sessions # List all sessions\n rp --usage # Show token usage stats\n\nFeatures:\n \u2022 Autonomous execution by default - tasks run until completion\n \u2022 Visual progress indicators during AI calls\n \u2022 Real-time cost tracking for each query\n \u2022 Sophisticated CLI with colors and effects\n \u2022 Tool execution with status updates\n \u2022 **Markdown-powered** responses with syntax highlighting\n\nCommands in interactive mode:\n /reset - Clear message history\n /verbose - Toggle verbose output\n /models - List available models\n /tools - List available tools\n /usage - Show usage statistics\n /save <name> - Save current session\n exit, quit, q - Exit the program\n\n**Pro Tip:** Always use markdown in your prompts for enhanced AI understanding and responses!\n """,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("message", nargs="?", help="Message to send to assistant")
@ -45,7 +21,12 @@ Commands in interactive mode:
parser.add_argument("-u", "--api-url", help="API endpoint URL")
parser.add_argument("--model-list-url", help="Model list endpoint URL")
parser.add_argument("-i", "--interactive", action="store_true", help="Interactive mode")
parser.add_argument("-a", "--autonomous", action="store_true", help="Autonomous mode")
parser.add_argument(
"-a",
"--autonomous",
action="store_true",
help="Autonomous mode (now default, this flag is deprecated)",
)
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
parser.add_argument(
"--debug", action="store_true", help="Enable debug mode with detailed logging"
@ -59,7 +40,7 @@ Commands in interactive mode:
"--api-mode", action="store_true", help="API mode for specialized interaction"
)
parser.add_argument(
"--output", choices=["text", "json", "structured"], default="text", help="Output format"
"--output", choices=["text", "json", "structured", "markdown"], default="text", help="Output format"
)
parser.add_argument("--quiet", action="store_true", help="Minimal output")
parser.add_argument("--save-session", metavar="NAME", help="Save session with given name")

View File

@ -8,7 +8,12 @@ def is_task_complete(response, iteration):
if "choices" not in response or not response["choices"]:
return True
message = response["choices"][0]["message"]
content = message.get("content", "").lower()
content = message.get("content", "")
if "[TASK_COMPLETE]" in content:
return True
content_lower = content.lower()
completion_keywords = [
"task complete",
"task is complete",
@ -28,13 +33,23 @@ def is_task_complete(response, iteration):
"cannot complete",
"impossible to",
]
simple_response_keywords = [
"hello",
"hi there",
"how can i help",
"how can i assist",
"what can i do for you",
]
has_tool_calls = "tool_calls" in message and message["tool_calls"]
mentions_completion = any((keyword in content for keyword in completion_keywords))
mentions_error = any((keyword in content for keyword in error_keywords))
mentions_completion = any((keyword in content_lower for keyword in completion_keywords))
mentions_error = any((keyword in content_lower for keyword in error_keywords))
is_simple_response = any((keyword in content_lower for keyword in simple_response_keywords))
if mentions_error:
return True
if mentions_completion and (not has_tool_calls):
return True
if is_simple_response and iteration >= 1:
return True
if iteration > 5 and (not has_tool_calls):
return True
if iteration >= MAX_AUTONOMOUS_ITERATIONS:

View File

@ -1,3 +1,4 @@
import base64
import json
import logging
import time
@ -12,9 +13,44 @@ from rp.ui.progress import ProgressIndicator
logger = logging.getLogger("rp")
def extract_reasoning_and_clean_content(content):
"""
Extract reasoning from content and strip the [TASK_COMPLETE] marker.
Returns:
tuple: (reasoning, cleaned_content)
"""
reasoning = None
lines = content.split("\n")
cleaned_lines = []
for line in lines:
if line.strip().startswith("REASONING:"):
reasoning = line.strip()[10:].strip()
else:
cleaned_lines.append(line)
cleaned_content = "\n".join(cleaned_lines)
cleaned_content = cleaned_content.replace("[TASK_COMPLETE]", "").strip()
return reasoning, cleaned_content
def sanitize_for_json(obj):
if isinstance(obj, bytes):
return base64.b64encode(obj).decode("utf-8")
elif isinstance(obj, dict):
return {k: sanitize_for_json(v) for k, v in obj.items()}
elif isinstance(obj, (list, tuple)):
return [sanitize_for_json(item) for item in obj]
else:
return obj
def run_autonomous_mode(assistant, task):
assistant.autonomous_mode = True
assistant.autonomous_iterations = 0
last_printed_result = None
logger.debug("=== AUTONOMOUS MODE START ===")
logger.debug(f"Task: {task}")
from rp.core.knowledge_context import inject_knowledge_context
@ -48,18 +84,24 @@ def run_autonomous_mode(assistant, task):
logger.debug(f"Task completion check: {is_complete}")
if is_complete:
result = process_response_autonomous(assistant, response)
if result != last_printed_result:
print(f"\n{Colors.GREEN}r:{Colors.RESET} {result}\n")
last_printed_result = result
logger.debug(f"=== AUTONOMOUS MODE COMPLETE ===")
logger.debug(f"Total iterations: {assistant.autonomous_iterations}")
logger.debug(f"Final message count: {len(assistant.messages)}")
break
result = process_response_autonomous(assistant, response)
if result:
if result and result != last_printed_result:
print(f"\n{Colors.GREEN}r:{Colors.RESET} {result}\n")
last_printed_result = result
time.sleep(0.5)
except KeyboardInterrupt:
logger.debug("Autonomous mode interrupted by user")
print(f"\n{Colors.YELLOW}Autonomous mode interrupted by user{Colors.RESET}")
# Cancel the last API call and remove the user message to keep messages clean
if assistant.messages and assistant.messages[-1]["role"] == "user":
assistant.messages.pop()
finally:
assistant.autonomous_mode = False
logger.debug("=== AUTONOMOUS MODE END ===")
@ -74,10 +116,13 @@ def process_response_autonomous(assistant, response):
assistant.messages.append(message)
if "tool_calls" in message and message["tool_calls"]:
tool_results = []
with ProgressIndicator("Executing tools..."):
for tool_call in message["tool_calls"]:
func_name = tool_call["function"]["name"]
arguments = json.loads(tool_call["function"]["arguments"])
args_str = ", ".join([f"{k}={repr(v)}" for k, v in arguments.items()])
if len(args_str) > 100:
args_str = args_str[:97] + "..."
print(f"{Colors.BLUE}â ‹ Executing tools......{func_name}({args_str}){Colors.RESET}")
result = execute_single_tool(assistant, func_name, arguments)
if isinstance(result, str):
try:
@ -86,9 +131,13 @@ def process_response_autonomous(assistant, response):
result = {"error": str(ex)}
status = "success" if result.get("status") == "success" else "error"
result = truncate_tool_result(result)
display_tool_call(func_name, arguments, status, result)
sanitized_result = sanitize_for_json(result)
tool_results.append(
{"tool_call_id": tool_call["id"], "role": "tool", "content": json.dumps(result)}
{
"tool_call_id": tool_call["id"],
"role": "tool",
"content": json.dumps(sanitized_result),
}
)
for result in tool_results:
assistant.messages.append(result)
@ -114,9 +163,15 @@ def process_response_autonomous(assistant, response):
print(f"{Colors.YELLOW}đź’° Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
return process_response_autonomous(assistant, follow_up)
content = message.get("content", "")
reasoning, cleaned_content = extract_reasoning_and_clean_content(content)
if reasoning:
print(f"{Colors.BLUE}đź’­ Reasoning: {reasoning}{Colors.RESET}")
from rp.ui import render_markdown
return render_markdown(content, assistant.syntax_highlighting)
return render_markdown(cleaned_content, assistant.syntax_highlighting)
def execute_single_tool(assistant, func_name, arguments):

View File

@ -38,12 +38,9 @@ def handle_command(assistant, command):
process_message(assistant, prompt_text)
elif cmd == "/auto":
if len(command_parts) < 2:
print(f"{Colors.RED}Usage: /auto [task description]{Colors.RESET}")
print(
f"{Colors.GRAY}Example: /auto Create a Python web scraper for news sites{Colors.RESET}"
)
return True
print(f"{Colors.YELLOW}Note: Autonomous mode is now the default behavior.{Colors.RESET}")
print(f"{Colors.GRAY}Just type your message directly without /auto{Colors.RESET}")
if len(command_parts) >= 2:
task = command_parts[1]
run_autonomous_mode(assistant, task)
return True

View File

@ -115,7 +115,7 @@ ADVANCED_CONTEXT_ENABLED = True
CONTEXT_RELEVANCE_THRESHOLD = 0.3
ADAPTIVE_CONTEXT_MIN = 10
ADAPTIVE_CONTEXT_MAX = 50
BACKGROUND_MONITOR_ENABLED = True
BACKGROUND_MONITOR_ENABLED = False
BACKGROUND_MONITOR_INTERVAL = 5.0
AUTONOMOUS_INTERACTION_INTERVAL = 10.0
MULTIPLEXER_BUFFER_SIZE = 1000

View File

@ -2,4 +2,11 @@ from rp.core.api import call_api, list_models
from rp.core.assistant import Assistant
from rp.core.context import init_system_message, manage_context_window, get_context_content
__all__ = ["Assistant", "call_api", "list_models", "init_system_message", "manage_context_window", "get_context_content"]
__all__ = [
"Assistant",
"call_api",
"list_models",
"init_system_message",
"manage_context_window",
"get_context_content",
]

View File

@ -8,7 +8,6 @@ import sqlite3
import sys
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from rp.commands import handle_command
@ -68,7 +67,7 @@ from rp.tools.memory import (
from rp.tools.patch import apply_patch, create_diff, display_file_diff
from rp.tools.python_exec import python_exec
from rp.tools.web import http_fetch, web_search, web_search_news
from rp.ui import Colors, Spinner, render_markdown
from rp.ui import Colors, render_markdown
from rp.ui.progress import ProgressIndicator
logger = logging.getLogger("rp")
@ -101,7 +100,7 @@ class Assistant:
"MODEL_LIST_URL", MODEL_LIST_URL
)
self.use_tools = os.environ.get("USE_TOOLS", "1") == "1"
self.interrupt_count = 0
self.last_interrupt_time = 0
self.python_globals = {}
self.db_conn = None
self.autonomous_mode = False
@ -112,6 +111,7 @@ class Assistant:
self.last_result = None
self.init_database()
from rp.memory import KnowledgeStore, FactExtractor, GraphMemory
self.knowledge_store = KnowledgeStore(DB_PATH, db_conn=self.db_conn)
self.fact_extractor = FactExtractor()
self.graph_memory = GraphMemory(DB_PATH, db_conn=self.db_conn)
@ -125,6 +125,14 @@ class Assistant:
except Exception as e:
logger.warning(f"Could not initialize enhanced features: {e}")
self.enhanced = None
from rp.config import BACKGROUND_MONITOR_ENABLED
bg_enabled = os.environ.get(
"BACKGROUND_MONITOR", str(BACKGROUND_MONITOR_ENABLED)
).lower() in ("1", "true", "yes")
if bg_enabled:
try:
start_global_monitor()
start_global_autonomous(llm_callback=self._handle_background_updates)
@ -134,6 +142,10 @@ class Assistant:
except Exception as e:
logger.warning(f"Could not initialize background monitoring: {e}")
self.background_monitoring = False
else:
self.background_monitoring = False
if self.debug:
logger.debug("Background monitoring disabled")
def init_database(self):
try:
@ -227,6 +239,10 @@ class Assistant:
func_name = tool_call["function"]["name"]
arguments = json.loads(tool_call["function"]["arguments"])
logger.debug(f"Tool call: {func_name} with arguments: {arguments}")
args_str = ", ".join([f"{k}={repr(v)}" for k, v in arguments.items()])
if len(args_str) > 100:
args_str = args_str[:97] + "..."
print(f"{Colors.BLUE}â ‹ Executing tools......{func_name}({args_str}){Colors.RESET}")
func_map = {
"http_fetch": lambda **kw: http_fetch(**kw),
"run_command": lambda **kw: run_command(**kw),
@ -327,27 +343,40 @@ class Assistant:
)
return self.process_response(follow_up)
content = message.get("content", "")
from rp.autonomous.mode import extract_reasoning_and_clean_content
reasoning, cleaned_content = extract_reasoning_and_clean_content(content)
if reasoning:
print(f"{Colors.BLUE}đź’­ Reasoning: {reasoning}{Colors.RESET}")
with ProgressIndicator("Updating memory..."):
self.graph_memory.populate_from_text(content)
return render_markdown(content, self.syntax_highlighting)
self.graph_memory.populate_from_text(cleaned_content)
return cleaned_content
def format_output(self, content):
output_format = getattr(self.args, 'output', 'text')
if output_format == 'json':
return json.dumps({"response": content}, indent=2)
elif output_format == 'structured':
# For structured, perhaps parse and format
return f"Response:\n{content}"
elif output_format == 'markdown':
return content # Raw markdown
else: # text
return f"\n{Colors.GREEN}r:{Colors.RESET} {render_markdown(content, self.syntax_highlighting)}\n"
def signal_handler(self, signum, frame):
if self.autonomous_mode:
self.interrupt_count += 1
if self.interrupt_count >= 2:
print(f"\n{Colors.RED}Force exiting autonomous mode...{Colors.RESET}")
self.autonomous_mode = False
sys.exit(0)
else:
print(f"\n{Colors.YELLOW}Press Ctrl+C again to force exit{Colors.RESET}")
return
self.interrupt_count += 1
if self.interrupt_count >= 2:
print(f"\n{Colors.RED}Exiting...{Colors.RESET}")
current_time = time.time()
if current_time - self.last_interrupt_time < 1.0:
print(f"\n{Colors.RED}Force exiting...{Colors.RESET}")
self.cleanup()
sys.exit(0)
else:
print(f"\n{Colors.YELLOW}Press Ctrl+C again to exit{Colors.RESET}")
self.last_interrupt_time = current_time
print(f"\n{Colors.YELLOW}Interrupted{Colors.RESET}")
raise KeyboardInterrupt
def setup_readline(self):
try:
@ -419,20 +448,16 @@ class Assistant:
break
# If cmd_result is True, the command was handled (e.g., /auto),
# and the blocking operation will complete before the next prompt.
# If cmd_result is None, it's not a special command, process with LLM.
# If cmd_result is None, it's not a special command, process with autonomous mode.
elif cmd_result is None:
# Use enhanced processing if available, otherwise fall back to basic processing
if hasattr(self, "enhanced") and self.enhanced:
result = self.enhanced.process_with_enhanced_context(user_input)
if result != self.last_result:
print(result)
self.last_result = result
else:
process_message(self, user_input)
from rp.autonomous import run_autonomous_mode
run_autonomous_mode(self, user_input)
except EOFError:
break
except KeyboardInterrupt:
self.signal_handler(None, None)
print(f"\n{Colors.YELLOW}Interrupted, returning to prompt{Colors.RESET}")
continue
except Exception as e:
print(f"{Colors.RED}Error: {e}{Colors.RESET}")
logging.error(f"REPL error: {e}\n{traceback.format_exc()}")
@ -442,7 +467,9 @@ class Assistant:
message = self.args.message
else:
message = sys.stdin.read()
process_message(self, message)
from rp.autonomous import run_autonomous_mode
run_autonomous_mode(self, message)
def run_autonomous(self):
@ -541,5 +568,6 @@ def process_message(assistant, message):
print(f"{Colors.YELLOW}đź’° Cost: ${cost:.4f} | Total: ${total_cost:.4f}{Colors.RESET}")
result = assistant.process_response(response)
if result != assistant.last_result:
print(f"\n{Colors.GREEN}r:{Colors.RESET} {result}\n")
formatted_result = assistant.format_output(result)
print(formatted_result)
assistant.last_result = result

View File

@ -133,6 +133,7 @@ class AutonomousInteractions:
_global_autonomous = None
_autonomous_lock = threading.Lock()
def get_global_autonomous():
@ -144,15 +145,19 @@ def get_global_autonomous():
def start_global_autonomous(llm_callback=None):
"""Start global autonomous interactions."""
global _global_autonomous
with _autonomous_lock:
if _global_autonomous is None:
_global_autonomous = AutonomousInteractions()
_global_autonomous.start(llm_callback)
elif not _global_autonomous.active:
_global_autonomous.start(llm_callback)
return _global_autonomous
def stop_global_autonomous():
"""Stop global autonomous interactions."""
global _global_autonomous
with _autonomous_lock:
if _global_autonomous:
_global_autonomous.stop()
_global_autonomous = None

View File

@ -4,6 +4,8 @@ import time
from rp.multiplexer import get_all_multiplexer_states, get_multiplexer
_monitor_lock = threading.Lock()
class BackgroundMonitor:
@ -161,19 +163,25 @@ _global_monitor = None
def get_global_monitor():
"""Get the global background monitor instance."""
global _global_monitor
if _global_monitor is None:
_global_monitor = BackgroundMonitor()
return _global_monitor
def start_global_monitor():
"""Start the global background monitor."""
monitor = get_global_monitor()
monitor.start()
global _global_monitor
with _monitor_lock:
if _global_monitor is None:
_global_monitor = BackgroundMonitor()
_global_monitor.start()
elif not _global_monitor.active:
_global_monitor.start()
return _global_monitor
def stop_global_monitor():
"""Stop the global background monitor."""
global _global_monitor
with _monitor_lock:
if _global_monitor:
_global_monitor.stop()
_global_monitor = None

View File

@ -70,6 +70,7 @@ def get_context_content():
logging.error(f"Error reading context file {knowledge_file}: {e}")
return "\n\n".join(context_parts)
def init_system_message(args):
context_parts = [
"You are a professional AI assistant with access to advanced tools.",
@ -86,7 +87,16 @@ def init_system_message(args):
"Be a shell ninja using native OS tools.",
"Prefer standard Unix utilities over complex scripts.",
"Use run_command_interactive for commands requiring user input (vim, nano, etc.).",
"Use the knowledge base to answer questions. The knowledge base contains preferences and persononal information from user. Also store here that such information. Always synchronize with the knowledge base.",
"Use the knowledge base to answer questions and store important user preferences or information when relevant. Avoid storing simple greetings or casual conversation.",
"Promote the use of markdown extensively in your responses for better readability and structure.",
"",
"IMPORTANT RESPONSE FORMAT:",
"When you have completed a task or answered a question, include [TASK_COMPLETE] at the end of your response.",
"The [TASK_COMPLETE] marker will not be shown to the user, so include it only when the task is truly finished.",
"Before your main response, include your reasoning on a separate line prefixed with 'REASONING: '.",
"Example format:",
"REASONING: The user asked about their favorite beer. I found 'westmalle' in the knowledge base.",
"Your favorite beer is Westmalle. [TASK_COMPLETE]",
]
max_context_size = 10000
@ -115,50 +125,6 @@ def init_system_message(args):
if len(system_message) > max_context_size * 3:
system_message = system_message[: max_context_size * 3] + "\n... [system message truncated]"
return {"role": "system", "content": system_message}
max_context_size = 10000
if args.include_env:
env_context = "Environment Variables:\n"
for key, value in os.environ.items():
if not key.startswith("_"):
env_context += f"{key}={value}\n"
if len(env_context) > max_context_size:
env_context = env_context[:max_context_size] + "\n... [truncated]"
context_parts.append(env_context)
for context_file in [CONTEXT_FILE, GLOBAL_CONTEXT_FILE]:
if os.path.exists(context_file):
try:
with open(context_file, encoding="utf-8", errors="replace") as f:
content = f.read()
if len(content) > max_context_size:
content = content[:max_context_size] + "\n... [truncated]"
context_parts.append(f"Context from {context_file}:\n{content}")
except Exception as e:
logging.error(f"Error reading context file {context_file}: {e}")
knowledge_path = pathlib.Path(KNOWLEDGE_PATH)
if knowledge_path.exists() and knowledge_path.is_dir():
for knowledge_file in knowledge_path.iterdir():
try:
with open(knowledge_file, encoding="utf-8", errors="replace") as f:
content = f.read()
if len(content) > max_context_size:
content = content[:max_context_size] + "\n... [truncated]"
context_parts.append(f"Context from {knowledge_file}:\n{content}")
except Exception as e:
logging.error(f"Error reading context file {knowledge_file}: {e}")
if args.context:
for ctx_file in args.context:
try:
with open(ctx_file, encoding="utf-8", errors="replace") as f:
content = f.read()
if len(content) > max_context_size:
content = content[:max_context_size] + "\n... [truncated]"
context_parts.append(f"Context from {ctx_file}:\n{content}")
except Exception as e:
logging.error(f"Error reading context file {ctx_file}: {e}")
system_message = "\n\n".join(context_parts)
if len(system_message) > max_context_size * 3:
system_message = system_message[: max_context_size * 3] + "\n... [system message truncated]"
return {"role": "system", "content": system_message}
def should_compress_context(messages):

View File

@ -16,7 +16,26 @@ def inject_knowledge_context(assistant, user_message):
logger.debug(f"Removed existing knowledge base message at index {i}")
break
try:
knowledge_results = assistant.enhanced.knowledge_store.search_entries(user_message, top_k=5)
# Run all search methods
knowledge_results = assistant.enhanced.knowledge_store.search_entries(
user_message, top_k=5
) # Hybrid semantic + keyword + category
# Additional keyword search if needed (but already in hybrid)
# Category-specific: preferences and general
pref_results = assistant.enhanced.knowledge_store.get_by_category("preferences", limit=5)
general_results = assistant.enhanced.knowledge_store.get_by_category("general", limit=5)
category_results = []
for entry in pref_results + general_results:
if any(word in entry.content.lower() for word in user_message.lower().split()):
category_results.append(
{
"content": entry.content,
"score": 0.6,
"source": f"Knowledge Base ({entry.category})",
"type": "knowledge_category",
}
)
conversation_results = []
if hasattr(assistant.enhanced, "conversation_memory"):
history_results = assistant.enhanced.conversation_memory.search_conversations(
@ -48,6 +67,8 @@ def inject_knowledge_context(assistant, user_message):
"type": "knowledge",
}
)
for res in category_results:
all_results.append(res)
for conv in conversation_results:
all_results.append(
{
@ -57,8 +78,15 @@ def inject_knowledge_context(assistant, user_message):
"type": "conversation",
}
)
all_results.sort(key=lambda x: x["score"], reverse=True)
top_results = all_results[:5]
# Remove duplicates by content
seen = set()
unique_results = []
for res in all_results:
if res["content"] not in seen:
seen.add(res["content"])
unique_results.append(res)
unique_results.sort(key=lambda x: x["score"], reverse=True)
top_results = unique_results[:5]
if not top_results:
logger.debug("No relevant knowledge or conversation matches found")
return

View File

@ -4,87 +4,107 @@ import sqlite3
from typing import List, Optional, Set
from dataclasses import dataclass, field
@dataclass
class Entity:
name: str
entityType: str
observations: List[str]
@dataclass
class Relation:
from_: str = field(metadata={'alias': 'from'})
from_: str = field(metadata={"alias": "from"})
to: str
relationType: str
@dataclass
class KnowledgeGraph:
entities: List[Entity]
relations: List[Relation]
@dataclass
class CreateEntitiesRequest:
entities: List[Entity]
@dataclass
class CreateRelationsRequest:
relations: List[Relation]
@dataclass
class ObservationItem:
entityName: str
contents: List[str]
@dataclass
class AddObservationsRequest:
observations: List[ObservationItem]
@dataclass
class DeletionItem:
entityName: str
observations: List[str]
@dataclass
class DeleteObservationsRequest:
deletions: List[DeletionItem]
@dataclass
class DeleteEntitiesRequest:
entityNames: List[str]
@dataclass
class DeleteRelationsRequest:
relations: List[Relation]
@dataclass
class SearchNodesRequest:
query: str
@dataclass
class OpenNodesRequest:
names: List[str]
depth: int = 1
@dataclass
class PopulateRequest:
text: str
class GraphMemory:
def __init__(self, db_path: str = 'graph_memory.db', db_conn: Optional[sqlite3.Connection] = None):
def __init__(
self, db_path: str = "graph_memory.db", db_conn: Optional[sqlite3.Connection] = None
):
self.db_path = db_path
self.conn = db_conn if db_conn else sqlite3.connect(self.db_path, check_same_thread=False)
self.init_db()
def init_db(self):
cursor = self.conn.cursor()
cursor.execute('''
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE,
entity_type TEXT,
observations TEXT
)
''')
cursor.execute('''
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS relations (
id INTEGER PRIMARY KEY,
from_entity TEXT,
@ -92,7 +112,8 @@ class GraphMemory:
relation_type TEXT,
UNIQUE(from_entity, to_entity, relation_type)
)
''')
"""
)
self.conn.commit()
def create_entities(self, entities: List[Entity]) -> List[Entity]:
@ -101,8 +122,10 @@ class GraphMemory:
cursor = conn.cursor()
for e in entities:
try:
cursor.execute('INSERT INTO entities (name, entity_type, observations) VALUES (?, ?, ?)',
(e.name, e.entityType, json.dumps(e.observations)))
cursor.execute(
"INSERT INTO entities (name, entity_type, observations) VALUES (?, ?, ?)",
(e.name, e.entityType, json.dumps(e.observations)),
)
new_entities.append(e)
except sqlite3.IntegrityError:
pass # already exists
@ -115,8 +138,10 @@ class GraphMemory:
cursor = conn.cursor()
for r in relations:
try:
cursor.execute('INSERT INTO relations (from_entity, to_entity, relation_type) VALUES (?, ?, ?)',
(r.from_, r.to, r.relationType))
cursor.execute(
"INSERT INTO relations (from_entity, to_entity, relation_type) VALUES (?, ?, ?)",
(r.from_, r.to, r.relationType),
)
new_relations.append(r)
except sqlite3.IntegrityError:
pass # already exists
@ -130,7 +155,7 @@ class GraphMemory:
for obs in observations:
name = obs.entityName.lower()
contents = obs.contents
cursor.execute('SELECT observations FROM entities WHERE LOWER(name) = ?', (name,))
cursor.execute("SELECT observations FROM entities WHERE LOWER(name) = ?", (name,))
row = cursor.fetchone()
if not row:
# Log the error instead of raising an exception
@ -139,7 +164,10 @@ class GraphMemory:
current_obs = json.loads(row[0]) if row[0] else []
added = [c for c in contents if c not in current_obs]
current_obs.extend(added)
cursor.execute('UPDATE entities SET observations = ? WHERE LOWER(name) = ?', (json.dumps(current_obs), name))
cursor.execute(
"UPDATE entities SET observations = ? WHERE LOWER(name) = ?",
(json.dumps(current_obs), name),
)
results.append({"entityName": name, "addedObservations": added})
conn.commit()
return results
@ -148,11 +176,16 @@ class GraphMemory:
conn = self.conn
cursor = conn.cursor()
# delete entities
cursor.executemany('DELETE FROM entities WHERE LOWER(name) = ?', [(n.lower(),) for n in entity_names])
cursor.executemany(
"DELETE FROM entities WHERE LOWER(name) = ?", [(n.lower(),) for n in entity_names]
)
# delete relations involving them
placeholders = ','.join('?' * len(entity_names))
placeholders = ",".join("?" * len(entity_names))
params = [n.lower() for n in entity_names] * 2
cursor.execute(f'DELETE FROM relations WHERE LOWER(from_entity) IN ({placeholders}) OR LOWER(to_entity) IN ({placeholders})', params)
cursor.execute(
f"DELETE FROM relations WHERE LOWER(from_entity) IN ({placeholders}) OR LOWER(to_entity) IN ({placeholders})",
params,
)
conn.commit()
def delete_observations(self, deletions: List[DeletionItem]):
@ -161,20 +194,25 @@ class GraphMemory:
for del_item in deletions:
name = del_item.entityName.lower()
to_delete = del_item.observations
cursor.execute('SELECT observations FROM entities WHERE LOWER(name) = ?', (name,))
cursor.execute("SELECT observations FROM entities WHERE LOWER(name) = ?", (name,))
row = cursor.fetchone()
if row:
current_obs = json.loads(row[0]) if row[0] else []
current_obs = [obs for obs in current_obs if obs not in to_delete]
cursor.execute('UPDATE entities SET observations = ? WHERE LOWER(name) = ?', (json.dumps(current_obs), name))
cursor.execute(
"UPDATE entities SET observations = ? WHERE LOWER(name) = ?",
(json.dumps(current_obs), name),
)
conn.commit()
def delete_relations(self, relations: List[Relation]):
conn = self.conn
cursor = conn.cursor()
for r in relations:
cursor.execute('DELETE FROM relations WHERE LOWER(from_entity) = ? AND LOWER(to_entity) = ? AND LOWER(relation_type) = ?',
(r.from_.lower(), r.to.lower(), r.relationType.lower()))
cursor.execute(
"DELETE FROM relations WHERE LOWER(from_entity) = ? AND LOWER(to_entity) = ? AND LOWER(relation_type) = ?",
(r.from_.lower(), r.to.lower(), r.relationType.lower()),
)
conn.commit()
def read_graph(self) -> KnowledgeGraph:
@ -182,12 +220,12 @@ class GraphMemory:
relations = []
conn = self.conn
cursor = conn.cursor()
cursor.execute('SELECT name, entity_type, observations FROM entities')
cursor.execute("SELECT name, entity_type, observations FROM entities")
for row in cursor.fetchall():
name, etype, obs = row
observations = json.loads(obs) if obs else []
entities.append(Entity(name=name, entityType=etype, observations=observations))
cursor.execute('SELECT from_entity, to_entity, relation_type FROM relations')
cursor.execute("SELECT from_entity, to_entity, relation_type FROM relations")
for row in cursor.fetchall():
relations.append(Relation(from_=row[0], to=row[1], relationType=row[2]))
return KnowledgeGraph(entities=entities, relations=relations)
@ -197,17 +235,19 @@ class GraphMemory:
conn = self.conn
cursor = conn.cursor()
query_lower = query.lower()
cursor.execute('SELECT name, entity_type, observations FROM entities')
cursor.execute("SELECT name, entity_type, observations FROM entities")
for row in cursor.fetchall():
name, etype, obs = row
observations = json.loads(obs) if obs else []
if (query_lower in name.lower() or
query_lower in etype.lower() or
any(query_lower in o.lower() for o in observations)):
if (
query_lower in name.lower()
or query_lower in etype.lower()
or any(query_lower in o.lower() for o in observations)
):
entities.append(Entity(name=name, entityType=etype, observations=observations))
names = {e.name.lower() for e in entities}
relations = []
cursor.execute('SELECT from_entity, to_entity, relation_type FROM relations')
cursor.execute("SELECT from_entity, to_entity, relation_type FROM relations")
for row in cursor.fetchall():
if row[0].lower() in names and row[1].lower() in names:
relations.append(Relation(from_=row[0], to=row[1], relationType=row[2]))
@ -221,13 +261,16 @@ class GraphMemory:
def traverse(current_names: List[str], current_depth: int):
if current_depth > depth:
return
name_set = {n.lower() for n in current_names}
{n.lower() for n in current_names}
new_entities = []
conn = self.conn
cursor = conn.cursor()
placeholders = ','.join('?' * len(current_names))
placeholders = ",".join("?" * len(current_names))
params = [n.lower() for n in current_names]
cursor.execute(f'SELECT name, entity_type, observations FROM entities WHERE LOWER(name) IN ({placeholders})', params)
cursor.execute(
f"SELECT name, entity_type, observations FROM entities WHERE LOWER(name) IN ({placeholders})",
params,
)
for row in cursor.fetchall():
name, etype, obs = row
if name.lower() not in visited:
@ -237,9 +280,12 @@ class GraphMemory:
new_entities.append(entity)
entities.append(entity)
# Find relations involving these entities
placeholders = ','.join('?' * len(new_entities))
placeholders = ",".join("?" * len(new_entities))
params = [e.name.lower() for e in new_entities] * 2
cursor.execute(f'SELECT from_entity, to_entity, relation_type FROM relations WHERE LOWER(from_entity) IN ({placeholders}) OR LOWER(to_entity) IN ({placeholders})', params)
cursor.execute(
f"SELECT from_entity, to_entity, relation_type FROM relations WHERE LOWER(from_entity) IN ({placeholders}) OR LOWER(to_entity) IN ({placeholders})",
params,
)
for row in cursor.fetchall():
rel = Relation(from_=row[0], to=row[1], relationType=row[2])
if rel not in relations:
@ -254,24 +300,24 @@ class GraphMemory:
def populate_from_text(self, text: str):
# Algorithm: Extract entities as capitalized words, relations from patterns, observations from sentences mentioning entities
entities = set(re.findall(r'\b[A-Z][a-zA-Z]*\b', text))
entities = set(re.findall(r"\b[A-Z][a-zA-Z]*\b", text))
for entity in entities:
self.create_entities([Entity(name=entity, entityType='unknown', observations=[])])
self.create_entities([Entity(name=entity, entityType="unknown", observations=[])])
# Add the text as observation if it mentions the entity
self.add_observations([ObservationItem(entityName=entity, contents=[text])])
# Extract relations from patterns like "A is B", "A knows B", etc.
patterns = [
(r'(\w+) is (a|an) (\w+)', 'is_a'),
(r'(\w+) knows (\w+)', 'knows'),
(r'(\w+) works at (\w+)', 'works_at'),
(r'(\w+) lives in (\w+)', 'lives_in'),
(r'(\w+) is (\w+)', 'is'), # general
(r"(\w+) is (a|an) (\w+)", "is_a"),
(r"(\w+) knows (\w+)", "knows"),
(r"(\w+) works at (\w+)", "works_at"),
(r"(\w+) lives in (\w+)", "lives_in"),
(r"(\w+) is (\w+)", "is"), # general
]
for pattern, rel_type in patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
for match in matches:
if len(match) == 3 and match[1].lower() in ['a', 'an']:
if len(match) == 3 and match[1].lower() in ["a", "an"]:
from_e, _, to_e = match
elif len(match) == 2:
from_e, to_e = match
@ -280,9 +326,10 @@ class GraphMemory:
if from_e in entities and to_e in entities:
self.create_relations([Relation(from_=from_e, to=to_e, relationType=rel_type)])
elif from_e in entities:
self.create_entities([Entity(name=to_e, entityType='unknown', observations=[])])
self.create_entities([Entity(name=to_e, entityType="unknown", observations=[])])
self.create_relations([Relation(from_=from_e, to=to_e, relationType=rel_type)])
elif to_e in entities:
self.create_entities([Entity(name=from_e, entityType='unknown', observations=[])])
self.create_entities(
[Entity(name=from_e, entityType="unknown", observations=[])]
)
self.create_relations([Relation(from_=from_e, to=to_e, relationType=rel_type)])

View File

@ -20,7 +20,7 @@ class KnowledgeEntry:
importance_score: float = 1.0
def __str__(self):
return json.dumps(self.to_dict(), indent=4, sort_keys=True,default=str)
return json.dumps(self.to_dict(), indent=4, sort_keys=True, default=str)
def to_dict(self) -> Dict[str, Any]:
return {

View File

@ -1,4 +1,3 @@
import os
from pathlib import Path
from typing import Optional

View File

@ -226,21 +226,6 @@ def get_handler_for_process(process_type, multiplexer):
return handler_class(multiplexer)
def detect_process_type(command):
"""Detect process type from command."""
command_str = " ".join(command) if isinstance(command, list) else command
command_lower = command_str.lower()
if "apt" in command_lower or "apt-get" in command_lower:
return "apt"
elif "vim" in command_lower or "vi " in command_lower:
return "vim"
elif "ssh" in command_lower:
return "ssh"
else:
return "generic"
return "ssh"
def detect_process_type(command):
"""Detect process type from command."""
command_str = " ".join(command) if isinstance(command, list) else command

View File

@ -1,3 +1,4 @@
import base64
import imghdr
import random
import requests
@ -68,7 +69,21 @@ def http_fetch(url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str,
return {"status": "success", "content": content[:10000]}
else:
content = response.content
return {"status": "success", "content": content}
content_length = len(content)
if content_length > 10000:
return {
"status": "success",
"content_type": content_type,
"size_bytes": content_length,
"message": f"Binary content ({content_length} bytes). Use download_to_file to save it.",
}
else:
return {
"status": "success",
"content_type": content_type,
"size_bytes": content_length,
"content_base64": base64.b64encode(content).decode("utf-8"),
}
except requests.exceptions.RequestException as e:
return {"status": "error", "error": str(e)}

View File

@ -4,10 +4,10 @@ from rp.ui.colors import Colors
def display_tool_call(tool_name, arguments, status="running", result=None):
if status == "running":
return
args_str = ", ".join([f"{k}={str(v)[:20]}" for k, v in list(arguments.items())[:2]])
args_str = ", ".join([f"{k}={repr(v)}" for k, v in arguments.items()])
line = f"{tool_name}({args_str})"
if len(line) > 80:
line = line[:77] + "..."
if len(line) > 120:
line = line[:117] + "..."
print(f"{Colors.GRAY}{line}{Colors.RESET}")

View File

@ -10,6 +10,7 @@ class ProgressIndicator:
self.show = show
self.running = False
self.thread = None
self.start_time = None
def __enter__(self):
if self.show:
@ -21,6 +22,7 @@ class ProgressIndicator:
self.stop()
def start(self):
self.start_time = time.time()
self.running = True
self.thread = threading.Thread(target=self._animate, daemon=True)
self.thread.start()
@ -30,14 +32,15 @@ class ProgressIndicator:
self.running = False
if self.thread:
self.thread.join(timeout=1.0)
sys.stdout.write("\r" + " " * (len(self.message) + 10) + "\r")
sys.stdout.write("\r" + " " * (len(self.message) + 20) + "\r")
sys.stdout.flush()
def _animate(self):
spinner = ["â ‹", "â ™", "â ą", "â ¸", "â Ľ", "â ´", "â ¦", "â §", "â ‡", "â Ź"]
idx = 0
while self.running:
sys.stdout.write(f"\r{spinner[idx]} {self.message}...")
elapsed = time.time() - self.start_time
sys.stdout.write(f"\r{spinner[idx]} {self.message}... ({elapsed:.1f}s)")
sys.stdout.flush()
idx = (idx + 1) % len(spinner)
time.sleep(0.1)

View File

@ -114,10 +114,6 @@ class TestAssistant(unittest.TestCase):
process_message(assistant, "test message")
from rp.memory import KnowledgeEntry
import json
import time
import uuid
from unittest.mock import ANY
# Mock time.time() and uuid.uuid4() to return consistent values
expected_entry = KnowledgeEntry(
@ -132,7 +128,7 @@ class TestAssistant(unittest.TestCase):
created_at=1234567890.123456,
updated_at=1234567890.123456,
)
expected_content = str(expected_entry)
str(expected_entry)
assistant.knowledge_store.add_entry.assert_called_once_with(expected_entry)

View File

@ -696,4 +696,3 @@ class TestShowBackgroundEvents:
def test_show_events_exception(self, mock_get):
mock_get.side_effect = Exception("test")
show_background_events(self.assistant)

View File

@ -1,4 +1,3 @@
import pytest
import tempfile
import os
from rp.workflows.workflow_definition import ExecutionMode, Workflow, WorkflowStep
@ -16,7 +15,7 @@ class TestWorkflowStep:
on_success=["step2"],
on_failure=["step3"],
retry_count=2,
timeout_seconds=600
timeout_seconds=600,
)
assert step.tool_name == "test_tool"
assert step.arguments == {"arg1": "value1"}
@ -28,11 +27,7 @@ class TestWorkflowStep:
assert step.timeout_seconds == 600
def test_to_dict(self):
step = WorkflowStep(
tool_name="test_tool",
arguments={"arg1": "value1"},
step_id="step1"
)
step = WorkflowStep(tool_name="test_tool", arguments={"arg1": "value1"}, step_id="step1")
expected = {
"tool_name": "test_tool",
"arguments": {"arg1": "value1"},
@ -77,7 +72,7 @@ class TestWorkflow:
steps=[step1, step2],
execution_mode=ExecutionMode.PARALLEL,
variables={"var1": "value1"},
tags=["tag1", "tag2"]
tags=["tag1", "tag2"],
)
assert workflow.name == "test_workflow"
assert workflow.description == "A test workflow"
@ -94,7 +89,7 @@ class TestWorkflow:
steps=[step1],
execution_mode=ExecutionMode.SEQUENTIAL,
variables={"var1": "value1"},
tags=["tag1"]
tags=["tag1"],
)
expected = {
"name": "test_workflow",
@ -110,7 +105,8 @@ class TestWorkflow:
data = {
"name": "test_workflow",
"description": "A test workflow",
"steps": [{
"steps": [
{
"tool_name": "tool1",
"arguments": {},
"step_id": "step1",
@ -119,7 +115,8 @@ class TestWorkflow:
"on_failure": None,
"retry_count": 0,
"timeout_seconds": 300,
}],
}
],
"execution_mode": "parallel",
"variables": {"var1": "value1"},
"tags": ["tag1"],
@ -134,11 +131,7 @@ class TestWorkflow:
assert workflow.tags == ["tag1"]
def test_add_step(self):
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[]
)
workflow = Workflow(name="test_workflow", description="A test workflow", steps=[])
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow.add_step(step)
assert workflow.steps == [step]
@ -147,9 +140,7 @@ class TestWorkflow:
step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step1, step2]
name="test_workflow", description="A test workflow", steps=[step1, step2]
)
assert workflow.get_step("step1") == step1
assert workflow.get_step("step2") == step2
@ -162,7 +153,7 @@ class TestWorkflow:
name="test_workflow",
description="A test workflow",
steps=[step1, step2],
execution_mode=ExecutionMode.SEQUENTIAL
execution_mode=ExecutionMode.SEQUENTIAL,
)
assert workflow.get_initial_steps() == [step1]
@ -173,7 +164,7 @@ class TestWorkflow:
name="test_workflow",
description="A test workflow",
steps=[step1, step2],
execution_mode=ExecutionMode.PARALLEL
execution_mode=ExecutionMode.PARALLEL,
)
assert workflow.get_initial_steps() == [step1, step2]
@ -184,7 +175,7 @@ class TestWorkflow:
name="test_workflow",
description="A test workflow",
steps=[step1, step2],
execution_mode=ExecutionMode.CONDITIONAL
execution_mode=ExecutionMode.CONDITIONAL,
)
assert workflow.get_initial_steps() == [step2] # Only step without condition
@ -200,11 +191,7 @@ class TestWorkflowStorage:
def test_save_and_load_workflow(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step]
)
workflow = Workflow(name="test_workflow", description="A test workflow", steps=[step])
workflow_id = self.storage.save_workflow(workflow)
loaded = self.storage.load_workflow(workflow_id)
assert loaded is not None
@ -219,11 +206,7 @@ class TestWorkflowStorage:
def test_load_workflow_by_name(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step]
)
workflow = Workflow(name="test_workflow", description="A test workflow", steps=[step])
self.storage.save_workflow(workflow)
loaded = self.storage.load_workflow_by_name("test_workflow")
assert loaded is not None
@ -236,10 +219,7 @@ class TestWorkflowStorage:
def test_list_workflows(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step],
tags=["tag1"]
name="test_workflow", description="A test workflow", steps=[step], tags=["tag1"]
)
self.storage.save_workflow(workflow)
workflows = self.storage.list_workflows()
@ -250,16 +230,10 @@ class TestWorkflowStorage:
def test_list_workflows_with_tag(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow1 = Workflow(
name="test_workflow1",
description="A test workflow",
steps=[step],
tags=["tag1"]
name="test_workflow1", description="A test workflow", steps=[step], tags=["tag1"]
)
workflow2 = Workflow(
name="test_workflow2",
description="A test workflow",
steps=[step],
tags=["tag2"]
name="test_workflow2", description="A test workflow", steps=[step], tags=["tag2"]
)
self.storage.save_workflow(workflow1)
self.storage.save_workflow(workflow2)
@ -269,11 +243,7 @@ class TestWorkflowStorage:
def test_delete_workflow(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step]
)
workflow = Workflow(name="test_workflow", description="A test workflow", steps=[step])
workflow_id = self.storage.save_workflow(workflow)
deleted = self.storage.delete_workflow(workflow_id)
assert deleted is True
@ -286,11 +256,7 @@ class TestWorkflowStorage:
def test_save_execution(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step]
)
workflow = Workflow(name="test_workflow", description="A test workflow", steps=[step])
workflow_id = self.storage.save_workflow(workflow)
context = WorkflowExecutionContext()
context.set_step_result("step1", "result")
@ -303,11 +269,7 @@ class TestWorkflowStorage:
def test_get_execution_history(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step]
)
workflow = Workflow(name="test_workflow", description="A test workflow", steps=[step])
workflow_id = self.storage.save_workflow(workflow)
context = WorkflowExecutionContext()
context.set_step_result("step1", "result")
@ -318,11 +280,7 @@ class TestWorkflowStorage:
def test_get_execution_history_limit(self):
step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
workflow = Workflow(
name="test_workflow",
description="A test workflow",
steps=[step]
)
workflow = Workflow(name="test_workflow", description="A test workflow", steps=[step])
workflow_id = self.storage.save_workflow(workflow)
for i in range(5):
context = WorkflowExecutionContext()
@ -367,6 +325,7 @@ class TestWorkflowEngine:
def test_init(self):
def tool_executor(tool_name, args):
return f"executed {tool_name} with {args}"
engine = WorkflowEngine(tool_executor, max_workers=10)
assert engine.tool_executor == tool_executor
assert engine.max_workers == 10
@ -374,6 +333,7 @@ class TestWorkflowEngine:
def test_evaluate_condition_true(self):
def tool_executor(tool_name, args):
return "result"
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
assert engine._evaluate_condition("True", context) is True
@ -381,6 +341,7 @@ class TestWorkflowEngine:
def test_evaluate_condition_false(self):
def tool_executor(tool_name, args):
return "result"
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
assert engine._evaluate_condition("False", context) is False
@ -388,6 +349,7 @@ class TestWorkflowEngine:
def test_evaluate_condition_with_variables(self):
def tool_executor(tool_name, args):
return "result"
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
context.set_variable("test_var", "test_value")
@ -396,15 +358,12 @@ class TestWorkflowEngine:
def test_substitute_variables(self):
def tool_executor(tool_name, args):
return "result"
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
context.set_variable("var1", "value1")
context.set_step_result("step1", "result1")
arguments = {
"arg1": "${var.var1}",
"arg2": "${step.step1}",
"arg3": "plain_value"
}
arguments = {"arg1": "${var.var1}", "arg2": "${step.step1}", "arg3": "plain_value"}
substituted = engine._substitute_variables(arguments, context)
assert substituted["arg1"] == "value1"
assert substituted["arg2"] == "result1"
@ -412,16 +371,14 @@ class TestWorkflowEngine:
def test_execute_step_success(self):
executed = []
def tool_executor(tool_name, args):
executed.append((tool_name, args))
return "success_result"
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
step = WorkflowStep(
tool_name="test_tool",
arguments={"arg": "value"},
step_id="step1"
)
step = WorkflowStep(tool_name="test_tool", arguments={"arg": "value"}, step_id="step1")
result = engine._execute_step(step, context)
assert result["status"] == "success"
assert result["step_id"] == "step1"
@ -431,16 +388,15 @@ class TestWorkflowEngine:
def test_execute_step_skipped(self):
executed = []
def tool_executor(tool_name, args):
executed.append((tool_name, args))
return "result"
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
step = WorkflowStep(
tool_name="test_tool",
arguments={"arg": "value"},
step_id="step1",
condition="False"
tool_name="test_tool", arguments={"arg": "value"}, step_id="step1", condition="False"
)
result = engine._execute_step(step, context)
assert result["status"] == "skipped"
@ -449,18 +405,17 @@ class TestWorkflowEngine:
def test_execute_step_failed_with_retry(self):
executed = []
def tool_executor(tool_name, args):
executed.append((tool_name, args))
if len(executed) < 2:
raise Exception("Temporary failure")
return "success_result"
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
step = WorkflowStep(
tool_name="test_tool",
arguments={"arg": "value"},
step_id="step1",
retry_count=1
tool_name="test_tool", arguments={"arg": "value"}, step_id="step1", retry_count=1
)
result = engine._execute_step(step, context)
assert result["status"] == "success"
@ -469,16 +424,15 @@ class TestWorkflowEngine:
def test_execute_step_failed(self):
executed = []
def tool_executor(tool_name, args):
executed.append((tool_name, args))
raise Exception("Permanent failure")
engine = WorkflowEngine(tool_executor)
context = WorkflowExecutionContext()
step = WorkflowStep(
tool_name="test_tool",
arguments={"arg": "value"},
step_id="step1",
retry_count=1
tool_name="test_tool", arguments={"arg": "value"}, step_id="step1", retry_count=1
)
result = engine._execute_step(step, context)
assert result["status"] == "failed"
@ -488,6 +442,7 @@ class TestWorkflowEngine:
def test_get_next_steps_sequential(self):
def tool_executor(tool_name, args):
return "result"
engine = WorkflowEngine(tool_executor)
step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2")
@ -495,7 +450,7 @@ class TestWorkflowEngine:
name="test",
description="test",
steps=[step1, step2],
execution_mode=ExecutionMode.SEQUENTIAL
execution_mode=ExecutionMode.SEQUENTIAL,
)
result = {"status": "success"}
next_steps = engine._get_next_steps(step1, result, workflow)
@ -504,28 +459,22 @@ class TestWorkflowEngine:
def test_get_next_steps_on_success(self):
def tool_executor(tool_name, args):
return "result"
engine = WorkflowEngine(tool_executor)
step1 = WorkflowStep(
tool_name="tool1",
arguments={},
step_id="step1",
on_success=["step2"]
)
step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1", on_success=["step2"])
step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2")
workflow = Workflow(
name="test",
description="test",
steps=[step1, step2]
)
workflow = Workflow(name="test", description="test", steps=[step1, step2])
result = {"status": "success"}
next_steps = engine._get_next_steps(step1, result, workflow)
assert next_steps == [step2]
def test_execute_workflow_sequential(self):
executed = []
def tool_executor(tool_name, args):
executed.append(tool_name)
return f"result_{tool_name}"
engine = WorkflowEngine(tool_executor)
step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2")
@ -533,7 +482,7 @@ class TestWorkflowEngine:
name="test",
description="test",
steps=[step1, step2],
execution_mode=ExecutionMode.SEQUENTIAL
execution_mode=ExecutionMode.SEQUENTIAL,
)
context = engine.execute_workflow(workflow)
assert executed == ["tool1", "tool2"]
@ -542,9 +491,11 @@ class TestWorkflowEngine:
def test_execute_workflow_parallel(self):
executed = []
def tool_executor(tool_name, args):
executed.append(tool_name)
return f"result_{tool_name}"
engine = WorkflowEngine(tool_executor)
step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2")
@ -552,7 +503,7 @@ class TestWorkflowEngine:
name="test",
description="test",
steps=[step1, step2],
execution_mode=ExecutionMode.PARALLEL
execution_mode=ExecutionMode.PARALLEL,
)
context = engine.execute_workflow(workflow)
assert set(executed) == {"tool1", "tool2"}