# Retoor's Guide to Modern Python: Mastering aiohttp 3.13+ with Python 3.13 **Complete Tutorial: aiohttp, Testing, Authentication, WebSockets, and Git Protocol Integration** Version Requirements: - Python: **3.13.3** (Released October 7, 2024) - aiohttp: **3.13.2** (Latest stable as of October 28, 2025) - pytest: **8.3+** - pytest-aiohttp: **1.1.0** (Released January 23, 2025) - pytest-asyncio: **1.2.0** (Released September 12, 2025) - pydantic: **2.12.3** (Released October 17, 2025) --- ## Table of Contents 1. [Python 3.13 Modern Features](#python-313-modern-features) 2. [aiohttp Fundamentals](#aiohttp-fundamentals) 3. [Client Sessions and Connection Management](#client-sessions-and-connection-management) 4. [Authentication Patterns](#authentication-patterns) 5. [Server Development](#server-development) 6. [Request Validation with Pydantic](#request-validation-with-pydantic) 7. [WebSocket Implementation](#websocket-implementation) 8. [Testing with pytest and pytest-aiohttp](#testing-with-pytest-and-pytest-aiohttp) 9. [Advanced Middleware and Error Handling](#advanced-middleware-and-error-handling) 10. [Performance Optimization](#performance-optimization) 11. [Git Protocol Integration](#git-protocol-integration) 12. [Repository Manager Implementation](#repository-manager-implementation) 13. [Best Practices and Patterns](#best-practices-and-patterns) --- ## Python 3.13 Modern Features ### Key Python 3.13 Enhancements Python 3.13 introduces significant improvements for asynchronous programming: **Experimental Free-Threaded Mode (No GIL)** - Enable true multi-threading with `python -X gil=0` - Significant for CPU-bound async operations - Better performance on multi-core systems **JIT Compiler (Preview)** - Just-In-Time compilation for performance boosts - Enable with `PYTHON_JIT=1` environment variable - Early benchmarks show 10-20% improvements **Enhanced Interactive Interpreter** - Multi-line editing with syntax highlighting - Colorized tracebacks for better debugging - Improved REPL experience **Better Error Messages** - More precise error locations - Clearer exception messages - Context-aware suggestions ### Modern Type Hints (PEP 695) Python 3.13 fully supports modern generic syntax: ```python from typing import TypeVar, Generic from collections.abc import Sequence, Mapping # Old style (still works) T = TypeVar('T') class OldGeneric(Generic[T]): def process(self, item: T) -> T: return item # New PEP 695 style (Python 3.12+) class NewGeneric[T]: def process(self, item: T) -> T: return item # Type aliases with 'type' keyword type RequestHandler[T] = Callable[[Request], Awaitable[T]] type JSONDict = dict[str, str | int | float | bool | None] # Modern function annotations async def fetch_data[T](url: str, parser: Callable[[bytes], T]) -> T | None: async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: return parser(await response.read()) return None ``` ### Dataclasses in Python 3.13 ```python from dataclasses import dataclass, field, replace from typing import ClassVar import copy @dataclass(slots=True, kw_only=True) class User: user_id: str username: str email: str created_at: float = field(default_factory=time.time) is_active: bool = True _password_hash: str = field(repr=False, compare=False) # New in 3.13: __static_attributes__ def __post_init__(self): self.last_login: float | None = None @property def is_new(self) -> bool: return time.time() - self.created_at < 86400 # New in 3.13: copy.replace() works with dataclasses user = User(user_id="123", username="alice", email="alice@example.com", _password_hash="hashed") updated = copy.replace(user, username="alice_updated") # Access static attributes (new in 3.13) print(User.__static_attributes__) # ('last_login',) ``` ### Modern Async Patterns ```python import asyncio from collections.abc import AsyncIterator # Async generators async def fetch_paginated[T]( url: str, parser: Callable[[dict], T] ) -> AsyncIterator[T]: page = 1 async with aiohttp.ClientSession() as session: while True: async with session.get(f"{url}?page={page}") as response: data = await response.json() if not data['items']: break for item in data['items']: yield parser(item) page += 1 # Context manager pattern class AsyncResourceManager: async def __aenter__(self): self.session = aiohttp.ClientSession() await self.session.__aenter__() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.session.__aexit__(exc_type, exc_val, exc_tb) ``` --- ## aiohttp Fundamentals ### Installation and Setup ```bash # Core installation pip install aiohttp==3.13.2 # With speedups (highly recommended) pip install aiohttp[speedups]==3.13.2 # Additional recommended packages pip install aiodns>=3.0.0 # Fast DNS resolution pip install Brotli>=1.0.9 # Brotli compression support pip install pydantic>=2.12.3 # Request validation ``` ### Basic Client Usage ```python import aiohttp import asyncio async def fetch_example(): async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com/data') as response: print(f"Status: {response.status}") print(f"Content-Type: {response.headers['content-type']}") # Various response methods text = await response.text() # Text content data = await response.json() # JSON parsing raw = await response.read() # Raw bytes asyncio.run(fetch_example()) ``` ### Basic Server Usage ```python from aiohttp import web async def hello_handler(request: web.Request) -> web.Response: name = request.match_info.get('name', 'Anonymous') return web.Response(text=f"Hello, {name}!") async def json_handler(request: web.Request) -> web.Response: data = await request.json() return web.json_response({ 'status': 'success', 'received': data }) app = web.Application() app.router.add_get('/', hello_handler) app.router.add_get('/{name}', hello_handler) app.router.add_post('/api/data', json_handler) if __name__ == '__main__': web.run_app(app, host='127.0.0.1', port=8080) ``` --- ## Client Sessions and Connection Management ### Session Management Best Practices **Never create a new session for each request** - this is the most common mistake: ```python # ❌ WRONG - Creates new session for every request async def bad_example(): async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com') as response: return await response.text() # Session destroyed here # ✅ CORRECT - Reuse session across requests class APIClient: def __init__(self, base_url: str): self.base_url = base_url self._session: aiohttp.ClientSession | None = None async def __aenter__(self): self._session = aiohttp.ClientSession( base_url=self.base_url, timeout=aiohttp.ClientTimeout(total=30), connector=aiohttp.TCPConnector( limit=100, # Total connection limit limit_per_host=30, # Per-host limit ttl_dns_cache=300, # DNS cache TTL ) ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self._session: await self._session.close() async def get(self, path: str) -> dict: async with self._session.get(path) as response: response.raise_for_status() return await response.json() # Usage async def main(): async with APIClient('https://api.example.com') as client: user = await client.get('/users/123') posts = await client.get('/posts') comments = await client.get('/comments') asyncio.run(main()) ``` ### Advanced Session Configuration ```python import aiohttp from aiohttp import ClientTimeout, TCPConnector, ClientSession from typing import Optional class AdvancedHTTPClient: def __init__( self, base_url: str, timeout: int = 30, max_connections: int = 100, max_connections_per_host: int = 30, headers: Optional[dict[str, str]] = None ): self.base_url = base_url self.timeout = ClientTimeout( total=timeout, connect=10, # Connection timeout sock_read=20 # Socket read timeout ) self.connector = TCPConnector( limit=max_connections, limit_per_host=max_connections_per_host, ttl_dns_cache=300, ssl=None, # SSL context if needed force_close=False, # Keep connections alive enable_cleanup_closed=True ) self.default_headers = headers or {} self._session: Optional[ClientSession] = None async def start(self): if self._session is None: self._session = ClientSession( base_url=self.base_url, timeout=self.timeout, connector=self.connector, headers=self.default_headers, raise_for_status=False, connector_owner=True, auto_decompress=True, trust_env=True ) async def close(self): if self._session: await self._session.close() await asyncio.sleep(0.25) # Allow cleanup async def __aenter__(self): await self.start() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def request( self, method: str, path: str, **kwargs ) -> aiohttp.ClientResponse: if not self._session: await self.start() return await self._session.request(method, path, **kwargs) ``` ### Cookie Management ```python import aiohttp from http.cookies import SimpleCookie # Automatic cookie handling async def with_cookies(): # Create cookie jar jar = aiohttp.CookieJar(unsafe=False) # Only HTTPS cookies async with aiohttp.ClientSession(cookie_jar=jar) as session: # Cookies are automatically stored and sent await session.get('https://example.com/login') # Manually update cookies session.cookie_jar.update_cookies( {'session_id': 'abc123'}, response_url='https://example.com' ) # Access cookies for cookie in session.cookie_jar: print(f"{cookie.key}: {cookie.value}") # Custom cookie handling async def custom_cookies(): cookies = {'auth_token': 'xyz789'} async with aiohttp.ClientSession(cookies=cookies) as session: async with session.get('https://example.com/api') as response: # Read response cookies print(response.cookies) ``` --- ## Authentication Patterns ### Basic Authentication ```python import aiohttp from aiohttp import BasicAuth import base64 # Method 1: Using BasicAuth helper async def basic_auth_helper(username: str, password: str): auth = BasicAuth(login=username, password=password) async with aiohttp.ClientSession(auth=auth) as session: async with session.get('https://api.example.com/protected') as response: return await response.json() # Method 2: Manual base64 encoding async def basic_auth_manual(username: str, password: str): credentials = f"{username}:{password}" encoded = base64.b64encode(credentials.encode()).decode() headers = {'Authorization': f'Basic {encoded}'} async with aiohttp.ClientSession(headers=headers) as session: async with session.get('https://api.example.com/protected') as response: return await response.json() # Method 3: Per-request authentication async def basic_auth_per_request(username: str, password: str, url: str): auth = BasicAuth(login=username, password=password) async with aiohttp.ClientSession() as session: async with session.get(url, auth=auth) as response: return await response.json() ``` ### Bearer Token Authentication ```python class TokenAuthClient: def __init__(self, base_url: str, token: str): self.base_url = base_url self.token = token self._session: Optional[aiohttp.ClientSession] = None async def __aenter__(self): headers = { 'Authorization': f'Bearer {self.token}', 'Accept': 'application/json' } self._session = aiohttp.ClientSession( base_url=self.base_url, headers=headers ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self._session: await self._session.close() async def get(self, path: str) -> dict: async with self._session.get(path) as response: response.raise_for_status() return await response.json() async def post(self, path: str, data: dict) -> dict: async with self._session.post(path, json=data) as response: response.raise_for_status() return await response.json() # Usage async def example(): async with TokenAuthClient('https://api.example.com', 'your_token_here') as client: user = await client.get('/user') result = await client.post('/items', {'name': 'test'}) ``` ### API Key Authentication ```python class APIKeyClient: def __init__( self, base_url: str, api_key: str, key_location: str = 'header', # 'header' or 'query' key_name: str = 'X-API-Key' ): self.base_url = base_url self.api_key = api_key self.key_location = key_location self.key_name = key_name self._session: Optional[aiohttp.ClientSession] = None async def __aenter__(self): if self.key_location == 'header': headers = {self.key_name: self.api_key} self._session = aiohttp.ClientSession( base_url=self.base_url, headers=headers ) else: self._session = aiohttp.ClientSession(base_url=self.base_url) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self._session: await self._session.close() async def request(self, method: str, path: str, **kwargs): if self.key_location == 'query': params = kwargs.get('params', {}) params[self.key_name] = self.api_key kwargs['params'] = params async with self._session.request(method, path, **kwargs) as response: response.raise_for_status() return await response.json() ``` ### Digest Authentication (aiohttp 3.12.8+) ```python from aiohttp import ClientSession, DigestAuthMiddleware async def digest_auth_example(): # Create digest auth middleware digest_auth = DigestAuthMiddleware( login="user", password="password", preemptive=True # New in 3.12.8: preemptive authentication ) # Pass middleware to session async with ClientSession(middlewares=(digest_auth,)) as session: async with session.get("https://httpbin.org/digest-auth/auth/user/password") as resp: print(await resp.text()) ``` ### OAuth 2.0 Token Refresh Pattern ```python import asyncio from datetime import datetime, timedelta from typing import Optional class OAuth2Client: def __init__( self, base_url: str, client_id: str, client_secret: str, token_url: str ): self.base_url = base_url self.client_id = client_id self.client_secret = client_secret self.token_url = token_url self._access_token: Optional[str] = None self._token_expires_at: Optional[datetime] = None self._refresh_token: Optional[str] = None self._session: Optional[aiohttp.ClientSession] = None self._lock = asyncio.Lock() async def __aenter__(self): self._session = aiohttp.ClientSession(base_url=self.base_url) await self._ensure_token() return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self._session: await self._session.close() async def _ensure_token(self): async with self._lock: now = datetime.now() if ( not self._access_token or not self._token_expires_at or now >= self._token_expires_at ): await self._refresh_access_token() async def _refresh_access_token(self): data = { 'grant_type': 'client_credentials', 'client_id': self.client_id, 'client_secret': self.client_secret } async with aiohttp.ClientSession() as session: async with session.post(self.token_url, data=data) as response: response.raise_for_status() token_data = await response.json() self._access_token = token_data['access_token'] expires_in = token_data.get('expires_in', 3600) self._token_expires_at = datetime.now() + timedelta(seconds=expires_in - 60) self._refresh_token = token_data.get('refresh_token') async def request(self, method: str, path: str, **kwargs): await self._ensure_token() headers = kwargs.get('headers', {}) headers['Authorization'] = f'Bearer {self._access_token}' kwargs['headers'] = headers async with self._session.request(method, path, **kwargs) as response: if response.status == 401: await self._refresh_access_token() headers['Authorization'] = f'Bearer {self._access_token}' async with self._session.request(method, path, **kwargs) as retry: retry.raise_for_status() return await retry.json() response.raise_for_status() return await response.json() ``` --- ## Server Development ### Application Structure ```python from aiohttp import web from typing import Callable, Awaitable # Type aliases Handler = Callable[[web.Request], Awaitable[web.Response]] class Application: def __init__(self): self.app = web.Application() self.setup_routes() self.setup_middlewares() def setup_routes(self): self.app.router.add_get('/', self.index) self.app.router.add_get('/health', self.health) # API routes self.app.router.add_route('*', '/api/{path:.*}', self.api_handler) def setup_middlewares(self): self.app.middlewares.append(self.error_middleware) self.app.middlewares.append(self.logging_middleware) async def index(self, request: web.Request) -> web.Response: return web.Response(text='Hello, World!') async def health(self, request: web.Request) -> web.Response: return web.json_response({'status': 'healthy'}) async def api_handler(self, request: web.Request) -> web.Response: path = request.match_info['path'] return web.json_response({ 'path': path, 'method': request.method }) @web.middleware async def error_middleware(self, request: web.Request, handler: Handler): try: return await handler(request) except web.HTTPException: raise except Exception as e: return web.json_response( {'error': str(e)}, status=500 ) @web.middleware async def logging_middleware(self, request: web.Request, handler: Handler): print(f"{request.method} {request.path}") response = await handler(request) print(f"Response: {response.status}") return response def run(self, host: str = '127.0.0.1', port: int = 8080): web.run_app(self.app, host=host, port=port) if __name__ == '__main__': app = Application() app.run() ``` ### Request Handling ```python from aiohttp import web, multipart from typing import Optional class RequestHandlers: # Query parameters async def query_params(self, request: web.Request) -> web.Response: # Get single parameter name = request.query.get('name', 'Anonymous') # Get all values for a key tags = request.query.getall('tag', []) # Get as integer with default page = int(request.query.get('page', '1')) return web.json_response({ 'name': name, 'tags': tags, 'page': page }) # Path parameters async def path_params(self, request: web.Request) -> web.Response: user_id = request.match_info['user_id'] action = request.match_info.get('action', 'view') return web.json_response({ 'user_id': user_id, 'action': action }) # JSON body async def json_body(self, request: web.Request) -> web.Response: try: data = await request.json() except ValueError: return web.json_response( {'error': 'Invalid JSON'}, status=400 ) return web.json_response({ 'received': data, 'type': type(data).__name__ }) # Form data async def form_data(self, request: web.Request) -> web.Response: data = await request.post() result = {} for key in data: value = data.get(key) result[key] = value return web.json_response(result) # File upload async def file_upload(self, request: web.Request) -> web.Response: reader = await request.multipart() uploaded_files = [] async for field in reader: if field.filename: size = 0 content = bytearray() while True: chunk = await field.read_chunk() if not chunk: break size += len(chunk) content.extend(chunk) uploaded_files.append({ 'filename': field.filename, 'size': size, 'content_type': field.headers.get('Content-Type') }) return web.json_response({ 'files': uploaded_files }) # Headers async def headers_example(self, request: web.Request) -> web.Response: auth_header = request.headers.get('Authorization') user_agent = request.headers.get('User-Agent') custom_header = request.headers.get('X-Custom-Header') response_headers = { 'X-Custom-Response': 'value', 'X-Request-ID': 'unique-id-123' } return web.json_response( { 'auth': auth_header, 'user_agent': user_agent, 'custom': custom_header }, headers=response_headers ) # Cookies async def cookies_example(self, request: web.Request) -> web.Response: session_id = request.cookies.get('session_id') response = web.json_response({ 'session_id': session_id }) # Set cookie response.set_cookie( 'session_id', 'new-session-id', max_age=3600, httponly=True, secure=True, samesite='Strict' ) return response ``` ### Response Types ```python from aiohttp import web import json class ResponseExamples: # Text response async def text_response(self, request: web.Request) -> web.Response: return web.Response( text='Plain text response', content_type='text/plain' ) # JSON response async def json_response(self, request: web.Request) -> web.Response: return web.json_response({ 'status': 'success', 'data': {'key': 'value'} }) # HTML response async def html_response(self, request: web.Request) -> web.Response: html = """ Example

Hello, World!

""" return web.Response( text=html, content_type='text/html' ) # Binary response async def binary_response(self, request: web.Request) -> web.Response: data = b'\x00\x01\x02\x03\x04' return web.Response( body=data, content_type='application/octet-stream' ) # File download async def file_download(self, request: web.Request) -> web.Response: return web.FileResponse( path='./example.pdf', headers={ 'Content-Disposition': 'attachment; filename="example.pdf"' } ) # Streaming response async def streaming_response(self, request: web.Request) -> web.StreamResponse: response = web.StreamResponse() response.headers['Content-Type'] = 'text/plain' await response.prepare(request) for i in range(10): await response.write(f"Chunk {i}\n".encode()) await asyncio.sleep(0.5) await response.write_eof() return response # Redirect async def redirect_response(self, request: web.Request) -> web.Response: raise web.HTTPFound('/new-location') # Custom status codes async def custom_status(self, request: web.Request) -> web.Response: return web.json_response( {'message': 'Created'}, status=201 ) ``` --- ## Request Validation with Pydantic ### Basic Pydantic Integration ```python from pydantic import BaseModel, Field, field_validator, ConfigDict from pydantic import EmailStr, HttpUrl from aiohttp import web from typing import Optional, Literal # Pydantic 2.12+ models class UserCreate(BaseModel): model_config = ConfigDict(str_strip_whitespace=True) username: str = Field(min_length=3, max_length=50) email: EmailStr password: str = Field(min_length=8) age: Optional[int] = Field(None, ge=18, le=120) role: Literal['user', 'admin', 'moderator'] = 'user' website: Optional[HttpUrl] = None @field_validator('username') @classmethod def validate_username(cls, v: str) -> str: if not v.isalnum(): raise ValueError('Username must be alphanumeric') return v.lower() @field_validator('password') @classmethod def validate_password(cls, v: str) -> str: if not any(c.isupper() for c in v): raise ValueError('Password must contain uppercase letter') if not any(c.isdigit() for c in v): raise ValueError('Password must contain digit') return v class UserResponse(BaseModel): user_id: str username: str email: EmailStr role: str created_at: float # Handler with validation async def create_user(request: web.Request) -> web.Response: try: data = await request.json() user_data = UserCreate(**data) except ValueError as e: return web.json_response( {'error': 'Validation error', 'details': str(e)}, status=400 ) # Process validated data user = UserResponse( user_id='123', username=user_data.username, email=user_data.email, role=user_data.role, created_at=time.time() ) return web.json_response( user.model_dump(), status=201 ) ``` ### Advanced Validation Patterns ```python from pydantic import BaseModel, Field, field_validator, model_validator from typing import Any, Optional from enum import Enum class Priority(str, Enum): LOW = 'low' MEDIUM = 'medium' HIGH = 'high' URGENT = 'urgent' class TaskCreate(BaseModel): model_config = ConfigDict( str_strip_whitespace=True, extra='forbid' # Reject extra fields ) title: str = Field(min_length=1, max_length=200) description: Optional[str] = Field(None, max_length=5000) priority: Priority = Priority.MEDIUM tags: list[str] = Field(default_factory=list, max_length=10) due_date: Optional[float] = None assigned_to: Optional[str] = None @field_validator('tags') @classmethod def validate_tags(cls, v: list[str]) -> list[str]: if len(v) != len(set(v)): raise ValueError('Tags must be unique') return [tag.lower() for tag in v] @model_validator(mode='after') def validate_model(self) -> 'TaskCreate': if self.priority == Priority.URGENT and not self.assigned_to: raise ValueError('Urgent tasks must be assigned') if self.due_date and self.due_date < time.time(): raise ValueError('Due date cannot be in the past') return self # Middleware for automatic validation @web.middleware async def validation_middleware(request: web.Request, handler): # Get validation schema from route schema = getattr(handler, '_validation_schema', None) if schema and request.method in ('POST', 'PUT', 'PATCH'): try: data = await request.json() validated = schema(**data) request['validated_data'] = validated except ValueError as e: return web.json_response( {'error': 'Validation error', 'details': str(e)}, status=400 ) return await handler(request) # Decorator for validation def validate_with(schema: type[BaseModel]): def decorator(handler): handler._validation_schema = schema return handler return decorator # Usage @validate_with(TaskCreate) async def create_task(request: web.Request) -> web.Response: task_data: TaskCreate = request['validated_data'] # Data is already validated return web.json_response({ 'task_id': 'task-123', 'title': task_data.title, 'priority': task_data.priority.value }, status=201) ``` ### Query Parameter Validation ```python from pydantic import BaseModel, Field from typing import Optional class PaginationParams(BaseModel): page: int = Field(1, ge=1, le=1000) per_page: int = Field(20, ge=1, le=100) sort_by: Optional[str] = Field(None, pattern=r'^[a-zA-Z_]+$') order: Literal['asc', 'desc'] = 'asc' @property def offset(self) -> int: return (self.page - 1) * self.per_page @property def limit(self) -> int: return self.per_page async def list_items(request: web.Request) -> web.Response: try: params = PaginationParams(**request.query) except ValueError as e: return web.json_response( {'error': 'Invalid parameters', 'details': str(e)}, status=400 ) # Use validated params items = [] # Fetch from database with params.offset and params.limit return web.json_response({ 'items': items, 'page': params.page, 'per_page': params.per_page, 'total': 100 }) ``` --- ## WebSocket Implementation ### Basic WebSocket Server ```python from aiohttp import web, WSMsgType import asyncio class WebSocketHandler: def __init__(self): self.active_connections: set[web.WebSocketResponse] = set() async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse() await ws.prepare(request) self.active_connections.add(ws) try: async for msg in ws: if msg.type == WSMsgType.TEXT: if msg.data == 'close': await ws.close() else: # Echo message back await ws.send_str(f"Echo: {msg.data}") # Broadcast to all connections await self.broadcast(f"User says: {msg.data}") elif msg.type == WSMsgType.ERROR: print(f'WebSocket error: {ws.exception()}') finally: self.active_connections.discard(ws) return ws async def broadcast(self, message: str): if self.active_connections: await asyncio.gather( *[ws.send_str(message) for ws in self.active_connections], return_exceptions=True ) # Setup app = web.Application() handler = WebSocketHandler() app.router.add_get('/ws', handler.websocket_handler) ``` ### Advanced WebSocket Server with Authentication ```python from aiohttp import web, WSMsgType import json import asyncio from typing import Optional import jwt class AuthenticatedWebSocketHandler: def __init__(self, secret_key: str): self.secret_key = secret_key self.connections: dict[str, web.WebSocketResponse] = {} def verify_token(self, token: str) -> Optional[dict]: try: return jwt.decode(token, self.secret_key, algorithms=['HS256']) except jwt.InvalidTokenError: return None async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse(heartbeat=30) await ws.prepare(request) user_id: Optional[str] = None try: # Wait for authentication message msg = await asyncio.wait_for(ws.receive(), timeout=10.0) if msg.type != WSMsgType.TEXT: await ws.send_json({'error': 'Authentication required'}) await ws.close() return ws auth_data = json.loads(msg.data) token = auth_data.get('token') if not token: await ws.send_json({'error': 'Token required'}) await ws.close() return ws payload = self.verify_token(token) if not payload: await ws.send_json({'error': 'Invalid token'}) await ws.close() return ws user_id = payload['user_id'] self.connections[user_id] = ws await ws.send_json({ 'type': 'auth_success', 'user_id': user_id }) # Handle messages async for msg in ws: if msg.type == WSMsgType.TEXT: data = json.loads(msg.data) await self.handle_message(user_id, data) elif msg.type == WSMsgType.ERROR: print(f'WebSocket error: {ws.exception()}') except asyncio.TimeoutError: await ws.send_json({'error': 'Authentication timeout'}) finally: if user_id and user_id in self.connections: del self.connections[user_id] return ws async def handle_message(self, user_id: str, data: dict): message_type = data.get('type') if message_type == 'ping': await self.send_to_user(user_id, {'type': 'pong'}) elif message_type == 'broadcast': await self.broadcast({ 'type': 'message', 'from': user_id, 'content': data.get('content') }) elif message_type == 'direct': to_user = data.get('to') await self.send_to_user(to_user, { 'type': 'direct_message', 'from': user_id, 'content': data.get('content') }) async def send_to_user(self, user_id: str, message: dict): ws = self.connections.get(user_id) if ws and not ws.closed: await ws.send_json(message) async def broadcast(self, message: dict): if self.connections: await asyncio.gather( *[ws.send_json(message) for ws in self.connections.values() if not ws.closed], return_exceptions=True ) ``` ### WebSocket Client ```python import aiohttp import asyncio async def websocket_client(): async with aiohttp.ClientSession() as session: async with session.ws_connect('http://localhost:8080/ws') as ws: # Send authentication await ws.send_json({ 'token': 'your-jwt-token' }) # Receive authentication response msg = await ws.receive() print(f"Auth response: {msg.data}") # Send messages await ws.send_json({ 'type': 'broadcast', 'content': 'Hello, everyone!' }) # Receive messages async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: data = msg.json() print(f"Received: {data}") elif msg.type == aiohttp.WSMsgType.CLOSED: break elif msg.type == aiohttp.WSMsgType.ERROR: break asyncio.run(websocket_client()) ``` --- ## Testing with pytest and pytest-aiohttp ### Basic Test Setup ```python # conftest.py import pytest import asyncio from aiohttp import web from typing import AsyncIterator pytest_plugins = 'aiohttp.pytest_plugin' @pytest.fixture async def app() -> AsyncIterator[web.Application]: app = web.Application() async def hello(request): return web.Response(text='Hello, World!') app.router.add_get('/', hello) yield app # Cleanup await app.cleanup() @pytest.fixture async def client(aiohttp_client, app): return await aiohttp_client(app) ``` ### Basic Tests ```python # test_basic.py import pytest from aiohttp import web @pytest.mark.asyncio async def test_hello(client): resp = await client.get('/') assert resp.status == 200 text = await resp.text() assert 'Hello, World!' in text @pytest.mark.asyncio async def test_json_endpoint(client): resp = await client.post('/api/data', json={'key': 'value'}) assert resp.status == 200 data = await resp.json() assert data['key'] == 'value' ``` ### Testing with Fixtures ```python # conftest.py import pytest import asyncio from typing import AsyncIterator import aiohttp @pytest.fixture async def http_session() -> AsyncIterator[aiohttp.ClientSession]: session = aiohttp.ClientSession() yield session await session.close() @pytest.fixture def sample_user(): return { 'username': 'testuser', 'email': 'test@example.com', 'password': 'TestPass123' } @pytest.fixture async def authenticated_client(client, sample_user): # Login resp = await client.post('/login', json=sample_user) assert resp.status == 200 # Extract token data = await resp.json() token = data['token'] # Set authorization header client.session.headers['Authorization'] = f'Bearer {token}' yield client # tests/test_auth.py @pytest.mark.asyncio async def test_protected_endpoint(authenticated_client): resp = await authenticated_client.get('/api/protected') assert resp.status == 200 data = await resp.json() assert 'user_id' in data ``` ### Parameterized Tests ```python import pytest @pytest.mark.parametrize('username,email,expected_status', [ ('valid', 'valid@example.com', 201), ('ab', 'valid@example.com', 400), # Too short ('valid', 'invalid-email', 400), # Invalid email ('', 'valid@example.com', 400), # Empty username ]) @pytest.mark.asyncio async def test_user_creation_validation(client, username, email, expected_status): resp = await client.post('/users', json={ 'username': username, 'email': email, 'password': 'ValidPass123' }) assert resp.status == expected_status @pytest.mark.parametrize('method,path,expected', [ ('GET', '/', 200), ('GET', '/api/users', 200), ('POST', '/api/users', 401), # Requires auth ('GET', '/nonexistent', 404), ]) @pytest.mark.asyncio async def test_endpoints(client, method, path, expected): if method == 'GET': resp = await client.get(path) elif method == 'POST': resp = await client.post(path, json={}) assert resp.status == expected ``` ### Mocking External APIs ```python import pytest from unittest.mock import AsyncMock, patch from aiohttp import web @pytest.mark.asyncio async def test_external_api_call(client): # Mock external API with patch('aiohttp.ClientSession.get') as mock_get: mock_response = AsyncMock() mock_response.status = 200 mock_response.json = AsyncMock(return_value={'data': 'mocked'}) mock_get.return_value.__aenter__.return_value = mock_response resp = await client.get('/api/external') assert resp.status == 200 data = await resp.json() assert data['data'] == 'mocked' @pytest.fixture async def mock_database(): class MockDB: def __init__(self): self.data = {} async def get(self, key): return self.data.get(key) async def set(self, key, value): self.data[key] = value async def delete(self, key): if key in self.data: del self.data[key] return MockDB() @pytest.mark.asyncio async def test_with_mock_db(client, mock_database): # Inject mock database into app client.app['db'] = mock_database # Test database operations await mock_database.set('user:1', {'name': 'Test'}) resp = await client.get('/users/1') assert resp.status == 200 ``` ### Testing WebSockets ```python import pytest from aiohttp import WSMsgType @pytest.mark.asyncio async def test_websocket_echo(aiohttp_client): app = web.Application() async def websocket_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) async for msg in ws: if msg.type == WSMsgType.TEXT: await ws.send_str(f"Echo: {msg.data}") return ws app.router.add_get('/ws', websocket_handler) client = await aiohttp_client(app) async with client.ws_connect('/ws') as ws: await ws.send_str('Hello') msg = await ws.receive() assert msg.data == 'Echo: Hello' @pytest.mark.asyncio async def test_websocket_broadcast(aiohttp_client): from collections import defaultdict connections = set() app = web.Application() async def websocket_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) connections.add(ws) try: async for msg in ws: if msg.type == WSMsgType.TEXT: # Broadcast to all for conn in connections: if conn != ws: await conn.send_str(msg.data) finally: connections.discard(ws) return ws app.router.add_get('/ws', websocket_handler) client = await aiohttp_client(app) # Create two connections async with client.ws_connect('/ws') as ws1: async with client.ws_connect('/ws') as ws2: await ws1.send_str('Hello from ws1') msg = await ws2.receive() assert msg.data == 'Hello from ws1' ``` ### Testing Middleware ```python import pytest from aiohttp import web @pytest.mark.asyncio async def test_auth_middleware(aiohttp_client): @web.middleware async def auth_middleware(request, handler): token = request.headers.get('Authorization') if not token or not token.startswith('Bearer '): raise web.HTTPUnauthorized() request['user_id'] = 'user-123' return await handler(request) app = web.Application(middlewares=[auth_middleware]) async def protected(request): return web.json_response({'user_id': request['user_id']}) app.router.add_get('/protected', protected) client = await aiohttp_client(app) # Without token resp = await client.get('/protected') assert resp.status == 401 # With token resp = await client.get('/protected', headers={ 'Authorization': 'Bearer valid-token' }) assert resp.status == 200 data = await resp.json() assert data['user_id'] == 'user-123' ``` ### Coverage and Best Practices ```python # pytest.ini or pyproject.toml [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] addopts = [ "--verbose", "--strict-markers", "--cov=app", "--cov-report=html", "--cov-report=term-missing", ] # Best practices # 1. Keep tests independent # 2. Use fixtures for common setup # 3. Mock external dependencies # 4. Test edge cases # 5. Use parametrize for similar tests # 6. Clean up resources properly ``` --- ## Advanced Middleware and Error Handling ### Error Handling Middleware ```python from aiohttp import web import logging from typing import Callable, Awaitable logger = logging.getLogger(__name__) @web.middleware async def error_middleware( request: web.Request, handler: Callable[[web.Request], Awaitable[web.Response]] ) -> web.Response: try: return await handler(request) except web.HTTPException as e: # HTTP exceptions should pass through raise except ValueError as e: logger.warning(f"Validation error: {e}") return web.json_response( { 'error': 'Validation Error', 'message': str(e) }, status=400 ) except PermissionError as e: logger.warning(f"Permission denied: {e}") return web.json_response( { 'error': 'Forbidden', 'message': 'You do not have permission to access this resource' }, status=403 ) except Exception as e: logger.error(f"Unexpected error: {e}", exc_info=True) return web.json_response( { 'error': 'Internal Server Error', 'message': 'An unexpected error occurred' }, status=500 ) ``` ### Logging Middleware ```python import time import logging from aiohttp import web logger = logging.getLogger(__name__) @web.middleware async def logging_middleware(request: web.Request, handler): start_time = time.time() # Log request logger.info( f"Request started", extra={ 'method': request.method, 'path': request.path, 'query': dict(request.query), 'remote': request.remote } ) try: response = await handler(request) # Log response duration = time.time() - start_time logger.info( f"Request completed", extra={ 'method': request.method, 'path': request.path, 'status': response.status, 'duration_ms': duration * 1000 } ) return response except Exception as e: duration = time.time() - start_time logger.error( f"Request failed", extra={ 'method': request.method, 'path': request.path, 'duration_ms': duration * 1000, 'error': str(e) }, exc_info=True ) raise ``` ### CORS Middleware ```python from aiohttp import web from typing import Optional @web.middleware async def cors_middleware(request: web.Request, handler): # Handle preflight if request.method == 'OPTIONS': response = web.Response() else: response = await handler(request) # Add CORS headers response.headers['Access-Control-Allow-Origin'] = '*' response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS' response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization' response.headers['Access-Control-Max-Age'] = '3600' return response # Or use aiohttp-cors library import aiohttp_cors app = web.Application() # Configure CORS cors = aiohttp_cors.setup(app, defaults={ "*": aiohttp_cors.ResourceOptions( allow_credentials=True, expose_headers="*", allow_headers="*", allow_methods="*" ) }) # Add routes resource = app.router.add_resource("/api/endpoint") route = resource.add_route("GET", handler) cors.add(route) ``` ### Rate Limiting Middleware ```python from aiohttp import web import time from collections import defaultdict from typing import Dict, Tuple class RateLimiter: def __init__(self, max_requests: int = 100, window: int = 60): self.max_requests = max_requests self.window = window self.requests: Dict[str, list[float]] = defaultdict(list) def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]: now = time.time() cutoff = now - self.window # Remove old requests self.requests[client_id] = [ req_time for req_time in self.requests[client_id] if req_time > cutoff ] if len(self.requests[client_id]) >= self.max_requests: oldest = self.requests[client_id][0] retry_after = oldest + self.window - now return False, retry_after self.requests[client_id].append(now) return True, None rate_limiter = RateLimiter(max_requests=100, window=60) @web.middleware async def rate_limit_middleware(request: web.Request, handler): # Use IP address as client identifier client_id = request.remote allowed, retry_after = rate_limiter.is_allowed(client_id) if not allowed: return web.json_response( { 'error': 'Rate limit exceeded', 'retry_after': int(retry_after) }, status=429, headers={'Retry-After': str(int(retry_after))} ) return await handler(request) ``` --- ## Performance Optimization ### Connection Pooling ```python import aiohttp from aiohttp import TCPConnector, ClientTimeout class OptimizedClient: def __init__(self, base_url: str): self.base_url = base_url # Optimized connector self.connector = TCPConnector( limit=100, # Total connections limit_per_host=30, # Per host ttl_dns_cache=300, # DNS cache TTL force_close=False, # Keep-alive enable_cleanup_closed=True, use_dns_cache=True ) # Optimized timeout self.timeout = ClientTimeout( total=30, connect=10, sock_read=20 ) self._session: Optional[aiohttp.ClientSession] = None async def start(self): self._session = aiohttp.ClientSession( base_url=self.base_url, connector=self.connector, timeout=self.timeout, connector_owner=True, auto_decompress=True, trust_env=True, read_bufsize=2**16 # 64KB buffer ) async def close(self): if self._session: await self._session.close() await asyncio.sleep(0.25) ``` ### Concurrent Requests ```python import asyncio import aiohttp from typing import List, Any async def fetch_many(urls: List[str]) -> List[Any]: async with aiohttp.ClientSession() as session: tasks = [fetch_one(session, url) for url in urls] return await asyncio.gather(*tasks, return_exceptions=True) async def fetch_one(session: aiohttp.ClientSession, url: str): async with session.get(url) as response: return await response.json() # With semaphore for limiting concurrency async def fetch_with_limit(urls: List[str], max_concurrent: int = 10): semaphore = asyncio.Semaphore(max_concurrent) async def fetch_limited(url: str): async with semaphore: async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json() return await asyncio.gather(*[fetch_limited(url) for url in urls]) ``` ### Streaming Large Responses ```python async def download_large_file(url: str, filepath: str): async with aiohttp.ClientSession() as session: async with session.get(url) as response: with open(filepath, 'wb') as f: async for chunk in response.content.iter_chunked(8192): f.write(chunk) # Server-side streaming async def stream_large_response(request: web.Request) -> web.StreamResponse: response = web.StreamResponse() response.headers['Content-Type'] = 'application/octet-stream' await response.prepare(request) # Stream data in chunks with open('large_file.dat', 'rb') as f: while chunk := f.read(8192): await response.write(chunk) await response.write_eof() return response ``` ### Caching ```python from functools import lru_cache import time class CachedClient: def __init__(self): self.cache = {} self.cache_ttl = 300 # 5 minutes async def get_with_cache(self, url: str): now = time.time() # Check cache if url in self.cache: data, timestamp = self.cache[url] if now - timestamp < self.cache_ttl: return data # Fetch and cache async with aiohttp.ClientSession() as session: async with session.get(url) as response: data = await response.json() self.cache[url] = (data, now) return data ``` --- ## Git Protocol Integration ### Understanding Git Smart HTTP Git Smart HTTP protocol allows git clients to clone, fetch, and push over HTTP/HTTPS. The protocol involves: 1. **Service Discovery**: Client requests `/info/refs?service=git-upload-pack` or `git-receive-pack` 2. **Negotiation**: Client and server negotiate which objects to transfer 3. **Pack Transfer**: Server sends/receives packfiles ### Basic Git HTTP Backend ```python from aiohttp import web import subprocess import os from pathlib import Path class GitHTTPBackend: def __init__(self, repo_root: Path): self.repo_root = repo_root self.git_backend = '/usr/lib/git-core/git-http-backend' async def handle_info_refs(self, request: web.Request) -> web.Response: repo_path = request.match_info['repo'] service = request.query.get('service', '') if service not in ('git-upload-pack', 'git-receive-pack'): return web.Response(status=400, text='Invalid service') full_path = self.repo_root / repo_path if not full_path.exists(): return web.Response(status=404, text='Repository not found') # Build environment env = os.environ.copy() env['GIT_PROJECT_ROOT'] = str(self.repo_root) env['GIT_HTTP_EXPORT_ALL'] = '1' env['PATH_INFO'] = f'/{repo_path}/info/refs' env['QUERY_STRING'] = f'service={service}' env['REQUEST_METHOD'] = 'GET' # Execute git-http-backend proc = await asyncio.create_subprocess_exec( self.git_backend, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) stdout, stderr = await proc.communicate() if proc.returncode != 0: return web.Response(status=500, text=stderr.decode()) # Parse CGI output headers_end = stdout.find(b'\r\n\r\n') if headers_end == -1: return web.Response(status=500) header_lines = stdout[:headers_end].decode().split('\r\n') body = stdout[headers_end + 4:] # Parse headers headers = {} for line in header_lines: if ':' in line: key, value = line.split(':', 1) headers[key.strip()] = value.strip() return web.Response( body=body, headers=headers, status=200 ) async def handle_service(self, request: web.Request) -> web.Response: repo_path = request.match_info['repo'] service = request.match_info['service'] if service not in ('git-upload-pack', 'git-receive-pack'): return web.Response(status=400) full_path = self.repo_root / repo_path if not full_path.exists(): return web.Response(status=404) # Read request body body = await request.read() # Build environment env = os.environ.copy() env['GIT_PROJECT_ROOT'] = str(self.repo_root) env['GIT_HTTP_EXPORT_ALL'] = '1' env['PATH_INFO'] = f'/{repo_path}/{service}' env['REQUEST_METHOD'] = 'POST' env['CONTENT_TYPE'] = request.content_type env['CONTENT_LENGTH'] = str(len(body)) # Execute git service proc = await asyncio.create_subprocess_exec( self.git_backend, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env ) stdout, stderr = await proc.communicate(input=body) if proc.returncode != 0: return web.Response(status=500, text=stderr.decode()) # Parse CGI output headers_end = stdout.find(b'\r\n\r\n') header_lines = stdout[:headers_end].decode().split('\r\n') body = stdout[headers_end + 4:] headers = {} for line in header_lines: if ':' in line: key, value = line.split(':', 1) headers[key.strip()] = value.strip() return web.Response( body=body, headers=headers, status=200 ) # Setup routes git_backend = GitHTTPBackend(Path('/var/git/repos')) app = web.Application() app.router.add_get('/{repo:.+}/info/refs', git_backend.handle_info_refs) app.router.add_post('/{repo:.+}/{service:(git-upload-pack|git-receive-pack)}', git_backend.handle_service) ``` ### Git Backend with Authentication ```python from aiohttp import web import base64 import subprocess from pathlib import Path class AuthenticatedGitBackend: def __init__(self, repo_root: Path): self.repo_root = repo_root self.users = { 'alice': 'password123', 'bob': 'secret456' } def verify_auth(self, request: web.Request) -> tuple[bool, str | None]: auth_header = request.headers.get('Authorization', '') if not auth_header.startswith('Basic '): return False, None try: encoded = auth_header[6:] decoded = base64.b64decode(encoded).decode() username, password = decoded.split(':', 1) if self.users.get(username) == password: return True, username except: pass return False, None @web.middleware async def auth_middleware(self, request: web.Request, handler): # Allow anonymous reads service = request.query.get('service', '') if request.method == 'GET' and service == 'git-upload-pack': return await handler(request) # Require authentication for pushes if service == 'git-receive-pack' or 'git-receive-pack' in request.path: authorized, username = self.verify_auth(request) if not authorized: return web.Response( status=401, headers={'WWW-Authenticate': 'Basic realm="Git Access"'}, text='Authentication required' ) request['username'] = username return await handler(request) async def handle_info_refs(self, request: web.Request): # Git info/refs implementation # Similar to previous example but with auth pass async def handle_service(self, request: web.Request): # Git service implementation # Similar to previous example but with auth pass ``` --- ## Repository Manager Implementation ### Repository Browser ```python from aiohttp import web import os from pathlib import Path import subprocess from typing import Optional import json class RepositoryManager: def __init__(self, repo_root: Path): self.repo_root = repo_root async def list_repositories(self, request: web.Request) -> web.Response: repos = [] for item in self.repo_root.iterdir(): if item.is_dir() and (item / '.git').exists(): repos.append({ 'name': item.name, 'path': str(item.relative_to(self.repo_root)), 'type': 'git' }) return web.json_response({'repositories': repos}) async def get_repository(self, request: web.Request) -> web.Response: repo_name = request.match_info['repo'] repo_path = self.repo_root / repo_name if not repo_path.exists(): return web.json_response( {'error': 'Repository not found'}, status=404 ) # Get repository info info = await self._get_repo_info(repo_path) return web.json_response(info) async def create_repository(self, request: web.Request) -> web.Response: data = await request.json() repo_name = data.get('name') if not repo_name: return web.json_response( {'error': 'Repository name required'}, status=400 ) repo_path = self.repo_root / repo_name if repo_path.exists(): return web.json_response( {'error': 'Repository already exists'}, status=400 ) # Create bare repository repo_path.mkdir(parents=True) proc = await asyncio.create_subprocess_exec( 'git', 'init', '--bare', str(repo_path), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await proc.communicate() if proc.returncode != 0: return web.json_response( {'error': 'Failed to create repository'}, status=500 ) return web.json_response({ 'name': repo_name, 'path': str(repo_path.relative_to(self.repo_root)) }, status=201) async def delete_repository(self, request: web.Request) -> web.Response: repo_name = request.match_info['repo'] repo_path = self.repo_root / repo_name if not repo_path.exists(): return web.json_response( {'error': 'Repository not found'}, status=404 ) # Delete repository directory import shutil shutil.rmtree(repo_path) return web.Response(status=204) async def browse_tree(self, request: web.Request) -> web.Response: repo_name = request.match_info['repo'] ref = request.query.get('ref', 'HEAD') path = request.query.get('path', '') repo_path = self.repo_root / repo_name if not repo_path.exists(): return web.json_response( {'error': 'Repository not found'}, status=404 ) # List files in tree proc = await asyncio.create_subprocess_exec( 'git', 'ls-tree', ref, path, cwd=str(repo_path), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: return web.json_response( {'error': stderr.decode()}, status=400 ) # Parse ls-tree output entries = [] for line in stdout.decode().strip().split('\n'): if not line: continue mode, type_, hash_, name = line.split(None, 3) entries.append({ 'mode': mode, 'type': type_, 'hash': hash_, 'name': name }) return web.json_response({'entries': entries}) async def get_file_content(self, request: web.Request) -> web.Response: repo_name = request.match_info['repo'] ref = request.query.get('ref', 'HEAD') path = request.query['path'] repo_path = self.repo_root / repo_name # Get file content proc = await asyncio.create_subprocess_exec( 'git', 'show', f'{ref}:{path}', cwd=str(repo_path), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: return web.json_response( {'error': 'File not found'}, status=404 ) return web.Response( body=stdout, content_type='text/plain' ) async def get_commits(self, request: web.Request) -> web.Response: repo_name = request.match_info['repo'] ref = request.query.get('ref', 'HEAD') limit = int(request.query.get('limit', '50')) repo_path = self.repo_root / repo_name # Get commit log proc = await asyncio.create_subprocess_exec( 'git', 'log', ref, '--pretty=format:%H|%an|%ae|%at|%s', f'-{limit}', cwd=str(repo_path), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() commits = [] for line in stdout.decode().strip().split('\n'): if not line: continue hash_, author, email, timestamp, message = line.split('|', 4) commits.append({ 'hash': hash_, 'author': author, 'email': email, 'timestamp': int(timestamp), 'message': message }) return web.json_response({'commits': commits}) async def _get_repo_info(self, repo_path: Path) -> dict: # Get HEAD proc = await asyncio.create_subprocess_exec( 'git', 'rev-parse', 'HEAD', cwd=str(repo_path), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await proc.communicate() head = stdout.decode().strip() if proc.returncode == 0 else None # Get branches proc = await asyncio.create_subprocess_exec( 'git', 'branch', '-a', cwd=str(repo_path), stdout=asyncio.subprocess.PIPE ) stdout, _ = await proc.communicate() branches = [ line.strip().lstrip('* ') for line in stdout.decode().strip().split('\n') ] return { 'name': repo_path.name, 'head': head, 'branches': branches } # Setup routes repo_manager = RepositoryManager(Path('/var/git/repos')) app.router.add_get('/api/repositories', repo_manager.list_repositories) app.router.add_get('/api/repositories/{repo}', repo_manager.get_repository) app.router.add_post('/api/repositories', repo_manager.create_repository) app.router.add_delete('/api/repositories/{repo}', repo_manager.delete_repository) app.router.add_get('/api/repositories/{repo}/tree', repo_manager.browse_tree) app.router.add_get('/api/repositories/{repo}/file', repo_manager.get_file_content) app.router.add_get('/api/repositories/{repo}/commits', repo_manager.get_commits) ``` --- ## Best Practices and Patterns ### Application Structure ``` project/ ├── app/ │ ├── __init__.py │ ├── main.py # Application entry point │ ├── routes.py # Route definitions │ ├── handlers.py # Request handlers │ ├── middleware.py # Custom middleware │ ├── models.py # Pydantic models │ ├── database.py # Database connections │ └── utils.py # Utility functions ├── tests/ │ ├── __init__.py │ ├── conftest.py # Test fixtures │ ├── test_handlers.py │ └── test_integration.py ├── requirements.txt ├── pyproject.toml └── README.md ``` ### Configuration Management ```python from pydantic_settings import BaseSettings from typing import Optional class Settings(BaseSettings): model_config = ConfigDict( env_file='.env', env_file_encoding='utf-8', case_sensitive=False ) # Server host: str = '127.0.0.1' port: int = 8080 debug: bool = False # Database database_url: str database_pool_size: int = 10 # Security secret_key: str jwt_algorithm: str = 'HS256' jwt_expiration: int = 3600 # External APIs external_api_url: Optional[str] = None external_api_key: Optional[str] = None settings = Settings() ``` ### Graceful Shutdown ```python from aiohttp import web import asyncio import signal class Application: def __init__(self): self.app = web.Application() self.cleanup_tasks = [] async def startup(self): # Initialize resources pass async def cleanup(self): # Cleanup resources for task in self.cleanup_tasks: await task async def shutdown(self, app): # Close database connections # Close HTTP sessions # Wait for background tasks await self.cleanup() def run(self): self.app.on_startup.append(lambda app: self.startup()) self.app.on_cleanup.append(self.shutdown) web.run_app( self.app, host='127.0.0.1', port=8080, shutdown_timeout=60.0 ) ``` ### Summary This guide covers: - Python 3.13 modern features and type hints - aiohttp 3.13+ client and server development - Complete authentication patterns (Basic, Bearer, API Key, OAuth2) - Pydantic 2.12+ validation - WebSocket implementation - Comprehensive testing with pytest-aiohttp - Git protocol integration - Repository management system - Performance optimization - Production-ready patterns **Key Takeaways:** 1. Always reuse ClientSession across requests 2. Use type hints and Pydantic for validation 3. Implement proper error handling and middleware 4. Write comprehensive tests with pytest-aiohttp 5. Follow async/await patterns consistently 6. Optimize connection pooling and timeouts 7. Handle cleanup and graceful shutdown properly --- *Guide Version: 1.0* *Last Updated: November 2025* *Compatible with: Python 3.13.3, aiohttp 3.13.2, pytest-aiohttp 1.1.0, pydantic 2.12.3*