417 lines
13 KiB
Python
417 lines
13 KiB
Python
|
|
"""
|
||
|
|
ML Service Tests
|
||
|
|
|
||
|
|
Tests for machine learning analytics endpoints.
|
||
|
|
Covers pattern detection, anomaly detection, and behavioral analysis.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
from typing import List, Dict, Any
|
||
|
|
|
||
|
|
|
||
|
|
class TestMLServiceHealth:
|
||
|
|
"""Tests for ML service health and basic functionality."""
|
||
|
|
|
||
|
|
def test_ml_health_check(self, ml_client):
|
||
|
|
"""Test ML service health check endpoint."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
response = ml_client.get("/health")
|
||
|
|
assert response.status_code == 200
|
||
|
|
data = response.json()
|
||
|
|
assert data["status"] == "healthy"
|
||
|
|
assert "ml_available" in data
|
||
|
|
|
||
|
|
def test_ml_root_endpoint(self, ml_client):
|
||
|
|
"""Test ML service root endpoint."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
response = ml_client.get("/")
|
||
|
|
assert response.status_code == 200
|
||
|
|
data = response.json()
|
||
|
|
assert data["name"] == "Tikker ML Service"
|
||
|
|
assert "endpoints" in data
|
||
|
|
|
||
|
|
|
||
|
|
class TestPatternDetection:
|
||
|
|
"""Tests for keystroke pattern detection."""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _create_keystroke_events(count: int = 100, wpm: float = 50) -> List[Dict]:
|
||
|
|
"""Create mock keystroke events."""
|
||
|
|
events = []
|
||
|
|
interval = int((60000 / (wpm * 5)))
|
||
|
|
|
||
|
|
for i in range(count):
|
||
|
|
events.append({
|
||
|
|
"timestamp": i * interval,
|
||
|
|
"key_code": 65 + (i % 26),
|
||
|
|
"event_type": "press"
|
||
|
|
})
|
||
|
|
|
||
|
|
return events
|
||
|
|
|
||
|
|
def test_detect_fast_typing_pattern(self, ml_client):
|
||
|
|
"""Test detection of fast typing pattern."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
fast_events = self._create_keystroke_events(count=150, wpm=80)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": fast_events,
|
||
|
|
"user_id": "test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/patterns/detect", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert isinstance(data, list)
|
||
|
|
|
||
|
|
pattern_names = [p["name"] for p in data]
|
||
|
|
assert any("fast" in name for name in pattern_names)
|
||
|
|
|
||
|
|
def test_detect_slow_typing_pattern(self, ml_client):
|
||
|
|
"""Test detection of slow typing pattern."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
slow_events = self._create_keystroke_events(count=50, wpm=20)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": slow_events,
|
||
|
|
"user_id": "test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/patterns/detect", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
pattern_names = [p["name"] for p in data]
|
||
|
|
assert any("slow" in name for name in pattern_names)
|
||
|
|
|
||
|
|
def test_pattern_detection_empty_events(self, ml_client):
|
||
|
|
"""Test pattern detection with empty events."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": [],
|
||
|
|
"user_id": "test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/patterns/detect", json=payload)
|
||
|
|
assert response.status_code == 400
|
||
|
|
|
||
|
|
|
||
|
|
class TestAnomalyDetection:
|
||
|
|
"""Tests for keystroke anomaly detection."""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _create_keystroke_events(count: int = 100, wpm: float = 50) -> List[Dict]:
|
||
|
|
"""Create mock keystroke events."""
|
||
|
|
events = []
|
||
|
|
interval = int((60000 / (wpm * 5)))
|
||
|
|
|
||
|
|
for i in range(count):
|
||
|
|
events.append({
|
||
|
|
"timestamp": i * interval,
|
||
|
|
"key_code": 65 + (i % 26),
|
||
|
|
"event_type": "press"
|
||
|
|
})
|
||
|
|
|
||
|
|
return events
|
||
|
|
|
||
|
|
def test_detect_typing_speed_anomaly(self, ml_client):
|
||
|
|
"""Test detection of typing speed anomaly."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
normal_events = self._create_keystroke_events(count=100, wpm=50)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": normal_events,
|
||
|
|
"user_id": "test_user_anom"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/anomalies/detect", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert isinstance(data, list)
|
||
|
|
|
||
|
|
def test_anomaly_detection_empty_events(self, ml_client):
|
||
|
|
"""Test anomaly detection with empty events."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": [],
|
||
|
|
"user_id": "test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/anomalies/detect", json=payload)
|
||
|
|
assert response.status_code == 400
|
||
|
|
|
||
|
|
|
||
|
|
class TestBehavioralProfile:
|
||
|
|
"""Tests for behavioral profile building."""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _create_keystroke_events(count: int = 200) -> List[Dict]:
|
||
|
|
"""Create mock keystroke events."""
|
||
|
|
events = []
|
||
|
|
|
||
|
|
for i in range(count):
|
||
|
|
events.append({
|
||
|
|
"timestamp": i * 100,
|
||
|
|
"key_code": 65 + (i % 26),
|
||
|
|
"event_type": "press"
|
||
|
|
})
|
||
|
|
|
||
|
|
return events
|
||
|
|
|
||
|
|
def test_build_behavioral_profile(self, ml_client):
|
||
|
|
"""Test building behavioral profile from events."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
events = self._create_keystroke_events(count=200)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": events,
|
||
|
|
"user_id": "profile_test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/profile/build", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
|
||
|
|
assert "user_id" in data
|
||
|
|
assert "avg_typing_speed" in data
|
||
|
|
assert "peak_hours" in data
|
||
|
|
assert "common_words" in data
|
||
|
|
assert "consistency_score" in data
|
||
|
|
assert "patterns" in data
|
||
|
|
|
||
|
|
assert data["user_id"] == "profile_test_user"
|
||
|
|
assert data["consistency_score"] >= 0
|
||
|
|
assert data["consistency_score"] <= 1
|
||
|
|
|
||
|
|
def test_profile_empty_events(self, ml_client):
|
||
|
|
"""Test profile building with empty events."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": [],
|
||
|
|
"user_id": "test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/profile/build", json=payload)
|
||
|
|
assert response.status_code == 400
|
||
|
|
|
||
|
|
|
||
|
|
class TestAuthenticityCheck:
|
||
|
|
"""Tests for user authenticity verification."""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _create_keystroke_events(count: int = 100, wpm: float = 50) -> List[Dict]:
|
||
|
|
"""Create mock keystroke events."""
|
||
|
|
events = []
|
||
|
|
interval = int((60000 / (wpm * 5)))
|
||
|
|
|
||
|
|
for i in range(count):
|
||
|
|
events.append({
|
||
|
|
"timestamp": i * interval,
|
||
|
|
"key_code": 65 + (i % 26),
|
||
|
|
"event_type": "press"
|
||
|
|
})
|
||
|
|
|
||
|
|
return events
|
||
|
|
|
||
|
|
def test_authenticity_check_unknown_user(self, ml_client):
|
||
|
|
"""Test authenticity check for unknown user."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
events = self._create_keystroke_events(count=100)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": events,
|
||
|
|
"user_id": "unknown_user_123"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/authenticity/check", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert "authenticity_score" in data
|
||
|
|
assert "verdict" in data
|
||
|
|
assert data["verdict"] == "unknown"
|
||
|
|
|
||
|
|
def test_authenticity_check_established_user(self, ml_client):
|
||
|
|
"""Test authenticity check for user with established profile."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
user_id = "established_user_test"
|
||
|
|
events = self._create_keystroke_events(count=100, wpm=50)
|
||
|
|
|
||
|
|
build_payload = {
|
||
|
|
"events": events,
|
||
|
|
"user_id": user_id
|
||
|
|
}
|
||
|
|
|
||
|
|
build_response = ml_client.post("/profile/build", json=build_payload)
|
||
|
|
|
||
|
|
if build_response.status_code == 200:
|
||
|
|
check_payload = {
|
||
|
|
"events": events,
|
||
|
|
"user_id": user_id
|
||
|
|
}
|
||
|
|
|
||
|
|
check_response = ml_client.post("/authenticity/check", json=check_payload)
|
||
|
|
|
||
|
|
if check_response.status_code == 200:
|
||
|
|
data = check_response.json()
|
||
|
|
assert "authenticity_score" in data
|
||
|
|
assert "verdict" in data
|
||
|
|
|
||
|
|
|
||
|
|
class TestTemporalAnalysis:
|
||
|
|
"""Tests for temporal pattern analysis."""
|
||
|
|
|
||
|
|
def test_temporal_analysis_default_range(self, ml_client):
|
||
|
|
"""Test temporal analysis with default date range."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
payload = {"date_range_days": 7}
|
||
|
|
|
||
|
|
response = ml_client.post("/temporal/analyze", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert "trend" in data
|
||
|
|
assert "date_range_days" in data or "error" in data
|
||
|
|
if "date_range_days" in data:
|
||
|
|
assert data["date_range_days"] == 7
|
||
|
|
|
||
|
|
def test_temporal_analysis_custom_range(self, ml_client):
|
||
|
|
"""Test temporal analysis with custom date range."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
payload = {"date_range_days": 30}
|
||
|
|
|
||
|
|
response = ml_client.post("/temporal/analyze", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert "date_range_days" in data or "error" in data
|
||
|
|
if "date_range_days" in data:
|
||
|
|
assert data["date_range_days"] == 30
|
||
|
|
|
||
|
|
|
||
|
|
class TestModelTraining:
|
||
|
|
"""Tests for ML model training."""
|
||
|
|
|
||
|
|
def test_train_model_default(self, ml_client):
|
||
|
|
"""Test training ML model with default parameters."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
response = ml_client.post("/model/train")
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert data["status"] == "trained"
|
||
|
|
assert "samples" in data
|
||
|
|
assert "features" in data
|
||
|
|
assert "accuracy" in data
|
||
|
|
|
||
|
|
def test_train_model_custom_size(self, ml_client):
|
||
|
|
"""Test training ML model with custom sample size."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
response = ml_client.post("/model/train?sample_size=500")
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert data["samples"] == 500
|
||
|
|
|
||
|
|
|
||
|
|
class TestBehaviorPrediction:
|
||
|
|
"""Tests for behavior prediction."""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _create_keystroke_events(count: int = 100) -> List[Dict]:
|
||
|
|
"""Create mock keystroke events."""
|
||
|
|
events = []
|
||
|
|
|
||
|
|
for i in range(count):
|
||
|
|
events.append({
|
||
|
|
"timestamp": i * 100,
|
||
|
|
"key_code": 65 + (i % 26),
|
||
|
|
"event_type": "press"
|
||
|
|
})
|
||
|
|
|
||
|
|
return events
|
||
|
|
|
||
|
|
def test_predict_behavior_untrained_model(self, ml_client):
|
||
|
|
"""Test behavior prediction with untrained model."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
events = self._create_keystroke_events(count=100)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": events,
|
||
|
|
"user_id": "test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
response = ml_client.post("/behavior/predict", json=payload)
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert "behavior_category" in data or "status" in data
|
||
|
|
|
||
|
|
def test_predict_behavior_after_training(self, ml_client):
|
||
|
|
"""Test behavior prediction after model training."""
|
||
|
|
if not ml_client:
|
||
|
|
pytest.skip("ML client not available")
|
||
|
|
|
||
|
|
train_response = ml_client.post("/model/train?sample_size=100")
|
||
|
|
|
||
|
|
if train_response.status_code == 200:
|
||
|
|
events = self._create_keystroke_events(count=100)
|
||
|
|
|
||
|
|
payload = {
|
||
|
|
"events": events,
|
||
|
|
"user_id": "test_user"
|
||
|
|
}
|
||
|
|
|
||
|
|
predict_response = ml_client.post("/behavior/predict", json=payload)
|
||
|
|
|
||
|
|
if predict_response.status_code == 200:
|
||
|
|
data = predict_response.json()
|
||
|
|
assert "behavior_category" in data
|
||
|
|
assert "confidence" in data
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
def ml_client():
    """Create an ML service test client, or None when the service is unavailable.

    Returns:
        A ``fastapi.testclient.TestClient`` bound to the ML service app, or
        ``None`` so that each test can skip itself via its ``if not ml_client``
        guard.

    Fixes over the original:
    - The ``fastapi`` import is now inside the ``try`` so a missing fastapi
      install also degrades to ``None`` instead of raising ImportError and
      erroring every dependent test.
    - The bare ``except:`` (which also swallowed SystemExit and
      KeyboardInterrupt) is narrowed to ``except Exception``.
    """
    try:
        from fastapi.testclient import TestClient
        from ml_service import app
        return TestClient(app)
    except Exception:
        # Broad catch is deliberate: any import or app-startup failure means
        # the service under test is unavailable and tests should skip.
        return None
|