# Listing metadata from the repository web view: 281 lines, 12 KiB, Python.
# Revision shown: 2025-10-01 20:15:47 +02:00
import asyncio
import json
import logging
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
# --- Setup ---
# Configure the root logger once, at import time, so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# The ASGI application; routes and event handlers below register on it.
app = FastAPI()
# SQLite database file. The path is relative, so it resolves against the
# process's current working directory.
DB_FILE = Path("tycoon.db")
# --- Database Management ---
def init_db():
    """Create the `players` and `buildings` tables and apply the happiness migration.

    Safe to call on every start: tables are created only when missing, and the
    `happiness` column is added only to a pre-existing `players` table that
    lacks it. Any `sqlite3.Error` is logged and swallowed, so this function
    never raises for database problems.
    """
    try:
        # `closing` guarantees the connection is released even if a statement
        # fails; the inner `conn` context commits on success / rolls back on error.
        with closing(sqlite3.connect(DB_FILE)) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS players (
                    nickname TEXT PRIMARY KEY,
                    money INTEGER NOT NULL,
                    population INTEGER NOT NULL,
                    happiness REAL NOT NULL DEFAULT 0.5
                )
            """)

            # Migration for databases created before `happiness` existed.
            # Inspect the schema instead of attempting the ALTER and swallowing
            # OperationalError, which would also hide unrelated failures.
            # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
            existing_columns = {row[1] for row in conn.execute("PRAGMA table_info(players)")}
            if "happiness" not in existing_columns:
                conn.execute("ALTER TABLE players ADD COLUMN happiness REAL NOT NULL DEFAULT 0.5")
                logger.info("Added 'happiness' column to players table.")

            conn.execute("""
                CREATE TABLE IF NOT EXISTS buildings (
                    key TEXT PRIMARY KEY,
                    owner_nickname TEXT NOT NULL,
                    type TEXT NOT NULL,
                    cost INTEGER NOT NULL,
                    FOREIGN KEY (owner_nickname) REFERENCES players (nickname)
                )
            """)
        logger.info("Database initialized successfully.")
    except sqlite3.Error as e:
        logger.error(f"Database error on init: {e}")
def db_execute(query: str, params: tuple = ()):
    """Run a single write statement (INSERT, UPDATE, DELETE) and commit it.

    Args:
        query: SQL text, using `?` placeholders for values.
        params: Values bound to the placeholders.

    Returns:
        None. A `sqlite3.Error` is logged and swallowed, so callers are not
        told whether the write succeeded.
    """
    try:
        # sqlite3's own context manager only commits/rolls back; `closing`
        # is what actually releases the connection after each call.
        with closing(sqlite3.connect(DB_FILE)) as conn, conn:
            conn.execute(query, params)
    except sqlite3.Error as e:
        logger.error(f"DB execute error: {e} with query: {query}")
def db_fetchone(query: str, params: tuple = ()) -> Optional[Any]:
    """Run a query and return its first row.

    Args:
        query: SQL text, using `?` placeholders for values.
        params: Values bound to the placeholders.

    Returns:
        A `sqlite3.Row` (indexable by column name), or None when the query
        yields no rows or a `sqlite3.Error` occurs (the error is logged).
    """
    try:
        # `closing` releases the connection; sqlite3's own context manager
        # would only end the transaction and leave the handle open.
        with closing(sqlite3.connect(DB_FILE)) as conn, conn:
            conn.row_factory = sqlite3.Row
            return conn.execute(query, params).fetchone()
    except sqlite3.Error as e:
        logger.error(f"DB fetchone error: {e} with query: {query}")
        return None
def db_fetchall(query: str, params: tuple = ()) -> list:
    """Run a query and return every resulting row.

    Args:
        query: SQL text, using `?` placeholders for values.
        params: Values bound to the placeholders.

    Returns:
        A list of `sqlite3.Row` objects (indexable by column name); an empty
        list when there are no rows or a `sqlite3.Error` occurs (the error is
        logged).
    """
    try:
        # `closing` releases the connection; sqlite3's own context manager
        # would only end the transaction and leave the handle open.
        with closing(sqlite3.connect(DB_FILE)) as conn, conn:
            conn.row_factory = sqlite3.Row
            return conn.execute(query, params).fetchall()
    except sqlite3.Error as e:
        logger.error(f"DB fetchall error: {e} with query: {query}")
        return []
# --- Connection Manager ---
class ConnectionManager:
    """Registry of the open WebSocket for each connected player, keyed by nickname."""

    def __init__(self):
        # nickname -> accepted WebSocket. One entry per nickname: a later
        # connect() with the same nickname replaces the earlier socket.
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, nickname: str):
        """Accept the WebSocket handshake and register the socket under `nickname`."""
        await websocket.accept()
        self.active_connections[nickname] = websocket
        logger.info(f"Player connected: {nickname}")

    def disconnect(self, nickname: str):
        """Remove the socket registered under `nickname`; do nothing if there is none."""
        if nickname in self.active_connections:
            del self.active_connections[nickname]
            logger.info(f"Player disconnected: {nickname}")

    async def broadcast(self, message: str):
        """Send `message` to every registered socket and prune the ones that fail.

        A send failure on one socket never prevents delivery to the others.
        """
        failed = []
        # Iterate over a snapshot: each `await` yields to the event loop, where
        # another coroutine may connect or disconnect and resize the dict,
        # which would raise RuntimeError on a live iteration.
        for nickname, connection in list(self.active_connections.items()):
            try:
                await connection.send_text(message)
            except Exception:
                # Any send failure is treated as a dead connection.
                logger.warning("Send to %s failed; dropping connection.", nickname)
                failed.append((nickname, connection))
        for nickname, connection in failed:
            # Only drop the socket that actually failed; the player may have
            # reconnected under the same nickname while we were sending.
            if self.active_connections.get(nickname) is connection:
                self.disconnect(nickname)


# Single shared registry used by the game loop and the WebSocket endpoint.
manager = ConnectionManager()
# --- Game State & Data ---
# In-memory world state shared by the game loop and the WebSocket endpoint.
game_state: Dict[str, Any] = dict(
    players={},    # nickname -> {"money", "population", "happiness"} for connected players
    buildings={},  # grid key "<x>_<z>" -> {"owner", "type", "cost"}
)
# Static catalogue of building types. Every entry has a 'cost'; the other keys
# are optional and are read by the game loop:
#   'population'       - residents contributed by the building itself
#   'income'           - money added to the owner on each game-loop tick
#   'happiness_impact' - added to the owner's happiness for each such building
#   'population_bonus' - park only: extra residents per adjacent residential
#   'happiness_bonus'  - police only: bonus shared across adjacent residentials
building_data = {
    'residential': {'cost': 100, 'population': 10},
    'commercial': {'cost': 250, 'income': 5},
    'industrial': {'cost': 500, 'income': 20, 'happiness_impact': -0.02},
    'park': {'cost': 80, 'population_bonus': 5, 'happiness_impact': 0.01},
    'powerplant': {'cost': 1000, 'income': 50},
    'road': {'cost': 20},
    'police': {'cost': 600, 'happiness_bonus': 0.1},
    'stadium': {'cost': 5000, 'income': 150, 'happiness_impact': 0.05},
}
# --- Game Logic ---
def get_neighbors(key: str, grid_cell_size: int = 2) -> List[str]:
    """Return the keys of the four grid cells adjacent to `key`.

    Args:
        key: Cell key of the form "<x>_<z>" with integer coordinates,
            e.g. "4_-6".
        grid_cell_size: Distance between neighbouring cells along each axis.
            The default of 2 must match the client's grid.

    Returns:
        Four keys, in the order +z, -z, +x, -x.

    Raises:
        ValueError: If `key` is not exactly two integer fields joined by "_".
    """
    x_str, z_str = key.split('_')
    x, z = int(x_str), int(z_str)
    return [
        f"{x}_{z + grid_cell_size}",
        f"{x}_{z - grid_cell_size}",
        f"{x + grid_cell_size}_{z}",
        f"{x - grid_cell_size}_{z}",
    ]
def _compute_player_stats(
    player: Dict[str, Any],
    player_buildings: Dict[str, Dict[str, Any]],
) -> Tuple[float, int, float]:
    """Return (income, population, happiness) for one player's buildings.

    Pure computation: reads `player` and `player_buildings`, mutates nothing.
    Happiness is clamped to the range [0, 1].
    """
    income = 0
    population = 0
    base_happiness = 0.5  # Start with a neutral base happiness

    # Per-building contributions.
    for building in player_buildings.values():
        b_type = building["type"]
        b_data = building_data.get(b_type, {})
        if b_type == 'residential':
            population += b_data.get('population', 0)
        elif 'income' in b_data:
            if b_type == 'commercial':
                # Commercial income scales with the happiness stored on the
                # player (i.e. the previous tick's value), floored at 0.1.
                happiness_multiplier = max(0.1, player.get('happiness', 0.5))
                income += b_data['income'] * (1 + happiness_multiplier)
            else:
                income += b_data['income']
        if 'happiness_impact' in b_data:
            base_happiness += b_data['happiness_impact']

    # Adjacency bonuses: parks and police stations boost each residential
    # building the same player owns in one of the four neighbouring cells.
    park_bonus = building_data['park']['population_bonus']
    # Distribute the police bonus over its 4 possible neighbours.
    police_bonus_per_neighbor = building_data['police']['happiness_bonus'] / 4
    for key, building in player_buildings.items():
        b_type = building["type"]
        if b_type not in ('park', 'police'):
            continue
        for neighbor_key in get_neighbors(key):
            neighbor = player_buildings.get(neighbor_key)
            if not neighbor or neighbor["type"] != 'residential':
                continue
            if b_type == 'park':
                population += park_bonus
            else:
                base_happiness += police_bonus_per_neighbor

    final_happiness = max(0, min(1, base_happiness))
    return income, population, final_happiness


def _update_player(nickname: str) -> None:
    """Recompute one connected player's stats, apply them, and persist them."""
    player = game_state["players"][nickname]
    player_buildings = {
        k: v for k, v in game_state["buildings"].items() if v["owner"] == nickname
    }
    # Compute everything first so a failure leaves the player untouched.
    income, population, happiness = _compute_player_stats(player, player_buildings)
    player["money"] += income
    player["population"] = population
    player["happiness"] = happiness
    db_execute(
        "UPDATE players SET money = ?, population = ?, happiness = ? WHERE nickname = ?",
        (player["money"], player["population"], player["happiness"], nickname),
    )


async def _run_tick() -> None:
    """Update every connected player, then broadcast the new player table."""
    # Snapshot the nicknames so the iteration does not depend on the live dict.
    for nickname in list(game_state["players"]):
        try:
            _update_player(nickname)
        except Exception:
            # One player's bad data must not block everyone else's update.
            logger.exception("Failed to update player %r; skipping this tick.", nickname)
    if game_state["players"]:
        await manager.broadcast(json.dumps({
            "type": "players_update", "players": game_state["players"]
        }))


async def game_loop():
    """Periodically updates game state.

    Runs forever: every 2 seconds it recomputes income, population and
    happiness for each connected player, saves them to the database, and
    broadcasts a "players_update" message. A failing tick is logged and the
    loop carries on; an unhandled exception here would otherwise end the
    background task and silently freeze the game. Task cancellation still
    propagates, because it is not an `Exception` subclass.
    """
    while True:
        await asyncio.sleep(2)  # Update interval
        try:
            await _run_tick()
        except Exception:
            logger.exception("Game loop tick failed; retrying on the next tick.")
@app.on_event("startup")
async def on_startup():
    """Prepare the database, load persisted buildings, and start the game loop."""
    init_db()
    all_db_buildings = db_fetchall("SELECT key, owner_nickname, type, cost FROM buildings")
    for b in all_db_buildings:
        game_state["buildings"][b["key"]] = { "owner": b["owner_nickname"], "type": b["type"], "cost": b["cost"] }
    logger.info(f"Loaded {len(game_state['buildings'])} buildings from database.")
    # Keep a strong reference to the task: the event loop holds only weak
    # references, so an unreferenced task can be garbage-collected while it
    # is still running.
    app.state.game_loop_task = asyncio.create_task(game_loop())
# --- WebSocket Endpoint ---
def _grid_key_from_message(message: Dict[str, Any]) -> Optional[str]:
    """Return the "<x>_<z>" grid key for message["position"], or None if invalid.

    Both coordinates must be integers (whole-valued floats are converted).
    Anything else is rejected, because building keys are later parsed back
    into integers by `get_neighbors` and a malformed key would raise there.
    """
    pos = message.get("position")
    if not isinstance(pos, dict):
        return None
    coords = []
    for axis in ("x", "z"):
        value = pos.get(axis)
        if isinstance(value, bool):  # bool is an int subclass; not a coordinate
            return None
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        if not isinstance(value, int):
            return None
        coords.append(value)
    return f"{coords[0]}_{coords[1]}"


@app.websocket("/ws/{nickname}")
async def websocket_endpoint(websocket: WebSocket, nickname: str):
    """Serve one player's WebSocket session.

    Loads the player from the database (or creates them with 1500 money),
    sends the full building map, then handles "build" and "remove" actions
    until the socket closes. Messages that are not valid JSON objects, or that
    lack a valid integer position, are ignored. The player is removed from the
    in-memory state however the session ends.
    """
    await manager.connect(websocket, nickname)
    try:
        player_data = db_fetchone("SELECT money, population, happiness FROM players WHERE nickname = ?", (nickname,))
        if player_data:
            game_state["players"][nickname] = { "money": player_data["money"], "population": player_data["population"], "happiness": player_data["happiness"] }
        else:
            initial_money = 1500
            initial_pop = 0
            initial_happiness = 0.5
            db_execute("INSERT INTO players (nickname, money, population, happiness) VALUES (?, ?, ?, ?)", (nickname, initial_money, initial_pop, initial_happiness))
            game_state["players"][nickname] = {"money": initial_money, "population": initial_pop, "happiness": initial_happiness}

        await websocket.send_text(json.dumps({ "type": "full_state", "buildings": game_state["buildings"] }))
        await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' has joined the game!" }))
        await manager.broadcast(json.dumps({ "type": "players_update", "players": game_state["players"] }))

        while True:
            data = await websocket.receive_text()
            # Client input is untrusted: skip anything that is not a JSON object.
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                logger.warning("Ignoring malformed message from %s", nickname)
                continue
            if not isinstance(message, dict):
                continue

            player = game_state["players"].get(nickname)
            if not player:
                continue

            key = _grid_key_from_message(message)
            if key is None:
                continue
            pos = message["position"]
            action = message.get("action")

            if action == "build":
                build_type = message.get("type")
                # The isinstance check also guards against unhashable values.
                if not isinstance(build_type, str) or build_type not in building_data:
                    continue
                cost = building_data[build_type].get('cost', 0)
                if player["money"] >= cost and key not in game_state["buildings"]:
                    player["money"] -= cost
                    db_execute("UPDATE players SET money = ? WHERE nickname = ?", (player["money"], nickname))
                    new_building = {"owner": nickname, "type": build_type, "cost": cost}
                    game_state["buildings"][key] = new_building
                    db_execute("INSERT INTO buildings (key, owner_nickname, type, cost) VALUES (?, ?, ?, ?)", (key, nickname, build_type, cost))
                    await manager.broadcast(json.dumps({ "type": "build_update", "key": key, "building": new_building, "position": pos }))
                    await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' built a new {build_type}." }))
            elif action == "remove":
                # Only the owner may remove a building; half its cost is refunded.
                if key in game_state["buildings"] and game_state["buildings"][key]["owner"] == nickname:
                    removed_building = game_state["buildings"].pop(key)
                    player["money"] += removed_building["cost"] * 0.5
                    db_execute("UPDATE players SET money = ? WHERE nickname = ?", (player["money"], nickname))
                    db_execute("DELETE FROM buildings WHERE key = ?", (key,))
                    await manager.broadcast(json.dumps({"type": "remove_update", "key": key, "position": pos}))
                    await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' removed a building." }))
    except WebSocketDisconnect:
        # Normal end of session; cleanup happens in `finally`.
        pass
    finally:
        # Clean up on every exit path, not only a clean disconnect. If a newer
        # socket has taken over this nickname, leave that session alone.
        current = manager.active_connections.get(nickname)
        if current is None or current is websocket:
            manager.disconnect(nickname)
            game_state["players"].pop(nickname, None)
            await manager.broadcast(json.dumps({ "type": "players_update", "players": game_state["players"] }))
            await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' has left the game." }))
@app.get("/")
async def root():
    """Serve the client page: `index.html`, given as a relative path."""
    index_page = FileResponse("index.html")
    return index_page