All source listed below is under MIT license if no LICENSE file stating different is available.

Container Management API Integration Guide

Overview

This API provides comprehensive container management capabilities through REST endpoints, WebSocket connections, and file upload/download interfaces. All containers are managed via Docker Compose v2 CLI.

Quick Start

1. Prerequisites

# Required software
- Python 3.12+
- Docker with Docker Compose v2
- Linux environment

# Python dependencies
pip install aiohttp aiofiles python-dotenv ruamel.yaml

2. Configuration

Create a .env file:

APP_USER=admin
APP_PASS=your_secure_password_here
DEFAULT_USER_UID=1000:1000

3. Run the Server

python container_manager.py
# Server starts at http://0.0.0.0:8080

Authentication

All API endpoints require Basic Authentication.

Authorization: Basic base64(username:password)

Example:

import base64

username = "admin"
password = "your_secure_password_here"
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
headers = {"Authorization": f"Basic {auth}"}

REST API Integration

Container Lifecycle

Create Container

import requests
import json

# Create container with Python example
def create_container(base_url, auth_headers):
    payload = {
        "image": "python:3.12-slim",
        "env": {
            "APP_ENV": "production",
            "DEBUG": "false"
        },
        "tags": ["web", "production"],
        "resources": {
            "cpus": 1.0,
            "memory": "4096m",
            "pids": 2048
        },
        "ports": [
            {
                "host": 8080,
                "container": 80,
                "protocol": "tcp"
            }
        ]
    }
    
    response = requests.post(
        f"{base_url}/containers",
        json=payload,
        headers=auth_headers
    )
    
    if response.status_code == 201:
        container = response.json()
        print(f"Created container: {container['cuid']}")
        return container['cuid']
    else:
        print(f"Error: {response.json()}")
        return None

# Usage
base_url = "http://localhost:8080"
auth = base64.b64encode(b"admin:password").decode()
headers = {"Authorization": f"Basic {auth}"}

cuid = create_container(base_url, headers)

List Containers with Pagination

// Node.js example with axios
const axios = require('axios');

async function listContainers(baseUrl, auth, status = null, cursor = null) {
    const params = {};
    if (status) params.status = status.join(',');
    if (cursor) params.cursor = cursor;
    
    const response = await axios.get(`${baseUrl}/containers`, {
        params,
        headers: {
            'Authorization': `Basic ${auth}`
        }
    });
    
    const data = response.data;
    console.log(`Found ${data.containers.length} containers`);
    
    // Handle pagination
    if (data.next_cursor) {
        console.log('More results available, cursor:', data.next_cursor);
        // Fetch next page
        await listContainers(baseUrl, auth, status, data.next_cursor);
    }
    
    return data.containers;
}

// Usage
const auth = Buffer.from('admin:password').toString('base64');
listContainers('http://localhost:8080', auth, ['running', 'paused']);

Update Container Configuration

// Go example
package main

import (
    "bytes"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "net/http"
)

type UpdateRequest struct {
    Env       map[string]interface{} `json:"env,omitempty"`
    Tags      []string              `json:"tags,omitempty"`
    Resources map[string]interface{} `json:"resources,omitempty"`
    Image     string                `json:"image,omitempty"`
}

func updateContainer(baseURL, cuid, username, password string) error {
    update := UpdateRequest{
        Env: map[string]interface{}{
            "NEW_VAR": "value",
            "OLD_VAR": nil, // Remove this env var
        },
        Tags: []string{"updated", "v2"},
        Resources: map[string]interface{}{
            "memory": "8192m",
        },
    }
    
    jsonData, _ := json.Marshal(update)
    
    req, err := http.NewRequest("PATCH", 
        fmt.Sprintf("%s/containers/%s", baseURL, cuid),
        bytes.NewBuffer(jsonData))
    if err != nil {
        return err
    }
    
    auth := base64.StdEncoding.EncodeToString(
        []byte(fmt.Sprintf("%s:%s", username, password)))
    req.Header.Set("Authorization", fmt.Sprintf("Basic %s", auth))
    req.Header.Set("Content-Type", "application/json")
    
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != 200 {
        return fmt.Errorf("update failed with status: %d", resp.StatusCode)
    }
    
    return nil
}

Container Control Operations

# Ruby example
require 'net/http'
require 'json'
require 'base64'

class ContainerManager
  def initialize(base_url, username, password)
    @base_url = base_url
    @auth = Base64.encode64("#{username}:#{password}").strip
  end
  
  def control_action(cuid, action)
    uri = URI("#{@base_url}/containers/#{cuid}/#{action}")
    http = Net::HTTP.new(uri.host, uri.port)
    
    request = Net::HTTP::Post.new(uri)
    request['Authorization'] = "Basic #{@auth}"
    request['Content-Type'] = 'application/json'
    
    response = http.request(request)
    JSON.parse(response.body)
  end
  
  def start(cuid)
    control_action(cuid, 'start')
  end
  
  def stop(cuid)
    control_action(cuid, 'stop')
  end
  
  def restart(cuid)
    control_action(cuid, 'restart')
  end
  
  def pause(cuid)
    control_action(cuid, 'pause')
  end
  
  def unpause(cuid)
    control_action(cuid, 'unpause')
  end
end

# Usage
manager = ContainerManager.new('http://localhost:8080', 'admin', 'password')
manager.start('c123e4567-e89b-12d3-a456-426614174000')

Port Management

# Update container ports
def update_ports(base_url, cuid, auth_headers):
    new_ports = {
        "ports": [
            {"host": 3000, "container": 3000, "protocol": "tcp"},
            {"host": 9000, "container": 9000, "protocol": "udp"}
        ]
    }
    
    response = requests.patch(
        f"{base_url}/containers/{cuid}/ports",
        json=new_ports,
        headers=auth_headers
    )
    
    return response.status_code == 200

File Upload/Download Integration

Upload ZIP Archive

import zipfile
import io
import requests

def upload_files_to_container(base_url, cuid, files_dict, auth_headers):
    """
    Upload files to container mount
    files_dict: {"path/in/container": file_content_bytes}
    """
    # Create ZIP in memory
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for path, content in files_dict.items():
            zf.writestr(path, content)
    
    zip_buffer.seek(0)
    
    # Upload ZIP
    response = requests.post(
        f"{base_url}/containers/{cuid}/upload-zip",
        data=zip_buffer.read(),
        headers={
            **auth_headers,
            'Content-Type': 'application/zip'
        }
    )
    
    return response.status_code == 200

# Example: Upload Python application
files = {
    "boot.py": b"""
import os
import time

print(f"Container {os.environ.get('CONTAINER_UID')} started")
print(f"Tags: {os.environ.get('TAGS', 'none')}")

while True:
    print("Working...")
    time.sleep(10)
""",
    "requirements.txt": b"requests==2.31.0\nnumpy==1.24.0\n",
    "data/config.json": b'{"setting": "value"}'
}

upload_files_to_container(base_url, cuid, files, headers)

Download Files

// Download single file
async function downloadFile(baseUrl, cuid, filePath, auth) {
    const response = await axios.get(
        `${baseUrl}/containers/${cuid}/download`,
        {
            params: { path: filePath },
            headers: { 'Authorization': `Basic ${auth}` },
            responseType: 'stream'
        }
    );
    
    // Save to file
    const fs = require('fs');
    const writer = fs.createWriteStream('downloaded_file');
    response.data.pipe(writer);
    
    return new Promise((resolve, reject) => {
        writer.on('finish', resolve);
        writer.on('error', reject);
    });
}

// Download directory as ZIP
async function downloadDirectory(baseUrl, cuid, dirPath, auth) {
    const response = await axios.get(
        `${baseUrl}/containers/${cuid}/download-zip`,
        {
            params: { path: dirPath },
            headers: { 'Authorization': `Basic ${auth}` },
            responseType: 'arraybuffer'
        }
    );
    
    // Save ZIP file
    fs.writeFileSync('container_backup.zip', response.data);
}

WebSocket Integration

Interactive Terminal Session

import asyncio
import websockets
import json
import base64

class ContainerTerminal:
    def __init__(self, base_url, cuid, username, password):
        self.ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}"
        self.auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.ws = None
    
    async def connect(self, cols=80, rows=24):
        # Connect with TTY size
        headers = {"Authorization": f"Basic {self.auth}"}
        uri = f"{self.ws_url}?cols={cols}&rows={rows}"
        
        self.ws = await websockets.connect(uri, extra_headers=headers)
        
        # Start receiver task
        asyncio.create_task(self._receive_output())
    
    async def _receive_output(self):
        """Receive and print output from container"""
        async for message in self.ws:
            data = json.loads(message)
            
            if data['type'] == 'stdout':
                if data['encoding'] == 'base64':
                    import base64
                    text = base64.b64decode(data['data']).decode('utf-8', errors='replace')
                else:
                    text = data['data']
                print(text, end='')
            
            elif data['type'] == 'stderr':
                if data['encoding'] == 'base64':
                    import base64
                    text = base64.b64decode(data['data']).decode('utf-8', errors='replace')
                else:
                    text = data['data']
                print(f"[ERROR] {text}", end='')
            
            elif data['type'] == 'exit':
                print(f"\n[Process exited with code {data['code']}]")
                await self.ws.close()
            
            elif data['type'] == 'error':
                print(f"[ERROR] {data['error']}")
    
    async def send_input(self, text):
        """Send input to container"""
        await self.ws.send(json.dumps({
            "type": "stdin",
            "data": text,
            "encoding": "utf8"
        }))
    
    async def resize(self, cols, rows):
        """Resize terminal"""
        await self.ws.send(json.dumps({
            "type": "resize",
            "cols": cols,
            "rows": rows
        }))
    
    async def send_signal(self, signal_name):
        """Send signal (INT, TERM, KILL)"""
        await self.ws.send(json.dumps({
            "type": "signal",
            "name": signal_name
        }))
    
    async def run_interactive(self):
        """Run interactive terminal"""
        import aioconsole
        
        await self.connect()
        
        print("Connected to container. Type 'exit' to quit.")
        print("Special commands: !INT (Ctrl+C), !TERM (terminate), !KILL (force kill)")
        
        while self.ws and not self.ws.closed:
            try:
                line = await aioconsole.ainput()
                
                if line == 'exit':
                    await self.ws.send(json.dumps({"type": "close"}))
                    break
                elif line == '!INT':
                    await self.send_signal('INT')
                elif line == '!TERM':
                    await self.send_signal('TERM')
                elif line == '!KILL':
                    await self.send_signal('KILL')
                else:
                    await self.send_input(line + '\n')
            
            except Exception as e:
                print(f"Error: {e}")
                break

# Usage
async def main():
    terminal = ContainerTerminal(
        'localhost:8080',
        'c123e4567-e89b-12d3-a456-426614174000',
        'admin',
        'password'
    )
    await terminal.run_interactive()

asyncio.run(main())

Non-Interactive Command Execution

const WebSocket = require('ws');

class ContainerExecutor {
    constructor(baseUrl, cuid, username, password) {
        this.wsUrl = `ws://${baseUrl.replace('http://', '')}/ws/${cuid}`;
        this.auth = Buffer.from(`${username}:${password}`).toString('base64');
    }
    
    async execute(command) {
        return new Promise((resolve, reject) => {
            const ws = new WebSocket(this.wsUrl, {
                headers: {
                    'Authorization': `Basic ${this.auth}`
                }
            });
            
            let output = '';
            let errorOutput = '';
            
            ws.on('open', () => {
                // Send command
                ws.send(JSON.stringify({
                    type: 'stdin',
                    data: command + '\n',
                    encoding: 'utf8'
                }));
                
                // Send EOF/exit after command
                setTimeout(() => {
                    ws.send(JSON.stringify({
                        type: 'stdin',
                        data: 'exit\n',
                        encoding: 'utf8'
                    }));
                }, 100);
            });
            
            ws.on('message', (data) => {
                const msg = JSON.parse(data.toString());
                
                if (msg.type === 'stdout') {
                    output += msg.encoding === 'base64' 
                        ? Buffer.from(msg.data, 'base64').toString()
                        : msg.data;
                } else if (msg.type === 'stderr') {
                    errorOutput += msg.encoding === 'base64'
                        ? Buffer.from(msg.data, 'base64').toString()
                        : msg.data;
                } else if (msg.type === 'exit') {
                    ws.close();
                    resolve({
                        exitCode: msg.code,
                        stdout: output,
                        stderr: errorOutput
                    });
                } else if (msg.type === 'error') {
                    ws.close();
                    reject(new Error(msg.error.message));
                }
            });
            
            ws.on('error', reject);
        });
    }
}

// Usage
const executor = new ContainerExecutor(
    'localhost:8080',
    'c123e4567-e89b-12d3-a456-426614174000',
    'admin',
    'password'
);

executor.execute('ls -la /app')
    .then(result => {
        console.log('Exit code:', result.exitCode);
        console.log('Output:', result.stdout);
        if (result.stderr) {
            console.error('Errors:', result.stderr);
        }
    })
    .catch(err => console.error('Failed:', err));

Error Handling

All API errors follow a consistent JSON schema:

{
    "error": "validation_error",
    "code": "MISSING_IMAGE",
    "message": "Image is required",
    "status": 400,
    "request_id": "123e4567-e89b-12d3-a456-426614174000",
    "timestamp": "2024-01-20T15:30:00+01:00",
    "details": {
        "field": "image",
        "provided": null
    },
    "stacktrace": "..." // Only in development mode
}

Error Types

Error ID Description HTTP Status
auth_error Authentication failed 401
validation_error Invalid request data 400
not_found Resource not found 404
conflict Resource conflict 409
compose_error Docker Compose operation failed 500
io_error File I/O operation failed 500
timeout Operation timed out 408
state_error Invalid state transition 422

Error Handling Example

def safe_api_call(func):
    """Decorator for safe API calls with retry logic"""
    def wrapper(*args, **kwargs):
        max_retries = 3
        retry_delay = 1
        
        for attempt in range(max_retries):
            try:
                response = func(*args, **kwargs)
                
                if response.status_code >= 500:
                    # Server error, retry
                    if attempt < max_retries - 1:
                        time.sleep(retry_delay * (attempt + 1))
                        continue
                
                if response.status_code >= 400:
                    # Client error, don't retry
                    error = response.json()
                    print(f"API Error: {error['message']} (ID: {error['request_id']})")
                    return None
                
                return response.json()
                
            except requests.exceptions.RequestException as e:
                print(f"Network error (attempt {attempt + 1}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay * (attempt + 1))
                else:
                    raise
        
        return None
    
    return wrapper

@safe_api_call
def get_container_status(base_url, cuid, headers):
    return requests.get(f"{base_url}/containers/{cuid}/status", headers=headers)

Best Practices

1. Connection Pooling

# Use session for connection pooling
import requests

class APIClient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.session = requests.Session()
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.session.headers.update({"Authorization": f"Basic {auth}"})
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()
    
    def create_container(self, config):
        return self.session.post(f"{self.base_url}/containers", json=config)

# Usage
with APIClient('http://localhost:8080', 'admin', 'password') as client:
    response = client.create_container({"image": "nginx:alpine"})

2. Async Operations

import aiohttp
import asyncio

class AsyncAPIClient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(headers=self.headers)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def batch_create(self, configs):
        tasks = []
        for config in configs:
            task = self.session.post(f"{self.base_url}/containers", json=config)
            tasks.append(task)
        
        responses = await asyncio.gather(*tasks)
        return [await r.json() for r in responses]

# Usage
async def main():
    configs = [
        {"image": "nginx:alpine"},
        {"image": "redis:alpine"},
        {"image": "postgres:alpine"}
    ]
    
    async with AsyncAPIClient('http://localhost:8080', 'admin', 'password') as client:
        containers = await client.batch_create(configs)
        print(f"Created {len(containers)} containers")

asyncio.run(main())

3. Health Monitoring

import time
import threading

class HealthMonitor:
    def __init__(self, base_url, auth_headers, check_interval=30):
        self.base_url = base_url
        self.headers = auth_headers
        self.check_interval = check_interval
        self.running = False
        self.thread = None
    
    def start(self):
        self.running = True
        self.thread = threading.Thread(target=self._monitor_loop)
        self.thread.daemon = True
        self.thread.start()
    
    def stop(self):
        self.running = False
        if self.thread:
            self.thread.join()
    
    def _monitor_loop(self):
        while self.running:
            try:
                response = requests.get(
                    f"{self.base_url}/healthz",
                    headers=self.headers,
                    timeout=5
                )
                
                if response.status_code == 200:
                    data = response.json()
                    print(f"API healthy: Compose {data['compose_version']}")
                else:
                    print(f"API unhealthy: Status {response.status_code}")
            
            except Exception as e:
                print(f"Health check failed: {e}")
            
            time.sleep(self.check_interval)

4. Container Logs Streaming

async def stream_logs(base_url, cuid, auth):
    """Stream container logs in real-time"""
    ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}"
    
    async with websockets.connect(
        ws_url,
        extra_headers={"Authorization": f"Basic {auth}"}
    ) as ws:
        # Request log streaming
        await ws.send(json.dumps({
            "type": "stdin",
            "data": "tail -f /app/logs/app.log\n",
            "encoding": "utf8"
        }))
        
        # Process incoming logs
        async for message in ws:
            data = json.loads(message)
            if data['type'] == 'stdout':
                log_line = data['data']
                # Process log line (e.g., parse JSON logs, send to monitoring)
                process_log_line(log_line)

Security Considerations

  1. Always use HTTPS in production - Deploy behind a reverse proxy (nginx/traefik) with TLS
  2. Rotate credentials regularly - Update .env file and restart service
  3. Limit network exposure - Bind to localhost if only local access needed
  4. Monitor failed auth attempts - Check logs/actions.jsonl for auth_error entries
  5. Validate all inputs - The API validates paths and prevents escapes, but always sanitize on client side too
  6. Set resource limits - Always specify CPU/memory limits when creating containers

Troubleshooting

Common Issues

  1. Container won't start

    • Check if /app/boot.py exists in the mount
    • Verify the image is available locally
    • Check compose logs: docker compose logs <cuid>
  2. WebSocket connection fails

    • Ensure Basic Auth is included in WebSocket headers
    • Check if container is running before connecting
    • Verify /app/boot.py exists
  3. File upload fails

    • Check file size (no limit by default, but system may have limits)
    • Ensure ZIP format is valid
    • Verify mount directory exists and has correct permissions
  4. Port conflicts

    • Check if host ports are already in use
    • Use docker compose ps to verify current port mappings

Debug Mode

Enable debug logging by examining logs/actions.jsonl:

import json
from datetime import datetime

def analyze_logs(log_file='logs/actions.jsonl'):
    errors = []
    slow_requests = []
    
    with open(log_file, 'r') as f:
        for line in f:
            entry = json.loads(line)
            
            # Find errors
            if entry.get('error'):
                errors.append(entry)
            
            # Find slow requests (>5 seconds)
            if entry.get('duration_ms', 0) > 5000:
                slow_requests.append(entry)
    
    print(f"Found {len(errors)} errors")
    print(f"Found {len(slow_requests)} slow requests")
    
    return errors, slow_requests

Complete Integration Example

Here's a full example that creates a container, uploads code, executes it, and cleans up:

import asyncio
import aiohttp
import base64
import json
import zipfile
import io

class ContainerOrchestrator:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}
    
    async def deploy_and_run(self, image, code, env=None):
        async with aiohttp.ClientSession(headers=self.headers) as session:
            # 1. Create container
            create_payload = {
                "image": image,
                "env": env or {},
                "tags": ["automated"],
                "resources": {
                    "cpus": 0.5,
                    "memory": "1024m"
                }
            }
            
            async with session.post(
                f"{self.base_url}/containers",
                json=create_payload
            ) as resp:
                if resp.status != 201:
                    raise Exception(f"Failed to create container: {await resp.text()}")
                container = await resp.json()
                cuid = container['cuid']
                print(f"Created container: {cuid}")
            
            # 2. Upload code
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, 'w') as zf:
                zf.writestr('boot.py', code)
            
            async with session.post(
                f"{self.base_url}/containers/{cuid}/upload-zip",
                data=zip_buffer.getvalue(),
                headers={**self.headers, 'Content-Type': 'application/zip'}
            ) as resp:
                if resp.status != 200:
                    raise Exception(f"Failed to upload code: {await resp.text()}")
                print("Code uploaded successfully")
            
            # 3. Start container
            async with session.post(
                f"{self.base_url}/containers/{cuid}/start"
            ) as resp:
                if resp.status != 200:
                    raise Exception(f"Failed to start container: {await resp.text()}")
                print("Container started")
            
            # 4. Connect via WebSocket to see output
            ws_url = f"ws://{self.base_url.replace('http://', '')}/ws/{cuid}"
            
            async with aiohttp.ClientSession() as ws_session:
                async with ws_session.ws_connect(
                    ws_url,
                    headers=self.headers
                ) as ws:
                    print("Connected to container output:")
                    
                    # Read output for 10 seconds
                    timeout = asyncio.create_task(asyncio.sleep(10))
                    receive = asyncio.create_task(ws.receive())
                    
                    while not timeout.done():
                        done, pending = await asyncio.wait(
                            {timeout, receive},
                            return_when=asyncio.FIRST_COMPLETED
                        )
                        
                        if receive in done:
                            msg = receive.result()
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                data = json.loads(msg.data)
                                if data['type'] == 'stdout':
                                    print(f"[OUT] {data['data']}", end='')
                                elif data['type'] == 'stderr':
                                    print(f"[ERR] {data['data']}", end='')
                            receive = asyncio.create_task(ws.receive())
            
            # 5. Stop and delete container
            async with session.post(
                f"{self.base_url}/containers/{cuid}/stop"
            ) as resp:
                print("\nContainer stopped")
            
            async with session.delete(
                f"{self.base_url}/containers/{cuid}"
            ) as resp:
                print("Container deleted")
            
            return cuid

# Usage
async def main():
    orchestrator = ContainerOrchestrator(
        'http://localhost:8080',
        'admin',
        'password'
    )
    
    code = """
import os
import time

print(f"Hello from container {os.environ.get('CONTAINER_UID')}")
print(f"Environment: {os.environ.get('APP_ENV', 'development')}")

for i in range(5):
    print(f"Iteration {i+1}")
    time.sleep(1)

print("Done!")
"""
    
    await orchestrator.deploy_and_run(
        image='python:3.12-slim',
        code=code,
        env={'APP_ENV': 'production'}
    )

asyncio.run(main())

Support

For issues or questions about the API, check:

  1. The logs/actions.jsonl file for detailed request/response logs
  2. Docker Compose logs: docker compose logs
  3. Container-specific logs: docker compose logs <cuid>

License

This API is designed for container orchestration and management. Ensure you comply with Docker's licensing terms and your organization's security policies when deploying.

app.py
CLIENT_MANUAL.md
container_client.py
example_usage.py
README.md
requirements.txt