# Tests for rp.memory.semantic_index.SemanticIndex (TF-IDF semantic index).

import math
import pytest
from rp.memory.semantic_index import SemanticIndex
class TestSemanticIndex:
    """Unit tests for the TF-IDF based SemanticIndex.

    Covers tokenization, TF and IDF computation, document add/remove,
    cosine similarity, and search ranking. Dict-of-float comparisons use
    ``pytest.approx`` (which accepts dicts of numbers) instead of exact
    ``==`` so the tests are not brittle against floating-point noise.
    """

    def test_init(self):
        """Test SemanticIndex initialization."""
        index = SemanticIndex()
        assert index.documents == {}
        assert index.vocabulary == set()
        assert index.idf_scores == {}
        assert index.doc_tf_scores == {}

    def test_tokenize_basic(self):
        """Test basic tokenization functionality."""
        index = SemanticIndex()
        tokens = index._tokenize("Hello, world! This is a test.")
        expected = ["hello", "world", "this", "is", "a", "test"]
        assert tokens == expected

    def test_tokenize_special_characters(self):
        """Test tokenization with special characters."""
        index = SemanticIndex()
        tokens = index._tokenize("Hello@world.com test-case_123")
        expected = ["hello", "world", "com", "test", "case", "123"]
        assert tokens == expected

    def test_tokenize_empty_string(self):
        """Test tokenization of empty string."""
        index = SemanticIndex()
        tokens = index._tokenize("")
        assert tokens == []

    def test_tokenize_only_special_chars(self):
        """Test tokenization with only special characters."""
        index = SemanticIndex()
        tokens = index._tokenize("!@#$%^&*()")
        assert tokens == []

    def test_compute_tf_basic(self):
        """Test TF computation for basic case."""
        index = SemanticIndex()
        tokens = ["hello", "world", "hello", "test"]
        tf_scores = index._compute_tf(tokens)
        # TF = count / total: hello=0.5, world=0.25, test=0.25
        expected = {"hello": 2 / 4, "world": 1 / 4, "test": 1 / 4}
        assert tf_scores == pytest.approx(expected)

    def test_compute_tf_empty(self):
        """Test TF computation for empty tokens."""
        index = SemanticIndex()
        tf_scores = index._compute_tf([])
        assert tf_scores == {}

    def test_compute_tf_single_token(self):
        """Test TF computation for single token."""
        index = SemanticIndex()
        tokens = ["hello"]
        tf_scores = index._compute_tf(tokens)
        assert tf_scores == pytest.approx({"hello": 1.0})

    def test_compute_idf_single_document(self):
        """Test IDF computation with single document."""
        index = SemanticIndex()
        index.documents = {"doc1": "hello world"}
        index._compute_idf()
        assert index.idf_scores == pytest.approx({"hello": 1.0, "world": 1.0})

    def test_compute_idf_multiple_documents(self):
        """Test IDF computation with multiple documents."""
        index = SemanticIndex()
        index.documents = {"doc1": "hello world", "doc2": "hello test", "doc3": "world test"}
        index._compute_idf()
        # Each term appears in exactly 2 of the 3 documents.
        expected = {
            "hello": math.log(3 / 2),
            "world": math.log(3 / 2),
            "test": math.log(3 / 2),
        }
        assert index.idf_scores == pytest.approx(expected)

    def test_compute_idf_empty_documents(self):
        """Test IDF computation with no documents."""
        index = SemanticIndex()
        index._compute_idf()
        assert index.idf_scores == {}

    def test_add_document_basic(self):
        """Test adding a basic document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        assert "doc1" in index.documents
        assert index.documents["doc1"] == "hello world"
        assert "hello" in index.vocabulary
        assert "world" in index.vocabulary
        assert "doc1" in index.doc_tf_scores

    def test_add_document_updates_vocabulary(self):
        """Test that adding documents updates vocabulary."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        assert index.vocabulary == {"hello", "world"}
        index.add_document("doc2", "hello test")
        assert index.vocabulary == {"hello", "world", "test"}

    def test_add_document_updates_idf(self):
        """Test that adding documents updates IDF scores."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        assert index.idf_scores == pytest.approx({"hello": 1.0, "world": 1.0})
        index.add_document("doc2", "hello test")
        expected_idf = {
            "hello": math.log(2 / 2),  # appears in both docs
            "world": math.log(2 / 1),  # appears in 1 of 2 docs
            "test": math.log(2 / 1),  # appears in 1 of 2 docs
        }
        assert index.idf_scores == pytest.approx(expected_idf)

    def test_add_document_tf_computation(self):
        """Test TF score computation when adding document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world hello")
        # TF: hello=2/3, world=1/3
        expected_tf = {"hello": 2 / 3, "world": 1 / 3}
        assert index.doc_tf_scores["doc1"] == pytest.approx(expected_tf)

    def test_remove_document_existing(self):
        """Test removing an existing document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "hello test")
        initial_vocab = index.vocabulary.copy()
        initial_idf = index.idf_scores.copy()
        index.remove_document("doc1")
        assert "doc1" not in index.documents
        assert "doc1" not in index.doc_tf_scores
        # Vocabulary should still contain all words
        assert index.vocabulary == initial_vocab
        # IDF should be recomputed over the remaining single document
        assert index.idf_scores != initial_idf
        assert index.idf_scores == pytest.approx({"hello": 1.0, "test": 1.0})

    def test_remove_document_nonexistent(self):
        """Test removing a non-existent document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        initial_state = {
            "documents": index.documents.copy(),
            "vocabulary": index.vocabulary.copy(),
            "idf_scores": index.idf_scores.copy(),
            "doc_tf_scores": index.doc_tf_scores.copy(),
        }
        index.remove_document("nonexistent")
        # Removing an unknown id must be a no-op on all internal state.
        assert index.documents == initial_state["documents"]
        assert index.vocabulary == initial_state["vocabulary"]
        assert index.idf_scores == initial_state["idf_scores"]
        assert index.doc_tf_scores == initial_state["doc_tf_scores"]

    def test_search_basic(self):
        """Test basic search functionality."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "hello test")
        index.add_document("doc3", "world test")
        results = index.search("hello", top_k=5)
        assert len(results) == 3  # All documents are returned with similarity scores
        # Results should be sorted by similarity (descending)
        scores = {doc_id: score for doc_id, score in results}
        assert scores["doc1"] > 0  # doc1 contains "hello"
        assert scores["doc2"] > 0  # doc2 contains "hello"
        assert scores["doc3"] == 0  # doc3 does not contain "hello"

    def test_search_empty_query(self):
        """Test search with empty query."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        results = index.search("", top_k=5)
        assert results == []

    def test_search_no_documents(self):
        """Test search when no documents exist."""
        index = SemanticIndex()
        results = index.search("hello", top_k=5)
        assert results == []

    def test_search_top_k_limit(self):
        """Test search respects top_k parameter."""
        index = SemanticIndex()
        for i in range(10):
            index.add_document(f"doc{i}", "hello world content")
        results = index.search("content", top_k=3)
        assert len(results) == 3

    def test_cosine_similarity_identical_vectors(self):
        """Test cosine similarity with identical vectors."""
        index = SemanticIndex()
        vec1 = {"hello": 1.0, "world": 0.5}
        vec2 = {"hello": 1.0, "world": 0.5}
        similarity = index._cosine_similarity(vec1, vec2)
        assert similarity == pytest.approx(1.0)

    def test_cosine_similarity_orthogonal_vectors(self):
        """Test cosine similarity with orthogonal vectors."""
        index = SemanticIndex()
        vec1 = {"hello": 1.0}
        vec2 = {"world": 1.0}
        similarity = index._cosine_similarity(vec1, vec2)
        assert similarity == 0.0

    def test_cosine_similarity_zero_vector(self):
        """Test cosine similarity with zero vector."""
        index = SemanticIndex()
        vec1 = {"hello": 1.0}
        vec2 = {}
        similarity = index._cosine_similarity(vec1, vec2)
        assert similarity == 0.0

    def test_cosine_similarity_empty_vectors(self):
        """Test cosine similarity with empty vectors."""
        index = SemanticIndex()
        similarity = index._cosine_similarity({}, {})
        assert similarity == 0.0

    def test_search_relevance_ordering(self):
        """Test that search results are ordered by relevance."""
        index = SemanticIndex()
        index.add_document("doc1", "hello hello hello")  # High TF for "hello"
        index.add_document("doc2", "hello world")  # Medium relevance
        index.add_document("doc3", "world test")  # No "hello"
        results = index.search("hello", top_k=5)
        assert len(results) == 3  # All documents are returned
        # doc1 should have higher score than doc2, and doc3 should have 0
        scores = {doc_id: score for doc_id, score in results}
        assert scores["doc1"] > scores["doc2"] > scores["doc3"]
        assert scores["doc3"] == 0

    def test_vocabulary_persistence(self):
        """Test that vocabulary persists even after document removal."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "test case")
        assert index.vocabulary == {"hello", "world", "test", "case"}
        index.remove_document("doc1")
        # Vocabulary should still contain all words
        assert index.vocabulary == {"hello", "world", "test", "case"}

    def test_idf_recomputation_after_removal(self):
        """Test IDF recomputation after document removal."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "hello test")
        index.add_document("doc3", "world test")
        # Remove doc3, leaving doc1 and doc2
        index.remove_document("doc3")
        # "hello" appears in both remaining docs, "world" and "test" in one each
        expected_idf = {
            "hello": math.log(2 / 2),  # 2 docs, appears in 2
            "world": math.log(2 / 1),  # 2 docs, appears in 1
            "test": math.log(2 / 1),  # 2 docs, appears in 1
        }
        assert index.idf_scores == pytest.approx(expected_idf)