import math

import pytest

from pr.memory.semantic_index import SemanticIndex


class TestSemanticIndex:
    """Unit tests for the TF-IDF based SemanticIndex.

    Covers tokenization, TF and IDF computation, document add/remove
    bookkeeping, cosine similarity, and search ranking behavior.
    """

    def test_init(self):
        """Test SemanticIndex initialization."""
        index = SemanticIndex()
        assert index.documents == {}
        assert index.vocabulary == set()
        assert index.idf_scores == {}
        assert index.doc_tf_scores == {}

    def test_tokenize_basic(self):
        """Test basic tokenization functionality."""
        index = SemanticIndex()
        tokens = index._tokenize("Hello, world! This is a test.")
        expected = ["hello", "world", "this", "is", "a", "test"]
        assert tokens == expected

    def test_tokenize_special_characters(self):
        """Test tokenization with special characters."""
        index = SemanticIndex()
        tokens = index._tokenize("Hello@world.com test-case_123")
        expected = ["hello", "world", "com", "test", "case", "123"]
        assert tokens == expected

    def test_tokenize_empty_string(self):
        """Test tokenization of empty string."""
        index = SemanticIndex()
        tokens = index._tokenize("")
        assert tokens == []

    def test_tokenize_only_special_chars(self):
        """Test tokenization with only special characters."""
        index = SemanticIndex()
        tokens = index._tokenize("!@#$%^&*()")
        assert tokens == []

    def test_compute_tf_basic(self):
        """Test TF computation for basic case."""
        index = SemanticIndex()
        tokens = ["hello", "world", "hello", "test"]
        tf_scores = index._compute_tf(tokens)
        expected = {
            "hello": 2 / 4,  # 0.5
            "world": 1 / 4,  # 0.25
            "test": 1 / 4,  # 0.25
        }
        assert tf_scores == expected

    def test_compute_tf_empty(self):
        """Test TF computation for empty tokens."""
        index = SemanticIndex()
        tf_scores = index._compute_tf([])
        assert tf_scores == {}

    def test_compute_tf_single_token(self):
        """Test TF computation for single token."""
        index = SemanticIndex()
        tokens = ["hello"]
        tf_scores = index._compute_tf(tokens)
        assert tf_scores == {"hello": 1.0}

    def test_compute_idf_single_document(self):
        """Test IDF computation with single document."""
        index = SemanticIndex()
        index.documents = {"doc1": "hello world"}
        index._compute_idf()
        assert index.idf_scores == {"hello": 1.0, "world": 1.0}

    def test_compute_idf_multiple_documents(self):
        """Test IDF computation with multiple documents."""
        index = SemanticIndex()
        index.documents = {"doc1": "hello world", "doc2": "hello test", "doc3": "world test"}
        index._compute_idf()
        expected = {
            "hello": math.log(3 / 2),  # appears in 2/3 docs
            "world": math.log(3 / 2),  # appears in 2/3 docs
            "test": math.log(3 / 2),  # appears in 2/3 docs
        }
        assert index.idf_scores == expected

    def test_compute_idf_empty_documents(self):
        """Test IDF computation with no documents."""
        index = SemanticIndex()
        index._compute_idf()
        assert index.idf_scores == {}

    def test_add_document_basic(self):
        """Test adding a basic document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        assert "doc1" in index.documents
        assert index.documents["doc1"] == "hello world"
        assert "hello" in index.vocabulary
        assert "world" in index.vocabulary
        assert "doc1" in index.doc_tf_scores

    def test_add_document_updates_vocabulary(self):
        """Test that adding documents updates vocabulary."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        assert index.vocabulary == {"hello", "world"}
        index.add_document("doc2", "hello test")
        assert index.vocabulary == {"hello", "world", "test"}

    def test_add_document_updates_idf(self):
        """Test that adding documents updates IDF scores."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        assert index.idf_scores == {"hello": 1.0, "world": 1.0}
        index.add_document("doc2", "hello test")
        expected_idf = {
            "hello": math.log(2 / 2),  # appears in both docs
            "world": math.log(2 / 1),  # appears in 1/2 docs
            "test": math.log(2 / 1),  # appears in 1/2 docs
        }
        assert index.idf_scores == expected_idf

    def test_add_document_tf_computation(self):
        """Test TF score computation when adding document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world hello")
        # TF: hello=2/3, world=1/3
        expected_tf = {"hello": 2 / 3, "world": 1 / 3}
        assert index.doc_tf_scores["doc1"] == expected_tf

    def test_remove_document_existing(self):
        """Test removing an existing document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "hello test")
        initial_vocab = index.vocabulary.copy()
        initial_idf = index.idf_scores.copy()
        index.remove_document("doc1")
        assert "doc1" not in index.documents
        assert "doc1" not in index.doc_tf_scores
        # Vocabulary should still contain all words
        assert index.vocabulary == initial_vocab
        # IDF should be recomputed
        assert index.idf_scores != initial_idf
        assert index.idf_scores == {"hello": 1.0, "test": 1.0}

    def test_remove_document_nonexistent(self):
        """Test removing a non-existent document."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        initial_state = {
            "documents": index.documents.copy(),
            "vocabulary": index.vocabulary.copy(),
            "idf_scores": index.idf_scores.copy(),
            "doc_tf_scores": index.doc_tf_scores.copy(),
        }
        index.remove_document("nonexistent")
        assert index.documents == initial_state["documents"]
        assert index.vocabulary == initial_state["vocabulary"]
        assert index.idf_scores == initial_state["idf_scores"]
        assert index.doc_tf_scores == initial_state["doc_tf_scores"]

    def test_search_basic(self):
        """Test basic search functionality."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "hello test")
        index.add_document("doc3", "world test")
        results = index.search("hello", top_k=5)
        assert len(results) == 3  # All documents are returned with similarity scores
        # Results should be sorted by similarity (descending)
        scores = {doc_id: score for doc_id, score in results}
        assert scores["doc1"] > 0  # doc1 contains "hello"
        assert scores["doc2"] > 0  # doc2 contains "hello"
        assert scores["doc3"] == 0  # doc3 does not contain "hello"

    def test_search_empty_query(self):
        """Test search with empty query."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        results = index.search("", top_k=5)
        assert results == []

    def test_search_no_documents(self):
        """Test search when no documents exist."""
        index = SemanticIndex()
        results = index.search("hello", top_k=5)
        assert results == []

    def test_search_top_k_limit(self):
        """Test search respects top_k parameter."""
        index = SemanticIndex()
        for i in range(10):
            # NOTE: was an f-string with no placeholders (lint F541);
            # the constant content string is identical without the prefix.
            index.add_document(f"doc{i}", "hello world content")
        results = index.search("content", top_k=3)
        assert len(results) == 3

    def test_cosine_similarity_identical_vectors(self):
        """Test cosine similarity with identical vectors."""
        index = SemanticIndex()
        vec1 = {"hello": 1.0, "world": 0.5}
        vec2 = {"hello": 1.0, "world": 0.5}
        similarity = index._cosine_similarity(vec1, vec2)
        assert similarity == pytest.approx(1.0)

    def test_cosine_similarity_orthogonal_vectors(self):
        """Test cosine similarity with orthogonal vectors."""
        index = SemanticIndex()
        vec1 = {"hello": 1.0}
        vec2 = {"world": 1.0}
        similarity = index._cosine_similarity(vec1, vec2)
        assert similarity == 0.0

    def test_cosine_similarity_zero_vector(self):
        """Test cosine similarity with zero vector."""
        index = SemanticIndex()
        vec1 = {"hello": 1.0}
        vec2 = {}
        similarity = index._cosine_similarity(vec1, vec2)
        assert similarity == 0.0

    def test_cosine_similarity_empty_vectors(self):
        """Test cosine similarity with empty vectors."""
        index = SemanticIndex()
        similarity = index._cosine_similarity({}, {})
        assert similarity == 0.0

    def test_search_relevance_ordering(self):
        """Test that search results are ordered by relevance."""
        index = SemanticIndex()
        index.add_document("doc1", "hello hello hello")  # High TF for "hello"
        index.add_document("doc2", "hello world")  # Medium relevance
        index.add_document("doc3", "world test")  # No "hello"
        results = index.search("hello", top_k=5)
        assert len(results) == 3  # All documents are returned
        # doc1 should have higher score than doc2, and doc3 should have 0
        scores = {doc_id: score for doc_id, score in results}
        assert scores["doc1"] > scores["doc2"] > scores["doc3"]
        assert scores["doc3"] == 0

    def test_vocabulary_persistence(self):
        """Test that vocabulary persists even after document removal."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "test case")
        assert index.vocabulary == {"hello", "world", "test", "case"}
        index.remove_document("doc1")
        # Vocabulary should still contain all words
        assert index.vocabulary == {"hello", "world", "test", "case"}

    def test_idf_recomputation_after_removal(self):
        """Test IDF recomputation after document removal."""
        index = SemanticIndex()
        index.add_document("doc1", "hello world")
        index.add_document("doc2", "hello test")
        index.add_document("doc3", "world test")
        # Remove doc3, leaving doc1 and doc2
        index.remove_document("doc3")
        # "hello" appears in both remaining docs, "world" and "test" in one each
        expected_idf = {
            "hello": math.log(2 / 2),  # 2 docs, appears in 2
            "world": math.log(2 / 1),  # 2 docs, appears in 1
            "test": math.log(2 / 1),  # 2 docs, appears in 1
        }
        assert index.idf_scores == expected_idf