All source listed below is under MIT license if no LICENSE file stating different is available.
Retoor's Guide to Modern Python: Mastering aiohttp 3.13+ with Python 3.13
Complete Tutorial: aiohttp, Testing, Authentication, WebSockets, and Git Protocol Integration
Version Requirements:
- Python: 3.13.3 (Released October 7, 2024)
- aiohttp: 3.13.2 (Latest stable as of October 28, 2025)
- pytest: 8.3+
- pytest-aiohttp: 1.1.0 (Released January 23, 2025)
- pytest-asyncio: 1.2.0 (Released September 12, 2025)
- pydantic: 2.12.3 (Released October 17, 2025)
Table of Contents
- Python 3.13 Modern Features
- aiohttp Fundamentals
- Client Sessions and Connection Management
- Authentication Patterns
- Server Development
- Request Validation with Pydantic
- WebSocket Implementation
- Testing with pytest and pytest-aiohttp
- Advanced Middleware and Error Handling
- Performance Optimization
- Git Protocol Integration
- Repository Manager Implementation
- Best Practices and Patterns
- Automatic Memory and Context Search
Python 3.13 Modern Features
Key Python 3.13 Enhancements
Python 3.13 introduces significant improvements for asynchronous programming:
Experimental Free-Threaded Mode (No GIL)
- Enable true multi-threading with
python -X gil=0 - Significant for CPU-bound async operations
- Better performance on multi-core systems
JIT Compiler (Preview)
- Just-In-Time compilation for performance boosts
- Enable with
PYTHON_JIT=1environment variable - Early benchmarks show 10-20% improvements
Enhanced Interactive Interpreter
- Multi-line editing with syntax highlighting
- Colorized tracebacks for better debugging
- Improved REPL experience
Better Error Messages
- More precise error locations
- Clearer exception messages
- Context-aware suggestions
Modern Type Hints (PEP 695)
Python 3.13 fully supports modern generic syntax:
from typing import TypeVar, Generic
from collections.abc import Sequence, Mapping
# Old style (still works)
T = TypeVar('T')
class OldGeneric(Generic[T]):
def process(self, item: T) -> T:
return item
# New PEP 695 style (Python 3.12+)
class NewGeneric[T]:
def process(self, item: T) -> T:
return item
# Type aliases with 'type' keyword
type RequestHandler[T] = Callable[[Request], Awaitable[T]]
type JSONDict = dict[str, str | int | float | bool | None]
# Modern function annotations
async def fetch_data[T](url: str, parser: Callable[[bytes], T]) -> T | None:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return parser(await response.read())
return None
Dataclasses in Python 3.13
from dataclasses import dataclass, field, replace
from typing import ClassVar
import copy
@dataclass(slots=True, kw_only=True)
class User:
user_id: str
username: str
email: str
created_at: float = field(default_factory=time.time)
is_active: bool = True
_password_hash: str = field(repr=False, compare=False)
# New in 3.13: __static_attributes__
def __post_init__(self):
self.last_login: float | None = None
@property
def is_new(self) -> bool:
return time.time() - self.created_at < 86400
# New in 3.13: copy.replace() works with dataclasses
user = User(user_id="123", username="alice", email="alice@example.com", _password_hash="hashed")
updated = copy.replace(user, username="alice_updated")
# Access static attributes (new in 3.13)
print(User.__static_attributes__) # ('last_login',)
Modern Async Patterns
import asyncio
from collections.abc import AsyncIterator
# Async generators
async def fetch_paginated[T](
url: str,
parser: Callable[[dict], T]
) -> AsyncIterator[T]:
page = 1
async with aiohttp.ClientSession() as session:
while True:
async with session.get(f"{url}?page={page}") as response:
data = await response.json()
if not data['items']:
break
for item in data['items']:
yield parser(item)
page += 1
# Context manager pattern
class AsyncResourceManager:
async def __aenter__(self):
self.session = aiohttp.ClientSession()
await self.session.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.__aexit__(exc_type, exc_val, exc_tb)
aiohttp Fundamentals
Installation and Setup
# Core installation
pip install aiohttp==3.13.2
# With speedups (highly recommended)
pip install aiohttp[speedups]==3.13.2
# Additional recommended packages
pip install aiodns>=3.0.0 # Fast DNS resolution
pip install Brotli>=1.0.9 # Brotli compression support
pip install pydantic>=2.12.3 # Request validation
Basic Client Usage
import aiohttp
import asyncio
async def fetch_example():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
print(f"Status: {response.status}")
print(f"Content-Type: {response.headers['content-type']}")
# Various response methods
text = await response.text() # Text content
data = await response.json() # JSON parsing
raw = await response.read() # Raw bytes
asyncio.run(fetch_example())
Basic Server Usage
from aiohttp import web
async def hello_handler(request: web.Request) -> web.Response:
name = request.match_info.get('name', 'Anonymous')
return web.Response(text=f"Hello, {name}!")
async def json_handler(request: web.Request) -> web.Response:
data = await request.json()
return web.json_response({
'status': 'success',
'received': data
})
app = web.Application()
app.router.add_get('/', hello_handler)
app.router.add_get('/{name}', hello_handler)
app.router.add_post('/api/data', json_handler)
if __name__ == '__main__':
web.run_app(app, host='127.0.0.1', port=8080)
Client Sessions and Connection Management
Session Management Best Practices
Never create a new session for each request - this is the most common mistake:
# ❌ WRONG - Creates new session for every request
async def bad_example():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
return await response.text()
# Session destroyed here
# ✅ CORRECT - Reuse session across requests
class APIClient:
def __init__(self, base_url: str):
self.base_url = base_url
self._session: aiohttp.ClientSession | None = None
async def __aenter__(self):
self._session = aiohttp.ClientSession(
base_url=self.base_url,
timeout=aiohttp.ClientTimeout(total=30),
connector=aiohttp.TCPConnector(
limit=100, # Total connection limit
limit_per_host=30, # Per-host limit
ttl_dns_cache=300, # DNS cache TTL
)
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session:
await self._session.close()
async def get(self, path: str) -> dict:
async with self._session.get(path) as response:
response.raise_for_status()
return await response.json()
# Usage
async def main():
async with APIClient('https://api.example.com') as client:
user = await client.get('/users/123')
posts = await client.get('/posts')
comments = await client.get('/comments')
asyncio.run(main())
Advanced Session Configuration
import aiohttp
from aiohttp import ClientTimeout, TCPConnector, ClientSession
from typing import Optional
class AdvancedHTTPClient:
def __init__(
self,
base_url: str,
timeout: int = 30,
max_connections: int = 100,
max_connections_per_host: int = 30,
headers: Optional[dict[str, str]] = None
):
self.base_url = base_url
self.timeout = ClientTimeout(
total=timeout,
connect=10, # Connection timeout
sock_read=20 # Socket read timeout
)
self.connector = TCPConnector(
limit=max_connections,
limit_per_host=max_connections_per_host,
ttl_dns_cache=300,
ssl=None, # SSL context if needed
force_close=False, # Keep connections alive
enable_cleanup_closed=True
)
self.default_headers = headers or {}
self._session: Optional[ClientSession] = None
async def start(self):
if self._session is None:
self._session = ClientSession(
base_url=self.base_url,
timeout=self.timeout,
connector=self.connector,
headers=self.default_headers,
raise_for_status=False,
connector_owner=True,
auto_decompress=True,
trust_env=True
)
async def close(self):
if self._session:
await self._session.close()
await asyncio.sleep(0.25) # Allow cleanup
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def request(
self,
method: str,
path: str,
**kwargs
) -> aiohttp.ClientResponse:
if not self._session:
await self.start()
return await self._session.request(method, path, **kwargs)
Cookie Management
import aiohttp
from http.cookies import SimpleCookie
# Automatic cookie handling
async def with_cookies():
# Create cookie jar
jar = aiohttp.CookieJar(unsafe=False) # Only HTTPS cookies
async with aiohttp.ClientSession(cookie_jar=jar) as session:
# Cookies are automatically stored and sent
await session.get('https://example.com/login')
# Manually update cookies
session.cookie_jar.update_cookies(
{'session_id': 'abc123'},
response_url='https://example.com'
)
# Access cookies
for cookie in session.cookie_jar:
print(f"{cookie.key}: {cookie.value}")
# Custom cookie handling
async def custom_cookies():
cookies = {'auth_token': 'xyz789'}
async with aiohttp.ClientSession(cookies=cookies) as session:
async with session.get('https://example.com/api') as response:
# Read response cookies
print(response.cookies)
Authentication Patterns
Basic Authentication
import aiohttp
from aiohttp import BasicAuth
import base64
# Method 1: Using BasicAuth helper
async def basic_auth_helper(username: str, password: str):
auth = BasicAuth(login=username, password=password)
async with aiohttp.ClientSession(auth=auth) as session:
async with session.get('https://api.example.com/protected') as response:
return await response.json()
# Method 2: Manual base64 encoding
async def basic_auth_manual(username: str, password: str):
credentials = f"{username}:{password}"
encoded = base64.b64encode(credentials.encode()).decode()
headers = {'Authorization': f'Basic {encoded}'}
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get('https://api.example.com/protected') as response:
return await response.json()
# Method 3: Per-request authentication
async def basic_auth_per_request(username: str, password: str, url: str):
auth = BasicAuth(login=username, password=password)
async with aiohttp.ClientSession() as session:
async with session.get(url, auth=auth) as response:
return await response.json()
Bearer Token Authentication
class TokenAuthClient:
def __init__(self, base_url: str, token: str):
self.base_url = base_url
self.token = token
self._session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
headers = {
'Authorization': f'Bearer {self.token}',
'Accept': 'application/json'
}
self._session = aiohttp.ClientSession(
base_url=self.base_url,
headers=headers
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session:
await self._session.close()
async def get(self, path: str) -> dict:
async with self._session.get(path) as response:
response.raise_for_status()
return await response.json()
async def post(self, path: str, data: dict) -> dict:
async with self._session.post(path, json=data) as response:
response.raise_for_status()
return await response.json()
# Usage
async def example():
async with TokenAuthClient('https://api.example.com', 'your_token_here') as client:
user = await client.get('/user')
result = await client.post('/items', {'name': 'test'})
API Key Authentication
class APIKeyClient:
def __init__(
self,
base_url: str,
api_key: str,
key_location: str = 'header', # 'header' or 'query'
key_name: str = 'X-API-Key'
):
self.base_url = base_url
self.api_key = api_key
self.key_location = key_location
self.key_name = key_name
self._session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
if self.key_location == 'header':
headers = {self.key_name: self.api_key}
self._session = aiohttp.ClientSession(
base_url=self.base_url,
headers=headers
)
else:
self._session = aiohttp.ClientSession(base_url=self.base_url)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session:
await self._session.close()
async def request(self, method: str, path: str, **kwargs):
if self.key_location == 'query':
params = kwargs.get('params', {})
params[self.key_name] = self.api_key
kwargs['params'] = params
async with self._session.request(method, path, **kwargs) as response:
response.raise_for_status()
return await response.json()
Digest Authentication (aiohttp 3.12.8+)
from aiohttp import ClientSession, DigestAuthMiddleware
async def digest_auth_example():
# Create digest auth middleware
digest_auth = DigestAuthMiddleware(
login="user",
password="password",
preemptive=True # New in 3.12.8: preemptive authentication
)
# Pass middleware to session
async with ClientSession(middlewares=(digest_auth,)) as session:
async with session.get("https://httpbin.org/digest-auth/auth/user/password") as resp:
print(await resp.text())
OAuth 2.0 Token Refresh Pattern
import asyncio
from datetime import datetime, timedelta
from typing import Optional
class OAuth2Client:
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
token_url: str
):
self.base_url = base_url
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self._access_token: Optional[str] = None
self._token_expires_at: Optional[datetime] = None
self._refresh_token: Optional[str] = None
self._session: Optional[aiohttp.ClientSession] = None
self._lock = asyncio.Lock()
async def __aenter__(self):
self._session = aiohttp.ClientSession(base_url=self.base_url)
await self._ensure_token()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session:
await self._session.close()
async def _ensure_token(self):
async with self._lock:
now = datetime.now()
if (
not self._access_token or
not self._token_expires_at or
now >= self._token_expires_at
):
await self._refresh_access_token()
async def _refresh_access_token(self):
data = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret
}
async with aiohttp.ClientSession() as session:
async with session.post(self.token_url, data=data) as response:
response.raise_for_status()
token_data = await response.json()
self._access_token = token_data['access_token']
expires_in = token_data.get('expires_in', 3600)
self._token_expires_at = datetime.now() + timedelta(seconds=expires_in - 60)
self._refresh_token = token_data.get('refresh_token')
async def request(self, method: str, path: str, **kwargs):
await self._ensure_token()
headers = kwargs.get('headers', {})
headers['Authorization'] = f'Bearer {self._access_token}'
kwargs['headers'] = headers
async with self._session.request(method, path, **kwargs) as response:
if response.status == 401:
await self._refresh_access_token()
headers['Authorization'] = f'Bearer {self._access_token}'
async with self._session.request(method, path, **kwargs) as retry:
retry.raise_for_status()
return await retry.json()
response.raise_for_status()
return await response.json()
Server Development
Application Structure
from aiohttp import web
from typing import Callable, Awaitable
# Type aliases
Handler = Callable[[web.Request], Awaitable[web.Response]]
class Application:
def __init__(self):
self.app = web.Application()
self.setup_routes()
self.setup_middlewares()
def setup_routes(self):
self.app.router.add_get('/', self.index)
self.app.router.add_get('/health', self.health)
# API routes
self.app.router.add_route('*', '/api/{path:.*}', self.api_handler)
def setup_middlewares(self):
self.app.middlewares.append(self.error_middleware)
self.app.middlewares.append(self.logging_middleware)
async def index(self, request: web.Request) -> web.Response:
return web.Response(text='Hello, World!')
async def health(self, request: web.Request) -> web.Response:
return web.json_response({'status': 'healthy'})
async def api_handler(self, request: web.Request) -> web.Response:
path = request.match_info['path']
return web.json_response({
'path': path,
'method': request.method
})
@web.middleware
async def error_middleware(self, request: web.Request, handler: Handler):
try:
return await handler(request)
except web.HTTPException:
raise
except Exception as e:
return web.json_response(
{'error': str(e)},
status=500
)
@web.middleware
async def logging_middleware(self, request: web.Request, handler: Handler):
print(f"{request.method} {request.path}")
response = await handler(request)
print(f"Response: {response.status}")
return response
def run(self, host: str = '127.0.0.1', port: int = 8080):
web.run_app(self.app, host=host, port=port)
if __name__ == '__main__':
app = Application()
app.run()
Request Handling
from aiohttp import web, multipart
from typing import Optional
class RequestHandlers:
# Query parameters
async def query_params(self, request: web.Request) -> web.Response:
# Get single parameter
name = request.query.get('name', 'Anonymous')
# Get all values for a key
tags = request.query.getall('tag', [])
# Get as integer with default
page = int(request.query.get('page', '1'))
return web.json_response({
'name': name,
'tags': tags,
'page': page
})
# Path parameters
async def path_params(self, request: web.Request) -> web.Response:
user_id = request.match_info['user_id']
action = request.match_info.get('action', 'view')
return web.json_response({
'user_id': user_id,
'action': action
})
# JSON body
async def json_body(self, request: web.Request) -> web.Response:
try:
data = await request.json()
except ValueError:
return web.json_response(
{'error': 'Invalid JSON'},
status=400
)
return web.json_response({
'received': data,
'type': type(data).__name__
})
# Form data
async def form_data(self, request: web.Request) -> web.Response:
data = await request.post()
result = {}
for key in data:
value = data.get(key)
result[key] = value
return web.json_response(result)
# File upload
async def file_upload(self, request: web.Request) -> web.Response:
reader = await request.multipart()
uploaded_files = []
async for field in reader:
if field.filename:
size = 0
content = bytearray()
while True:
chunk = await field.read_chunk()
if not chunk:
break
size += len(chunk)
content.extend(chunk)
uploaded_files.append({
'filename': field.filename,
'size': size,
'content_type': field.headers.get('Content-Type')
})
return web.json_response({
'files': uploaded_files
})
# Headers
async def headers_example(self, request: web.Request) -> web.Response:
auth_header = request.headers.get('Authorization')
user_agent = request.headers.get('User-Agent')
custom_header = request.headers.get('X-Custom-Header')
response_headers = {
'X-Custom-Response': 'value',
'X-Request-ID': 'unique-id-123'
}
return web.json_response(
{
'auth': auth_header,
'user_agent': user_agent,
'custom': custom_header
},
headers=response_headers
)
# Cookies
async def cookies_example(self, request: web.Request) -> web.Response:
session_id = request.cookies.get('session_id')
response = web.json_response({
'session_id': session_id
})
# Set cookie
response.set_cookie(
'session_id',
'new-session-id',
max_age=3600,
httponly=True,
secure=True,
samesite='Strict'
)
return response
Response Types
from aiohttp import web
import json
class ResponseExamples:
# Text response
async def text_response(self, request: web.Request) -> web.Response:
return web.Response(
text='Plain text response',
content_type='text/plain'
)
# JSON response
async def json_response(self, request: web.Request) -> web.Response:
return web.json_response({
'status': 'success',
'data': {'key': 'value'}
})
# HTML response
async def html_response(self, request: web.Request) -> web.Response:
html = """
<!DOCTYPE html>
<html>
<head><title>Example</title></head>
<body><h1>Hello, World!</h1></body>
</html>
"""
return web.Response(
text=html,
content_type='text/html'
)
# Binary response
async def binary_response(self, request: web.Request) -> web.Response:
data = b'\x00\x01\x02\x03\x04'
return web.Response(
body=data,
content_type='application/octet-stream'
)
# File download
async def file_download(self, request: web.Request) -> web.Response:
return web.FileResponse(
path='./example.pdf',
headers={
'Content-Disposition': 'attachment; filename="example.pdf"'
}
)
# Streaming response
async def streaming_response(self, request: web.Request) -> web.StreamResponse:
response = web.StreamResponse()
response.headers['Content-Type'] = 'text/plain'
await response.prepare(request)
for i in range(10):
await response.write(f"Chunk {i}\n".encode())
await asyncio.sleep(0.5)
await response.write_eof()
return response
# Redirect
async def redirect_response(self, request: web.Request) -> web.Response:
raise web.HTTPFound('/new-location')
# Custom status codes
async def custom_status(self, request: web.Request) -> web.Response:
return web.json_response(
{'message': 'Created'},
status=201
)
Request Validation with Pydantic
Basic Pydantic Integration
from pydantic import BaseModel, Field, field_validator, ConfigDict
from pydantic import EmailStr, HttpUrl
from aiohttp import web
from typing import Optional, Literal
# Pydantic 2.12+ models
class UserCreate(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
username: str = Field(min_length=3, max_length=50)
email: EmailStr
password: str = Field(min_length=8)
age: Optional[int] = Field(None, ge=18, le=120)
role: Literal['user', 'admin', 'moderator'] = 'user'
website: Optional[HttpUrl] = None
@field_validator('username')
@classmethod
def validate_username(cls, v: str) -> str:
if not v.isalnum():
raise ValueError('Username must be alphanumeric')
return v.lower()
@field_validator('password')
@classmethod
def validate_password(cls, v: str) -> str:
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase letter')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain digit')
return v
class UserResponse(BaseModel):
user_id: str
username: str
email: EmailStr
role: str
created_at: float
# Handler with validation
async def create_user(request: web.Request) -> web.Response:
try:
data = await request.json()
user_data = UserCreate(**data)
except ValueError as e:
return web.json_response(
{'error': 'Validation error', 'details': str(e)},
status=400
)
# Process validated data
user = UserResponse(
user_id='123',
username=user_data.username,
email=user_data.email,
role=user_data.role,
created_at=time.time()
)
return web.json_response(
user.model_dump(),
status=201
)
Advanced Validation Patterns
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Any, Optional
from enum import Enum
class Priority(str, Enum):
LOW = 'low'
MEDIUM = 'medium'
HIGH = 'high'
URGENT = 'urgent'
class TaskCreate(BaseModel):
model_config = ConfigDict(
str_strip_whitespace=True,
extra='forbid' # Reject extra fields
)
title: str = Field(min_length=1, max_length=200)
description: Optional[str] = Field(None, max_length=5000)
priority: Priority = Priority.MEDIUM
tags: list[str] = Field(default_factory=list, max_length=10)
due_date: Optional[float] = None
assigned_to: Optional[str] = None
@field_validator('tags')
@classmethod
def validate_tags(cls, v: list[str]) -> list[str]:
if len(v) != len(set(v)):
raise ValueError('Tags must be unique')
return [tag.lower() for tag in v]
@model_validator(mode='after')
def validate_model(self) -> 'TaskCreate':
if self.priority == Priority.URGENT and not self.assigned_to:
raise ValueError('Urgent tasks must be assigned')
if self.due_date and self.due_date < time.time():
raise ValueError('Due date cannot be in the past')
return self
# Middleware for automatic validation
@web.middleware
async def validation_middleware(request: web.Request, handler):
# Get validation schema from route
schema = getattr(handler, '_validation_schema', None)
if schema and request.method in ('POST', 'PUT', 'PATCH'):
try:
data = await request.json()
validated = schema(**data)
request['validated_data'] = validated
except ValueError as e:
return web.json_response(
{'error': 'Validation error', 'details': str(e)},
status=400
)
return await handler(request)
# Decorator for validation
def validate_with(schema: type[BaseModel]):
def decorator(handler):
handler._validation_schema = schema
return handler
return decorator
# Usage
@validate_with(TaskCreate)
async def create_task(request: web.Request) -> web.Response:
task_data: TaskCreate = request['validated_data']
# Data is already validated
return web.json_response({
'task_id': 'task-123',
'title': task_data.title,
'priority': task_data.priority.value
}, status=201)
Query Parameter Validation
from pydantic import BaseModel, Field
from typing import Optional
class PaginationParams(BaseModel):
page: int = Field(1, ge=1, le=1000)
per_page: int = Field(20, ge=1, le=100)
sort_by: Optional[str] = Field(None, pattern=r'^[a-zA-Z_]+$')
order: Literal['asc', 'desc'] = 'asc'
@property
def offset(self) -> int:
return (self.page - 1) * self.per_page
@property
def limit(self) -> int:
return self.per_page
async def list_items(request: web.Request) -> web.Response:
try:
params = PaginationParams(**request.query)
except ValueError as e:
return web.json_response(
{'error': 'Invalid parameters', 'details': str(e)},
status=400
)
# Use validated params
items = [] # Fetch from database with params.offset and params.limit
return web.json_response({
'items': items,
'page': params.page,
'per_page': params.per_page,
'total': 100
})
WebSocket Implementation
Basic WebSocket Server
from aiohttp import web, WSMsgType
import asyncio
class WebSocketHandler:
def __init__(self):
self.active_connections: set[web.WebSocketResponse] = set()
async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
self.active_connections.add(ws)
try:
async for msg in ws:
if msg.type == WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
# Echo message back
await ws.send_str(f"Echo: {msg.data}")
# Broadcast to all connections
await self.broadcast(f"User says: {msg.data}")
elif msg.type == WSMsgType.ERROR:
print(f'WebSocket error: {ws.exception()}')
finally:
self.active_connections.discard(ws)
return ws
async def broadcast(self, message: str):
if self.active_connections:
await asyncio.gather(
*[ws.send_str(message) for ws in self.active_connections],
return_exceptions=True
)
# Setup
app = web.Application()
handler = WebSocketHandler()
app.router.add_get('/ws', handler.websocket_handler)
Advanced WebSocket Server with Authentication
from aiohttp import web, WSMsgType
import json
import asyncio
from typing import Optional
import jwt
class AuthenticatedWebSocketHandler:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.connections: dict[str, web.WebSocketResponse] = {}
def verify_token(self, token: str) -> Optional[dict]:
try:
return jwt.decode(token, self.secret_key, algorithms=['HS256'])
except jwt.InvalidTokenError:
return None
async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse(heartbeat=30)
await ws.prepare(request)
user_id: Optional[str] = None
try:
# Wait for authentication message
msg = await asyncio.wait_for(ws.receive(), timeout=10.0)
if msg.type != WSMsgType.TEXT:
await ws.send_json({'error': 'Authentication required'})
await ws.close()
return ws
auth_data = json.loads(msg.data)
token = auth_data.get('token')
if not token:
await ws.send_json({'error': 'Token required'})
await ws.close()
return ws
payload = self.verify_token(token)
if not payload:
await ws.send_json({'error': 'Invalid token'})
await ws.close()
return ws
user_id = payload['user_id']
self.connections[user_id] = ws
await ws.send_json({
'type': 'auth_success',
'user_id': user_id
})
# Handle messages
async for msg in ws:
if msg.type == WSMsgType.TEXT:
data = json.loads(msg.data)
await self.handle_message(user_id, data)
elif msg.type == WSMsgType.ERROR:
print(f'WebSocket error: {ws.exception()}')
except asyncio.TimeoutError:
await ws.send_json({'error': 'Authentication timeout'})
finally:
if user_id and user_id in self.connections:
del self.connections[user_id]
return ws
async def handle_message(self, user_id: str, data: dict):
message_type = data.get('type')
if message_type == 'ping':
await self.send_to_user(user_id, {'type': 'pong'})
elif message_type == 'broadcast':
await self.broadcast({
'type': 'message',
'from': user_id,
'content': data.get('content')
})
elif message_type == 'direct':
to_user = data.get('to')
await self.send_to_user(to_user, {
'type': 'direct_message',
'from': user_id,
'content': data.get('content')
})
async def send_to_user(self, user_id: str, message: dict):
ws = self.connections.get(user_id)
if ws and not ws.closed:
await ws.send_json(message)
async def broadcast(self, message: dict):
if self.connections:
await asyncio.gather(
*[ws.send_json(message) for ws in self.connections.values() if not ws.closed],
return_exceptions=True
)
WebSocket Client
import aiohttp
import asyncio
async def websocket_client():
async with aiohttp.ClientSession() as session:
async with session.ws_connect('http://localhost:8080/ws') as ws:
# Send authentication
await ws.send_json({
'token': 'your-jwt-token'
})
# Receive authentication response
msg = await ws.receive()
print(f"Auth response: {msg.data}")
# Send messages
await ws.send_json({
'type': 'broadcast',
'content': 'Hello, everyone!'
})
# Receive messages
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = msg.json()
print(f"Received: {data}")
elif msg.type == aiohttp.WSMsgType.CLOSED:
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
asyncio.run(websocket_client())
Testing with pytest and pytest-aiohttp
Basic Test Setup
# conftest.py
import pytest
import asyncio
from aiohttp import web
from typing import AsyncIterator
pytest_plugins = 'aiohttp.pytest_plugin'
@pytest.fixture
async def app() -> AsyncIterator[web.Application]:
app = web.Application()
async def hello(request):
return web.Response(text='Hello, World!')
app.router.add_get('/', hello)
yield app
# Cleanup
await app.cleanup()
@pytest.fixture
async def client(aiohttp_client, app):
return await aiohttp_client(app)
Basic Tests
# test_basic.py
import pytest
from aiohttp import web
@pytest.mark.asyncio
async def test_hello(client):
resp = await client.get('/')
assert resp.status == 200
text = await resp.text()
assert 'Hello, World!' in text
@pytest.mark.asyncio
async def test_json_endpoint(client):
resp = await client.post('/api/data', json={'key': 'value'})
assert resp.status == 200
data = await resp.json()
assert data['key'] == 'value'
Testing with Fixtures
# conftest.py
import pytest
import asyncio
from typing import AsyncIterator
import aiohttp
@pytest.fixture
async def http_session() -> AsyncIterator[aiohttp.ClientSession]:
session = aiohttp.ClientSession()
yield session
await session.close()
@pytest.fixture
def sample_user():
return {
'username': 'testuser',
'email': 'test@example.com',
'password': 'TestPass123'
}
@pytest.fixture
async def authenticated_client(client, sample_user):
# Login
resp = await client.post('/login', json=sample_user)
assert resp.status == 200
# Extract token
data = await resp.json()
token = data['token']
# Set authorization header
client.session.headers['Authorization'] = f'Bearer {token}'
yield client
# tests/test_auth.py
@pytest.mark.asyncio
async def test_protected_endpoint(authenticated_client):
resp = await authenticated_client.get('/api/protected')
assert resp.status == 200
data = await resp.json()
assert 'user_id' in data
Parameterized Tests
import pytest
@pytest.mark.parametrize('username,email,expected_status', [
('valid', 'valid@example.com', 201),
('ab', 'valid@example.com', 400), # Too short
('valid', 'invalid-email', 400), # Invalid email
('', 'valid@example.com', 400), # Empty username
])
@pytest.mark.asyncio
async def test_user_creation_validation(client, username, email, expected_status):
resp = await client.post('/users', json={
'username': username,
'email': email,
'password': 'ValidPass123'
})
assert resp.status == expected_status
@pytest.mark.parametrize('method,path,expected', [
('GET', '/', 200),
('GET', '/api/users', 200),
('POST', '/api/users', 401), # Requires auth
('GET', '/nonexistent', 404),
])
@pytest.mark.asyncio
async def test_endpoints(client, method, path, expected):
if method == 'GET':
resp = await client.get(path)
elif method == 'POST':
resp = await client.post(path, json={})
assert resp.status == expected
Mocking External APIs
import pytest
from unittest.mock import AsyncMock, patch
from aiohttp import web
@pytest.mark.asyncio
async def test_external_api_call(client):
# Mock external API
with patch('aiohttp.ClientSession.get') as mock_get:
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value={'data': 'mocked'})
mock_get.return_value.__aenter__.return_value = mock_response
resp = await client.get('/api/external')
assert resp.status == 200
data = await resp.json()
assert data['data'] == 'mocked'
@pytest.fixture
async def mock_database():
class MockDB:
def __init__(self):
self.data = {}
async def get(self, key):
return self.data.get(key)
async def set(self, key, value):
self.data[key] = value
async def delete(self, key):
if key in self.data:
del self.data[key]
return MockDB()
@pytest.mark.asyncio
async def test_with_mock_db(client, mock_database):
# Inject mock database into app
client.app['db'] = mock_database
# Test database operations
await mock_database.set('user:1', {'name': 'Test'})
resp = await client.get('/users/1')
assert resp.status == 200
Testing WebSockets
import pytest
from aiohttp import WSMsgType
@pytest.mark.asyncio
async def test_websocket_echo(aiohttp_client):
app = web.Application()
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == WSMsgType.TEXT:
await ws.send_str(f"Echo: {msg.data}")
return ws
app.router.add_get('/ws', websocket_handler)
client = await aiohttp_client(app)
async with client.ws_connect('/ws') as ws:
await ws.send_str('Hello')
msg = await ws.receive()
assert msg.data == 'Echo: Hello'
@pytest.mark.asyncio
async def test_websocket_broadcast(aiohttp_client):
from collections import defaultdict
connections = set()
app = web.Application()
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
connections.add(ws)
try:
async for msg in ws:
if msg.type == WSMsgType.TEXT:
# Broadcast to all
for conn in connections:
if conn != ws:
await conn.send_str(msg.data)
finally:
connections.discard(ws)
return ws
app.router.add_get('/ws', websocket_handler)
client = await aiohttp_client(app)
# Create two connections
async with client.ws_connect('/ws') as ws1:
async with client.ws_connect('/ws') as ws2:
await ws1.send_str('Hello from ws1')
msg = await ws2.receive()
assert msg.data == 'Hello from ws1'
Testing Middleware
import pytest
from aiohttp import web
@pytest.mark.asyncio
async def test_auth_middleware(aiohttp_client):
@web.middleware
async def auth_middleware(request, handler):
token = request.headers.get('Authorization')
if not token or not token.startswith('Bearer '):
raise web.HTTPUnauthorized()
request['user_id'] = 'user-123'
return await handler(request)
app = web.Application(middlewares=[auth_middleware])
async def protected(request):
return web.json_response({'user_id': request['user_id']})
app.router.add_get('/protected', protected)
client = await aiohttp_client(app)
# Without token
resp = await client.get('/protected')
assert resp.status == 401
# With token
resp = await client.get('/protected', headers={
'Authorization': 'Bearer valid-token'
})
assert resp.status == 200
data = await resp.json()
assert data['user_id'] == 'user-123'
Coverage and Best Practices
# pytest.ini or pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
"--verbose",
"--strict-markers",
"--cov=app",
"--cov-report=html",
"--cov-report=term-missing",
]
# Best practices
# 1. Keep tests independent
# 2. Use fixtures for common setup
# 3. Mock external dependencies
# 4. Test edge cases
# 5. Use parametrize for similar tests
# 6. Clean up resources properly
Advanced Middleware and Error Handling
Error Handling Middleware
from aiohttp import web
import logging
from typing import Callable, Awaitable
logger = logging.getLogger(__name__)
@web.middleware
async def error_middleware(
request: web.Request,
handler: Callable[[web.Request], Awaitable[web.Response]]
) -> web.Response:
try:
return await handler(request)
except web.HTTPException as e:
# HTTP exceptions should pass through
raise
except ValueError as e:
logger.warning(f"Validation error: {e}")
return web.json_response(
{
'error': 'Validation Error',
'message': str(e)
},
status=400
)
except PermissionError as e:
logger.warning(f"Permission denied: {e}")
return web.json_response(
{
'error': 'Forbidden',
'message': 'You do not have permission to access this resource'
},
status=403
)
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return web.json_response(
{
'error': 'Internal Server Error',
'message': 'An unexpected error occurred'
},
status=500
)
Logging Middleware
import time
import logging
from aiohttp import web
logger = logging.getLogger(__name__)
@web.middleware
async def logging_middleware(request: web.Request, handler):
start_time = time.time()
# Log request
logger.info(
f"Request started",
extra={
'method': request.method,
'path': request.path,
'query': dict(request.query),
'remote': request.remote
}
)
try:
response = await handler(request)
# Log response
duration = time.time() - start_time
logger.info(
f"Request completed",
extra={
'method': request.method,
'path': request.path,
'status': response.status,
'duration_ms': duration * 1000
}
)
return response
except Exception as e:
duration = time.time() - start_time
logger.error(
f"Request failed",
extra={
'method': request.method,
'path': request.path,
'duration_ms': duration * 1000,
'error': str(e)
},
exc_info=True
)
raise
CORS Middleware
from aiohttp import web
from typing import Optional
@web.middleware
async def cors_middleware(request: web.Request, handler):
# Handle preflight
if request.method == 'OPTIONS':
response = web.Response()
else:
response = await handler(request)
# Add CORS headers
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response.headers['Access-Control-Max-Age'] = '3600'
return response
# Or use aiohttp-cors library
import aiohttp_cors
app = web.Application()
# Configure CORS
cors = aiohttp_cors.setup(app, defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
allow_methods="*"
)
})
# Add routes
resource = app.router.add_resource("/api/endpoint")
route = resource.add_route("GET", handler)
cors.add(route)
Rate Limiting Middleware
from aiohttp import web
import time
from collections import defaultdict
from typing import Dict, Tuple
class RateLimiter:
def __init__(self, max_requests: int = 100, window: int = 60):
self.max_requests = max_requests
self.window = window
self.requests: Dict[str, list[float]] = defaultdict(list)
def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]:
now = time.time()
cutoff = now - self.window
# Remove old requests
self.requests[client_id] = [
req_time for req_time in self.requests[client_id]
if req_time > cutoff
]
if len(self.requests[client_id]) >= self.max_requests:
oldest = self.requests[client_id][0]
retry_after = oldest + self.window - now
return False, retry_after
self.requests[client_id].append(now)
return True, None
rate_limiter = RateLimiter(max_requests=100, window=60)
@web.middleware
async def rate_limit_middleware(request: web.Request, handler):
# Use IP address as client identifier
client_id = request.remote
allowed, retry_after = rate_limiter.is_allowed(client_id)
if not allowed:
return web.json_response(
{
'error': 'Rate limit exceeded',
'retry_after': int(retry_after)
},
status=429,
headers={'Retry-After': str(int(retry_after))}
)
return await handler(request)
Performance Optimization
Connection Pooling
import aiohttp
from aiohttp import TCPConnector, ClientTimeout
class OptimizedClient:
def __init__(self, base_url: str):
self.base_url = base_url
# Optimized connector
self.connector = TCPConnector(
limit=100, # Total connections
limit_per_host=30, # Per host
ttl_dns_cache=300, # DNS cache TTL
force_close=False, # Keep-alive
enable_cleanup_closed=True,
use_dns_cache=True
)
# Optimized timeout
self.timeout = ClientTimeout(
total=30,
connect=10,
sock_read=20
)
self._session: Optional[aiohttp.ClientSession] = None
async def start(self):
self._session = aiohttp.ClientSession(
base_url=self.base_url,
connector=self.connector,
timeout=self.timeout,
connector_owner=True,
auto_decompress=True,
trust_env=True,
read_bufsize=2**16 # 64KB buffer
)
async def close(self):
if self._session:
await self._session.close()
await asyncio.sleep(0.25)
Concurrent Requests
import asyncio
import aiohttp
from typing import List, Any
async def fetch_many(urls: List[str]) -> List[Any]:
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
async def fetch_one(session: aiohttp.ClientSession, url: str):
async with session.get(url) as response:
return await response.json()
# With semaphore for limiting concurrency
async def fetch_with_limit(urls: List[str], max_concurrent: int = 10):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_limited(url: str):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
return await asyncio.gather(*[fetch_limited(url) for url in urls])
Streaming Large Responses
async def download_large_file(url: str, filepath: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
with open(filepath, 'wb') as f:
async for chunk in response.content.iter_chunked(8192):
f.write(chunk)
# Server-side streaming
async def stream_large_response(request: web.Request) -> web.StreamResponse:
response = web.StreamResponse()
response.headers['Content-Type'] = 'application/octet-stream'
await response.prepare(request)
# Stream data in chunks
with open('large_file.dat', 'rb') as f:
while chunk := f.read(8192):
await response.write(chunk)
await response.write_eof()
return response
Caching
from functools import lru_cache
import time
class CachedClient:
def __init__(self):
self.cache = {}
self.cache_ttl = 300 # 5 minutes
async def get_with_cache(self, url: str):
now = time.time()
# Check cache
if url in self.cache:
data, timestamp = self.cache[url]
if now - timestamp < self.cache_ttl:
return data
# Fetch and cache
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
self.cache[url] = (data, now)
return data
Git Protocol Integration
Understanding Git Smart HTTP
Git Smart HTTP protocol allows git clients to clone, fetch, and push over HTTP/HTTPS. The protocol involves:
- Service Discovery: Client requests
/info/refs?service=git-upload-packorgit-receive-pack - Negotiation: Client and server negotiate which objects to transfer
- Pack Transfer: Server sends/receives packfiles
Basic Git HTTP Backend
from aiohttp import web
import subprocess
import os
from pathlib import Path
class GitHTTPBackend:
def __init__(self, repo_root: Path):
self.repo_root = repo_root
self.git_backend = '/usr/lib/git-core/git-http-backend'
async def handle_info_refs(self, request: web.Request) -> web.Response:
repo_path = request.match_info['repo']
service = request.query.get('service', '')
if service not in ('git-upload-pack', 'git-receive-pack'):
return web.Response(status=400, text='Invalid service')
full_path = self.repo_root / repo_path
if not full_path.exists():
return web.Response(status=404, text='Repository not found')
# Build environment
env = os.environ.copy()
env['GIT_PROJECT_ROOT'] = str(self.repo_root)
env['GIT_HTTP_EXPORT_ALL'] = '1'
env['PATH_INFO'] = f'/{repo_path}/info/refs'
env['QUERY_STRING'] = f'service={service}'
env['REQUEST_METHOD'] = 'GET'
# Execute git-http-backend
proc = await asyncio.create_subprocess_exec(
self.git_backend,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
return web.Response(status=500, text=stderr.decode())
# Parse CGI output
headers_end = stdout.find(b'\r\n\r\n')
if headers_end == -1:
return web.Response(status=500)
header_lines = stdout[:headers_end].decode().split('\r\n')
body = stdout[headers_end + 4:]
# Parse headers
headers = {}
for line in header_lines:
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
return web.Response(
body=body,
headers=headers,
status=200
)
async def handle_service(self, request: web.Request) -> web.Response:
repo_path = request.match_info['repo']
service = request.match_info['service']
if service not in ('git-upload-pack', 'git-receive-pack'):
return web.Response(status=400)
full_path = self.repo_root / repo_path
if not full_path.exists():
return web.Response(status=404)
# Read request body
body = await request.read()
# Build environment
env = os.environ.copy()
env['GIT_PROJECT_ROOT'] = str(self.repo_root)
env['GIT_HTTP_EXPORT_ALL'] = '1'
env['PATH_INFO'] = f'/{repo_path}/{service}'
env['REQUEST_METHOD'] = 'POST'
env['CONTENT_TYPE'] = request.content_type
env['CONTENT_LENGTH'] = str(len(body))
# Execute git service
proc = await asyncio.create_subprocess_exec(
self.git_backend,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
stdout, stderr = await proc.communicate(input=body)
if proc.returncode != 0:
return web.Response(status=500, text=stderr.decode())
# Parse CGI output
headers_end = stdout.find(b'\r\n\r\n')
header_lines = stdout[:headers_end].decode().split('\r\n')
body = stdout[headers_end + 4:]
headers = {}
for line in header_lines:
if ':' in line:
key, value = line.split(':', 1)
headers[key.strip()] = value.strip()
return web.Response(
body=body,
headers=headers,
status=200
)
# Setup routes
git_backend = GitHTTPBackend(Path('/var/git/repos'))
app = web.Application()
app.router.add_get('/{repo:.+}/info/refs', git_backend.handle_info_refs)
app.router.add_post('/{repo:.+}/{service:(git-upload-pack|git-receive-pack)}', git_backend.handle_service)
Git Backend with Authentication
from aiohttp import web
import base64
import subprocess
from pathlib import Path
class AuthenticatedGitBackend:
def __init__(self, repo_root: Path):
self.repo_root = repo_root
self.users = {
'alice': 'password123',
'bob': 'secret456'
}
def verify_auth(self, request: web.Request) -> tuple[bool, str | None]:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Basic '):
return False, None
try:
encoded = auth_header[6:]
decoded = base64.b64decode(encoded).decode()
username, password = decoded.split(':', 1)
if self.users.get(username) == password:
return True, username
except:
pass
return False, None
@web.middleware
async def auth_middleware(self, request: web.Request, handler):
# Allow anonymous reads
service = request.query.get('service', '')
if request.method == 'GET' and service == 'git-upload-pack':
return await handler(request)
# Require authentication for pushes
if service == 'git-receive-pack' or 'git-receive-pack' in request.path:
authorized, username = self.verify_auth(request)
if not authorized:
return web.Response(
status=401,
headers={'WWW-Authenticate': 'Basic realm="Git Access"'},
text='Authentication required'
)
request['username'] = username
return await handler(request)
async def handle_info_refs(self, request: web.Request):
# Git info/refs implementation
# Similar to previous example but with auth
pass
async def handle_service(self, request: web.Request):
# Git service implementation
# Similar to previous example but with auth
pass
Repository Manager Implementation
Repository Browser
from aiohttp import web
import os
from pathlib import Path
import subprocess
from typing import Optional
import json
class RepositoryManager:
def __init__(self, repo_root: Path):
self.repo_root = repo_root
async def list_repositories(self, request: web.Request) -> web.Response:
repos = []
for item in self.repo_root.iterdir():
if item.is_dir() and (item / '.git').exists():
repos.append({
'name': item.name,
'path': str(item.relative_to(self.repo_root)),
'type': 'git'
})
return web.json_response({'repositories': repos})
async def get_repository(self, request: web.Request) -> web.Response:
repo_name = request.match_info['repo']
repo_path = self.repo_root / repo_name
if not repo_path.exists():
return web.json_response(
{'error': 'Repository not found'},
status=404
)
# Get repository info
info = await self._get_repo_info(repo_path)
return web.json_response(info)
async def create_repository(self, request: web.Request) -> web.Response:
data = await request.json()
repo_name = data.get('name')
if not repo_name:
return web.json_response(
{'error': 'Repository name required'},
status=400
)
repo_path = self.repo_root / repo_name
if repo_path.exists():
return web.json_response(
{'error': 'Repository already exists'},
status=400
)
# Create bare repository
repo_path.mkdir(parents=True)
proc = await asyncio.create_subprocess_exec(
'git', 'init', '--bare', str(repo_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
await proc.communicate()
if proc.returncode != 0:
return web.json_response(
{'error': 'Failed to create repository'},
status=500
)
return web.json_response({
'name': repo_name,
'path': str(repo_path.relative_to(self.repo_root))
}, status=201)
async def delete_repository(self, request: web.Request) -> web.Response:
repo_name = request.match_info['repo']
repo_path = self.repo_root / repo_name
if not repo_path.exists():
return web.json_response(
{'error': 'Repository not found'},
status=404
)
# Delete repository directory
import shutil
shutil.rmtree(repo_path)
return web.Response(status=204)
async def browse_tree(self, request: web.Request) -> web.Response:
repo_name = request.match_info['repo']
ref = request.query.get('ref', 'HEAD')
path = request.query.get('path', '')
repo_path = self.repo_root / repo_name
if not repo_path.exists():
return web.json_response(
{'error': 'Repository not found'},
status=404
)
# List files in tree
proc = await asyncio.create_subprocess_exec(
'git', 'ls-tree', ref, path,
cwd=str(repo_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
return web.json_response(
{'error': stderr.decode()},
status=400
)
# Parse ls-tree output
entries = []
for line in stdout.decode().strip().split('\n'):
if not line:
continue
mode, type_, hash_, name = line.split(None, 3)
entries.append({
'mode': mode,
'type': type_,
'hash': hash_,
'name': name
})
return web.json_response({'entries': entries})
async def get_file_content(self, request: web.Request) -> web.Response:
repo_name = request.match_info['repo']
ref = request.query.get('ref', 'HEAD')
path = request.query['path']
repo_path = self.repo_root / repo_name
# Get file content
proc = await asyncio.create_subprocess_exec(
'git', 'show', f'{ref}:{path}',
cwd=str(repo_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
return web.json_response(
{'error': 'File not found'},
status=404
)
return web.Response(
body=stdout,
content_type='text/plain'
)
async def get_commits(self, request: web.Request) -> web.Response:
repo_name = request.match_info['repo']
ref = request.query.get('ref', 'HEAD')
limit = int(request.query.get('limit', '50'))
repo_path = self.repo_root / repo_name
# Get commit log
proc = await asyncio.create_subprocess_exec(
'git', 'log', ref,
'--pretty=format:%H|%an|%ae|%at|%s',
f'-{limit}',
cwd=str(repo_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
commits = []
for line in stdout.decode().strip().split('\n'):
if not line:
continue
hash_, author, email, timestamp, message = line.split('|', 4)
commits.append({
'hash': hash_,
'author': author,
'email': email,
'timestamp': int(timestamp),
'message': message
})
return web.json_response({'commits': commits})
async def _get_repo_info(self, repo_path: Path) -> dict:
# Get HEAD
proc = await asyncio.create_subprocess_exec(
'git', 'rev-parse', 'HEAD',
cwd=str(repo_path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
head = stdout.decode().strip() if proc.returncode == 0 else None
# Get branches
proc = await asyncio.create_subprocess_exec(
'git', 'branch', '-a',
cwd=str(repo_path),
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
branches = [
line.strip().lstrip('* ')
for line in stdout.decode().strip().split('\n')
]
return {
'name': repo_path.name,
'head': head,
'branches': branches
}
# Setup routes
repo_manager = RepositoryManager(Path('/var/git/repos'))
app.router.add_get('/api/repositories', repo_manager.list_repositories)
app.router.add_get('/api/repositories/{repo}', repo_manager.get_repository)
app.router.add_post('/api/repositories', repo_manager.create_repository)
app.router.add_delete('/api/repositories/{repo}', repo_manager.delete_repository)
app.router.add_get('/api/repositories/{repo}/tree', repo_manager.browse_tree)
app.router.add_get('/api/repositories/{repo}/file', repo_manager.get_file_content)
app.router.add_get('/api/repositories/{repo}/commits', repo_manager.get_commits)
Best Practices and Patterns
Application Structure
project/
├── app/
│ ├── __init__.py
│ ├── main.py # Application entry point
│ ├── routes.py # Route definitions
│ ├── handlers.py # Request handlers
│ ├── middleware.py # Custom middleware
│ ├── models.py # Pydantic models
│ ├── database.py # Database connections
│ └── utils.py # Utility functions
├── tests/
│ ├── __init__.py
│ ├── conftest.py # Test fixtures
│ ├── test_handlers.py
│ └── test_integration.py
├── requirements.txt
├── pyproject.toml
└── README.md
Configuration Management
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
model_config = ConfigDict(
env_file='.env',
env_file_encoding='utf-8',
case_sensitive=False
)
# Server
host: str = '127.0.0.1'
port: int = 8080
debug: bool = False
# Database
database_url: str
database_pool_size: int = 10
# Security
secret_key: str
jwt_algorithm: str = 'HS256'
jwt_expiration: int = 3600
# External APIs
external_api_url: Optional[str] = None
external_api_key: Optional[str] = None
settings = Settings()
Graceful Shutdown
from aiohttp import web
import asyncio
import signal
class Application:
def __init__(self):
self.app = web.Application()
self.cleanup_tasks = []
async def startup(self):
# Initialize resources
pass
async def cleanup(self):
# Cleanup resources
for task in self.cleanup_tasks:
await task
async def shutdown(self, app):
# Close database connections
# Close HTTP sessions
# Wait for background tasks
await self.cleanup()
def run(self):
self.app.on_startup.append(lambda app: self.startup())
self.app.on_cleanup.append(self.shutdown)
web.run_app(
self.app,
host='127.0.0.1',
port=8080,
shutdown_timeout=60.0
)
Summary
This guide covers:
- Python 3.13 modern features and type hints
- aiohttp 3.13+ client and server development
- Complete authentication patterns (Basic, Bearer, API Key, OAuth2)
- Pydantic 2.12+ validation
- WebSocket implementation
- Comprehensive testing with pytest-aiohttp
- Git protocol integration
- Repository management system
- Performance optimization
- Production-ready patterns
Key Takeaways:
- Always reuse ClientSession across requests
- Use type hints and Pydantic for validation
- Implement proper error handling and middleware
- Write comprehensive tests with pytest-aiohttp
- Follow async/await patterns consistently
- Optimize connection pooling and timeouts
- Handle cleanup and graceful shutdown properly
Guide Version: 1.0 Last Updated: November 2025 Compatible with: Python 3.13.3, aiohttp 3.13.2, pytest-aiohttp 1.1.0, pydantic 2.12.3
| .gitea/workflows | |
| rp | |
| tests | |
| .editorconfig | |
| .gitignore | |
| .pre-commit-config.yaml | |
| CHANGELOG.md | |
| CONTRIBUTING.md | |
| LICENSE | |
| Makefile | |
| pyproject.toml | |
| README.md | |
| requirements.txt | |
| rp.py |