import asyncio
import json
import logging
import sqlite3
from typing import Dict, Any, Optional, List, Tuple
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from pathlib import Path
# --- Setup ---
# Configure root logging at INFO level; `logger` is this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Application object that the routes and event handlers below are registered on.
app = FastAPI()
# SQLite database file. The path is relative, so it is resolved against the
# working directory of the process.
DB_FILE = Path("tycoon.db")
# --- Database Management ---
def init_db():
    """Create the ``players`` and ``buildings`` tables if they do not exist.

    Opens ``DB_FILE``, issues both ``CREATE TABLE IF NOT EXISTS`` statements
    inside a single transaction and always closes the connection afterwards,
    even when one of the statements fails.

    Any ``sqlite3.Error`` is logged and swallowed; nothing is returned.
    """
    try:
        conn = sqlite3.connect(DB_FILE)
        try:
            # `with conn` commits on success and rolls back on error; it does
            # not close the connection, hence the surrounding try/finally.
            with conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS players (
                        nickname TEXT PRIMARY KEY,
                        money INTEGER NOT NULL,
                        population INTEGER NOT NULL
                    )
                """)
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS buildings (
                        key TEXT PRIMARY KEY,
                        owner_nickname TEXT NOT NULL,
                        type TEXT NOT NULL,
                        cost INTEGER NOT NULL,
                        FOREIGN KEY (owner_nickname) REFERENCES players (nickname)
                    )
                """)
        finally:
            conn.close()
        logger.info("Database initialized successfully.")
    except sqlite3.Error as e:
        logger.error(f"Database error on init: {e}")
def db_execute(query: str, params: tuple = ()):
    """Execute a write query (INSERT, UPDATE, DELETE) and commit it.

    Args:
        query: SQL statement, using ``?`` placeholders for values.
        params: Values bound to the placeholders.

    A fresh connection to ``DB_FILE`` is opened for the call and is always
    closed again. Any ``sqlite3.Error`` is logged and swallowed, so callers
    get ``None`` whether or not the write succeeded.
    """
    try:
        conn = sqlite3.connect(DB_FILE)
        try:
            # The connection's context manager only commits / rolls back the
            # transaction; closing has to be done explicitly.
            with conn:
                conn.execute(query, params)
        finally:
            conn.close()
    except sqlite3.Error as e:
        logger.error(f"DB execute error: {e} with query: {query}")
def db_fetchone(query: str, params: tuple = ()) -> Optional[Any]:
    """Fetch a single row from the database.

    Args:
        query: SQL statement, using ``?`` placeholders for values.
        params: Values bound to the placeholders.

    Returns:
        The first result row as a ``sqlite3.Row`` (columns addressable by
        name), or ``None`` when the query yields no row or a
        ``sqlite3.Error`` occurs (the error is logged).
    """
    try:
        conn = sqlite3.connect(DB_FILE)
        try:
            conn.row_factory = sqlite3.Row
            # `with conn` keeps the commit-on-success behaviour; the finally
            # clause closes the connection, which `with` alone does not do.
            with conn:
                return conn.execute(query, params).fetchone()
        finally:
            conn.close()
    except sqlite3.Error as e:
        logger.error(f"DB fetchone error: {e} with query: {query}")
        return None
def db_fetchall(query: str, params: tuple = ()) -> list:
    """Fetch all rows produced by a query.

    Args:
        query: SQL statement, using ``?`` placeholders for values.
        params: Values bound to the placeholders.

    Returns:
        A list of ``sqlite3.Row`` objects (columns addressable by name). The
        list is empty when the query yields nothing or a ``sqlite3.Error``
        occurs (the error is logged).
    """
    try:
        conn = sqlite3.connect(DB_FILE)
        try:
            conn.row_factory = sqlite3.Row
            # `with conn` keeps the commit-on-success behaviour; the finally
            # clause closes the connection, which `with` alone does not do.
            with conn:
                return conn.execute(query, params).fetchall()
        finally:
            conn.close()
    except sqlite3.Error as e:
        logger.error(f"DB fetchall error: {e} with query: {query}")
        return []
# --- Connection Manager ---
class ConnectionManager:
    """Registry of open WebSocket connections, keyed by player nickname."""

    def __init__(self):
        # nickname -> the WebSocket most recently registered under that name
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, nickname: str):
        """Accept the WebSocket handshake and register it under ``nickname``.

        A connection already registered under the same nickname is replaced.
        """
        await websocket.accept()
        self.active_connections[nickname] = websocket
        logger.info(f"Player connected: {nickname}")

    def disconnect(self, nickname: str):
        """Forget the connection registered under ``nickname``, if any."""
        if nickname in self.active_connections:
            del self.active_connections[nickname]
            logger.info(f"Player disconnected: {nickname}")

    async def broadcast(self, message: str):
        """Send ``message`` to every registered connection.

        Connections whose send fails are dropped from the registry; the
        failure is logged and never propagated to the caller.
        """
        failed = []
        # Iterate over a snapshot: each `await` yields to the event loop, and
        # another coroutine may register or remove a connection meanwhile,
        # which would otherwise raise "dictionary changed size during
        # iteration".
        for nickname, connection in list(self.active_connections.items()):
            try:
                await connection.send_text(message)
            except Exception as exc:
                logger.warning(f"Dropping connection for {nickname}: send failed ({exc!r})")
                failed.append((nickname, connection))
        for nickname, connection in failed:
            # Only drop the entry if it still refers to the socket that
            # failed; the nickname may have reconnected in the meantime.
            if self.active_connections.get(nickname) is connection:
                self.disconnect(nickname)
# Module-level manager instance shared by the game loop and the WebSocket endpoint.
manager = ConnectionManager()
# --- Game State & Data ---
# In-memory world state: "players" maps nickname -> {"money", "population"},
# "buildings" maps grid key -> {"owner", "type", "cost"}.
game_state: Dict[str, Any] = dict(players={}, buildings={})

# Static catalogue of building types: purchase cost plus whatever the type
# contributes (population, income per tick, or a park's adjacency bonus).
building_data = {
    'residential': dict(cost=100, population=10),
    'commercial': dict(cost=250, income=5),
    'industrial': dict(cost=500, income=20),
    'park': dict(cost=80, population_bonus=5),
    'powerplant': dict(cost=1000, income=50),
    'road': dict(cost=20),
}
# --- Game Logic ---
def get_neighbors(key: str, grid_cell_size: int = 2) -> List[str]:
    """Return the keys of the four grid cells adjacent to ``key``.

    Args:
        key: Cell key of the form ``"<x>_<z>"`` with integer coordinates,
            e.g. ``"4_-6"``.
        grid_cell_size: Distance between neighbouring cells. The default of
            2 must match the client's grid spacing.

    Returns:
        Neighbour keys in the order +z, -z, +x, -x.

    Raises:
        ValueError: If ``key`` is not two integers separated by one ``_``.
    """
    x_str, z_str = key.split('_')
    x, z = int(x_str), int(z_str)
    return [
        f"{x}_{z + grid_cell_size}",
        f"{x}_{z - grid_cell_size}",
        f"{x + grid_cell_size}_{z}",
        f"{x - grid_cell_size}_{z}",
    ]
def _player_income_and_population(nickname: str) -> Tuple[int, int]:
    """Return ``(income_per_tick, population)`` for one player's buildings."""
    owned = {
        key: building
        for key, building in game_state["buildings"].items()
        if building["owner"] == nickname
    }
    park_bonus = building_data['park']['population_bonus']
    income = 0
    population = 0
    for key, building in owned.items():
        b_type = building["type"]
        # Base contribution comes straight from the catalogue; types without
        # an 'income' / 'population' entry (or unknown types) contribute 0.
        stats = building_data.get(b_type, {})
        income += stats.get('income', 0)
        population += stats.get('population', 0)
        if b_type == 'park':
            # Each residential building of the same owner that touches the
            # park earns the adjacency bonus.
            for neighbor_key in get_neighbors(key):
                neighbor = owned.get(neighbor_key)
                if neighbor and neighbor["type"] == 'residential':
                    population += park_bonus
    return income, population


async def game_loop():
    """Periodically update every connected player's money and population.

    Every 2 seconds, for each player in ``game_state["players"]``: add the
    income of their buildings to their money, recompute their population,
    persist both values, then broadcast a ``players_update`` message.

    A failure while processing one player or while broadcasting is logged
    and the loop keeps running; the loop never returns on its own.
    """
    while True:
        await asyncio.sleep(2)  # Update interval
        for nickname in list(game_state["players"]):
            player = game_state["players"].get(nickname)
            if player is None:
                continue
            try:
                income, population = _player_income_and_population(nickname)
            except Exception:
                # A single bad record (e.g. a malformed building key) must
                # not stop the economy for everyone else.
                logger.exception(f"Game loop: could not update player {nickname}")
                continue
            player["money"] += income
            player["population"] = population
            db_execute(
                "UPDATE players SET money = ?, population = ? WHERE nickname = ?",
                (player["money"], player["population"], nickname),
            )
        if game_state["players"]:
            try:
                await manager.broadcast(json.dumps({
                    "type": "players_update", "players": game_state["players"]
                }))
            except Exception:
                logger.exception("Game loop: players_update broadcast failed")
@app.on_event("startup")
async def on_startup():
    """Prepare the database, load persisted buildings and start the game loop.

    Buildings are read from the ``buildings`` table into
    ``game_state["buildings"]``, keyed by their grid key.
    """
    init_db()
    for row in db_fetchall("SELECT key, owner_nickname, type, cost FROM buildings"):
        game_state["buildings"][row["key"]] = {
            "owner": row["owner_nickname"],
            "type": row["type"],
            "cost": row["cost"],
        }
    logger.info(f"Loaded {len(game_state['buildings'])} buildings from database.")
    # The event loop only keeps a weak reference to tasks, so an unreferenced
    # task can be garbage-collected mid-run. Keep a strong reference on the app.
    app.state.game_loop_task = asyncio.create_task(game_loop())
# --- WebSocket Endpoint ---
def _grid_key(position: Any) -> Optional[str]:
    """Return the ``"x_z"`` grid key for a client position, or None if invalid."""
    if not isinstance(position, dict):
        return None
    x, z = position.get("x"), position.get("z")
    for coord in (x, z):
        # Keys are later parsed back with int(), so only real integers are
        # accepted (bool is an int subclass and is rejected explicitly).
        if isinstance(coord, bool) or not isinstance(coord, int):
            return None
    return f"{x}_{z}"


@app.websocket("/ws/{nickname}")
async def websocket_endpoint(websocket: WebSocket, nickname: str):
    """Serve one player's WebSocket session.

    On connect the player is loaded from the database (or created with 1500
    money and 0 population), receives the full building state, and everyone
    is notified. Afterwards each incoming JSON message is expected to look
    like ``{"action": "build" | "remove", "position": {"x": int, "z": int},
    "type": <building type, for "build">}``. Messages that are not valid
    JSON objects or lack a valid integer position are ignored.

    When the session ends, for any reason, the player is removed from the
    in-memory state and the remaining players are notified.
    """
    await manager.connect(websocket, nickname)
    try:
        player_data = db_fetchone("SELECT money, population FROM players WHERE nickname = ?", (nickname,))
        if player_data:
            game_state["players"][nickname] = {
                "money": player_data["money"],
                "population": player_data["population"],
            }
        else:
            initial_money = 1500
            initial_pop = 0
            db_execute(
                "INSERT INTO players (nickname, money, population) VALUES (?, ?, ?)",
                (nickname, initial_money, initial_pop),
            )
            game_state["players"][nickname] = {"money": initial_money, "population": initial_pop}

        await websocket.send_text(json.dumps({ "type": "full_state", "buildings": game_state["buildings"] }))
        await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' has joined the game!" }))
        await manager.broadcast(json.dumps({ "type": "players_update", "players": game_state["players"] }))

        while True:
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                logger.warning(f"Ignoring malformed message from {nickname}")
                continue
            if not isinstance(message, dict):
                continue

            player = game_state["players"].get(nickname)
            if not player:
                continue

            pos = message.get("position")
            key = _grid_key(pos)
            if key is None:
                continue
            action = message.get("action")

            if action == "build":
                build_type = message.get("type")
                if not isinstance(build_type, str) or build_type not in building_data:
                    continue
                cost = building_data[build_type].get('cost', 0)
                if player["money"] >= cost and key not in game_state["buildings"]:
                    player["money"] -= cost
                    db_execute("UPDATE players SET money = ? WHERE nickname = ?", (player["money"], nickname))
                    new_building = {"owner": nickname, "type": build_type, "cost": cost}
                    game_state["buildings"][key] = new_building
                    db_execute(
                        "INSERT INTO buildings (key, owner_nickname, type, cost) VALUES (?, ?, ?, ?)",
                        (key, nickname, build_type, cost),
                    )
                    await manager.broadcast(json.dumps({ "type": "build_update", "key": key, "building": new_building, "position": pos }))
                    await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' built a new {build_type}." }))
            elif action == "remove":
                building = game_state["buildings"].get(key)
                if building is not None and building["owner"] == nickname:
                    removed_building = game_state["buildings"].pop(key)
                    # Refund half the cost. Integer division keeps money an
                    # int, matching the INTEGER column, instead of a float.
                    player["money"] += removed_building["cost"] // 2
                    db_execute("UPDATE players SET money = ? WHERE nickname = ?", (player["money"], nickname))
                    db_execute("DELETE FROM buildings WHERE key = ?", (key,))
                    await manager.broadcast(json.dumps({"type": "remove_update", "key": key, "position": pos}))
                    await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' removed a building." }))
    except WebSocketDisconnect:
        pass  # Normal end of session; cleanup happens in `finally`.
    finally:
        # Clean up on every exit path, not just a clean disconnect. If the
        # nickname has since been taken over by a newer socket, leave that
        # session's registration and player entry alone.
        registered = manager.active_connections.get(nickname)
        if registered is None or registered is websocket:
            manager.disconnect(nickname)
            game_state["players"].pop(nickname, None)
            await manager.broadcast(json.dumps({ "type": "players_update", "players": game_state["players"] }))
            await manager.broadcast(json.dumps({ "type": "status_update", "message": f"'{nickname}' has left the game." }))
@app.get("/")
async def root():
    """Respond to ``GET /`` with the file ``index.html``."""
    index_page = "index.html"
    return FileResponse(index_page)
if __name__ == "__main__":
    # Executed as a script: run the app under uvicorn, bound to all
    # interfaces on port 8588.
    import uvicorn

    uvicorn.run(app, port=8588, host="0.0.0.0")