|
"""
|
|
Multiplayer interaction tests - validates chat system, cursor movement,
|
|
building synchronization between multiple clients, and player connections
|
|
"""
|
|
import pytest
|
|
import asyncio
|
|
from tests.test_client import TestWebSocketClient, test_clients
|
|
|
|
|
|
class TestMultiplayerInteractions:
|
|
"""Test multiplayer functionality between multiple clients"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_client_connections(self):
|
|
"""Test that multiple clients can connect simultaneously"""
|
|
async with test_clients("player1", "player2", "player3") as clients:
|
|
assert len(clients) == 3
|
|
|
|
# All clients should be connected
|
|
for client in clients:
|
|
assert client.connected == True
|
|
assert client.player_data is not None
|
|
assert client.get_money() == 100000 # Starting money
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_player_join_notifications(self):
|
|
"""Test that clients receive notifications when players join"""
|
|
async with test_clients("first_player") as [client1]:
|
|
client1.clear_messages()
|
|
|
|
# Connect second player
|
|
async with test_clients("second_player") as [client2]:
|
|
# First client should receive join notification
|
|
message = await client1.receive_message(timeout=2.0)
|
|
assert message is not None
|
|
assert message["type"] == "player_joined"
|
|
assert message["nickname"] == "second_player"
|
|
assert "player_id" in message
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_player_leave_notifications(self):
|
|
"""Test that clients receive notifications when players leave"""
|
|
async with test_clients("observer") as [observer]:
|
|
observer.clear_messages()
|
|
|
|
# Connect and disconnect another player
|
|
async with test_clients("temp_player") as [temp_client]:
|
|
# Clear join message
|
|
await observer.receive_message(timeout=1.0)
|
|
|
|
# Observer should receive leave notification
|
|
message = await observer.receive_message(timeout=2.0)
|
|
assert message is not None
|
|
assert message["type"] == "player_left"
|
|
assert message["nickname"] == "temp_player"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_chat_message_broadcasting(self):
|
|
"""Test that chat messages are broadcast to all clients"""
|
|
async with test_clients("sender", "receiver1", "receiver2") as clients:
|
|
sender, receiver1, receiver2 = clients
|
|
|
|
# Clear initial messages
|
|
for client in clients:
|
|
client.clear_messages()
|
|
|
|
# Send chat message
|
|
test_message = "Hello everyone!"
|
|
await sender.send_chat(test_message)
|
|
|
|
# Both receivers should get the chat message
|
|
for receiver in [receiver1, receiver2]:
|
|
message = await receiver.receive_message(timeout=2.0)
|
|
assert message is not None
|
|
assert message["type"] == "chat"
|
|
assert message["message"] == test_message
|
|
assert message["nickname"] == "sender"
|
|
assert "timestamp" in message
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_chat_message_not_echoed_to_sender(self):
|
|
"""Test that chat messages are not echoed back to sender"""
|
|
async with test_clients("sender", "receiver") as [sender, receiver]:
|
|
# Clear initial messages
|
|
sender.clear_messages()
|
|
receiver.clear_messages()
|
|
|
|
# Send chat message
|
|
await sender.send_chat("Test message")
|
|
|
|
# Receiver should get message
|
|
receiver_msg = await receiver.receive_message(timeout=2.0)
|
|
assert receiver_msg["type"] == "chat"
|
|
|
|
# Sender should not receive their own message back
|
|
sender_msg = await sender.receive_message(timeout=1.0)
|
|
assert sender_msg is None or sender_msg["type"] != "chat"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cursor_movement_synchronization(self):
|
|
"""Test that cursor movements are synchronized between clients"""
|
|
async with test_clients("mover", "observer") as [mover, observer]:
|
|
# Clear initial messages
|
|
observer.clear_messages()
|
|
|
|
# Send cursor movement
|
|
test_x, test_y = 10, 15
|
|
await mover.send_cursor_move(test_x, test_y)
|
|
|
|
# Observer should receive cursor movement
|
|
message = await observer.receive_message(timeout=2.0)
|
|
assert message is not None
|
|
assert message["type"] == "cursor_move"
|
|
assert message["x"] == test_x
|
|
assert message["y"] == test_y
|
|
assert "player_id" in message
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cursor_movement_not_echoed_to_sender(self):
|
|
"""Test that cursor movements are not echoed back to sender"""
|
|
async with test_clients("mover", "observer") as [mover, observer]:
|
|
# Clear initial messages
|
|
mover.clear_messages()
|
|
observer.clear_messages()
|
|
|
|
# Send cursor movement
|
|
await mover.send_cursor_move(5, 7)
|
|
|
|
# Observer should get movement
|
|
observer_msg = await observer.receive_message(timeout=2.0)
|
|
assert observer_msg["type"] == "cursor_move"
|
|
|
|
# Mover should not receive their own cursor movement back
|
|
mover_msg = await mover.receive_message(timeout=1.0)
|
|
assert mover_msg is None or mover_msg["type"] != "cursor_move"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_building_placement_synchronization(self):
|
|
"""Test that building placements are synchronized to all clients"""
|
|
async with test_clients("builder", "observer1", "observer2") as clients:
|
|
builder, observer1, observer2 = clients
|
|
|
|
# Clear initial messages
|
|
for client in clients:
|
|
client.clear_messages()
|
|
|
|
# Place a building
|
|
await builder.place_building("small_house", 5, 5)
|
|
|
|
# All clients should receive building placement notification
|
|
for observer in [observer1, observer2]:
|
|
message = await observer.receive_message(timeout=3.0)
|
|
assert message is not None
|
|
assert message["type"] == "building_placed"
|
|
assert message["building"]["type"] == "small_house"
|
|
assert message["building"]["x"] == 5
|
|
assert message["building"]["y"] == 5
|
|
assert message["building"]["owner_id"] == builder.player_data["player_id"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_building_removal_synchronization(self):
|
|
"""Test that building removals are synchronized to all clients"""
|
|
async with test_clients("builder", "observer") as [builder, observer]:
|
|
# Place building first
|
|
await builder.place_building("small_house", 3, 3)
|
|
|
|
# Wait for placement to complete
|
|
placement_msg = await observer.receive_message(timeout=2.0)
|
|
assert placement_msg["type"] == "building_placed"
|
|
|
|
# Clear messages
|
|
observer.clear_messages()
|
|
|
|
# Remove the building
|
|
await builder.remove_building(3, 3)
|
|
|
|
# Observer should receive removal notification
|
|
message = await observer.receive_message(timeout=2.0)
|
|
assert message is not None
|
|
assert message["type"] == "building_removed"
|
|
assert message["x"] == 3
|
|
assert message["y"] == 3
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_building_edit_synchronization(self):
|
|
"""Test that building name edits are synchronized to all clients"""
|
|
async with test_clients("builder", "observer") as [builder, observer]:
|
|
# Place building first
|
|
await builder.place_building("small_house", 7, 7)
|
|
|
|
# Wait for placement
|
|
placement_msg = await observer.receive_message(timeout=2.0)
|
|
assert placement_msg["type"] == "building_placed"
|
|
|
|
# Clear messages
|
|
observer.clear_messages()
|
|
|
|
# Edit building name
|
|
new_name = "My House"
|
|
await builder.edit_building(7, 7, new_name)
|
|
|
|
# Observer should receive edit notification
|
|
message = await observer.receive_message(timeout=2.0)
|
|
assert message is not None
|
|
assert message["type"] == "building_updated"
|
|
assert message["x"] == 7
|
|
assert message["y"] == 7
|
|
assert message["name"] == new_name
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_building_ownership_permissions(self):
|
|
"""Test that only building owners can edit/delete buildings"""
|
|
async with test_clients("owner", "other_player") as [owner, other_player]:
|
|
# Owner places building
|
|
await owner.place_building("small_house", 10, 10)
|
|
|
|
# Wait for placement
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Other player tries to remove owner's building
|
|
await other_player.remove_building(10, 10)
|
|
|
|
# Should receive error message
|
|
message = await other_player.receive_message(timeout=2.0)
|
|
assert message is not None
|
|
assert message["type"] == "error"
|
|
assert "don't own" in message["message"].lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_simultaneous_actions(self):
|
|
"""Test handling multiple simultaneous client actions"""
|
|
async with test_clients("player1", "player2", "player3") as clients:
|
|
# All players perform actions simultaneously
|
|
tasks = []
|
|
|
|
# Player 1 places buildings
|
|
tasks.append(clients[0].place_building("small_house", 0, 0))
|
|
tasks.append(clients[0].place_building("road", 1, 0))
|
|
|
|
# Player 2 sends chat and moves cursor
|
|
tasks.append(clients[1].send_chat("Building my city!"))
|
|
tasks.append(clients[1].send_cursor_move(15, 20))
|
|
|
|
# Player 3 places building
|
|
tasks.append(clients[2].place_building("small_shop", 5, 5))
|
|
|
|
# Execute all actions simultaneously
|
|
await asyncio.gather(*tasks)
|
|
|
|
# Give time for all messages to propagate
|
|
await asyncio.sleep(1.0)
|
|
|
|
# Collect all messages from all clients
|
|
all_messages = []
|
|
for client in clients:
|
|
messages = await client.receive_messages_for(2.0)
|
|
all_messages.extend(messages)
|
|
|
|
# Should have received various message types
|
|
message_types = [msg["type"] for msg in all_messages]
|
|
assert "building_placed" in message_types
|
|
assert "chat" in message_types
|
|
assert "cursor_move" in message_types
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_player_color_uniqueness(self):
|
|
"""Test that each player gets a unique color"""
|
|
async with test_clients("red", "green", "blue") as clients:
|
|
colors = []
|
|
for client in clients:
|
|
color = client.player_data["color"]
|
|
assert color.startswith("#") # Hex color format
|
|
assert len(color) == 7 # #RRGGBB format
|
|
colors.append(color)
|
|
|
|
# All colors should be different (very likely with random generation)
|
|
assert len(set(colors)) == len(colors)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reconnection_with_same_nickname(self):
|
|
"""Test that reconnecting with same nickname restores player data"""
|
|
# Connect first time
|
|
async with test_clients("reconnect_test") as [client1]:
|
|
original_player_id = client1.player_data["player_id"]
|
|
original_color = client1.player_data["color"]
|
|
|
|
# Place a building to modify game state
|
|
await client1.place_building("small_house", 0, 0)
|
|
await asyncio.sleep(0.5) # Wait for placement
|
|
|
|
# Reconnect with same nickname
|
|
async with test_clients("reconnect_test") as [client2]:
|
|
# Should have same player ID and color
|
|
assert client2.player_data["player_id"] == original_player_id
|
|
assert client2.player_data["color"] == original_color
|
|
|
|
# Money should be reduced by building cost
|
|
assert client2.get_money() == 100000 - 5000 # Started with 100k, spent 5k on house
|
|
|
|
|
|
class TestMultiplayerStressTests:
|
|
"""Stress tests for multiplayer functionality"""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rapid_chat_messages(self):
|
|
"""Test handling of rapid chat message bursts"""
|
|
async with test_clients("chatter", "listener") as [chatter, listener]:
|
|
listener.clear_messages()
|
|
|
|
# Send many messages quickly
|
|
messages = [f"Message {i}" for i in range(10)]
|
|
tasks = [chatter.send_chat(msg) for msg in messages]
|
|
await asyncio.gather(*tasks)
|
|
|
|
# Collect received messages
|
|
received = await listener.receive_messages_for(3.0)
|
|
chat_messages = [msg for msg in received if msg["type"] == "chat"]
|
|
|
|
# Should receive all messages (though order might vary)
|
|
assert len(chat_messages) >= 8 # Allow for some timing issues
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rapid_cursor_movements(self):
|
|
"""Test handling of rapid cursor movement updates"""
|
|
async with test_clients("mover", "observer") as [mover, observer]:
|
|
observer.clear_messages()
|
|
|
|
# Send many cursor movements quickly
|
|
positions = [(i, i*2) for i in range(20)]
|
|
tasks = [mover.send_cursor_move(x, y) for x, y in positions]
|
|
await asyncio.gather(*tasks)
|
|
|
|
# Collect received movements
|
|
received = await observer.receive_messages_for(2.0)
|
|
cursor_messages = [msg for msg in received if msg["type"] == "cursor_move"]
|
|
|
|
# Should receive cursor updates (may be throttled by server)
|
|
assert len(cursor_messages) > 0
|
|
|
|
# Last position should be among the received messages
|
|
last_positions = [(msg["x"], msg["y"]) for msg in cursor_messages[-3:]]
|
|
assert (19, 38) in last_positions # Last position from our list |