From 923945b06ab52a52512de495bf42768c1d19468d Mon Sep 17 00:00:00 2001 From: retoor Date: Fri, 5 Sep 2025 05:10:16 +0200 Subject: [PATCH] Update. --- CLIENT_MANUAL.md | 949 ++++++++++++++++++++++++++++++++++++ app.py | 87 +++- container_client.py | 1135 +++++++++++++++++++++++++++++++++++++++++++ example_usage.py | 335 +++++++++++++ 4 files changed, 2483 insertions(+), 23 deletions(-) create mode 100644 CLIENT_MANUAL.md create mode 100644 container_client.py create mode 100644 example_usage.py diff --git a/CLIENT_MANUAL.md b/CLIENT_MANUAL.md new file mode 100644 index 0000000..b0b1d10 --- /dev/null +++ b/CLIENT_MANUAL.md @@ -0,0 +1,949 @@ +# Container Management API Client Manual + +## Table of Contents +1. [Installation](#installation) +2. [Quick Start](#quick-start) +3. [Client Initialization](#client-initialization) +4. [Container Operations](#container-operations) +5. [Container Lifecycle](#container-lifecycle) +6. [Port Management](#port-management) +7. [File Operations](#file-operations) +8. [Interactive Terminal](#interactive-terminal) +9. [Command Execution](#command-execution) +10. [Batch Operations](#batch-operations) +11. [Utility Methods](#utility-methods) +12. [Complete Examples](#complete-examples) + +## Installation + +```bash +pip install aiohttp +``` + +Save the client as `container_client.py` and import it: + +```python +from container_client import ContainerClient, ContainerConfig, ContainerInfo, ContainerStatus +``` + +## Quick Start + +```python +import asyncio +from container_client import ContainerClient + +async def main(): + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Create a container + container = await client.create_container( + image="python:3.12-slim", + env={"APP_ENV": "production"}, + tags=["web", "api"] + ) + print(f"Created container: {container.cuid}") + + # Start the container + await client.start_container(container.cuid) + + # Execute a command + stdout, stderr, exit_code = await client.execute_command( + container.cuid, + "python --version" + ) + print(f"Python version: {stdout}") + + # Clean up + await client.stop_container(container.cuid) + await client.delete_container(container.cuid) + +asyncio.run(main()) +``` + +## Client Initialization + +### Using Context Manager (Recommended) + +```python +async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Client is automatically connected and will be closed after use + health = await client.health_check() + print(f"API Status: {health.status}") +``` + +### Manual Connection Management + +```python +client = ContainerClient("http://localhost:8080", "admin", "password") +await client.connect() + +try: + # Use client + health = await client.health_check() +finally: + await client.close() +``` + +## Container Operations + +### Create Container + +```python +# Basic creation +container = await client.create_container( + image="nginx:alpine" +) + +# Full configuration +container = await client.create_container( + image="python:3.12-slim", + env={ + "APP_ENV": "production", + "DEBUG": "false", + "DATABASE_URL": "postgresql://..." + }, + tags=["web", "production", "v2.0"], + resources={ + "cpus": 2.0, # CPU cores + "memory": "4096m", # Memory limit + "pids": 2048 # PID limit + }, + ports=[ + {"host": 8080, "container": 80, "protocol": "tcp"}, + {"host": 8443, "container": 443, "protocol": "tcp"} + ] +) + +print(f"Container ID: {container.cuid}") +print(f"Status: {container.status}") +print(f"Image: {container.image}") +``` + +### List Containers + +```python +# List all containers (auto-pagination) +all_containers = await client.list_all_containers() + +# List with filtering +running_containers = await client.list_all_containers( + status=["running", "paused"] +) + +# Manual pagination +containers, next_cursor = await client.list_containers(limit=10) +print(f"Found {len(containers)} containers") + +if next_cursor: + # Get next page + more_containers, next_cursor = await client.list_containers( + cursor=next_cursor, + limit=10 + ) +``` + +### Get Container Details + +```python +container = await client.get_container("c123e4567-e89b-12d3-a456-426614174000") + +print(f"Container: {container.cuid}") +print(f"Image: {container.image}") +print(f"Status: {container.status}") +print(f"Environment: {container.env}") +print(f"Tags: {container.tags}") +print(f"Resources: {container.resources}") +print(f"Ports: {container.ports}") +``` + +### Update Container + +```python +# Update environment variables +updated = await client.update_container( + container.cuid, + env={ + "NEW_VAR": "value", # Add new variable + "EXISTING_VAR": "new", # Update existing + "OLD_VAR": None # Remove variable + } +) + +# Replace tags +updated = await client.update_container( + container.cuid, + tags=["updated", "v2.1"] +) + +# Update resources (merged with existing) +updated = await client.update_container( + container.cuid, + resources={ + "memory": "8192m" # Only update memory, keep other limits + } +) + +# Change image +updated = await client.update_container( + container.cuid, + image="python:3.13-slim" +) +``` + +### Delete Container + +```python +# Delete container and its mount directory +await client.delete_container(container.cuid) + +# Batch delete +cuids = ["cuid1", "cuid2", "cuid3"] +errors = await client.batch_delete(cuids) + +for cuid, error in zip(cuids, errors): + if error: + print(f"Failed to delete {cuid}: {error}") + else: + print(f"Deleted {cuid}") +``` + +## Container Lifecycle + +### Start Container + +```python +result = await client.start_container(container.cuid) +print(result) # {"status": "started"} + +# Wait for container to be running +if await client.wait_for_status(container.cuid, "running", timeout=30): + print("Container is running") +else: + print("Container failed to start") +``` + +### Stop Container + +```python +result = await client.stop_container(container.cuid) +print(result) # {"status": "stopped"} +``` + +### Pause/Unpause Container + +```python +# Pause +await client.pause_container(container.cuid) + +# Do something while paused... + +# Unpause +await client.unpause_container(container.cuid) +``` + +### Restart Container + +```python +result = await client.restart_container(container.cuid) +print(result) # {"status": "restarted"} +``` + +## Port Management + +### Update Ports + +```python +# Add or update port mappings +result = await client.update_ports( + container.cuid, + ports=[ + {"host": 3000, "container": 3000, "protocol": "tcp"}, + {"host": 9000, "container": 9000, "protocol": "udp"}, + {"host": 8080, "container": 80, "protocol": "tcp"} + ] +) + +# Remove all ports +result = await client.update_ports(container.cuid, ports=[]) +``` + +## File Operations + +### Upload Files + +```python +# Method 1: Upload from dictionary +files = { + "boot.py": b""" +import os +print(f"Container {os.environ.get('CONTAINER_UID')} started") +while True: + print("Working...") + time.sleep(10) +""", + "requirements.txt": b"requests==2.31.0\nnumpy==1.24.0", + "data/config.json": b'{"setting": "value"}' +} + +await client.upload_zip(container.cuid, files=files) + +# Method 2: Upload existing ZIP file +await client.upload_zip( + container.cuid, + zip_path="/path/to/archive.zip" +) + +# Method 3: Upload raw ZIP data +with open("archive.zip", "rb") as f: + zip_data = f.read() + +await client.upload_zip(container.cuid, zip_data=zip_data) +``` + +### Download Files + +```python +# Download single file +content = await client.download_file( + container.cuid, + path="data/output.json" +) +print(content.decode('utf-8')) + +# Download and save file +await client.download_file( + container.cuid, + path="logs/app.log", + save_to="./downloads/app.log" +) + +# Download directory as ZIP +zip_data = await client.download_zip( + container.cuid, + path="data" # Download data/ directory +) + +# Download entire container mount as ZIP +zip_data = await client.download_zip( + container.cuid, + path="" # Empty path for root +) + +# Download and extract +await client.download_zip( + container.cuid, + path="", + extract_to="./container_backup/" +) +``` + +## Interactive Terminal + +### Enter Container CLI (Full Terminal Experience) + +```python +# Enter interactive terminal (blocks until Ctrl+] is pressed) +await client.enter_container(container.cuid) + +# User can now interact with the container terminal like SSH +# - Type commands normally +# - Ctrl+C sends interrupt to container +# - Ctrl+] exits the terminal session +``` + +### Custom Terminal Session + +```python +# Create terminal with custom size +terminal = await client.create_terminal( + container.cuid, + cols=120, # Terminal width + rows=40 # Terminal height +) + +# Start interactive mode +await terminal.start_interactive() + +try: + # Wait for session to complete + await terminal.wait() +finally: + # Clean up + await terminal.stop() +``` + +### Programmatic Terminal Control + +```python +terminal = await client.create_terminal(container.cuid) + +# Send commands programmatically +await terminal.send_input("ls -la\n") +await terminal.send_input("pwd\n") + +# Send special signals +await terminal.send_interrupt() # Ctrl+C +await terminal.send_terminate() # SIGTERM +await terminal.send_kill() # SIGKILL + +# Resize terminal +await terminal.resize(cols=100, rows=30) + +# Send binary data +await terminal.send_input_bytes(b"\x03") # Ctrl+C as bytes + +# Clean up when done +await terminal.stop() +``` + +## Command Execution + +### Execute and Get Output + +```python +# Simple command execution +stdout, stderr, exit_code = await client.execute_command( + container.cuid, + "ls -la /app" +) + +print(f"Output: {stdout}") +print(f"Errors: {stderr}") +print(f"Exit Code: {exit_code}") + +# With custom timeout +stdout, stderr, exit_code = await client.execute_command( + container.cuid, + "python long_running_script.py", + timeout=300.0 # 5 minutes +) + +# Multi-line script +script = """ +cd /app +python -m pip install -r requirements.txt +python main.py --init +""" + +stdout, stderr, exit_code = await client.execute_command( + container.cuid, + script +) +``` + +### Stream Output in Real-Time + +```python +# Define callbacks for output +def on_stdout(data): + print(f"[OUT] {data}", end="") + +def on_stderr(data): + print(f"[ERR] {data}", end="") + +# Stream command output +exit_code = await client.stream_output( + container.cuid, + "python train_model.py", + on_stdout=on_stdout, + on_stderr=on_stderr +) + +print(f"\nCommand finished with exit code: {exit_code}") +``` + +### Upload and Run + +```python +# Upload files and execute command in one operation +files = { + "script.py": b""" +import sys +print("Hello from uploaded script!") +print(f"Arguments: {sys.argv[1:]}") +""", + "data.txt": b"sample data" +} + +exit_code = await client.upload_and_run( + container.cuid, + files=files, + command="python script.py arg1 arg2", + wait_for_completion=True +) + +print(f"Script exit code: {exit_code}") + +# Start long-running process without waiting +await client.upload_and_run( + container.cuid, + files={"server.py": server_code}, + command="python server.py", + wait_for_completion=False +) +``` + +## Batch Operations + +### Create Multiple Containers + +```python +from container_client import ContainerConfig + +# Define container configurations +configs = [ + ContainerConfig( + image="nginx:alpine", + env={"NGINX_PORT": "80"}, + tags=["web", "frontend"], + ports=[{"host": 8080, "container": 80, "protocol": "tcp"}] + ), + ContainerConfig( + image="redis:alpine", + env={"REDIS_PASSWORD": "secret"}, + tags=["cache", "backend"] + ), + ContainerConfig( + image="postgres:15", + env={"POSTGRES_PASSWORD": "dbpass"}, + tags=["database", "backend"], + resources={"memory": "4096m", "cpus": 2.0} + ) +] + +# Create all containers in parallel +containers = await client.batch_create(configs) + +for container in containers: + print(f"Created: {container.cuid} ({container.image})") +``` + +### Execute Commands on Multiple Containers + +```python +# Define commands for each container +commands = { + "c123e4567-e89b-12d3-a456-426614174000": "nginx -v", + "c223e4567-e89b-12d3-a456-426614174001": "redis-cli ping", + "c323e4567-e89b-12d3-a456-426614174002": "psql --version" +} + +# Execute all commands in parallel +results = await client.batch_execute(commands, timeout=10.0) + +for cuid, (stdout, stderr, exit_code) in results.items(): + print(f"\nContainer {cuid}:") + print(f" Output: {stdout.strip()}") + if stderr: + print(f" Errors: {stderr.strip()}") + print(f" Exit Code: {exit_code}") +``` + +## Utility Methods + +### Get Container Status + +```python +status = await client.get_status(container.cuid) + +print(f"Container: {status.cuid}") +print(f"Status: {status.status}") +print(f"Created: {status.created_at}") +print(f"Uptime: {status.uptime}") +print(f"Restarts: {status.restarts}") + +if status.last_error: + print(f"Last Error: {status.last_error}") +``` + +### Health Check + +```python +health = await client.health_check() + +print(f"API Status: {health.status}") +print(f"Compose Version: {health.compose_version}") +print(f"Uptime: {health.uptime_s} seconds") +``` + +### Wait for Status + +```python +# Wait for container to be running +if await client.wait_for_status( + container.cuid, + target_status="running", + timeout=60.0, + poll_interval=2.0 +): + print("Container is running") +else: + print("Timeout waiting for container") + +# Wait for container to stop +if await client.wait_for_status(container.cuid, "exited"): + print("Container has stopped") +``` + +## Complete Examples + +### Example 1: Web Application Deployment + +```python +import asyncio +from container_client import ContainerClient + +async def deploy_web_app(): + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Create container + container = await client.create_container( + image="python:3.12-slim", + env={ + "FLASK_APP": "app.py", + "FLASK_ENV": "production" + }, + ports=[{"host": 5000, "container": 5000, "protocol": "tcp"}], + resources={"memory": "1024m", "cpus": 1.0} + ) + + print(f"Created container: {container.cuid}") + + # Upload application files + app_files = { + "app.py": b""" +from flask import Flask +app = Flask(__name__) + +@app.route('/') +def hello(): + return 'Hello from containerized Flask!' + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5000) +""", + "requirements.txt": b"flask==3.0.0", + "boot.py": b""" +import subprocess +import sys + +# Install requirements +subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]) + +# Start Flask app +subprocess.call([sys.executable, "app.py"]) +""" + } + + await client.upload_zip(container.cuid, files=app_files) + + # Start container + await client.start_container(container.cuid) + + # Wait for app to be ready + if await client.wait_for_status(container.cuid, "running", timeout=30): + print("Web app is running on http://localhost:5000") + + # Check logs + stdout, stderr, _ = await client.execute_command( + container.cuid, + "curl -s http://localhost:5000" + ) + print(f"App response: {stdout}") + + return container.cuid + +asyncio.run(deploy_web_app()) +``` + +### Example 2: Data Processing Pipeline + +```python +import asyncio +from container_client import ContainerClient + +async def run_data_pipeline(): + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Create data processor container + processor = await client.create_container( + image="python:3.12-slim", + env={"PYTHONUNBUFFERED": "1"}, + resources={"memory": "4096m", "cpus": 2.0} + ) + + # Upload data and processing script + files = { + "boot.py": b""" +import json +import sys + +# Process data +with open('input.json', 'r') as f: + data = json.load(f) + +# Transform data +processed = { + 'total': len(data), + 'items': [{'id': i, 'value': item * 2} for i, item in enumerate(data)] +} + +# Save results +with open('output.json', 'w') as f: + json.dump(processed, f, indent=2) + +print(f"Processed {len(data)} items") +""", + "input.json": b"[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]" + } + + await client.upload_zip(processor.cuid, files=files) + + # Start container and run processing + await client.start_container(processor.cuid) + + # Execute processing with streaming output + def print_output(data): + print(data, end="") + + exit_code = await client.stream_output( + processor.cuid, + "python /app/boot.py", + on_stdout=print_output, + on_stderr=print_output + ) + + if exit_code == 0: + # Download results + output = await client.download_file( + processor.cuid, + "output.json" + ) + + import json + results = json.loads(output) + print(f"\nResults: {results}") + + # Cleanup + await client.stop_container(processor.cuid) + await client.delete_container(processor.cuid) + +asyncio.run(run_data_pipeline()) +``` + +### Example 3: Interactive Development Environment + +```python +import asyncio +from container_client import ContainerClient + +async def create_dev_environment(): + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Create development container + dev_container = await client.create_container( + image="python:3.12", + env={ + "PYTHONPATH": "/app", + "DEVELOPMENT": "true" + }, + tags=["development", "interactive"], + resources={"memory": "8192m", "cpus": 4.0}, + ports=[ + {"host": 8888, "container": 8888, "protocol": "tcp"}, # Jupyter + {"host": 5000, "container": 5000, "protocol": "tcp"} # Dev server + ] + ) + + # Setup development tools + setup_script = """ +pip install jupyter ipython black flake8 pytest +pip install pandas numpy matplotlib +mkdir -p /app/notebooks /app/src /app/tests +echo 'Development environment ready!' > /app/README.md +""" + + await client.upload_zip( + dev_container.cuid, + files={"setup.sh": setup_script.encode()} + ) + + # Start container + await client.start_container(dev_container.cuid) + + # Run setup + print("Setting up development environment...") + stdout, stderr, exit_code = await client.execute_command( + dev_container.cuid, + "bash /app/setup.sh", + timeout=300 + ) + + if exit_code == 0: + print("Development environment ready!") + print(f"Container ID: {dev_container.cuid}") + + # Start Jupyter in background + await client.upload_and_run( + dev_container.cuid, + files={"start_jupyter.sh": b"jupyter notebook --ip=0.0.0.0 --port=8888 --no-browser --allow-root"}, + command="bash /app/start_jupyter.sh &", + wait_for_completion=False + ) + + print("\nYou can now:") + print("1. Enter the container terminal:") + print(f" await client.enter_container('{dev_container.cuid}')") + print("2. Access Jupyter at http://localhost:8888") + + # Enter interactive terminal + await client.enter_container(dev_container.cuid) + + return dev_container.cuid + +asyncio.run(create_dev_environment()) +``` + +### Example 4: Multi-Container Application + +```python +import asyncio +from container_client import ContainerClient, ContainerConfig + +async def deploy_stack(): + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Define application stack + stack = { + "frontend": ContainerConfig( + image="nginx:alpine", + tags=["frontend", "web"], + ports=[{"host": 80, "container": 80, "protocol": "tcp"}] + ), + "api": ContainerConfig( + image="node:18-alpine", + env={"NODE_ENV": "production", "PORT": "3000"}, + tags=["api", "backend"], + ports=[{"host": 3000, "container": 3000, "protocol": "tcp"}] + ), + "database": ContainerConfig( + image="postgres:15-alpine", + env={ + "POSTGRES_DB": "appdb", + "POSTGRES_USER": "appuser", + "POSTGRES_PASSWORD": "secret" + }, + tags=["database", "backend"], + resources={"memory": "2048m", "cpus": 1.0} + ), + "cache": ContainerConfig( + image="redis:7-alpine", + tags=["cache", "backend"], + resources={"memory": "512m"} + ) + } + + # Deploy all containers + print("Deploying application stack...") + containers = await client.batch_create(list(stack.values())) + + # Map container IDs + container_map = {} + for container, (name, _) in zip(containers, stack.items()): + container_map[name] = container.cuid + print(f" {name}: {container.cuid}") + + # Start all containers + for name, cuid in container_map.items(): + await client.start_container(cuid) + print(f"Started {name}") + + # Wait for all to be running + all_running = True + for name, cuid in container_map.items(): + if not await client.wait_for_status(cuid, "running", timeout=30): + print(f"Failed to start {name}") + all_running = False + + if all_running: + print("\nStack deployed successfully!") + + # Check connectivity + checks = { + container_map["api"]: "node --version", + container_map["database"]: "psql --version", + container_map["cache"]: "redis-cli ping" + } + + results = await client.batch_execute(checks, timeout=5) + + print("\nHealth checks:") + for cuid, (stdout, stderr, exit_code) in results.items(): + name = [k for k, v in container_map.items() if v == cuid][0] + status = "✓" if exit_code == 0 else "✗" + print(f" {name}: {status}") + + return container_map + +asyncio.run(deploy_stack()) +``` + +## Error Handling + +```python +import asyncio +from container_client import ContainerClient + +async def safe_operations(): + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + try: + # Attempt to get non-existent container + container = await client.get_container("invalid-cuid") + except Exception as e: + print(f"Expected error: {e}") + + try: + # Create container with invalid image + container = await client.create_container( + image="non/existent:image" + ) + except Exception as e: + print(f"Image error: {e}") + + # Safe command execution with timeout + try: + stdout, stderr, exit_code = await client.execute_command( + "some-cuid", + "sleep 100", + timeout=5.0 + ) + except TimeoutError as e: + print(f"Command timed out: {e}") + except Exception as e: + print(f"Execution error: {e}") + +asyncio.run(safe_operations()) +``` + +## Tips and Best Practices + +1. **Always use context managers** for automatic session cleanup +2. **Handle timeouts** for long-running operations +3. **Use batch operations** for better performance with multiple containers +4. **Stream output** for long-running commands instead of waiting for completion +5. **Set resource limits** to prevent containers from consuming too many resources +6. **Use tags** to organize and filter containers +7. **Check health status** before performing operations +8. **Clean up containers** after use to free resources +9. **Use wait_for_status** to ensure containers are ready before operations +10. **Handle errors gracefully** with try-except blocks + +## Requirements + +- Python 3.12+ +- aiohttp +- Container Management API server running +- Valid API credentials in `.env` file + +## Support + +For issues or questions: +1. Check API server logs: `logs/actions.jsonl` +2. Verify container status with `get_status()` +3. Use `health_check()` to verify API connectivity +4. Enable debug output by monitoring WebSocket frames diff --git a/app.py b/app.py index e6ef7e7..4a5147c 100644 --- a/app.py +++ b/app.py @@ -186,11 +186,13 @@ class ComposeManager: timeout=timeout ) - return { + result = { "returncode": proc.returncode, "stdout": stdout.decode("utf-8"), "stderr": stderr.decode("utf-8") } + print(result) + return result except asyncio.TimeoutError: if proc and proc.returncode is None: proc.kill() @@ -345,49 +347,67 @@ class ComposeManager: return True async def get_container_info(self, cuid: str) -> Optional[Dict]: + print("DEBUG 1: Entered get_container_info") """Get container information""" compose_data = await self.read_compose() - + print("DEBUG 2: compose_data loaded") + if cuid not in compose_data["services"]: + print("DEBUG 3: cuid not in services") return None - + service = compose_data["services"][cuid] - + print("DEBUG 4: service loaded") + # Get runtime status result = await self.run_compose(["ps", "--format", "json", cuid]) + print("DEBUG 5: run_compose result", result) status_info = {} if result["returncode"] == 0 and result["stdout"]: try: ps_data = json.loads(result["stdout"]) - if ps_data and len(ps_data) > 0: - container = ps_data[0] + print("DEBUG 6: ps_data loaded", ps_data) + if ps_data: + print("DEBUG 7: ps_data is not empty", ps_data) + container = ps_data + status_info = { "status": STATUS_MAP.get(container.get("State", "").lower(), "unknown"), "created_at": container.get("CreatedAt", ""), "started_at": container.get("StartedAt", "") } - except json.JSONDecodeError: - pass - + print("DEBUG 8: status_info set", status_info) + except json.JSONDecodeError as e: + print("DEBUG 9: JSONDecodeError", e) + # Parse ports back to JSON format ports = [] if "ports" in service: - for port_str in service["ports"]: + print("DEBUG 10: service has ports", service["ports"]) + for idx, port_str in enumerate(service["ports"]): + print(f"DEBUG 11.{idx}: parsing port_str", port_str) parts = port_str.split(":") proto_split = parts[-1].split("/") - ports.append({ - "host": int(parts[0]), - "container": int(proto_split[0]), - "protocol": proto_split[1] if len(proto_split) > 1 else "tcp" - }) - + if parts and proto_split: + try: + port_obj = { + "host": int(parts[0]), + "container": int(proto_split[0]), + "protocol": proto_split[1] if len(proto_split) > 1 else "tcp" + } + ports.append(port_obj) + print(f"DEBUG 12.{idx}: port_obj appended", port_obj) + except Exception as e: + print(f"DEBUG 13.{idx}: Exception parsing port", e) + # Extract tags tags = [] if "TAGS" in service.get("environment", {}): tags = service["environment"]["TAGS"].split(",") - + print("DEBUG 14: tags extracted", tags) + # Build response - return { + response = { "cuid": cuid, "image": service.get("image", ""), "env": {k: v for k, v in service.get("environment", {}).items() @@ -401,6 +421,9 @@ class ComposeManager: "ports": ports, **status_info } + print("DEBUG 15: response built", response) + return response + async def list_containers(self, status_filter: Optional[List[str]] = None, cursor: Optional[str] = None, @@ -451,10 +474,13 @@ class WebSocketSession: self.tasks = [] async def start(self, cols: Optional[int] = None, rows: Optional[int] = None): + print("DEBUG 1: Entered start()") """Start exec session""" # Check if boot.py exists boot_path = Path(MOUNTS_DIR) / self.cuid / "boot.py" + print("DEBUG 2: Checking boot.py at", boot_path) if not boot_path.exists(): + print("DEBUG 3: boot.py not found") error = ErrorObject.create( ERROR_IDS["VALIDATION"], "BOOT_MISSING", @@ -467,20 +493,28 @@ class WebSocketSession: "type": "error", "error": error })) + print("DEBUG 4: Sent error to ws") return False # Ensure container is running + print("DEBUG 5: Checking if container is running") result = await self.compose_manager.run_compose(["ps", "-q", self.cuid]) + print("DEBUG 6: Compose ps result:", result) if not result["stdout"].strip(): + print("DEBUG 7: Container not running, starting container") # Start container await self.compose_manager.run_compose(["up", "-d", self.cuid]) + print("DEBUG 8: Container started") # Setup environment for TTY size env = os.environ.copy() + print("DEBUG 9: Copied environment") if cols: env["COLUMNS"] = str(cols) + print(f"DEBUG 10: Set COLUMNS={cols}") if rows: env["LINES"] = str(rows) + print(f"DEBUG 11: Set LINES={rows}") # Start exec with PTY cmd = [ @@ -488,6 +522,7 @@ class WebSocketSession: "exec", "-i", "-T", self.cuid, "sh", "-lc", "exec python -u /app/boot.py" ] + print("DEBUG 12: Command to execute:", cmd) self.proc = await asyncio.create_subprocess_exec( *cmd, @@ -496,13 +531,17 @@ class WebSocketSession: stderr=asyncio.subprocess.PIPE, env=env ) + print("DEBUG 13: Subprocess started") # Start output readers self.tasks.append(asyncio.create_task(self._read_stdout())) + print("DEBUG 14: Started _read_stdout task") self.tasks.append(asyncio.create_task(self._read_stderr())) + print("DEBUG 15: Started _read_stderr task") + print("DEBUG 16: Returning True from start()") return True - + async def _read_stdout(self): """Read stdout and send to WebSocket""" while self.proc and self.proc.returncode is None: @@ -700,13 +739,13 @@ class Application: encoded = auth_header[6:] decoded = base64.b64decode(encoded).decode("utf-8") username, password = decoded.split(":", 1) - + print("ZZZZZZZZZ", username,password) if username != self.config.APP_USER or password != self.config.APP_PASS: raise ValueError("Invalid credentials") request["user"] = username request["request_id"] = str(uuid.uuid4()) - + print("GING GOED!") # Process request start_time = datetime.now() response = await handler(request) @@ -726,10 +765,12 @@ class Application: return response except Exception as e: + help(e) + print("XXXXXXXXXXX",e,flush=True) return web.Response( text=json.dumps(ErrorObject.create( ERROR_IDS["AUTH"], - "INVALID_AUTH", + "INVALID_AUTG", "Invalid credentials", 401, str(uuid.uuid4()) @@ -1226,7 +1267,7 @@ class Application: try: ps_data = json.loads(result["stdout"]) if ps_data and len(ps_data) > 0: - container = ps_data[0] + container = ps_data status_data["uptime"] = container.get("RunningFor", "") # Would need to parse container inspect for restart count except json.JSONDecodeError: diff --git a/container_client.py b/container_client.py new file mode 100644 index 0000000..ab2c20d --- /dev/null +++ b/container_client.py @@ -0,0 +1,1135 @@ +#!/usr/bin/env python3 +""" +Container Management API Client +Async client for container management with full WebSocket terminal support +""" + +import asyncio +import base64 +import io +import json +import os +import sys +import termios +import tty +import zipfile +from dataclasses import dataclass +from pathlib import Path +from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple, Union + +import aiohttp +from aiohttp import ClientSession, ClientWebSocketResponse + + +@dataclass +class ContainerConfig: + """Container configuration""" + image: str + env: Optional[Dict[str, str]] = None + tags: Optional[List[str]] = None + resources: Optional[Dict[str, Any]] = None + ports: Optional[List[Dict[str, Any]]] = None + + +@dataclass +class ContainerInfo: + """Container information""" + cuid: str + image: str + env: Dict[str, str] + tags: List[str] + resources: Dict[str, Any] + ports: List[Dict[str, Any]] + status: str + created_at: Optional[str] = None + started_at: Optional[str] = None + + +@dataclass +class ContainerStatus: + """Container status""" + cuid: str + status: str + created_at: str + uptime: str + restarts: int + last_error: Optional[Dict[str, Any]] = None + + +@dataclass +class HealthStatus: + """API health status""" + status: str + compose_version: str + uptime_s: int + + +class ContainerTerminal: + """Interactive terminal session for a container""" + + def __init__(self, ws: ClientWebSocketResponse, cuid: str): + self.ws = ws + self.cuid = cuid + self.running = False + self.old_tty = None + self.reader_task = None + self.input_task = None + + async def start_interactive(self) -> None: + """Start interactive terminal session""" + self.running = True + + # Save terminal settings and set raw mode + if sys.stdin.isatty(): + self.old_tty = termios.tcgetattr(sys.stdin) + tty.setraw(sys.stdin.fileno()) + + # Get terminal size + rows, cols = self._get_terminal_size() + + # Send initial resize + await self.resize(cols, rows) + + # Start reader and input tasks + self.reader_task = asyncio.create_task(self._read_output()) + self.input_task = asyncio.create_task(self._read_input()) + + # Handle terminal resize + import signal + signal.signal(signal.SIGWINCH, self._handle_resize) + + print(f"\r\n[Connected to container {self.cuid}]\r\n") + print("[Press Ctrl+] to exit, Ctrl+C to send interrupt]\r\n") + + async def stop(self) -> None: + """Stop interactive session""" + self.running = False + + # Cancel tasks + if self.reader_task: + self.reader_task.cancel() + if self.input_task: + self.input_task.cancel() + + # Restore terminal settings + if self.old_tty: + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_tty) + + # Close WebSocket + await self.ws.send_json({"type": "close"}) + await self.ws.close() + + print("\r\n[Disconnected]\r\n") + + def _get_terminal_size(self) -> Tuple[int, int]: + """Get terminal size""" + import struct + import fcntl + + try: + size = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, ' ') + rows, cols = struct.unpack('hh', size) + return rows, cols + except: + return 24, 80 + + def _handle_resize(self, signum, frame) -> None: + """Handle terminal resize signal""" + if self.running: + rows, cols = self._get_terminal_size() + asyncio.create_task(self.resize(cols, rows)) + + async def _read_output(self) -> None: + """Read output from WebSocket and print to terminal""" + try: + async for msg in self.ws: + if msg.type == aiohttp.WSMsgType.TEXT: + data = json.loads(msg.data) + + if data['type'] in ['stdout', 'stderr']: + if data['encoding'] == 'base64': + output = base64.b64decode(data['data']) + else: + output = data['data'].encode('utf-8') + + sys.stdout.buffer.write(output) + sys.stdout.buffer.flush() + + elif data['type'] == 'exit': + print(f"\r\n[Process exited with code {data['code']}]\r\n") + self.running = False + break + + elif data['type'] == 'error': + print(f"\r\n[Error: {data['error']}]\r\n") + self.running = False + break + + elif msg.type == aiohttp.WSMsgType.ERROR: + print(f"\r\n[WebSocket error: {msg.data}]\r\n") + self.running = False + break + except asyncio.CancelledError: + pass + except Exception as e: + print(f"\r\n[Reader error: {e}]\r\n") + self.running = False + + async def _read_input(self) -> None: + """Read input from terminal and send to WebSocket""" + try: + while self.running: + # Read input byte by byte + byte = await asyncio.get_event_loop().run_in_executor( + None, sys.stdin.buffer.read, 1 + ) + + if not byte: + break + + # Check for exit sequence (Ctrl+]) + if byte == b'\x1d': # Ctrl+] + self.running = False + break + + # Send input to container + await self.ws.send_json({ + "type": "stdin", + "data": byte.decode('utf-8', errors='ignore'), + "encoding": "utf8" + }) + except asyncio.CancelledError: + pass + except Exception as e: + print(f"\r\n[Input error: {e}]\r\n") + self.running = False + + async def send_input(self, data: str) -> None: + """Send input to container""" + await self.ws.send_json({ + "type": "stdin", + "data": data, + "encoding": "utf8" + }) + + async def send_input_bytes(self, data: bytes) -> None: + """Send binary input to container""" + await self.ws.send_json({ + "type": "stdin", + "data": base64.b64encode(data).decode('ascii'), + "encoding": "base64" + }) + + async def resize(self, cols: int, rows: int) -> None: + """Resize terminal""" + await self.ws.send_json({ + "type": "resize", + "cols": cols, + "rows": rows + }) + + async def send_interrupt(self) -> None: + """Send Ctrl+C (SIGINT) to container""" + await self.ws.send_json({ + "type": "signal", + "name": "INT" + }) + + async def send_terminate(self) -> None: + """Send SIGTERM to container""" + await self.ws.send_json({ + "type": "signal", + "name": "TERM" + }) + + async def send_kill(self) -> None: + """Send SIGKILL to container""" + await self.ws.send_json({ + "type": "signal", + "name": "KILL" + }) + + async def wait(self) -> None: + """Wait for terminal session to complete""" + if self.reader_task: + await self.reader_task + if self.input_task: + await self.input_task + + +class ContainerClient: + """Async client for Container Management API""" + + def __init__(self, base_url: str, username: str, password: str): + """ + Initialize client + + Args: + base_url: API base URL (e.g., "http://localhost:8080") + username: Basic auth username + password: Basic auth password + """ + self.base_url = base_url.rstrip('/') + self.ws_url = self.base_url.replace('http://', 'ws://').replace('https://', 'wss://') + + # Setup authentication + auth_str = f"{username}:{password}" + auth_bytes = auth_str.encode('utf-8') + auth_b64 = base64.b64encode(auth_bytes).decode('ascii') + self.auth_header = f"Basic {auth_b64}" + + self.session: Optional[ClientSession] = None + + async def __aenter__(self): + """Async context manager entry""" + self.session = ClientSession( + headers={"Authorization": self.auth_header} + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit""" + if self.session: + await self.session.close() + + def _check_session(self) -> None: + """Check if session is initialized""" + if not self.session: + raise RuntimeError("Client session not initialized. Use 'async with' or call connect()") + + async def connect(self) -> None: + """Manually connect the client session""" + if not self.session: + self.session = ClientSession( + headers={"Authorization": self.auth_header} + ) + + async def close(self) -> None: + """Manually close the client session""" + if self.session: + await self.session.close() + self.session = None + + # Health Check + + async def health_check(self) -> HealthStatus: + """ + Check API health status + + Returns: + HealthStatus object with status, compose version, and uptime + """ + self._check_session() + + async with self.session.get(f"{self.base_url}/healthz") as resp: + if resp.status == 200: + data = await resp.json() + return HealthStatus(**data) + else: + error = await resp.json() + raise Exception(f"Health check failed: {error}") + + # Container Management + + async def create_container( + self, + image: str, + env: Optional[Dict[str, str]] = None, + tags: Optional[List[str]] = None, + resources: Optional[Dict[str, Any]] = None, + ports: Optional[List[Dict[str, Any]]] = None + ) -> ContainerInfo: + """ + Create a new container + + Args: + image: Docker image (e.g., "python:3.12-slim") + env: Environment variables + tags: Container tags + resources: Resource limits {"cpus": 0.5, "memory": "2048m", "pids": 1024} + ports: Port mappings [{"host": 8080, "container": 80, "protocol": "tcp"}] + + Returns: + ContainerInfo object with created container details + """ + self._check_session() + + payload = { + "image": image, + "env": env or {}, + "tags": tags or [], + "resources": resources or {}, + "ports": ports or [] + } + + async with self.session.post( + f"{self.base_url}/containers", + json=payload + ) as resp: + if resp.status == 201: + data = await resp.json() + return ContainerInfo(**data) + else: + error = await resp.json() + raise Exception(f"Failed to create container: {error}") + + async def list_containers( + self, + status: Optional[List[str]] = None, + cursor: Optional[str] = None, + limit: int = 20 + ) -> Tuple[List[ContainerInfo], Optional[str]]: + """ + List containers with optional filtering and pagination + + Args: + status: Filter by status (e.g., ["running", "paused"]) + cursor: Pagination cursor from previous response + limit: Maximum number of results + + Returns: + Tuple of (list of ContainerInfo objects, next cursor if more results exist) + """ + self._check_session() + + params = {"limit": limit} + if status: + params["status"] = ",".join(status) + if cursor: + params["cursor"] = cursor + + async with self.session.get( + f"{self.base_url}/containers", + params=params + ) as resp: + if resp.status == 200: + data = await resp.json() + containers = [ContainerInfo(**c) for c in data["containers"]] + next_cursor = data.get("next_cursor") + return containers, next_cursor + else: + error = await resp.json() + raise Exception(f"Failed to list containers: {error}") + + async def get_container(self, cuid: str) -> ContainerInfo: + """ + Get container details + + Args: + cuid: Container UID + + Returns: + ContainerInfo object + """ + self._check_session() + + async with self.session.get(f"{self.base_url}/containers/{cuid}") as resp: + if resp.status == 200: + data = await resp.json() + return ContainerInfo(**data) + else: + error = await resp.json() + raise Exception(f"Failed to get container: {error}") + + async def update_container( + self, + cuid: str, + env: Optional[Dict[str, Optional[str]]] = None, + tags: Optional[List[str]] = None, + resources: Optional[Dict[str, Any]] = None, + image: Optional[str] = None + ) -> ContainerInfo: + """ + Update container configuration + + Args: + cuid: Container UID + env: Environment variables (set to None to remove a key) + tags: New tags (replaces existing) + resources: Resource limits (merged with existing) + image: New Docker image + + Returns: + Updated ContainerInfo object + """ + self._check_session() + + payload = {} + if env is not None: + payload["env"] = env + if tags is not None: + payload["tags"] = tags + if resources is not None: + payload["resources"] = resources + if image is not None: + payload["image"] = image + + async with self.session.patch( + f"{self.base_url}/containers/{cuid}", + json=payload + ) as resp: + if resp.status == 200: + data = await resp.json() + return ContainerInfo(**data) + else: + error = await resp.json() + raise Exception(f"Failed to update container: {error}") + + async def delete_container(self, cuid: str) -> None: + """ + Delete container and its mount directory + + Args: + cuid: Container UID + """ + self._check_session() + + async with self.session.delete(f"{self.base_url}/containers/{cuid}") as resp: + if resp.status != 204: + error = await resp.json() + raise Exception(f"Failed to delete container: {error}") + + # Container Lifecycle + + async def start_container(self, cuid: str) -> Dict[str, str]: + """ + Start a stopped container + + Args: + cuid: Container UID + + Returns: + Status response {"status": "started"} + """ + self._check_session() + + async with self.session.post(f"{self.base_url}/containers/{cuid}/start") as resp: + if resp.status == 200: + return await resp.json() + else: + error = await resp.json() + raise Exception(f"Failed to start container: {error}") + + async def stop_container(self, cuid: str) -> Dict[str, str]: + """ + Stop a running container + + Args: + cuid: Container UID + + Returns: + Status response {"status": "stopped"} + """ + self._check_session() + + async with self.session.post(f"{self.base_url}/containers/{cuid}/stop") as resp: + if resp.status == 200: + return await resp.json() + else: + error = await resp.json() + raise Exception(f"Failed to stop container: {error}") + + async def pause_container(self, cuid: str) -> Dict[str, str]: + """ + Pause a running container + + Args: + cuid: Container UID + + Returns: + Status response {"status": "paused"} + """ + self._check_session() + + async with self.session.post(f"{self.base_url}/containers/{cuid}/pause") as resp: + if resp.status == 200: + return await resp.json() + else: + error = await resp.json() + raise Exception(f"Failed to pause container: {error}") + + async def unpause_container(self, cuid: str) -> Dict[str, str]: + """ + Unpause a paused container + + Args: + cuid: Container UID + + Returns: + Status response {"status": "unpaused"} + """ + self._check_session() + + async with self.session.post(f"{self.base_url}/containers/{cuid}/unpause") as resp: + if resp.status == 200: + return await resp.json() + else: + error = await resp.json() + raise Exception(f"Failed to unpause container: {error}") + + async def restart_container(self, cuid: str) -> Dict[str, str]: + """ + Restart a container + + Args: + cuid: Container UID + + Returns: + Status response {"status": "restarted"} + """ + self._check_session() + + async with self.session.post(f"{self.base_url}/containers/{cuid}/restart") as resp: + if resp.status == 200: + return await resp.json() + else: + error = await resp.json() + raise Exception(f"Failed to restart container: {error}") + + # Port Management + + async def update_ports( + self, + cuid: str, + ports: List[Dict[str, Any]] + ) -> Dict[str, str]: + """ + Update container port mappings + + Args: + cuid: Container UID + ports: New port mappings [{"host": 8080, "container": 80, "protocol": "tcp"}] + + Returns: + Status response {"status": "updated"} + """ + self._check_session() + + payload = {"ports": ports} + + async with self.session.patch( + f"{self.base_url}/containers/{cuid}/ports", + json=payload + ) as resp: + if resp.status == 200: + return await resp.json() + else: + error = await resp.json() + raise Exception(f"Failed to update ports: {error}") + + # File Operations + + async def upload_zip( + self, + cuid: str, + zip_data: Optional[bytes] = None, + zip_path: Optional[Union[str, Path]] = None, + files: Optional[Dict[str, bytes]] = None + ) -> Dict[str, str]: + """ + Upload ZIP archive to container mount + + Args: + cuid: Container UID + zip_data: Raw ZIP data (bytes) + zip_path: Path to ZIP file on disk + files: Dictionary of {path: content} to create ZIP from + + Returns: + Status response {"status": "uploaded"} + + Note: Provide exactly one of zip_data, zip_path, or files + """ + self._check_session() + + # Determine ZIP data source + if sum([zip_data is not None, zip_path is not None, files is not None]) != 1: + raise ValueError("Provide exactly one of: zip_data, zip_path, or files") + + if zip_path: + # Read ZIP from file + with open(zip_path, 'rb') as f: + zip_data = f.read() + elif files: + # Create ZIP from files dict + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: + for path, content in files.items(): + zf.writestr(path, content) + zip_data = zip_buffer.getvalue() + + # Upload ZIP + headers = { + "Authorization": self.auth_header, + "Content-Type": "application/zip" + } + + async with self.session.post( + f"{self.base_url}/containers/{cuid}/upload-zip", + data=zip_data, + headers=headers + ) as resp: + if resp.status == 200: + return await resp.json() + else: + error = await resp.json() + raise Exception(f"Failed to upload ZIP: {error}") + + async def download_file( + self, + cuid: str, + path: str, + save_to: Optional[Union[str, Path]] = None + ) -> bytes: + """ + Download a single file from container mount + + Args: + cuid: Container UID + path: Relative path to file in container mount + save_to: Optional path to save file locally + + Returns: + File content as bytes + """ + self._check_session() + + params = {"path": path} + + async with self.session.get( + f"{self.base_url}/containers/{cuid}/download", + params=params + ) as resp: + if resp.status == 200: + data = await resp.read() + + if save_to: + save_path = Path(save_to) + save_path.parent.mkdir(parents=True, exist_ok=True) + save_path.write_bytes(data) + + return data + else: + error = await resp.json() + raise Exception(f"Failed to download file: {error}") + + async def download_zip( + self, + cuid: str, + path: str = "", + save_to: Optional[Union[str, Path]] = None, + extract_to: Optional[Union[str, Path]] = None + ) -> bytes: + """ + Download directory as ZIP archive + + Args: + cuid: Container UID + path: Relative path to directory (empty for root) + save_to: Optional path to save ZIP file + extract_to: Optional path to extract ZIP contents + + Returns: + ZIP data as bytes + """ + self._check_session() + + params = {"path": path} if path else {} + + async with self.session.get( + f"{self.base_url}/containers/{cuid}/download-zip", + params=params + ) as resp: + if resp.status == 200: + data = await resp.read() + + if save_to: + save_path = Path(save_to) + save_path.parent.mkdir(parents=True, exist_ok=True) + save_path.write_bytes(data) + + if extract_to: + extract_path = Path(extract_to) + extract_path.mkdir(parents=True, exist_ok=True) + + with zipfile.ZipFile(io.BytesIO(data), 'r') as zf: + zf.extractall(extract_path) + + return data + else: + error = await resp.json() + raise Exception(f"Failed to download ZIP: {error}") + + # Container Status + + async def get_status(self, cuid: str) -> ContainerStatus: + """ + Get detailed container status + + Args: + cuid: Container UID + + Returns: + ContainerStatus object with status details + """ + self._check_session() + + async with self.session.get(f"{self.base_url}/containers/{cuid}/status") as resp: + if resp.status == 200: + data = await resp.json() + return ContainerStatus(**data) + else: + error = await resp.json() + raise Exception(f"Failed to get status: {error}") + + # WebSocket Terminal + + async def create_terminal( + self, + cuid: str, + cols: int = 80, + rows: int = 24 + ) -> ContainerTerminal: + """ + Create WebSocket terminal connection to container + + Args: + cuid: Container UID + cols: Terminal columns + rows: Terminal rows + + Returns: + ContainerTerminal object for interactive use + """ + self._check_session() + + url = f"{self.ws_url}/ws/{cuid}?cols={cols}&rows={rows}" + headers = {"Authorization": self.auth_header} + + ws = await self.session.ws_connect(url, headers=headers) + return ContainerTerminal(ws, cuid) + + async def enter_container(self, cuid: str) -> None: + """ + Enter interactive terminal session for a container + (Blocks until session is terminated with Ctrl+]) + + Args: + cuid: Container UID + """ + terminal = await self.create_terminal(cuid) + await terminal.start_interactive() + + try: + await terminal.wait() + finally: + await terminal.stop() + + async def execute_command( + self, + cuid: str, + command: str, + timeout: float = 30.0 + ) -> Tuple[str, str, int]: + """ + Execute a command in container and return output + + Args: + cuid: Container UID + command: Command to execute + timeout: Execution timeout in seconds + + Returns: + Tuple of (stdout, stderr, exit_code) + """ + self._check_session() + + url = f"{self.ws_url}/ws/{cuid}" + headers = {"Authorization": self.auth_header} + + stdout_data = [] + stderr_data = [] + exit_code = -1 + + async with self.session.ws_connect(url, headers=headers) as ws: + # Send command + await ws.send_json({ + "type": "stdin", + "data": command + "\n", + "encoding": "utf8" + }) + + # Send exit command after a short delay + await asyncio.sleep(0.1) + await ws.send_json({ + "type": "stdin", + "data": "exit\n", + "encoding": "utf8" + }) + + # Read output with timeout + try: + async with asyncio.timeout(timeout): + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + data = json.loads(msg.data) + + if data['type'] == 'stdout': + if data['encoding'] == 'base64': + stdout_data.append(base64.b64decode(data['data']).decode('utf-8', errors='replace')) + else: + stdout_data.append(data['data']) + + elif data['type'] == 'stderr': + if data['encoding'] == 'base64': + stderr_data.append(base64.b64decode(data['data']).decode('utf-8', errors='replace')) + else: + stderr_data.append(data['data']) + + elif data['type'] == 'exit': + exit_code = data['code'] + break + except asyncio.TimeoutError: + raise TimeoutError(f"Command execution timed out after {timeout} seconds") + + return ''.join(stdout_data), ''.join(stderr_data), exit_code + + async def stream_output( + self, + cuid: str, + command: str, + on_stdout: Optional[Callable[[str], None]] = None, + on_stderr: Optional[Callable[[str], None]] = None + ) -> int: + """ + Stream command output in real-time + + Args: + cuid: Container UID + command: Command to execute + on_stdout: Callback for stdout data + on_stderr: Callback for stderr data + + Returns: + Exit code + """ + self._check_session() + + url = f"{self.ws_url}/ws/{cuid}" + headers = {"Authorization": self.auth_header} + + exit_code = -1 + + async with self.session.ws_connect(url, headers=headers) as ws: + # Send command + await ws.send_json({ + "type": "stdin", + "data": command + "\n", + "encoding": "utf8" + }) + + # Read output + async for msg in ws: + if msg.type == aiohttp.WSMsgType.TEXT: + data = json.loads(msg.data) + + if data['type'] == 'stdout': + if on_stdout: + if data['encoding'] == 'base64': + text = base64.b64decode(data['data']).decode('utf-8', errors='replace') + else: + text = data['data'] + on_stdout(text) + + elif data['type'] == 'stderr': + if on_stderr: + if data['encoding'] == 'base64': + text = base64.b64decode(data['data']).decode('utf-8', errors='replace') + else: + text = data['data'] + on_stderr(text) + + elif data['type'] == 'exit': + exit_code = data['code'] + break + + return exit_code + + # Batch Operations + + async def batch_create( + self, + configs: List[ContainerConfig] + ) -> List[ContainerInfo]: + """ + Create multiple containers in parallel + + Args: + configs: List of ContainerConfig objects + + Returns: + List of created ContainerInfo objects + """ + tasks = [] + for config in configs: + task = self.create_container( + image=config.image, + env=config.env, + tags=config.tags, + resources=config.resources, + ports=config.ports + ) + tasks.append(task) + + return await asyncio.gather(*tasks) + + async def batch_delete(self, cuids: List[str]) -> List[Optional[Exception]]: + """ + Delete multiple containers in parallel + + Args: + cuids: List of container UIDs + + Returns: + List of exceptions (None if successful) + """ + tasks = [] + for cuid in cuids: + tasks.append(self.delete_container(cuid)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + return [r if isinstance(r, Exception) else None for r in results] + + async def batch_execute( + self, + commands: Dict[str, str], + timeout: float = 30.0 + ) -> Dict[str, Tuple[str, str, int]]: + """ + Execute commands on multiple containers in parallel + + Args: + commands: Dict of {cuid: command} + timeout: Execution timeout per command + + Returns: + Dict of {cuid: (stdout, stderr, exit_code)} + """ + tasks = {} + for cuid, command in commands.items(): + tasks[cuid] = self.execute_command(cuid, command, timeout) + + results = await asyncio.gather(*tasks.values(), return_exceptions=True) + + output = {} + for cuid, result in zip(tasks.keys(), results): + if isinstance(result, Exception): + output[cuid] = ("", str(result), -1) + else: + output[cuid] = result + + return output + + # Utility Methods + + async def list_all_containers( + self, + status: Optional[List[str]] = None + ) -> List[ContainerInfo]: + """ + List all containers (handles pagination automatically) + + Args: + status: Optional status filter + + Returns: + Complete list of ContainerInfo objects + """ + all_containers = [] + cursor = None + + while True: + containers, cursor = await self.list_containers( + status=status, + cursor=cursor, + limit=100 + ) + all_containers.extend(containers) + + if not cursor: + break + + return all_containers + + async def wait_for_status( + self, + cuid: str, + target_status: str, + timeout: float = 60.0, + poll_interval: float = 1.0 + ) -> bool: + """ + Wait for container to reach target status + + Args: + cuid: Container UID + target_status: Target status to wait for + timeout: Maximum wait time in seconds + poll_interval: Status check interval in seconds + + Returns: + True if target status reached, False if timeout + """ + start_time = asyncio.get_event_loop().time() + + while (asyncio.get_event_loop().time() - start_time) < timeout: + try: + info = await self.get_container(cuid) + if info.status == target_status: + return True + except: + pass + + await asyncio.sleep(poll_interval) + + return False + + async def upload_and_run( + self, + cuid: str, + files: Dict[str, bytes], + command: str, + wait_for_completion: bool = True + ) -> Union[int, None]: + """ + Upload files and run a command + + Args: + cuid: Container UID + files: Files to upload {path: content} + command: Command to execute + wait_for_completion: Wait for command to complete + + Returns: + Exit code if waiting, None otherwise + """ + # Upload files + await self.upload_zip(cuid, files=files) + + # Execute command + if wait_for_completion: + _, _, exit_code = await self.execute_command(cuid, command) + return exit_code + else: + # Start command without waiting + url = f"{self.ws_url}/ws/{cuid}" + headers = {"Authorization": self.auth_header} + + async with self.session.ws_connect(url, headers=headers) as ws: + await ws.send_json({ + "type": "stdin", + "data": command + "\n", + "encoding": "utf8" + }) + + return None diff --git a/example_usage.py b/example_usage.py new file mode 100644 index 0000000..7b5351a --- /dev/null +++ b/example_usage.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python3 +""" +Example usage of the Container Management API Client +""" + +import asyncio +import sys +from container_client import ContainerClient, ContainerConfig + + +async def example_basic_operations(): + """Basic container operations example""" + print("=== Basic Container Operations ===\n") + + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Check API health + health = await client.health_check() + print(f"✓ API Status: {health.status}") + print(f"✓ Compose Version: {health.compose_version}\n") + + # Create a container + print("Creating container...") + container = await client.create_container( + image="python:3.12-slim", + env={ + "APP_NAME": "example_app", + "ENV": "development" + }, + tags=["example", "test"], + resources={ + "cpus": 0.5, + "memory": "512m" + } + ) + print(f"✓ Created container: {container.cuid}\n") + + # Upload a simple Python script + print("Uploading application code...") + files = { + "boot.py": b""" +import os +import time + +print(f"Container {os.environ.get('CONTAINER_UID')} starting...") +print(f"App Name: {os.environ.get('APP_NAME')}") +print(f"Environment: {os.environ.get('ENV')}") +print("Ready!") + +# Keep running +for i in range(5): + print(f"Working... {i+1}/5") + time.sleep(1) + +print("Done!") +""" + } + await client.upload_zip(container.cuid, files=files) + print("✓ Code uploaded\n") + + # Start the container + print("Starting container...") + await client.start_container(container.cuid) + await client.wait_for_status(container.cuid, "running", timeout=10) + print("✓ Container running\n") + + # Execute the script and stream output + print("Executing application:") + print("-" * 40) + + def print_output(data): + print(data, end="") + + exit_code = await client.stream_output( + container.cuid, + "python /app/boot.py", + on_stdout=print_output + ) + + print("-" * 40) + print(f"✓ Exit code: {exit_code}\n") + + # Get container status + status = await client.get_status(container.cuid) + print(f"Container Status: {status.status}") + print(f"Uptime: {status.uptime}\n") + + # Clean up + print("Cleaning up...") + await client.stop_container(container.cuid) + await client.delete_container(container.cuid) + print("✓ Container deleted\n") + + +async def example_interactive_terminal(): + """Interactive terminal example""" + print("=== Interactive Terminal Example ===\n") + + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Create a container with development tools + print("Creating development container...") + container = await client.create_container( + image="python:3.12", + env={"PS1": "\\u@container:\\w$ "}, + tags=["interactive", "development"] + ) + print(f"✓ Created container: {container.cuid}\n") + + # Upload some sample files + files = { + "boot.py": b"#!/usr/bin/env python3\nprint('Container ready for interactive use!')\n", + "hello.py": b"print('Hello from the container!')\n", + "data.txt": b"Sample data file\n" + } + await client.upload_zip(container.cuid, files=files) + + # Start container + await client.start_container(container.cuid) + await client.wait_for_status(container.cuid, "running") + + print("Entering interactive terminal...") + print("Commands to try:") + print(" - ls -la") + print(" - python hello.py") + print(" - cat data.txt") + print(" - python --version") + print("\nPress Ctrl+] to exit the terminal\n") + print("=" * 40) + + # Enter interactive terminal + await client.enter_container(container.cuid) + + # Clean up after terminal session + print("\n" + "=" * 40) + print("\nCleaning up...") + await client.stop_container(container.cuid) + await client.delete_container(container.cuid) + print("✓ Container deleted\n") + + +async def example_batch_operations(): + """Batch operations example""" + print("=== Batch Operations Example ===\n") + + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Create multiple containers at once + print("Creating 3 containers in parallel...") + + configs = [ + ContainerConfig( + image="alpine:latest", + env={"NAME": "Worker 1"}, + tags=["worker", "batch"] + ), + ContainerConfig( + image="alpine:latest", + env={"NAME": "Worker 2"}, + tags=["worker", "batch"] + ), + ContainerConfig( + image="alpine:latest", + env={"NAME": "Worker 3"}, + tags=["worker", "batch"] + ) + ] + + containers = await client.batch_create(configs) + print(f"✓ Created {len(containers)} containers\n") + + # Prepare containers with a simple script + for i, container in enumerate(containers, 1): + files = { + "boot.py": f""" +import os +print(f"Worker {{os.environ.get('NAME')}} ready!") +""".encode() + } + await client.upload_zip(container.cuid, files=files) + await client.start_container(container.cuid) + + print("Executing commands on all containers...") + + # Execute commands in parallel + commands = { + containers[0].cuid: "echo 'Worker 1 processing...' && sleep 1 && echo 'Worker 1 done!'", + containers[1].cuid: "echo 'Worker 2 processing...' && sleep 1 && echo 'Worker 2 done!'", + containers[2].cuid: "echo 'Worker 3 processing...' && sleep 1 && echo 'Worker 3 done!'" + } + + results = await client.batch_execute(commands, timeout=10) + + print("\nResults:") + print("-" * 40) + for i, (cuid, (stdout, stderr, exit_code)) in enumerate(results.items(), 1): + print(f"Worker {i}:") + print(f" Output: {stdout.strip()}") + print(f" Exit Code: {exit_code}") + print("-" * 40) + + # Clean up all containers + print("\nCleaning up all containers...") + cuids = [c.cuid for c in containers] + errors = await client.batch_delete(cuids) + + failed = sum(1 for e in errors if e is not None) + if failed == 0: + print(f"✓ All {len(containers)} containers deleted\n") + else: + print(f"⚠ Deleted {len(containers) - failed} containers, {failed} failed\n") + + +async def example_file_operations(): + """File upload and download example""" + print("=== File Operations Example ===\n") + + async with ContainerClient("http://localhost:8080", "admin", "password") as client: + # Create container + print("Creating container for file operations...") + container = await client.create_container( + image="python:3.12-slim", + tags=["files", "example"] + ) + print(f"✓ Created container: {container.cuid}\n") + + # Upload multiple files + print("Uploading application files...") + app_files = { + "app.py": b""" +import json + +# Generate some data +data = { + 'processed': True, + 'items': [1, 2, 3, 4, 5], + 'message': 'Data processed successfully' +} + +# Save to file +with open('output.json', 'w') as f: + json.dump(data, f, indent=2) + +print('Data saved to output.json') +""", + "config/settings.json": b'{"debug": true, "version": "1.0"}', + "data/input.txt": b"Sample input data\nLine 2\nLine 3", + "boot.py": b"print('Container ready!')" + } + + await client.upload_zip(container.cuid, files=app_files) + print("✓ Files uploaded\n") + + # Start container and run the app + await client.start_container(container.cuid) + + print("Running application...") + stdout, stderr, exit_code = await client.execute_command( + container.cuid, + "python app.py" + ) + print(f"Output: {stdout}") + + # Download the generated file + print("\nDownloading generated file...") + content = await client.download_file( + container.cuid, + path="output.json" + ) + + import json + result = json.loads(content) + print(f"Downloaded content: {json.dumps(result, indent=2)}\n") + + # Download entire directory as ZIP + print("Downloading entire container as ZIP...") + zip_data = await client.download_zip( + container.cuid, + path="", + save_to="container_backup.zip" + ) + print(f"✓ Saved backup to container_backup.zip ({len(zip_data)} bytes)\n") + + # Clean up + await client.stop_container(container.cuid) + await client.delete_container(container.cuid) + print("✓ Container deleted\n") + + +async def main(): + """Main function to run examples""" + examples = { + "1": ("Basic Operations", example_basic_operations), + "2": ("Interactive Terminal", example_interactive_terminal), + "3": ("Batch Operations", example_batch_operations), + "4": ("File Operations", example_file_operations) + } + + print("\nContainer Management API Client Examples") + print("=" * 40) + print("\nAvailable examples:") + for key, (name, _) in examples.items(): + print(f" {key}. {name}") + print(" 0. Run all examples") + print(" q. Quit") + + choice = input("\nSelect an example (0-4 or q): ").strip().lower() + + if choice == 'q': + print("Goodbye!") + return + elif choice == '0': + # Run all examples + for name, func in examples.values(): + print(f"\n{'='*50}") + print(f"Running: {name}") + print('='*50 + "\n") + await func() + print("\nPress Enter to continue...") + input() + elif choice in examples: + name, func = examples[choice] + print(f"\n{'='*50}") + print(f"Running: {name}") + print('='*50 + "\n") + await func() + else: + print("Invalid choice!") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\nInterrupted by user") + except Exception as e: + print(f"\nError: {e}") + sys.exit(1)