#!/usr/bin/env python3
"""
Container Management API Client
Async client for container management with full WebSocket terminal support
"""
import asyncio
import base64
import io
import json
import os
import sys
import termios
import tty
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple, Union
import aiohttp
from aiohttp import ClientSession, ClientWebSocketResponse
@dataclass
class ContainerConfig:
"""Container configuration"""
image: str
env: Optional[Dict[str, str]] = None
tags: Optional[List[str]] = None
resources: Optional[Dict[str, Any]] = None
ports: Optional[List[Dict[str, Any]]] = None
@dataclass
class ContainerInfo:
"""Container information"""
cuid: str
image: str
env: Dict[str, str]
tags: List[str]
resources: Dict[str, Any]
ports: List[Dict[str, Any]]
status: str
created_at: Optional[str] = None
started_at: Optional[str] = None
@dataclass
class ContainerStatus:
"""Container status"""
cuid: str
status: str
created_at: str
uptime: str
restarts: int
last_error: Optional[Dict[str, Any]] = None
@dataclass
class HealthStatus:
"""API health status"""
status: str
compose_version: str
uptime_s: int
class ContainerTerminal:
"""Interactive terminal session for a container"""
def __init__(self, ws: ClientWebSocketResponse, cuid: str):
self.ws = ws
self.cuid = cuid
self.running = False
self.old_tty = None
self.reader_task = None
self.input_task = None
async def start_interactive(self) -> None:
"""Start interactive terminal session"""
self.running = True
# Save terminal settings and set raw mode
if sys.stdin.isatty():
self.old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# Get terminal size
rows, cols = self._get_terminal_size()
# Send initial resize
await self.resize(cols, rows)
# Start reader and input tasks
self.reader_task = asyncio.create_task(self._read_output())
self.input_task = asyncio.create_task(self._read_input())
# Handle terminal resize
import signal
signal.signal(signal.SIGWINCH, self._handle_resize)
print(f"\r\n[Connected to container {self.cuid}]\r\n")
print("[Press Ctrl+] to exit, Ctrl+C to send interrupt]\r\n")
async def stop(self) -> None:
"""Stop interactive session"""
self.running = False
# Cancel tasks
if self.reader_task:
self.reader_task.cancel()
if self.input_task:
self.input_task.cancel()
# Restore terminal settings
if self.old_tty:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_tty)
# Close WebSocket
await self.ws.send_json({"type": "close"})
await self.ws.close()
print("\r\n[Disconnected]\r\n")
def _get_terminal_size(self) -> Tuple[int, int]:
"""Get terminal size"""
import struct
import fcntl
try:
size = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, ' ')
rows, cols = struct.unpack('hh', size)
return rows, cols
except:
return 24, 80
def _handle_resize(self, signum, frame) -> None:
"""Handle terminal resize signal"""
if self.running:
rows, cols = self._get_terminal_size()
asyncio.create_task(self.resize(cols, rows))
async def _read_output(self) -> None:
"""Read output from WebSocket and print to terminal"""
try:
async for msg in self.ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
if data['type'] in ['stdout', 'stderr']:
if data['encoding'] == 'base64':
output = base64.b64decode(data['data'])
else:
output = data['data'].encode('utf-8')
sys.stdout.buffer.write(output)
sys.stdout.buffer.flush()
elif data['type'] == 'exit':
print(f"\r\n[Process exited with code {data['code']}]\r\n")
self.running = False
break
elif data['type'] == 'error':
print(f"\r\n[Error: {data['error']}]\r\n")
self.running = False
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"\r\n[WebSocket error: {msg.data}]\r\n")
self.running = False
break
except asyncio.CancelledError:
pass
except Exception as e:
print(f"\r\n[Reader error: {e}]\r\n")
self.running = False
async def _read_input(self) -> None:
"""Read input from terminal and send to WebSocket"""
try:
while self.running:
# Read input byte by byte
byte = await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.buffer.read, 1
)
if not byte:
break
# Check for exit sequence (Ctrl+])
if byte == b'\x1d': # Ctrl+]
self.running = False
break
# Send input to container
await self.ws.send_json({
"type": "stdin",
"data": byte.decode('utf-8', errors='ignore'),
"encoding": "utf8"
})
except asyncio.CancelledError:
pass
except Exception as e:
print(f"\r\n[Input error: {e}]\r\n")
self.running = False
async def send_input(self, data: str) -> None:
"""Send input to container"""
await self.ws.send_json({
"type": "stdin",
"data": data,
"encoding": "utf8"
})
async def send_input_bytes(self, data: bytes) -> None:
"""Send binary input to container"""
await self.ws.send_json({
"type": "stdin",
"data": base64.b64encode(data).decode('ascii'),
"encoding": "base64"
})
async def resize(self, cols: int, rows: int) -> None:
"""Resize terminal"""
await self.ws.send_json({
"type": "resize",
"cols": cols,
"rows": rows
})
async def send_interrupt(self) -> None:
"""Send Ctrl+C (SIGINT) to container"""
await self.ws.send_json({
"type": "signal",
"name": "INT"
})
async def send_terminate(self) -> None:
"""Send SIGTERM to container"""
await self.ws.send_json({
"type": "signal",
"name": "TERM"
})
async def send_kill(self) -> None:
"""Send SIGKILL to container"""
await self.ws.send_json({
"type": "signal",
"name": "KILL"
})
async def wait(self) -> None:
"""Wait for terminal session to complete"""
if self.reader_task:
await self.reader_task
if self.input_task:
await self.input_task
class ContainerClient:
"""Async client for Container Management API"""
def __init__(self, base_url: str, username: str, password: str):
"""
Initialize client
Args:
base_url: API base URL (e.g., "http://localhost:8080")
username: Basic auth username
password: Basic auth password
"""
self.base_url = base_url.rstrip('/')
self.ws_url = self.base_url.replace('http://', 'ws://').replace('https://', 'wss://')
# Setup authentication
auth_str = f"{username}:{password}"
auth_bytes = auth_str.encode('utf-8')
auth_b64 = base64.b64encode(auth_bytes).decode('ascii')
self.auth_header = f"Basic {auth_b64}"
self.session: Optional[ClientSession] = None
async def __aenter__(self):
"""Async context manager entry"""
self.session = ClientSession(
headers={"Authorization": self.auth_header}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit"""
if self.session:
await self.session.close()
def _check_session(self) -> None:
"""Check if session is initialized"""
if not self.session:
raise RuntimeError("Client session not initialized. Use 'async with' or call connect()")
async def connect(self) -> None:
"""Manually connect the client session"""
if not self.session:
self.session = ClientSession(
headers={"Authorization": self.auth_header}
)
async def close(self) -> None:
"""Manually close the client session"""
if self.session:
await self.session.close()
self.session = None
# Health Check
async def health_check(self) -> HealthStatus:
"""
Check API health status
Returns:
HealthStatus object with status, compose version, and uptime
"""
self._check_session()
async with self.session.get(f"{self.base_url}/healthz") as resp:
if resp.status == 200:
data = await resp.json()
return HealthStatus(**data)
else:
error = await resp.json()
raise Exception(f"Health check failed: {error}")
# Container Management
async def create_container(
self,
image: str,
env: Optional[Dict[str, str]] = None,
tags: Optional[List[str]] = None,
resources: Optional[Dict[str, Any]] = None,
ports: Optional[List[Dict[str, Any]]] = None
) -> ContainerInfo:
"""
Create a new container
Args:
image: Docker image (e.g., "python:3.12-slim")
env: Environment variables
tags: Container tags
resources: Resource limits {"cpus": 0.5, "memory": "2048m", "pids": 1024}
ports: Port mappings [{"host": 8080, "container": 80, "protocol": "tcp"}]
Returns:
ContainerInfo object with created container details
"""
self._check_session()
payload = {
"image": image,
"env": env or {},
"tags": tags or [],
"resources": resources or {},
"ports": ports or []
}
async with self.session.post(
f"{self.base_url}/containers",
json=payload
) as resp:
if resp.status == 201:
data = await resp.json()
return ContainerInfo(**data)
else:
error = await resp.json()
raise Exception(f"Failed to create container: {error}")
async def list_containers(
self,
status: Optional[List[str]] = None,
cursor: Optional[str] = None,
limit: int = 20
) -> Tuple[List[ContainerInfo], Optional[str]]:
"""
List containers with optional filtering and pagination
Args:
status: Filter by status (e.g., ["running", "paused"])
cursor: Pagination cursor from previous response
limit: Maximum number of results
Returns:
Tuple of (list of ContainerInfo objects, next cursor if more results exist)
"""
self._check_session()
params = {"limit": limit}
if status:
params["status"] = ",".join(status)
if cursor:
params["cursor"] = cursor
async with self.session.get(
f"{self.base_url}/containers",
params=params
) as resp:
if resp.status == 200:
data = await resp.json()
containers = [ContainerInfo(**c) for c in data["containers"]]
next_cursor = data.get("next_cursor")
return containers, next_cursor
else:
error = await resp.json()
raise Exception(f"Failed to list containers: {error}")
async def get_container(self, cuid: str) -> ContainerInfo:
"""
Get container details
Args:
cuid: Container UID
Returns:
ContainerInfo object
"""
self._check_session()
async with self.session.get(f"{self.base_url}/containers/{cuid}") as resp:
if resp.status == 200:
data = await resp.json()
return ContainerInfo(**data)
else:
error = await resp.json()
raise Exception(f"Failed to get container: {error}")
async def update_container(
self,
cuid: str,
env: Optional[Dict[str, Optional[str]]] = None,
tags: Optional[List[str]] = None,
resources: Optional[Dict[str, Any]] = None,
image: Optional[str] = None
) -> ContainerInfo:
"""
Update container configuration
Args:
cuid: Container UID
env: Environment variables (set to None to remove a key)
tags: New tags (replaces existing)
resources: Resource limits (merged with existing)
image: New Docker image
Returns:
Updated ContainerInfo object
"""
self._check_session()
payload = {}
if env is not None:
payload["env"] = env
if tags is not None:
payload["tags"] = tags
if resources is not None:
payload["resources"] = resources
if image is not None:
payload["image"] = image
async with self.session.patch(
f"{self.base_url}/containers/{cuid}",
json=payload
) as resp:
if resp.status == 200:
data = await resp.json()
return ContainerInfo(**data)
else:
error = await resp.json()
raise Exception(f"Failed to update container: {error}")
async def delete_container(self, cuid: str) -> None:
"""
Delete container and its mount directory
Args:
cuid: Container UID
"""
self._check_session()
async with self.session.delete(f"{self.base_url}/containers/{cuid}") as resp:
if resp.status != 204:
error = await resp.json()
raise Exception(f"Failed to delete container: {error}")
# Container Lifecycle
async def start_container(self, cuid: str) -> Dict[str, str]:
"""
Start a stopped container
Args:
cuid: Container UID
Returns:
Status response {"status": "started"}
"""
self._check_session()
async with self.session.post(f"{self.base_url}/containers/{cuid}/start") as resp:
if resp.status == 200:
return await resp.json()
else:
error = await resp.json()
raise Exception(f"Failed to start container: {error}")
async def stop_container(self, cuid: str) -> Dict[str, str]:
"""
Stop a running container
Args:
cuid: Container UID
Returns:
Status response {"status": "stopped"}
"""
self._check_session()
async with self.session.post(f"{self.base_url}/containers/{cuid}/stop") as resp:
if resp.status == 200:
return await resp.json()
else:
error = await resp.json()
raise Exception(f"Failed to stop container: {error}")
async def pause_container(self, cuid: str) -> Dict[str, str]:
"""
Pause a running container
Args:
cuid: Container UID
Returns:
Status response {"status": "paused"}
"""
self._check_session()
async with self.session.post(f"{self.base_url}/containers/{cuid}/pause") as resp:
if resp.status == 200:
return await resp.json()
else:
error = await resp.json()
raise Exception(f"Failed to pause container: {error}")
async def unpause_container(self, cuid: str) -> Dict[str, str]:
"""
Unpause a paused container
Args:
cuid: Container UID
Returns:
Status response {"status": "unpaused"}
"""
self._check_session()
async with self.session.post(f"{self.base_url}/containers/{cuid}/unpause") as resp:
if resp.status == 200:
return await resp.json()
else:
error = await resp.json()
raise Exception(f"Failed to unpause container: {error}")
async def restart_container(self, cuid: str) -> Dict[str, str]:
"""
Restart a container
Args:
cuid: Container UID
Returns:
Status response {"status": "restarted"}
"""
self._check_session()
async with self.session.post(f"{self.base_url}/containers/{cuid}/restart") as resp:
if resp.status == 200:
return await resp.json()
else:
error = await resp.json()
raise Exception(f"Failed to restart container: {error}")
# Port Management
async def update_ports(
self,
cuid: str,
ports: List[Dict[str, Any]]
) -> Dict[str, str]:
"""
Update container port mappings
Args:
cuid: Container UID
ports: New port mappings [{"host": 8080, "container": 80, "protocol": "tcp"}]
Returns:
Status response {"status": "updated"}
"""
self._check_session()
payload = {"ports": ports}
async with self.session.patch(
f"{self.base_url}/containers/{cuid}/ports",
json=payload
) as resp:
if resp.status == 200:
return await resp.json()
else:
error = await resp.json()
raise Exception(f"Failed to update ports: {error}")
# File Operations
async def upload_zip(
self,
cuid: str,
zip_data: Optional[bytes] = None,
zip_path: Optional[Union[str, Path]] = None,
files: Optional[Dict[str, bytes]] = None
) -> Dict[str, str]:
"""
Upload ZIP archive to container mount
Args:
cuid: Container UID
zip_data: Raw ZIP data (bytes)
zip_path: Path to ZIP file on disk
files: Dictionary of {path: content} to create ZIP from
Returns:
Status response {"status": "uploaded"}
Note: Provide exactly one of zip_data, zip_path, or files
"""
self._check_session()
# Determine ZIP data source
if sum([zip_data is not None, zip_path is not None, files is not None]) != 1:
raise ValueError("Provide exactly one of: zip_data, zip_path, or files")
if zip_path:
# Read ZIP from file
with open(zip_path, 'rb') as f:
zip_data = f.read()
elif files:
# Create ZIP from files dict
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for path, content in files.items():
zf.writestr(path, content)
zip_data = zip_buffer.getvalue()
# Upload ZIP
headers = {
"Authorization": self.auth_header,
"Content-Type": "application/zip"
}
async with self.session.post(
f"{self.base_url}/containers/{cuid}/upload-zip",
data=zip_data,
headers=headers
) as resp:
if resp.status == 200:
return await resp.json()
else:
error = await resp.json()
raise Exception(f"Failed to upload ZIP: {error}")
async def download_file(
self,
cuid: str,
path: str,
save_to: Optional[Union[str, Path]] = None
) -> bytes:
"""
Download a single file from container mount
Args:
cuid: Container UID
path: Relative path to file in container mount
save_to: Optional path to save file locally
Returns:
File content as bytes
"""
self._check_session()
params = {"path": path}
async with self.session.get(
f"{self.base_url}/containers/{cuid}/download",
params=params
) as resp:
if resp.status == 200:
data = await resp.read()
if save_to:
save_path = Path(save_to)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(data)
return data
else:
error = await resp.json()
raise Exception(f"Failed to download file: {error}")
async def download_zip(
self,
cuid: str,
path: str = "",
save_to: Optional[Union[str, Path]] = None,
extract_to: Optional[Union[str, Path]] = None
) -> bytes:
"""
Download directory as ZIP archive
Args:
cuid: Container UID
path: Relative path to directory (empty for root)
save_to: Optional path to save ZIP file
extract_to: Optional path to extract ZIP contents
Returns:
ZIP data as bytes
"""
self._check_session()
params = {"path": path} if path else {}
async with self.session.get(
f"{self.base_url}/containers/{cuid}/download-zip",
params=params
) as resp:
if resp.status == 200:
data = await resp.read()
if save_to:
save_path = Path(save_to)
save_path.parent.mkdir(parents=True, exist_ok=True)
save_path.write_bytes(data)
if extract_to:
extract_path = Path(extract_to)
extract_path.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(io.BytesIO(data), 'r') as zf:
zf.extractall(extract_path)
return data
else:
error = await resp.json()
raise Exception(f"Failed to download ZIP: {error}")
# Container Status
async def get_status(self, cuid: str) -> ContainerStatus:
"""
Get detailed container status
Args:
cuid: Container UID
Returns:
ContainerStatus object with status details
"""
self._check_session()
async with self.session.get(f"{self.base_url}/containers/{cuid}/status") as resp:
if resp.status == 200:
data = await resp.json()
return ContainerStatus(**data)
else:
error = await resp.json()
raise Exception(f"Failed to get status: {error}")
# WebSocket Terminal
async def create_terminal(
self,
cuid: str,
cols: int = 80,
rows: int = 24
) -> ContainerTerminal:
"""
Create WebSocket terminal connection to container
Args:
cuid: Container UID
cols: Terminal columns
rows: Terminal rows
Returns:
ContainerTerminal object for interactive use
"""
self._check_session()
url = f"{self.ws_url}/ws/{cuid}?cols={cols}&rows={rows}"
headers = {"Authorization": self.auth_header}
ws = await self.session.ws_connect(url, headers=headers)
return ContainerTerminal(ws, cuid)
async def enter_container(self, cuid: str) -> None:
"""
Enter interactive terminal session for a container
(Blocks until session is terminated with Ctrl+])
Args:
cuid: Container UID
"""
terminal = await self.create_terminal(cuid)
await terminal.start_interactive()
try:
await terminal.wait()
finally:
await terminal.stop()
async def execute_command(
self,
cuid: str,
command: str,
timeout: float = 30.0
) -> Tuple[str, str, int]:
"""
Execute a command in container and return output
Args:
cuid: Container UID
command: Command to execute
timeout: Execution timeout in seconds
Returns:
Tuple of (stdout, stderr, exit_code)
"""
self._check_session()
url = f"{self.ws_url}/ws/{cuid}"
headers = {"Authorization": self.auth_header}
stdout_data = []
stderr_data = []
exit_code = -1
async with self.session.ws_connect(url, headers=headers) as ws:
# Send command
await ws.send_json({
"type": "stdin",
"data": command + "\n",
"encoding": "utf8"
})
# Send exit command after a short delay
await asyncio.sleep(0.1)
await ws.send_json({
"type": "stdin",
"data": "exit\n",
"encoding": "utf8"
})
# Read output with timeout
try:
async with asyncio.timeout(timeout):
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
if data['type'] == 'stdout':
if data['encoding'] == 'base64':
stdout_data.append(base64.b64decode(data['data']).decode('utf-8', errors='replace'))
else:
stdout_data.append(data['data'])
elif data['type'] == 'stderr':
if data['encoding'] == 'base64':
stderr_data.append(base64.b64decode(data['data']).decode('utf-8', errors='replace'))
else:
stderr_data.append(data['data'])
elif data['type'] == 'exit':
exit_code = data['code']
break
except asyncio.TimeoutError:
raise TimeoutError(f"Command execution timed out after {timeout} seconds")
return ''.join(stdout_data), ''.join(stderr_data), exit_code
async def stream_output(
self,
cuid: str,
command: str,
on_stdout: Optional[Callable[[str], None]] = None,
on_stderr: Optional[Callable[[str], None]] = None
) -> int:
"""
Stream command output in real-time
Args:
cuid: Container UID
command: Command to execute
on_stdout: Callback for stdout data
on_stderr: Callback for stderr data
Returns:
Exit code
"""
self._check_session()
url = f"{self.ws_url}/ws/{cuid}"
headers = {"Authorization": self.auth_header}
exit_code = -1
async with self.session.ws_connect(url, headers=headers) as ws:
# Send command
await ws.send_json({
"type": "stdin",
"data": command + "\n",
"encoding": "utf8"
})
# Read output
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
if data['type'] == 'stdout':
if on_stdout:
if data['encoding'] == 'base64':
text = base64.b64decode(data['data']).decode('utf-8', errors='replace')
else:
text = data['data']
on_stdout(text)
elif data['type'] == 'stderr':
if on_stderr:
if data['encoding'] == 'base64':
text = base64.b64decode(data['data']).decode('utf-8', errors='replace')
else:
text = data['data']
on_stderr(text)
elif data['type'] == 'exit':
exit_code = data['code']
break
return exit_code
# Batch Operations
async def batch_create(
self,
configs: List[ContainerConfig]
) -> List[ContainerInfo]:
"""
Create multiple containers in parallel
Args:
configs: List of ContainerConfig objects
Returns:
List of created ContainerInfo objects
"""
tasks = []
for config in configs:
task = self.create_container(
image=config.image,
env=config.env,
tags=config.tags,
resources=config.resources,
ports=config.ports
)
tasks.append(task)
return await asyncio.gather(*tasks)
async def batch_delete(self, cuids: List[str]) -> List[Optional[Exception]]:
"""
Delete multiple containers in parallel
Args:
cuids: List of container UIDs
Returns:
List of exceptions (None if successful)
"""
tasks = []
for cuid in cuids:
tasks.append(self.delete_container(cuid))
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r if isinstance(r, Exception) else None for r in results]
async def batch_execute(
self,
commands: Dict[str, str],
timeout: float = 30.0
) -> Dict[str, Tuple[str, str, int]]:
"""
Execute commands on multiple containers in parallel
Args:
commands: Dict of {cuid: command}
timeout: Execution timeout per command
Returns:
Dict of {cuid: (stdout, stderr, exit_code)}
"""
tasks = {}
for cuid, command in commands.items():
tasks[cuid] = self.execute_command(cuid, command, timeout)
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
output = {}
for cuid, result in zip(tasks.keys(), results):
if isinstance(result, Exception):
output[cuid] = ("", str(result), -1)
else:
output[cuid] = result
return output
# Utility Methods
async def list_all_containers(
self,
status: Optional[List[str]] = None
) -> List[ContainerInfo]:
"""
List all containers (handles pagination automatically)
Args:
status: Optional status filter
Returns:
Complete list of ContainerInfo objects
"""
all_containers = []
cursor = None
while True:
containers, cursor = await self.list_containers(
status=status,
cursor=cursor,
limit=100
)
all_containers.extend(containers)
if not cursor:
break
return all_containers
async def wait_for_status(
self,
cuid: str,
target_status: str,
timeout: float = 60.0,
poll_interval: float = 1.0
) -> bool:
"""
Wait for container to reach target status
Args:
cuid: Container UID
target_status: Target status to wait for
timeout: Maximum wait time in seconds
poll_interval: Status check interval in seconds
Returns:
True if target status reached, False if timeout
"""
start_time = asyncio.get_event_loop().time()
while (asyncio.get_event_loop().time() - start_time) < timeout:
try:
info = await self.get_container(cuid)
if info.status == target_status:
return True
except:
pass
await asyncio.sleep(poll_interval)
return False
async def upload_and_run(
self,
cuid: str,
files: Dict[str, bytes],
command: str,
wait_for_completion: bool = True
) -> Union[int, None]:
"""
Upload files and run a command
Args:
cuid: Container UID
files: Files to upload {path: content}
command: Command to execute
wait_for_completion: Wait for command to complete
Returns:
Exit code if waiting, None otherwise
"""
# Upload files
await self.upload_zip(cuid, files=files)
# Execute command
if wait_for_completion:
_, _, exit_code = await self.execute_command(cuid, command)
return exit_code
else:
# Start command without waiting
url = f"{self.ws_url}/ws/{cuid}"
headers = {"Authorization": self.auth_header}
async with self.session.ws_connect(url, headers=headers) as ws:
await ws.send_json({
"type": "stdin",
"data": command + "\n",
"encoding": "utf8"
})
return None