Container Management API Integration Guide
Overview
This API provides comprehensive container management capabilities through REST endpoints, WebSocket connections, and file upload/download interfaces. All containers are managed via Docker Compose v2 CLI.
Quick Start
1. Prerequisites
# Required software
- Python 3.12+
- Docker with Docker Compose v2
- Linux environment
# Python dependencies
pip install aiohttp aiofiles python-dotenv ruamel.yaml
2. Configuration
Create a `.env` file in the directory you start the server from:
APP_USER=admin
APP_PASS=your_secure_password_here
DEFAULT_USER_UID=1000:1000
3. Run the Server
python container_manager.py
# Server starts at http://0.0.0.0:8080
Authentication
All API endpoints require Basic Authentication.
Authorization: Basic base64(username:password)
Example:
import base64
username = "admin"
password = "your_secure_password_here"
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
headers = {"Authorization": f"Basic {auth}"}
REST API Integration
Container Lifecycle
Create Container
import requests
import json
# Create container with Python example
def create_container(base_url, auth_headers, timeout=30):
    """Create an example container via ``POST /containers``.

    Args:
        base_url: Root URL of the API, e.g. ``http://localhost:8080``.
        auth_headers: Request headers carrying the Basic ``Authorization`` value.
        timeout: Seconds to wait for the server; without it ``requests``
            would wait indefinitely on an unresponsive server.

    Returns:
        The ``cuid`` of the created container, or ``None`` when the server
        answers with any status other than 201.
    """
    payload = {
        "image": "python:3.12-slim",
        "env": {
            "APP_ENV": "production",
            "DEBUG": "false",
        },
        "tags": ["web", "production"],
        "resources": {
            "cpus": 1.0,
            "memory": "4096m",
            "pids": 2048,
        },
        "ports": [
            {
                "host": 8080,
                "container": 80,
                "protocol": "tcp",
            },
        ],
    }
    response = requests.post(
        f"{base_url}/containers",
        json=payload,
        headers=auth_headers,
        timeout=timeout,
    )
    if response.status_code != 201:
        # Error bodies are expected to be JSON, but an intermediary (proxy,
        # load balancer) may answer with plain text or HTML instead.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        print(f"Error: {detail}")
        return None

    container = response.json()
    print(f"Created container: {container['cuid']}")
    return container['cuid']
# Usage
base_url = "http://localhost:8080"
auth = base64.b64encode(b"admin:password").decode()
headers = {"Authorization": f"Basic {auth}"}
cuid = create_container(base_url, headers)
List Containers with Pagination
// Node.js example with axios
const axios = require('axios');
/**
 * List containers, following `next_cursor` until every page has been fetched.
 *
 * @param {string} baseUrl - Root URL of the API.
 * @param {string} auth - Base64-encoded `username:password`.
 * @param {string[]|string|null} status - Optional status filter; an array is
 *   sent as a comma-separated list.
 * @param {string|null} cursor - Optional cursor to resume from.
 * @returns {Promise<Array>} Containers from all fetched pages, in page order.
 */
async function listContainers(baseUrl, auth, status = null, cursor = null) {
  const params = {};
  const statusFilter = Array.isArray(status) ? status.join(',') : status;
  if (statusFilter) params.status = statusFilter;

  const containers = [];
  let nextCursor = cursor;
  do {
    if (nextCursor) params.cursor = nextCursor;
    const response = await axios.get(`${baseUrl}/containers`, {
      params,
      headers: {
        'Authorization': `Basic ${auth}`
      }
    });
    const data = response.data;
    console.log(`Found ${data.containers.length} containers`);
    // Accumulate every page; returning only the current page would silently
    // drop everything after the first response.
    containers.push(...data.containers);

    nextCursor = data.next_cursor;
    if (nextCursor) {
      console.log('More results available, cursor:', nextCursor);
    }
  } while (nextCursor);

  return containers;
}
// Usage
const auth = Buffer.from('admin:password').toString('base64');
listContainers('http://localhost:8080', auth, ['running', 'paused']);
Update Container Configuration
// Go example
package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
)
// UpdateRequest is the JSON body sent with PATCH /containers/{cuid}.
// Every field is tagged omitempty, so fields left at their zero value are
// not included in the encoded request.
type UpdateRequest struct {
// Env maps variable names to values; the example below uses a nil value
// to request removal of a variable.
Env map[string]interface{} `json:"env,omitempty"`
// Tags is the list of tags sent for the container.
Tags []string `json:"tags,omitempty"`
// Resources carries resource settings such as "memory".
Resources map[string]interface{} `json:"resources,omitempty"`
// Image is the container image reference.
Image string `json:"image,omitempty"`
}
// updateContainer sends an example PATCH to /containers/{cuid} that sets one
// environment variable, requests removal of another, replaces the tags and
// changes the memory setting. It authenticates with HTTP Basic credentials
// and returns an error when the request cannot be built or sent, or when the
// server answers with a status other than 200.
func updateContainer(baseURL, cuid, username, password string) error {
	update := UpdateRequest{
		Env: map[string]interface{}{
			"NEW_VAR": "value",
			"OLD_VAR": nil, // Remove this env var
		},
		Tags: []string{"updated", "v2"},
		Resources: map[string]interface{}{
			"memory": "8192m",
		},
	}

	// Surface encoding failures instead of sending a truncated/empty body.
	jsonData, err := json.Marshal(update)
	if err != nil {
		return fmt.Errorf("encode update request: %w", err)
	}

	req, err := http.NewRequest(http.MethodPatch,
		fmt.Sprintf("%s/containers/%s", baseURL, cuid),
		bytes.NewBuffer(jsonData))
	if err != nil {
		return err
	}

	auth := base64.StdEncoding.EncodeToString(
		[]byte(fmt.Sprintf("%s:%s", username, password)))
	req.Header.Set("Authorization", fmt.Sprintf("Basic %s", auth))
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("update failed with status: %d", resp.StatusCode)
	}
	return nil
}
Container Control Operations
# Ruby example
require 'net/http'
require 'json'
require 'base64'
# Small client for the container control endpoints
# (POST /containers/{cuid}/{action}).
class ContainerManager
  # base_url - root URL of the API, e.g. "http://localhost:8080"
  # username, password - Basic Authentication credentials
  def initialize(base_url, username, password)
    @base_url = base_url
    # strict_encode64 never inserts line feeds; encode64 wraps its output
    # every 60 characters, which would corrupt the header for long credentials.
    @auth = Base64.strict_encode64("#{username}:#{password}")
  end

  # POSTs the given action for the container and returns the parsed JSON body.
  def control_action(cuid, action)
    uri = URI("#{@base_url}/containers/#{cuid}/#{action}")
    http = Net::HTTP.new(uri.host, uri.port)
    # Net::HTTP speaks plain HTTP unless TLS is switched on explicitly.
    http.use_ssl = uri.scheme == 'https'

    request = Net::HTTP::Post.new(uri)
    request['Authorization'] = "Basic #{@auth}"
    request['Content-Type'] = 'application/json'

    response = http.request(request)
    JSON.parse(response.body)
  end

  def start(cuid)
    control_action(cuid, 'start')
  end

  def stop(cuid)
    control_action(cuid, 'stop')
  end

  def restart(cuid)
    control_action(cuid, 'restart')
  end

  def pause(cuid)
    control_action(cuid, 'pause')
  end

  def unpause(cuid)
    control_action(cuid, 'unpause')
  end
end
# Usage
manager = ContainerManager.new('http://localhost:8080', 'admin', 'password')
manager.start('c123e4567-e89b-12d3-a456-426614174000')
Port Management
# Update container ports
def update_ports(base_url, cuid, auth_headers):
    """Send the example TCP/UDP port mappings to ``PATCH /containers/{cuid}/ports``.

    Returns ``True`` only when the server answers with status 200.
    """
    tcp_mapping = {"host": 3000, "container": 3000, "protocol": "tcp"}
    udp_mapping = {"host": 9000, "container": 9000, "protocol": "udp"}
    endpoint = f"{base_url}/containers/{cuid}/ports"

    reply = requests.patch(
        endpoint,
        json={"ports": [tcp_mapping, udp_mapping]},
        headers=auth_headers,
    )
    return reply.status_code == 200
File Upload/Download Integration
Upload ZIP Archive
import zipfile
import io
import requests
def upload_files_to_container(base_url, cuid, files_dict, auth_headers):
    """Pack files into an in-memory ZIP and POST it to the container.

    ``files_dict`` maps each archive path to the content to store under it,
    e.g. ``{"path/in/container": file_content_bytes}``.

    Returns ``True`` only when the upload endpoint answers with status 200.
    """
    # Build the archive entirely in memory; nothing touches the local disk.
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
        for member_path, member_bytes in files_dict.items():
            bundle.writestr(member_path, member_bytes)

    # Copy the caller's headers so the added content type does not leak back.
    upload_headers = dict(auth_headers)
    upload_headers['Content-Type'] = 'application/zip'

    reply = requests.post(
        f"{base_url}/containers/{cuid}/upload-zip",
        data=archive.getvalue(),
        headers=upload_headers,
    )
    return reply.status_code == 200
# Example: Upload Python application
files = {
"boot.py": b"""
import os
import time
print(f"Container {os.environ.get('CONTAINER_UID')} started")
print(f"Tags: {os.environ.get('TAGS', 'none')}")
while True:
print("Working...")
time.sleep(10)
""",
"requirements.txt": b"requests==2.31.0\nnumpy==1.24.0\n",
"data/config.json": b'{"setting": "value"}'
}
upload_files_to_container(base_url, cuid, files, headers)
Download Files
// Download single file
/**
 * Stream a single file from the container to a local file.
 *
 * @param {string} baseUrl - Root URL of the API.
 * @param {string} cuid - Container identifier.
 * @param {string} filePath - Path sent as the `path` query parameter.
 * @param {string} auth - Base64-encoded `username:password`.
 * @param {string} [destPath='downloaded_file'] - Local file to write.
 * @returns {Promise<void>} Resolves once the local file is fully written.
 */
async function downloadFile(baseUrl, cuid, filePath, auth, destPath = 'downloaded_file') {
  const fs = require('fs');
  const response = await axios.get(
    `${baseUrl}/containers/${cuid}/download`,
    {
      params: { path: filePath },
      headers: { 'Authorization': `Basic ${auth}` },
      responseType: 'stream'
    }
  );

  // Save to file
  const writer = fs.createWriteStream(destPath);
  return new Promise((resolve, reject) => {
    // pipe() does not forward source errors to the destination, so a dropped
    // connection would otherwise leave this promise pending forever.
    response.data.on('error', (err) => {
      writer.destroy();
      reject(err);
    });
    writer.on('finish', resolve);
    writer.on('error', reject);
    response.data.pipe(writer);
  });
}
// Download directory as ZIP
/**
 * Download a container directory as a ZIP archive and save it locally.
 *
 * @param {string} baseUrl - Root URL of the API.
 * @param {string} cuid - Container identifier.
 * @param {string} dirPath - Path sent as the `path` query parameter.
 * @param {string} auth - Base64-encoded `username:password`.
 * @param {string} [destPath='container_backup.zip'] - Local file to write.
 * @returns {Promise<void>} Resolves once the archive has been written.
 */
async function downloadDirectory(baseUrl, cuid, dirPath, auth, destPath = 'container_backup.zip') {
  // Required here because no `fs` binding is visible at this scope; without
  // it the write below throws a ReferenceError.
  const fs = require('fs');
  const response = await axios.get(
    `${baseUrl}/containers/${cuid}/download-zip`,
    {
      params: { path: dirPath },
      headers: { 'Authorization': `Basic ${auth}` },
      responseType: 'arraybuffer'
    }
  );

  // Save ZIP file without blocking the event loop.
  await fs.promises.writeFile(destPath, response.data);
}
WebSocket Integration
Interactive Terminal Session
import asyncio
import websockets
import json
import base64
class ContainerTerminal:
    """Interactive terminal client for a container's WebSocket endpoint.

    Messages are JSON objects with a ``type`` field. This client sends
    ``stdin``, ``resize``, ``signal`` and ``close`` messages and handles
    incoming ``stdout``, ``stderr``, ``exit`` and ``error`` messages.
    """

    # Console shortcuts mapped to the signal name sent to the container.
    _SIGNAL_COMMANDS = {'!INT': 'INT', '!TERM': 'TERM', '!KILL': 'KILL'}

    def __init__(self, base_url, cuid, username, password):
        """Store the WebSocket URL and Basic credentials; does not connect.

        ``base_url`` may be ``http://host``, ``https://host`` or a bare
        ``host:port``; ``https`` maps to ``wss``, everything else to ``ws``.
        """
        if base_url.startswith('https://'):
            scheme, host = 'wss', base_url[len('https://'):]
        else:
            scheme, host = 'ws', base_url.replace('http://', '')
        self.ws_url = f"{scheme}://{host}/ws/{cuid}"
        self.auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.ws = None
        self._receiver = None

    async def connect(self, cols=80, rows=24):
        """Open the WebSocket with the given TTY size and start the receiver."""
        headers = {"Authorization": f"Basic {self.auth}"}
        uri = f"{self.ws_url}?cols={cols}&rows={rows}"
        self.ws = await websockets.connect(uri, extra_headers=headers)
        # Keep a reference: the event loop only holds tasks weakly, so an
        # unreferenced task can be garbage-collected before it finishes.
        self._receiver = asyncio.create_task(self._receive_output())

    @staticmethod
    def _decode_payload(data):
        """Return the text of a stdout/stderr message, decoding base64 if flagged."""
        if data.get('encoding') == 'base64':
            return base64.b64decode(data['data']).decode('utf-8', errors='replace')
        return data['data']

    async def _receive_output(self):
        """Receive and print output from container"""
        async for message in self.ws:
            data = json.loads(message)
            kind = data['type']
            if kind == 'stdout':
                print(self._decode_payload(data), end='')
            elif kind == 'stderr':
                print(f"[ERROR] {self._decode_payload(data)}", end='')
            elif kind == 'exit':
                print(f"\n[Process exited with code {data['code']}]")
                await self.ws.close()
            elif kind == 'error':
                print(f"[ERROR] {data['error']}")

    async def send_input(self, text):
        """Send input to container"""
        await self.ws.send(json.dumps({
            "type": "stdin",
            "data": text,
            "encoding": "utf8"
        }))

    async def resize(self, cols, rows):
        """Resize terminal"""
        await self.ws.send(json.dumps({
            "type": "resize",
            "cols": cols,
            "rows": rows
        }))

    async def send_signal(self, signal_name):
        """Send signal (INT, TERM, KILL)"""
        await self.ws.send(json.dumps({
            "type": "signal",
            "name": signal_name
        }))

    async def run_interactive(self):
        """Run interactive terminal"""
        import aioconsole

        await self.connect()
        print("Connected to container. Type 'exit' to quit.")
        print("Special commands: !INT (Ctrl+C), !TERM (terminate), !KILL (force kill)")
        while self.ws and not self.ws.closed:
            try:
                line = await aioconsole.ainput()
                if line == 'exit':
                    await self.ws.send(json.dumps({"type": "close"}))
                    break
                signal_name = self._SIGNAL_COMMANDS.get(line)
                if signal_name is not None:
                    await self.send_signal(signal_name)
                else:
                    await self.send_input(line + '\n')
            except Exception as e:
                # Top-level boundary of the interactive loop: report and leave.
                print(f"Error: {e}")
                break
# Usage
async def main():
terminal = ContainerTerminal(
'localhost:8080',
'c123e4567-e89b-12d3-a456-426614174000',
'admin',
'password'
)
await terminal.run_interactive()
asyncio.run(main())
Non-Interactive Command Execution
const WebSocket = require('ws');
/**
 * Runs a single command over the container's WebSocket endpoint and collects
 * its output.
 */
class ContainerExecutor {
  /**
   * @param {string} baseUrl - `http://host`, `https://host` or bare `host:port`.
   * @param {string} cuid - Container identifier.
   * @param {string} username - Basic Authentication user.
   * @param {string} password - Basic Authentication password.
   */
  constructor(baseUrl, cuid, username, password) {
    // https must map to wss; everything else uses plain ws.
    const scheme = baseUrl.startsWith('https://') ? 'wss' : 'ws';
    const host = baseUrl.replace(/^https?:\/\//, '');
    this.wsUrl = `${scheme}://${host}/ws/${cuid}`;
    this.auth = Buffer.from(`${username}:${password}`).toString('base64');
  }

  /**
   * Send `command` followed by `exit`, then wait for the `exit` message.
   *
   * @param {string} command - Command line to write to stdin.
   * @returns {Promise<{exitCode: number, stdout: string, stderr: string}>}
   *   Rejects on an `error` message, a socket error, an unparsable message,
   *   or when the socket closes before an `exit` message arrives.
   */
  async execute(command) {
    return new Promise((resolve, reject) => {
      const ws = new WebSocket(this.wsUrl, {
        headers: {
          'Authorization': `Basic ${this.auth}`
        }
      });

      const decode = (msg) => (msg.encoding === 'base64'
        ? Buffer.from(msg.data, 'base64').toString()
        : msg.data);

      let output = '';
      let errorOutput = '';
      let exitTimer = null;
      let settled = false;

      // Settle the promise exactly once and stop the pending exit timer.
      const settle = (fn, value) => {
        if (settled) return;
        settled = true;
        clearTimeout(exitTimer);
        fn(value);
      };

      ws.on('open', () => {
        // Send command
        ws.send(JSON.stringify({
          type: 'stdin',
          data: command + '\n',
          encoding: 'utf8'
        }));
        // Send EOF/exit after command
        exitTimer = setTimeout(() => {
          // The socket may already be closed; sending then would throw.
          if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify({
              type: 'stdin',
              data: 'exit\n',
              encoding: 'utf8'
            }));
          }
        }, 100);
      });

      ws.on('message', (data) => {
        let msg;
        try {
          msg = JSON.parse(data.toString());
        } catch (err) {
          settle(reject, err);
          ws.close();
          return;
        }

        if (msg.type === 'stdout') {
          output += decode(msg);
        } else if (msg.type === 'stderr') {
          errorOutput += decode(msg);
        } else if (msg.type === 'exit') {
          settle(resolve, {
            exitCode: msg.code,
            stdout: output,
            stderr: errorOutput
          });
          ws.close();
        } else if (msg.type === 'error') {
          // Accept both a top-level `message` and an `error` that is either
          // a string or an object carrying `message`.
          const detail = msg.message
            || (msg.error && msg.error.message)
            || msg.error
            || 'unknown error';
          settle(reject, new Error(String(detail)));
          ws.close();
        }
      });

      ws.on('error', (err) => settle(reject, err));
      // Without this, a connection that drops before `exit` would leave the
      // caller waiting forever.
      ws.on('close', () => settle(
        reject,
        new Error('WebSocket closed before the process reported an exit code')
      ));
    });
  }
}
// Usage
const executor = new ContainerExecutor(
'localhost:8080',
'c123e4567-e89b-12d3-a456-426614174000',
'admin',
'password'
);
executor.execute('ls -la /app')
.then(result => {
console.log('Exit code:', result.exitCode);
console.log('Output:', result.stdout);
if (result.stderr) {
console.error('Errors:', result.stderr);
}
})
.catch(err => console.error('Failed:', err));
Error Handling
All API errors follow a consistent JSON schema:
{
"error": "validation_error",
"code": "MISSING_IMAGE",
"message": "Image is required",
"status": 400,
"request_id": "123e4567-e89b-12d3-a456-426614174000",
"timestamp": "2024-01-20T15:30:00+01:00",
"details": {
"field": "image",
"provided": null
},
"stacktrace": "..." // Only in development mode
}
Error Types
| Error ID | Description | HTTP Status |
|---|---|---|
| `auth_error` | Authentication failed | 401 |
| `validation_error` | Invalid request data | 400 |
| `not_found` | Resource not found | 404 |
| `conflict` | Resource conflict | 409 |
| `compose_error` | Docker Compose operation failed | 500 |
| `io_error` | File I/O operation failed | 500 |
| `timeout` | Operation timed out | 408 |
| `state_error` | Invalid state transition | 422 |
Error Handling Example
def safe_api_call(func):
    """Decorator for safe API calls with retry logic.

    The wrapped function must return a response object exposing
    ``status_code`` and ``json()``. Up to three attempts are made:

    * a 5xx response or a ``requests`` network error is retried after a
      linearly growing delay (1s, then 2s);
    * a 4xx response (or a 5xx on the last attempt) is reported and the
      wrapper returns ``None``;
    * any other response returns its decoded JSON body.

    A network error on the last attempt is re-raised.
    """
    # Imported locally so the decorator works even when the surrounding
    # module has not imported these itself.
    import functools
    import time

    max_retries = 3
    retry_delay = 1

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                response = func(*args, **kwargs)
            except requests.exceptions.RequestException as e:
                print(f"Network error (attempt {attempt + 1}): {e}")
                if is_last_attempt:
                    raise
                time.sleep(retry_delay * (attempt + 1))
                continue

            if response.status_code >= 500 and not is_last_attempt:
                # Server error, retry
                time.sleep(retry_delay * (attempt + 1))
                continue

            if response.status_code >= 400:
                # Client error (or exhausted retries): report, don't retry.
                try:
                    error = response.json()
                except ValueError:
                    # Body is not JSON, e.g. an HTML page from a proxy.
                    error = None
                if isinstance(error, dict):
                    message = error.get('message', 'unknown error')
                    request_id = error.get('request_id', 'n/a')
                    print(f"API Error: {message} (ID: {request_id})")
                else:
                    print(f"API Error: HTTP {response.status_code}")
                return None

            return response.json()
        return None

    return wrapper


@safe_api_call
def get_container_status(base_url, cuid, headers):
    """Fetch ``/containers/{cuid}/status``; the decorator decodes the result."""
    return requests.get(f"{base_url}/containers/{cuid}/status", headers=headers)
Best Practices
1. Connection Pooling
# Use session for connection pooling
import requests
class APIClient:
    """Context-managed API client that reuses one ``requests.Session``.

    The session carries the Basic ``Authorization`` header for every request
    and is closed when the ``with`` block exits.
    """

    def __init__(self, base_url, username, password):
        credentials = f"{username}:{password}".encode()
        token = base64.b64encode(credentials).decode()

        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers["Authorization"] = f"Basic {token}"

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()

    def create_container(self, config):
        """POST ``config`` to ``/containers`` and return the raw response."""
        endpoint = f"{self.base_url}/containers"
        return self.session.post(endpoint, json=config)
# Usage
with APIClient('http://localhost:8080', 'admin', 'password') as client:
response = client.create_container({"image": "nginx:alpine"})
2. Async Operations
import aiohttp
import asyncio
class AsyncAPIClient:
    """Async API client built on one ``aiohttp.ClientSession``.

    Use it as ``async with AsyncAPIClient(...) as client``; the session is
    created on entry and closed on exit.
    """

    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}
        self.session = None  # created in __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(headers=self.headers)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def batch_create(self, configs):
        """Create one container per config concurrently.

        Returns the decoded JSON bodies in the same order as ``configs``.
        """
        async def _create(config):
            # ``async with`` releases the connection back to the pool once
            # the body is read; awaiting post() alone never releases it.
            async with self.session.post(
                f"{self.base_url}/containers", json=config
            ) as resp:
                return await resp.json()

        return await asyncio.gather(*(_create(config) for config in configs))
# Usage
async def main():
configs = [
{"image": "nginx:alpine"},
{"image": "redis:alpine"},
{"image": "postgres:alpine"}
]
async with AsyncAPIClient('http://localhost:8080', 'admin', 'password') as client:
containers = await client.batch_create(configs)
print(f"Created {len(containers)} containers")
asyncio.run(main())
3. Health Monitoring
import time
import threading
class HealthMonitor:
    """Poll ``/healthz`` from a background daemon thread and print the result."""

    def __init__(self, base_url, auth_headers, check_interval=30):
        self.base_url = base_url
        self.headers = auth_headers
        self.check_interval = check_interval
        self.running = False
        self.thread = None
        # Lets stop() interrupt the pause between checks.
        self._stop_event = threading.Event()

    def start(self):
        """Start the monitor thread; a no-op if it is already running."""
        if self.thread is not None and self.thread.is_alive():
            return
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Ask the monitor to stop and wait for the thread to finish."""
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join()

    def _monitor_loop(self):
        while self.running:
            try:
                response = requests.get(
                    f"{self.base_url}/healthz",
                    headers=self.headers,
                    timeout=5
                )
                if response.status_code == 200:
                    data = response.json()
                    print(f"API healthy: Compose {data['compose_version']}")
                else:
                    print(f"API unhealthy: Status {response.status_code}")
            except Exception as e:
                # Best-effort monitor: report any failure and keep polling.
                print(f"Health check failed: {e}")
            # Unlike time.sleep(), this returns as soon as stop() is called,
            # so stop() does not block for a full check interval.
            self._stop_event.wait(self.check_interval)
4. Container Logs Streaming
async def stream_logs(base_url, cuid, auth):
    """Stream container logs in real-time.

    Opens the container's WebSocket, runs ``tail -f /app/logs/app.log`` via
    stdin and passes the text of every ``stdout`` message to
    ``process_log_line``.

    Args:
        base_url: ``http://host``, ``https://host`` or bare ``host:port``.
        cuid: Container identifier.
        auth: Base64-encoded ``username:password``.
    """
    import base64

    # https must map to wss; everything else uses plain ws.
    if base_url.startswith('https://'):
        ws_url = f"wss://{base_url[len('https://'):]}/ws/{cuid}"
    else:
        ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}"

    async with websockets.connect(
        ws_url,
        extra_headers={"Authorization": f"Basic {auth}"}
    ) as ws:
        # Request log streaming
        await ws.send(json.dumps({
            "type": "stdin",
            "data": "tail -f /app/logs/app.log\n",
            "encoding": "utf8"
        }))

        # Process incoming logs
        async for message in ws:
            data = json.loads(message)
            if data['type'] != 'stdout':
                continue
            log_line = data['data']
            # Output may arrive base64-encoded, as the other clients in this
            # guide handle; decode it before processing.
            if data.get('encoding') == 'base64':
                log_line = base64.b64decode(log_line).decode('utf-8', errors='replace')
            # Process log line (e.g., parse JSON logs, send to monitoring)
            process_log_line(log_line)
Security Considerations
- Always use HTTPS in production - Deploy behind a reverse proxy (nginx/traefik) with TLS
- Rotate credentials regularly - Update the `.env` file and restart the service
- Limit network exposure - Bind to localhost if only local access is needed
- Monitor failed auth attempts - Check `logs/actions.jsonl` for `auth_error` entries
- Validate all inputs - The API validates paths and prevents escapes, but always sanitize on the client side too
- Set resource limits - Always specify CPU/memory limits when creating containers
Troubleshooting
Common Issues
- Container won't start
  - Check if `/app/boot.py` exists in the mount
  - Verify the image is available locally
  - Check compose logs: `docker compose logs <cuid>`
- WebSocket connection fails
  - Ensure Basic Auth is included in WebSocket headers
  - Check if container is running before connecting
  - Verify `/app/boot.py` exists
- File upload fails
  - Check file size (no limit by default, but system may have limits)
  - Ensure ZIP format is valid
  - Verify mount directory exists and has correct permissions
- Port conflicts
  - Check if host ports are already in use
  - Use `docker compose ps` to verify current port mappings
Debug Mode
To debug a problem, examine the request log in `logs/actions.jsonl`:
import json
from datetime import datetime
def analyze_logs(log_file='logs/actions.jsonl'):
    """Scan a JSON-lines log for failed and slow requests.

    Args:
        log_file: Path to a file holding one JSON object per line.

    Returns:
        ``(errors, slow_requests)``: entries with a truthy ``error`` field,
        and entries whose ``duration_ms`` exceeds 5000. An entry can appear
        in both lists.

    Blank lines are ignored. Lines that are not a JSON object (for example a
    partially written last line) are skipped and counted in the summary.
    """
    errors = []
    slow_requests = []
    skipped = 0

    with open(log_file, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                skipped += 1
                continue
            if not isinstance(entry, dict):
                skipped += 1
                continue

            # Find errors
            if entry.get('error'):
                errors.append(entry)
            # Find slow requests (>5 seconds); a null duration counts as 0.
            if (entry.get('duration_ms') or 0) > 5000:
                slow_requests.append(entry)

    print(f"Found {len(errors)} errors")
    print(f"Found {len(slow_requests)} slow requests")
    if skipped:
        print(f"Skipped {skipped} malformed lines")
    return errors, slow_requests
Complete Integration Example
Here's a full example that creates a container, uploads code, executes it, and cleans up:
import asyncio
import aiohttp
import base64
import json
import zipfile
import io
class ContainerOrchestrator:
    """Create a container, upload code, run it, show its output and clean up."""

    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}

    async def deploy_and_run(self, image, code, env=None):
        """Run ``code`` as ``boot.py`` in a fresh container.

        Args:
            image: Container image to create the container from.
            code: Source text stored as ``boot.py`` in the uploaded ZIP.
            env: Optional environment variables for the container.

        Returns:
            The ``cuid`` of the container that was created (and removed).

        Raises:
            Exception: If creating, uploading to, or starting the container
                does not return the expected status.
        """
        async with aiohttp.ClientSession(headers=self.headers) as session:
            cuid = await self._create(session, image, env)
            try:
                await self._upload_code(session, cuid, code)
                await self._start(session, cuid)
                await self._print_output(session, cuid, seconds=10)
            finally:
                # Always remove the container, even if a later step failed,
                # so failed runs do not leave containers behind.
                await self._cleanup(session, cuid)
            return cuid

    async def _create(self, session, image, env):
        """Create the container and return its cuid."""
        create_payload = {
            "image": image,
            "env": env or {},
            "tags": ["automated"],
            "resources": {
                "cpus": 0.5,
                "memory": "1024m"
            }
        }
        async with session.post(
            f"{self.base_url}/containers",
            json=create_payload
        ) as resp:
            if resp.status != 201:
                raise Exception(f"Failed to create container: {await resp.text()}")
            container = await resp.json()
        cuid = container['cuid']
        print(f"Created container: {cuid}")
        return cuid

    async def _upload_code(self, session, cuid, code):
        """Upload ``code`` as ``boot.py`` inside an in-memory ZIP archive."""
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zf:
            zf.writestr('boot.py', code)
        async with session.post(
            f"{self.base_url}/containers/{cuid}/upload-zip",
            data=zip_buffer.getvalue(),
            headers={**self.headers, 'Content-Type': 'application/zip'}
        ) as resp:
            if resp.status != 200:
                raise Exception(f"Failed to upload code: {await resp.text()}")
        print("Code uploaded successfully")

    async def _start(self, session, cuid):
        """Start the container."""
        async with session.post(
            f"{self.base_url}/containers/{cuid}/start"
        ) as resp:
            if resp.status != 200:
                raise Exception(f"Failed to start container: {await resp.text()}")
        print("Container started")

    def _ws_url(self, cuid):
        """Build the WebSocket URL, mapping https to wss."""
        if self.base_url.startswith('https://'):
            return f"wss://{self.base_url[len('https://'):]}/ws/{cuid}"
        return f"ws://{self.base_url.replace('http://', '')}/ws/{cuid}"

    async def _print_output(self, session, cuid, seconds):
        """Print stdout/stderr from the container for at most ``seconds``."""
        # The session's default headers already carry the credentials.
        async with session.ws_connect(self._ws_url(cuid)) as ws:
            print("Connected to container output:")
            loop = asyncio.get_running_loop()
            deadline = loop.time() + seconds
            while (remaining := deadline - loop.time()) > 0:
                try:
                    msg = await ws.receive(timeout=remaining)
                except asyncio.TimeoutError:
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    if msg.type in (
                        aiohttp.WSMsgType.CLOSE,
                        aiohttp.WSMsgType.CLOSING,
                        aiohttp.WSMsgType.CLOSED,
                        aiohttp.WSMsgType.ERROR,
                    ):
                        # Once closed, receive() returns immediately; stop
                        # instead of spinning until the deadline.
                        break
                    continue
                data = json.loads(msg.data)
                if data['type'] not in ('stdout', 'stderr'):
                    continue
                text = data['data']
                if data.get('encoding') == 'base64':
                    text = base64.b64decode(text).decode('utf-8', errors='replace')
                label = 'OUT' if data['type'] == 'stdout' else 'ERR'
                print(f"[{label}] {text}", end='')

    async def _cleanup(self, session, cuid):
        """Best-effort stop and delete; failures are reported, not raised."""
        steps = (
            (session.post, f"{self.base_url}/containers/{cuid}/stop",
             "\nContainer stopped", "stop"),
            (session.delete, f"{self.base_url}/containers/{cuid}",
             "Container deleted", "delete"),
        )
        for call, url, ok_message, action in steps:
            try:
                async with call(url) as resp:
                    if resp.status < 300:
                        print(ok_message)
                    else:
                        print(f"\nFailed to {action} container: {await resp.text()}")
            except aiohttp.ClientError as exc:
                print(f"\nFailed to {action} container: {exc}")
# Usage
async def main():
orchestrator = ContainerOrchestrator(
'http://localhost:8080',
'admin',
'password'
)
code = """
import os
import time
print(f"Hello from container {os.environ.get('CONTAINER_UID')}")
print(f"Environment: {os.environ.get('APP_ENV', 'development')}")
for i in range(5):
print(f"Iteration {i+1}")
time.sleep(1)
print("Done!")
"""
await orchestrator.deploy_and_run(
image='python:3.12-slim',
code=code,
env={'APP_ENV': 'production'}
)
asyncio.run(main())
Support
For issues or questions about the API, check:
- The `logs/actions.jsonl` file for detailed request/response logs
- Docker Compose logs: `docker compose logs`
- Container-specific logs: `docker compose logs <cuid>`
License
This API is designed for container orchestration and management. Ensure you comply with Docker's licensing terms and your organization's security policies when deploying.
app.py | |
CLIENT_MANUAL.md | |
container_client.py | |
example_usage.py | |
README.md | |
requirements.txt |