
import logging

logger = logging.getLogger("pr")

# Marker embedded in injected context messages so stale ones can be located
# and removed before a fresh one is appended.
KNOWLEDGE_MESSAGE_MARKER = "[KNOWLEDGE_BASE_CONTEXT]"


def inject_knowledge_context(assistant, user_message):
    """Append a context message with knowledge-base and conversation matches.

    Searches the assistant's knowledge store (and, when available, its
    conversation memory) for content relevant to ``user_message``, then
    appends a single marker-tagged ``user`` message to ``assistant.messages``
    containing the top 5 matches sorted by relevance. Any previously injected
    context messages are removed first, so at most one is present at a time.

    Args:
        assistant: Object exposing ``messages`` (list of chat-message dicts)
            and an ``enhanced`` attribute carrying a ``knowledge_store`` and
            optionally a ``conversation_memory``. When ``enhanced`` is absent
            or falsy the function is a no-op.
        user_message: Current user query to search against.

    Best-effort: search/injection failures are logged and swallowed so a
    broken knowledge store never blocks the conversation.
    """
    if not hasattr(assistant, "enhanced") or not assistant.enhanced:
        return
    messages = assistant.messages

    # Remove ALL previously injected context messages (the old code stopped
    # after the first hit, so duplicates left behind by earlier errors could
    # accumulate). Iterate backwards so deletion doesn't shift pending indices.
    for i in range(len(messages) - 1, -1, -1):
        content = messages[i].get("content")
        # isinstance guard: content may be None or a structured (non-str)
        # payload; `in` on those raised TypeError outside the try block.
        if (
            messages[i].get("role") == "user"
            and isinstance(content, str)
            and KNOWLEDGE_MESSAGE_MARKER in content
        ):
            del messages[i]
            logger.debug("Removed existing knowledge base message at index %d", i)

    try:
        # Search knowledge base with enhanced FTS + semantic search.
        knowledge_results = assistant.enhanced.knowledge_store.search_entries(
            user_message, top_k=5
        )

        # Search conversation history for related content.
        conversation_results = []
        if hasattr(assistant.enhanced, "conversation_memory"):
            memory = assistant.enhanced.conversation_memory
            history_results = memory.search_conversations(user_message, limit=3)
            for conv in history_results:
                # Extract relevant messages from the conversation; only the
                # tail is considered (last 5 messages per conversation).
                conv_messages = memory.get_conversation_messages(conv["conversation_id"])
                for msg in conv_messages[-5:]:
                    if msg["role"] == "user" and msg["content"] != user_message:
                        # Score relevance of the historical turn to the query.
                        relevance = calculate_text_similarity(user_message, msg["content"])
                        if relevance > 0.3:  # only include relevant matches
                            conversation_results.append(
                                {
                                    "content": msg["content"],
                                    "score": relevance,
                                    "source": f"Previous conversation: {conv['conversation_id'][:8]}",
                                }
                            )

        # Combine both sources into one ranked list.
        all_results = []
        for entry in knowledge_results:
            # 0.5 fallback: entries without a recorded score rank mid-pack.
            score = entry.metadata.get("search_score", 0.5)
            all_results.append(
                {
                    "content": entry.content,
                    "score": score,
                    "source": f"Knowledge Base ({entry.category})",
                    "type": "knowledge",
                }
            )
        for conv in conversation_results:
            all_results.append(
                {
                    "content": conv["content"],
                    "score": conv["score"],
                    "source": conv["source"],
                    "type": "conversation",
                }
            )

        # Sort by score and keep only the 5 strongest matches overall.
        all_results.sort(key=lambda x: x["score"], reverse=True)
        top_results = all_results[:5]
        if not top_results:
            logger.debug("No relevant knowledge or conversation matches found")
            return

        # Format the matches into a single context message for the LLM.
        knowledge_parts = []
        for idx, result in enumerate(top_results, 1):
            content = result["content"]
            if len(content) > 1500:  # shorter limit since several results share one message
                content = content[:1500] + "..."
            score_indicator = f"({result['score']:.2f})" if result["score"] < 1.0 else "(exact)"
            knowledge_parts.append(
                f"Match {idx} {score_indicator} - {result['source']}:\n{content}"
            )
        knowledge_message_content = (
            f"{KNOWLEDGE_MESSAGE_MARKER}\nRelevant information from knowledge base and conversation history:\n\n"
            + "\n\n".join(knowledge_parts)
        )
        messages.append({"role": "user", "content": knowledge_message_content})
        logger.debug("Injected enhanced context message with %d matches", len(top_results))
    except Exception:
        # Best-effort by design, but keep the traceback in the log
        # (the old logger.error(f"...{e}") dropped it).
        logger.exception("Error injecting knowledge context")
def calculate_text_similarity(text1: str, text2: str) -> float:
    """Score the similarity of two texts in ``[0.0, 1.0]``.

    Combines Jaccard word overlap with a bonus for 2-4 word phrases of
    ``text1`` that appear verbatim in ``text2``. An exact substring
    relationship (in either direction) scores 1.0; empty input scores 0.0.

    Args:
        text1: Query text (candidate phrases are taken from this side).
        text2: Candidate text to compare against.

    Returns:
        Similarity score, clamped to at most 1.0.
    """
    import re

    # Normalize texts for case-insensitive comparison.
    text1_lower = text1.lower()
    text2_lower = text2.lower()

    # Empty text matches nothing. This must precede the substring check:
    # "" is a substring of everything, which previously yielded 1.0.
    if not text1_lower or not text2_lower:
        return 0.0

    # Exact substring match gets the highest score.
    if text1_lower in text2_lower or text2_lower in text1_lower:
        return 1.0

    # Word-level (Jaccard) similarity over the word sets.
    words1 = set(re.findall(r"\b\w+\b", text1_lower))
    words2 = set(re.findall(r"\b\w+\b", text2_lower))
    if not words1 or not words2:
        return 0.0
    word_similarity = len(words1 & words2) / len(words1 | words2)

    # Bonus for consecutive word sequences (partial sentences). Use the
    # words of text1 in document order: the old code iterated list(set(...)),
    # whose arbitrary hash ordering made the "phrases" non-consecutive and
    # the score nondeterministic across runs.
    ordered_words = re.findall(r"\b\w+\b", text1_lower)
    consecutive_bonus = 0.0
    for i in range(len(ordered_words) - 1):
        for j in range(i + 2, min(i + 5, len(ordered_words) + 1)):
            phrase = " ".join(ordered_words[i:j])
            if phrase in text2_lower:
                consecutive_bonus += 0.1 * (j - i)  # longer phrases weigh more

    return min(1.0, word_similarity + consecutive_bonus)