|
# retoor <retoor@molodetz.nl>
|
|
import pytest
|
|
|
|
from helpers.async_client import DWNAsyncClient
|
|
from helpers.event_collector import EventCollector
|
|
|
|
|
|
class TestAIIsAvailable:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_is_available_returns_ok(self, dwn_client: DWNAsyncClient) -> None:
|
|
result = await dwn_client.ai_is_available()
|
|
|
|
assert result.get("status") == "ok"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_is_available_structure(self, dwn_client: DWNAsyncClient) -> None:
|
|
result = await dwn_client.ai_is_available()
|
|
|
|
assert result.get("status") == "ok"
|
|
assert "available" in result
|
|
assert isinstance(result["available"], bool)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_is_available_has_model_when_available(self, dwn_client: DWNAsyncClient) -> None:
|
|
result = await dwn_client.ai_is_available()
|
|
|
|
assert result.get("status") == "ok"
|
|
|
|
if result.get("available"):
|
|
assert "model" in result
|
|
|
|
|
|
class TestAICommand:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_command_when_unavailable(self, dwn_client: DWNAsyncClient) -> None:
|
|
available = await dwn_client.ai_is_available()
|
|
|
|
if not available.get("available"):
|
|
result = await dwn_client.ai_command("Hello")
|
|
|
|
assert result.get("status") == "error"
|
|
assert "available" in result.get("message", "").lower() or "not configured" in result.get("message", "").lower()
|
|
else:
|
|
pytest.skip("AI is available, cannot test unavailable scenario")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_command_returns_response(self, dwn_client: DWNAsyncClient) -> None:
|
|
available = await dwn_client.ai_is_available()
|
|
|
|
if not available.get("available"):
|
|
pytest.skip("AI not available")
|
|
|
|
result = await dwn_client.ai_command("What is 2+2?")
|
|
|
|
assert result.get("status") in ["ok", "error"]
|
|
|
|
if result.get("status") == "ok":
|
|
assert "response" in result
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_command_async_mode(self, dwn_client: DWNAsyncClient) -> None:
|
|
available = await dwn_client.ai_is_available()
|
|
|
|
if not available.get("available"):
|
|
pytest.skip("AI not available")
|
|
|
|
result = await dwn_client.ai_command("Hello", async_mode=True)
|
|
|
|
assert result.get("status") in ["ok", "pending", "error"]
|
|
|
|
|
|
class TestExaIsAvailable:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exa_is_available_returns_ok(self, dwn_client: DWNAsyncClient) -> None:
|
|
result = await dwn_client.exa_is_available()
|
|
|
|
assert result.get("status") == "ok"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exa_is_available_structure(self, dwn_client: DWNAsyncClient) -> None:
|
|
result = await dwn_client.exa_is_available()
|
|
|
|
assert result.get("status") == "ok"
|
|
assert "available" in result
|
|
assert isinstance(result["available"], bool)
|
|
|
|
|
|
class TestExaSearch:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exa_search_when_unavailable(self, dwn_client: DWNAsyncClient) -> None:
|
|
available = await dwn_client.exa_is_available()
|
|
|
|
if not available.get("available"):
|
|
result = await dwn_client.exa_search("test query")
|
|
|
|
assert result.get("status") == "error"
|
|
else:
|
|
pytest.skip("Exa is available, cannot test unavailable scenario")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exa_search_returns_results(self, dwn_client: DWNAsyncClient) -> None:
|
|
available = await dwn_client.exa_is_available()
|
|
|
|
if not available.get("available"):
|
|
pytest.skip("Exa not available")
|
|
|
|
result = await dwn_client.exa_search("python programming")
|
|
|
|
assert result.get("status") in ["ok", "error"]
|
|
|
|
if result.get("status") == "ok":
|
|
assert "results" in result
|
|
assert isinstance(result["results"], list)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exa_search_async_mode(self, dwn_client: DWNAsyncClient) -> None:
|
|
available = await dwn_client.exa_is_available()
|
|
|
|
if not available.get("available"):
|
|
pytest.skip("Exa not available")
|
|
|
|
result = await dwn_client.exa_search("test", async_mode=True)
|
|
|
|
assert result.get("status") in ["ok", "pending", "error"]
|
|
|
|
|
|
class TestAIAvailabilityConsistency:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_availability_consistent(self, dwn_client: DWNAsyncClient) -> None:
|
|
result1 = await dwn_client.ai_is_available()
|
|
result2 = await dwn_client.ai_is_available()
|
|
|
|
assert result1.get("available") == result2.get("available")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exa_availability_consistent(self, dwn_client: DWNAsyncClient) -> None:
|
|
result1 = await dwn_client.exa_is_available()
|
|
result2 = await dwn_client.exa_is_available()
|
|
|
|
assert result1.get("available") == result2.get("available")
|
|
|
|
|
|
class TestAIEventEmission:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_command_emits_event_on_success(
|
|
self,
|
|
dwn_client: DWNAsyncClient,
|
|
event_collector: EventCollector
|
|
) -> None:
|
|
available = await dwn_client.ai_is_available()
|
|
|
|
if not available.get("available"):
|
|
pytest.skip("AI not available")
|
|
|
|
event_collector.clear()
|
|
|
|
result = await dwn_client.ai_command("Hello")
|
|
|
|
if result.get("status") == "ok":
|
|
await event_collector.collect_for(0.5)
|
|
events = event_collector.get_events("ai_response_received")
|
|
assert len(events) >= 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_exa_search_emits_event_on_success(
|
|
self,
|
|
dwn_client: DWNAsyncClient,
|
|
event_collector: EventCollector
|
|
) -> None:
|
|
available = await dwn_client.exa_is_available()
|
|
|
|
if not available.get("available"):
|
|
pytest.skip("Exa not available")
|
|
|
|
event_collector.clear()
|
|
|
|
result = await dwn_client.exa_search("test")
|
|
|
|
if result.get("status") == "ok":
|
|
await event_collector.collect_for(0.5)
|
|
events = event_collector.get_events("exa_search_completed")
|
|
assert len(events) >= 0
|