# Retoor's Guide to Modern Python: Mastering aiohttp 3.13+ with Python 3.13

**Complete Tutorial: aiohttp, Testing, Authentication, WebSockets, and Git Protocol Integration**

Version Requirements:
- Python: **3.13.3** (Released April 8, 2025; Python 3.13.0 shipped October 7, 2024)
- aiohttp: **3.13.2** (Latest stable as of October 28, 2025)
- pytest: **8.3+**
- pytest-aiohttp: **1.1.0** (Released January 23, 2025)
- pytest-asyncio: **1.2.0** (Released September 12, 2025)
- pydantic: **2.12.3** (Released October 17, 2025)

---

## Table of Contents

1. [Python 3.13 Modern Features](#python-313-modern-features)
2. [aiohttp Fundamentals](#aiohttp-fundamentals)
3. [Client Sessions and Connection Management](#client-sessions-and-connection-management)
4. [Authentication Patterns](#authentication-patterns)
5. [Server Development](#server-development)
6. [Request Validation with Pydantic](#request-validation-with-pydantic)
7. [WebSocket Implementation](#websocket-implementation)
8. [Testing with pytest and pytest-aiohttp](#testing-with-pytest-and-pytest-aiohttp)
9. [Advanced Middleware and Error Handling](#advanced-middleware-and-error-handling)
10. [Performance Optimization](#performance-optimization)
11. [Git Protocol Integration](#git-protocol-integration)
12. [Repository Manager Implementation](#repository-manager-implementation)
13. [Best Practices and Patterns](#best-practices-and-patterns)
14. [Automatic Memory and Context Search](#automatic-memory-and-context-search)

---

## Python 3.13 Modern Features

### Key Python 3.13 Enhancements

Python 3.13 introduces several changes relevant to asynchronous applications:

**Experimental Free-Threaded Mode (No GIL)**
- Requires a free-threaded build (for example `python3.13t`); on such a build the GIL can be forced off with `python -X gil=0` or `PYTHON_GIL=0`
- Relevant for CPU-bound work that async code hands off to threads
- Better multi-threaded scaling on multi-core systems, at some cost to single-threaded speed

**JIT Compiler (Preview)**
- Experimental Just-In-Time compilation
- Enable with the `PYTHON_JIT=1` environment variable on a build configured with `--enable-experimental-jit`
- Gains in 3.13 are modest and workload-dependent, so benchmark before relying on it

**Enhanced Interactive Interpreter**
- Multi-line editing with history preservation
- Colorized prompts and tracebacks for better debugging
- Improved REPL experience

**Better Error Messages**
- More precise error locations
- Clearer exception messages
- Context-aware suggestions

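Whether the free-threaded mode is active depends on how the interpreter was built, so it can help to check at runtime. The following is a minimal sketch using only the standard library; `describe_runtime` is a hypothetical helper name, and `sys._is_gil_enabled()` is a private CPython 3.13 function that may change in later releases.

```python
import sys
import sysconfig


def describe_runtime() -> dict[str, object]:
    """Report whether this is a free-threaded build and whether the GIL is on."""
    # Py_GIL_DISABLED is set only on free-threaded builds (e.g. python3.13t).
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))

    # sys._is_gil_enabled() exists on CPython 3.13+; older versions always use the GIL.
    check = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = check() if check is not None else True

    return {
        "version": f"{sys.version_info.major}.{sys.version_info.minor}",
        "free_threaded_build": free_threaded_build,
        "gil_enabled": gil_enabled,
    }


if __name__ == "__main__":
    print(describe_runtime())
```

On a standard build this reports `free_threaded_build` as false and `gil_enabled` as true, whatever flags are passed on the command line.
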
### Modern Type Hints (PEP 695)

Python 3.13 fully supports the PEP 695 generic syntax introduced in Python 3.12:

```python
from collections.abc import Awaitable, Callable
from typing import Generic, TypeVar

import aiohttp
from aiohttp.web import Request

# Old style (still works)
T = TypeVar('T')
class OldGeneric(Generic[T]):
    def process(self, item: T) -> T:
        return item

# New PEP 695 style (Python 3.12+)
class NewGeneric[T]:
    def process(self, item: T) -> T:
        return item

# Type aliases with 'type' keyword
type RequestHandler[T] = Callable[[Request], Awaitable[T]]
type JSONDict = dict[str, str | int | float | bool | None]

# Modern function annotations
async def fetch_data[T](url: str, parser: Callable[[bytes], T]) -> T | None:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                return parser(await response.read())
            return None
```

### Dataclasses in Python 3.13

```python
import copy
import time
from dataclasses import dataclass, field

# slots=True is not used here: last_login is assigned in __post_init__
# without being a declared field, so a slotted class would reject it.
@dataclass(kw_only=True)
class User:
    user_id: str
    username: str
    email: str
    created_at: float = field(default_factory=time.time)
    is_active: bool = True
    _password_hash: str = field(repr=False, compare=False)

    # New in 3.13: attributes assigned via self.X are listed in __static_attributes__
    def __post_init__(self):
        self.last_login: float | None = None

    @property
    def is_new(self) -> bool:
        return time.time() - self.created_at < 86400

# New in 3.13: copy.replace() works with dataclasses
user = User(user_id="123", username="alice", email="alice@example.com", _password_hash="hashed")
updated = copy.replace(user, username="alice_updated")

# Access static attributes (new in 3.13)
print(User.__static_attributes__)  # ('last_login',)
```

### Modern Async Patterns

```python
import asyncio
from collections.abc import AsyncIterator, Callable

import aiohttp

# Async generators
async def fetch_paginated[T](
    url: str,
    parser: Callable[[dict], T]
) -> AsyncIterator[T]:
    page = 1
    async with aiohttp.ClientSession() as session:
        while True:
            # params= handles URL encoding and existing query strings
            async with session.get(url, params={'page': page}) as response:
                data = await response.json()
                if not data['items']:
                    break
                for item in data['items']:
                    yield parser(item)
            page += 1

# Context manager pattern
class AsyncResourceManager:
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
```

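An async generator such as the paginated fetcher is consumed with `async for`. The sketch below shows that consumption pattern with the HTTP call replaced by an in-memory stand-in, so it runs without a network. `FAKE_PAGES`, `fetch_page`, `iter_items`, and `collect_ids` are hypothetical names for illustration and are not part of aiohttp.

```python
import asyncio
from collections.abc import AsyncIterator

# Hypothetical in-memory "API": page number -> response payload.
FAKE_PAGES = {
    1: {"items": [{"id": 1}, {"id": 2}]},
    2: {"items": [{"id": 3}]},
}


async def fetch_page(page: int) -> dict:
    """Stand-in for an HTTP GET; unknown pages return an empty item list."""
    await asyncio.sleep(0)  # Yield to the event loop like real I/O would.
    return FAKE_PAGES.get(page, {"items": []})


async def iter_items() -> AsyncIterator[dict]:
    """Yield items page by page until an empty page is returned."""
    page = 1
    while True:
        data = await fetch_page(page)
        if not data["items"]:
            break
        for item in data["items"]:
            yield item
        page += 1


async def collect_ids() -> list[int]:
    # An async comprehension drives the generator to exhaustion.
    return [item["id"] async for item in iter_items()]


if __name__ == "__main__":
    print(asyncio.run(collect_ids()))  # [1, 2, 3]
```
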
---

## aiohttp Fundamentals

### Installation and Setup

```bash
# Core installation
pip install aiohttp==3.13.2

# With speedups (highly recommended); quoted so the shell leaves [] alone
pip install "aiohttp[speedups]==3.13.2"

# Additional recommended packages (quoted so '>' is not treated as a redirect)
pip install "aiodns>=3.0.0"      # Fast DNS resolution
pip install "Brotli>=1.0.9"      # Brotli compression support
pip install "pydantic>=2.12.3"   # Request validation
```

### Basic Client Usage

```python
import aiohttp
import asyncio

async def fetch_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            print(f"Status: {response.status}")
            print(f"Content-Type: {response.headers['content-type']}")

            # Various response methods
            text = await response.text()  # Text content
            data = await response.json()  # JSON parsing
            raw = await response.read()   # Raw bytes

asyncio.run(fetch_example())
```

### Basic Server Usage

```python
from aiohttp import web

async def hello_handler(request: web.Request) -> web.Response:
    name = request.match_info.get('name', 'Anonymous')
    return web.Response(text=f"Hello, {name}!")

async def json_handler(request: web.Request) -> web.Response:
    data = await request.json()
    return web.json_response({
        'status': 'success',
        'received': data
    })

app = web.Application()
app.router.add_get('/', hello_handler)
app.router.add_get('/{name}', hello_handler)
app.router.add_post('/api/data', json_handler)

if __name__ == '__main__':
    web.run_app(app, host='127.0.0.1', port=8080)
```

---

## Client Sessions and Connection Management

### Session Management Best Practices

**Never create a new session for each request.** This is the most common mistake, because every new session throws away the connection pool:

```python
import asyncio

import aiohttp

# ❌ WRONG - Creates new session for every request
async def bad_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.text()

    # Session destroyed here

# ✅ CORRECT - Reuse session across requests
class APIClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(
                limit=100,          # Total connection limit
                limit_per_host=30,  # Per-host limit
                ttl_dns_cache=300,  # DNS cache TTL
            )
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def get(self, path: str) -> dict:
        async with self._session.get(path) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def main():
    async with APIClient('https://api.example.com') as client:
        user = await client.get('/users/123')
        posts = await client.get('/posts')
        comments = await client.get('/comments')

asyncio.run(main())
```

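A shared session pays off most when requests run concurrently. The sketch below bounds that concurrency with `asyncio.Semaphore` and `asyncio.gather`. The `fetch` coroutine is a hypothetical stand-in for a call such as `client.get(path)`, so the example runs without a network; `fetch_all` and `max_concurrency` are likewise illustrative names.

```python
import asyncio


async def fetch(path: str) -> dict:
    """Hypothetical stand-in for `await client.get(path)` on a shared session."""
    await asyncio.sleep(0.01)
    return {"path": path}


async def fetch_all(paths: list[str], max_concurrency: int = 5) -> list[dict]:
    """Fetch every path concurrently, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(path: str) -> dict:
        async with semaphore:
            return await fetch(path)

    # gather() returns results in the same order as its inputs.
    return await asyncio.gather(*(bounded(p) for p in paths))


if __name__ == "__main__":
    results = asyncio.run(fetch_all([f"/users/{i}" for i in range(10)]))
    print(len(results), results[0])
```

The connector's `limit` and `limit_per_host` already cap open connections; a semaphore additionally caps how many coroutines are waiting on the pool at once.
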
### Advanced Session Configuration

```python
import asyncio
from typing import Optional

import aiohttp
from aiohttp import ClientTimeout, TCPConnector, ClientSession

class AdvancedHTTPClient:
    def __init__(
        self,
        base_url: str,
        timeout: int = 30,
        max_connections: int = 100,
        max_connections_per_host: int = 30,
        headers: Optional[dict[str, str]] = None
    ):
        self.base_url = base_url
        self.timeout = ClientTimeout(
            total=timeout,
            connect=10,   # Connection timeout
            sock_read=20  # Socket read timeout
        )
        self.max_connections = max_connections
        self.max_connections_per_host = max_connections_per_host
        self.default_headers = headers or {}
        self._session: Optional[ClientSession] = None

    async def start(self):
        if self._session is None:
            # Build the connector here, inside the running event loop,
            # instead of in __init__ where no loop may exist yet.
            # enable_cleanup_closed is left out: it works around a CPython
            # bug that is fixed in the Python version this guide targets.
            connector = TCPConnector(
                limit=self.max_connections,
                limit_per_host=self.max_connections_per_host,
                ttl_dns_cache=300,
                ssl=True,           # Default verification; pass an ssl.SSLContext to customize
                force_close=False,  # Keep connections alive
            )
            self._session = ClientSession(
                base_url=self.base_url,
                timeout=self.timeout,
                connector=connector,
                headers=self.default_headers,
                raise_for_status=False,
                connector_owner=True,
                auto_decompress=True,
                trust_env=True
            )

    async def close(self):
        if self._session:
            await self._session.close()
            self._session = None
            await asyncio.sleep(0.25)  # Allow cleanup

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def request(
        self,
        method: str,
        path: str,
        **kwargs
    ) -> aiohttp.ClientResponse:
        if not self._session:
            await self.start()

        # The caller must read or release the returned response.
        return await self._session.request(method, path, **kwargs)
```

### Cookie Management

```python
import aiohttp
from yarl import URL

# Automatic cookie handling
async def with_cookies():
    # Create cookie jar
    jar = aiohttp.CookieJar(unsafe=False)  # Default: ignore cookies from IP-address hosts

    async with aiohttp.ClientSession(cookie_jar=jar) as session:
        # Cookies are automatically stored and sent
        async with session.get('https://example.com/login'):
            pass

        # Manually update cookies (response_url must be a yarl.URL)
        session.cookie_jar.update_cookies(
            {'session_id': 'abc123'},
            response_url=URL('https://example.com')
        )

        # Access cookies
        for cookie in session.cookie_jar:
            print(f"{cookie.key}: {cookie.value}")

# Custom cookie handling
async def custom_cookies():
    cookies = {'auth_token': 'xyz789'}

    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get('https://example.com/api') as response:
            # Read response cookies
            print(response.cookies)
```

---

## Authentication Patterns

### Basic Authentication

```python
import aiohttp
from aiohttp import BasicAuth
import base64

# Method 1: Using BasicAuth helper
async def basic_auth_helper(username: str, password: str):
    auth = BasicAuth(login=username, password=password)

    async with aiohttp.ClientSession(auth=auth) as session:
        async with session.get('https://api.example.com/protected') as response:
            return await response.json()

# Method 2: Manual base64 encoding
async def basic_auth_manual(username: str, password: str):
    credentials = f"{username}:{password}"
    encoded = base64.b64encode(credentials.encode()).decode()

    headers = {'Authorization': f'Basic {encoded}'}

    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('https://api.example.com/protected') as response:
            return await response.json()

# Method 3: Per-request authentication
async def basic_auth_per_request(username: str, password: str, url: str):
    auth = BasicAuth(login=username, password=password)

    async with aiohttp.ClientSession() as session:
        async with session.get(url, auth=auth) as response:
            return await response.json()
```

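To make the manual encoding step concrete, here is a small standard-library sketch of what the `Authorization` header contains and how a server would decode it. `basic_auth_header` and `parse_basic_auth_header` are hypothetical helper names, not aiohttp functions.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an HTTP Basic Authorization header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def parse_basic_auth_header(value: str) -> tuple[str, str]:
    """Reverse of basic_auth_header; raises ValueError on a malformed header."""
    scheme, _, token = value.partition(" ")
    if scheme.lower() != "basic" or not token:
        raise ValueError("not a Basic authorization header")
    decoded = base64.b64decode(token).decode("utf-8")
    # Split on the first colon only: passwords may contain ':' themselves.
    username, sep, password = decoded.partition(":")
    if not sep:
        raise ValueError("missing ':' separator")
    return username, password


if __name__ == "__main__":
    header = basic_auth_header("user", "pass")
    print(header)                           # Basic dXNlcjpwYXNz
    print(parse_basic_auth_header(header))  # ('user', 'pass')
```

The credentials are only encoded, not encrypted, so Basic authentication should be sent over HTTPS.
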
### Bearer Token Authentication

```python
from typing import Optional

import aiohttp

class TokenAuthClient:
    def __init__(self, base_url: str, token: str):
        self.base_url = base_url
        self.token = token
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Accept': 'application/json'
        }
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            headers=headers
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def get(self, path: str) -> dict:
        async with self._session.get(path) as response:
            response.raise_for_status()
            return await response.json()

    async def post(self, path: str, data: dict) -> dict:
        async with self._session.post(path, json=data) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def example():
    async with TokenAuthClient('https://api.example.com', 'your_token_here') as client:
        user = await client.get('/user')
        result = await client.post('/items', {'name': 'test'})
```

### API Key Authentication

```python
from typing import Optional

import aiohttp

class APIKeyClient:
    def __init__(
        self,
        base_url: str,
        api_key: str,
        key_location: str = 'header',  # 'header' or 'query'
        key_name: str = 'X-API-Key'
    ):
        self.base_url = base_url
        self.api_key = api_key
        self.key_location = key_location
        self.key_name = key_name
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        if self.key_location == 'header':
            headers = {self.key_name: self.api_key}
            self._session = aiohttp.ClientSession(
                base_url=self.base_url,
                headers=headers
            )
        else:
            self._session = aiohttp.ClientSession(base_url=self.base_url)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def request(self, method: str, path: str, **kwargs):
        if self.key_location == 'query':
            # Copy so the caller's params dict is not mutated
            params = dict(kwargs.get('params') or {})
            params[self.key_name] = self.api_key
            kwargs['params'] = params

        async with self._session.request(method, path, **kwargs) as response:
            response.raise_for_status()
            return await response.json()
```

### Digest Authentication (aiohttp 3.12.8+)

```python
from aiohttp import ClientSession, DigestAuthMiddleware

async def digest_auth_example():
    # Create digest auth middleware
    digest_auth = DigestAuthMiddleware(
        login="user",
        password="password",
        preemptive=True  # New in 3.12.8: preemptive authentication
    )

    # Pass middleware to session
    async with ClientSession(middlewares=(digest_auth,)) as session:
        async with session.get("https://httpbin.org/digest-auth/auth/user/password") as resp:
            print(await resp.text())
```

### OAuth 2.0 Token Refresh Pattern

```python
import asyncio
from datetime import datetime, timedelta
from typing import Optional

import aiohttp

class OAuth2Client:
    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        token_url: str
    ):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url

        self._access_token: Optional[str] = None
        self._token_expires_at: Optional[datetime] = None
        self._refresh_token: Optional[str] = None
        self._session: Optional[aiohttp.ClientSession] = None
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(base_url=self.base_url)
        await self._ensure_token()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def _ensure_token(self):
        # The lock keeps concurrent callers from refreshing at the same time
        async with self._lock:
            now = datetime.now()
            if (
                not self._access_token or
                not self._token_expires_at or
                now >= self._token_expires_at
            ):
                await self._refresh_access_token()

    async def _refresh_access_token(self):
        # Client-credentials grant: a fresh token is requested each time
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        # A separate short-lived session is used because token_url is absolute
        async with aiohttp.ClientSession() as session:
            async with session.post(self.token_url, data=data) as response:
                response.raise_for_status()
                token_data = await response.json()

                self._access_token = token_data['access_token']
                expires_in = token_data.get('expires_in', 3600)
                # Treat the token as expired 60 seconds early
                self._token_expires_at = datetime.now() + timedelta(seconds=expires_in - 60)
                self._refresh_token = token_data.get('refresh_token')

    async def request(self, method: str, path: str, **kwargs):
        await self._ensure_token()

        # Copy so the caller's headers dict is not mutated
        headers = dict(kwargs.get('headers') or {})
        headers['Authorization'] = f'Bearer {self._access_token}'
        kwargs['headers'] = headers

        async with self._session.request(method, path, **kwargs) as response:
            if response.status == 401:
                # Token rejected: refresh once and retry
                await self._refresh_access_token()
                headers['Authorization'] = f'Bearer {self._access_token}'
                async with self._session.request(method, path, **kwargs) as retry:
                    retry.raise_for_status()
                    return await retry.json()

            response.raise_for_status()
            return await response.json()
```

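The reason for checking token expiry under a lock is that many concurrent requests may notice an expired token at the same moment, and only one of them should hit the token endpoint. The sketch below isolates that behavior with the standard library only. `TokenCache` and `_fetch_token` are hypothetical names, the token endpoint is simulated, and `time.monotonic()` is swapped in for `datetime.now()` so that wall-clock adjustments cannot affect expiry.

```python
import asyncio
import time


class TokenCache:
    """Cache an access token and refresh it at most once per expiry."""

    def __init__(self, lifetime: float = 3600.0, margin: float = 60.0):
        self.lifetime = lifetime
        self.margin = margin  # Refresh this many seconds before real expiry.
        self.refresh_count = 0
        self._token: str | None = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def _fetch_token(self) -> str:
        """Hypothetical stand-in for the POST to the token endpoint."""
        await asyncio.sleep(0.01)
        self.refresh_count += 1
        return f"token-{self.refresh_count}"

    async def get(self) -> str:
        async with self._lock:
            # The check runs under the lock, so late arrivals see the token
            # stored by the first caller and skip the refresh.
            if self._token is None or time.monotonic() >= self._expires_at:
                self._token = await self._fetch_token()
                self._expires_at = time.monotonic() + self.lifetime - self.margin
            return self._token


async def demo() -> tuple[set[str], int]:
    cache = TokenCache()
    tokens = await asyncio.gather(*(cache.get() for _ in range(20)))
    return set(tokens), cache.refresh_count


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ({'token-1'}, 1)
```
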
---

## Server Development

### Application Structure

```python
from collections.abc import Awaitable, Callable

from aiohttp import web

# Type aliases
Handler = Callable[[web.Request], Awaitable[web.Response]]

class Application:
    def __init__(self):
        self.app = web.Application()
        self.setup_routes()
        self.setup_middlewares()

    def setup_routes(self):
        self.app.router.add_get('/', self.index)
        self.app.router.add_get('/health', self.health)

        # API routes
        self.app.router.add_route('*', '/api/{path:.*}', self.api_handler)

    def setup_middlewares(self):
        # The first middleware in the list is the outermost wrapper
        self.app.middlewares.append(self.error_middleware)
        self.app.middlewares.append(self.logging_middleware)

    async def index(self, request: web.Request) -> web.Response:
        return web.Response(text='Hello, World!')

    async def health(self, request: web.Request) -> web.Response:
        return web.json_response({'status': 'healthy'})

    async def api_handler(self, request: web.Request) -> web.Response:
        path = request.match_info['path']
        return web.json_response({
            'path': path,
            'method': request.method
        })

    @web.middleware
    async def error_middleware(self, request: web.Request, handler: Handler):
        try:
            return await handler(request)
        except web.HTTPException:
            raise
        except Exception as e:
            # In production, log the exception and return a generic message
            return web.json_response(
                {'error': str(e)},
                status=500
            )

    @web.middleware
    async def logging_middleware(self, request: web.Request, handler: Handler):
        print(f"{request.method} {request.path}")
        response = await handler(request)
        print(f"Response: {response.status}")
        return response

    def run(self, host: str = '127.0.0.1', port: int = 8080):
        web.run_app(self.app, host=host, port=port)

if __name__ == '__main__':
    app = Application()
    app.run()
```

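Middleware order matters: aiohttp wraps the handler in reverse list order, so the first middleware appended runs outermost. The sketch below models that onion with plain coroutines and strings instead of `web.Request` objects. `build_chain`, `make`, and `endpoint` are hypothetical names, and this is an illustration of the ordering, not aiohttp's actual implementation.

```python
import asyncio
from collections.abc import Awaitable, Callable

Handler = Callable[[str], Awaitable[str]]
Middleware = Callable[[str, Handler], Awaitable[str]]


def build_chain(handler: Handler, middlewares: list[Middleware]) -> Handler:
    """Wrap handler so that middlewares[0] ends up outermost."""
    for middleware in reversed(middlewares):
        # Default arguments bind the current values and avoid late binding.
        async def wrapped(request: str, _mw=middleware, _next=handler) -> str:
            return await _mw(request, _next)
        handler = wrapped
    return handler


async def demo() -> list[str]:
    calls: list[str] = []

    def make(name: str) -> Middleware:
        async def middleware(request: str, handler: Handler) -> str:
            calls.append(f"{name} before")
            response = await handler(request)
            calls.append(f"{name} after")
            return response
        return middleware

    async def endpoint(request: str) -> str:
        calls.append("handler")
        return "ok"

    chain = build_chain(endpoint, [make("error"), make("logging")])
    await chain("GET /")
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the error middleware first, it also sees exceptions raised inside the logging middleware, which is usually the desired arrangement.
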
### Request Handling

```python
from aiohttp import web

class RequestHandlers:
    # Query parameters
    async def query_params(self, request: web.Request) -> web.Response:
        # Get single parameter
        name = request.query.get('name', 'Anonymous')

        # Get all values for a key
        tags = request.query.getall('tag', [])

        # Get as integer with default; reject non-numeric input with a 400
        try:
            page = int(request.query.get('page', '1'))
        except ValueError:
            return web.json_response(
                {'error': 'page must be an integer'},
                status=400
            )

        return web.json_response({
            'name': name,
            'tags': tags,
            'page': page
        })

    # Path parameters
    async def path_params(self, request: web.Request) -> web.Response:
        user_id = request.match_info['user_id']
        action = request.match_info.get('action', 'view')

        return web.json_response({
            'user_id': user_id,
            'action': action
        })

    # JSON body
    async def json_body(self, request: web.Request) -> web.Response:
        try:
            data = await request.json()
        except ValueError:
            return web.json_response(
                {'error': 'Invalid JSON'},
                status=400
            )

        return web.json_response({
            'received': data,
            'type': type(data).__name__
        })

    # Form data
    async def form_data(self, request: web.Request) -> web.Response:
        data = await request.post()

        result = {}
        for key in data:
            value = data.get(key)
            # Uploaded files arrive as web.FileField, which is not JSON serializable
            result[key] = value if isinstance(value, str) else str(value)

        return web.json_response(result)

    # File upload
    async def file_upload(self, request: web.Request) -> web.Response:
        reader = await request.multipart()

        uploaded_files = []

        async for field in reader:
            if field.filename:
                size = 0

                while True:
                    chunk = await field.read_chunk()
                    if not chunk:
                        break
                    # Write each chunk to disk or storage here instead of
                    # buffering the whole file in memory
                    size += len(chunk)

                uploaded_files.append({
                    'filename': field.filename,
                    'size': size,
                    'content_type': field.headers.get('Content-Type')
                })

        return web.json_response({
            'files': uploaded_files
        })

    # Headers
    async def headers_example(self, request: web.Request) -> web.Response:
        auth_header = request.headers.get('Authorization')
        user_agent = request.headers.get('User-Agent')
        custom_header = request.headers.get('X-Custom-Header')

        response_headers = {
            'X-Custom-Response': 'value',
            'X-Request-ID': 'unique-id-123'
        }

        return web.json_response(
            {
                'auth': auth_header,
                'user_agent': user_agent,
                'custom': custom_header
            },
            headers=response_headers
        )

    # Cookies
    async def cookies_example(self, request: web.Request) -> web.Response:
        session_id = request.cookies.get('session_id')

        response = web.json_response({
            'session_id': session_id
        })

        # Set cookie
        response.set_cookie(
            'session_id',
            'new-session-id',
            max_age=3600,
            httponly=True,
            secure=True,
            samesite='Strict'
        )

        return response
```

### Response Types

```python
import asyncio

from aiohttp import web

class ResponseExamples:
    # Text response
    async def text_response(self, request: web.Request) -> web.Response:
        return web.Response(
            text='Plain text response',
            content_type='text/plain'
        )

    # JSON response
    async def json_response(self, request: web.Request) -> web.Response:
        return web.json_response({
            'status': 'success',
            'data': {'key': 'value'}
        })

    # HTML response
    async def html_response(self, request: web.Request) -> web.Response:
        html = """
        <!DOCTYPE html>
        <html>
        <head><title>Example</title></head>
        <body><h1>Hello, World!</h1></body>
        </html>
        """
        return web.Response(
            text=html,
            content_type='text/html'
        )

    # Binary response
    async def binary_response(self, request: web.Request) -> web.Response:
        data = b'\x00\x01\x02\x03\x04'
        return web.Response(
            body=data,
            content_type='application/octet-stream'
        )

    # File download
    async def file_download(self, request: web.Request) -> web.FileResponse:
        return web.FileResponse(
            path='./example.pdf',
            headers={
                'Content-Disposition': 'attachment; filename="example.pdf"'
            }
        )

    # Streaming response
    async def streaming_response(self, request: web.Request) -> web.StreamResponse:
        response = web.StreamResponse()
        response.headers['Content-Type'] = 'text/plain'
        await response.prepare(request)

        for i in range(10):
            await response.write(f"Chunk {i}\n".encode())
            await asyncio.sleep(0.5)

        await response.write_eof()
        return response

    # Redirect
    async def redirect_response(self, request: web.Request) -> web.Response:
        raise web.HTTPFound('/new-location')

    # Custom status codes
    async def custom_status(self, request: web.Request) -> web.Response:
        return web.json_response(
            {'message': 'Created'},
            status=201
        )
```

---

## Request Validation with Pydantic

### Basic Pydantic Integration

```python
import time
from typing import Optional, Literal

from aiohttp import web
from pydantic import BaseModel, Field, field_validator, ConfigDict
from pydantic import EmailStr, HttpUrl

# Pydantic 2.12+ models (EmailStr requires the email-validator package)
class UserCreate(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True)

    username: str = Field(min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(min_length=8)
    age: Optional[int] = Field(None, ge=18, le=120)
    role: Literal['user', 'admin', 'moderator'] = 'user'
    website: Optional[HttpUrl] = None

    @field_validator('username')
    @classmethod
    def validate_username(cls, v: str) -> str:
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v.lower()

    @field_validator('password')
    @classmethod
    def validate_password(cls, v: str) -> str:
        if not any(c.isupper() for c in v):
            raise ValueError('Password must contain uppercase letter')
        if not any(c.isdigit() for c in v):
            raise ValueError('Password must contain digit')
        return v

class UserResponse(BaseModel):
    user_id: str
    username: str
    email: EmailStr
    role: str
    created_at: float

# Handler with validation
async def create_user(request: web.Request) -> web.Response:
    try:
        data = await request.json()
        # model_validate also rejects JSON that is not an object
        user_data = UserCreate.model_validate(data)
    except ValueError as e:
        # Covers malformed JSON and pydantic.ValidationError (a ValueError subclass)
        return web.json_response(
            {'error': 'Validation error', 'details': str(e)},
            status=400
        )

    # Process validated data
    user = UserResponse(
        user_id='123',
        username=user_data.username,
        email=user_data.email,
        role=user_data.role,
        created_at=time.time()
    )

    return web.json_response(
        user.model_dump(),
        status=201
    )
```

### Advanced Validation Patterns

```python
import time
from enum import Enum
from typing import Any, Optional

from aiohttp import web
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

class Priority(str, Enum):
    LOW = 'low'
    MEDIUM = 'medium'
    HIGH = 'high'
    URGENT = 'urgent'

class TaskCreate(BaseModel):
    model_config = ConfigDict(
        str_strip_whitespace=True,
        extra='forbid'  # Reject extra fields
    )

    title: str = Field(min_length=1, max_length=200)
    description: Optional[str] = Field(None, max_length=5000)
    priority: Priority = Priority.MEDIUM
    tags: list[str] = Field(default_factory=list, max_length=10)
    due_date: Optional[float] = None
    assigned_to: Optional[str] = None

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v: list[str]) -> list[str]:
        # Normalize first so that 'Bug' and 'bug' count as duplicates
        tags = [tag.lower() for tag in v]
        if len(tags) != len(set(tags)):
            raise ValueError('Tags must be unique')
        return tags

    @model_validator(mode='after')
    def validate_model(self) -> 'TaskCreate':
        if self.priority == Priority.URGENT and not self.assigned_to:
            raise ValueError('Urgent tasks must be assigned')

        if self.due_date and self.due_date < time.time():
            raise ValueError('Due date cannot be in the past')

        return self

# Middleware for automatic validation
@web.middleware
async def validation_middleware(request: web.Request, handler):
    # Get validation schema from the matched route handler, which stays
    # the same no matter where this middleware sits in the chain
    schema = getattr(request.match_info.handler, '_validation_schema', None)

    if schema and request.method in ('POST', 'PUT', 'PATCH'):
        try:
            data = await request.json()
            validated = schema.model_validate(data)
            request['validated_data'] = validated
        except ValueError as e:
            return web.json_response(
                {'error': 'Validation error', 'details': str(e)},
                status=400
            )

    return await handler(request)

# Decorator for validation
def validate_with(schema: type[BaseModel]):
    def decorator(handler):
        handler._validation_schema = schema
        return handler
    return decorator

# Usage
@validate_with(TaskCreate)
async def create_task(request: web.Request) -> web.Response:
    task_data: TaskCreate = request['validated_data']

    # Data is already validated
    return web.json_response({
        'task_id': 'task-123',
        'title': task_data.title,
        'priority': task_data.priority.value
    }, status=201)
```

### Query Parameter Validation

```python
from typing import Literal, Optional

from aiohttp import web
from pydantic import BaseModel, Field

class PaginationParams(BaseModel):
    page: int = Field(1, ge=1, le=1000)
    per_page: int = Field(20, ge=1, le=100)
    sort_by: Optional[str] = Field(None, pattern=r'^[a-zA-Z_]+$')
    order: Literal['asc', 'desc'] = 'asc'

    @property
    def offset(self) -> int:
        return (self.page - 1) * self.per_page

    @property
    def limit(self) -> int:
        return self.per_page

async def list_items(request: web.Request) -> web.Response:
    try:
        # Query values are strings; pydantic converts them to int
        params = PaginationParams.model_validate(dict(request.query))
    except ValueError as e:
        return web.json_response(
            {'error': 'Invalid parameters', 'details': str(e)},
            status=400
        )

    # Use validated params
    items = []  # Fetch from database with params.offset and params.limit

    return web.json_response({
        'items': items,
        'page': params.page,
        'per_page': params.per_page,
        'total': 100  # Placeholder; use the real row count
    })
```

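The `offset` and `limit` properties map directly onto a slice of the result set. The sketch below works through that arithmetic on an in-memory list, using only the standard library. `Page` and `paginate` are hypothetical names, and `total_pages` and `has_next` are additions for illustration that the model above does not define.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Page:
    page: int      # 1-based page number
    per_page: int  # Items per page
    total: int     # Total number of items across all pages

    @property
    def offset(self) -> int:
        return (self.page - 1) * self.per_page

    @property
    def total_pages(self) -> int:
        return math.ceil(self.total / self.per_page)

    @property
    def has_next(self) -> bool:
        return self.page < self.total_pages


def paginate(items: list, page: int, per_page: int) -> tuple[list, Page]:
    """Return the slice for the requested page plus its metadata."""
    meta = Page(page=page, per_page=per_page, total=len(items))
    return items[meta.offset:meta.offset + per_page], meta


if __name__ == "__main__":
    chunk, meta = paginate(list(range(45)), page=3, per_page=20)
    print(chunk)                            # [40, 41, 42, 43, 44]
    print(meta.total_pages, meta.has_next)  # 3 False
```
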
---

## WebSocket Implementation

### Basic WebSocket Server

```python
from aiohttp import web, WSMsgType
import asyncio

class WebSocketHandler:
    def __init__(self):
        self.active_connections: set[web.WebSocketResponse] = set()

    async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        self.active_connections.add(ws)

        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    if msg.data == 'close':
                        await ws.close()
                    else:
                        # Echo message back
                        await ws.send_str(f"Echo: {msg.data}")

                        # Broadcast to all connections
                        await self.broadcast(f"User says: {msg.data}")

                elif msg.type == WSMsgType.ERROR:
                    print(f'WebSocket error: {ws.exception()}')

        finally:
            self.active_connections.discard(ws)

        return ws

    async def broadcast(self, message: str):
        if self.active_connections:
            await asyncio.gather(
                *[ws.send_str(message) for ws in self.active_connections],
                return_exceptions=True
            )

# Setup
app = web.Application()
handler = WebSocketHandler()
app.router.add_get('/ws', handler.websocket_handler)
```

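With `return_exceptions=True`, a failed send does not abort the broadcast, but the failure is silently discarded and the dead connection stays in the set. A possible refinement, sketched here with the standard library only, inspects the results and prunes connections that raised. `FakeSocket` is a hypothetical stand-in for `web.WebSocketResponse` that implements only `send_str()`, so the example runs without aiohttp.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for web.WebSocketResponse with only send_str()."""

    def __init__(self, name: str, broken: bool = False):
        self.name = name
        self.broken = broken
        self.received: list[str] = []

    async def send_str(self, message: str) -> None:
        if self.broken:
            raise ConnectionResetError(f"{self.name} is closed")
        self.received.append(message)


async def broadcast(connections: set[FakeSocket], message: str) -> int:
    """Send to every connection, drop the ones that fail, return the success count."""
    targets = list(connections)  # Snapshot: the set may change while we await.
    results = await asyncio.gather(
        *(ws.send_str(message) for ws in targets),
        return_exceptions=True,
    )
    delivered = 0
    for ws, result in zip(targets, results):
        if isinstance(result, Exception):
            connections.discard(ws)  # Prune the dead connection.
        else:
            delivered += 1
    return delivered


if __name__ == "__main__":
    alive, dead = FakeSocket("alive"), FakeSocket("dead", broken=True)
    pool = {alive, dead}
    print(asyncio.run(broadcast(pool, "hello")), len(pool))  # 1 1
```

Taking a list snapshot before awaiting also avoids iterating the set while another handler adds or removes a connection.
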
### Advanced WebSocket Server with Authentication

```python
from aiohttp import web, WSMsgType
import json
import asyncio
from typing import Optional
import jwt  # Provided by the PyJWT package

class AuthenticatedWebSocketHandler:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.connections: dict[str, web.WebSocketResponse] = {}

    def verify_token(self, token: str) -> Optional[dict]:
        try:
            return jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            return None

    async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
        ws = web.WebSocketResponse(heartbeat=30)
        await ws.prepare(request)

        user_id: Optional[str] = None

        try:
            # Wait for authentication message
            msg = await asyncio.wait_for(ws.receive(), timeout=10.0)

            if msg.type != WSMsgType.TEXT:
                await ws.send_json({'error': 'Authentication required'})
                await ws.close()
                return ws

            auth_data = json.loads(msg.data)
            token = auth_data.get('token')

            if not token:
                await ws.send_json({'error': 'Token required'})
                await ws.close()
                return ws

            payload = self.verify_token(token)
            if not payload:
                await ws.send_json({'error': 'Invalid token'})
                await ws.close()
                return ws

            user_id = payload['user_id']
            self.connections[user_id] = ws

            await ws.send_json({
                'type': 'auth_success',
                'user_id': user_id
            })

            # Handle messages
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    await self.handle_message(user_id, data)

                elif msg.type == WSMsgType.ERROR:
                    print(f'WebSocket error: {ws.exception()}')

        except asyncio.TimeoutError:
            await ws.send_json({'error': 'Authentication timeout'})

        finally:
            if user_id and user_id in self.connections:
                del self.connections[user_id]

        return ws

    async def handle_message(self, user_id: str, data: dict):
        message_type = data.get('type')

        if message_type == 'ping':
            await self.send_to_user(user_id, {'type': 'pong'})

        elif message_type == 'broadcast':
            await self.broadcast({
                'type': 'message',
                'from': user_id,
                'content': data.get('content')
            })

        elif message_type == 'direct':
            to_user = data.get('to')
            await self.send_to_user(to_user, {
                'type': 'direct_message',
|
|
'from': user_id,
|
|
'content': data.get('content')
|
|
})
|
|
|
|
async def send_to_user(self, user_id: str, message: dict):
|
|
ws = self.connections.get(user_id)
|
|
if ws and not ws.closed:
|
|
await ws.send_json(message)
|
|
|
|
async def broadcast(self, message: dict):
|
|
if self.connections:
|
|
await asyncio.gather(
|
|
*[ws.send_json(message) for ws in self.connections.values() if not ws.closed],
|
|
return_exceptions=True
|
|
)
|
|
```

### WebSocket Client

```python
import aiohttp
import asyncio

async def websocket_client():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('http://localhost:8080/ws') as ws:
            # Send authentication
            await ws.send_json({
                'token': 'your-jwt-token'
            })

            # Receive authentication response
            msg = await ws.receive()
            print(f"Auth response: {msg.data}")

            # Send messages
            await ws.send_json({
                'type': 'broadcast',
                'content': 'Hello, everyone!'
            })

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    print(f"Received: {data}")

                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

asyncio.run(websocket_client())
```

---

## Testing with pytest and pytest-aiohttp

### Basic Test Setup

```python
# conftest.py
import pytest
from aiohttp import web

# The aiohttp_client fixture is provided by the pytest-aiohttp plugin, which
# pytest discovers automatically once the package is installed, so the legacy
# `pytest_plugins = 'aiohttp.pytest_plugin'` declaration is not needed.

@pytest.fixture
def app() -> web.Application:
    app = web.Application()

    async def hello(request: web.Request) -> web.Response:
        return web.Response(text='Hello, World!')

    async def echo_json(request: web.Request) -> web.Response:
        # Echo the posted JSON back; exercised by test_json_endpoint
        return web.json_response(await request.json())

    app.router.add_get('/', hello)
    app.router.add_post('/api/data', echo_json)

    return app

@pytest.fixture
async def client(aiohttp_client, app):
    # aiohttp_client starts a test server and cleans the app up after the test
    return await aiohttp_client(app)
```

### Basic Tests

```python
# test_basic.py
import pytest

@pytest.mark.asyncio
async def test_hello(client):
    resp = await client.get('/')
    assert resp.status == 200
    text = await resp.text()
    assert 'Hello, World!' in text

@pytest.mark.asyncio
async def test_json_endpoint(client):
    resp = await client.post('/api/data', json={'key': 'value'})
    assert resp.status == 200
    data = await resp.json()
    assert data['key'] == 'value'
```

### Testing with Fixtures

```python
# conftest.py
import pytest
from typing import AsyncIterator
import aiohttp

@pytest.fixture
async def http_session() -> AsyncIterator[aiohttp.ClientSession]:
    session = aiohttp.ClientSession()
    yield session
    await session.close()

@pytest.fixture
def sample_user():
    return {
        'username': 'testuser',
        'email': 'test@example.com',
        'password': 'TestPass123'
    }

@pytest.fixture
async def authenticated_client(client, sample_user):
    # Login
    resp = await client.post('/login', json=sample_user)
    assert resp.status == 200

    # Extract token
    data = await resp.json()
    token = data['token']

    # Set authorization header
    client.session.headers['Authorization'] = f'Bearer {token}'

    yield client

# tests/test_auth.py (separate file; it needs its own `import pytest`)
@pytest.mark.asyncio
async def test_protected_endpoint(authenticated_client):
    resp = await authenticated_client.get('/api/protected')
    assert resp.status == 200
    data = await resp.json()
    assert 'user_id' in data
```

### Parameterized Tests

```python
import pytest

@pytest.mark.parametrize('username,email,expected_status', [
    ('valid', 'valid@example.com', 201),
    ('ab', 'valid@example.com', 400),  # Too short
    ('valid', 'invalid-email', 400),  # Invalid email
    ('', 'valid@example.com', 400),  # Empty username
])
@pytest.mark.asyncio
async def test_user_creation_validation(client, username, email, expected_status):
    resp = await client.post('/users', json={
        'username': username,
        'email': email,
        'password': 'ValidPass123'
    })
    assert resp.status == expected_status

@pytest.mark.parametrize('method,path,expected', [
    ('GET', '/', 200),
    ('GET', '/api/users', 200),
    ('POST', '/api/users', 401),  # Requires auth
    ('GET', '/nonexistent', 404),
])
@pytest.mark.asyncio
async def test_endpoints(client, method, path, expected):
    # Only POST requests carry a JSON body; client.request() covers any method,
    # so `resp` is always bound
    kwargs = {'json': {}} if method == 'POST' else {}
    resp = await client.request(method, path, **kwargs)

    assert resp.status == expected
```

### Mocking External APIs

```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_external_api_call(client):
    # Mock external API: the handler's `session.get(...)` context manager
    # yields mock_response instead of performing a real request
    with patch('aiohttp.ClientSession.get') as mock_get:
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_response.json = AsyncMock(return_value={'data': 'mocked'})
        mock_get.return_value.__aenter__.return_value = mock_response

        resp = await client.get('/api/external')
        assert resp.status == 200
        data = await resp.json()
        assert data['data'] == 'mocked'

@pytest.fixture
async def mock_database():
    class MockDB:
        def __init__(self):
            self.data = {}

        async def get(self, key):
            return self.data.get(key)

        async def set(self, key, value):
            self.data[key] = value

        async def delete(self, key):
            if key in self.data:
                del self.data[key]

    return MockDB()

@pytest.mark.asyncio
async def test_with_mock_db(client, mock_database):
    # Inject mock database into app. aiohttp emits a DeprecationWarning when a
    # started app's state changes; setting app['db'] before calling
    # aiohttp_client(app) avoids it.
    client.app['db'] = mock_database

    # Test database operations
    await mock_database.set('user:1', {'name': 'Test'})

    resp = await client.get('/users/1')
    assert resp.status == 200
```

### Testing WebSockets

```python
import pytest
from aiohttp import web, WSMsgType

@pytest.mark.asyncio
async def test_websocket_echo(aiohttp_client):
    app = web.Application()

    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                await ws.send_str(f"Echo: {msg.data}")

        return ws

    app.router.add_get('/ws', websocket_handler)

    client = await aiohttp_client(app)

    async with client.ws_connect('/ws') as ws:
        await ws.send_str('Hello')
        msg = await ws.receive()
        assert msg.data == 'Echo: Hello'

@pytest.mark.asyncio
async def test_websocket_broadcast(aiohttp_client):
    connections = set()

    app = web.Application()

    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        connections.add(ws)

        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    # Broadcast to all other connections; iterate over a copy
                    # because the set can change while a send is awaited
                    for conn in list(connections):
                        if conn is not ws:
                            await conn.send_str(msg.data)
        finally:
            connections.discard(ws)

        return ws

    app.router.add_get('/ws', websocket_handler)
    client = await aiohttp_client(app)

    # Create two connections
    async with client.ws_connect('/ws') as ws1:
        async with client.ws_connect('/ws') as ws2:
            await ws1.send_str('Hello from ws1')
            msg = await ws2.receive()
            assert msg.data == 'Hello from ws1'
```

### Testing Middleware

```python
import pytest
from aiohttp import web

@pytest.mark.asyncio
async def test_auth_middleware(aiohttp_client):
    @web.middleware
    async def auth_middleware(request, handler):
        token = request.headers.get('Authorization')
        if not token or not token.startswith('Bearer '):
            raise web.HTTPUnauthorized()

        request['user_id'] = 'user-123'
        return await handler(request)

    app = web.Application(middlewares=[auth_middleware])

    async def protected(request):
        return web.json_response({'user_id': request['user_id']})

    app.router.add_get('/protected', protected)

    client = await aiohttp_client(app)

    # Without token
    resp = await client.get('/protected')
    assert resp.status == 401

    # With token
    resp = await client.get('/protected', headers={
        'Authorization': 'Bearer valid-token'
    })
    assert resp.status == 200
    data = await resp.json()
    assert data['user_id'] == 'user-123'
```

### Coverage and Best Practices

```toml
# pyproject.toml (pytest.ini uses a [pytest] section with INI syntax instead)
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
    "--verbose",
    "--strict-markers",
    # The --cov options require the pytest-cov plugin
    "--cov=app",
    "--cov-report=html",
    "--cov-report=term-missing",
]

# Best practices
# 1. Keep tests independent
# 2. Use fixtures for common setup
# 3. Mock external dependencies
# 4. Test edge cases
# 5. Use parametrize for similar tests
# 6. Clean up resources properly
```

---

## Advanced Middleware and Error Handling

### Error Handling Middleware

```python
from aiohttp import web
import logging
from typing import Callable, Awaitable

logger = logging.getLogger(__name__)

@web.middleware
async def error_middleware(
    request: web.Request,
    handler: Callable[[web.Request], Awaitable[web.Response]]
) -> web.Response:
    try:
        return await handler(request)

    except web.HTTPException:
        # HTTP exceptions should pass through
        raise

    except ValueError as e:
        logger.warning(f"Validation error: {e}")
        return web.json_response(
            {
                'error': 'Validation Error',
                'message': str(e)
            },
            status=400
        )

    except PermissionError as e:
        logger.warning(f"Permission denied: {e}")
        return web.json_response(
            {
                'error': 'Forbidden',
                'message': 'You do not have permission to access this resource'
            },
            status=403
        )

    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return web.json_response(
            {
                'error': 'Internal Server Error',
                'message': 'An unexpected error occurred'
            },
            status=500
        )
```

### Logging Middleware

```python
import time
import logging
from aiohttp import web

logger = logging.getLogger(__name__)

@web.middleware
async def logging_middleware(request: web.Request, handler):
    # perf_counter() is monotonic, so durations are unaffected by clock changes
    start_time = time.perf_counter()

    # Log request
    logger.info(
        "Request started",
        extra={
            'method': request.method,
            'path': request.path,
            'query': dict(request.query),
            'remote': request.remote
        }
    )

    try:
        response = await handler(request)

        # Log response
        duration = time.perf_counter() - start_time
        logger.info(
            "Request completed",
            extra={
                'method': request.method,
                'path': request.path,
                'status': response.status,
                'duration_ms': duration * 1000
            }
        )

        return response

    except Exception as e:
        duration = time.perf_counter() - start_time
        logger.error(
            "Request failed",
            extra={
                'method': request.method,
                'path': request.path,
                'duration_ms': duration * 1000,
                'error': str(e)
            },
            exc_info=True
        )
        raise
```
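
Fields passed through `extra` become attributes on the `LogRecord`, but the standard library's default formatter does not print them, so the middleware's `method`, `path`, and `duration_ms` values can silently disappear from the output. The following is a minimal sketch of a formatter that emits them as JSON. The class name `ExtraJSONFormatter` and the chosen output keys are illustrative assumptions, not part of aiohttp or the logging module. Attaching it with `handler.setFormatter(ExtraJSONFormatter())` on any `logging.Handler` is enough to try it out.

```python
import json
import logging

# Attribute names every LogRecord carries; anything else was supplied via `extra`
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {
    "message",
    "asctime",
}


class ExtraJSONFormatter(logging.Formatter):
    """Hypothetical formatter: renders the message plus any `extra` fields as JSON."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy over only the caller-supplied fields
        payload.update(
            {key: value for key, value in record.__dict__.items() if key not in _STANDARD_ATTRS}
        )
        # default=str keeps non-JSON types (e.g. exceptions) from raising
        return json.dumps(payload, default=str)
```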

### CORS Middleware

```python
from aiohttp import web

@web.middleware
async def cors_middleware(request: web.Request, handler):
    # Handle preflight
    if request.method == 'OPTIONS':
        response = web.Response()
    else:
        response = await handler(request)

    # Add CORS headers
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
    response.headers['Access-Control-Max-Age'] = '3600'

    return response

# Or use aiohttp-cors library
import aiohttp_cors

app = web.Application()

# Configure CORS
cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
        allow_methods="*"
    )
})

# Placeholder handler so the route below has something to call
async def api_endpoint(request: web.Request) -> web.Response:
    return web.json_response({'ok': True})

# Add routes
resource = app.router.add_resource("/api/endpoint")
route = resource.add_route("GET", api_endpoint)
cors.add(route)
```

### Rate Limiting Middleware

```python
from aiohttp import web
import math
import time
from collections import defaultdict
from typing import Dict, Optional, Tuple

class RateLimiter:
    def __init__(self, max_requests: int = 100, window: int = 60):
        self.max_requests = max_requests
        self.window = window
        self.requests: Dict[str, list[float]] = defaultdict(list)

    def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]:
        now = time.time()
        cutoff = now - self.window

        # Remove old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > cutoff
        ]

        if len(self.requests[client_id]) >= self.max_requests:
            # The oldest request leaves the window first
            oldest = self.requests[client_id][0]
            retry_after = oldest + self.window - now
            return False, retry_after

        self.requests[client_id].append(now)
        return True, None

rate_limiter = RateLimiter(max_requests=100, window=60)

@web.middleware
async def rate_limit_middleware(request: web.Request, handler):
    # Use IP address as client identifier (request.remote can be None)
    client_id = request.remote or 'unknown'

    allowed, retry_after = rate_limiter.is_allowed(client_id)

    if not allowed:
        # Round up so clients are never told to retry after 0 seconds
        retry_seconds = max(1, math.ceil(retry_after))
        return web.json_response(
            {
                'error': 'Rate limit exceeded',
                'retry_after': retry_seconds
            },
            status=429,
            headers={'Retry-After': str(retry_seconds)}
        )

    return await handler(request)
```
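
The `requests` dictionary above keeps one entry per client forever, so a long-running server accumulates keys for clients that never return. The following is a minimal sketch of the same sliding-window idea with two changes: an injectable clock, so the behaviour can be checked without sleeping, and a `purge()` method that drops idle clients. The names `SlidingWindowLimiter`, `allow`, and `purge` are hypothetical. Calling `purge()` periodically, for instance from a background task, is one possible way to bound memory use.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Hypothetical variant of RateLimiter with a pluggable clock and cleanup."""

    def __init__(
        self,
        max_requests: int,
        window: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window = window
        self.clock = clock
        self.hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        hits = self.hits[client_id]
        # Drop timestamps that have left the window
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True

    def purge(self) -> int:
        """Remove clients with no request inside the window; return how many were removed."""
        now = self.clock()
        stale = [
            client_id
            for client_id, hits in self.hits.items()
            if not hits or hits[-1] <= now - self.window
        ]
        for client_id in stale:
            del self.hits[client_id]
        return len(stale)
```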

---

## Performance Optimization

### Connection Pooling

```python
import asyncio
from typing import Optional

import aiohttp
from aiohttp import TCPConnector, ClientTimeout

class OptimizedClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

        # Optimized timeout
        self.timeout = ClientTimeout(
            total=30,
            connect=10,
            sock_read=20
        )

        self.connector: Optional[TCPConnector] = None
        self._session: Optional[aiohttp.ClientSession] = None

    async def start(self):
        # Optimized connector, created here so it binds to the running event loop
        self.connector = TCPConnector(
            limit=100,  # Total connections
            limit_per_host=30,  # Per host
            ttl_dns_cache=300,  # DNS cache TTL
            force_close=False,  # Keep-alive
            enable_cleanup_closed=True,
            use_dns_cache=True
        )

        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            connector=self.connector,
            timeout=self.timeout,
            connector_owner=True,
            auto_decompress=True,
            trust_env=True,
            read_bufsize=2**16  # 64KB buffer
        )

    async def close(self):
        if self._session:
            await self._session.close()
            # Give the underlying connections a moment to finish closing
            await asyncio.sleep(0.25)
```

### Concurrent Requests

```python
import asyncio
import aiohttp
from typing import List, Any

async def fetch_many(urls: List[str]) -> List[Any]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def fetch_one(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.json()

# With semaphore for limiting concurrency
async def fetch_with_limit(urls: List[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    # Share one session so requests reuse the connection pool
    async with aiohttp.ClientSession() as session:
        async def fetch_limited(url: str):
            async with semaphore:
                return await fetch_one(session, url)

        return await asyncio.gather(*[fetch_limited(url) for url in urls])
```
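
To check that a semaphore really caps the number of in-flight operations, the pattern can be exercised without any network access. The sketch below is an assumption-laden stand-in: `run_limited` is a hypothetical helper, and each job is any zero-argument coroutine function (an `asyncio.sleep` in a quick experiment, where an HTTP request would go in real code). It returns the results together with the highest concurrency it observed.

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def run_limited(
    jobs: Sequence[Callable[[], Awaitable[Any]]],
    max_concurrent: int,
) -> tuple[list[Any], int]:
    """Run all jobs with at most `max_concurrent` active; return (results, observed peak)."""
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0
    peak = 0

    async def worker(job: Callable[[], Awaitable[Any]]) -> Any:
        nonlocal active, peak
        async with semaphore:
            # Track how many jobs hold the semaphore at the same time
            active += 1
            peak = max(peak, active)
            try:
                return await job()
            finally:
                active -= 1

    results = await asyncio.gather(*(worker(job) for job in jobs))
    return list(results), peak
```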

### Streaming Large Responses

```python
import aiohttp
from aiohttp import web

async def download_large_file(url: str, filepath: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Note: the file writes below are blocking calls
            with open(filepath, 'wb') as f:
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)

# Server-side streaming
async def stream_large_response(request: web.Request) -> web.StreamResponse:
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/octet-stream'
    await response.prepare(request)

    # Stream data in chunks
    with open('large_file.dat', 'rb') as f:
        while chunk := f.read(8192):
            await response.write(chunk)

    await response.write_eof()
    return response
```

### Caching

```python
import time

import aiohttp

class CachedClient:
    def __init__(self):
        # Maps url -> (data, time the entry was stored)
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes

    async def get_with_cache(self, url: str):
        now = time.time()

        # Check cache
        if url in self.cache:
            data, timestamp = self.cache[url]
            if now - timestamp < self.cache_ttl:
                return data

        # Fetch and cache
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.json()
                self.cache[url] = (data, now)
                return data
```
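
With the cache above, several coroutines asking for the same URL at once all miss and all fetch. A per-key lock is one way to let only the first caller do the work. The sketch below assumes a hypothetical `AsyncTTLCache` class whose `fetch` argument is any zero-argument coroutine function, so the idea can be tried without a network; in the client above, `fetch` would wrap the `session.get(...)` call.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


class AsyncTTLCache:
    """Hypothetical TTL cache that collapses concurrent misses for the same key."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.clock = clock
        self._entries: dict[str, tuple[Any, float]] = {}
        self._locks: dict[str, asyncio.Lock] = {}

    def _fresh(self, key: str) -> tuple[bool, Any]:
        entry = self._entries.get(key)
        if entry is not None and self.clock() - entry[1] < self.ttl:
            return True, entry[0]
        return False, None

    async def get_or_fetch(self, key: str, fetch: Callable[[], Awaitable[Any]]) -> Any:
        hit, value = self._fresh(key)
        if hit:
            return value
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Re-check: another caller may have filled the entry while we waited
            hit, value = self._fresh(key)
            if hit:
                return value
            value = await fetch()
            self._entries[key] = (value, self.clock())
            return value
```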

---

## Git Protocol Integration

### Understanding Git Smart HTTP

The Git Smart HTTP protocol lets Git clients clone, fetch, and push over HTTP/HTTPS. Each exchange has three steps:

1. **Service Discovery**: The client requests `/info/refs?service=git-upload-pack` (for clone and fetch) or `/info/refs?service=git-receive-pack` (for push)
2. **Negotiation**: Client and server negotiate which objects to transfer
3. **Pack Transfer**: The server sends (fetch) or receives (push) packfiles
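
Smart HTTP bodies are framed as pkt-lines: each line starts with a four-digit hexadecimal length that counts the four prefix bytes themselves, and `0000` is a flush packet that ends a section. As a small illustration of the service-discovery step, the sketch below builds the announcement that opens an `info/refs` response. The helper names `pkt_line` and `service_announcement` are illustrative only; in the examples that follow, `git-http-backend` produces this framing itself.

```python
FLUSH_PKT = b"0000"  # Flush packet: marks the end of a section
MAX_PKT_LEN = 65520  # Largest total pkt-line length Git allows


def pkt_line(payload: bytes) -> bytes:
    """Frame payload as a pkt-line: 4 hex digits of total length, then the payload."""
    length = len(payload) + 4  # The length prefix counts itself
    if length > MAX_PKT_LEN:
        raise ValueError("payload too large for a single pkt-line")
    return f"{length:04x}".encode("ascii") + payload


def service_announcement(service: str) -> bytes:
    """First bytes of a smart info/refs response for the given service."""
    return pkt_line(f"# service={service}\n".encode("ascii")) + FLUSH_PKT


print(service_announcement("git-upload-pack"))
# b'001e# service=git-upload-pack\n0000'
```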

### Basic Git HTTP Backend

```python
from aiohttp import web
import asyncio
import os
from pathlib import Path
from typing import Optional

class GitHTTPBackend:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root
        self.git_backend = '/usr/lib/git-core/git-http-backend'

    def _repo_exists(self, repo_path: str) -> bool:
        # Reject paths that resolve outside repo_root (for example via '..')
        root = self.repo_root.resolve()
        full_path = (root / repo_path).resolve()
        return full_path.is_relative_to(root) and full_path.exists()

    async def _run_backend(
        self, cgi_env: dict[str, str], body: Optional[bytes] = None
    ) -> web.Response:
        # Build environment
        env = os.environ.copy()
        env['GIT_PROJECT_ROOT'] = str(self.repo_root)
        env['GIT_HTTP_EXPORT_ALL'] = '1'
        env.update(cgi_env)

        # Execute git-http-backend
        proc = await asyncio.create_subprocess_exec(
            self.git_backend,
            stdin=asyncio.subprocess.PIPE if body is not None else asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        stdout, stderr = await proc.communicate(input=body)

        if proc.returncode != 0:
            return web.Response(status=500, text=stderr.decode(errors='replace'))

        # Parse CGI output: header lines, a blank line, then the body
        head, separator, payload = stdout.partition(b'\r\n\r\n')
        if not separator:
            return web.Response(status=500)

        # Parse headers
        status = 200
        headers = {}
        for line in head.decode().split('\r\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                if key.strip().lower() == 'status':
                    # CGI reports errors via a Status header, e.g. "Status: 404 Not Found"
                    status = int(value.split()[0])
                else:
                    headers[key.strip()] = value.strip()

        return web.Response(body=payload, headers=headers, status=status)

    async def handle_info_refs(self, request: web.Request) -> web.Response:
        repo_path = request.match_info['repo']
        service = request.query.get('service', '')

        if service not in ('git-upload-pack', 'git-receive-pack'):
            return web.Response(status=400, text='Invalid service')

        if not self._repo_exists(repo_path):
            return web.Response(status=404, text='Repository not found')

        return await self._run_backend({
            'PATH_INFO': f'/{repo_path}/info/refs',
            'QUERY_STRING': f'service={service}',
            'REQUEST_METHOD': 'GET',
        })

    async def handle_service(self, request: web.Request) -> web.Response:
        repo_path = request.match_info['repo']
        service = request.match_info['service']

        if service not in ('git-upload-pack', 'git-receive-pack'):
            return web.Response(status=400)

        if not self._repo_exists(repo_path):
            return web.Response(status=404)

        # Read request body
        body = await request.read()

        return await self._run_backend({
            'PATH_INFO': f'/{repo_path}/{service}',
            'REQUEST_METHOD': 'POST',
            'CONTENT_TYPE': request.content_type,
            'CONTENT_LENGTH': str(len(body)),
        }, body=body)

# Setup routes
git_backend = GitHTTPBackend(Path('/var/git/repos'))

app = web.Application()
app.router.add_get('/{repo:.+}/info/refs', git_backend.handle_info_refs)
app.router.add_post('/{repo:.+}/{service:(git-upload-pack|git-receive-pack)}', git_backend.handle_service)
```

### Git Backend with Authentication

```python
from aiohttp import web
import base64
import hmac
from pathlib import Path

class AuthenticatedGitBackend:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root
        # Demo credentials only; do not store plain-text passwords in real code
        self.users = {
            'alice': 'password123',
            'bob': 'secret456'
        }

    def verify_auth(self, request: web.Request) -> tuple[bool, str | None]:
        auth_header = request.headers.get('Authorization', '')

        if not auth_header.startswith('Basic '):
            return False, None

        try:
            encoded = auth_header[6:]
            decoded = base64.b64decode(encoded).decode()
            username, password = decoded.split(':', 1)
        except ValueError:
            # Covers malformed base64, invalid UTF-8, and a missing ':' separator
            return False, None

        expected = self.users.get(username)
        # compare_digest avoids leaking information through comparison timing
        if expected is not None and hmac.compare_digest(password.encode(), expected.encode()):
            return True, username

        return False, None

    @web.middleware
    async def auth_middleware(self, request: web.Request, handler):
        # Allow anonymous reads
        service = request.query.get('service', '')
        if request.method == 'GET' and service == 'git-upload-pack':
            return await handler(request)

        # Require authentication for pushes
        if service == 'git-receive-pack' or 'git-receive-pack' in request.path:
            authorized, username = self.verify_auth(request)
            if not authorized:
                return web.Response(
                    status=401,
                    headers={'WWW-Authenticate': 'Basic realm="Git Access"'},
                    text='Authentication required'
                )

            request['username'] = username

        return await handler(request)

    async def handle_info_refs(self, request: web.Request):
        # Git info/refs implementation
        # Similar to previous example but with auth
        pass

    async def handle_service(self, request: web.Request):
        # Git service implementation
        # Similar to previous example but with auth
        pass
```
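
The `users` dictionary above keeps passwords in plain text, which is fine for a demo but not for a deployed server. One common alternative is to store a salted, iterated hash and verify against it. The sketch below uses only the standard library (`hashlib.pbkdf2_hmac`); the function names, the `iterations$salt$digest` storage format, and the default iteration count are assumptions made for illustration, and a dedicated password-hashing library may be a better fit in production. In `verify_auth`, the dictionary values would then hold `hash_password(...)` output and the comparison would become `verify_password(password, expected)`.

```python
import hashlib
import hmac
import os
from typing import Optional


def hash_password(
    password: str, *, salt: Optional[bytes] = None, iterations: int = 200_000
) -> str:
    """Return 'iterations$salt_hex$digest_hex' for storage (hypothetical format)."""
    salt = salt if salt is not None else os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Check a candidate password against a value produced by hash_password."""
    try:
        iterations, salt_hex, digest_hex = stored.split("$")
        expected = bytes.fromhex(digest_hex)
        actual = hashlib.pbkdf2_hmac(
            "sha256", password.encode(), bytes.fromhex(salt_hex), int(iterations)
        )
    except ValueError:
        # Malformed stored value: treat as a failed login
        return False
    # Constant-time comparison of the two digests
    return hmac.compare_digest(actual, expected)
```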
|
|
|
|
---
|
|
|
|
## Repository Manager Implementation
|
|
|
|
### Repository Browser
|
|
|
|
```python
|
|
from aiohttp import web
|
|
import os
|
|
from pathlib import Path
|
|
import subprocess
|
|
from typing import Optional
|
|
import json
|
|
|
|
class RepositoryManager:
|
|
def __init__(self, repo_root: Path):
|
|
self.repo_root = repo_root
|
|
|
|
async def list_repositories(self, request: web.Request) -> web.Response:
|
|
repos = []
|
|
|
|
for item in self.repo_root.iterdir():
|
|
if item.is_dir() and (item / '.git').exists():
|
|
repos.append({
|
|
'name': item.name,
|
|
'path': str(item.relative_to(self.repo_root)),
|
|
'type': 'git'
|
|
})
|
|
|
|
return web.json_response({'repositories': repos})
|
|
|
|
async def get_repository(self, request: web.Request) -> web.Response:
|
|
repo_name = request.match_info['repo']
|
|
repo_path = self.repo_root / repo_name
|
|
|
|
if not repo_path.exists():
|
|
return web.json_response(
|
|
{'error': 'Repository not found'},
|
|
status=404
|
|
)
|
|
|
|
# Get repository info
|
|
info = await self._get_repo_info(repo_path)
|
|
|
|
return web.json_response(info)
|
|
|
|
async def create_repository(self, request: web.Request) -> web.Response:
|
|
data = await request.json()
|
|
repo_name = data.get('name')
|
|
|
|
if not repo_name:
|
|
return web.json_response(
|
|
{'error': 'Repository name required'},
|
|
status=400
|
|
)
|
|
|
|
repo_path = self.repo_root / repo_name
|
|
|
|
if repo_path.exists():
|
|
return web.json_response(
|
|
{'error': 'Repository already exists'},
|
|
status=400
|
|
)
|
|
|
|
# Create bare repository
|
|
repo_path.mkdir(parents=True)
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
'git', 'init', '--bare', str(repo_path),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
await proc.communicate()
|
|
|
|
if proc.returncode != 0:
|
|
return web.json_response(
|
|
{'error': 'Failed to create repository'},
|
|
status=500
|
|
)
|
|
|
|
return web.json_response({
|
|
'name': repo_name,
|
|
'path': str(repo_path.relative_to(self.repo_root))
|
|
}, status=201)
|
|
|
|
async def delete_repository(self, request: web.Request) -> web.Response:
|
|
repo_name = request.match_info['repo']
|
|
repo_path = self.repo_root / repo_name
|
|
|
|
if not repo_path.exists():
|
|
return web.json_response(
|
|
{'error': 'Repository not found'},
|
|
status=404
|
|
)
|
|
|
|
# Delete repository directory
|
|
import shutil
|
|
shutil.rmtree(repo_path)
|
|
|
|
return web.Response(status=204)
|
|
|
|
async def browse_tree(self, request: web.Request) -> web.Response:
|
|
repo_name = request.match_info['repo']
|
|
ref = request.query.get('ref', 'HEAD')
|
|
path = request.query.get('path', '')
|
|
|
|
repo_path = self.repo_root / repo_name
|
|
|
|
if not repo_path.exists():
|
|
return web.json_response(
|
|
{'error': 'Repository not found'},
|
|
status=404
|
|
)
|
|
|
|
# List files in tree
|
|
proc = await asyncio.create_subprocess_exec(
|
|
'git', 'ls-tree', ref, path,
|
|
cwd=str(repo_path),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
stdout, stderr = await proc.communicate()
|
|
|
|
if proc.returncode != 0:
|
|
return web.json_response(
|
|
{'error': stderr.decode()},
|
|
status=400
|
|
)
|
|
|
|
# Parse ls-tree output
|
|
entries = []
|
|
for line in stdout.decode().strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
|
|
mode, type_, hash_, name = line.split(None, 3)
|
|
entries.append({
|
|
'mode': mode,
|
|
'type': type_,
|
|
'hash': hash_,
|
|
'name': name
|
|
})
|
|
|
|
return web.json_response({'entries': entries})
|
|
|
|
async def get_file_content(self, request: web.Request) -> web.Response:
|
|
repo_name = request.match_info['repo']
|
|
ref = request.query.get('ref', 'HEAD')
|
|
path = request.query['path']
|
|
|
|
repo_path = self.repo_root / repo_name
|
|
|
|
# Get file content
|
|
proc = await asyncio.create_subprocess_exec(
|
|
'git', 'show', f'{ref}:{path}',
|
|
cwd=str(repo_path),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
stdout, stderr = await proc.communicate()
|
|
|
|
if proc.returncode != 0:
|
|
return web.json_response(
|
|
{'error': 'File not found'},
|
|
status=404
|
|
)
|
|
|
|
return web.Response(
|
|
body=stdout,
|
|
content_type='text/plain'
|
|
)
|
|
|
|
async def get_commits(self, request: web.Request) -> web.Response:
|
|
repo_name = request.match_info['repo']
|
|
ref = request.query.get('ref', 'HEAD')
|
|
limit = int(request.query.get('limit', '50'))
|
|
|
|
repo_path = self.repo_root / repo_name
|
|
|
|
# Get commit log
|
|
proc = await asyncio.create_subprocess_exec(
|
|
'git', 'log', ref,
|
|
'--pretty=format:%H|%an|%ae|%at|%s',
|
|
f'-{limit}',
|
|
cwd=str(repo_path),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
stdout, stderr = await proc.communicate()
|
|
|
|
commits = []
|
|
for line in stdout.decode().strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
|
|
hash_, author, email, timestamp, message = line.split('|', 4)
|
|
commits.append({
|
|
'hash': hash_,
|
|
'author': author,
|
|
'email': email,
|
|
'timestamp': int(timestamp),
|
|
'message': message
|
|
})
|
|
|
|
return web.json_response({'commits': commits})
|
|
|
|
async def _get_repo_info(self, repo_path: Path) -> dict:
|
|
# Get HEAD
|
|
proc = await asyncio.create_subprocess_exec(
|
|
'git', 'rev-parse', 'HEAD',
|
|
cwd=str(repo_path),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
stdout, _ = await proc.communicate()
|
|
head = stdout.decode().strip() if proc.returncode == 0 else None
|
|
|
|
# Get branches
|
|
proc = await asyncio.create_subprocess_exec(
|
|
'git', 'branch', '-a',
|
|
cwd=str(repo_path),
|
|
stdout=asyncio.subprocess.PIPE
|
|
)
|
|
stdout, _ = await proc.communicate()
|
|
branches = [
|
|
line.strip().lstrip('* ')
|
|
for line in stdout.decode().strip().split('\n')
|
|
]
|
|
|
|
return {
|
|
'name': repo_path.name,
|
|
'head': head,
|
|
'branches': branches
|
|
}
|
|
|
|
# Setup routes
repo_manager = RepositoryManager(Path('/var/git/repos'))

app.router.add_get('/api/repositories', repo_manager.list_repositories)
app.router.add_get('/api/repositories/{repo}', repo_manager.get_repository)
app.router.add_post('/api/repositories', repo_manager.create_repository)
app.router.add_delete('/api/repositories/{repo}', repo_manager.delete_repository)
app.router.add_get('/api/repositories/{repo}/tree', repo_manager.browse_tree)
app.router.add_get('/api/repositories/{repo}/file', repo_manager.get_file_content)
app.router.add_get('/api/repositories/{repo}/commits', repo_manager.get_commits)
```
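The handlers above build `repo_path` directly from the `{repo}` URL segment, so a name such as `..` could point outside the repository root. The following is a minimal sketch of one way to guard against that, not part of the original `RepositoryManager`: the helper name `resolve_repo_path` and its `ValueError` contract are assumptions made for illustration. A handler could call it in place of `self.repo_root / repo_name` and translate the `ValueError` into `web.HTTPBadRequest`.

```python
from pathlib import Path


def resolve_repo_path(repo_root: Path, repo_name: str) -> Path:
    """Return the repository directory, refusing names that escape repo_root.

    Hypothetical helper: the name and the ValueError contract are illustrative.
    """
    root = repo_root.resolve()
    candidate = (root / repo_name).resolve()
    # is_relative_to() is False for "..", absolute paths, and symlink escapes;
    # the equality check refuses the root directory itself (names "" or ".")
    if candidate == root or not candidate.is_relative_to(root):
        raise ValueError(f'invalid repository name: {repo_name!r}')
    return candidate
```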

---

## Best Practices and Patterns

### Application Structure

```
project/
├── app/
│   ├── __init__.py
│   ├── main.py              # Application entry point
│   ├── routes.py            # Route definitions
│   ├── handlers.py          # Request handlers
│   ├── middleware.py        # Custom middleware
│   ├── models.py            # Pydantic models
│   ├── database.py          # Database connections
│   └── utils.py             # Utility functions
├── tests/
│   ├── __init__.py
│   ├── conftest.py          # Test fixtures
│   ├── test_handlers.py
│   └── test_integration.py
├── requirements.txt
├── pyproject.toml
└── README.md
```

### Configuration Management

```python
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional


class Settings(BaseSettings):
    # SettingsConfigDict is the settings-specific config type; env_file and
    # related options are read from it
    model_config = SettingsConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        case_sensitive=False
    )

    # Server
    host: str = '127.0.0.1'
    port: int = 8080
    debug: bool = False

    # Database (required: no default)
    database_url: str
    database_pool_size: int = 10

    # Security (secret_key is required: no default)
    secret_key: str
    jwt_algorithm: str = 'HS256'
    jwt_expiration: int = 3600

    # External APIs
    external_api_url: Optional[str] = None
    external_api_key: Optional[str] = None


# Raises a ValidationError if database_url or secret_key is not provided
settings = Settings()
```

### Graceful Shutdown

```python
from aiohttp import web


class Application:
    def __init__(self):
        self.app = web.Application()
        self.cleanup_tasks = []

    async def startup(self, app: web.Application):
        # Initialize resources
        pass

    async def cleanup(self):
        # Cleanup resources
        for task in self.cleanup_tasks:
            await task

    async def shutdown(self, app: web.Application):
        # Close database connections
        # Close HTTP sessions
        # Wait for background tasks
        await self.cleanup()

    def run(self):
        # Signal handlers receive the application as their only argument,
        # so the bound methods can be registered directly
        self.app.on_startup.append(self.startup)
        self.app.on_cleanup.append(self.shutdown)

        # run_app handles SIGINT/SIGTERM itself, so no manual signal
        # handlers are needed here
        web.run_app(
            self.app,
            host='127.0.0.1',
            port=8080,
            shutdown_timeout=60.0
        )
```
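The "Wait for background tasks" step is left as a comment in the class above. A minimal sketch of one common approach, using only `asyncio`, is to cancel each task and then wait for all of them to finish so their `finally` blocks run before the process exits. The names `cancel_background_tasks`, `worker`, and `demo` are hypothetical and exist only for this illustration; a `shutdown` handler could await the first of them with whatever task set the application keeps.

```python
import asyncio


async def cancel_background_tasks(tasks: set[asyncio.Task]) -> None:
    """Cancel every task and wait until each one has actually finished."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects CancelledError (and any other error)
    # as results, so one failing task does not prevent waiting for the rest
    await asyncio.gather(*tasks, return_exceptions=True)
    tasks.clear()


async def demo() -> list[str]:
    events: list[str] = []

    async def worker(name: str) -> None:
        try:
            await asyncio.sleep(3600)  # stand-in for a long-running job
        finally:
            # Runs when the task is cancelled, like real resource cleanup
            events.append(f'{name} stopped')

    tasks = {asyncio.create_task(worker(name)) for name in ('a', 'b')}
    await asyncio.sleep(0)  # yield once so both workers start running
    await cancel_background_tasks(tasks)
    return sorted(events)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```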

### Summary

This guide covers:

- Python 3.13 modern features and type hints
- aiohttp 3.13+ client and server development
- Complete authentication patterns (Basic, Bearer, API Key, OAuth2)
- Pydantic 2.12+ validation
- WebSocket implementation
- Comprehensive testing with pytest-aiohttp
- Git protocol integration
- Repository management system
- Performance optimization
- Production-ready patterns

**Key Takeaways:**

1. Always reuse ClientSession across requests
2. Use type hints and Pydantic for validation
3. Implement proper error handling and middleware
4. Write comprehensive tests with pytest-aiohttp
5. Follow async/await patterns consistently
6. Optimize connection pooling and timeouts
7. Handle cleanup and graceful shutdown properly

---

*Guide Version: 1.0*
*Last Updated: November 2025*
*Compatible with: Python 3.13.3, aiohttp 3.13.2, pytest-aiohttp 1.1.0, pydantic 2.12.3*