# Container Management API Integration Guide
## Overview
This API provides comprehensive container management capabilities through REST endpoints, WebSocket connections, and file upload/download interfaces. All containers are managed via Docker Compose v2 CLI.
## Quick Start
### 1. Prerequisites
```bash
# Required software:
#   - Python 3.12+
#   - Docker with Docker Compose v2
#   - Linux environment
# Python dependencies
pip install aiohttp aiofiles python-dotenv ruamel.yaml
```
### 2. Configuration
Create a `.env` file:
```env
APP_USER=admin
APP_PASS=your_secure_password_here
DEFAULT_USER_UID=1000:1000
```
### 3. Run the Server
```bash
python container_manager.py
# Server starts at http://0.0.0.0:8080
```
## Authentication
All API endpoints require Basic Authentication.
```http
Authorization: Basic base64(username:password)
```
Example:
```python
import base64
# Basic auth sends base64("username:password") in the Authorization header.
username = "admin"
password = "your_secure_password_here"
auth = base64.b64encode(":".join((username, password)).encode()).decode()
headers = {"Authorization": "Basic " + auth}
```
## REST API Integration
### Container Lifecycle
#### Create Container
```python
import requests
import json
# Create container with Python example
def create_container(base_url, auth_headers, timeout=30):
    """Create a sample container and return its CUID.

    Args:
        base_url: Root URL of the API, e.g. "http://localhost:8080".
        auth_headers: Headers carrying the Basic Authorization value.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests can block indefinitely on an unresponsive host.

    Returns:
        The new container's CUID on HTTP 201, otherwise None.
    """
    payload = {
        "image": "python:3.12-slim",
        "env": {
            "APP_ENV": "production",
            "DEBUG": "false"
        },
        "tags": ["web", "production"],
        "resources": {
            "cpus": 1.0,
            "memory": "4096m",
            "pids": 2048
        },
        "ports": [
            {
                "host": 8080,
                "container": 80,
                "protocol": "tcp"
            }
        ]
    }
    response = requests.post(
        f"{base_url}/containers",
        json=payload,
        headers=auth_headers,
        timeout=timeout,
    )
    if response.status_code == 201:
        container = response.json()
        print(f"Created container: {container['cuid']}")
        return container['cuid']

    # A proxy or a crashed server may answer with a non-JSON body; fall back
    # to the raw text instead of raising while reporting the failure.
    try:
        error = response.json()
    except ValueError:
        error = response.text
    print(f"Error: {error}")
    return None
# Usage
import base64  # needed below; this snippet's import list omits it

base_url = "http://localhost:8080"
auth = base64.b64encode(b"admin:password").decode()
headers = {"Authorization": f"Basic {auth}"}
cuid = create_container(base_url, headers)
```
#### List Containers with Pagination
```javascript
// Node.js example with axios
const axios = require('axios');
/**
 * List containers, following `next_cursor` until every page has been read.
 *
 * @param {string} baseUrl - Root URL of the API.
 * @param {string} auth - Base64-encoded "username:password".
 * @param {string[]|string|null} status - Optional status filter.
 * @param {string|null} cursor - Optional cursor to start from.
 * @returns {Promise<object[]>} Containers from all pages, in server order.
 */
async function listContainers(baseUrl, auth, status = null, cursor = null) {
  const containers = [];
  let nextCursor = cursor;

  do {
    const params = {};
    // Accept either an array of statuses or a single status string.
    if (status) params.status = Array.isArray(status) ? status.join(',') : status;
    if (nextCursor) params.cursor = nextCursor;

    const response = await axios.get(`${baseUrl}/containers`, {
      params,
      headers: {
        'Authorization': `Basic ${auth}`
      }
    });

    const data = response.data;
    console.log(`Found ${data.containers.length} containers`);
    // Accumulate every page; returning only the first page would silently
    // drop the results fetched while following the cursor.
    for (const container of data.containers) {
      containers.push(container);
    }

    nextCursor = data.next_cursor;
    if (nextCursor) {
      console.log('More results available, cursor:', nextCursor);
    }
  } while (nextCursor);

  return containers;
}
// Usage
const auth = Buffer.from('admin:password').toString('base64');
// Handle rejections explicitly so a failed request is reported instead of
// surfacing as an unhandled promise rejection.
listContainers('http://localhost:8080', auth, ['running', 'paused'])
  .catch((err) => console.error('Failed to list containers:', err.message));
```
#### Update Container Configuration
```go
// Go example
package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
)
// UpdateRequest is the JSON body that updateContainer sends with
// PATCH /containers/{cuid}. Every field is tagged omitempty, so fields left
// at their zero value are not included in the encoded request.
type UpdateRequest struct {
// Env maps variable names to values; updateContainer passes nil for a
// variable it wants removed.
Env map[string]interface{} `json:"env,omitempty"`
// Tags is the list of tags to send with the update.
Tags []string `json:"tags,omitempty"`
// Resources holds resource settings such as "memory".
Resources map[string]interface{} `json:"resources,omitempty"`
// Image is the container image reference.
Image string `json:"image,omitempty"`
}
// updateContainer sends a sample PATCH to /containers/{cuid} that sets one
// environment variable, removes another, replaces the tags and raises the
// memory limit. It authenticates with HTTP Basic auth and returns an error
// when the request cannot be built or sent, or when the server answers with
// anything other than 200 OK.
func updateContainer(baseURL, cuid, username, password string) error {
	update := UpdateRequest{
		Env: map[string]interface{}{
			"NEW_VAR": "value",
			"OLD_VAR": nil, // Remove this env var
		},
		Tags: []string{"updated", "v2"},
		Resources: map[string]interface{}{
			"memory": "8192m",
		},
	}

	// Surface encoding failures instead of sending an empty body.
	jsonData, err := json.Marshal(update)
	if err != nil {
		return fmt.Errorf("encode update request: %w", err)
	}

	req, err := http.NewRequest(http.MethodPatch,
		fmt.Sprintf("%s/containers/%s", baseURL, cuid),
		bytes.NewReader(jsonData))
	if err != nil {
		return err
	}

	auth := base64.StdEncoding.EncodeToString(
		[]byte(fmt.Sprintf("%s:%s", username, password)))
	req.Header.Set("Authorization", fmt.Sprintf("Basic %s", auth))
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("update failed with status: %d", resp.StatusCode)
	}
	return nil
}
```
### Container Control Operations
```ruby
# Ruby example
require 'net/http'
require 'json'
require 'base64'
# Small client for the container control endpoints
# (POST /containers/{cuid}/{action}), authenticated with HTTP Basic auth.
class ContainerManager
  # base_url - root URL of the API, e.g. 'http://localhost:8080'
  # username, password - Basic auth credentials
  def initialize(base_url, username, password)
    @base_url = base_url
    # strict_encode64 never inserts line feeds. encode64 adds one every 60
    # encoded characters, which corrupts the header for long credentials.
    @auth = Base64.strict_encode64("#{username}:#{password}")
  end

  # POSTs the given action for the container and returns the parsed JSON body.
  def control_action(cuid, action)
    uri = URI("#{@base_url}/containers/#{cuid}/#{action}")
    http = Net::HTTP.new(uri.host, uri.port)
    # Enable TLS when the API is reached through an https:// URL.
    http.use_ssl = uri.scheme == 'https'
    request = Net::HTTP::Post.new(uri)
    request['Authorization'] = "Basic #{@auth}"
    request['Content-Type'] = 'application/json'
    response = http.request(request)
    JSON.parse(response.body)
  end

  def start(cuid)
    control_action(cuid, 'start')
  end

  def stop(cuid)
    control_action(cuid, 'stop')
  end

  def restart(cuid)
    control_action(cuid, 'restart')
  end

  def pause(cuid)
    control_action(cuid, 'pause')
  end

  def unpause(cuid)
    control_action(cuid, 'unpause')
  end
end
# Usage
cuid = 'c123e4567-e89b-12d3-a456-426614174000'
manager = ContainerManager.new('http://localhost:8080', 'admin', 'password')
manager.start(cuid)
```
### Port Management
```python
# Update container ports
def update_ports(base_url, cuid, auth_headers):
    """Send the sample port mappings for a container.

    Returns True when the server answers the PATCH with HTTP 200.
    """
    port_mappings = [
        {"host": 3000, "container": 3000, "protocol": "tcp"},
        {"host": 9000, "container": 9000, "protocol": "udp"},
    ]
    endpoint = f"{base_url}/containers/{cuid}/ports"
    result = requests.patch(
        endpoint,
        json={"ports": port_mappings},
        headers=auth_headers,
    )
    return result.status_code == 200
```
## File Upload/Download Integration
### Upload ZIP Archive
```python
import zipfile
import io
import requests
def upload_files_to_container(base_url, cuid, files_dict, auth_headers):
    """Pack files into an in-memory ZIP and POST it to the container.

    files_dict maps each archive path to its content, e.g.
    {"path/in/container": file_content_bytes}.

    Returns True when the upload endpoint answers with HTTP 200.
    """
    # Build the archive entirely in memory.
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, mode='w', compression=zipfile.ZIP_DEFLATED) as bundle:
        for member_path in files_dict:
            bundle.writestr(member_path, files_dict[member_path])

    # Send the raw archive bytes with the ZIP content type.
    request_headers = dict(auth_headers)
    request_headers['Content-Type'] = 'application/zip'
    reply = requests.post(
        f"{base_url}/containers/{cuid}/upload-zip",
        data=archive.getvalue(),
        headers=request_headers,
    )
    return reply.status_code == 200
# Example: Upload Python application
files = {
    # The loop body must be indented, otherwise boot.py fails to start with
    # an IndentationError inside the container.
    "boot.py": b"""
import os
import time
print(f"Container {os.environ.get('CONTAINER_UID')} started")
print(f"Tags: {os.environ.get('TAGS', 'none')}")
while True:
    print("Working...")
    time.sleep(10)
""",
    "requirements.txt": b"requests==2.31.0\nnumpy==1.24.0\n",
    "data/config.json": b'{"setting": "value"}'
}
upload_files_to_container(base_url, cuid, files, headers)
```
### Download Files
```javascript
// Download single file
/**
 * Download a single file from the container and stream it to disk.
 *
 * @param {string} baseUrl - Root URL of the API.
 * @param {string} cuid - Container ID.
 * @param {string} filePath - Path of the file inside the container.
 * @param {string} auth - Base64-encoded "username:password".
 * @param {string} [destPath='downloaded_file'] - Local path to write to.
 * @returns {Promise<void>} Resolves once the file is fully written.
 */
async function downloadFile(baseUrl, cuid, filePath, auth, destPath = 'downloaded_file') {
  const fs = require('fs');
  const { pipeline } = require('stream/promises');

  const response = await axios.get(
    `${baseUrl}/containers/${cuid}/download`,
    {
      params: { path: filePath },
      headers: { 'Authorization': `Basic ${auth}` },
      responseType: 'stream'
    }
  );

  // pipeline() rejects on an error from either stream. Listening only on the
  // writer would leave the promise pending if the download stream failed.
  await pipeline(response.data, fs.createWriteStream(destPath));
}
// Download directory as ZIP
/**
 * Download a container directory as a ZIP archive and save it locally.
 *
 * @param {string} baseUrl - Root URL of the API.
 * @param {string} cuid - Container ID.
 * @param {string} dirPath - Directory inside the container to archive.
 * @param {string} auth - Base64-encoded "username:password".
 * @param {string} [destPath='container_backup.zip'] - Local path to write to.
 * @returns {Promise<void>} Resolves once the archive is written.
 */
async function downloadDirectory(baseUrl, cuid, dirPath, auth, destPath = 'container_backup.zip') {
  // `fs` must be required here: the binding inside downloadFile is local to
  // that function and is not visible in this scope.
  const fs = require('fs');

  const response = await axios.get(
    `${baseUrl}/containers/${cuid}/download-zip`,
    {
      params: { path: dirPath },
      headers: { 'Authorization': `Basic ${auth}` },
      responseType: 'arraybuffer'
    }
  );

  // Save ZIP file
  await fs.promises.writeFile(destPath, response.data);
}
```
## WebSocket Integration
### Interactive Terminal Session
```python
import asyncio
import websockets
import json
import base64
class ContainerTerminal:
    """Interactive terminal client for a container's WebSocket endpoint.

    Messages are JSON objects. The client sends "stdin", "resize", "signal"
    and "close" messages and handles incoming "stdout", "stderr", "exit" and
    "error" messages.
    """

    # Console shortcuts mapped to the signal name sent to the container.
    _SIGNAL_COMMANDS = {'!INT': 'INT', '!TERM': 'TERM', '!KILL': 'KILL'}

    def __init__(self, base_url, cuid, username, password):
        self.ws_url = f"{self._to_ws_origin(base_url)}/ws/{cuid}"
        self.auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.ws = None
        # Strong reference to the receiver task: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._receiver = None

    @staticmethod
    def _to_ws_origin(base_url):
        """Map "http(s)://host" or a bare "host" to "ws(s)://host"."""
        if base_url.startswith('https://'):
            return 'wss://' + base_url[len('https://'):]
        if base_url.startswith('http://'):
            return 'ws://' + base_url[len('http://'):]
        return f"ws://{base_url}"

    @staticmethod
    def _decode_payload(data):
        """Return the text of a stdout/stderr message, decoding base64 if used."""
        if data.get('encoding') == 'base64':
            return base64.b64decode(data['data']).decode('utf-8', errors='replace')
        return data['data']

    async def connect(self, cols=80, rows=24):
        """Open the WebSocket with the given TTY size and start receiving."""
        headers = {"Authorization": f"Basic {self.auth}"}
        uri = f"{self.ws_url}?cols={cols}&rows={rows}"
        # NOTE(review): `extra_headers` is the keyword of the legacy
        # websockets client; newer releases name it `additional_headers`.
        # Confirm against the installed websockets version.
        self.ws = await websockets.connect(uri, extra_headers=headers)
        self._receiver = asyncio.create_task(self._receive_output())

    async def _receive_output(self):
        """Receive and print output from container"""
        async for message in self.ws:
            data = json.loads(message)
            kind = data['type']
            if kind == 'stdout':
                print(self._decode_payload(data), end='')
            elif kind == 'stderr':
                print(f"[ERROR] {self._decode_payload(data)}", end='')
            elif kind == 'exit':
                print(f"\n[Process exited with code {data['code']}]")
                await self.ws.close()
            elif kind == 'error':
                print(f"[ERROR] {data['error']}")

    async def send_input(self, text):
        """Send input to container"""
        await self.ws.send(json.dumps({
            "type": "stdin",
            "data": text,
            "encoding": "utf8"
        }))

    async def resize(self, cols, rows):
        """Resize terminal"""
        await self.ws.send(json.dumps({
            "type": "resize",
            "cols": cols,
            "rows": rows
        }))

    async def send_signal(self, signal_name):
        """Send signal (INT, TERM, KILL)"""
        await self.ws.send(json.dumps({
            "type": "signal",
            "name": signal_name
        }))

    async def close(self):
        """Stop the receiver task and close the WebSocket, if open."""
        if self._receiver is not None:
            self._receiver.cancel()
            # gather(..., return_exceptions=True) waits for the cancellation
            # to finish without re-raising it here.
            await asyncio.gather(self._receiver, return_exceptions=True)
            self._receiver = None
        if self.ws is not None:
            await self.ws.close()

    async def run_interactive(self):
        """Run interactive terminal"""
        import aioconsole
        await self.connect()
        print("Connected to container. Type 'exit' to quit.")
        print("Special commands: !INT (Ctrl+C), !TERM (terminate), !KILL (force kill)")
        try:
            # NOTE(review): `.closed` exists on the legacy websockets
            # connection object; confirm for the installed version.
            while self.ws and not self.ws.closed:
                try:
                    line = await aioconsole.ainput()
                    if line == 'exit':
                        await self.ws.send(json.dumps({"type": "close"}))
                        break
                    if line in self._SIGNAL_COMMANDS:
                        await self.send_signal(self._SIGNAL_COMMANDS[line])
                    else:
                        await self.send_input(line + '\n')
                except Exception as e:
                    print(f"Error: {e}")
                    break
        finally:
            # Always release the socket and the background task.
            await self.close()
# Usage
async def main():
    """Open an interactive session against a sample container."""
    connection_args = (
        'localhost:8080',
        'c123e4567-e89b-12d3-a456-426614174000',
        'admin',
        'password',
    )
    session = ContainerTerminal(*connection_args)
    await session.run_interactive()


asyncio.run(main())
```
### Non-Interactive Command Execution
```javascript
const WebSocket = require('ws');
/**
 * Runs a single command in a container over the WebSocket endpoint and
 * collects its output.
 */
class ContainerExecutor {
  constructor(baseUrl, cuid, username, password) {
    this.wsUrl = `ws://${baseUrl.replace('http://', '')}/ws/${cuid}`;
    this.auth = Buffer.from(`${username}:${password}`).toString('base64');
  }

  /**
   * Send `command`, then `exit`, and resolve when the process exits.
   *
   * @param {string} command - Shell command to run.
   * @returns {Promise<{exitCode: number, stdout: string, stderr: string}>}
   *   Rejects on a server "error" message, a socket error, or when the
   *   connection closes before an "exit" message arrives.
   */
  async execute(command) {
    return new Promise((resolve, reject) => {
      const ws = new WebSocket(this.wsUrl, {
        headers: {
          'Authorization': `Basic ${this.auth}`
        }
      });
      let output = '';
      let errorOutput = '';
      let exitTimer = null;
      let settled = false;

      // Settle the promise at most once and stop the pending "exit" timer.
      const settle = (fn, value) => {
        if (settled) return;
        settled = true;
        clearTimeout(exitTimer);
        fn(value);
      };

      const decode = (msg) => (msg.encoding === 'base64'
        ? Buffer.from(msg.data, 'base64').toString()
        : msg.data);

      ws.on('open', () => {
        // Send command
        ws.send(JSON.stringify({
          type: 'stdin',
          data: command + '\n',
          encoding: 'utf8'
        }));
        // Send EOF/exit after command
        exitTimer = setTimeout(() => {
          // The socket may already be closed by the time the timer fires.
          if (ws.readyState !== WebSocket.OPEN) return;
          ws.send(JSON.stringify({
            type: 'stdin',
            data: 'exit\n',
            encoding: 'utf8'
          }));
        }, 100);
      });

      ws.on('message', (data) => {
        const msg = JSON.parse(data.toString());
        if (msg.type === 'stdout') {
          output += decode(msg);
        } else if (msg.type === 'stderr') {
          errorOutput += decode(msg);
        } else if (msg.type === 'exit') {
          settle(resolve, {
            exitCode: msg.code,
            stdout: output,
            stderr: errorOutput
          });
          ws.close();
        } else if (msg.type === 'error') {
          // `error` may be an object with a message or a plain string.
          const detail = (msg.error && msg.error.message) || msg.message || String(msg.error);
          settle(reject, new Error(detail));
          ws.close();
        }
      });

      ws.on('error', (err) => settle(reject, err));
      // Without this, a connection dropped before "exit" would leave the
      // promise pending forever.
      ws.on('close', () => settle(reject, new Error('Connection closed before the process exited')));
    });
  }
}
// Usage
const executor = new ContainerExecutor(
  'localhost:8080',
  'c123e4567-e89b-12d3-a456-426614174000',
  'admin',
  'password'
);

// Run one command and report its exit code and captured output.
(async () => {
  try {
    const { exitCode, stdout, stderr } = await executor.execute('ls -la /app');
    console.log('Exit code:', exitCode);
    console.log('Output:', stdout);
    if (stderr) {
      console.error('Errors:', stderr);
    }
  } catch (err) {
    console.error('Failed:', err);
  }
})();
```
## Error Handling
All API errors follow a consistent JSON schema (the `stacktrace` field is only included in development mode):
```json
{
"error": "validation_error",
"code": "MISSING_IMAGE",
"message": "Image is required",
"status": 400,
"request_id": "123e4567-e89b-12d3-a456-426614174000",
"timestamp": "2024-01-20T15:30:00+01:00",
"details": {
"field": "image",
"provided": null
},
"stacktrace": "..."
}
```
### Error Types
| Error ID | Description | HTTP Status |
|----------|-------------|-------------|
| `auth_error` | Authentication failed | 401 |
| `validation_error` | Invalid request data | 400 |
| `not_found` | Resource not found | 404 |
| `conflict` | Resource conflict | 409 |
| `compose_error` | Docker Compose operation failed | 500 |
| `io_error` | File I/O operation failed | 500 |
| `timeout` | Operation timed out | 408 |
| `state_error` | Invalid state transition | 422 |
### Error Handling Example
```python
def safe_api_call(func):
    """Decorator for safe API calls with retry logic.

    The wrapped function must return a response object exposing
    ``status_code`` and ``json()``. The wrapper makes up to three attempts:

    * 5xx responses and network errors are retried after a growing delay
      (1s, then 2s);
    * 4xx responses (and a 5xx on the last attempt) are reported and
      yield None;
    * any other response yields its decoded JSON body.

    A network error on the final attempt is re-raised.
    """
    # Local imports keep the snippet self-contained.
    import functools
    import time

    max_retries = 3
    retry_delay = 1

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                response = func(*args, **kwargs)
                if response.status_code >= 500:
                    # Server error, retry
                    if attempt < max_retries - 1:
                        time.sleep(retry_delay * (attempt + 1))
                        continue
                if response.status_code >= 400:
                    # Client error, don't retry. The body may be missing
                    # fields or not be JSON at all (e.g. a proxy error
                    # page), so read it defensively.
                    try:
                        error = response.json()
                    except ValueError:
                        error = {}
                    if not isinstance(error, dict):
                        error = {}
                    message = error.get('message', f"HTTP {response.status_code}")
                    request_id = error.get('request_id', 'unknown')
                    print(f"API Error: {message} (ID: {request_id})")
                    return None
                return response.json()
            except requests.exceptions.RequestException as e:
                print(f"Network error (attempt {attempt + 1}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay * (attempt + 1))
                else:
                    raise
        return None

    return wrapper
@safe_api_call
def get_container_status(base_url, cuid, headers):
    """Request the status of one container; wrapped by safe_api_call."""
    status_url = f"{base_url}/containers/{cuid}/status"
    return requests.get(status_url, headers=headers)
```
## Best Practices
### 1. Connection Pooling
```python
# Use session for connection pooling
import requests
class APIClient:
    """API client that reuses one requests.Session for all calls.

    Usable as a context manager; the session is closed on exit.
    """

    def __init__(self, base_url, username, password):
        credentials = f"{username}:{password}".encode()
        token = base64.b64encode(credentials).decode()
        self.base_url = base_url
        self.session = requests.Session()
        # Every request made through the session carries the auth header.
        self.session.headers.update({"Authorization": f"Basic {token}"})

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()

    def create_container(self, config):
        """POST the container config and return the raw response."""
        endpoint = f"{self.base_url}/containers"
        return self.session.post(endpoint, json=config)
# Usage
with APIClient('http://localhost:8080', 'admin', 'password') as client:
    container_config = {"image": "nginx:alpine"}
    response = client.create_container(container_config)
```
### 2. Async Operations
```python
import aiohttp
import asyncio
class AsyncAPIClient:
    """Async API client built on aiohttp; use it with ``async with``."""

    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(headers=self.headers)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def _create_one(self, config):
        """Create one container and return the decoded JSON response."""
        # `async with` releases the response (and its connection) even when
        # decoding the body fails.
        async with self.session.post(
            f"{self.base_url}/containers", json=config
        ) as response:
            return await response.json()

    async def batch_create(self, configs):
        """Create all containers concurrently.

        Returns the decoded JSON responses as a list, in the order of
        ``configs``.
        """
        return await asyncio.gather(
            *(self._create_one(config) for config in configs)
        )
# Usage
async def main():
    """Create three sample containers concurrently and report the count."""
    images = ("nginx:alpine", "redis:alpine", "postgres:alpine")
    configs = [{"image": image} for image in images]
    async with AsyncAPIClient('http://localhost:8080', 'admin', 'password') as client:
        containers = await client.batch_create(configs)
        print(f"Created {len(containers)} containers")


asyncio.run(main())
```
### 3. Health Monitoring
```python
import time
import threading
class HealthMonitor:
    """Poll the API's /healthz endpoint from a background daemon thread."""

    def __init__(self, base_url, auth_headers, check_interval=30):
        self.base_url = base_url
        self.headers = auth_headers
        self.check_interval = check_interval  # seconds between checks
        self.running = False
        self.thread = None
        # Lets stop() interrupt the wait between checks immediately.
        self._stop_event = threading.Event()

    def start(self):
        """Start the monitoring thread; does nothing if it is already alive."""
        if self.thread is not None and self.thread.is_alive():
            return
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._monitor_loop)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        """Ask the loop to finish and wait for the thread to exit."""
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join()

    def _monitor_loop(self):
        while self.running:
            try:
                response = requests.get(
                    f"{self.base_url}/healthz",
                    headers=self.headers,
                    timeout=5
                )
                if response.status_code == 200:
                    data = response.json()
                    print(f"API healthy: Compose {data['compose_version']}")
                else:
                    print(f"API unhealthy: Status {response.status_code}")
            except Exception as e:
                # Top-level boundary of the thread: report and keep polling.
                print(f"Health check failed: {e}")
            # Event.wait returns True as soon as stop() is called, so
            # shutdown does not have to sit out a full check_interval the
            # way time.sleep would.
            if self._stop_event.wait(self.check_interval):
                break
```
### 4. Container Logs Streaming
```python
async def stream_logs(base_url, cuid, auth):
    """Stream container logs in real-time.

    Opens the container's WebSocket, starts ``tail -f`` on the application
    log and passes every stdout payload to ``process_log_line``.

    Args:
        base_url: API root, with or without the "http://" prefix.
        cuid: Container ID.
        auth: Base64-encoded "username:password".
    """
    import base64

    ws_url = f"ws://{base_url.replace('http://', '')}/ws/{cuid}"
    async with websockets.connect(
        ws_url,
        extra_headers={"Authorization": f"Basic {auth}"}
    ) as ws:
        # Request log streaming
        await ws.send(json.dumps({
            "type": "stdin",
            "data": "tail -f /app/logs/app.log\n",
            "encoding": "utf8"
        }))
        # Process incoming logs
        async for message in ws:
            data = json.loads(message)
            if data['type'] == 'exit':
                # The process ended; no further log output will arrive.
                break
            if data['type'] != 'stdout':
                continue
            # Payloads may arrive base64-encoded, as handled by the other
            # WebSocket examples; decode before processing.
            if data.get('encoding') == 'base64':
                log_line = base64.b64decode(data['data']).decode('utf-8', errors='replace')
            else:
                log_line = data['data']
            # Process log line (e.g., parse JSON logs, send to monitoring)
            process_log_line(log_line)
```
## Security Considerations
1. **Always use HTTPS in production** - Deploy behind a reverse proxy (nginx/traefik) with TLS
2. **Rotate credentials regularly** - Update `.env` file and restart service
3. **Limit network exposure** - Bind to localhost if only local access needed
4. **Monitor failed auth attempts** - Check `logs/actions.jsonl` for `auth_error` entries
5. **Validate all inputs** - The API validates paths and prevents escapes, but always sanitize on client side too
6. **Set resource limits** - Always specify CPU/memory limits when creating containers
## Troubleshooting
### Common Issues
1. **Container won't start**
- Check if `/app/boot.py` exists in the mount
- Verify the image is available locally
- Check compose logs: `docker compose logs <cuid>`
2. **WebSocket connection fails**
- Ensure Basic Auth is included in WebSocket headers
- Check if container is running before connecting
- Verify `/app/boot.py` exists
3. **File upload fails**
- Check file size (no limit by default, but system may have limits)
- Ensure ZIP format is valid
- Verify mount directory exists and has correct permissions
4. **Port conflicts**
- Check if host ports are already in use
- Use `docker compose ps` to verify current port mappings
### Debug Mode
Debug issues by examining the request log in `logs/actions.jsonl`:
```python
import json
from datetime import datetime
def analyze_logs(log_file='logs/actions.jsonl'):
    """Scan a JSON-lines action log for errors and slow requests.

    Args:
        log_file: Path to the log, one JSON object per line.

    Returns:
        A tuple ``(errors, slow_requests)``: entries with a truthy "error"
        field, and entries whose "duration_ms" exceeds 5000.

    Blank lines are ignored. Lines that are not a JSON object (for example a
    partially written last line) are skipped and counted in the summary.
    """
    errors = []
    slow_requests = []
    skipped = 0
    with open(log_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                skipped += 1
                continue
            if not isinstance(entry, dict):
                skipped += 1
                continue
            # Find errors
            if entry.get('error'):
                errors.append(entry)
            # Find slow requests (>5 seconds); a null duration counts as 0
            if (entry.get('duration_ms') or 0) > 5000:
                slow_requests.append(entry)
    print(f"Found {len(errors)} errors")
    print(f"Found {len(slow_requests)} slow requests")
    if skipped:
        print(f"Skipped {skipped} malformed lines")
    return errors, slow_requests
```
## Complete Integration Example
Here's a full example that creates a container, uploads code, executes it, and cleans up:
```python
import asyncio
import aiohttp
import base64
import json
import zipfile
import io
class ContainerOrchestrator:
    """Create a container, upload code, run it, show its output, clean up."""

    def __init__(self, base_url, username, password):
        self.base_url = base_url
        auth = base64.b64encode(f"{username}:{password}".encode()).decode()
        self.headers = {"Authorization": f"Basic {auth}"}

    async def deploy_and_run(self, image, code, env=None, output_seconds=10):
        """Deploy ``code`` as boot.py in a new container and stream its output.

        Args:
            image: Container image to create the container from.
            code: Source text uploaded as ``boot.py``.
            env: Optional environment variables for the container.
            output_seconds: How long to read output before stopping.

        Returns:
            The CUID of the container that was created (and then deleted).

        Raises:
            Exception: If creating, uploading to or starting the container
                fails. The container is still stopped and deleted.
        """
        async with aiohttp.ClientSession(headers=self.headers) as session:
            cuid = await self._create(session, image, env)
            try:
                await self._upload_code(session, cuid, code)
                await self._start(session, cuid)
                await self._stream_output(session, cuid, output_seconds)
            finally:
                # Clean up even when a step fails, so no container is leaked.
                await self._cleanup(session, cuid)
            return cuid

    @staticmethod
    def _decode_payload(data):
        """Return the text of an output message, decoding base64 if used."""
        if data.get('encoding') == 'base64':
            return base64.b64decode(data['data']).decode('utf-8', errors='replace')
        return data['data']

    async def _create(self, session, image, env):
        """Create the container and return its CUID."""
        create_payload = {
            "image": image,
            "env": env or {},
            "tags": ["automated"],
            "resources": {
                "cpus": 0.5,
                "memory": "1024m"
            }
        }
        async with session.post(
            f"{self.base_url}/containers",
            json=create_payload
        ) as resp:
            if resp.status != 201:
                raise Exception(f"Failed to create container: {await resp.text()}")
            container = await resp.json()
        cuid = container['cuid']
        print(f"Created container: {cuid}")
        return cuid

    async def _upload_code(self, session, cuid, code):
        """Upload ``code`` as boot.py inside a ZIP archive."""
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zf:
            zf.writestr('boot.py', code)
        async with session.post(
            f"{self.base_url}/containers/{cuid}/upload-zip",
            data=zip_buffer.getvalue(),
            headers={**self.headers, 'Content-Type': 'application/zip'}
        ) as resp:
            if resp.status != 200:
                raise Exception(f"Failed to upload code: {await resp.text()}")
        print("Code uploaded successfully")

    async def _start(self, session, cuid):
        """Start the container."""
        async with session.post(
            f"{self.base_url}/containers/{cuid}/start"
        ) as resp:
            if resp.status != 200:
                raise Exception(f"Failed to start container: {await resp.text()}")
        print("Container started")

    async def _stream_output(self, session, cuid, seconds):
        """Print container output until it exits, disconnects or time runs out."""
        ws_url = f"ws://{self.base_url.replace('http://', '')}/ws/{cuid}"
        closing_types = (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSING,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.ERROR,
        )
        loop = asyncio.get_running_loop()
        deadline = loop.time() + seconds
        async with session.ws_connect(ws_url, headers=self.headers) as ws:
            print("Connected to container output:")
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    # wait_for cancels the pending receive on timeout, so no
                    # orphaned task is left behind.
                    msg = await asyncio.wait_for(ws.receive(), timeout=remaining)
                except asyncio.TimeoutError:
                    break
                if msg.type in closing_types:
                    # Stop instead of polling a closed socket in a tight loop.
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
                data = json.loads(msg.data)
                kind = data.get('type')
                if kind == 'stdout':
                    print(f"[OUT] {self._decode_payload(data)}", end='')
                elif kind == 'stderr':
                    print(f"[ERR] {self._decode_payload(data)}", end='')
                elif kind == 'exit':
                    break

    async def _cleanup(self, session, cuid):
        """Stop and delete the container, reporting failures without raising."""
        try:
            async with session.post(
                f"{self.base_url}/containers/{cuid}/stop"
            ) as resp:
                if resp.ok:
                    print("\nContainer stopped")
                else:
                    print(f"\nFailed to stop container: {await resp.text()}")
            async with session.delete(
                f"{self.base_url}/containers/{cuid}"
            ) as resp:
                if resp.ok:
                    print("Container deleted")
                else:
                    print(f"Failed to delete container: {await resp.text()}")
        except aiohttp.ClientError as exc:
            # Do not mask the original failure with a cleanup error.
            print(f"Cleanup of container {cuid} failed: {exc}")
# Usage
async def main():
    """Deploy a short demo script and show its output."""
    orchestrator = ContainerOrchestrator(
        'http://localhost:8080',
        'admin',
        'password'
    )
    # The loop body must be indented, otherwise the uploaded boot.py fails
    # with an IndentationError inside the container.
    code = """
import os
import time
print(f"Hello from container {os.environ.get('CONTAINER_UID')}")
print(f"Environment: {os.environ.get('APP_ENV', 'development')}")
for i in range(5):
    print(f"Iteration {i+1}")
    time.sleep(1)
print("Done!")
"""
    await orchestrator.deploy_and_run(
        image='python:3.12-slim',
        code=code,
        env={'APP_ENV': 'production'}
    )


asyncio.run(main())
```
## Support
For issues or questions about the API, check:
1. The `logs/actions.jsonl` file for detailed request/response logs
2. Docker Compose logs: `docker compose logs`
3. Container-specific logs: `docker compose logs <cuid>`
## License
This API is designed for container orchestration and management. Ensure you comply with Docker's licensing terms and your organization's security policies when deploying.