import logging
import re
from typing import Any, Dict, List, Optional, Set, Tuple
from .models import (
ArtifactType,
Phase,
PhaseType,
ProjectPlan,
TaskIntent,
ToolCall,
)
# Module-level logger used by the planner below; registered under the fixed name "rp".
logger = logging.getLogger("rp")
class ProjectPlanner:
    """Heuristically parse a free-text request and turn it into a phased plan.

    Parsing (``parse_request``) relies only on regular expressions and keyword
    tables: it classifies the request into task types, derives a set of
    candidate tool names, extracts URLs/paths and simple constraints, and
    guesses an artifact type. Planning (``create_plan``) maps the resulting
    ``TaskIntent`` onto an ordered list of phases and attaches rough cost and
    duration estimates.
    """

    # Characters stripped from the end of extracted URLs/paths; at the end of
    # a token they are treated as sentence punctuation
    # ("see https://example.com." / "read /tmp/a.txt, then ...").
    _TRAILING_PUNCTUATION = ".,;:!?"

    def __init__(self):
        """Build the lookup tables consulted by the parsing helpers."""
        self.task_patterns = self._init_task_patterns()
        self.tool_mappings = self._init_tool_mappings()
        self.artifact_indicators = self._init_artifact_indicators()

    def _init_task_patterns(self) -> Dict[str, List[str]]:
        """Return the regex patterns, keyed by task type, used for classification."""
        return {
            "research": [
                r"\b(research|investigate|find out|discover|learn about|study)\b",
                r"\b(search|look up|find information|gather data)\b",
                r"\b(analyze|compare|evaluate|assess)\b",
            ],
            "coding": [
                r"\b(write|create|implement|develop|build|code)\b.*\b(function|class|script|program|code|app)\b",
                r"\b(fix|debug|solve|repair)\b.*\b(bug|error|issue|problem)\b",
                r"\b(refactor|optimize|improve)\b.*\b(code|function|class|performance)\b",
            ],
            "data_processing": [
                r"\b(download|fetch|scrape|crawl|extract)\b",
                r"\b(process|transform|convert|parse|clean)\b.*\b(data|file|document)\b",
                r"\b(merge|combine|aggregate|consolidate)\b",
            ],
            "file_operations": [
                r"\b(move|copy|rename|delete|organize)\b.*\b(file|folder|directory)\b",
                r"\b(find|search|locate)\b.*\b(file|duplicate|empty)\b",
                r"\b(sync|backup|archive)\b",
            ],
            "visualization": [
                r"\b(create|generate|make|build)\b.*\b(chart|graph|dashboard|visualization)\b",
                r"\b(visualize|plot|display)\b",
                r"\b(report|summary|overview)\b",
            ],
            "automation": [
                r"\b(automate|schedule|batch|bulk)\b",
                r"\b(workflow|pipeline|process)\b",
                r"\b(monitor|watch|track)\b",
            ],
        }

    def _init_tool_mappings(self) -> Dict[str, Set[str]]:
        """Return the default tool names associated with each task type."""
        return {
            "research": {"web_search", "http_fetch", "deep_research", "research_info"},
            "coding": {"read_file", "write_file", "python_exec", "search_replace", "run_command"},
            "data_processing": {
                "scrape_images",
                "crawl_and_download",
                "bulk_download_urls",
                "python_exec",
                "http_fetch",
            },
            "file_operations": {
                "bulk_move_rename",
                "find_duplicates",
                "cleanup_directory",
                "sync_directory",
                "organize_files",
                "batch_rename",
            },
            "visualization": {"python_exec", "write_file"},
            "database": {"db_query", "db_get", "db_set"},
            "analysis": {"python_exec", "grep", "glob_files", "read_file"},
        }

    def _init_artifact_indicators(self) -> Dict[ArtifactType, List[str]]:
        """Return the keywords that hint at each artifact type.

        Insertion order matters: ``_identify_artifact_type`` returns the first
        artifact type whose keyword list produces a match.
        """
        return {
            ArtifactType.REPORT: ["report", "summary", "document", "analysis", "findings"],
            ArtifactType.DASHBOARD: ["dashboard", "visualization", "monitor", "overview"],
            ArtifactType.SPREADSHEET: ["spreadsheet", "csv", "excel", "table", "data"],
            ArtifactType.WEBAPP: ["webapp", "web app", "application", "interface", "ui"],
            ArtifactType.CHART: ["chart", "graph", "plot", "visualization"],
            ArtifactType.CODE: ["script", "program", "function", "class", "module"],
            ArtifactType.DATA: ["data", "dataset", "json", "database"],
        }

    def parse_request(self, user_request: str) -> TaskIntent:
        """Parse ``user_request`` into a ``TaskIntent``.

        Keyword-based helpers receive the lower-cased request; data-source and
        constraint extraction receive the original text so that URLs and paths
        keep their case.
        """
        request_lower = user_request.lower()
        task_types = self._identify_task_types(request_lower)
        required_tools = self._identify_required_tools(task_types, request_lower)
        data_sources = self._extract_data_sources(user_request)
        artifact_type = self._identify_artifact_type(request_lower)
        constraints = self._extract_constraints(user_request)
        complexity = self._estimate_complexity(user_request, task_types, required_tools)
        primary_task_type = task_types[0] if task_types else "general"

        intent = TaskIntent(
            objective=user_request,
            task_type=primary_task_type,
            required_tools=required_tools,
            data_sources=data_sources,
            artifact_type=artifact_type,
            constraints=constraints,
            complexity=complexity,
            confidence=self._calculate_confidence(task_types, required_tools, artifact_type),
        )
        logger.debug("Parsed task intent: %s", intent)
        return intent

    def _identify_task_types(self, request: str) -> List[str]:
        """Return every task type with at least one matching pattern.

        Task types appear in the order of ``self.task_patterns``; when nothing
        matches the result is ``["general"]``.
        """
        matched = [
            task_type
            for task_type, patterns in self.task_patterns.items()
            if any(re.search(pattern, request, re.IGNORECASE) for pattern in patterns)
        ]
        return matched or ["general"]

    def _identify_required_tools(self, task_types: List[str], request: str) -> Set[str]:
        """Collect tool names implied by the task types and by request keywords.

        ``request`` is matched case-sensitively, so callers should pass
        lower-cased text (``parse_request`` does).
        """
        tools: Set[str] = set()
        for task_type in task_types:
            tools.update(self.tool_mappings.get(task_type, ()))

        # Each alternation is grouped so the leading \b applies to every
        # keyword, not just the first one; otherwise "description" or
        # "javascript" would match "script" and "seashell" would match "shell".
        # No trailing \b: plurals such as "files" or "scripts" must still match.
        keyword_rules = (
            (r"\burl\b|https?://|website|webpage", {"http_fetch", "web_search"}),
            (r"\b(?:image|photo|picture|png|jpg|jpeg)", {"scrape_images", "download_to_file"}),
            (r"\b(?:file|directory|folder)", {"read_file", "list_directory", "write_file"}),
            (r"\b(?:python|script|code|execute)", {"python_exec"}),
            (r"\b(?:command|terminal|shell|bash)", {"run_command"}),
        )
        for pattern, implied_tools in keyword_rules:
            if re.search(pattern, request):
                tools.update(implied_tools)
        return tools

    def _extract_data_sources(self, request: str) -> List[str]:
        """Return the URLs and filesystem paths mentioned in ``request``.

        URLs come first, then paths, each in order of appearance. Trailing
        sentence punctuation is stripped and duplicates are dropped.
        """
        url_pattern = r'https?://[^\s<>"\']+|www\.[^\s<>"\']+'
        # A path must start the string or follow whitespace / a double quote,
        # which keeps the path component of a URL from being captured again.
        path_pattern = r'(?:^|[\s"])([/~][^\s<>"\']+|[A-Za-z]:\\[^\s<>"\']+)'
        candidates = re.findall(url_pattern, request) + re.findall(path_pattern, request)

        sources: List[str] = []
        seen: Set[str] = set()
        for candidate in candidates:
            cleaned = candidate.rstrip(self._TRAILING_PUNCTUATION)
            if cleaned and cleaned not in seen:
                seen.add(cleaned)
                sources.append(cleaned)
        return sources

    def _identify_artifact_type(self, request: str) -> Optional[ArtifactType]:
        """Return the first artifact type whose keyword occurs in ``request``.

        Keywords are matched as whole words (with an optional plural suffix)
        rather than as substrings, so "ui" no longer matches inside "build".
        Matching is case-sensitive; ``parse_request`` passes lower-cased text.
        Returns ``None`` when no keyword matches.
        """
        for artifact_type, indicators in self.artifact_indicators.items():
            for indicator in indicators:
                pattern = rf"\b{re.escape(indicator)}(?:s|es)?\b"
                if re.search(pattern, request):
                    return artifact_type
        return None

    def _extract_constraints(self, request: str) -> Dict[str, Any]:
        """Extract size, time and file-extension constraints from ``request``.

        Possible keys in the result:

        * ``size_bytes`` - first "<number> kb|mb|gb" occurrence, in bytes
          (1024-based; decimal amounts such as "1.5 GB" are supported).
        * ``time_constraint`` - ``{"value": int, "unit": str}`` for the first
          "<number> day|week|month|hour|minute" occurrence.
        * ``file_extension`` - first known extension, only when the request
          contains the word "only" or "just".
        """
        constraints: Dict[str, Any] = {}

        size_match = re.search(r"(\d+(?:\.\d+)?)\s*(kb|mb|gb)\b", request, re.IGNORECASE)
        if size_match:
            multipliers = {"kb": 1024, "mb": 1024 * 1024, "gb": 1024 * 1024 * 1024}
            number_text = size_match.group(1)
            # Stay in integer arithmetic unless a fractional amount was given.
            amount = float(number_text) if "." in number_text else int(number_text)
            constraints["size_bytes"] = int(amount * multipliers[size_match.group(2).lower()])

        time_match = re.search(r"(\d+)\s*(day|week|month|hour|minute)s?", request, re.IGNORECASE)
        if time_match:
            constraints["time_constraint"] = {
                "value": int(time_match.group(1)),
                "unit": time_match.group(2).lower(),
            }

        # Whole-word, case-insensitive check: "Only" must count, "adjust" must not.
        if re.search(r"\b(?:only|just)\b", request, re.IGNORECASE):
            ext_match = re.search(
                r"\.(jpg|jpeg|png|gif|pdf|csv|txt|json|xml|html|py|js)\b",
                request,
                re.IGNORECASE,
            )
            if ext_match:
                constraints["file_extension"] = ext_match.group(1).lower()
        return constraints

    def _estimate_complexity(self, request: str, task_types: List[str], tools: Set[str]) -> str:
        """Score the request and bucket it as "simple", "medium" or "complex".

        The score is 2 per task type, 1 per tool, 1 per 20 words, and 2 per
        complexity keyword found in the request; <=5 is simple, <=12 medium.
        """
        complex_indicators = ("analyze", "compare", "optimize", "automate", "integrate", "comprehensive")
        lowered = request.lower()

        score = 2 * len(task_types) + len(tools) + len(request.split()) // 20
        score += 2 * sum(1 for indicator in complex_indicators if indicator in lowered)

        if score <= 5:
            return "simple"
        if score <= 12:
            return "medium"
        return "complex"

    def _calculate_confidence(
        self,
        task_types: List[str],
        tools: Set[str],
        artifact_type: Optional[ArtifactType],
    ) -> float:
        """Return a confidence value for the parse, capped at 1.0.

        Starts at 0.5; adds 0.2 for a non-"general" primary task type, up to
        0.2 for tools (0.03 each) and 0.1 when an artifact type was found.
        """
        confidence = 0.5
        if task_types and task_types[0] != "general":
            confidence += 0.2
        if tools:
            confidence += min(0.2, len(tools) * 0.03)
        if artifact_type:
            confidence += 0.1
        return min(1.0, confidence)

    def create_plan(self, intent: TaskIntent) -> ProjectPlan:
        """Build a ``ProjectPlan`` for ``intent``.

        Each generated phase is added with dependencies on every earlier
        phase that ``_has_dependency`` ranks before it.
        """
        plan = ProjectPlan.create(objective=intent.objective)
        plan.artifact_type = intent.artifact_type
        plan.constraints = intent.constraints

        phases = self._generate_phases(intent)
        for index, phase in enumerate(phases):
            depends_on = [
                earlier.phase_id
                for earlier in phases[:index]
                if self._has_dependency(earlier, phase)
            ]
            plan.add_phase(phase, depends_on=depends_on if depends_on else None)

        plan.estimated_cost = self._estimate_cost(phases)
        plan.estimated_duration = self._estimate_duration(phases)
        logger.info(
            "Created plan with %d phases, est. cost: $%.2f, est. duration: %ss",
            len(phases),
            plan.estimated_cost,
            plan.estimated_duration,
        )
        return plan

    def _generate_phases(self, intent: TaskIntent) -> List[Phase]:
        """Return the ordered phases needed to satisfy ``intent``.

        Phases are added conditionally (discovery, analysis, transformation,
        artifact generation, verification); if none applies, a single generic
        "Execution" phase is returned.
        """
        phases = []

        # NOTE: `"research" in intent.task_type` is a substring test on the
        # task-type string, kept as in the original behaviour.
        if intent.data_sources or "research" in intent.task_type or "http_fetch" in intent.required_tools:
            discovery_phase = Phase.create(
                name="Discovery",
                phase_type=PhaseType.DISCOVERY,
                description="Gather data and information from sources",
                outputs=["raw_data", "source_info"],
            )
            discovery_phase.tools = self._create_discovery_tools(intent)
            phases.append(discovery_phase)

        if intent.task_type in ["data_processing", "file_operations"] or len(intent.required_tools) > 3:
            analysis_phase = Phase.create(
                name="Analysis",
                phase_type=PhaseType.ANALYSIS,
                description="Process and analyze collected data",
                outputs=["processed_data", "insights"],
            )
            analysis_phase.tools = self._create_analysis_tools(intent)
            phases.append(analysis_phase)

        if intent.task_type in ["coding", "automation"]:
            transform_phase = Phase.create(
                name="Transformation",
                phase_type=PhaseType.TRANSFORMATION,
                description="Execute transformations and operations",
                outputs=["transformed_data", "execution_results"],
            )
            transform_phase.tools = self._create_transformation_tools(intent)
            phases.append(transform_phase)

        if intent.artifact_type:
            artifact_phase = Phase.create(
                name="Artifact Generation",
                phase_type=PhaseType.ARTIFACT,
                description=f"Generate {intent.artifact_type.value} artifact",
                outputs=["artifact"],
            )
            artifact_phase.tools = self._create_artifact_tools(intent)
            phases.append(artifact_phase)

        if intent.complexity == "complex":
            # No tools are assigned here; the phase keeps whatever
            # ``Phase.create`` sets up.
            verify_phase = Phase.create(
                name="Verification",
                phase_type=PhaseType.VERIFICATION,
                description="Verify results and quality",
                outputs=["verification_report"],
            )
            phases.append(verify_phase)

        if not phases:
            default_phase = Phase.create(
                name="Execution",
                phase_type=PhaseType.TRANSFORMATION,
                description="Execute the requested task",
                outputs=["result"],
            )
            # Sort before truncating: set iteration order is not stable across
            # runs, which would make the chosen five tools nondeterministic.
            default_phase.tools = [
                ToolCall(tool_name=tool_name, arguments={})
                for tool_name in sorted(intent.required_tools)[:5]
            ]
            phases.append(default_phase)

        return phases

    def _create_discovery_tools(self, intent: TaskIntent) -> List[ToolCall]:
        """Return tool calls that fetch the intent's URL data sources.

        Image-looking URLs use ``scrape_images``, other URLs ``http_fetch``.
        Non-URL sources are skipped. A ``web_search`` call is added only when
        that tool is required and the intent has no data sources at all.
        """
        image_markers = (".jpg", ".jpeg", ".png", ".gif", "image")
        tools = []
        for source in intent.data_sources:
            if not source.startswith(("http://", "https://", "www.")):
                continue
            lowered = source.lower()
            if any(marker in lowered for marker in image_markers):
                tools.append(ToolCall(
                    tool_name="scrape_images",
                    arguments={"url": source, "destination_dir": "/tmp/downloads"},
                ))
            else:
                tools.append(ToolCall(tool_name="http_fetch", arguments={"url": source}))

        if "web_search" in intent.required_tools and not intent.data_sources:
            tools.append(ToolCall(
                tool_name="web_search",
                arguments={"query": intent.objective[:100]},
            ))
        return tools

    def _create_analysis_tools(self, intent: TaskIntent) -> List[ToolCall]:
        """Return placeholder analysis tool calls for the tools the intent requires."""
        tools = []
        if "python_exec" in intent.required_tools:
            tools.append(ToolCall(
                tool_name="python_exec",
                arguments={"code": "# Analysis code will be generated"},
            ))
        if "find_duplicates" in intent.required_tools:
            tools.append(ToolCall(
                tool_name="find_duplicates",
                arguments={"directory": ".", "dry_run": True},
            ))
        return tools

    def _create_transformation_tools(self, intent: TaskIntent) -> List[ToolCall]:
        """Return tool calls for required file operations plus ``python_exec``.

        File-operation tools are emitted in sorted order so the resulting
        phase is deterministic.
        """
        file_ops = {"bulk_move_rename", "sync_directory", "organize_files", "batch_rename", "cleanup_directory"}
        tools = [
            ToolCall(tool_name=tool_name, arguments={})
            for tool_name in sorted(intent.required_tools.intersection(file_ops))
        ]
        if "python_exec" in intent.required_tools:
            tools.append(ToolCall(
                tool_name="python_exec",
                arguments={"code": "# Transformation code"},
            ))
        return tools

    def _create_artifact_tools(self, intent: TaskIntent) -> List[ToolCall]:
        """Return a ``write_file`` call for artifact types with a known output path.

        Artifact types without an entry in the table yield an empty list.
        """
        # NOTE(review): ArtifactType.DOCUMENT is not among the keys built in
        # _init_artifact_indicators - confirm the enum defines it.
        output_paths = {
            ArtifactType.REPORT: "/tmp/report.md",
            ArtifactType.DOCUMENT: "/tmp/report.md",
            ArtifactType.DASHBOARD: "/tmp/dashboard.html",
            ArtifactType.SPREADSHEET: "/tmp/data.csv",
        }
        path = output_paths.get(intent.artifact_type)
        if path is None:
            return []
        return [ToolCall(tool_name="write_file", arguments={"path": path, "content": ""})]

    def _has_dependency(self, phase_a: Phase, phase_b: Phase) -> bool:
        """Return True when ``phase_a``'s type ranks strictly before ``phase_b``'s.

        Phase types missing from the ranking table are treated as rank 0.
        """
        # NOTE(review): RESEARCH, VISUALIZATION and GENERATION are not produced
        # by _generate_phases - presumably defined on PhaseType; confirm.
        phase_order = {
            PhaseType.DISCOVERY: 0,
            PhaseType.RESEARCH: 1,
            PhaseType.ANALYSIS: 2,
            PhaseType.TRANSFORMATION: 3,
            PhaseType.VISUALIZATION: 4,
            PhaseType.GENERATION: 5,
            PhaseType.ARTIFACT: 6,
            PhaseType.VERIFICATION: 7,
        }
        return phase_order.get(phase_a.phase_type, 0) < phase_order.get(phase_b.phase_type, 0)

    def _estimate_cost(self, phases: List[Phase]) -> float:
        """Return the estimated cost: 0.01 per phase plus 0.005 per tool call, rounded to 4 places."""
        base_cost = 0.01
        tool_cost = 0.005
        total = base_cost * len(phases) + sum(tool_cost * len(phase.tools) for phase in phases)
        return round(total, 4)

    def _estimate_duration(self, phases: List[Phase]) -> int:
        """Return the estimated duration: 30 per phase plus 10 per tool call."""
        base_duration = 30
        tool_duration = 10
        return base_duration * len(phases) + sum(tool_duration * len(phase.tools) for phase in phases)