Container Management API Integration Guide

Overview

This API provides comprehensive container management capabilities through REST endpoints, WebSocket connections, and file upload/download interfaces. All containers are managed via Docker Compose v2 CLI.

Quick Start

1. Prerequisites

# Required software
- Python 3.12+
- Docker with Docker Compose v2
- Linux environment

# Python dependencies
pip install aiohttp aiofiles python-dotenv ruamel.yaml

2. Configuration

Create a .env file:

APP_USER=admin
APP_PASS=your_secure_password_here
DEFAULT_USER_UID=1000:1000

3. Run the Server

python container_manager.py
# Server starts at http://0.0.0.0:8080

Authentication

All API endpoints require Basic Authentication.

Authorization: Basic base64(username:password)

Example:

import base64

username = "admin"
password = "your_secure_password_here"
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
headers = {"Authorization": f"Basic {auth}"}

REST API Integration

Container Lifecycle

Create Container

import requests
import json

# Create container with Python example
def create_container(base_url, auth_headers):
    payload = {
        "image": "python:3.12-slim",
        "env": {
            "APP_ENV": "production",
            "DEBUG": "false"
        },
        "tags": ["web", "production"],
        "resources": {
            "cpus": 1.0,
            "memory": "4096m",
            "pids": 2048
        },
        "ports": [
            {
                "host": 8080,
                "container": 80,
                "protocol": "tcp"
            }
        ]
    }
    
    response = requests.post(
        f"{base_url}/containers",
        json=payload,
        headers=auth_headers
    )
    
    if response.status_code == 201:
        container = response.json()
        print(f"Created container: {container['cuid']}")
        return container['cuid']
    else:
        print(f"Error: {response.json()}")
        return None

# Usage
base_url = "http://localhost:8080"
auth = base64.b64encode(b"admin:password").decode()
headers = {"Authorization": f"Basic {auth}"}

cuid = create_container(base_url, headers)

List Containers with Pagination

// Node.js example with axios
const axios = require('axios');

async function listContainers(baseUrl, auth, status = null, cursor = null) {
    const params = {};
    if (status) params.status = status.join(',');
    if (cursor) params.cursor = cursor;
    
    const response = await axios.get(`${baseUrl}/containers`, {
        params,
        headers: {
            'Authorization': `Basic ${auth}`
        }
    });
    
    const data = response.data;
    console.log(`Found ${data.containers.length} containers`);
    
    // Handle pagination
    if (data.next_cursor) {
        console.log('More results available, cursor:', data.next_cursor);
        // Fetch next page
        await listContainers(baseUrl, auth, status, data.next_cursor);
    }
    
    return data.containers;
}

// Usage
const auth = Buffer.from('admin:password').toString('base64');
listContainers('http://localhost:8080', auth, ['running', 'paused']);

Update Container Configuration

// Go example
package main

import (
    "bytes"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "net/http"
)

type UpdateRequest struct {
    Env       map[string]interface{} `json:"env,omitempty"`
    Tags      []string              `json:"tags,omitempty"`
    Resources map[string]interface{} `json:"resources,omitempty"`
    Image     string                `json:"image,omitempty"`
}

func updateContainer(baseURL, cuid, username, password string) error {
    update := UpdateRequest{
        Env: map[string]interface{}{
            "NEW_VAR": "value",
            "OLD_VAR": nil, // Remove this env var
        },
        Tags: []string{"updated", "v2"},
        Resources: map[string]interface{}{
            "memory": "8192m",
        },
    }
    
    jsonData, _ := json.Marshal(update)
    
    req, err := http.NewRequest("PATCH", 
        fmt.Sprintf("%s/containers/%s", baseURL, cuid),
        bytes.NewBuffer(jsonData))
    if err != nil {
        return err
    }
    
    auth := base64.StdEncoding.EncodeToString(
        []byte(fmt.Sprintf("%s:%s", username, password)))
    req.Header.Set("Authorization", fmt.Sprintf("Basic %s", auth))
    req.Header.Set("Content-Type", "application/json")
    
    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != 200 {
        return fmt.Errorf("update failed with status: %d", resp.StatusCode)
    }
    
    return nil
}

Container Control Operations

# Ruby example
require 'net/http'
require 'json'
require 'base64'

class ContainerManager
  def initialize(base_url, username, password)
    @base_url = base_url
    @auth = Base64.encode64("#{username}:#{password}").strip
  end
  
  def control_action(cuid, action)
    uri = URI("#{@base_url}/containers/#{cuid}/#{action}")
    http = Net::HTTP.new(uri.host, uri.port)
    
    request = Net::HTTP::Post.new(uri)
    request['Authorization'] = "Basic #{@auth}"
    request['Content-Type'] = 'application/json'
    
    response = http.request(request)
    JSON.parse(response.body)
  end
  
  def start(cuid)
    control_action(cuid, 'start')
  end
  
  def stop(cuid)
    control_action(cuid, 'stop')
  end
  
  def restart(cuid)
    control_action(cuid, 'restart')
  end
  
  def pause(cuid)
    control_action(cuid, 'pause')
  end
  
  def unpause(cuid)
    control_action(cuid, 'unpause')
  end
end

# Usage
manager = ContainerManager.new('http://localhost:8080', 'admin', 'password')
manager.start('c123e4567-e89b-12d3-a456-426614174000')

Port Management

# Update container ports
def update_ports(base_url, cuid, auth_headers):
    new_ports = {
        "ports": [
            {"host": 3000, "container": 3000, "protocol": "tcp"},
            {"host": 9000, "container": 9000, "protocol": "udp"}
        ]
    }
    
    response = requests.patch(
        f"{base_url}/containers/{cuid}/ports",
        json=new_ports,
        headers=auth_headers
    )
    
    return response.status_code == 200

File Upload/Download Integration

Upload ZIP Archive

import zipfile
import io
import requests

def upload_files_to_container(base_url, cuid, files_dict, auth_headers):
    """
    Upload files to container mount
    files_dict: {"path/in/container": file_content_bytes}
    """
    # Create ZIP in memory
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for path, content in files_dict.items():
            zf.writestr(path, content)
    
    zip_buffer.seek(0)
    
    # Upload ZIP
    response = requests.post(
        f"{base_url}/containers/{cuid}/upload-zip",
        data=zip_buffer.read(),
        headers={
            **auth_headers,
            'Content-Type': 'application/zip'
        }
    )
    
    return response.status_code == 200

# Example: Upload Python application
files = {
    "boot.py": b"""
import os
import time

print(f"Container {os.environ.get('CONTAINER_UID')} started")
print(f"Tags: {os.environ.get('TAGS', 'none')}")

while True:
    print("Working...")
    time.sleep(10)
""",
    "requirements.txt": b"requests==2.31.0\nnumpy==1.24.0\n",
    "data/config.json": b'{"setting": "value"}'
}

upload_files_to_container(base_url, cuid, files, headers)

Download Files

// Download single file
async function downloadFile(baseUrl, cuid, filePath, auth) {
    const response = await axios.get(
        `${baseUrl}/containers/${cuid}/download`,
        {
            params: { path: filePath },
            headers: { 'Authorization': `Basic ${auth}` },
            responseType: 'stream'
        }
    );
    
    // Save to file
    const fs = require('fs');
    const writer = fs.createWriteStream('downloaded_file');
    response.data.pipe(writer);
    
    return new Promise((resolve, reject) => {
        writer.on('finish', resolve);
        writer.on('error', reject);
    });
}

// Download directory as ZIP
async function downloadDirectory(baseUrl, cuid, dirPath, auth) {
    const response = await axios.get(
        `${baseUrl}/containers/${cuid}/download-zip`,
        {
            params: { path: dirPath },
            headers: { 'Authorization': `Basic ${auth}` },
            responseType: 'arraybuffer'
        }
    );
    
    // Save ZIP file
    fs.writeFileSync('container_backup.zip', response.data);
}

WebSocket Integration

Interactive Terminal Session

import asyncio
import websockets
import json
import base64

class ContainerTerminal:
    def __init__(self, base_url, cuid, username, password):
        self.ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}"
        self.auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.ws = None
    
    async def connect(self, cols=80, rows=24):
        # Connect with TTY size
        headers = {"Authorization": f"Basic {self.auth}"}
        uri = f"{self.ws_url}?cols={cols}&rows={rows}"
        
        self.ws = await websockets.connect(uri, extra_headers=headers)
        
        # Start receiver task
        asyncio.create_task(self._receive_output())
    
    async def _receive_output(self):
        """Receive and print output from container"""
        async for message in self.ws:
            data = json.loads(message)
            
            if data['type'] == 'stdout':
                if data['encoding'] == 'base64':
                    import base64
                    text = base64.b64decode(data['data']).decode('utf-8', errors='replace')
                else:
                    text = data['data']
                print(text, end='')
            
            elif data['type'] == 'stderr':
                if data['encoding'] == 'base64':
                    import base64
                    text = base64.b64decode(data['data']).decode('utf-8', errors='replace')
                else:
                    text = data['data']
                print(f"[ERROR] {text}", end='')
            
            elif data['type'] == 'exit':
                print(f"\n[Process exited with code {data['code']}]")
                await self.ws.close()
            
            elif data['type'] == 'error':
                print(f"[ERROR] {data['error']}")
    
    async def send_input(self, text):
        """Send input to container"""
        await self.ws.send(json.dumps({
            "type": "stdin",
            "data": text,
            "encoding": "utf8"
        }))
    
    async def resize(self, cols, rows):
        """Resize terminal"""
        await self.ws.send(json.dumps({
            "type": "resize",
            "cols": cols,
            "rows": rows
        }))
    
    async def send_signal(self, signal_name):
        """Send signal (INT, TERM, KILL)"""
        await self.ws.send(json.dumps({
            "type": "signal",
            "name": signal_name
        }))
    
    async def run_interactive(self):
        """Run interactive terminal"""
        import aioconsole
        
        await self.connect()
        
        print("Connected to container. Type 'exit' to quit.")
        print("Special commands: !INT (Ctrl+C), !TERM (terminate), !KILL (force kill)")
        
        while self.ws and not self.ws.closed:
            try:
                line = await aioconsole.ainput()
                
                if line == 'exit':
                    await self.ws.send(json.dumps({"type": "close"}))
                    break
                elif line == '!INT':
                    await self.send_signal('INT')
                elif line == '!TERM':
                    await self.send_signal('TERM')
                elif line == '!KILL':
                    await self.send_signal('KILL')
                else:
                    await self.send_input(line + '\n')
            
            except Exception as e:
                print(f"Error: {e}")
                break

# Usage
async def main():
    terminal = ContainerTerminal(
        'localhost:8080',
        'c123e4567-e89b-12d3-a456-426614174000',
        'admin',
        'password'
    )
    await terminal.run_interactive()

asyncio.run(main())

Non-Interactive Command Execution

const WebSocket = require('ws');

class ContainerExecutor {
    constructor(baseUrl, cuid, username, password) {
        this.wsUrl = `ws://${baseUrl.replace('http://', '')}/ws/${cuid}`;
        this.auth = Buffer.from(`${username}:${password}`).toString('base64');
    }
    
    async execute(command) {
        return new Promise((resolve, reject) => {
            const ws = new WebSocket(this.wsUrl, {
                headers: {
                    'Authorization': `Basic ${this.auth}`
                }
            });
            
            let output = '';
            let errorOutput = '';
            
            ws.on('open', () => {
                // Send command
                ws.send(JSON.stringify({
                    type: 'stdin',
                    data: command + '\n',
                    encoding: 'utf8'
                }));
                
                // Send EOF/exit after command
                setTimeout(() => {
                    ws.send(JSON.stringify({
                        type: 'stdin',
                        data: 'exit\n',
                        encoding: 'utf8'
                    }));
                }, 100);
            });
            
            ws.on('message', (data) => {
                const msg = JSON.parse(data.toString());
                
                if (msg.type === 'stdout') {
                    output += msg.encoding === 'base64' 
                        ? Buffer.from(msg.data, 'base64').toString()
                        : msg.data;
                } else if (msg.type === 'stderr') {
                    errorOutput += msg.encoding === 'base64'
                        ? Buffer.from(msg.data, 'base64').toString()
                        : msg.data;
                } else if (msg.type === 'exit') {
                    ws.close();
                    resolve({
                        exitCode: msg.code,
                        stdout: output,
                        stderr: errorOutput
                    });
                } else if (msg.type === 'error') {
                    ws.close();
                    reject(new Error(msg.error.message));
                }
            });
            
            ws.on('error', reject);
        });
    }
}

// Usage
const executor = new ContainerExecutor(
    'localhost:8080',
    'c123e4567-e89b-12d3-a456-426614174000',
    'admin',
    'password'
);

executor.execute('ls -la /app')
    .then(result => {
        console.log('Exit code:', result.exitCode);
        console.log('Output:', result.stdout);
        if (result.stderr) {
            console.error('Errors:', result.stderr);
        }
    })
    .catch(err => console.error('Failed:', err));

Error Handling

All API errors follow a consistent JSON schema:

{
    "error": "validation_error",
    "code": "MISSING_IMAGE",
    "message": "Image is required",
    "status": 400,
    "request_id": "123e4567-e89b-12d3-a456-426614174000",
    "timestamp": "2024-01-20T15:30:00+01:00",
    "details": {
        "field": "image",
        "provided": null
    },
    "stacktrace": "..." // Only in development mode
}

Error Types

Error ID Description HTTP Status
auth_error Authentication failed 401
validation_error Invalid request data 400
not_found Resource not found 404
conflict Resource conflict 409
compose_error Docker Compose operation failed 500
io_error File I/O operation failed 500
timeout Operation timed out 408
state_error Invalid state transition 422

Error Handling Example

def safe_api_call(func):
    """Decorator for safe API calls with retry logic"""
    def wrapper(*args, **kwargs):
        max_retries = 3
        retry_delay = 1
        
        for attempt in range(max_retries):
            try:
                response = func(*args, **kwargs)
                
                if response.status_code >= 500:
                    # Server error, retry
                    if attempt < max_retries - 1:
                        time.sleep(retry_delay * (attempt + 1))
                        continue
                
                if response.status_code >= 400:
                    # Client error, don't retry
                    error = response.json()
                    print(f"API Error: {error['message']} (ID: {error['request_id']})")
                    return None
                
                return response.json()
                
            except requests.exceptions.RequestException as e:
                print(f"Network error (attempt {attempt + 1}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay * (attempt + 1))
                else:
                    raise
        
        return None
    
    return wrapper

@safe_api_call
def get_container_status(base_url, cuid, headers):
    return requests.get(f"{base_url}/containers/{cuid}/status", headers=headers)

Best Practices

1. Connection Pooling

# Use session for connection pooling
import requests

class APIClient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.session = requests.Session()
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.session.headers.update({"Authorization": f"Basic {auth}"})
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()
    
    def create_container(self, config):
        return self.session.post(f"{self.base_url}/containers", json=config)

# Usage
with APIClient('http://localhost:8080', 'admin', 'password') as client:
    response = client.create_container({"image": "nginx:alpine"})

2. Async Operations

import aiohttp
import asyncio

class AsyncAPIClient:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(headers=self.headers)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    
    async def batch_create(self, configs):
        tasks = []
        for config in configs:
            task = self.session.post(f"{self.base_url}/containers", json=config)
            tasks.append(task)
        
        responses = await asyncio.gather(*tasks)
        return [await r.json() for r in responses]

# Usage
async def main():
    configs = [
        {"image": "nginx:alpine"},
        {"image": "redis:alpine"},
        {"image": "postgres:alpine"}
    ]
    
    async with AsyncAPIClient('http://localhost:8080', 'admin', 'password') as client:
        containers = await client.batch_create(configs)
        print(f"Created {len(containers)} containers")

asyncio.run(main())

3. Health Monitoring

import time
import threading

class HealthMonitor:
    def __init__(self, base_url, auth_headers, check_interval=30):
        self.base_url = base_url
        self.headers = auth_headers
        self.check_interval = check_interval
        self.running = False
        self.thread = None
    
    def start(self):
        self.running = True
        self.thread = threading.Thread(target=self._monitor_loop)
        self.thread.daemon = True
        self.thread.start()
    
    def stop(self):
        self.running = False
        if self.thread:
            self.thread.join()
    
    def _monitor_loop(self):
        while self.running:
            try:
                response = requests.get(
                    f"{self.base_url}/healthz",
                    headers=self.headers,
                    timeout=5
                )
                
                if response.status_code == 200:
                    data = response.json()
                    print(f"API healthy: Compose {data['compose_version']}")
                else:
                    print(f"API unhealthy: Status {response.status_code}")
            
            except Exception as e:
                print(f"Health check failed: {e}")
            
            time.sleep(self.check_interval)

4. Container Logs Streaming

async def stream_logs(base_url, cuid, auth):
    """Stream container logs in real-time"""
    ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}"
    
    async with websockets.connect(
        ws_url,
        extra_headers={"Authorization": f"Basic {auth}"}
    ) as ws:
        # Request log streaming
        await ws.send(json.dumps({
            "type": "stdin",
            "data": "tail -f /app/logs/app.log\n",
            "encoding": "utf8"
        }))
        
        # Process incoming logs
        async for message in ws:
            data = json.loads(message)
            if data['type'] == 'stdout':
                log_line = data['data']
                # Process log line (e.g., parse JSON logs, send to monitoring)
                process_log_line(log_line)

Security Considerations

  1. Always use HTTPS in production - Deploy behind a reverse proxy (nginx/traefik) with TLS
  2. Rotate credentials regularly - Update .env file and restart service
  3. Limit network exposure - Bind to localhost if only local access needed
  4. Monitor failed auth attempts - Check logs/actions.jsonl for auth_error entries
  5. Validate all inputs - The API validates paths and prevents escapes, but always sanitize on client side too
  6. Set resource limits - Always specify CPU/memory limits when creating containers

Troubleshooting

Common Issues

  1. Container won't start

    • Check if /app/boot.py exists in the mount
    • Verify the image is available locally
    • Check compose logs: docker compose logs <cuid>
  2. WebSocket connection fails

    • Ensure Basic Auth is included in WebSocket headers
    • Check if container is running before connecting
    • Verify /app/boot.py exists
  3. File upload fails

    • Check file size (no limit by default, but system may have limits)
    • Ensure ZIP format is valid
    • Verify mount directory exists and has correct permissions
  4. Port conflicts

    • Check if host ports are already in use
    • Use docker compose ps to verify current port mappings

Debug Mode

Enable debug logging by examining logs/actions.jsonl:

import json
from datetime import datetime

def analyze_logs(log_file='logs/actions.jsonl'):
    errors = []
    slow_requests = []
    
    with open(log_file, 'r') as f:
        for line in f:
            entry = json.loads(line)
            
            # Find errors
            if entry.get('error'):
                errors.append(entry)
            
            # Find slow requests (>5 seconds)
            if entry.get('duration_ms', 0) > 5000:
                slow_requests.append(entry)
    
    print(f"Found {len(errors)} errors")
    print(f"Found {len(slow_requests)} slow requests")
    
    return errors, slow_requests

Complete Integration Example

Here's a full example that creates a container, uploads code, executes it, and cleans up:

import asyncio
import aiohttp
import base64
import json
import zipfile
import io

class ContainerOrchestrator:
    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}
    
    async def deploy_and_run(self, image, code, env=None):
        async with aiohttp.ClientSession(headers=self.headers) as session:
            # 1. Create container
            create_payload = {
                "image": image,
                "env": env or {},
                "tags": ["automated"],
                "resources": {
                    "cpus": 0.5,
                    "memory": "1024m"
                }
            }
            
            async with session.post(
                f"{self.base_url}/containers",
                json=create_payload
            ) as resp:
                if resp.status != 201:
                    raise Exception(f"Failed to create container: {await resp.text()}")
                container = await resp.json()
                cuid = container['cuid']
                print(f"Created container: {cuid}")
            
            # 2. Upload code
            zip_buffer = io.BytesIO()
            with zipfile.ZipFile(zip_buffer, 'w') as zf:
                zf.writestr('boot.py', code)
            
            async with session.post(
                f"{self.base_url}/containers/{cuid}/upload-zip",
                data=zip_buffer.getvalue(),
                headers={**self.headers, 'Content-Type': 'application/zip'}
            ) as resp:
                if resp.status != 200:
                    raise Exception(f"Failed to upload code: {await resp.text()}")
                print("Code uploaded successfully")
            
            # 3. Start container
            async with session.post(
                f"{self.base_url}/containers/{cuid}/start"
            ) as resp:
                if resp.status != 200:
                    raise Exception(f"Failed to start container: {await resp.text()}")
                print("Container started")
            
            # 4. Connect via WebSocket to see output
            ws_url = f"ws://{self.base_url.replace('http://', '')}/ws/{cuid}"
            
            async with aiohttp.ClientSession() as ws_session:
                async with ws_session.ws_connect(
                    ws_url,
                    headers=self.headers
                ) as ws:
                    print("Connected to container output:")
                    
                    # Read output for 10 seconds
                    timeout = asyncio.create_task(asyncio.sleep(10))
                    receive = asyncio.create_task(ws.receive())
                    
                    while not timeout.done():
                        done, pending = await asyncio.wait(
                            {timeout, receive},
                            return_when=asyncio.FIRST_COMPLETED
                        )
                        
                        if receive in done:
                            msg = receive.result()
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                data = json.loads(msg.data)
                                if data['type'] == 'stdout':
                                    print(f"[OUT] {data['data']}", end='')
                                elif data['type'] == 'stderr':
                                    print(f"[ERR] {data['data']}", end='')
                            receive = asyncio.create_task(ws.receive())
            
            # 5. Stop and delete container
            async with session.post(
                f"{self.base_url}/containers/{cuid}/stop"
            ) as resp:
                print("\nContainer stopped")
            
            async with session.delete(
                f"{self.base_url}/containers/{cuid}"
            ) as resp:
                print("Container deleted")
            
            return cuid

# Usage
async def main():
    orchestrator = ContainerOrchestrator(
        'http://localhost:8080',
        'admin',
        'password'
    )
    
    code = """
import os
import time

print(f"Hello from container {os.environ.get('CONTAINER_UID')}")
print(f"Environment: {os.environ.get('APP_ENV', 'development')}")

for i in range(5):
    print(f"Iteration {i+1}")
    time.sleep(1)

print("Done!")
"""
    
    await orchestrator.deploy_and_run(
        image='python:3.12-slim',
        code=code,
        env={'APP_ENV': 'production'}
    )

asyncio.run(main())

Support

For issues or questions about the API, check:

  1. The logs/actions.jsonl file for detailed request/response logs
  2. Docker Compose logs: docker compose logs
  3. Container-specific logs: docker compose logs <cuid>

License

This API is designed for container orchestration and management. Ensure you comply with Docker's licensing terms and your organization's security policies when deploying.