# Retoor's Guide to Modern Python: Mastering aiohttp 3.13+ with Python 3.13
**Complete Tutorial: aiohttp, Testing, Authentication, WebSockets, and Git Protocol Integration**
Version Requirements:
- Python: **3.13.3** (3.13.0 released October 7, 2024)
- aiohttp: **3.13.2** (latest stable as of October 28, 2025)
- pytest: **8.3+**
- pytest-aiohttp: **1.1.0** (released January 23, 2025)
- pytest-asyncio: **1.2.0** (released September 12, 2025)
- pydantic: **2.12.3** (released October 17, 2025)
---
## Table of Contents
1. [Python 3.13 Modern Features](#python-313-modern-features)
2. [aiohttp Fundamentals](#aiohttp-fundamentals)
3. [Client Sessions and Connection Management](#client-sessions-and-connection-management)
4. [Authentication Patterns](#authentication-patterns)
5. [Server Development](#server-development)
6. [Request Validation with Pydantic](#request-validation-with-pydantic)
7. [WebSocket Implementation](#websocket-implementation)
8. [Testing with pytest and pytest-aiohttp](#testing-with-pytest-and-pytest-aiohttp)
9. [Advanced Middleware and Error Handling](#advanced-middleware-and-error-handling)
10. [Performance Optimization](#performance-optimization)
11. [Git Protocol Integration](#git-protocol-integration)
12. [Repository Manager Implementation](#repository-manager-implementation)
13. [Best Practices and Patterns](#best-practices-and-patterns)
14. [Automatic Memory and Context Search](#automatic-memory-and-context-search)
---
## Python 3.13 Modern Features
### Key Python 3.13 Enhancements
Python 3.13 introduces significant improvements for asynchronous programming:
**Experimental Free-Threaded Mode (No GIL)**
- Requires the separate free-threaded build (often installed as `python3.13t`); the GIL can then be disabled with `PYTHON_GIL=0` or `python -X gil=0`
- Enables true multi-threaded parallelism for CPU-bound work running alongside async I/O
- Better scaling on multi-core systems (a quick runtime check is sketched after this list)

**JIT Compiler (Preview)**
- Experimental Just-In-Time compilation for performance boosts
- Available in builds configured with `--enable-experimental-jit`; toggle with the `PYTHON_JIT=1` environment variable
- Early benchmarks show modest improvements on some workloads

**Enhanced Interactive Interpreter**
- Multi-line editing with syntax highlighting
- Colorized tracebacks for better debugging
- Improved REPL experience

**Better Error Messages**
- More precise error locations
- Clearer exception messages
- Context-aware suggestions
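
A quick way to confirm which of these features your interpreter actually has is to probe them at runtime. This is a minimal sketch, not part of any library in this guide: `sys._is_gil_enabled()` is a private helper added in CPython 3.13, and `Py_GIL_DISABLED` is the build flag set on free-threaded builds.

```python
import sys
import sysconfig

def describe_build() -> None:
    # Free-threaded builds are compiled with --disable-gil (Py_GIL_DISABLED).
    free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    print(f"Free-threaded build: {free_threaded}")

    # sys._is_gil_enabled() (added in 3.13) reports whether the GIL is active right now.
    gil_check = getattr(sys, "_is_gil_enabled", None)
    if gil_check is not None:
        print(f"GIL currently enabled: {gil_check()}")

    print(f"Python version: {sys.version}")

if __name__ == "__main__":
    describe_build()
```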
### Modern Type Hints (PEP 695)
Python 3.13 fully supports modern generic syntax:
```python
import aiohttp
from aiohttp.web import Request
from collections.abc import Awaitable, Callable
from typing import Generic, TypeVar

# Old style (still works)
T = TypeVar('T')

class OldGeneric(Generic[T]):
    def process(self, item: T) -> T:
        return item

# New PEP 695 style (Python 3.12+)
class NewGeneric[T]:
    def process(self, item: T) -> T:
        return item

# Type aliases with the 'type' keyword
type RequestHandler[T] = Callable[[Request], Awaitable[T]]
type JSONDict = dict[str, str | int | float | bool | None]

# Modern function annotations
async def fetch_data[T](url: str, parser: Callable[[bytes], T]) -> T | None:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                return parser(await response.read())
    return None
```
### Dataclasses in Python 3.13
```python
import copy
import time
from dataclasses import dataclass, field

# Note: slots=True is omitted here because __post_init__ assigns a
# non-field attribute (last_login), which __slots__ would forbid.
@dataclass(kw_only=True)
class User:
    user_id: str
    username: str
    email: str
    created_at: float = field(default_factory=time.time)
    is_active: bool = True
    _password_hash: str = field(repr=False, compare=False)

    def __post_init__(self):
        self.last_login: float | None = None

    @property
    def is_new(self) -> bool:
        return time.time() - self.created_at < 86400

user = User(user_id="123", username="alice", email="alice@example.com", _password_hash="hashed")

# New in 3.13: copy.replace() works with dataclasses
updated = copy.replace(user, username="alice_updated")

# New in 3.13: __static_attributes__ lists attributes assigned via self.<name>
print(User.__static_attributes__)  # ('last_login',)
```
### Modern Async Patterns
```python
from collections.abc import AsyncIterator, Callable

import aiohttp

# Async generators
async def fetch_paginated[T](
    url: str,
    parser: Callable[[dict], T]
) -> AsyncIterator[T]:
    page = 1
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(f"{url}?page={page}") as response:
                data = await response.json()
            if not data['items']:
                break
            for item in data['items']:
                yield parser(item)
            page += 1

# Context manager pattern
class AsyncResourceManager:
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        await self.session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.__aexit__(exc_type, exc_val, exc_tb)
```
---
## aiohttp Fundamentals
### Installation and Setup
```bash
# Core installation
pip install aiohttp==3.13.2

# With speedups (highly recommended)
pip install "aiohttp[speedups]==3.13.2"

# Additional recommended packages (quote specifiers so the shell
# does not treat '>' as a redirection)
pip install "aiodns>=3.0.0"      # Fast DNS resolution
pip install "Brotli>=1.0.9"      # Brotli compression support
pip install "pydantic>=2.12.3"   # Request validation
```
### Basic Client Usage
```python
import asyncio

import aiohttp

async def fetch_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            print(f"Status: {response.status}")
            print(f"Content-Type: {response.headers['content-type']}")

            # Various response methods
            text = await response.text()   # Text content
            data = await response.json()   # JSON parsing
            raw = await response.read()    # Raw bytes

asyncio.run(fetch_example())
```
### Basic Server Usage
```python
from aiohttp import web

async def hello_handler(request: web.Request) -> web.Response:
    name = request.match_info.get('name', 'Anonymous')
    return web.Response(text=f"Hello, {name}!")

async def json_handler(request: web.Request) -> web.Response:
    data = await request.json()
    return web.json_response({
        'status': 'success',
        'received': data
    })

app = web.Application()
app.router.add_get('/', hello_handler)
app.router.add_get('/{name}', hello_handler)
app.router.add_post('/api/data', json_handler)

if __name__ == '__main__':
    web.run_app(app, host='127.0.0.1', port=8080)
```
---
## Client Sessions and Connection Management
### Session Management Best Practices
**Never create a new session for each request** - this is the most common mistake:
```python
import asyncio

import aiohttp

# ❌ WRONG - Creates a new session for every request
async def bad_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.text()
    # Session destroyed here

# ✅ CORRECT - Reuse one session across requests
class APIClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(
                limit=100,           # Total connection limit
                limit_per_host=30,   # Per-host limit
                ttl_dns_cache=300,   # DNS cache TTL
            )
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def get(self, path: str) -> dict:
        async with self._session.get(path) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def main():
    async with APIClient('https://api.example.com') as client:
        user = await client.get('/users/123')
        posts = await client.get('/posts')
        comments = await client.get('/comments')

asyncio.run(main())
```
### Advanced Session Configuration
```python
import asyncio
from typing import Optional

import aiohttp
from aiohttp import ClientSession, ClientTimeout, TCPConnector

class AdvancedHTTPClient:
    def __init__(
        self,
        base_url: str,
        timeout: int = 30,
        max_connections: int = 100,
        max_connections_per_host: int = 30,
        headers: Optional[dict[str, str]] = None
    ):
        self.base_url = base_url
        self.timeout = ClientTimeout(
            total=timeout,
            connect=10,     # Connection timeout
            sock_read=20    # Socket read timeout
        )
        self.connector = TCPConnector(
            limit=max_connections,
            limit_per_host=max_connections_per_host,
            ttl_dns_cache=300,
            # ssl=my_ssl_context,      # pass a custom SSL context if needed
            force_close=False,         # Keep connections alive
            enable_cleanup_closed=True
        )
        self.default_headers = headers or {}
        self._session: Optional[ClientSession] = None

    async def start(self):
        if self._session is None:
            self._session = ClientSession(
                base_url=self.base_url,
                timeout=self.timeout,
                connector=self.connector,
                headers=self.default_headers,
                raise_for_status=False,
                connector_owner=True,
                auto_decompress=True,
                trust_env=True
            )

    async def close(self):
        if self._session:
            await self._session.close()
            await asyncio.sleep(0.25)  # Allow the connector to finish closing

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def request(
        self,
        method: str,
        path: str,
        **kwargs
    ) -> aiohttp.ClientResponse:
        if not self._session:
            await self.start()
        return await self._session.request(method, path, **kwargs)
```
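
A short usage sketch for the client above; the base URL and path are placeholders. Because `request()` returns the raw `ClientResponse`, it is read inside its own `async with` block so the connection is released properly.

```python
import asyncio

async def main():
    async with AdvancedHTTPClient('https://api.example.com', timeout=15) as client:
        response = await client.request('GET', '/status')
        async with response:
            print(response.status, await response.text())

asyncio.run(main())
```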
### Cookie Management
```python
import aiohttp
from yarl import URL

# Automatic cookie handling
async def with_cookies():
    # Create a cookie jar (unsafe=False rejects cookies set by bare IP addresses)
    jar = aiohttp.CookieJar(unsafe=False)

    async with aiohttp.ClientSession(cookie_jar=jar) as session:
        # Cookies are automatically stored and sent
        await session.get('https://example.com/login')

        # Manually update cookies
        session.cookie_jar.update_cookies(
            {'session_id': 'abc123'},
            response_url=URL('https://example.com')
        )

        # Access cookies
        for cookie in session.cookie_jar:
            print(f"{cookie.key}: {cookie.value}")

# Custom cookie handling
async def custom_cookies():
    cookies = {'auth_token': 'xyz789'}

    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get('https://example.com/api') as response:
            # Read response cookies
            print(response.cookies)
```
---
## Authentication Patterns
### Basic Authentication
```python
import base64

import aiohttp
from aiohttp import BasicAuth

# Method 1: Using the BasicAuth helper
async def basic_auth_helper(username: str, password: str):
    auth = BasicAuth(login=username, password=password)

    async with aiohttp.ClientSession(auth=auth) as session:
        async with session.get('https://api.example.com/protected') as response:
            return await response.json()

# Method 2: Manual base64 encoding
async def basic_auth_manual(username: str, password: str):
    credentials = f"{username}:{password}"
    encoded = base64.b64encode(credentials.encode()).decode()
    headers = {'Authorization': f'Basic {encoded}'}

    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('https://api.example.com/protected') as response:
            return await response.json()

# Method 3: Per-request authentication
async def basic_auth_per_request(username: str, password: str, url: str):
    auth = BasicAuth(login=username, password=password)

    async with aiohttp.ClientSession() as session:
        async with session.get(url, auth=auth) as response:
            return await response.json()
```
### Bearer Token Authentication
```python
from typing import Optional

import aiohttp

class TokenAuthClient:
    def __init__(self, base_url: str, token: str):
        self.base_url = base_url
        self.token = token
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Accept': 'application/json'
        }
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            headers=headers
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def get(self, path: str) -> dict:
        async with self._session.get(path) as response:
            response.raise_for_status()
            return await response.json()

    async def post(self, path: str, data: dict) -> dict:
        async with self._session.post(path, json=data) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def example():
    async with TokenAuthClient('https://api.example.com', 'your_token_here') as client:
        user = await client.get('/user')
        result = await client.post('/items', {'name': 'test'})
```
### API Key Authentication
```python
from typing import Optional

import aiohttp

class APIKeyClient:
    def __init__(
        self,
        base_url: str,
        api_key: str,
        key_location: str = 'header',  # 'header' or 'query'
        key_name: str = 'X-API-Key'
    ):
        self.base_url = base_url
        self.api_key = api_key
        self.key_location = key_location
        self.key_name = key_name
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        if self.key_location == 'header':
            headers = {self.key_name: self.api_key}
            self._session = aiohttp.ClientSession(
                base_url=self.base_url,
                headers=headers
            )
        else:
            self._session = aiohttp.ClientSession(base_url=self.base_url)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def request(self, method: str, path: str, **kwargs):
        if self.key_location == 'query':
            params = kwargs.get('params', {})
            params[self.key_name] = self.api_key
            kwargs['params'] = params

        async with self._session.request(method, path, **kwargs) as response:
            response.raise_for_status()
            return await response.json()
```
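
A brief usage sketch for the class above; the base URL, path, and key are placeholders:

```python
import asyncio

async def main():
    async with APIKeyClient(
        'https://api.example.com',
        'my-secret-key',
        key_location='query',
        key_name='api_key'
    ) as client:
        # The API key is appended to the query string automatically
        data = await client.request('GET', '/reports', params={'year': '2025'})
        print(data)

asyncio.run(main())
```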
### Digest Authentication (aiohttp 3.12.8+)
```python
from aiohttp import ClientSession, DigestAuthMiddleware

async def digest_auth_example():
    # Create the digest auth middleware
    digest_auth = DigestAuthMiddleware(
        login="user",
        password="password",
        preemptive=True  # New in 3.12.8: preemptive authentication
    )

    # Pass the middleware to the session
    async with ClientSession(middlewares=(digest_auth,)) as session:
        async with session.get("https://httpbin.org/digest-auth/auth/user/password") as resp:
            print(await resp.text())
```
### OAuth 2.0 Token Refresh Pattern
```python
import asyncio
from datetime import datetime, timedelta
from typing import Optional

import aiohttp

class OAuth2Client:
    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        token_url: str
    ):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self._access_token: Optional[str] = None
        self._token_expires_at: Optional[datetime] = None
        self._refresh_token: Optional[str] = None
        self._session: Optional[aiohttp.ClientSession] = None
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(base_url=self.base_url)
        await self._ensure_token()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def _ensure_token(self):
        async with self._lock:
            now = datetime.now()
            if (
                not self._access_token or
                not self._token_expires_at or
                now >= self._token_expires_at
            ):
                await self._refresh_access_token()

    async def _refresh_access_token(self):
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(self.token_url, data=data) as response:
                response.raise_for_status()
                token_data = await response.json()

        self._access_token = token_data['access_token']
        expires_in = token_data.get('expires_in', 3600)
        self._token_expires_at = datetime.now() + timedelta(seconds=expires_in - 60)
        self._refresh_token = token_data.get('refresh_token')

    async def request(self, method: str, path: str, **kwargs):
        await self._ensure_token()

        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {self._access_token}'
        kwargs['headers'] = headers

        async with self._session.request(method, path, **kwargs) as response:
            if response.status == 401:
                await self._refresh_access_token()
                headers['Authorization'] = f'Bearer {self._access_token}'
                async with self._session.request(method, path, **kwargs) as retry:
                    retry.raise_for_status()
                    return await retry.json()
            response.raise_for_status()
            return await response.json()
```
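
Usage follows the same pattern as the other clients; the URLs and credentials below are placeholders:

```python
import asyncio

async def main():
    async with OAuth2Client(
        base_url='https://api.example.com',
        client_id='my-client-id',
        client_secret='my-client-secret',
        token_url='https://auth.example.com/oauth/token'
    ) as client:
        # The access token is fetched on entry and refreshed automatically
        profile = await client.request('GET', '/me')
        print(profile)

asyncio.run(main())
```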
---
## Server Development
### Application Structure
```python
from typing import Awaitable, Callable

from aiohttp import web

# Type alias
Handler = Callable[[web.Request], Awaitable[web.Response]]

class Application:
    def __init__(self):
        self.app = web.Application()
        self.setup_routes()
        self.setup_middlewares()

    def setup_routes(self):
        self.app.router.add_get('/', self.index)
        self.app.router.add_get('/health', self.health)
        # API routes
        self.app.router.add_route('*', '/api/{path:.*}', self.api_handler)

    def setup_middlewares(self):
        self.app.middlewares.append(self.error_middleware)
        self.app.middlewares.append(self.logging_middleware)

    async def index(self, request: web.Request) -> web.Response:
        return web.Response(text='Hello, World!')

    async def health(self, request: web.Request) -> web.Response:
        return web.json_response({'status': 'healthy'})

    async def api_handler(self, request: web.Request) -> web.Response:
        path = request.match_info['path']
        return web.json_response({
            'path': path,
            'method': request.method
        })

    @web.middleware
    async def error_middleware(self, request: web.Request, handler: Handler):
        try:
            return await handler(request)
        except web.HTTPException:
            raise
        except Exception as e:
            return web.json_response(
                {'error': str(e)},
                status=500
            )

    @web.middleware
    async def logging_middleware(self, request: web.Request, handler: Handler):
        print(f"{request.method} {request.path}")
        response = await handler(request)
        print(f"Response: {response.status}")
        return response

    def run(self, host: str = '127.0.0.1', port: int = 8080):
        web.run_app(self.app, host=host, port=port)

if __name__ == '__main__':
    app = Application()
    app.run()
```
### Request Handling
```python
from aiohttp import web

class RequestHandlers:
    # Query parameters
    async def query_params(self, request: web.Request) -> web.Response:
        # Get a single parameter
        name = request.query.get('name', 'Anonymous')

        # Get all values for a key
        tags = request.query.getall('tag', [])

        # Get as integer with a default
        page = int(request.query.get('page', '1'))

        return web.json_response({
            'name': name,
            'tags': tags,
            'page': page
        })

    # Path parameters
    async def path_params(self, request: web.Request) -> web.Response:
        user_id = request.match_info['user_id']
        action = request.match_info.get('action', 'view')

        return web.json_response({
            'user_id': user_id,
            'action': action
        })

    # JSON body
    async def json_body(self, request: web.Request) -> web.Response:
        try:
            data = await request.json()
        except ValueError:
            return web.json_response(
                {'error': 'Invalid JSON'},
                status=400
            )

        return web.json_response({
            'received': data,
            'type': type(data).__name__
        })

    # Form data
    async def form_data(self, request: web.Request) -> web.Response:
        data = await request.post()

        result = {}
        for key in data:
            result[key] = data.get(key)

        return web.json_response(result)

    # File upload
    async def file_upload(self, request: web.Request) -> web.Response:
        reader = await request.multipart()

        uploaded_files = []
        async for field in reader:
            if field.filename:
                size = 0
                content = bytearray()
                while True:
                    chunk = await field.read_chunk()
                    if not chunk:
                        break
                    size += len(chunk)
                    content.extend(chunk)

                uploaded_files.append({
                    'filename': field.filename,
                    'size': size,
                    'content_type': field.headers.get('Content-Type')
                })

        return web.json_response({
            'files': uploaded_files
        })

    # Headers
    async def headers_example(self, request: web.Request) -> web.Response:
        auth_header = request.headers.get('Authorization')
        user_agent = request.headers.get('User-Agent')
        custom_header = request.headers.get('X-Custom-Header')

        response_headers = {
            'X-Custom-Response': 'value',
            'X-Request-ID': 'unique-id-123'
        }

        return web.json_response(
            {
                'auth': auth_header,
                'user_agent': user_agent,
                'custom': custom_header
            },
            headers=response_headers
        )

    # Cookies
    async def cookies_example(self, request: web.Request) -> web.Response:
        session_id = request.cookies.get('session_id')

        response = web.json_response({
            'session_id': session_id
        })

        # Set a cookie
        response.set_cookie(
            'session_id',
            'new-session-id',
            max_age=3600,
            httponly=True,
            secure=True,
            samesite='Strict'
        )
        return response
```
### Response Types
```python
import asyncio

from aiohttp import web

class ResponseExamples:
    # Text response
    async def text_response(self, request: web.Request) -> web.Response:
        return web.Response(
            text='Plain text response',
            content_type='text/plain'
        )

    # JSON response
    async def json_response(self, request: web.Request) -> web.Response:
        return web.json_response({
            'status': 'success',
            'data': {'key': 'value'}
        })

    # HTML response
    async def html_response(self, request: web.Request) -> web.Response:
        html = """
        <!DOCTYPE html>
        <html>
        <head><title>Example</title></head>
        <body><h1>Hello, World!</h1></body>
        </html>
        """
        return web.Response(
            text=html,
            content_type='text/html'
        )

    # Binary response
    async def binary_response(self, request: web.Request) -> web.Response:
        data = b'\x00\x01\x02\x03\x04'
        return web.Response(
            body=data,
            content_type='application/octet-stream'
        )

    # File download
    async def file_download(self, request: web.Request) -> web.Response:
        return web.FileResponse(
            path='./example.pdf',
            headers={
                'Content-Disposition': 'attachment; filename="example.pdf"'
            }
        )

    # Streaming response
    async def streaming_response(self, request: web.Request) -> web.StreamResponse:
        response = web.StreamResponse()
        response.headers['Content-Type'] = 'text/plain'
        await response.prepare(request)

        for i in range(10):
            await response.write(f"Chunk {i}\n".encode())
            await asyncio.sleep(0.5)

        await response.write_eof()
        return response

    # Redirect
    async def redirect_response(self, request: web.Request) -> web.Response:
        raise web.HTTPFound('/new-location')

    # Custom status codes
    async def custom_status(self, request: web.Request) -> web.Response:
        return web.json_response(
            {'message': 'Created'},
            status=201
        )
```
---
## Request Validation with Pydantic
### Basic Pydantic Integration
```python
import time
from typing import Literal, Optional

from aiohttp import web
from pydantic import BaseModel, ConfigDict, EmailStr, Field, HttpUrl, field_validator

# Pydantic 2.12+ models
class UserCreate(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True)

    username: str = Field(min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(min_length=8)
    age: Optional[int] = Field(None, ge=18, le=120)
    role: Literal['user', 'admin', 'moderator'] = 'user'
    website: Optional[HttpUrl] = None

    @field_validator('username')
    @classmethod
    def validate_username(cls, v: str) -> str:
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v.lower()

    @field_validator('password')
    @classmethod
    def validate_password(cls, v: str) -> str:
        if not any(c.isupper() for c in v):
            raise ValueError('Password must contain an uppercase letter')
        if not any(c.isdigit() for c in v):
            raise ValueError('Password must contain a digit')
        return v

class UserResponse(BaseModel):
    user_id: str
    username: str
    email: EmailStr
    role: str
    created_at: float

# Handler with validation
async def create_user(request: web.Request) -> web.Response:
    try:
        data = await request.json()
        user_data = UserCreate(**data)
    except ValueError as e:
        # pydantic.ValidationError subclasses ValueError
        return web.json_response(
            {'error': 'Validation error', 'details': str(e)},
            status=400
        )

    # Process validated data
    user = UserResponse(
        user_id='123',
        username=user_data.username,
        email=user_data.email,
        role=user_data.role,
        created_at=time.time()
    )

    return web.json_response(
        user.model_dump(),
        status=201
    )
```
### Advanced Validation Patterns
```python
import time
from enum import Enum
from typing import Optional

from aiohttp import web
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

class Priority(str, Enum):
    LOW = 'low'
    MEDIUM = 'medium'
    HIGH = 'high'
    URGENT = 'urgent'

class TaskCreate(BaseModel):
    model_config = ConfigDict(
        str_strip_whitespace=True,
        extra='forbid'  # Reject extra fields
    )

    title: str = Field(min_length=1, max_length=200)
    description: Optional[str] = Field(None, max_length=5000)
    priority: Priority = Priority.MEDIUM
    tags: list[str] = Field(default_factory=list, max_length=10)
    due_date: Optional[float] = None
    assigned_to: Optional[str] = None

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v: list[str]) -> list[str]:
        if len(v) != len(set(v)):
            raise ValueError('Tags must be unique')
        return [tag.lower() for tag in v]

    @model_validator(mode='after')
    def validate_model(self) -> 'TaskCreate':
        if self.priority == Priority.URGENT and not self.assigned_to:
            raise ValueError('Urgent tasks must be assigned')
        if self.due_date and self.due_date < time.time():
            raise ValueError('Due date cannot be in the past')
        return self

# Middleware for automatic validation
@web.middleware
async def validation_middleware(request: web.Request, handler):
    # Get the validation schema attached to the handler
    schema = getattr(handler, '_validation_schema', None)

    if schema and request.method in ('POST', 'PUT', 'PATCH'):
        try:
            data = await request.json()
            validated = schema(**data)
            request['validated_data'] = validated
        except ValueError as e:
            return web.json_response(
                {'error': 'Validation error', 'details': str(e)},
                status=400
            )

    return await handler(request)

# Decorator for validation
def validate_with(schema: type[BaseModel]):
    def decorator(handler):
        handler._validation_schema = schema
        return handler
    return decorator

# Usage
@validate_with(TaskCreate)
async def create_task(request: web.Request) -> web.Response:
    task_data: TaskCreate = request['validated_data']
    # Data is already validated
    return web.json_response({
        'task_id': 'task-123',
        'title': task_data.title,
        'priority': task_data.priority.value
    }, status=201)
```
### Query Parameter Validation
```python
from typing import Literal, Optional

from aiohttp import web
from pydantic import BaseModel, Field

class PaginationParams(BaseModel):
    page: int = Field(1, ge=1, le=1000)
    per_page: int = Field(20, ge=1, le=100)
    sort_by: Optional[str] = Field(None, pattern=r'^[a-zA-Z_]+$')
    order: Literal['asc', 'desc'] = 'asc'

    @property
    def offset(self) -> int:
        return (self.page - 1) * self.per_page

    @property
    def limit(self) -> int:
        return self.per_page

async def list_items(request: web.Request) -> web.Response:
    try:
        params = PaginationParams(**request.query)
    except ValueError as e:
        return web.json_response(
            {'error': 'Invalid parameters', 'details': str(e)},
            status=400
        )

    # Use the validated params
    items = []  # Fetch from the database with params.offset and params.limit

    return web.json_response({
        'items': items,
        'page': params.page,
        'per_page': params.per_page,
        'total': 100
    })
```
---
## WebSocket Implementation
### Basic WebSocket Server
```python
import asyncio

from aiohttp import WSMsgType, web

class WebSocketHandler:
    def __init__(self):
        self.active_connections: set[web.WebSocketResponse] = set()

    async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        self.active_connections.add(ws)
        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    if msg.data == 'close':
                        await ws.close()
                    else:
                        # Echo the message back
                        await ws.send_str(f"Echo: {msg.data}")
                        # Broadcast to all connections
                        await self.broadcast(f"User says: {msg.data}")
                elif msg.type == WSMsgType.ERROR:
                    print(f'WebSocket error: {ws.exception()}')
        finally:
            self.active_connections.discard(ws)

        return ws

    async def broadcast(self, message: str):
        if self.active_connections:
            await asyncio.gather(
                *[ws.send_str(message) for ws in self.active_connections],
                return_exceptions=True
            )

# Setup
app = web.Application()
handler = WebSocketHandler()
app.router.add_get('/ws', handler.websocket_handler)
```
### Advanced WebSocket Server with Authentication
```python
import asyncio
import json
from typing import Optional

import jwt
from aiohttp import WSMsgType, web

class AuthenticatedWebSocketHandler:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.connections: dict[str, web.WebSocketResponse] = {}

    def verify_token(self, token: str) -> Optional[dict]:
        try:
            return jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            return None

    async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
        ws = web.WebSocketResponse(heartbeat=30)
        await ws.prepare(request)

        user_id: Optional[str] = None
        try:
            # Wait for the authentication message
            msg = await asyncio.wait_for(ws.receive(), timeout=10.0)

            if msg.type != WSMsgType.TEXT:
                await ws.send_json({'error': 'Authentication required'})
                await ws.close()
                return ws

            auth_data = json.loads(msg.data)
            token = auth_data.get('token')

            if not token:
                await ws.send_json({'error': 'Token required'})
                await ws.close()
                return ws

            payload = self.verify_token(token)
            if not payload:
                await ws.send_json({'error': 'Invalid token'})
                await ws.close()
                return ws

            user_id = payload['user_id']
            self.connections[user_id] = ws

            await ws.send_json({
                'type': 'auth_success',
                'user_id': user_id
            })

            # Handle messages
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    await self.handle_message(user_id, data)
                elif msg.type == WSMsgType.ERROR:
                    print(f'WebSocket error: {ws.exception()}')
        except asyncio.TimeoutError:
            await ws.send_json({'error': 'Authentication timeout'})
        finally:
            if user_id and user_id in self.connections:
                del self.connections[user_id]

        return ws

    async def handle_message(self, user_id: str, data: dict):
        message_type = data.get('type')

        if message_type == 'ping':
            await self.send_to_user(user_id, {'type': 'pong'})
        elif message_type == 'broadcast':
            await self.broadcast({
                'type': 'message',
                'from': user_id,
                'content': data.get('content')
            })
        elif message_type == 'direct':
            to_user = data.get('to')
            await self.send_to_user(to_user, {
                'type': 'direct_message',
                'from': user_id,
                'content': data.get('content')
            })

    async def send_to_user(self, user_id: str, message: dict):
        ws = self.connections.get(user_id)
        if ws and not ws.closed:
            await ws.send_json(message)

    async def broadcast(self, message: dict):
        if self.connections:
            await asyncio.gather(
                *[ws.send_json(message) for ws in self.connections.values() if not ws.closed],
                return_exceptions=True
            )
```
### WebSocket Client
```python
import asyncio

import aiohttp

async def websocket_client():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('http://localhost:8080/ws') as ws:
            # Send authentication
            await ws.send_json({
                'token': 'your-jwt-token'
            })

            # Receive the authentication response
            msg = await ws.receive()
            print(f"Auth response: {msg.data}")

            # Send messages
            await ws.send_json({
                'type': 'broadcast',
                'content': 'Hello, everyone!'
            })

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    print(f"Received: {data}")
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

asyncio.run(websocket_client())
```
---
## Testing with pytest and pytest-aiohttp
### Basic Test Setup
```python
# conftest.py
from typing import AsyncIterator

import pytest
from aiohttp import web

pytest_plugins = 'aiohttp.pytest_plugin'

@pytest.fixture
async def app() -> AsyncIterator[web.Application]:
    app = web.Application()

    async def hello(request):
        return web.Response(text='Hello, World!')

    app.router.add_get('/', hello)
    yield app
    # Cleanup
    await app.cleanup()

@pytest.fixture
async def client(aiohttp_client, app):
    return await aiohttp_client(app)
```
### Basic Tests
```python
# test_basic.py
import pytest

@pytest.mark.asyncio
async def test_hello(client):
    resp = await client.get('/')
    assert resp.status == 200
    text = await resp.text()
    assert 'Hello, World!' in text

@pytest.mark.asyncio
async def test_json_endpoint(client):
    resp = await client.post('/api/data', json={'key': 'value'})
    assert resp.status == 200
    data = await resp.json()
    assert data['key'] == 'value'
```
### Testing with Fixtures
```python
# conftest.py
from typing import AsyncIterator

import aiohttp
import pytest

@pytest.fixture
async def http_session() -> AsyncIterator[aiohttp.ClientSession]:
    session = aiohttp.ClientSession()
    yield session
    await session.close()

@pytest.fixture
def sample_user():
    return {
        'username': 'testuser',
        'email': 'test@example.com',
        'password': 'TestPass123'
    }

@pytest.fixture
async def authenticated_client(client, sample_user):
    # Login
    resp = await client.post('/login', json=sample_user)
    assert resp.status == 200

    # Extract the token
    data = await resp.json()
    token = data['token']

    # Set the authorization header
    client.session.headers['Authorization'] = f'Bearer {token}'
    yield client

# tests/test_auth.py
@pytest.mark.asyncio
async def test_protected_endpoint(authenticated_client):
    resp = await authenticated_client.get('/api/protected')
    assert resp.status == 200
    data = await resp.json()
    assert 'user_id' in data
```
### Parameterized Tests
```python
import pytest

@pytest.mark.parametrize('username,email,expected_status', [
    ('valid', 'valid@example.com', 201),
    ('ab', 'valid@example.com', 400),       # Too short
    ('valid', 'invalid-email', 400),        # Invalid email
    ('', 'valid@example.com', 400),         # Empty username
])
@pytest.mark.asyncio
async def test_user_creation_validation(client, username, email, expected_status):
    resp = await client.post('/users', json={
        'username': username,
        'email': email,
        'password': 'ValidPass123'
    })
    assert resp.status == expected_status

@pytest.mark.parametrize('method,path,expected', [
    ('GET', '/', 200),
    ('GET', '/api/users', 200),
    ('POST', '/api/users', 401),   # Requires auth
    ('GET', '/nonexistent', 404),
])
@pytest.mark.asyncio
async def test_endpoints(client, method, path, expected):
    if method == 'GET':
        resp = await client.get(path)
    elif method == 'POST':
        resp = await client.post(path, json={})
    assert resp.status == expected
```
### Mocking External APIs
```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_external_api_call(client):
    # Mock the external API
    with patch('aiohttp.ClientSession.get') as mock_get:
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_response.json = AsyncMock(return_value={'data': 'mocked'})
        mock_get.return_value.__aenter__.return_value = mock_response

        resp = await client.get('/api/external')
        assert resp.status == 200
        data = await resp.json()
        assert data['data'] == 'mocked'

@pytest.fixture
async def mock_database():
    class MockDB:
        def __init__(self):
            self.data = {}

        async def get(self, key):
            return self.data.get(key)

        async def set(self, key, value):
            self.data[key] = value

        async def delete(self, key):
            if key in self.data:
                del self.data[key]

    return MockDB()

@pytest.mark.asyncio
async def test_with_mock_db(client, mock_database):
    # Inject the mock database into the app
    client.app['db'] = mock_database

    # Test database operations
    await mock_database.set('user:1', {'name': 'Test'})
    resp = await client.get('/users/1')
    assert resp.status == 200
```
### Testing WebSockets
```python
import pytest
from aiohttp import WSMsgType, web

@pytest.mark.asyncio
async def test_websocket_echo(aiohttp_client):
    app = web.Application()

    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                await ws.send_str(f"Echo: {msg.data}")
        return ws

    app.router.add_get('/ws', websocket_handler)
    client = await aiohttp_client(app)

    async with client.ws_connect('/ws') as ws:
        await ws.send_str('Hello')
        msg = await ws.receive()
        assert msg.data == 'Echo: Hello'

@pytest.mark.asyncio
async def test_websocket_broadcast(aiohttp_client):
    connections = set()
    app = web.Application()

    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        connections.add(ws)
        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    # Broadcast to all other connections
                    for conn in connections:
                        if conn != ws:
                            await conn.send_str(msg.data)
        finally:
            connections.discard(ws)
        return ws

    app.router.add_get('/ws', websocket_handler)
    client = await aiohttp_client(app)

    # Create two connections
    async with client.ws_connect('/ws') as ws1:
        async with client.ws_connect('/ws') as ws2:
            await ws1.send_str('Hello from ws1')
            msg = await ws2.receive()
            assert msg.data == 'Hello from ws1'
```
### Testing Middleware
```python
import pytest
from aiohttp import web

@pytest.mark.asyncio
async def test_auth_middleware(aiohttp_client):
    @web.middleware
    async def auth_middleware(request, handler):
        token = request.headers.get('Authorization')
        if not token or not token.startswith('Bearer '):
            raise web.HTTPUnauthorized()
        request['user_id'] = 'user-123'
        return await handler(request)

    app = web.Application(middlewares=[auth_middleware])

    async def protected(request):
        return web.json_response({'user_id': request['user_id']})

    app.router.add_get('/protected', protected)
    client = await aiohttp_client(app)

    # Without a token
    resp = await client.get('/protected')
    assert resp.status == 401

    # With a token
    resp = await client.get('/protected', headers={
        'Authorization': 'Bearer valid-token'
    })
    assert resp.status == 200
    data = await resp.json()
    assert data['user_id'] == 'user-123'
```
### Coverage and Best Practices
```toml
# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
    "--verbose",
    "--strict-markers",
    "--cov=app",
    "--cov-report=html",
    "--cov-report=term-missing",
]
```

Best practices:

1. Keep tests independent
2. Use fixtures for common setup
3. Mock external dependencies
4. Test edge cases
5. Use parametrize for similar tests
6. Clean up resources properly
---
## Advanced Middleware and Error Handling
### Error Handling Middleware
```python
import logging
from typing import Awaitable, Callable

from aiohttp import web

logger = logging.getLogger(__name__)

@web.middleware
async def error_middleware(
    request: web.Request,
    handler: Callable[[web.Request], Awaitable[web.Response]]
) -> web.Response:
    try:
        return await handler(request)
    except web.HTTPException:
        # HTTP exceptions should pass through
        raise
    except ValueError as e:
        logger.warning(f"Validation error: {e}")
        return web.json_response(
            {
                'error': 'Validation Error',
                'message': str(e)
            },
            status=400
        )
    except PermissionError as e:
        logger.warning(f"Permission denied: {e}")
        return web.json_response(
            {
                'error': 'Forbidden',
                'message': 'You do not have permission to access this resource'
            },
            status=403
        )
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return web.json_response(
            {
                'error': 'Internal Server Error',
                'message': 'An unexpected error occurred'
            },
            status=500
        )
```
### Logging Middleware
```python
import logging
import time

from aiohttp import web

logger = logging.getLogger(__name__)

@web.middleware
async def logging_middleware(request: web.Request, handler):
    start_time = time.time()

    # Log the request
    logger.info(
        "Request started",
        extra={
            'method': request.method,
            'path': request.path,
            'query': dict(request.query),
            'remote': request.remote
        }
    )

    try:
        response = await handler(request)

        # Log the response
        duration = time.time() - start_time
        logger.info(
            "Request completed",
            extra={
                'method': request.method,
                'path': request.path,
                'status': response.status,
                'duration_ms': duration * 1000
            }
        )
        return response
    except Exception as e:
        duration = time.time() - start_time
        logger.error(
            "Request failed",
            extra={
                'method': request.method,
                'path': request.path,
                'duration_ms': duration * 1000,
                'error': str(e)
            },
            exc_info=True
        )
        raise
```
### CORS Middleware
```python
from aiohttp import web

@web.middleware
async def cors_middleware(request: web.Request, handler):
    # Handle preflight requests
    if request.method == 'OPTIONS':
        response = web.Response()
    else:
        response = await handler(request)

    # Add CORS headers
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
    response.headers['Access-Control-Max-Age'] = '3600'
    return response

# Or use the aiohttp-cors library
import aiohttp_cors

app = web.Application()

# Configure CORS
cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
        allow_methods="*"
    )
})

# Add routes (handler is any request handler coroutine defined elsewhere)
resource = app.router.add_resource("/api/endpoint")
route = resource.add_route("GET", handler)
cors.add(route)
```
### Rate Limiting Middleware
```python
import time
from collections import defaultdict
from typing import Optional

from aiohttp import web

class RateLimiter:
    def __init__(self, max_requests: int = 100, window: int = 60):
        self.max_requests = max_requests
        self.window = window
        self.requests: dict[str, list[float]] = defaultdict(list)

    def is_allowed(self, client_id: str) -> tuple[bool, Optional[float]]:
        now = time.time()
        cutoff = now - self.window

        # Drop requests that fell outside the window
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > cutoff
        ]

        if len(self.requests[client_id]) >= self.max_requests:
            oldest = self.requests[client_id][0]
            retry_after = oldest + self.window - now
            return False, retry_after

        self.requests[client_id].append(now)
        return True, None

rate_limiter = RateLimiter(max_requests=100, window=60)

@web.middleware
async def rate_limit_middleware(request: web.Request, handler):
    # Use the client IP address as the identifier
    client_id = request.remote

    allowed, retry_after = rate_limiter.is_allowed(client_id)
    if not allowed:
        return web.json_response(
            {
                'error': 'Rate limit exceeded',
                'retry_after': int(retry_after)
            },
            status=429,
            headers={'Retry-After': str(int(retry_after))}
        )

    return await handler(request)
```
---
## Performance Optimization
### Connection Pooling
```python
import asyncio
from typing import Optional

import aiohttp
from aiohttp import ClientTimeout, TCPConnector

class OptimizedClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

        # Optimized connector
        self.connector = TCPConnector(
            limit=100,                  # Total connections
            limit_per_host=30,          # Per host
            ttl_dns_cache=300,          # DNS cache TTL
            force_close=False,          # Keep-alive
            enable_cleanup_closed=True,
            use_dns_cache=True
        )

        # Optimized timeout
        self.timeout = ClientTimeout(
            total=30,
            connect=10,
            sock_read=20
        )

        self._session: Optional[aiohttp.ClientSession] = None

    async def start(self):
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            connector=self.connector,
            timeout=self.timeout,
            connector_owner=True,
            auto_decompress=True,
            trust_env=True,
            read_bufsize=2**16  # 64 KiB buffer
        )

    async def close(self):
        if self._session:
            await self._session.close()
            await asyncio.sleep(0.25)
```
### Concurrent Requests
```python
import asyncio
from typing import Any

import aiohttp

async def fetch_one(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.json()

async def fetch_many(urls: list[str]) -> list[Any]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# With a semaphore to limit concurrency (one shared session, per the advice above)
async def fetch_with_limit(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        async def fetch_limited(url: str):
            async with semaphore:
                return await fetch_one(session, url)

        return await asyncio.gather(*[fetch_limited(url) for url in urls])
```
### Streaming Large Responses
```python
import aiohttp
from aiohttp import web

async def download_large_file(url: str, filepath: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Note: plain open()/write() briefly blocks the event loop;
            # consider aiofiles for fully non-blocking file I/O.
            with open(filepath, 'wb') as f:
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)

# Server-side streaming
async def stream_large_response(request: web.Request) -> web.StreamResponse:
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/octet-stream'
    await response.prepare(request)

    # Stream data in chunks
    with open('large_file.dat', 'rb') as f:
        while chunk := f.read(8192):
            await response.write(chunk)

    await response.write_eof()
    return response
```
### Caching
```python
import time

import aiohttp

class CachedClient:
    def __init__(self):
        self.cache: dict[str, tuple[dict, float]] = {}
        self.cache_ttl = 300  # 5 minutes

    async def get_with_cache(self, url: str):
        now = time.time()

        # Check the cache first
        if url in self.cache:
            data, timestamp = self.cache[url]
            if now - timestamp < self.cache_ttl:
                return data

        # Fetch and cache
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.json()
                self.cache[url] = (data, now)
                return data
```
---
## Git Protocol Integration
### Understanding Git Smart HTTP
The Git Smart HTTP protocol lets git clients clone, fetch, and push over HTTP/HTTPS. A request cycle involves three stages (a minimal client-side sketch of the discovery step follows the list):
1. **Service Discovery**: Client requests `/info/refs?service=git-upload-pack` or `git-receive-pack`
2. **Negotiation**: Client and server negotiate which objects to transfer
3. **Pack Transfer**: Server sends/receives packfiles
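
Step 1 can be exercised directly with an aiohttp client. The sketch below performs service discovery against a hypothetical repository URL; it only checks the advertised content type and returns the raw pkt-line body, leaving the actual pkt-line parsing out of scope.

```python
import asyncio

import aiohttp

async def discover_refs(repo_url: str) -> bytes:
    # repo_url is a placeholder, e.g. "https://example.com/myrepo.git"
    params = {"service": "git-upload-pack"}
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{repo_url}/info/refs", params=params) as response:
            response.raise_for_status()
            # A smart server advertises this content type for upload-pack discovery
            assert response.headers["Content-Type"] == (
                "application/x-git-upload-pack-advertisement"
            )
            return await response.read()  # pkt-line formatted ref advertisement

if __name__ == "__main__":
    refs = asyncio.run(discover_refs("https://example.com/myrepo.git"))
    print(refs[:120])
```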
### Basic Git HTTP Backend
```python
import asyncio
import os
from pathlib import Path

from aiohttp import web

class GitHTTPBackend:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root
        self.git_backend = '/usr/lib/git-core/git-http-backend'

    async def handle_info_refs(self, request: web.Request) -> web.Response:
        repo_path = request.match_info['repo']
        service = request.query.get('service', '')

        if service not in ('git-upload-pack', 'git-receive-pack'):
            return web.Response(status=400, text='Invalid service')

        full_path = self.repo_root / repo_path
        if not full_path.exists():
            return web.Response(status=404, text='Repository not found')

        # Build the CGI environment
        env = os.environ.copy()
        env['GIT_PROJECT_ROOT'] = str(self.repo_root)
        env['GIT_HTTP_EXPORT_ALL'] = '1'
        env['PATH_INFO'] = f'/{repo_path}/info/refs'
        env['QUERY_STRING'] = f'service={service}'
        env['REQUEST_METHOD'] = 'GET'

        # Execute git-http-backend
        proc = await asyncio.create_subprocess_exec(
            self.git_backend,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )
        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            return web.Response(status=500, text=stderr.decode())

        # Parse the CGI output
        headers_end = stdout.find(b'\r\n\r\n')
        if headers_end == -1:
            return web.Response(status=500)

        header_lines = stdout[:headers_end].decode().split('\r\n')
        body = stdout[headers_end + 4:]

        # Parse headers
        headers = {}
        for line in header_lines:
            if ':' in line:
                key, value = line.split(':', 1)
                headers[key.strip()] = value.strip()

        return web.Response(
            body=body,
            headers=headers,
            status=200
        )

    async def handle_service(self, request: web.Request) -> web.Response:
        repo_path = request.match_info['repo']
        service = request.match_info['service']

        if service not in ('git-upload-pack', 'git-receive-pack'):
            return web.Response(status=400)

        full_path = self.repo_root / repo_path
        if not full_path.exists():
            return web.Response(status=404)

        # Read the request body
        body = await request.read()

        # Build the CGI environment
        env = os.environ.copy()
        env['GIT_PROJECT_ROOT'] = str(self.repo_root)
        env['GIT_HTTP_EXPORT_ALL'] = '1'
        env['PATH_INFO'] = f'/{repo_path}/{service}'
        env['REQUEST_METHOD'] = 'POST'
        env['CONTENT_TYPE'] = request.content_type
        env['CONTENT_LENGTH'] = str(len(body))

        # Execute the git service
        proc = await asyncio.create_subprocess_exec(
            self.git_backend,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )
        stdout, stderr = await proc.communicate(input=body)

        if proc.returncode != 0:
            return web.Response(status=500, text=stderr.decode())

        # Parse the CGI output
        headers_end = stdout.find(b'\r\n\r\n')
        header_lines = stdout[:headers_end].decode().split('\r\n')
        body = stdout[headers_end + 4:]

        headers = {}
        for line in header_lines:
            if ':' in line:
                key, value = line.split(':', 1)
                headers[key.strip()] = value.strip()

        return web.Response(
            body=body,
            headers=headers,
            status=200
        )

# Setup routes
git_backend = GitHTTPBackend(Path('/var/git/repos'))
app = web.Application()
app.router.add_get('/{repo:.+}/info/refs', git_backend.handle_info_refs)
app.router.add_post('/{repo:.+}/{service:git-upload-pack|git-receive-pack}', git_backend.handle_service)
```
### Git Backend with Authentication
```python
import base64
from pathlib import Path

from aiohttp import web

class AuthenticatedGitBackend:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root
        # Demo credentials only; use a real credential store in production
        self.users = {
            'alice': 'password123',
            'bob': 'secret456'
        }

    def verify_auth(self, request: web.Request) -> tuple[bool, str | None]:
        auth_header = request.headers.get('Authorization', '')

        if not auth_header.startswith('Basic '):
            return False, None

        try:
            encoded = auth_header[6:]
            decoded = base64.b64decode(encoded).decode()
            username, password = decoded.split(':', 1)

            if self.users.get(username) == password:
                return True, username
        except (ValueError, UnicodeDecodeError):
            pass

        return False, None

    @web.middleware
    async def auth_middleware(self, request: web.Request, handler):
        # Allow anonymous reads
        service = request.query.get('service', '')
        if request.method == 'GET' and service == 'git-upload-pack':
            return await handler(request)

        # Require authentication for pushes
        if service == 'git-receive-pack' or 'git-receive-pack' in request.path:
            authorized, username = self.verify_auth(request)
            if not authorized:
                return web.Response(
                    status=401,
                    headers={'WWW-Authenticate': 'Basic realm="Git Access"'},
                    text='Authentication required'
                )
            request['username'] = username

        return await handler(request)

    async def handle_info_refs(self, request: web.Request):
        # Git info/refs implementation
        # Same as the previous example, but behind the auth middleware
        pass

    async def handle_service(self, request: web.Request):
        # Git service implementation
        # Same as the previous example, but behind the auth middleware
        pass
```
---
## Repository Manager Implementation
### Repository Browser
```python
import asyncio
import shutil
from pathlib import Path

from aiohttp import web

class RepositoryManager:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root

    async def list_repositories(self, request: web.Request) -> web.Response:
        repos = []
        for item in self.repo_root.iterdir():
            # Detect bare repositories (HEAD file) and working copies (.git directory)
            if item.is_dir() and ((item / 'HEAD').exists() or (item / '.git').exists()):
                repos.append({
                    'name': item.name,
                    'path': str(item.relative_to(self.repo_root)),
                    'type': 'git'
                })
        return web.json_response({'repositories': repos})

    async def get_repository(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        repo_path = self.repo_root / repo_name

        if not repo_path.exists():
            return web.json_response(
                {'error': 'Repository not found'},
                status=404
            )

        # Get repository info
        info = await self._get_repo_info(repo_path)
        return web.json_response(info)

    async def create_repository(self, request: web.Request) -> web.Response:
        data = await request.json()
        repo_name = data.get('name')

        if not repo_name:
            return web.json_response(
                {'error': 'Repository name required'},
                status=400
            )

        repo_path = self.repo_root / repo_name
        if repo_path.exists():
            return web.json_response(
                {'error': 'Repository already exists'},
                status=400
            )

        # Create a bare repository
        repo_path.mkdir(parents=True)
        proc = await asyncio.create_subprocess_exec(
            'git', 'init', '--bare', str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        await proc.communicate()

        if proc.returncode != 0:
            return web.json_response(
                {'error': 'Failed to create repository'},
                status=500
            )

        return web.json_response({
            'name': repo_name,
            'path': str(repo_path.relative_to(self.repo_root))
        }, status=201)

    async def delete_repository(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        repo_path = self.repo_root / repo_name

        if not repo_path.exists():
            return web.json_response(
                {'error': 'Repository not found'},
                status=404
            )

        # Delete the repository directory
        shutil.rmtree(repo_path)
        return web.Response(status=204)

    async def browse_tree(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        ref = request.query.get('ref', 'HEAD')
        path = request.query.get('path', '')

        repo_path = self.repo_root / repo_name
        if not repo_path.exists():
            return web.json_response(
                {'error': 'Repository not found'},
                status=404
            )

        # List files in the tree (only pass a pathspec when one was given)
        args = ['git', 'ls-tree', ref]
        if path:
            args.append(path)

        proc = await asyncio.create_subprocess_exec(
            *args,
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            return web.json_response(
                {'error': stderr.decode()},
                status=400
            )

        # Parse the ls-tree output
        entries = []
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue
            mode, type_, hash_, name = line.split(None, 3)
            entries.append({
                'mode': mode,
                'type': type_,
                'hash': hash_,
                'name': name
            })

        return web.json_response({'entries': entries})

    async def get_file_content(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        ref = request.query.get('ref', 'HEAD')
        path = request.query['path']

        repo_path = self.repo_root / repo_name

        # Get the file content
        proc = await asyncio.create_subprocess_exec(
            'git', 'show', f'{ref}:{path}',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            return web.json_response(
                {'error': 'File not found'},
                status=404
            )

        return web.Response(
            body=stdout,
            content_type='text/plain'
        )

    async def get_commits(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        ref = request.query.get('ref', 'HEAD')
        limit = int(request.query.get('limit', '50'))

        repo_path = self.repo_root / repo_name

        # Get the commit log
        proc = await asyncio.create_subprocess_exec(
            'git', 'log', ref,
            '--pretty=format:%H|%an|%ae|%at|%s',
            f'-{limit}',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()

        commits = []
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue
            hash_, author, email, timestamp, message = line.split('|', 4)
            commits.append({
                'hash': hash_,
                'author': author,
                'email': email,
                'timestamp': int(timestamp),
                'message': message
            })

        return web.json_response({'commits': commits})

    async def _get_repo_info(self, repo_path: Path) -> dict:
        # Get HEAD
        proc = await asyncio.create_subprocess_exec(
            'git', 'rev-parse', 'HEAD',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, _ = await proc.communicate()
        head = stdout.decode().strip() if proc.returncode == 0 else None

        # Get branches
        proc = await asyncio.create_subprocess_exec(
            'git', 'branch', '-a',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE
        )
        stdout, _ = await proc.communicate()
        branches = [
            line.strip().lstrip('* ')
            for line in stdout.decode().strip().split('\n')
        ]

        return {
            'name': repo_path.name,
            'head': head,
            'branches': branches
        }

# Setup routes
app = web.Application()
repo_manager = RepositoryManager(Path('/var/git/repos'))
app.router.add_get('/api/repositories', repo_manager.list_repositories)
app.router.add_get('/api/repositories/{repo}', repo_manager.get_repository)
app.router.add_post('/api/repositories', repo_manager.create_repository)
app.router.add_delete('/api/repositories/{repo}', repo_manager.delete_repository)
app.router.add_get('/api/repositories/{repo}/tree', repo_manager.browse_tree)
app.router.add_get('/api/repositories/{repo}/file', repo_manager.get_file_content)
app.router.add_get('/api/repositories/{repo}/commits', repo_manager.get_commits)
```
---
## Best Practices and Patterns
### Application Structure
```
project/
├── app/
│ ├── __init__.py
│ ├── main.py # Application entry point
│ ├── routes.py # Route definitions
│ ├── handlers.py # Request handlers
│ ├── middleware.py # Custom middleware
│ ├── models.py # Pydantic models
│ ├── database.py # Database connections
│ └── utils.py # Utility functions
├── tests/
│ ├── __init__.py
│ ├── conftest.py # Test fixtures
│ ├── test_handlers.py
│ └── test_integration.py
├── requirements.txt
├── pyproject.toml
└── README.md
```
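
A minimal sketch of what `app/main.py` could look like for this layout. The module names (`app.routes`, `app.middleware`) follow the tree above, but the `setup_routes` function and `middlewares` list are assumptions made for illustration, not symbols defined elsewhere in this guide:

```python
# app/main.py (sketch; assumes app/routes.py exposes setup_routes()
# and app/middleware.py exposes a `middlewares` list)
from aiohttp import web

from app.middleware import middlewares
from app.routes import setup_routes

def create_app() -> web.Application:
    app = web.Application(middlewares=middlewares)
    setup_routes(app)
    return app

if __name__ == '__main__':
    web.run_app(create_app(), host='127.0.0.1', port=8080)
```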
### Configuration Management
```python
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        case_sensitive=False
    )

    # Server
    host: str = '127.0.0.1'
    port: int = 8080
    debug: bool = False

    # Database
    database_url: str
    database_pool_size: int = 10

    # Security
    secret_key: str
    jwt_algorithm: str = 'HS256'
    jwt_expiration: int = 3600

    # External APIs
    external_api_url: Optional[str] = None
    external_api_key: Optional[str] = None

settings = Settings()
```
### Graceful Shutdown
```python
from aiohttp import web

class Application:
    def __init__(self):
        self.app = web.Application()
        self.cleanup_tasks = []

    async def startup(self, app: web.Application):
        # Initialize resources (database pools, HTTP sessions, background tasks)
        pass

    async def cleanup(self):
        # Cleanup resources
        for task in self.cleanup_tasks:
            await task

    async def shutdown(self, app: web.Application):
        # Close database connections
        # Close HTTP sessions
        # Wait for background tasks
        await self.cleanup()

    def run(self):
        self.app.on_startup.append(self.startup)
        self.app.on_cleanup.append(self.shutdown)

        web.run_app(
            self.app,
            host='127.0.0.1',
            port=8080,
            shutdown_timeout=60.0
        )
```
### Summary
This guide covers:
- Python 3.13 modern features and type hints
- aiohttp 3.13+ client and server development
- Complete authentication patterns (Basic, Bearer, API Key, OAuth2)
- Pydantic 2.12+ validation
- WebSocket implementation
- Comprehensive testing with pytest-aiohttp
- Git protocol integration
- Repository management system
- Performance optimization
- Production-ready patterns
**Key Takeaways:**
1. Always reuse ClientSession across requests
2. Use type hints and Pydantic for validation
3. Implement proper error handling and middleware
4. Write comprehensive tests with pytest-aiohttp
5. Follow async/await patterns consistently
6. Optimize connection pooling and timeouts
7. Handle cleanup and graceful shutdown properly
---
*Guide Version: 1.0*
*Last Updated: November 2025*
*Compatible with: Python 3.13.3, aiohttp 3.13.2, pytest-aiohttp 1.1.0, pydantic 2.12.3*