import json
import sqlite3
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from .semantic_index import SemanticIndex
@dataclass
class KnowledgeEntry:
    entry_id: str
    category: str
    content: str
    metadata: Dict[str, Any]
    created_at: float
    updated_at: float
    access_count: int = 0
    importance_score: float = 1.0

    def to_dict(self) -> Dict[str, Any]:
        return {
            "entry_id": self.entry_id,
            "category": self.category,
            "content": self.content,
            "metadata": self.metadata,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "access_count": self.access_count,
            "importance_score": self.importance_score,
        }


class KnowledgeStore:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.lock = threading.Lock()
        self.semantic_index = SemanticIndex()
        self._initialize_store()
        self._load_index()
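        # Note: check_same_thread=False is safe here only because every
        # database access below is serialized through self.lock.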
    def _initialize_store(self):
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS knowledge_entries (
                    entry_id TEXT PRIMARY KEY,
                    category TEXT NOT NULL,
                    content TEXT NOT NULL,
                    metadata TEXT,
                    created_at REAL NOT NULL,
                    updated_at REAL NOT NULL,
                    access_count INTEGER DEFAULT 0,
                    importance_score REAL DEFAULT 1.0
                )
                """
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_category ON knowledge_entries(category)"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_importance "
                "ON knowledge_entries(importance_score DESC)"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_created "
                "ON knowledge_entries(created_at DESC)"
            )
            self.conn.commit()

    def _load_index(self):
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute("SELECT entry_id, content FROM knowledge_entries")
            for row in cursor.fetchall():
                self.semantic_index.add_document(row[0], row[1])
    def add_entry(self, entry: KnowledgeEntry):
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute(
                """
                INSERT OR REPLACE INTO knowledge_entries
                (entry_id, category, content, metadata, created_at, updated_at, access_count, importance_score)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    entry.entry_id,
                    entry.category,
                    entry.content,
                    json.dumps(entry.metadata),
                    entry.created_at,
                    entry.updated_at,
                    entry.access_count,
                    entry.importance_score,
                ),
            )
            self.conn.commit()
            self.semantic_index.add_document(entry.entry_id, entry.content)
    def get_entry(self, entry_id: str) -> Optional[KnowledgeEntry]:
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute(
                """
                SELECT entry_id, category, content, metadata, created_at, updated_at, access_count, importance_score
                FROM knowledge_entries
                WHERE entry_id = ?
                """,
                (entry_id,),
            )
            row = cursor.fetchone()
            if row:
                cursor.execute(
                    """
                    UPDATE knowledge_entries
                    SET access_count = access_count + 1
                    WHERE entry_id = ?
                    """,
                    (entry_id,),
                )
                self.conn.commit()
                return KnowledgeEntry(
                    entry_id=row[0],
                    category=row[1],
                    content=row[2],
                    metadata=json.loads(row[3]) if row[3] else {},
                    created_at=row[4],
                    updated_at=row[5],
                    access_count=row[6] + 1,
                    importance_score=row[7],
                )
            return None
    def search_entries(
        self, query: str, category: Optional[str] = None, top_k: int = 5
    ) -> List[KnowledgeEntry]:
        # Combine semantic search with keyword matching
        semantic_results = self.semantic_index.search(query, top_k * 2)
        # LIKE-based keyword search with exact word/phrase matching
        # (see _fts_search; this is not SQLite's FTS extension)
        fts_results = self._fts_search(query, top_k * 2)

        # Combine and deduplicate results with weighted scoring
        combined_results = {}

        # Semantic results are down-weighted to 0.7
        for entry_id, score in semantic_results:
            combined_results[entry_id] = score * 0.7

        # Keyword results keep their full score, so exact matches take priority
        for entry_id, score in fts_results:
            if entry_id in combined_results:
                combined_results[entry_id] = max(combined_results[entry_id], score)
            else:
                combined_results[entry_id] = score

        # Sort by combined score
        sorted_results = sorted(combined_results.items(), key=lambda x: x[1], reverse=True)

        with self.lock:
            cursor = self.conn.cursor()
            entries = []
            for entry_id, score in sorted_results[:top_k]:
                if category:
                    cursor.execute(
                        """
                        SELECT entry_id, category, content, metadata, created_at, updated_at, access_count, importance_score
                        FROM knowledge_entries
                        WHERE entry_id = ? AND category = ?
                        """,
                        (entry_id, category),
                    )
                else:
                    cursor.execute(
                        """
                        SELECT entry_id, category, content, metadata, created_at, updated_at, access_count, importance_score
                        FROM knowledge_entries
                        WHERE entry_id = ?
                        """,
                        (entry_id,),
                    )
                row = cursor.fetchone()
                if row:
                    entry = KnowledgeEntry(
                        entry_id=row[0],
                        category=row[1],
                        content=row[2],
                        metadata=json.loads(row[3]) if row[3] else {},
                        created_at=row[4],
                        updated_at=row[5],
                        access_count=row[6],
                        importance_score=row[7],
                    )
                    # Record the search score in metadata for downstream context
                    entry.metadata["search_score"] = score
                    entries.append(entry)
            return entries
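    # Worked example of the merge above (illustrative numbers): a semantic hit
    # ("e1", 0.8) enters at 0.8 * 0.7 = 0.56; if the keyword search also
    # returns ("e1", 0.6), the combined score becomes max(0.56, 0.6) = 0.6,
    # so exact keyword evidence can outrank a purely semantic match.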
    def _fts_search(self, query: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Keyword search (LIKE-based, not SQLite FTS) with exact-phrase and word-overlap scoring."""
        with self.lock:
            cursor = self.conn.cursor()
            query_lower = query.lower()
            query_words = query_lower.split()
            if not query_words:
                return []

            # Fetch candidates containing any query word. Matching only the
            # full phrase here would make every fetched row an exact match and
            # leave the partial-match scoring below unreachable.
            conditions = " OR ".join(["LOWER(content) LIKE ?"] * len(query_words))
            cursor.execute(
                f"""
                SELECT entry_id, content
                FROM knowledge_entries
                WHERE {conditions}
                """,
                tuple(f"%{word}%" for word in query_words),
            )

            exact_matches = []
            partial_matches = []

            for entry_id, content in cursor.fetchall():
                content_lower = content.lower()

                # Exact phrase match gets the highest score
                if query_lower in content_lower:
                    exact_matches.append((entry_id, 1.0))
                    continue

                # Count matching words
                content_words = set(content_lower.split())
                query_word_set = set(query_words)
                matching_words = len(query_word_set & content_words)

                if matching_words > 0:
                    # Score based on word overlap
                    word_overlap_score = matching_words / len(query_word_set)

                    # Bonus for consecutive word sequences (sub-phrases of up to 3 words)
                    consecutive_bonus = 0.0
                    for i in range(len(query_words)):
                        for j in range(i + 1, min(i + 4, len(query_words) + 1)):
                            phrase = " ".join(query_words[i:j])
                            if phrase in content_lower:
                                consecutive_bonus += 0.2 * (j - i)

                    total_score = min(0.99, word_overlap_score + consecutive_bonus)
                    partial_matches.append((entry_id, total_score))

            # Combine results, prioritizing exact matches
            all_results = exact_matches + partial_matches
            all_results.sort(key=lambda x: x[1], reverse=True)
            return all_results[:top_k]
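    # Scoring example for a partial match (illustrative): for the query
    # "vector search index", an entry containing "search index" but not the
    # full phrase scores 2/3 for word overlap, plus consecutive bonuses of
    # 0.2 each for the sub-phrases "search" and "index" and 0.2 * 2 for
    # "search index", giving min(0.99, 0.667 + 0.8) = 0.99.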
    def get_by_category(self, category: str, limit: int = 20) -> List[KnowledgeEntry]:
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute(
                """
                SELECT entry_id, category, content, metadata, created_at, updated_at, access_count, importance_score
                FROM knowledge_entries
                WHERE category = ?
                ORDER BY importance_score DESC, created_at DESC
                LIMIT ?
                """,
                (category, limit),
            )
            entries = []
            for row in cursor.fetchall():
                entries.append(
                    KnowledgeEntry(
                        entry_id=row[0],
                        category=row[1],
                        content=row[2],
                        metadata=json.loads(row[3]) if row[3] else {},
                        created_at=row[4],
                        updated_at=row[5],
                        access_count=row[6],
                        importance_score=row[7],
                    )
                )
            return entries
    def update_importance(self, entry_id: str, importance_score: float):
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute(
                """
                UPDATE knowledge_entries
                SET importance_score = ?, updated_at = ?
                WHERE entry_id = ?
                """,
                (importance_score, time.time(), entry_id),
            )
            self.conn.commit()
    def delete_entry(self, entry_id: str) -> bool:
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute("DELETE FROM knowledge_entries WHERE entry_id = ?", (entry_id,))
            deleted = cursor.rowcount > 0
            self.conn.commit()
            if deleted:
                self.semantic_index.remove_document(entry_id)
            return deleted
    def get_statistics(self) -> Dict[str, Any]:
        with self.lock:
            cursor = self.conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM knowledge_entries")
            total_entries = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(DISTINCT category) FROM knowledge_entries")
            total_categories = cursor.fetchone()[0]

            cursor.execute(
                """
                SELECT category, COUNT(*) as count
                FROM knowledge_entries
                GROUP BY category
                ORDER BY count DESC
                """
            )
            category_counts = {row[0]: row[1] for row in cursor.fetchall()}

            cursor.execute("SELECT SUM(access_count) FROM knowledge_entries")
            total_accesses = cursor.fetchone()[0] or 0

            return {
                "total_entries": total_entries,
                "total_categories": total_categories,
                "category_distribution": category_counts,
                "total_accesses": total_accesses,
                "vocabulary_size": len(self.semantic_index.vocabulary),
            }
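# ---------------------------------------------------------------------------
# Usage sketch (illustrative, kept as comments because SemanticIndex comes
# from a relative import, so this module is meant to be imported as part of
# its package). Assumes SemanticIndex exposes add_document/search/
# remove_document and a `vocabulary` attribute, as used above.
#
#     store = KnowledgeStore("knowledge.db")
#     now = time.time()
#     store.add_entry(
#         KnowledgeEntry(
#             entry_id="k1",
#             category="docs",
#             content="SQLite keeps the knowledge entries on disk.",
#             metadata={"source": "example"},
#             created_at=now,
#             updated_at=now,
#         )
#     )
#     for hit in store.search_entries("sqlite entries", top_k=3):
#         print(hit.entry_id, hit.metadata.get("search_score"))
#     print(store.get_statistics())
# ---------------------------------------------------------------------------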