# Listing metadata (from the repository web view): 184 lines, 5.5 KiB, Python.
# Last change: 2025-11-09 08:14:14 +01:00

import pytest
import asyncio
import shutil
from pathlib import Path
from retoors.services.storage_service import StorageService, UserStorageManager
@pytest.fixture
def test_storage(tmp_path):
    """Yield a ``StorageService`` rooted in a per-test temporary directory.

    The storage root lives under pytest's built-in ``tmp_path`` fixture
    instead of a fixed ``data/test_user`` directory relative to the current
    working directory. A fixed directory lets files left behind by an
    interrupted run leak into the next run (and into count-based tests such
    as ``test_list_all``) and makes results depend on where pytest is started.

    Yields:
        StorageService: a service whose ``base_path`` is a fresh directory.
    """
    storage_root = tmp_path / "test_user"
    # ``base_path`` is passed as a string, matching how the service was
    # constructed before.
    yield StorageService(base_path=str(storage_root))
    # Remove the data right away rather than waiting for pytest's own
    # temporary-directory retention policy.
    if storage_root.exists():
        shutil.rmtree(storage_root)
@pytest.fixture
def user_manager(tmp_path):
    """Yield a ``UserStorageManager`` whose storage points at a temp directory.

    The manager is built with its defaults and its underlying storage is then
    redirected to a directory below pytest's built-in ``tmp_path`` fixture.
    Using a unique directory per test (rather than a fixed ``data/test_user``
    under the current working directory) keeps leftovers from an interrupted
    run out of later runs.

    Yields:
        UserStorageManager: a manager whose ``storage.base_path`` is a fresh
        ``Path`` for this test.
    """
    storage_root = tmp_path / "test_user"
    manager = UserStorageManager()
    # ``base_path`` is assigned a ``Path`` object, as it was before.
    manager.storage.base_path = storage_root
    yield manager
    # Remove the data right away rather than waiting for pytest's own
    # temporary-directory retention policy.
    if storage_root.exists():
        shutil.rmtree(storage_root)
@pytest.mark.asyncio
async def test_save_and_load(test_storage):
    """Round trip: a saved document is returned unchanged by ``load``."""
    owner, doc_id = "test@example.com", "doc1"
    payload = {"title": "Test Document", "content": "Test content"}

    saved = await test_storage.save(owner, doc_id, payload)
    assert saved is True

    fetched = await test_storage.load(owner, doc_id)
    assert fetched == payload
@pytest.mark.asyncio
async def test_distributed_path_structure(test_storage):
    """The saved file sits below three directories with 3-character names."""
    owner, doc_id = "test@example.com", "doc1"
    await test_storage.save(owner, doc_id, {"test": "data"})

    stored_file = test_storage._get_distributed_path(
        test_storage._get_user_base_path(owner), doc_id
    )
    assert stored_file.exists()

    # Walk up three levels from the file; each directory name must be
    # exactly three characters long.
    ancestor = stored_file
    for _ in range(3):
        ancestor = ancestor.parent
        assert len(ancestor.name) == 3
@pytest.mark.asyncio
async def test_user_isolation(test_storage):
    """Two users saving under the same identifier each get their own data back."""
    doc_id = "doc1"
    payload_by_user = {
        "user1@example.com": {"user": "user1"},
        "user2@example.com": {"user": "user2"},
    }

    # Write both documents first, then read both back, so the second write
    # has the chance to clobber the first if isolation were broken.
    for email, payload in payload_by_user.items():
        await test_storage.save(email, doc_id, payload)

    fetched_by_user = {}
    for email in payload_by_user:
        fetched_by_user[email] = await test_storage.load(email, doc_id)

    for email, payload in payload_by_user.items():
        assert fetched_by_user[email] == payload
@pytest.mark.asyncio
async def test_path_traversal_protection(test_storage):
    """An identifier containing ``../`` segments must stay inside the user's base.

    Saves a document under a traversal-style identifier, then checks that the
    path computed for it exists, passes the service's own ``_validate_path``
    check, and resolves to a location below the user's base directory.
    """
    user_email = "test@example.com"
    malicious_identifier = "../../../etc/passwd"

    await test_storage.save(user_email, malicious_identifier, {"test": "data"})

    user_base = test_storage._get_user_base_path(user_email)
    file_path = test_storage._get_distributed_path(user_base, malicious_identifier)

    assert file_path.exists()
    assert test_storage._validate_path(file_path, user_base)
    # Compare whole path components rather than string prefixes: a plain
    # ``startswith`` would also accept a sibling directory such as
    # ``<base>_evil/...`` whose name merely begins with the base path text.
    assert user_base.resolve() in file_path.resolve().parents
@pytest.mark.asyncio
async def test_delete(test_storage):
    """A saved document exists, ``delete`` returns True, then it no longer exists."""
    owner, doc_id = "test@example.com", "doc1"
    await test_storage.save(owner, doc_id, {"test": "data"})

    present_before = await test_storage.exists(owner, doc_id)
    assert present_before

    removed = await test_storage.delete(owner, doc_id)
    assert removed is True

    present_after = await test_storage.exists(owner, doc_id)
    assert not present_after
@pytest.mark.asyncio
async def test_list_all(test_storage):
    """``list_all`` returns every document saved for the user."""
    owner = "test@example.com"
    documents = (
        ("doc1", {"id": 1}),
        ("doc2", {"id": 2}),
        ("doc3", {"id": 3}),
    )
    for doc_id, payload in documents:
        await test_storage.save(owner, doc_id, payload)

    listed = await test_storage.list_all(owner)

    assert len(listed) == 3
    # Each saved id must appear somewhere in the listing; order is not checked.
    for expected_id in (1, 2, 3):
        assert any(entry["id"] == expected_id for entry in listed)
@pytest.mark.asyncio
async def test_delete_all(test_storage):
    """After ``delete_all`` returns True, ``list_all`` yields no documents."""
    owner = "test@example.com"
    for doc_id, payload in (("doc1", {"id": 1}), ("doc2", {"id": 2})):
        await test_storage.save(owner, doc_id, payload)

    wiped = await test_storage.delete_all(owner)
    assert wiped is True

    remaining = await test_storage.list_all(owner)
    assert len(remaining) == 0
@pytest.mark.asyncio
async def test_user_storage_manager(user_manager):
    """Save, fetch, existence check and delete of one user via the manager."""
    email = "test@example.com"
    profile = {
        "full_name": "Test User",
        "email": "test@example.com",
        "password": "hashed_password",
        "is_customer": True,
    }

    await user_manager.save_user(email, profile)

    fetched = await user_manager.get_user(email)
    assert fetched == profile

    exists_after_save = await user_manager.user_exists(email)
    assert exists_after_save

    await user_manager.delete_user(email)

    exists_after_delete = await user_manager.user_exists(email)
    assert not exists_after_delete
@pytest.mark.asyncio
async def test_list_users_by_parent(user_manager):
    """``list_users_by_parent`` returns exactly the two users naming the parent."""
    parent_email = "parent@example.com"
    accounts = [
        # The parent record carries no "parent_email" key.
        {
            "email": "parent@example.com",
            "full_name": "Parent User",
            "password": "hashed",
            "is_customer": True,
        },
        {
            "email": "child1@example.com",
            "full_name": "Child User 1",
            "password": "hashed",
            "parent_email": "parent@example.com",
            "is_customer": True,
        },
        {
            "email": "child2@example.com",
            "full_name": "Child User 2",
            "password": "hashed",
            "parent_email": "parent@example.com",
            "is_customer": True,
        },
    ]
    # Saved in order: parent first, then both children.
    for account in accounts:
        await user_manager.save_user(account["email"], account)

    found = await user_manager.list_users_by_parent(parent_email)

    assert len(found) == 2
    for expected_email in ("child1@example.com", "child2@example.com"):
        assert any(user["email"] == expected_email for user in found)