commit de387d2d58cc4db6c1744b3e16f0df11ed595783 Author: retoor Date: Thu Sep 4 18:49:55 2025 +0200 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..c7d282c --- /dev/null +++ b/README.md @@ -0,0 +1,1013 @@ +# Container Management API Integration Guide + +## Overview + +This API provides comprehensive container management capabilities through REST endpoints, WebSocket connections, and file upload/download interfaces. All containers are managed via Docker Compose v2 CLI. + +## Quick Start + +### 1. Prerequisites + +```bash +# Required software +- Python 3.12+ +- Docker with Docker Compose v2 +- Linux environment + +# Python dependencies +pip install aiohttp aiofiles python-dotenv ruamel.yaml +``` + +### 2. Configuration + +Create a `.env` file: + +```env +APP_USER=admin +APP_PASS=your_secure_password_here +DEFAULT_USER_UID=1000:1000 +``` + +### 3. Run the Server + +```bash +python container_manager.py +# Server starts at http://0.0.0.0:8080 +``` + +## Authentication + +All API endpoints require Basic Authentication. 
+ +```http +Authorization: Basic base64(username:password) +``` + +Example: +```python +import base64 + +username = "admin" +password = "your_secure_password_here" +auth = base64.b64encode(f"{username}:{password}".encode()).decode() +headers = {"Authorization": f"Basic {auth}"} +``` + +## REST API Integration + +### Container Lifecycle + +#### Create Container + +```python +import requests +import json + +# Create container with Python example +def create_container(base_url, auth_headers): + payload = { + "image": "python:3.12-slim", + "env": { + "APP_ENV": "production", + "DEBUG": "false" + }, + "tags": ["web", "production"], + "resources": { + "cpus": 1.0, + "memory": "4096m", + "pids": 2048 + }, + "ports": [ + { + "host": 8080, + "container": 80, + "protocol": "tcp" + } + ] + } + + response = requests.post( + f"{base_url}/containers", + json=payload, + headers=auth_headers + ) + + if response.status_code == 201: + container = response.json() + print(f"Created container: {container['cuid']}") + return container['cuid'] + else: + print(f"Error: {response.json()}") + return None + +# Usage +base_url = "http://localhost:8080" +auth = base64.b64encode(b"admin:password").decode() +headers = {"Authorization": f"Basic {auth}"} + +cuid = create_container(base_url, headers) +``` + +#### List Containers with Pagination + +```javascript +// Node.js example with axios +const axios = require('axios'); + +async function listContainers(baseUrl, auth, status = null, cursor = null) { + const params = {}; + if (status) params.status = status.join(','); + if (cursor) params.cursor = cursor; + + const response = await axios.get(`${baseUrl}/containers`, { + params, + headers: { + 'Authorization': `Basic ${auth}` + } + }); + + const data = response.data; + console.log(`Found ${data.containers.length} containers`); + + // Handle pagination + if (data.next_cursor) { + console.log('More results available, cursor:', data.next_cursor); + // Fetch next page + await listContainers(baseUrl, 
auth, status, data.next_cursor); + } + + return data.containers; +} + +// Usage +const auth = Buffer.from('admin:password').toString('base64'); +listContainers('http://localhost:8080', auth, ['running', 'paused']); +``` + +#### Update Container Configuration + +```go +// Go example +package main + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "net/http" +) + +type UpdateRequest struct { + Env map[string]interface{} `json:"env,omitempty"` + Tags []string `json:"tags,omitempty"` + Resources map[string]interface{} `json:"resources,omitempty"` + Image string `json:"image,omitempty"` +} + +func updateContainer(baseURL, cuid, username, password string) error { + update := UpdateRequest{ + Env: map[string]interface{}{ + "NEW_VAR": "value", + "OLD_VAR": nil, // Remove this env var + }, + Tags: []string{"updated", "v2"}, + Resources: map[string]interface{}{ + "memory": "8192m", + }, + } + + jsonData, _ := json.Marshal(update) + + req, err := http.NewRequest("PATCH", + fmt.Sprintf("%s/containers/%s", baseURL, cuid), + bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + + auth := base64.StdEncoding.EncodeToString( + []byte(fmt.Sprintf("%s:%s", username, password))) + req.Header.Set("Authorization", fmt.Sprintf("Basic %s", auth)) + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("update failed with status: %d", resp.StatusCode) + } + + return nil +} +``` + +### Container Control Operations + +```ruby +# Ruby example +require 'net/http' +require 'json' +require 'base64' + +class ContainerManager + def initialize(base_url, username, password) + @base_url = base_url + @auth = Base64.encode64("#{username}:#{password}").strip + end + + def control_action(cuid, action) + uri = URI("#{@base_url}/containers/#{cuid}/#{action}") + http = Net::HTTP.new(uri.host, uri.port) + + 
request = Net::HTTP::Post.new(uri) + request['Authorization'] = "Basic #{@auth}" + request['Content-Type'] = 'application/json' + + response = http.request(request) + JSON.parse(response.body) + end + + def start(cuid) + control_action(cuid, 'start') + end + + def stop(cuid) + control_action(cuid, 'stop') + end + + def restart(cuid) + control_action(cuid, 'restart') + end + + def pause(cuid) + control_action(cuid, 'pause') + end + + def unpause(cuid) + control_action(cuid, 'unpause') + end +end + +# Usage +manager = ContainerManager.new('http://localhost:8080', 'admin', 'password') +manager.start('c123e4567-e89b-12d3-a456-426614174000') +``` + +### Port Management + +```python +# Update container ports +def update_ports(base_url, cuid, auth_headers): + new_ports = { + "ports": [ + {"host": 3000, "container": 3000, "protocol": "tcp"}, + {"host": 9000, "container": 9000, "protocol": "udp"} + ] + } + + response = requests.patch( + f"{base_url}/containers/{cuid}/ports", + json=new_ports, + headers=auth_headers + ) + + return response.status_code == 200 +``` + +## File Upload/Download Integration + +### Upload ZIP Archive + +```python +import zipfile +import io +import requests + +def upload_files_to_container(base_url, cuid, files_dict, auth_headers): + """ + Upload files to container mount + files_dict: {"path/in/container": file_content_bytes} + """ + # Create ZIP in memory + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: + for path, content in files_dict.items(): + zf.writestr(path, content) + + zip_buffer.seek(0) + + # Upload ZIP + response = requests.post( + f"{base_url}/containers/{cuid}/upload-zip", + data=zip_buffer.read(), + headers={ + **auth_headers, + 'Content-Type': 'application/zip' + } + ) + + return response.status_code == 200 + +# Example: Upload Python application +files = { + "boot.py": b""" +import os +import time + +print(f"Container {os.environ.get('CONTAINER_UID')} started") +print(f"Tags: 
{os.environ.get('TAGS', 'none')}") + +while True: + print("Working...") + time.sleep(10) +""", + "requirements.txt": b"requests==2.31.0\nnumpy==1.24.0\n", + "data/config.json": b'{"setting": "value"}' +} + +upload_files_to_container(base_url, cuid, files, headers) +``` + +### Download Files + +```javascript +// Download single file +async function downloadFile(baseUrl, cuid, filePath, auth) { + const response = await axios.get( + `${baseUrl}/containers/${cuid}/download`, + { + params: { path: filePath }, + headers: { 'Authorization': `Basic ${auth}` }, + responseType: 'stream' + } + ); + + // Save to file + const fs = require('fs'); + const writer = fs.createWriteStream('downloaded_file'); + response.data.pipe(writer); + + return new Promise((resolve, reject) => { + writer.on('finish', resolve); + writer.on('error', reject); + }); +} + +// Download directory as ZIP +async function downloadDirectory(baseUrl, cuid, dirPath, auth) { + const response = await axios.get( + `${baseUrl}/containers/${cuid}/download-zip`, + { + params: { path: dirPath }, + headers: { 'Authorization': `Basic ${auth}` }, + responseType: 'arraybuffer' + } + ); + + // Save ZIP file + fs.writeFileSync('container_backup.zip', response.data); +} +``` + +## WebSocket Integration + +### Interactive Terminal Session + +```python +import asyncio +import websockets +import json +import base64 + +class ContainerTerminal: + def __init__(self, base_url, cuid, username, password): + self.ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}" + self.auth = base64.b64encode(f"{username}:{password}".encode()).decode() + self.ws = None + + async def connect(self, cols=80, rows=24): + # Connect with TTY size + headers = {"Authorization": f"Basic {self.auth}"} + uri = f"{self.ws_url}?cols={cols}&rows={rows}" + + self.ws = await websockets.connect(uri, extra_headers=headers) + + # Start receiver task + asyncio.create_task(self._receive_output()) + + async def _receive_output(self): + """Receive and print 
output from container""" + async for message in self.ws: + data = json.loads(message) + + if data['type'] == 'stdout': + if data['encoding'] == 'base64': + import base64 + text = base64.b64decode(data['data']).decode('utf-8', errors='replace') + else: + text = data['data'] + print(text, end='') + + elif data['type'] == 'stderr': + if data['encoding'] == 'base64': + import base64 + text = base64.b64decode(data['data']).decode('utf-8', errors='replace') + else: + text = data['data'] + print(f"[ERROR] {text}", end='') + + elif data['type'] == 'exit': + print(f"\n[Process exited with code {data['code']}]") + await self.ws.close() + + elif data['type'] == 'error': + print(f"[ERROR] {data['error']}") + + async def send_input(self, text): + """Send input to container""" + await self.ws.send(json.dumps({ + "type": "stdin", + "data": text, + "encoding": "utf8" + })) + + async def resize(self, cols, rows): + """Resize terminal""" + await self.ws.send(json.dumps({ + "type": "resize", + "cols": cols, + "rows": rows + })) + + async def send_signal(self, signal_name): + """Send signal (INT, TERM, KILL)""" + await self.ws.send(json.dumps({ + "type": "signal", + "name": signal_name + })) + + async def run_interactive(self): + """Run interactive terminal""" + import aioconsole + + await self.connect() + + print("Connected to container. 
Type 'exit' to quit.") + print("Special commands: !INT (Ctrl+C), !TERM (terminate), !KILL (force kill)") + + while self.ws and not self.ws.closed: + try: + line = await aioconsole.ainput() + + if line == 'exit': + await self.ws.send(json.dumps({"type": "close"})) + break + elif line == '!INT': + await self.send_signal('INT') + elif line == '!TERM': + await self.send_signal('TERM') + elif line == '!KILL': + await self.send_signal('KILL') + else: + await self.send_input(line + '\n') + + except Exception as e: + print(f"Error: {e}") + break + +# Usage +async def main(): + terminal = ContainerTerminal( + 'localhost:8080', + 'c123e4567-e89b-12d3-a456-426614174000', + 'admin', + 'password' + ) + await terminal.run_interactive() + +asyncio.run(main()) +``` + +### Non-Interactive Command Execution + +```javascript +const WebSocket = require('ws'); + +class ContainerExecutor { + constructor(baseUrl, cuid, username, password) { + this.wsUrl = `ws://${baseUrl.replace('http://', '')}/ws/${cuid}`; + this.auth = Buffer.from(`${username}:${password}`).toString('base64'); + } + + async execute(command) { + return new Promise((resolve, reject) => { + const ws = new WebSocket(this.wsUrl, { + headers: { + 'Authorization': `Basic ${this.auth}` + } + }); + + let output = ''; + let errorOutput = ''; + + ws.on('open', () => { + // Send command + ws.send(JSON.stringify({ + type: 'stdin', + data: command + '\n', + encoding: 'utf8' + })); + + // Send EOF/exit after command + setTimeout(() => { + ws.send(JSON.stringify({ + type: 'stdin', + data: 'exit\n', + encoding: 'utf8' + })); + }, 100); + }); + + ws.on('message', (data) => { + const msg = JSON.parse(data.toString()); + + if (msg.type === 'stdout') { + output += msg.encoding === 'base64' + ? Buffer.from(msg.data, 'base64').toString() + : msg.data; + } else if (msg.type === 'stderr') { + errorOutput += msg.encoding === 'base64' + ? 
Buffer.from(msg.data, 'base64').toString() + : msg.data; + } else if (msg.type === 'exit') { + ws.close(); + resolve({ + exitCode: msg.code, + stdout: output, + stderr: errorOutput + }); + } else if (msg.type === 'error') { + ws.close(); + reject(new Error(msg.error.message)); + } + }); + + ws.on('error', reject); + }); + } +} + +// Usage +const executor = new ContainerExecutor( + 'localhost:8080', + 'c123e4567-e89b-12d3-a456-426614174000', + 'admin', + 'password' +); + +executor.execute('ls -la /app') + .then(result => { + console.log('Exit code:', result.exitCode); + console.log('Output:', result.stdout); + if (result.stderr) { + console.error('Errors:', result.stderr); + } + }) + .catch(err => console.error('Failed:', err)); +``` + +## Error Handling + +All API errors follow a consistent JSON schema: + +```json +{ + "error": "validation_error", + "code": "MISSING_IMAGE", + "message": "Image is required", + "status": 400, + "request_id": "123e4567-e89b-12d3-a456-426614174000", + "timestamp": "2024-01-20T15:30:00+01:00", + "details": { + "field": "image", + "provided": null + }, + "stacktrace": "..." 
// Only in development mode +} +``` + +### Error Types + +| Error ID | Description | HTTP Status | +|----------|-------------|-------------| +| `auth_error` | Authentication failed | 401 | +| `validation_error` | Invalid request data | 400 | +| `not_found` | Resource not found | 404 | +| `conflict` | Resource conflict | 409 | +| `compose_error` | Docker Compose operation failed | 500 | +| `io_error` | File I/O operation failed | 500 | +| `timeout` | Operation timed out | 408 | +| `state_error` | Invalid state transition | 422 | + +### Error Handling Example + +```python +def safe_api_call(func): + """Decorator for safe API calls with retry logic""" + def wrapper(*args, **kwargs): + max_retries = 3 + retry_delay = 1 + + for attempt in range(max_retries): + try: + response = func(*args, **kwargs) + + if response.status_code >= 500: + # Server error, retry + if attempt < max_retries - 1: + time.sleep(retry_delay * (attempt + 1)) + continue + + if response.status_code >= 400: + # Client error, don't retry + error = response.json() + print(f"API Error: {error['message']} (ID: {error['request_id']})") + return None + + return response.json() + + except requests.exceptions.RequestException as e: + print(f"Network error (attempt {attempt + 1}): {e}") + if attempt < max_retries - 1: + time.sleep(retry_delay * (attempt + 1)) + else: + raise + + return None + + return wrapper + +@safe_api_call +def get_container_status(base_url, cuid, headers): + return requests.get(f"{base_url}/containers/{cuid}/status", headers=headers) +``` + +## Best Practices + +### 1. 
Connection Pooling + +```python +# Use session for connection pooling +import requests + +class APIClient: + def __init__(self, base_url, username, password): + self.base_url = base_url + self.session = requests.Session() + auth = base64.b64encode(f"{username}:{password}".encode()).decode() + self.session.headers.update({"Authorization": f"Basic {auth}"}) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.session.close() + + def create_container(self, config): + return self.session.post(f"{self.base_url}/containers", json=config) + +# Usage +with APIClient('http://localhost:8080', 'admin', 'password') as client: + response = client.create_container({"image": "nginx:alpine"}) +``` + +### 2. Async Operations + +```python +import aiohttp +import asyncio + +class AsyncAPIClient: + def __init__(self, base_url, username, password): + self.base_url = base_url + auth = base64.b64encode(f"{username}:{password}".encode()).decode() + self.headers = {"Authorization": f"Basic {auth}"} + + async def __aenter__(self): + self.session = aiohttp.ClientSession(headers=self.headers) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.session.close() + + async def batch_create(self, configs): + tasks = [] + for config in configs: + task = self.session.post(f"{self.base_url}/containers", json=config) + tasks.append(task) + + responses = await asyncio.gather(*tasks) + return [await r.json() for r in responses] + +# Usage +async def main(): + configs = [ + {"image": "nginx:alpine"}, + {"image": "redis:alpine"}, + {"image": "postgres:alpine"} + ] + + async with AsyncAPIClient('http://localhost:8080', 'admin', 'password') as client: + containers = await client.batch_create(configs) + print(f"Created {len(containers)} containers") + +asyncio.run(main()) +``` + +### 3. 
Health Monitoring + +```python +import time +import threading + +class HealthMonitor: + def __init__(self, base_url, auth_headers, check_interval=30): + self.base_url = base_url + self.headers = auth_headers + self.check_interval = check_interval + self.running = False + self.thread = None + + def start(self): + self.running = True + self.thread = threading.Thread(target=self._monitor_loop) + self.thread.daemon = True + self.thread.start() + + def stop(self): + self.running = False + if self.thread: + self.thread.join() + + def _monitor_loop(self): + while self.running: + try: + response = requests.get( + f"{self.base_url}/healthz", + headers=self.headers, + timeout=5 + ) + + if response.status_code == 200: + data = response.json() + print(f"API healthy: Compose {data['compose_version']}") + else: + print(f"API unhealthy: Status {response.status_code}") + + except Exception as e: + print(f"Health check failed: {e}") + + time.sleep(self.check_interval) +``` + +### 4. Container Logs Streaming + +```python +async def stream_logs(base_url, cuid, auth): + """Stream container logs in real-time""" + ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}" + + async with websockets.connect( + ws_url, + extra_headers={"Authorization": f"Basic {auth}"} + ) as ws: + # Request log streaming + await ws.send(json.dumps({ + "type": "stdin", + "data": "tail -f /app/logs/app.log\n", + "encoding": "utf8" + })) + + # Process incoming logs + async for message in ws: + data = json.loads(message) + if data['type'] == 'stdout': + log_line = data['data'] + # Process log line (e.g., parse JSON logs, send to monitoring) + process_log_line(log_line) +``` + +## Security Considerations + +1. **Always use HTTPS in production** - Deploy behind a reverse proxy (nginx/traefik) with TLS +2. **Rotate credentials regularly** - Update `.env` file and restart service +3. **Limit network exposure** - Bind to localhost if only local access needed +4. 
**Monitor failed auth attempts** - Check `logs/actions.jsonl` for `auth_error` entries +5. **Validate all inputs** - The API validates paths and prevents escapes, but always sanitize on client side too +6. **Set resource limits** - Always specify CPU/memory limits when creating containers + +## Troubleshooting + +### Common Issues + +1. **Container won't start** + - Check if `/app/boot.py` exists in the mount + - Verify the image is available locally + - Check compose logs: `docker compose logs ` + +2. **WebSocket connection fails** + - Ensure Basic Auth is included in WebSocket headers + - Check if container is running before connecting + - Verify `/app/boot.py` exists + +3. **File upload fails** + - Check file size (no limit by default, but system may have limits) + - Ensure ZIP format is valid + - Verify mount directory exists and has correct permissions + +4. **Port conflicts** + - Check if host ports are already in use + - Use `docker compose ps` to verify current port mappings + +### Debug Mode + +Enable debug logging by examining `logs/actions.jsonl`: + +```python +import json +from datetime import datetime + +def analyze_logs(log_file='logs/actions.jsonl'): + errors = [] + slow_requests = [] + + with open(log_file, 'r') as f: + for line in f: + entry = json.loads(line) + + # Find errors + if entry.get('error'): + errors.append(entry) + + # Find slow requests (>5 seconds) + if entry.get('duration_ms', 0) > 5000: + slow_requests.append(entry) + + print(f"Found {len(errors)} errors") + print(f"Found {len(slow_requests)} slow requests") + + return errors, slow_requests +``` + +## Complete Integration Example + +Here's a full example that creates a container, uploads code, executes it, and cleans up: + +```python +import asyncio +import aiohttp +import base64 +import json +import zipfile +import io + +class ContainerOrchestrator: + def __init__(self, base_url, username, password): + self.base_url = base_url + auth = 
base64.b64encode(f"{username}:{password}".encode()).decode() + self.headers = {"Authorization": f"Basic {auth}"} + + async def deploy_and_run(self, image, code, env=None): + async with aiohttp.ClientSession(headers=self.headers) as session: + # 1. Create container + create_payload = { + "image": image, + "env": env or {}, + "tags": ["automated"], + "resources": { + "cpus": 0.5, + "memory": "1024m" + } + } + + async with session.post( + f"{self.base_url}/containers", + json=create_payload + ) as resp: + if resp.status != 201: + raise Exception(f"Failed to create container: {await resp.text()}") + container = await resp.json() + cuid = container['cuid'] + print(f"Created container: {cuid}") + + # 2. Upload code + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, 'w') as zf: + zf.writestr('boot.py', code) + + async with session.post( + f"{self.base_url}/containers/{cuid}/upload-zip", + data=zip_buffer.getvalue(), + headers={**self.headers, 'Content-Type': 'application/zip'} + ) as resp: + if resp.status != 200: + raise Exception(f"Failed to upload code: {await resp.text()}") + print("Code uploaded successfully") + + # 3. Start container + async with session.post( + f"{self.base_url}/containers/{cuid}/start" + ) as resp: + if resp.status != 200: + raise Exception(f"Failed to start container: {await resp.text()}") + print("Container started") + + # 4. 
Connect via WebSocket to see output + ws_url = f"ws://{self.base_url.replace('http://', '')}/ws/{cuid}" + + async with aiohttp.ClientSession() as ws_session: + async with ws_session.ws_connect( + ws_url, + headers=self.headers + ) as ws: + print("Connected to container output:") + + # Read output for 10 seconds + timeout = asyncio.create_task(asyncio.sleep(10)) + receive = asyncio.create_task(ws.receive()) + + while not timeout.done(): + done, pending = await asyncio.wait( + {timeout, receive}, + return_when=asyncio.FIRST_COMPLETED + ) + + if receive in done: + msg = receive.result() + if msg.type == aiohttp.WSMsgType.TEXT: + data = json.loads(msg.data) + if data['type'] == 'stdout': + print(f"[OUT] {data['data']}", end='') + elif data['type'] == 'stderr': + print(f"[ERR] {data['data']}", end='') + receive = asyncio.create_task(ws.receive()) + + # 5. Stop and delete container + async with session.post( + f"{self.base_url}/containers/{cuid}/stop" + ) as resp: + print("\nContainer stopped") + + async with session.delete( + f"{self.base_url}/containers/{cuid}" + ) as resp: + print("Container deleted") + + return cuid + +# Usage +async def main(): + orchestrator = ContainerOrchestrator( + 'http://localhost:8080', + 'admin', + 'password' + ) + + code = """ +import os +import time + +print(f"Hello from container {os.environ.get('CONTAINER_UID')}") +print(f"Environment: {os.environ.get('APP_ENV', 'development')}") + +for i in range(5): + print(f"Iteration {i+1}") + time.sleep(1) + +print("Done!") +""" + + await orchestrator.deploy_and_run( + image='python:3.12-slim', + code=code, + env={'APP_ENV': 'production'} + ) + +asyncio.run(main()) +``` + +## Support + +For issues or questions about the API, check: +1. The `logs/actions.jsonl` file for detailed request/response logs +2. Docker Compose logs: `docker compose logs` +3. Container-specific logs: `docker compose logs ` + +## License + +This API is designed for container orchestration and management. 
Ensure you comply with Docker's licensing terms and your organization's security policies when deploying. diff --git a/app.py b/app.py new file mode 100644 index 0000000..e6ef7e7 --- /dev/null +++ b/app.py @@ -0,0 +1,1345 @@ +#!/usr/bin/env python3 +""" +Container Management API Server +Single-file Python 3.12+ application using aiohttp and docker compose v2 CLI +""" + +import asyncio +import base64 +import json +import os +import shutil +import traceback +import uuid +import weakref +import zipfile +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import aiofiles +from aiohttp import web, WSMsgType +from dotenv import load_dotenv +from ruamel.yaml import YAML +from zoneinfo import ZoneInfo + +# Global configuration +TIMEZONE = ZoneInfo("Europe/Amsterdam") +COMPOSE_FILE = "./containers.yml" +MOUNTS_DIR = "./mounts" +LOGS_DIR = "./logs" +DEFAULT_USER_UID = "1000:1000" + +# Global semaphore for containers.yml writes +COMPOSE_WRITE_LOCK = asyncio.Semaphore(1) + +# Error identifiers +ERROR_IDS = { + "AUTH": "auth_error", + "VALIDATION": "validation_error", + "NOT_FOUND": "not_found", + "CONFLICT": "conflict", + "COMPOSE": "compose_error", + "IO": "io_error", + "TIMEOUT": "timeout", + "STATE": "state_error" +} + +# Status mapping from Docker to API +STATUS_MAP = { + "running": "running", + "paused": "paused", + "exited": "exited", + "restarting": "restarting", + "dead": "dead", + "removing": "removing", + "created": "created" +} + + +class Config: + """Application configuration from environment""" + + def __init__(self): + load_dotenv() + self.APP_USER = os.getenv("APP_USER") + self.APP_PASS = os.getenv("APP_PASS") + self.DEFAULT_USER_UID = os.getenv("DEFAULT_USER_UID", DEFAULT_USER_UID) + + if not self.APP_USER or not self.APP_PASS: + raise ValueError("APP_USER and APP_PASS must be set in .env file") + + +class ErrorObject: + """Standard error response object""" + + @staticmethod + def create(error_id: str, 
code: str, message: str, status: int, + request_id: str, details: Optional[Dict] = None, + exception: Optional[Exception] = None) -> Dict: + obj = { + "error": error_id, + "code": code, + "message": message, + "status": status, + "request_id": request_id, + "timestamp": datetime.now(TIMEZONE).isoformat() + } + if details: + obj["details"] = details + if exception: + obj["stacktrace"] = traceback.format_exc() + return obj + + +class ActionLogger: + """JSON Lines logger for all actions""" + + def __init__(self): + os.makedirs(LOGS_DIR, exist_ok=True) + self.log_file = Path(LOGS_DIR) / "actions.jsonl" + + async def log(self, level: str, route: str, method: str, request_id: str, + user: str, status: int, duration_ms: float, + cuid: Optional[str] = None, details: Optional[Dict] = None, + note: Optional[str] = None, error: Optional[Dict] = None): + entry = { + "ts": datetime.now(TIMEZONE).isoformat(), + "level": level, + "route": route, + "method": method, + "request_id": request_id, + "user": user, + "status": status, + "duration_ms": duration_ms + } + if cuid: + entry["cuid"] = cuid + if details: + entry["details"] = details + if note: + entry["note"] = note + if error: + entry["error"] = error + + async with aiofiles.open(self.log_file, "a") as f: + await f.write(json.dumps(entry) + "\n") + + +class ComposeManager: + """Manages docker compose operations""" + + def __init__(self, config: Config, logger: ActionLogger): + self.config = config + self.logger = logger + self.yaml = YAML() + self.yaml.preserve_quotes = True + self.yaml.indent(mapping=2, sequence=4, offset=2) + + # Initialize compose file if it doesn't exist + if not Path(COMPOSE_FILE).exists(): + self._init_compose_file() + + def _init_compose_file(self): + """Create initial compose file""" + initial = { + "version": "3.9", + "services": {} + } + with open(COMPOSE_FILE, "w") as f: + self.yaml.dump(initial, f) + + async def read_compose(self) -> Dict: + """Read compose file""" + async with 
aiofiles.open(COMPOSE_FILE, "r") as f: + content = await f.read() + return self.yaml.load(content) + + async def write_compose(self, data: Dict): + """Write compose file with fsync""" + async with COMPOSE_WRITE_LOCK: + # Write to temp file first + temp_file = COMPOSE_FILE + ".tmp" + with open(temp_file, "w") as f: + self.yaml.dump(data, f) + f.flush() + os.fsync(f.fileno()) + + # Atomic rename + os.replace(temp_file, COMPOSE_FILE) + + async def run_compose(self, args: List[str], timeout: int = 300, + stdin_data: Optional[bytes] = None) -> Dict: + """Execute docker compose command""" + cmd = ["docker", "compose", "-f", COMPOSE_FILE] + args + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE if stdin_data else None + ) + + stdout, stderr = await asyncio.wait_for( + proc.communicate(stdin_data), + timeout=timeout + ) + + return { + "returncode": proc.returncode, + "stdout": stdout.decode("utf-8"), + "stderr": stderr.decode("utf-8") + } + except asyncio.TimeoutError: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + raise + + def _build_service_spec(self, cuid: str, image: str, env: Dict, + tags: List[str], resources: Dict, ports: List[Dict]) -> Dict: + """Build service specification for compose file""" + # Merge environment + service_env = env.copy() + service_env["CONTAINER_UID"] = cuid + if tags: + service_env["TAGS"] = ",".join(tags) + + spec = { + "image": image, + "container_name": cuid, + "environment": service_env, + "working_dir": "/app", + "command": ["python", "-u", "/app/boot.py"], + "volumes": [f"./mounts/{cuid}:/app:rw"], + "restart": "on-failure", + "privileged": False, + "user": self.config.DEFAULT_USER_UID, + "cap_drop": ["ALL"], + "security_opt": [ + "no-new-privileges:true" + ] + } + + # Resource limits at service root + spec["cpus"] = str(resources.get("cpus", 0.5)) + spec["mem_limit"] = resources.get("memory", 
"2048m") + spec["pids_limit"] = resources.get("pids", 1024) + + # Ports + if ports: + spec["ports"] = [ + f"{p['host']}:{p['container']}/{p.get('protocol', 'tcp')}" + for p in ports + ] + + return spec + + async def create_container(self, image: str, env: Dict, tags: List[str], + resources: Dict, ports: List[Dict]) -> str: + """Create new container""" + cuid = "c" + str(uuid.uuid4()) + + # Create mount directory + mount_dir = Path(MOUNTS_DIR) / cuid + mount_dir.mkdir(parents=True, exist_ok=True) + + # Set ownership to 1000:1000 + os.chown(mount_dir, 1000, 1000) + + # Update compose file + compose_data = await self.read_compose() + compose_data["services"][cuid] = self._build_service_spec( + cuid, image, env, tags, resources, ports + ) + await self.write_compose(compose_data) + + # Start the container + await self.run_compose(["up", "-d", cuid]) + + return cuid + + async def update_container(self, cuid: str, env: Optional[Dict] = None, + tags: Optional[List[str]] = None, + resources: Optional[Dict] = None, + image: Optional[str] = None) -> bool: + """Update container configuration""" + compose_data = await self.read_compose() + + if cuid not in compose_data["services"]: + return False + + service = compose_data["services"][cuid] + + # Update environment (merge) + if env is not None: + current_env = service.get("environment", {}) + for key, value in env.items(): + if value is None and key in current_env: + del current_env[key] + else: + current_env[key] = value + service["environment"] = current_env + + # Update tags (replace) + if tags is not None: + service["environment"]["TAGS"] = ",".join(tags) + + # Update resources (merge) + if resources is not None: + service["cpus"] = str(resources.get("cpus", service.get("cpus", 0.5))) + service["mem_limit"] = resources.get("memory", service.get("mem_limit", "2048m")) + service["pids_limit"] = resources.get("pids", service.get("pids_limit", 1024)) + + # Update image + if image is not None: + service["image"] = image + + 
await self.write_compose(compose_data) + await self.run_compose(["up", "-d", cuid]) + return True + + async def update_ports(self, cuid: str, ports: List[Dict]) -> bool: + """Update container ports""" + compose_data = await self.read_compose() + + if cuid not in compose_data["services"]: + return False + + service = compose_data["services"][cuid] + + # Replace ports + if ports: + service["ports"] = [ + f"{p['host']}:{p['container']}/{p.get('protocol', 'tcp')}" + for p in ports + ] + else: + service.pop("ports", None) + + await self.write_compose(compose_data) + await self.run_compose(["up", "-d", cuid]) + return True + + async def delete_container(self, cuid: str) -> bool: + """Delete container and its mount""" + compose_data = await self.read_compose() + + if cuid not in compose_data["services"]: + return False + + # Stop and remove container + await self.run_compose(["stop", cuid]) + await self.run_compose(["rm", "-f", cuid]) + + # Remove from compose file + del compose_data["services"][cuid] + await self.write_compose(compose_data) + + # Delete mount directory + mount_dir = Path(MOUNTS_DIR) / cuid + if mount_dir.exists(): + shutil.rmtree(mount_dir) + + return True + + async def get_container_info(self, cuid: str) -> Optional[Dict]: + """Get container information""" + compose_data = await self.read_compose() + + if cuid not in compose_data["services"]: + return None + + service = compose_data["services"][cuid] + + # Get runtime status + result = await self.run_compose(["ps", "--format", "json", cuid]) + status_info = {} + if result["returncode"] == 0 and result["stdout"]: + try: + ps_data = json.loads(result["stdout"]) + if ps_data and len(ps_data) > 0: + container = ps_data[0] + status_info = { + "status": STATUS_MAP.get(container.get("State", "").lower(), "unknown"), + "created_at": container.get("CreatedAt", ""), + "started_at": container.get("StartedAt", "") + } + except json.JSONDecodeError: + pass + + # Parse ports back to JSON format + ports = [] + if 
"ports" in service:
            # Reverse the "host:container/proto" compose syntax back to dicts
            for port_str in service["ports"]:
                parts = port_str.split(":")
                proto_split = parts[-1].split("/")
                ports.append({
                    "host": int(parts[0]),
                    "container": int(proto_split[0]),
                    "protocol": proto_split[1] if len(proto_split) > 1 else "tcp"
                })

        # Extract tags from the comma-joined TAGS env var
        tags = []
        if "TAGS" in service.get("environment", {}):
            tags = service["environment"]["TAGS"].split(",")

        # Build response — bookkeeping env vars are stripped from `env`
        return {
            "cuid": cuid,
            "image": service.get("image", ""),
            "env": {k: v for k, v in service.get("environment", {}).items()
                    if k not in ["CONTAINER_UID", "TAGS"]},
            "tags": tags,
            "resources": {
                "cpus": float(service.get("cpus", 0.5)),
                "memory": service.get("mem_limit", "2048m"),
                "pids": service.get("pids_limit", 1024)
            },
            "ports": ports,
            **status_info
        }

    async def list_containers(self, status_filter: Optional[List[str]] = None,
                              cursor: Optional[str] = None,
                              limit: int = 20) -> Tuple[List[Dict], Optional[str]]:
        """List containers with filtering and offset-based pagination.

        Returns (containers, next_cursor); next_cursor is None on the last page.
        NOTE(review): the status filter is applied AFTER slicing, so a page may
        contain fewer than `limit` items even when more matches exist — confirm
        this is the intended pagination contract.
        """
        compose_data = await self.read_compose()
        all_cuids = list(compose_data["services"].keys())

        # Apply cursor (simple offset-based; an unparseable cursor restarts at 0)
        start_idx = 0
        if cursor:
            try:
                start_idx = int(cursor)
            except ValueError:
                start_idx = 0

        # Get slice
        end_idx = start_idx + limit
        cuids_slice = all_cuids[start_idx:end_idx]

        # Get container info
        containers = []
        for cuid in cuids_slice:
            info = await self.get_container_info(cuid)
            if info:
                # Apply status filter
                if status_filter and info.get("status") not in status_filter:
                    continue
                containers.append(info)

        # Determine next cursor
        next_cursor = None
        if end_idx < len(all_cuids):
            next_cursor = str(end_idx)

        return containers, next_cursor


class WebSocketSession:
    """Manages a WebSocket-attached exec session into a container.

    Bridges a `docker compose exec` subprocess to the client WebSocket:
    stdout/stderr are streamed out as JSON frames, stdin/resize/signal frames
    are forwarded in.
    """

    def __init__(self, cuid: str, ws: web.WebSocketResponse,
                 compose_manager: ComposeManager):
        self.cuid = cuid
        self.ws = ws
        self.compose_manager = compose_manager
        self.proc = None   # exec subprocess, set by start()
        self.tasks = []    # background stdout/stderr reader tasks

    async def start(self, cols: Optional[int] = None, rows: Optional[int] = None):
        """Start the exec session; returns False (after sending an error frame)
        if the mount has no boot.py."""
        # Check if boot.py exists
        boot_path = Path(MOUNTS_DIR) / self.cuid / "boot.py"
        if not boot_path.exists():
            error = ErrorObject.create(
                ERROR_IDS["VALIDATION"],
                "BOOT_MISSING",
                "/app/boot.py not found in container mount",
                422,
                str(uuid.uuid4()),
                {"cuid": self.cuid}
            )
            await self.ws.send_str(json.dumps({
                "type": "error",
                "error": error
            }))
            return False

        # Ensure container is running (`ps -q` prints nothing when stopped)
        result = await self.compose_manager.run_compose(["ps", "-q", self.cuid])
        if not result["stdout"].strip():
            # Start container
            await self.compose_manager.run_compose(["up", "-d", self.cuid])

        # Setup environment for TTY size.
        # NOTE(review): exec runs with -T (no TTY), so COLUMNS/LINES are only
        # hints to the child program — confirm this matches client expectations.
        env = os.environ.copy()
        if cols:
            env["COLUMNS"] = str(cols)
        if rows:
            env["LINES"] = str(rows)

        # Start exec with PTY
        cmd = [
            "docker", "compose", "-f", COMPOSE_FILE,
            "exec", "-i", "-T", self.cuid,
            "sh", "-lc", "exec python -u /app/boot.py"
        ]

        self.proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        # Start output readers
        self.tasks.append(asyncio.create_task(self._read_stdout()))
        self.tasks.append(asyncio.create_task(self._read_stderr()))

        return True

    async def _read_stdout(self):
        """Pump subprocess stdout to the WebSocket as JSON frames."""
        while self.proc and self.proc.returncode is None:
            try:
                data = await self.proc.stdout.read(1024)
                if not data:
                    break  # EOF — process closed its stdout

                # Try UTF-8 decode, fallback to base64 for binary-ish chunks
                try:
                    text = data.decode("utf-8")
                    await self.ws.send_str(json.dumps({
                        "type": "stdout",
                        "data": text,
                        "encoding": "utf8"
                    }))
                except UnicodeDecodeError:
                    import base64
                    await self.ws.send_str(json.dumps({
                        "type": "stdout",
                        "data": base64.b64encode(data).decode("ascii"),
                        "encoding": "base64"
                    }))
            except Exception:
                # Reader is best-effort: any send/read failure ends the pump
                break

    async def _read_stderr(self):
        """Pump subprocess stderr to the WebSocket as JSON frames
        (mirror of _read_stdout)."""
        while self.proc and self.proc.returncode is None:
            try:
                data = await self.proc.stderr.read(1024)
                if not data:
                    break  # EOF

                # Try UTF-8 decode, fallback to base64
                try:
                    text = data.decode("utf-8")
                    await self.ws.send_str(json.dumps({
                        "type": "stderr",
                        "data": text,
                        "encoding": "utf8"
                    }))
                except UnicodeDecodeError:
                    import base64
                    await self.ws.send_str(json.dumps({
                        "type": "stderr",
                        "data": base64.b64encode(data).decode("ascii"),
                        "encoding": "base64"
                    }))
            except Exception:
                break

    async def write_stdin(self, data: str, encoding: str = "utf8"):
        """Forward client input to the subprocess stdin.

        `encoding` is "utf8" (default) or "base64" for binary payloads.
        Silently drops input once the process has exited.
        """
        if not self.proc or self.proc.returncode is not None:
            return

        if encoding == "base64":
            import base64
            data_bytes = base64.b64decode(data)
        else:
            data_bytes = data.encode("utf-8")

        try:
            self.proc.stdin.write(data_bytes)
            await self.proc.stdin.drain()
        except Exception:
            # Best-effort: a broken pipe just means the session is ending
            pass

    async def resize(self, cols: int, rows: int):
        """Send an ANSI window-resize escape to the child.

        NOTE(review): with `exec -T` there is no PTY, so this sequence goes to
        the program's stdin rather than a terminal driver — confirm boot.py
        interprets it.
        """
        if not self.proc or self.proc.returncode is not None:
            return

        # Send ANSI resize sequence
        resize_seq = f"\x1b[8;{rows};{cols}t"
        try:
            self.proc.stdin.write(resize_seq.encode())
            await self.proc.stdin.drain()
        except Exception:
            pass

    async def signal(self, sig_name: str):
        """Handle a client "signal" frame: INT → Ctrl-C byte on stdin;
        TERM/KILL → `compose kill` and close the session."""
        if sig_name == "INT":
            # Send Ctrl-C to TTY
            if self.proc and self.proc.returncode is None:
                try:
                    self.proc.stdin.write(b"\x03")
                    await self.proc.stdin.drain()
                except Exception:
                    pass
        elif sig_name in ["TERM", "KILL"]:
            # Kill container
            await self.compose_manager.run_compose(["kill", "-s", sig_name, self.cuid])
            await self.close()

    async def close(self):
        """Clean up session: cancel pumps, kill the child, report exit code."""
        # Cancel tasks
        for task in self.tasks:
            if not task.done():
                task.cancel()

        # Kill process
        if self.proc and self.proc.returncode is None:
            self.proc.kill()
            await self.proc.wait()

        # Send exit code
        if self.proc:
            # NOTE(review): if the WebSocket is already closed this send can
            # raise — close() is called from a finally block, so confirm the
            # exception cannot mask the original error.
            await self.ws.send_str(json.dumps({
                "type": "exit",
                "code": self.proc.returncode or 0
            }))


class Application:
    """Main application class: wires config, logging, compose management and
    the aiohttp routing/middleware together."""

    def __init__(self):
        self.config = Config()
        self.logger = ActionLogger()
        self.compose_manager = ComposeManager(self.config, self.logger)
        self.app = web.Application()
        # WeakSet so finished sessions are dropped without explicit bookkeeping
        self.ws_sessions = weakref.WeakSet()

        # Setup routes
        self._setup_routes()

        # Setup middleware
        self._setup_middleware()

    def _setup_routes(self):
        """Register all HTTP and WebSocket routes."""
        self.app.router.add_routes([
            # Health check
            web.get("/healthz", self.healthz),

            # Container management
            web.post("/containers", self.create_container),
            web.get("/containers", self.list_containers),
            web.get("/containers/{cuid}", self.get_container),
            web.patch("/containers/{cuid}", self.update_container),
            web.delete("/containers/{cuid}", self.delete_container),

            # Lifecycle
            web.post("/containers/{cuid}/start", self.start_container),
            web.post("/containers/{cuid}/stop", self.stop_container),
            web.post("/containers/{cuid}/pause", self.pause_container),
            web.post("/containers/{cuid}/unpause", self.unpause_container),
            web.post("/containers/{cuid}/restart", self.restart_container),

            # Ports
            web.patch("/containers/{cuid}/ports", self.update_ports),

            # Files
            web.post("/containers/{cuid}/upload-zip", self.upload_zip),
            web.get("/containers/{cuid}/download", self.download_file),
            web.get("/containers/{cuid}/download-zip", self.download_zip),

            # Status
            web.get("/containers/{cuid}/status", self.get_status),

            # WebSocket
            web.get("/ws/{cuid}", self.websocket_handler)
        ])

    def _setup_middleware(self):
        """Install the Basic-Auth + request-logging middleware."""

        @web.middleware
        async def auth_middleware(request, handler):
            # Skip auth for healthz (liveness probes have no credentials)
            if request.path == "/healthz":
                return await handler(request)

            # Check Basic Auth
            auth_header = request.headers.get("Authorization", "")
            if not auth_header.startswith("Basic "):
                return
web.Response( + text=json.dumps(ErrorObject.create( + ERROR_IDS["AUTH"], + "MISSING_AUTH", + "Authentication required", + 401, + str(uuid.uuid4()) + )), + status=401, + headers={"WWW-Authenticate": 'Basic realm="app"'}, + content_type="application/json" + ) + + try: + encoded = auth_header[6:] + decoded = base64.b64decode(encoded).decode("utf-8") + username, password = decoded.split(":", 1) + + if username != self.config.APP_USER or password != self.config.APP_PASS: + raise ValueError("Invalid credentials") + + request["user"] = username + request["request_id"] = str(uuid.uuid4()) + + # Process request + start_time = datetime.now() + response = await handler(request) + duration_ms = (datetime.now() - start_time).total_seconds() * 1000 + + # Log action + await self.logger.log( + "INFO", + request.path, + request.method, + request["request_id"], + username, + response.status, + duration_ms + ) + + return response + + except Exception as e: + return web.Response( + text=json.dumps(ErrorObject.create( + ERROR_IDS["AUTH"], + "INVALID_AUTH", + "Invalid credentials", + 401, + str(uuid.uuid4()) + )), + status=401, + headers={"WWW-Authenticate": 'Basic realm="app"'}, + content_type="application/json" + ) + + self.app.middlewares.append(auth_middleware) + + async def healthz(self, request): + """Health check endpoint""" + # Check compose availability + result = await self.compose_manager.run_compose(["version"], timeout=5) + + if result["returncode"] != 0: + return web.json_response( + ErrorObject.create( + ERROR_IDS["COMPOSE"], + "COMPOSE_UNAVAILABLE", + "Docker Compose not available", + 500, + str(uuid.uuid4()) + ), + status=500 + ) + + # Parse version + version = "unknown" + if "version" in result["stdout"].lower(): + lines = result["stdout"].strip().split("\n") + for line in lines: + if "version" in line.lower(): + version = line.split()[-1] + break + + return web.json_response({ + "status": "ok", + "compose_version": version, + "uptime_s": 0 # Would need to track app 
start time
        })

    async def create_container(self, request):
        """POST /containers — create a new container; 201 with container info."""
        try:
            data = await request.json()

            # Validate required fields (image is the only hard requirement)
            if "image" not in data:
                return web.json_response(
                    ErrorObject.create(
                        ERROR_IDS["VALIDATION"],
                        "MISSING_IMAGE",
                        "Image is required",
                        400,
                        request["request_id"]
                    ),
                    status=400
                )

            # Create container
            cuid = await self.compose_manager.create_container(
                image=data["image"],
                env=data.get("env", {}),
                tags=data.get("tags", []),
                resources=data.get("resources", {}),
                ports=data.get("ports", [])
            )

            # Get container info
            info = await self.compose_manager.get_container_info(cuid)

            return web.json_response(info, status=201)

        except Exception as e:
            # Log with full error object, then report a 500 to the client
            await self.logger.log(
                "ERROR",
                request.path,
                request.method,
                request["request_id"],
                request["user"],
                500,
                0,
                error=ErrorObject.create(
                    ERROR_IDS["COMPOSE"],
                    "CREATE_FAILED",
                    str(e),
                    500,
                    request["request_id"],
                    exception=e
                )
            )
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["COMPOSE"],
                    "CREATE_FAILED",
                    str(e),
                    500,
                    request["request_id"]
                ),
                status=500
            )

    async def list_containers(self, request):
        """GET /containers — list with optional status filter and cursor."""
        # Parse query params ("status" is a comma-separated list)
        status_filter = None
        if "status" in request.query:
            status_filter = request.query["status"].split(",")

        cursor = request.query.get("cursor")
        limit = int(request.query.get("limit", 20))

        # Get containers
        containers, next_cursor = await self.compose_manager.list_containers(
            status_filter, cursor, limit
        )

        response = {"containers": containers}
        if next_cursor:
            response["next_cursor"] = next_cursor

        return web.json_response(response)

    async def get_container(self, request):
        """GET /containers/{cuid} — container details or 404."""
        cuid = request.match_info["cuid"]

        info = await self.compose_manager.get_container_info(cuid)
        if not info:
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["NOT_FOUND"],
                    "CONTAINER_NOT_FOUND",
                    f"Container {cuid} not found",
                    404,
                    request["request_id"]
                ),
                status=404
            )

        return web.json_response(info)

    async def update_container(self, request):
        """PATCH /containers/{cuid} — partial update of env/tags/resources/image."""
        cuid = request.match_info["cuid"]

        try:
            data = await request.json()

            # Absent keys mean "leave unchanged" (handlers receive None)
            success = await self.compose_manager.update_container(
                cuid,
                env=data.get("env"),
                tags=data.get("tags"),
                resources=data.get("resources"),
                image=data.get("image")
            )

            if not success:
                return web.json_response(
                    ErrorObject.create(
                        ERROR_IDS["NOT_FOUND"],
                        "CONTAINER_NOT_FOUND",
                        f"Container {cuid} not found",
                        404,
                        request["request_id"]
                    ),
                    status=404
                )

            # Get updated info
            info = await self.compose_manager.get_container_info(cuid)
            return web.json_response(info)

        except Exception as e:
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["COMPOSE"],
                    "UPDATE_FAILED",
                    str(e),
                    500,
                    request["request_id"]
                ),
                status=500
            )

    async def delete_container(self, request):
        """DELETE /containers/{cuid} — 204 on success, 404 if unknown."""
        cuid = request.match_info["cuid"]

        success = await self.compose_manager.delete_container(cuid)
        if not success:
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["NOT_FOUND"],
                    "CONTAINER_NOT_FOUND",
                    f"Container {cuid} not found",
                    404,
                    request["request_id"]
                ),
                status=404
            )

        return web.Response(status=204)

    # Lifecycle endpoints below are fire-and-forget: they do not check the
    # compose return code, so an unknown cuid still reports success.
    # NOTE(review): confirm whether a 404 for unknown cuids is wanted here.

    async def start_container(self, request):
        """POST /containers/{cuid}/start"""
        cuid = request.match_info["cuid"]
        await self.compose_manager.run_compose(["start", cuid])
        return web.json_response({"status": "started"})

    async def stop_container(self, request):
        """POST /containers/{cuid}/stop"""
        cuid = request.match_info["cuid"]
        await self.compose_manager.run_compose(["stop", cuid])
        return web.json_response({"status": "stopped"})

    async def pause_container(self, request):
        """POST /containers/{cuid}/pause"""
        cuid = request.match_info["cuid"]
        await self.compose_manager.run_compose(["pause", cuid])
        return web.json_response({"status": "paused"})

    async def unpause_container(self, request):
        """POST /containers/{cuid}/unpause"""
        cuid = request.match_info["cuid"]
        await self.compose_manager.run_compose(["unpause", cuid])
        return web.json_response({"status": "unpaused"})

    async def restart_container(self, request):
        """POST /containers/{cuid}/restart"""
        cuid = request.match_info["cuid"]
        await self.compose_manager.run_compose(["restart", cuid])
        return web.json_response({"status": "restarted"})

    async def update_ports(self, request):
        """PATCH /containers/{cuid}/ports — replace published ports."""
        cuid = request.match_info["cuid"]

        try:
            data = await request.json()

            if "ports" not in data:
                return web.json_response(
                    ErrorObject.create(
                        ERROR_IDS["VALIDATION"],
                        "MISSING_PORTS",
                        "Ports field is required",
                        400,
                        request["request_id"]
                    ),
                    status=400
                )

            success = await self.compose_manager.update_ports(cuid, data["ports"])

            if not success:
                return web.json_response(
                    ErrorObject.create(
                        ERROR_IDS["NOT_FOUND"],
                        "CONTAINER_NOT_FOUND",
                        f"Container {cuid} not found",
                        404,
                        request["request_id"]
                    ),
                    status=404
                )

            return web.json_response({"status": "updated"})

        except Exception as e:
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["COMPOSE"],
                    "UPDATE_FAILED",
                    str(e),
                    500,
                    request["request_id"]
                ),
                status=500
            )

    async def upload_zip(self, request):
        """POST /containers/{cuid}/upload-zip — extract an uploaded ZIP into
        the container's mount, skipping path-traversal entries (zip-slip)."""
        cuid = request.match_info["cuid"]
        mount_dir = Path(MOUNTS_DIR) / cuid

        if not mount_dir.exists():
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["NOT_FOUND"],
                    "MOUNT_NOT_FOUND",
                    f"Mount directory for {cuid} not found",
                    404,
                    request["request_id"]
                ),
                status=404
            )

        try:
            # Read body as ZIP.
            # NOTE(review): reads the whole upload into memory — confirm an
            # upload size limit is enforced elsewhere (aiohttp client_max_size).
            body = await request.read()

            # Extract ZIP with protection against absolute/".." member names
            with zipfile.ZipFile(io.BytesIO(body), "r") as zf:
                for member in zf.namelist():
                    # Validate path
                    member_path = Path(member)
                    if member_path.is_absolute()
or ".." in member_path.parts:
                        continue  # Skip dangerous paths

                    target_path = mount_dir / member_path

                    # Belt-and-braces: resolve and re-check containment
                    if not target_path.resolve().is_relative_to(mount_dir.resolve()):
                        continue  # Skip escaping paths

                    # Extract file (trailing "/" marks a directory entry)
                    if member.endswith("/"):
                        target_path.mkdir(parents=True, exist_ok=True)
                    else:
                        target_path.parent.mkdir(parents=True, exist_ok=True)
                        with open(target_path, "wb") as f:
                            f.write(zf.read(member))

            return web.json_response({"status": "uploaded"})

        except Exception as e:
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["IO"],
                    "UPLOAD_FAILED",
                    str(e),
                    500,
                    request["request_id"]
                ),
                status=500
            )

    async def download_file(self, request):
        """GET /containers/{cuid}/download?path=… — stream one file from the
        mount; rejects paths that escape the mount directory."""
        cuid = request.match_info["cuid"]
        rel_path = request.query.get("path", "")

        if not rel_path:
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["VALIDATION"],
                    "MISSING_PATH",
                    "Path parameter is required",
                    400,
                    request["request_id"]
                ),
                status=400
            )

        # Remove leading slash if present (treat all paths as mount-relative)
        rel_path = rel_path.lstrip("/")

        mount_dir = Path(MOUNTS_DIR) / cuid
        target_path = (mount_dir / rel_path).resolve()

        # Validate path stays inside the mount (blocks ../ traversal)
        if not target_path.is_relative_to(mount_dir.resolve()):
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["VALIDATION"],
                    "INVALID_PATH",
                    "Path escapes mount directory",
                    400,
                    request["request_id"]
                ),
                status=400
            )

        if not target_path.exists() or not target_path.is_file():
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["NOT_FOUND"],
                    "FILE_NOT_FOUND",
                    f"File not found: {rel_path}",
                    404,
                    request["request_id"]
                ),
                status=404
            )

        # Stream file in chunks rather than loading it into memory
        response = web.StreamResponse()
        response.headers["Content-Type"] = "application/octet-stream"
        response.headers["Content-Disposition"] = f'attachment; filename="{target_path.name}"'
        await response.prepare(request)

        async with 
aiofiles.open(target_path, "rb") as f:
            while chunk := await f.read(8192):
                await response.write(chunk)

        await response.write_eof()
        return response

    async def download_zip(self, request):
        """GET /containers/{cuid}/download-zip?path=… — zip a directory of the
        mount (default: whole mount) and send it."""
        cuid = request.match_info["cuid"]
        rel_path = request.query.get("path", "")

        # Remove leading slash if present
        rel_path = rel_path.lstrip("/")

        mount_dir = Path(MOUNTS_DIR) / cuid

        if rel_path:
            target_dir = (mount_dir / rel_path).resolve()
        else:
            target_dir = mount_dir.resolve()

        # Validate path stays inside the mount
        if not target_dir.is_relative_to(mount_dir.resolve()):
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["VALIDATION"],
                    "INVALID_PATH",
                    "Path escapes mount directory",
                    400,
                    request["request_id"]
                ),
                status=400
            )

        if not target_dir.exists() or not target_dir.is_dir():
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["NOT_FOUND"],
                    "DIR_NOT_FOUND",
                    f"Directory not found: {rel_path}",
                    404,
                    request["request_id"]
                ),
                status=404
            )

        # Create ZIP in memory.
        # NOTE(review): whole archive is buffered in RAM before streaming —
        # confirm mount sizes stay small enough for this.
        import io
        zip_buffer = io.BytesIO()

        with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
            for file_path in target_dir.rglob("*"):
                if file_path.is_file():
                    arcname = file_path.relative_to(target_dir)
                    zf.write(file_path, arcname)

        # Stream ZIP
        response = web.StreamResponse()
        response.headers["Content-Type"] = "application/zip"
        response.headers["Content-Disposition"] = f'attachment; filename="{cuid}.zip"'
        await response.prepare(request)

        zip_buffer.seek(0)
        await response.write(zip_buffer.read())
        await response.write_eof()

        return response

    async def get_status(self, request):
        """GET /containers/{cuid}/status — runtime status summary or 404."""
        cuid = request.match_info["cuid"]

        info = await self.compose_manager.get_container_info(cuid)
        if not info:
            return web.json_response(
                ErrorObject.create(
                    ERROR_IDS["NOT_FOUND"],
                    "CONTAINER_NOT_FOUND",
                    f"Container {cuid} not found",
                    404,
request["request_id"] + ), + status=404 + ) + + # Get additional status info + result = await self.compose_manager.run_compose(["ps", "--format", "json", cuid]) + + status_data = { + "cuid": cuid, + "status": info.get("status", "unknown"), + "created_at": info.get("created_at", ""), + "uptime": "", + "restarts": 0, + "last_error": None + } + + if result["returncode"] == 0 and result["stdout"]: + try: + ps_data = json.loads(result["stdout"]) + if ps_data and len(ps_data) > 0: + container = ps_data[0] + status_data["uptime"] = container.get("RunningFor", "") + # Would need to parse container inspect for restart count + except json.JSONDecodeError: + pass + + return web.json_response(status_data) + + async def websocket_handler(self, request): + """WebSocket handler for container I/O""" + cuid = request.match_info["cuid"] + + # Check auth + auth_header = request.headers.get("Authorization", "") + if not auth_header.startswith("Basic "): + return web.Response(status=401) + + try: + encoded = auth_header[6:] + decoded = base64.b64decode(encoded).decode("utf-8") + username, password = decoded.split(":", 1) + + if username != self.config.APP_USER or password != self.config.APP_PASS: + return web.Response(status=401) + except Exception: + return web.Response(status=401) + + # Setup WebSocket + ws = web.WebSocketResponse() + await ws.prepare(request) + + # Get initial TTY size from query params + cols = request.query.get("cols", type=int) + rows = request.query.get("rows", type=int) + + # Create session + session = WebSocketSession(cuid, ws, self.compose_manager) + self.ws_sessions.add(session) + + # Start exec session + if not await session.start(cols, rows): + await ws.close() + return ws + + try: + async for msg in ws: + if msg.type == WSMsgType.TEXT: + try: + data = json.loads(msg.data) + msg_type = data.get("type") + + if msg_type == "stdin": + await session.write_stdin( + data.get("data", ""), + data.get("encoding", "utf8") + ) + elif msg_type == "resize": + await 
session.resize( + data.get("cols", 80), + data.get("rows", 24) + ) + elif msg_type == "signal": + await session.signal(data.get("name", "")) + elif msg_type == "close": + break + + except json.JSONDecodeError: + await ws.send_str(json.dumps({ + "type": "error", + "error": {"message": "Invalid JSON"} + })) + + elif msg.type == WSMsgType.ERROR: + break + + finally: + await session.close() + + return ws + + async def run(self): + """Run the application""" + # Check Docker Compose availability + result = await self.compose_manager.run_compose(["version"], timeout=5) + if result["returncode"] != 0: + raise RuntimeError("Docker Compose not available") + + # Create required directories + os.makedirs(MOUNTS_DIR, exist_ok=True) + os.makedirs(LOGS_DIR, exist_ok=True) + + # Run app + runner = web.AppRunner(self.app) + await runner.setup() + site = web.TCPSite(runner, "0.0.0.0", 8080) + + print("Container Manager API starting on http://0.0.0.0:8080") + await site.start() + + # Keep running + try: + await asyncio.Event().wait() + except KeyboardInterrupt: + pass + finally: + await runner.cleanup() + + +async def main(): + """Main entry point""" + app = Application() + await app.run() + + +if __name__ == "__main__": + import io + asyncio.run(main())