import asyncio import json import logging import sqlite3 from typing import Dict, Any, Optional, List, Tuple from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse from pathlib import Path # --- Setup --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() DB_FILE = Path("tycoon.db") # --- Database Management --- def init_db(): """Initializes the database and creates/alters tables if they don't exist.""" try: conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS players ( nickname TEXT PRIMARY KEY, money INTEGER NOT NULL, population INTEGER NOT NULL, happiness REAL NOT NULL DEFAULT 0.5 ) """) # Add happiness column if it doesn't exist for migrations try: cursor.execute("ALTER TABLE players ADD COLUMN happiness REAL NOT NULL DEFAULT 0.5") logger.info("Added 'happiness' column to players table.") except sqlite3.OperationalError: pass # Column already exists cursor.execute(""" CREATE TABLE IF NOT EXISTS buildings ( key TEXT PRIMARY KEY, owner_nickname TEXT NOT NULL, type TEXT NOT NULL, cost INTEGER NOT NULL, FOREIGN KEY (owner_nickname) REFERENCES players (nickname) ) """) conn.commit() conn.close() logger.info("Database initialized successfully.") except sqlite3.Error as e: logger.error(f"Database error on init: {e}") def db_execute(query: str, params: tuple = ()): """Executes a write query (INSERT, UPDATE, DELETE).""" try: with sqlite3.connect(DB_FILE) as conn: cursor = conn.cursor() cursor.execute(query, params) conn.commit() except sqlite3.Error as e: logger.error(f"DB execute error: {e} with query: {query}") def db_fetchone(query: str, params: tuple = ()) -> Optional[Any]: """Fetches a single row from the database.""" try: with sqlite3.connect(DB_FILE) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(query, params) return cursor.fetchone() except sqlite3.Error as e: logger.error(f"DB fetchone error: {e} with query: {query}") return None def db_fetchall(query: str, params: tuple = ()) -> list: """Fetches all rows from the database.""" try: with sqlite3.connect(DB_FILE) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() cursor.execute(query, params) return cursor.fetchall() except sqlite3.Error as e: logger.error(f"DB fetchall error: {e} with query: {query}") return [] # --- Connection Manager --- class ConnectionManager: def __init__(self): self.active_connections: Dict[str, WebSocket] = {} async def connect(self, websocket: WebSocket, nickname: str): await websocket.accept() self.active_connections[nickname] = websocket logger.info(f"Player connected: {nickname}") def disconnect(self, nickname: str): if nickname in self.active_connections: del self.active_connections[nickname] logger.info(f"Player disconnected: {nickname}") async def broadcast(self, message: str): disconnected_players = [] for nickname, connection in self.active_connections.items(): try: await connection.send_text(message) except Exception: disconnected_players.append(nickname) for nickname in disconnected_players: self.disconnect(nickname) manager = ConnectionManager() # --- Game State & Data --- game_state: Dict[str, Any] = { "players": {}, "buildings": {} } building_data = { 'residential': { 'cost': 100, 'population': 10 }, 'commercial': { 'cost': 250, 'income': 5 }, 'industrial': { 'cost': 500, 'income': 20, 'happiness_impact': -0.02 }, 'park': { 'cost': 80, 'population_bonus': 5, 'happiness_impact': 0.01 }, 'powerplant': { 'cost': 1000, 'income': 50 }, 'road': { 'cost': 20 }, 'police': { 'cost': 600, 'happiness_bonus': 0.1 }, 'stadium': {'cost': 5000, 'income': 150, 'happiness_impact': 0.05 } } # --- Game Logic --- def get_neighbors(key: str) -> List[str]: """Gets the keys of the four adjacent grid cells.""" x_str, z_str = key.split('_') x, z = int(x_str), int(z_str) grid_cell_size = 2 # Must match client return [ f"{x}_{z + grid_cell_size}", f"{x}_{z - grid_cell_size}", f"{x + grid_cell_size}_{z}", f"{x - grid_cell_size}_{z}", ] async def game_loop(): """Periodically updates game state.""" while True: await asyncio.sleep(2) # Update interval active_nicknames = list(game_state["players"].keys()) for nickname in active_nicknames: player = game_state["players"][nickname] income = 0 population = 0 base_happiness = 0.5 # Start with a neutral base happiness player_buildings = {k: v for k, v in game_state["buildings"].items() if v["owner"] == nickname} # Calculate base income, population, and happiness impacts for building in player_buildings.values(): b_type = building["type"] b_data = building_data.get(b_type, {}) if b_type == 'residential': population += b_data.get('population', 0) elif 'income' in b_data: if b_type == 'commercial': # Commercial income is modified by happiness happiness_multiplier = max(0.1, player.get('happiness', 0.5)) income += b_data['income'] * (1 + happiness_multiplier) else: income += b_data.get('income', 0) if 'happiness_impact' in b_data: base_happiness += b_data['happiness_impact'] # Calculate adjacency bonuses parks = {k: v for k, v in player_buildings.items() if v["type"] == 'park'} police_stations = {k: v for k, v in player_buildings.items() if v["type"] == 'police'} for park_key in parks: for neighbor_key in get_neighbors(park_key): neighbor = player_buildings.get(neighbor_key) if neighbor and neighbor["type"] == 'residential': population += building_data['park']['population_bonus'] for police_key in police_stations: for neighbor_key in get_neighbors(police_key): neighbor = player_buildings.get(neighbor_key) if neighbor and neighbor["type"] == 'residential': base_happiness += building_data['police']['happiness_bonus'] / 4 # Distribute bonus over 4 neighbors # Finalize and clamp values final_happiness = max(0, min(1, base_happiness)) player["money"] += income player["population"] = population player["happiness"] = final_happiness db_execute("UPDATE players SET money = ?, population = ?, happiness = ? WHERE nickname = ?", (player["money"], player["population"], player["happiness"], nickname)) if game_state["players"]: await manager.broadcast(json.dumps({ "type": "players_update", "players": game_state["players"] })) @app.on_event("startup") async def on_startup(): init_db() all_db_buildings = db_fetchall("SELECT key, owner_nickname, type, cost FROM buildings") for b in all_db_buildings: game_state["buildings"][b["key"]] = { "owner": b["owner_nickname"], "type": b["type"], "cost": b["cost"] } logger.info(f"Loaded {len(game_state['buildings'])} buildings from database.") asyncio.create_task(game_loop()) # --- WebSocket Endpoint --- @app.websocket("/ws/{nickname}") async def websocket_endpoint(websocket: WebSocket, nickname: str): await manager.connect(websocket, nickname) player_data = db_fetchone("SELECT money, population, happiness FROM players WHERE nickname = ?", (nickname,)) if player_data: game_state["players"][nickname] = { "money": player_data["money"], "population": player_data["population"], "happiness": player_data["happiness"] } else: initial_money = 1500 initial_pop = 0 initial_happiness = 0.5 db_execute("INSERT INTO players (nickname, money, population, happiness) VALUES (?, ?, ?, ?)", (nickname, initial_money, initial_pop, initial_happiness)) game_state["players"][nickname] = {"money": initial_money, "population": initial_pop, "happiness": initial_happiness} await websocket.send_text(json.dumps({ "type": "full_state", "buildings": game_state["buildings"] })) await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' has joined the game!" })) await manager.broadcast(json.dumps({ "type": "players_update", "players": game_state["players"] })) try: while True: data = await websocket.receive_text() message = json.loads(data) player = game_state["players"].get(nickname) if not player: continue action = message.get("action") pos = message.get("position") key = f"{pos['x']}_{pos['z']}" if action == "build": build_type = message.get("type") if build_type not in building_data: continue cost = building_data[build_type].get('cost', 0) if player["money"] >= cost and key not in game_state["buildings"]: player["money"] -= cost db_execute("UPDATE players SET money = ? WHERE nickname = ?", (player["money"], nickname)) new_building = {"owner": nickname, "type": build_type, "cost": cost} game_state["buildings"][key] = new_building db_execute("INSERT INTO buildings (key, owner_nickname, type, cost) VALUES (?, ?, ?, ?)", (key, nickname, build_type, cost)) await manager.broadcast(json.dumps({ "type": "build_update", "key": key, "building": new_building, "position": pos })) await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' built a new {build_type}." })) elif action == "remove": if key in game_state["buildings"] and game_state["buildings"][key]["owner"] == nickname: removed_building = game_state["buildings"].pop(key) player["money"] += removed_building["cost"] * 0.5 db_execute("UPDATE players SET money = ? WHERE nickname = ?", (player["money"], nickname)) db_execute("DELETE FROM buildings WHERE key = ?", (key,)) await manager.broadcast(json.dumps({"type": "remove_update", "key": key, "position": pos})) await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' removed a building." })) except WebSocketDisconnect: manager.disconnect(nickname) if nickname in game_state["players"]: del game_state["players"][nickname] await manager.broadcast(json.dumps({ "type": "players_update", "players": game_state["players"] })) await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' has left the game." })) @app.get("/") async def root(): return FileResponse("index.html") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8588)