Retoor's Guide to Modern Python: Mastering aiohttp 3.13+ with Python 3.13

Complete Tutorial: aiohttp, Testing, Authentication, WebSockets, and Git Protocol Integration

Version Requirements:

  • Python: 3.13.3 (Released October 7, 2024)
  • aiohttp: 3.13.2 (Latest stable as of October 28, 2025)
  • pytest: 8.3+
  • pytest-aiohttp: 1.1.0 (Released January 23, 2025)
  • pytest-asyncio: 1.2.0 (Released September 12, 2025)
  • pydantic: 2.12.3 (Released October 17, 2025)

Table of Contents

  1. Python 3.13 Modern Features
  2. aiohttp Fundamentals
  3. Client Sessions and Connection Management
  4. Authentication Patterns
  5. Server Development
  6. Request Validation with Pydantic
  7. WebSocket Implementation
  8. Testing with pytest and pytest-aiohttp
  9. Advanced Middleware and Error Handling
  10. Performance Optimization
  11. Git Protocol Integration
  12. Repository Manager Implementation
  13. Best Practices and Patterns
  14. Automatic Memory and Context Search

Python 3.13 Modern Features

Key Python 3.13 Enhancements

Python 3.13 introduces significant improvements for asynchronous programming:

Experimental Free-Threaded Mode (No GIL)

  • Enable true multi-threading with python -X gil=0
  • Significant for CPU-bound async operations
  • Better performance on multi-core systems

JIT Compiler (Preview)

  • Just-In-Time compilation for performance boosts
  • Enable with PYTHON_JIT=1 environment variable
  • Early benchmarks show 10-20% improvements

Enhanced Interactive Interpreter

  • Multi-line editing with syntax highlighting
  • Colorized tracebacks for better debugging
  • Improved REPL experience

Better Error Messages

  • More precise error locations
  • Clearer exception messages
  • Context-aware suggestions

Modern Type Hints (PEP 695)

Python 3.13 fully supports modern generic syntax:

from typing import TypeVar, Generic
from collections.abc import Sequence, Mapping

# Old style (still works)
T = TypeVar('T')
class OldGeneric(Generic[T]):
    def process(self, item: T) -> T:
        return item

# New PEP 695 style (Python 3.12+)
class NewGeneric[T]:
    def process(self, item: T) -> T:
        return item

# Type aliases with 'type' keyword
type RequestHandler[T] = Callable[[Request], Awaitable[T]]
type JSONDict = dict[str, str | int | float | bool | None]

# Modern function annotations
async def fetch_data[T](url: str, parser: Callable[[bytes], T]) -> T | None:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                return parser(await response.read())
            return None

Dataclasses in Python 3.13

from dataclasses import dataclass, field, replace
from typing import ClassVar
import copy

@dataclass(slots=True, kw_only=True)
class User:
    user_id: str
    username: str
    email: str
    created_at: float = field(default_factory=time.time)
    is_active: bool = True
    _password_hash: str = field(repr=False, compare=False)

    # New in 3.13: __static_attributes__
    def __post_init__(self):
        self.last_login: float | None = None

    @property
    def is_new(self) -> bool:
        return time.time() - self.created_at < 86400

# New in 3.13: copy.replace() works with dataclasses
user = User(user_id="123", username="alice", email="alice@example.com", _password_hash="hashed")
updated = copy.replace(user, username="alice_updated")

# Access static attributes (new in 3.13)
print(User.__static_attributes__)  # ('last_login',)

Modern Async Patterns

import asyncio
from collections.abc import AsyncIterator

# Async generators
async def fetch_paginated[T](
    url: str,
    parser: Callable[[dict], T]
) -> AsyncIterator[T]:
    page = 1
    async with aiohttp.ClientSession() as session:
        while True:
            async with session.get(f"{url}?page={page}") as response:
                data = await response.json()
                if not data['items']:
                    break
                for item in data['items']:
                    yield parser(item)
                page += 1

# Context manager pattern
class AsyncResourceManager:
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        await self.session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.__aexit__(exc_type, exc_val, exc_tb)

aiohttp Fundamentals

Installation and Setup

# Core installation
pip install aiohttp==3.13.2

# With speedups (highly recommended)
pip install aiohttp[speedups]==3.13.2

# Additional recommended packages
pip install aiodns>=3.0.0  # Fast DNS resolution
pip install Brotli>=1.0.9  # Brotli compression support
pip install pydantic>=2.12.3  # Request validation

Basic Client Usage

import aiohttp
import asyncio

async def fetch_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            print(f"Status: {response.status}")
            print(f"Content-Type: {response.headers['content-type']}")

            # Various response methods
            text = await response.text()  # Text content
            data = await response.json()  # JSON parsing
            raw = await response.read()   # Raw bytes

asyncio.run(fetch_example())

Basic Server Usage

from aiohttp import web

async def hello_handler(request: web.Request) -> web.Response:
    name = request.match_info.get('name', 'Anonymous')
    return web.Response(text=f"Hello, {name}!")

async def json_handler(request: web.Request) -> web.Response:
    data = await request.json()
    return web.json_response({
        'status': 'success',
        'received': data
    })

app = web.Application()
app.router.add_get('/', hello_handler)
app.router.add_get('/{name}', hello_handler)
app.router.add_post('/api/data', json_handler)

if __name__ == '__main__':
    web.run_app(app, host='127.0.0.1', port=8080)

Client Sessions and Connection Management

Session Management Best Practices

Never create a new session for each request - this is the most common mistake:

# ❌ WRONG - Creates new session for every request
async def bad_example():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.text()

    # Session destroyed here

# ✅ CORRECT - Reuse session across requests
class APIClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(
                limit=100,  # Total connection limit
                limit_per_host=30,  # Per-host limit
                ttl_dns_cache=300,  # DNS cache TTL
            )
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def get(self, path: str) -> dict:
        async with self._session.get(path) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def main():
    async with APIClient('https://api.example.com') as client:
        user = await client.get('/users/123')
        posts = await client.get('/posts')
        comments = await client.get('/comments')

asyncio.run(main())

Advanced Session Configuration

import aiohttp
from aiohttp import ClientTimeout, TCPConnector, ClientSession
from typing import Optional

class AdvancedHTTPClient:
    def __init__(
        self,
        base_url: str,
        timeout: int = 30,
        max_connections: int = 100,
        max_connections_per_host: int = 30,
        headers: Optional[dict[str, str]] = None
    ):
        self.base_url = base_url
        self.timeout = ClientTimeout(
            total=timeout,
            connect=10,  # Connection timeout
            sock_read=20  # Socket read timeout
        )

        self.connector = TCPConnector(
            limit=max_connections,
            limit_per_host=max_connections_per_host,
            ttl_dns_cache=300,
            ssl=None,  # SSL context if needed
            force_close=False,  # Keep connections alive
            enable_cleanup_closed=True
        )

        self.default_headers = headers or {}
        self._session: Optional[ClientSession] = None

    async def start(self):
        if self._session is None:
            self._session = ClientSession(
                base_url=self.base_url,
                timeout=self.timeout,
                connector=self.connector,
                headers=self.default_headers,
                raise_for_status=False,
                connector_owner=True,
                auto_decompress=True,
                trust_env=True
            )

    async def close(self):
        if self._session:
            await self._session.close()
            await asyncio.sleep(0.25)  # Allow cleanup

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

    async def request(
        self,
        method: str,
        path: str,
        **kwargs
    ) -> aiohttp.ClientResponse:
        if not self._session:
            await self.start()

        return await self._session.request(method, path, **kwargs)
import aiohttp
from http.cookies import SimpleCookie

# Automatic cookie handling
async def with_cookies():
    # Create cookie jar
    jar = aiohttp.CookieJar(unsafe=False)  # Only HTTPS cookies

    async with aiohttp.ClientSession(cookie_jar=jar) as session:
        # Cookies are automatically stored and sent
        await session.get('https://example.com/login')

        # Manually update cookies
        session.cookie_jar.update_cookies(
            {'session_id': 'abc123'},
            response_url='https://example.com'
        )

        # Access cookies
        for cookie in session.cookie_jar:
            print(f"{cookie.key}: {cookie.value}")

# Custom cookie handling
async def custom_cookies():
    cookies = {'auth_token': 'xyz789'}

    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get('https://example.com/api') as response:
            # Read response cookies
            print(response.cookies)

Authentication Patterns

Basic Authentication

import aiohttp
from aiohttp import BasicAuth
import base64

# Method 1: Using BasicAuth helper
async def basic_auth_helper(username: str, password: str):
    auth = BasicAuth(login=username, password=password)

    async with aiohttp.ClientSession(auth=auth) as session:
        async with session.get('https://api.example.com/protected') as response:
            return await response.json()

# Method 2: Manual base64 encoding
async def basic_auth_manual(username: str, password: str):
    credentials = f"{username}:{password}"
    encoded = base64.b64encode(credentials.encode()).decode()

    headers = {'Authorization': f'Basic {encoded}'}

    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('https://api.example.com/protected') as response:
            return await response.json()

# Method 3: Per-request authentication
async def basic_auth_per_request(username: str, password: str, url: str):
    auth = BasicAuth(login=username, password=password)

    async with aiohttp.ClientSession() as session:
        async with session.get(url, auth=auth) as response:
            return await response.json()

Bearer Token Authentication

class TokenAuthClient:
    def __init__(self, base_url: str, token: str):
        self.base_url = base_url
        self.token = token
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        headers = {
            'Authorization': f'Bearer {self.token}',
            'Accept': 'application/json'
        }
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            headers=headers
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def get(self, path: str) -> dict:
        async with self._session.get(path) as response:
            response.raise_for_status()
            return await response.json()

    async def post(self, path: str, data: dict) -> dict:
        async with self._session.post(path, json=data) as response:
            response.raise_for_status()
            return await response.json()

# Usage
async def example():
    async with TokenAuthClient('https://api.example.com', 'your_token_here') as client:
        user = await client.get('/user')
        result = await client.post('/items', {'name': 'test'})

API Key Authentication

class APIKeyClient:
    def __init__(
        self,
        base_url: str,
        api_key: str,
        key_location: str = 'header',  # 'header' or 'query'
        key_name: str = 'X-API-Key'
    ):
        self.base_url = base_url
        self.api_key = api_key
        self.key_location = key_location
        self.key_name = key_name
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        if self.key_location == 'header':
            headers = {self.key_name: self.api_key}
            self._session = aiohttp.ClientSession(
                base_url=self.base_url,
                headers=headers
            )
        else:
            self._session = aiohttp.ClientSession(base_url=self.base_url)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def request(self, method: str, path: str, **kwargs):
        if self.key_location == 'query':
            params = kwargs.get('params', {})
            params[self.key_name] = self.api_key
            kwargs['params'] = params

        async with self._session.request(method, path, **kwargs) as response:
            response.raise_for_status()
            return await response.json()

Digest Authentication (aiohttp 3.12.8+)

from aiohttp import ClientSession, DigestAuthMiddleware

async def digest_auth_example():
    # Create digest auth middleware
    digest_auth = DigestAuthMiddleware(
        login="user",
        password="password",
        preemptive=True  # New in 3.12.8: preemptive authentication
    )

    # Pass middleware to session
    async with ClientSession(middlewares=(digest_auth,)) as session:
        async with session.get("https://httpbin.org/digest-auth/auth/user/password") as resp:
            print(await resp.text())

OAuth 2.0 Token Refresh Pattern

import asyncio
from datetime import datetime, timedelta
from typing import Optional

class OAuth2Client:
    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        token_url: str
    ):
        self.base_url = base_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url

        self._access_token: Optional[str] = None
        self._token_expires_at: Optional[datetime] = None
        self._refresh_token: Optional[str] = None
        self._session: Optional[aiohttp.ClientSession] = None
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(base_url=self.base_url)
        await self._ensure_token()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.close()

    async def _ensure_token(self):
        async with self._lock:
            now = datetime.now()
            if (
                not self._access_token or
                not self._token_expires_at or
                now >= self._token_expires_at
            ):
                await self._refresh_access_token()

    async def _refresh_access_token(self):
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(self.token_url, data=data) as response:
                response.raise_for_status()
                token_data = await response.json()

                self._access_token = token_data['access_token']
                expires_in = token_data.get('expires_in', 3600)
                self._token_expires_at = datetime.now() + timedelta(seconds=expires_in - 60)
                self._refresh_token = token_data.get('refresh_token')

    async def request(self, method: str, path: str, **kwargs):
        await self._ensure_token()

        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {self._access_token}'
        kwargs['headers'] = headers

        async with self._session.request(method, path, **kwargs) as response:
            if response.status == 401:
                await self._refresh_access_token()
                headers['Authorization'] = f'Bearer {self._access_token}'
                async with self._session.request(method, path, **kwargs) as retry:
                    retry.raise_for_status()
                    return await retry.json()

            response.raise_for_status()
            return await response.json()

Server Development

Application Structure

from aiohttp import web
from typing import Callable, Awaitable

# Type aliases
Handler = Callable[[web.Request], Awaitable[web.Response]]

class Application:
    def __init__(self):
        self.app = web.Application()
        self.setup_routes()
        self.setup_middlewares()

    def setup_routes(self):
        self.app.router.add_get('/', self.index)
        self.app.router.add_get('/health', self.health)

        # API routes
        self.app.router.add_route('*', '/api/{path:.*}', self.api_handler)

    def setup_middlewares(self):
        self.app.middlewares.append(self.error_middleware)
        self.app.middlewares.append(self.logging_middleware)

    async def index(self, request: web.Request) -> web.Response:
        return web.Response(text='Hello, World!')

    async def health(self, request: web.Request) -> web.Response:
        return web.json_response({'status': 'healthy'})

    async def api_handler(self, request: web.Request) -> web.Response:
        path = request.match_info['path']
        return web.json_response({
            'path': path,
            'method': request.method
        })

    @web.middleware
    async def error_middleware(self, request: web.Request, handler: Handler):
        try:
            return await handler(request)
        except web.HTTPException:
            raise
        except Exception as e:
            return web.json_response(
                {'error': str(e)},
                status=500
            )

    @web.middleware
    async def logging_middleware(self, request: web.Request, handler: Handler):
        print(f"{request.method} {request.path}")
        response = await handler(request)
        print(f"Response: {response.status}")
        return response

    def run(self, host: str = '127.0.0.1', port: int = 8080):
        web.run_app(self.app, host=host, port=port)

if __name__ == '__main__':
    app = Application()
    app.run()

Request Handling

from aiohttp import web, multipart
from typing import Optional

class RequestHandlers:
    # Query parameters
    async def query_params(self, request: web.Request) -> web.Response:
        # Get single parameter
        name = request.query.get('name', 'Anonymous')

        # Get all values for a key
        tags = request.query.getall('tag', [])

        # Get as integer with default
        page = int(request.query.get('page', '1'))

        return web.json_response({
            'name': name,
            'tags': tags,
            'page': page
        })

    # Path parameters
    async def path_params(self, request: web.Request) -> web.Response:
        user_id = request.match_info['user_id']
        action = request.match_info.get('action', 'view')

        return web.json_response({
            'user_id': user_id,
            'action': action
        })

    # JSON body
    async def json_body(self, request: web.Request) -> web.Response:
        try:
            data = await request.json()
        except ValueError:
            return web.json_response(
                {'error': 'Invalid JSON'},
                status=400
            )

        return web.json_response({
            'received': data,
            'type': type(data).__name__
        })

    # Form data
    async def form_data(self, request: web.Request) -> web.Response:
        data = await request.post()

        result = {}
        for key in data:
            value = data.get(key)
            result[key] = value

        return web.json_response(result)

    # File upload
    async def file_upload(self, request: web.Request) -> web.Response:
        reader = await request.multipart()

        uploaded_files = []

        async for field in reader:
            if field.filename:
                size = 0
                content = bytearray()

                while True:
                    chunk = await field.read_chunk()
                    if not chunk:
                        break
                    size += len(chunk)
                    content.extend(chunk)

                uploaded_files.append({
                    'filename': field.filename,
                    'size': size,
                    'content_type': field.headers.get('Content-Type')
                })

        return web.json_response({
            'files': uploaded_files
        })

    # Headers
    async def headers_example(self, request: web.Request) -> web.Response:
        auth_header = request.headers.get('Authorization')
        user_agent = request.headers.get('User-Agent')
        custom_header = request.headers.get('X-Custom-Header')

        response_headers = {
            'X-Custom-Response': 'value',
            'X-Request-ID': 'unique-id-123'
        }

        return web.json_response(
            {
                'auth': auth_header,
                'user_agent': user_agent,
                'custom': custom_header
            },
            headers=response_headers
        )

    # Cookies
    async def cookies_example(self, request: web.Request) -> web.Response:
        session_id = request.cookies.get('session_id')

        response = web.json_response({
            'session_id': session_id
        })

        # Set cookie
        response.set_cookie(
            'session_id',
            'new-session-id',
            max_age=3600,
            httponly=True,
            secure=True,
            samesite='Strict'
        )

        return response

Response Types

from aiohttp import web
import json

class ResponseExamples:
    # Text response
    async def text_response(self, request: web.Request) -> web.Response:
        return web.Response(
            text='Plain text response',
            content_type='text/plain'
        )

    # JSON response
    async def json_response(self, request: web.Request) -> web.Response:
        return web.json_response({
            'status': 'success',
            'data': {'key': 'value'}
        })

    # HTML response
    async def html_response(self, request: web.Request) -> web.Response:
        html = """
        <!DOCTYPE html>
        <html>
        <head><title>Example</title></head>
        <body><h1>Hello, World!</h1></body>
        </html>
        """
        return web.Response(
            text=html,
            content_type='text/html'
        )

    # Binary response
    async def binary_response(self, request: web.Request) -> web.Response:
        data = b'\x00\x01\x02\x03\x04'
        return web.Response(
            body=data,
            content_type='application/octet-stream'
        )

    # File download
    async def file_download(self, request: web.Request) -> web.Response:
        return web.FileResponse(
            path='./example.pdf',
            headers={
                'Content-Disposition': 'attachment; filename="example.pdf"'
            }
        )

    # Streaming response
    async def streaming_response(self, request: web.Request) -> web.StreamResponse:
        response = web.StreamResponse()
        response.headers['Content-Type'] = 'text/plain'
        await response.prepare(request)

        for i in range(10):
            await response.write(f"Chunk {i}\n".encode())
            await asyncio.sleep(0.5)

        await response.write_eof()
        return response

    # Redirect
    async def redirect_response(self, request: web.Request) -> web.Response:
        raise web.HTTPFound('/new-location')

    # Custom status codes
    async def custom_status(self, request: web.Request) -> web.Response:
        return web.json_response(
            {'message': 'Created'},
            status=201
        )

Request Validation with Pydantic

Basic Pydantic Integration

from pydantic import BaseModel, Field, field_validator, ConfigDict
from pydantic import EmailStr, HttpUrl
from aiohttp import web
from typing import Optional, Literal

# Pydantic 2.12+ models
class UserCreate(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True)

    username: str = Field(min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(min_length=8)
    age: Optional[int] = Field(None, ge=18, le=120)
    role: Literal['user', 'admin', 'moderator'] = 'user'
    website: Optional[HttpUrl] = None

    @field_validator('username')
    @classmethod
    def validate_username(cls, v: str) -> str:
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v.lower()

    @field_validator('password')
    @classmethod
    def validate_password(cls, v: str) -> str:
        if not any(c.isupper() for c in v):
            raise ValueError('Password must contain uppercase letter')
        if not any(c.isdigit() for c in v):
            raise ValueError('Password must contain digit')
        return v

class UserResponse(BaseModel):
    user_id: str
    username: str
    email: EmailStr
    role: str
    created_at: float

# Handler with validation
async def create_user(request: web.Request) -> web.Response:
    try:
        data = await request.json()
        user_data = UserCreate(**data)
    except ValueError as e:
        return web.json_response(
            {'error': 'Validation error', 'details': str(e)},
            status=400
        )

    # Process validated data
    user = UserResponse(
        user_id='123',
        username=user_data.username,
        email=user_data.email,
        role=user_data.role,
        created_at=time.time()
    )

    return web.json_response(
        user.model_dump(),
        status=201
    )

Advanced Validation Patterns

from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Any, Optional
from enum import Enum

class Priority(str, Enum):
    LOW = 'low'
    MEDIUM = 'medium'
    HIGH = 'high'
    URGENT = 'urgent'

class TaskCreate(BaseModel):
    model_config = ConfigDict(
        str_strip_whitespace=True,
        extra='forbid'  # Reject extra fields
    )

    title: str = Field(min_length=1, max_length=200)
    description: Optional[str] = Field(None, max_length=5000)
    priority: Priority = Priority.MEDIUM
    tags: list[str] = Field(default_factory=list, max_length=10)
    due_date: Optional[float] = None
    assigned_to: Optional[str] = None

    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v: list[str]) -> list[str]:
        if len(v) != len(set(v)):
            raise ValueError('Tags must be unique')
        return [tag.lower() for tag in v]

    @model_validator(mode='after')
    def validate_model(self) -> 'TaskCreate':
        if self.priority == Priority.URGENT and not self.assigned_to:
            raise ValueError('Urgent tasks must be assigned')

        if self.due_date and self.due_date < time.time():
            raise ValueError('Due date cannot be in the past')

        return self

# Middleware for automatic validation
@web.middleware
async def validation_middleware(request: web.Request, handler):
    # Get validation schema from route
    schema = getattr(handler, '_validation_schema', None)

    if schema and request.method in ('POST', 'PUT', 'PATCH'):
        try:
            data = await request.json()
            validated = schema(**data)
            request['validated_data'] = validated
        except ValueError as e:
            return web.json_response(
                {'error': 'Validation error', 'details': str(e)},
                status=400
            )

    return await handler(request)

# Decorator for validation
def validate_with(schema: type[BaseModel]):
    def decorator(handler):
        handler._validation_schema = schema
        return handler
    return decorator

# Usage
@validate_with(TaskCreate)
async def create_task(request: web.Request) -> web.Response:
    task_data: TaskCreate = request['validated_data']

    # Data is already validated
    return web.json_response({
        'task_id': 'task-123',
        'title': task_data.title,
        'priority': task_data.priority.value
    }, status=201)

Query Parameter Validation

from pydantic import BaseModel, Field
from typing import Optional

class PaginationParams(BaseModel):
    page: int = Field(1, ge=1, le=1000)
    per_page: int = Field(20, ge=1, le=100)
    sort_by: Optional[str] = Field(None, pattern=r'^[a-zA-Z_]+$')
    order: Literal['asc', 'desc'] = 'asc'

    @property
    def offset(self) -> int:
        return (self.page - 1) * self.per_page

    @property
    def limit(self) -> int:
        return self.per_page

async def list_items(request: web.Request) -> web.Response:
    try:
        params = PaginationParams(**request.query)
    except ValueError as e:
        return web.json_response(
            {'error': 'Invalid parameters', 'details': str(e)},
            status=400
        )

    # Use validated params
    items = []  # Fetch from database with params.offset and params.limit

    return web.json_response({
        'items': items,
        'page': params.page,
        'per_page': params.per_page,
        'total': 100
    })

WebSocket Implementation

Basic WebSocket Server

from aiohttp import web, WSMsgType
import asyncio

class WebSocketHandler:
    def __init__(self):
        self.active_connections: set[web.WebSocketResponse] = set()

    async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        self.active_connections.add(ws)

        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    if msg.data == 'close':
                        await ws.close()
                    else:
                        # Echo message back
                        await ws.send_str(f"Echo: {msg.data}")

                        # Broadcast to all connections
                        await self.broadcast(f"User says: {msg.data}")

                elif msg.type == WSMsgType.ERROR:
                    print(f'WebSocket error: {ws.exception()}')

        finally:
            self.active_connections.discard(ws)

        return ws

    async def broadcast(self, message: str):
        if self.active_connections:
            await asyncio.gather(
                *[ws.send_str(message) for ws in self.active_connections],
                return_exceptions=True
            )

# Setup
app = web.Application()
handler = WebSocketHandler()
app.router.add_get('/ws', handler.websocket_handler)

Advanced WebSocket Server with Authentication

from aiohttp import web, WSMsgType
import json
import asyncio
from typing import Optional
import jwt

class AuthenticatedWebSocketHandler:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.connections: dict[str, web.WebSocketResponse] = {}

    def verify_token(self, token: str) -> Optional[dict]:
        try:
            return jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            return None

    async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
        ws = web.WebSocketResponse(heartbeat=30)
        await ws.prepare(request)

        user_id: Optional[str] = None

        try:
            # Wait for authentication message
            msg = await asyncio.wait_for(ws.receive(), timeout=10.0)

            if msg.type != WSMsgType.TEXT:
                await ws.send_json({'error': 'Authentication required'})
                await ws.close()
                return ws

            auth_data = json.loads(msg.data)
            token = auth_data.get('token')

            if not token:
                await ws.send_json({'error': 'Token required'})
                await ws.close()
                return ws

            payload = self.verify_token(token)
            if not payload:
                await ws.send_json({'error': 'Invalid token'})
                await ws.close()
                return ws

            user_id = payload['user_id']
            self.connections[user_id] = ws

            await ws.send_json({
                'type': 'auth_success',
                'user_id': user_id
            })

            # Handle messages
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    data = json.loads(msg.data)
                    await self.handle_message(user_id, data)

                elif msg.type == WSMsgType.ERROR:
                    print(f'WebSocket error: {ws.exception()}')

        except asyncio.TimeoutError:
            await ws.send_json({'error': 'Authentication timeout'})

        finally:
            if user_id and user_id in self.connections:
                del self.connections[user_id]

        return ws

    async def handle_message(self, user_id: str, data: dict):
        message_type = data.get('type')

        if message_type == 'ping':
            await self.send_to_user(user_id, {'type': 'pong'})

        elif message_type == 'broadcast':
            await self.broadcast({
                'type': 'message',
                'from': user_id,
                'content': data.get('content')
            })

        elif message_type == 'direct':
            to_user = data.get('to')
            await self.send_to_user(to_user, {
                'type': 'direct_message',
                'from': user_id,
                'content': data.get('content')
            })

    async def send_to_user(self, user_id: str, message: dict):
        ws = self.connections.get(user_id)
        if ws and not ws.closed:
            await ws.send_json(message)

    async def broadcast(self, message: dict):
        if self.connections:
            await asyncio.gather(
                *[ws.send_json(message) for ws in self.connections.values() if not ws.closed],
                return_exceptions=True
            )

WebSocket Client

import aiohttp
import asyncio

async def websocket_client():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('http://localhost:8080/ws') as ws:
            # Send authentication
            await ws.send_json({
                'token': 'your-jwt-token'
            })

            # Receive authentication response
            msg = await ws.receive()
            print(f"Auth response: {msg.data}")

            # Send messages
            await ws.send_json({
                'type': 'broadcast',
                'content': 'Hello, everyone!'
            })

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    print(f"Received: {data}")

                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

asyncio.run(websocket_client())

Testing with pytest and pytest-aiohttp

Basic Test Setup

# conftest.py
import pytest
import asyncio
from aiohttp import web
from typing import AsyncIterator

pytest_plugins = 'aiohttp.pytest_plugin'

@pytest.fixture
async def app() -> AsyncIterator[web.Application]:
    app = web.Application()

    async def hello(request):
        return web.Response(text='Hello, World!')

    app.router.add_get('/', hello)

    yield app

    # Cleanup
    await app.cleanup()

@pytest.fixture
async def client(aiohttp_client, app):
    return await aiohttp_client(app)

Basic Tests

# test_basic.py
import pytest
from aiohttp import web

@pytest.mark.asyncio
async def test_hello(client):
    resp = await client.get('/')
    assert resp.status == 200
    text = await resp.text()
    assert 'Hello, World!' in text

@pytest.mark.asyncio
async def test_json_endpoint(client):
    resp = await client.post('/api/data', json={'key': 'value'})
    assert resp.status == 200
    data = await resp.json()
    assert data['key'] == 'value'

Testing with Fixtures

# conftest.py
import pytest
import asyncio
from typing import AsyncIterator
import aiohttp

@pytest.fixture
async def http_session() -> AsyncIterator[aiohttp.ClientSession]:
    session = aiohttp.ClientSession()
    yield session
    await session.close()

@pytest.fixture
def sample_user():
    return {
        'username': 'testuser',
        'email': 'test@example.com',
        'password': 'TestPass123'
    }

@pytest.fixture
async def authenticated_client(client, sample_user):
    # Login
    resp = await client.post('/login', json=sample_user)
    assert resp.status == 200

    # Extract token
    data = await resp.json()
    token = data['token']

    # Set authorization header
    client.session.headers['Authorization'] = f'Bearer {token}'

    yield client

# tests/test_auth.py
@pytest.mark.asyncio
async def test_protected_endpoint(authenticated_client):
    resp = await authenticated_client.get('/api/protected')
    assert resp.status == 200
    data = await resp.json()
    assert 'user_id' in data

Parameterized Tests

import pytest

@pytest.mark.parametrize('username,email,expected_status', [
    ('valid', 'valid@example.com', 201),
    ('ab', 'valid@example.com', 400),  # Too short
    ('valid', 'invalid-email', 400),   # Invalid email
    ('', 'valid@example.com', 400),    # Empty username
])
@pytest.mark.asyncio
async def test_user_creation_validation(client, username, email, expected_status):
    resp = await client.post('/users', json={
        'username': username,
        'email': email,
        'password': 'ValidPass123'
    })
    assert resp.status == expected_status

@pytest.mark.parametrize('method,path,expected', [
    ('GET', '/', 200),
    ('GET', '/api/users', 200),
    ('POST', '/api/users', 401),  # Requires auth
    ('GET', '/nonexistent', 404),
])
@pytest.mark.asyncio
async def test_endpoints(client, method, path, expected):
    if method == 'GET':
        resp = await client.get(path)
    elif method == 'POST':
        resp = await client.post(path, json={})

    assert resp.status == expected

Mocking External APIs

import pytest
from unittest.mock import AsyncMock, patch
from aiohttp import web

@pytest.mark.asyncio
async def test_external_api_call(client):
    # Mock external API
    with patch('aiohttp.ClientSession.get') as mock_get:
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_response.json = AsyncMock(return_value={'data': 'mocked'})
        mock_get.return_value.__aenter__.return_value = mock_response

        resp = await client.get('/api/external')
        assert resp.status == 200
        data = await resp.json()
        assert data['data'] == 'mocked'

@pytest.fixture
async def mock_database():
    class MockDB:
        def __init__(self):
            self.data = {}

        async def get(self, key):
            return self.data.get(key)

        async def set(self, key, value):
            self.data[key] = value

        async def delete(self, key):
            if key in self.data:
                del self.data[key]

    return MockDB()

@pytest.mark.asyncio
async def test_with_mock_db(client, mock_database):
    # Inject mock database into app
    client.app['db'] = mock_database

    # Test database operations
    await mock_database.set('user:1', {'name': 'Test'})

    resp = await client.get('/users/1')
    assert resp.status == 200

Testing WebSockets

import pytest
from aiohttp import WSMsgType

@pytest.mark.asyncio
async def test_websocket_echo(aiohttp_client):
    app = web.Application()

    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                await ws.send_str(f"Echo: {msg.data}")

        return ws

    app.router.add_get('/ws', websocket_handler)

    client = await aiohttp_client(app)

    async with client.ws_connect('/ws') as ws:
        await ws.send_str('Hello')
        msg = await ws.receive()
        assert msg.data == 'Echo: Hello'

@pytest.mark.asyncio
async def test_websocket_broadcast(aiohttp_client):
    from collections import defaultdict
    connections = set()

    app = web.Application()

    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        connections.add(ws)

        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    # Broadcast to all
                    for conn in connections:
                        if conn != ws:
                            await conn.send_str(msg.data)
        finally:
            connections.discard(ws)

        return ws

    app.router.add_get('/ws', websocket_handler)
    client = await aiohttp_client(app)

    # Create two connections
    async with client.ws_connect('/ws') as ws1:
        async with client.ws_connect('/ws') as ws2:
            await ws1.send_str('Hello from ws1')
            msg = await ws2.receive()
            assert msg.data == 'Hello from ws1'

Testing Middleware

import pytest
from aiohttp import web

@pytest.mark.asyncio
async def test_auth_middleware(aiohttp_client):
    @web.middleware
    async def auth_middleware(request, handler):
        token = request.headers.get('Authorization')
        if not token or not token.startswith('Bearer '):
            raise web.HTTPUnauthorized()

        request['user_id'] = 'user-123'
        return await handler(request)

    app = web.Application(middlewares=[auth_middleware])

    async def protected(request):
        return web.json_response({'user_id': request['user_id']})

    app.router.add_get('/protected', protected)

    client = await aiohttp_client(app)

    # Without token
    resp = await client.get('/protected')
    assert resp.status == 401

    # With token
    resp = await client.get('/protected', headers={
        'Authorization': 'Bearer valid-token'
    })
    assert resp.status == 200
    data = await resp.json()
    assert data['user_id'] == 'user-123'

Coverage and Best Practices

# pytest.ini or pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
    "--verbose",
    "--strict-markers",
    "--cov=app",
    "--cov-report=html",
    "--cov-report=term-missing",
]

# Best practices
# 1. Keep tests independent
# 2. Use fixtures for common setup
# 3. Mock external dependencies
# 4. Test edge cases
# 5. Use parametrize for similar tests
# 6. Clean up resources properly

Advanced Middleware and Error Handling

Error Handling Middleware

from aiohttp import web
import logging
from typing import Callable, Awaitable

logger = logging.getLogger(__name__)

@web.middleware
async def error_middleware(
    request: web.Request,
    handler: Callable[[web.Request], Awaitable[web.Response]]
) -> web.Response:
    try:
        return await handler(request)

    except web.HTTPException as e:
        # HTTP exceptions should pass through
        raise

    except ValueError as e:
        logger.warning(f"Validation error: {e}")
        return web.json_response(
            {
                'error': 'Validation Error',
                'message': str(e)
            },
            status=400
        )

    except PermissionError as e:
        logger.warning(f"Permission denied: {e}")
        return web.json_response(
            {
                'error': 'Forbidden',
                'message': 'You do not have permission to access this resource'
            },
            status=403
        )

    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return web.json_response(
            {
                'error': 'Internal Server Error',
                'message': 'An unexpected error occurred'
            },
            status=500
        )

Logging Middleware

import time
import logging
from aiohttp import web

logger = logging.getLogger(__name__)

@web.middleware
async def logging_middleware(request: web.Request, handler):
    start_time = time.time()

    # Log request
    logger.info(
        f"Request started",
        extra={
            'method': request.method,
            'path': request.path,
            'query': dict(request.query),
            'remote': request.remote
        }
    )

    try:
        response = await handler(request)

        # Log response
        duration = time.time() - start_time
        logger.info(
            f"Request completed",
            extra={
                'method': request.method,
                'path': request.path,
                'status': response.status,
                'duration_ms': duration * 1000
            }
        )

        return response

    except Exception as e:
        duration = time.time() - start_time
        logger.error(
            f"Request failed",
            extra={
                'method': request.method,
                'path': request.path,
                'duration_ms': duration * 1000,
                'error': str(e)
            },
            exc_info=True
        )
        raise

CORS Middleware

from aiohttp import web
from typing import Optional

@web.middleware
async def cors_middleware(request: web.Request, handler):
    # Handle preflight
    if request.method == 'OPTIONS':
        response = web.Response()
    else:
        response = await handler(request)

    # Add CORS headers
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
    response.headers['Access-Control-Max-Age'] = '3600'

    return response

# Or use aiohttp-cors library
import aiohttp_cors

app = web.Application()

# Configure CORS
cors = aiohttp_cors.setup(app, defaults={
    "*": aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
        allow_methods="*"
    )
})

# Add routes
resource = app.router.add_resource("/api/endpoint")
route = resource.add_route("GET", handler)
cors.add(route)

Rate Limiting Middleware

from aiohttp import web
import time
from collections import defaultdict
from typing import Dict, Tuple

class RateLimiter:
    def __init__(self, max_requests: int = 100, window: int = 60):
        self.max_requests = max_requests
        self.window = window
        self.requests: Dict[str, list[float]] = defaultdict(list)

    def is_allowed(self, client_id: str) -> Tuple[bool, Optional[float]]:
        now = time.time()
        cutoff = now - self.window

        # Remove old requests
        self.requests[client_id] = [
            req_time for req_time in self.requests[client_id]
            if req_time > cutoff
        ]

        if len(self.requests[client_id]) >= self.max_requests:
            oldest = self.requests[client_id][0]
            retry_after = oldest + self.window - now
            return False, retry_after

        self.requests[client_id].append(now)
        return True, None

rate_limiter = RateLimiter(max_requests=100, window=60)

@web.middleware
async def rate_limit_middleware(request: web.Request, handler):
    # Use IP address as client identifier
    client_id = request.remote

    allowed, retry_after = rate_limiter.is_allowed(client_id)

    if not allowed:
        return web.json_response(
            {
                'error': 'Rate limit exceeded',
                'retry_after': int(retry_after)
            },
            status=429,
            headers={'Retry-After': str(int(retry_after))}
        )

    return await handler(request)

Performance Optimization

Connection Pooling

import aiohttp
from aiohttp import TCPConnector, ClientTimeout

class OptimizedClient:
    def __init__(self, base_url: str):
        self.base_url = base_url

        # Optimized connector
        self.connector = TCPConnector(
            limit=100,  # Total connections
            limit_per_host=30,  # Per host
            ttl_dns_cache=300,  # DNS cache TTL
            force_close=False,  # Keep-alive
            enable_cleanup_closed=True,
            use_dns_cache=True
        )

        # Optimized timeout
        self.timeout = ClientTimeout(
            total=30,
            connect=10,
            sock_read=20
        )

        self._session: Optional[aiohttp.ClientSession] = None

    async def start(self):
        self._session = aiohttp.ClientSession(
            base_url=self.base_url,
            connector=self.connector,
            timeout=self.timeout,
            connector_owner=True,
            auto_decompress=True,
            trust_env=True,
            read_bufsize=2**16  # 64KB buffer
        )

    async def close(self):
        if self._session:
            await self._session.close()
            await asyncio.sleep(0.25)

Concurrent Requests

import asyncio
import aiohttp
from typing import List, Any

async def fetch_many(urls: List[str]) -> List[Any]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def fetch_one(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.json()

# With semaphore for limiting concurrency
async def fetch_with_limit(urls: List[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_limited(url: str):
        async with semaphore:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.json()

    return await asyncio.gather(*[fetch_limited(url) for url in urls])

Streaming Large Responses

async def download_large_file(url: str, filepath: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            with open(filepath, 'wb') as f:
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)

# Server-side streaming
async def stream_large_response(request: web.Request) -> web.StreamResponse:
    response = web.StreamResponse()
    response.headers['Content-Type'] = 'application/octet-stream'
    await response.prepare(request)

    # Stream data in chunks
    with open('large_file.dat', 'rb') as f:
        while chunk := f.read(8192):
            await response.write(chunk)

    await response.write_eof()
    return response

Caching

from functools import lru_cache
import time

class CachedClient:
    def __init__(self):
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes

    async def get_with_cache(self, url: str):
        now = time.time()

        # Check cache
        if url in self.cache:
            data, timestamp = self.cache[url]
            if now - timestamp < self.cache_ttl:
                return data

        # Fetch and cache
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.json()
                self.cache[url] = (data, now)
                return data

Git Protocol Integration

Understanding Git Smart HTTP

Git Smart HTTP protocol allows git clients to clone, fetch, and push over HTTP/HTTPS. The protocol involves:

  1. Service Discovery: Client requests /info/refs?service=git-upload-pack or git-receive-pack
  2. Negotiation: Client and server negotiate which objects to transfer
  3. Pack Transfer: Server sends/receives packfiles

Basic Git HTTP Backend

from aiohttp import web
import subprocess
import os
from pathlib import Path

class GitHTTPBackend:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root
        self.git_backend = '/usr/lib/git-core/git-http-backend'

    async def handle_info_refs(self, request: web.Request) -> web.Response:
        repo_path = request.match_info['repo']
        service = request.query.get('service', '')

        if service not in ('git-upload-pack', 'git-receive-pack'):
            return web.Response(status=400, text='Invalid service')

        full_path = self.repo_root / repo_path
        if not full_path.exists():
            return web.Response(status=404, text='Repository not found')

        # Build environment
        env = os.environ.copy()
        env['GIT_PROJECT_ROOT'] = str(self.repo_root)
        env['GIT_HTTP_EXPORT_ALL'] = '1'
        env['PATH_INFO'] = f'/{repo_path}/info/refs'
        env['QUERY_STRING'] = f'service={service}'
        env['REQUEST_METHOD'] = 'GET'

        # Execute git-http-backend
        proc = await asyncio.create_subprocess_exec(
            self.git_backend,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            return web.Response(status=500, text=stderr.decode())

        # Parse CGI output
        headers_end = stdout.find(b'\r\n\r\n')
        if headers_end == -1:
            return web.Response(status=500)

        header_lines = stdout[:headers_end].decode().split('\r\n')
        body = stdout[headers_end + 4:]

        # Parse headers
        headers = {}
        for line in header_lines:
            if ':' in line:
                key, value = line.split(':', 1)
                headers[key.strip()] = value.strip()

        return web.Response(
            body=body,
            headers=headers,
            status=200
        )

    async def handle_service(self, request: web.Request) -> web.Response:
        repo_path = request.match_info['repo']
        service = request.match_info['service']

        if service not in ('git-upload-pack', 'git-receive-pack'):
            return web.Response(status=400)

        full_path = self.repo_root / repo_path
        if not full_path.exists():
            return web.Response(status=404)

        # Read request body
        body = await request.read()

        # Build environment
        env = os.environ.copy()
        env['GIT_PROJECT_ROOT'] = str(self.repo_root)
        env['GIT_HTTP_EXPORT_ALL'] = '1'
        env['PATH_INFO'] = f'/{repo_path}/{service}'
        env['REQUEST_METHOD'] = 'POST'
        env['CONTENT_TYPE'] = request.content_type
        env['CONTENT_LENGTH'] = str(len(body))

        # Execute git service
        proc = await asyncio.create_subprocess_exec(
            self.git_backend,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )

        stdout, stderr = await proc.communicate(input=body)

        if proc.returncode != 0:
            return web.Response(status=500, text=stderr.decode())

        # Parse CGI output
        headers_end = stdout.find(b'\r\n\r\n')
        header_lines = stdout[:headers_end].decode().split('\r\n')
        body = stdout[headers_end + 4:]

        headers = {}
        for line in header_lines:
            if ':' in line:
                key, value = line.split(':', 1)
                headers[key.strip()] = value.strip()

        return web.Response(
            body=body,
            headers=headers,
            status=200
        )

# Setup routes
git_backend = GitHTTPBackend(Path('/var/git/repos'))

app = web.Application()
app.router.add_get('/{repo:.+}/info/refs', git_backend.handle_info_refs)
app.router.add_post('/{repo:.+}/{service:(git-upload-pack|git-receive-pack)}', git_backend.handle_service)

Git Backend with Authentication

from aiohttp import web
import base64
import subprocess
from pathlib import Path

class AuthenticatedGitBackend:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root
        self.users = {
            'alice': 'password123',
            'bob': 'secret456'
        }

    def verify_auth(self, request: web.Request) -> tuple[bool, str | None]:
        auth_header = request.headers.get('Authorization', '')

        if not auth_header.startswith('Basic '):
            return False, None

        try:
            encoded = auth_header[6:]
            decoded = base64.b64decode(encoded).decode()
            username, password = decoded.split(':', 1)

            if self.users.get(username) == password:
                return True, username
        except:
            pass

        return False, None

    @web.middleware
    async def auth_middleware(self, request: web.Request, handler):
        # Allow anonymous reads
        service = request.query.get('service', '')
        if request.method == 'GET' and service == 'git-upload-pack':
            return await handler(request)

        # Require authentication for pushes
        if service == 'git-receive-pack' or 'git-receive-pack' in request.path:
            authorized, username = self.verify_auth(request)
            if not authorized:
                return web.Response(
                    status=401,
                    headers={'WWW-Authenticate': 'Basic realm="Git Access"'},
                    text='Authentication required'
                )

            request['username'] = username

        return await handler(request)

    async def handle_info_refs(self, request: web.Request):
        # Git info/refs implementation
        # Similar to previous example but with auth
        pass

    async def handle_service(self, request: web.Request):
        # Git service implementation
        # Similar to previous example but with auth
        pass

Repository Manager Implementation

Repository Browser

from aiohttp import web
import os
from pathlib import Path
import subprocess
from typing import Optional
import json

class RepositoryManager:
    def __init__(self, repo_root: Path):
        self.repo_root = repo_root

    async def list_repositories(self, request: web.Request) -> web.Response:
        repos = []

        for item in self.repo_root.iterdir():
            if item.is_dir() and (item / '.git').exists():
                repos.append({
                    'name': item.name,
                    'path': str(item.relative_to(self.repo_root)),
                    'type': 'git'
                })

        return web.json_response({'repositories': repos})

    async def get_repository(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        repo_path = self.repo_root / repo_name

        if not repo_path.exists():
            return web.json_response(
                {'error': 'Repository not found'},
                status=404
            )

        # Get repository info
        info = await self._get_repo_info(repo_path)

        return web.json_response(info)

    async def create_repository(self, request: web.Request) -> web.Response:
        data = await request.json()
        repo_name = data.get('name')

        if not repo_name:
            return web.json_response(
                {'error': 'Repository name required'},
                status=400
            )

        repo_path = self.repo_root / repo_name

        if repo_path.exists():
            return web.json_response(
                {'error': 'Repository already exists'},
                status=400
            )

        # Create bare repository
        repo_path.mkdir(parents=True)

        proc = await asyncio.create_subprocess_exec(
            'git', 'init', '--bare', str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        await proc.communicate()

        if proc.returncode != 0:
            return web.json_response(
                {'error': 'Failed to create repository'},
                status=500
            )

        return web.json_response({
            'name': repo_name,
            'path': str(repo_path.relative_to(self.repo_root))
        }, status=201)

    async def delete_repository(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        repo_path = self.repo_root / repo_name

        if not repo_path.exists():
            return web.json_response(
                {'error': 'Repository not found'},
                status=404
            )

        # Delete repository directory
        import shutil
        shutil.rmtree(repo_path)

        return web.Response(status=204)

    async def browse_tree(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        ref = request.query.get('ref', 'HEAD')
        path = request.query.get('path', '')

        repo_path = self.repo_root / repo_name

        if not repo_path.exists():
            return web.json_response(
                {'error': 'Repository not found'},
                status=404
            )

        # List files in tree
        proc = await asyncio.create_subprocess_exec(
            'git', 'ls-tree', ref, path,
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            return web.json_response(
                {'error': stderr.decode()},
                status=400
            )

        # Parse ls-tree output
        entries = []
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue

            mode, type_, hash_, name = line.split(None, 3)
            entries.append({
                'mode': mode,
                'type': type_,
                'hash': hash_,
                'name': name
            })

        return web.json_response({'entries': entries})

    async def get_file_content(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        ref = request.query.get('ref', 'HEAD')
        path = request.query['path']

        repo_path = self.repo_root / repo_name

        # Get file content
        proc = await asyncio.create_subprocess_exec(
            'git', 'show', f'{ref}:{path}',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            return web.json_response(
                {'error': 'File not found'},
                status=404
            )

        return web.Response(
            body=stdout,
            content_type='text/plain'
        )

    async def get_commits(self, request: web.Request) -> web.Response:
        repo_name = request.match_info['repo']
        ref = request.query.get('ref', 'HEAD')
        limit = int(request.query.get('limit', '50'))

        repo_path = self.repo_root / repo_name

        # Get commit log
        proc = await asyncio.create_subprocess_exec(
            'git', 'log', ref,
            '--pretty=format:%H|%an|%ae|%at|%s',
            f'-{limit}',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await proc.communicate()

        commits = []
        for line in stdout.decode().strip().split('\n'):
            if not line:
                continue

            hash_, author, email, timestamp, message = line.split('|', 4)
            commits.append({
                'hash': hash_,
                'author': author,
                'email': email,
                'timestamp': int(timestamp),
                'message': message
            })

        return web.json_response({'commits': commits})

    async def _get_repo_info(self, repo_path: Path) -> dict:
        # Get HEAD
        proc = await asyncio.create_subprocess_exec(
            'git', 'rev-parse', 'HEAD',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, _ = await proc.communicate()
        head = stdout.decode().strip() if proc.returncode == 0 else None

        # Get branches
        proc = await asyncio.create_subprocess_exec(
            'git', 'branch', '-a',
            cwd=str(repo_path),
            stdout=asyncio.subprocess.PIPE
        )
        stdout, _ = await proc.communicate()
        branches = [
            line.strip().lstrip('* ')
            for line in stdout.decode().strip().split('\n')
        ]

        return {
            'name': repo_path.name,
            'head': head,
            'branches': branches
        }

# Setup routes
repo_manager = RepositoryManager(Path('/var/git/repos'))

app.router.add_get('/api/repositories', repo_manager.list_repositories)
app.router.add_get('/api/repositories/{repo}', repo_manager.get_repository)
app.router.add_post('/api/repositories', repo_manager.create_repository)
app.router.add_delete('/api/repositories/{repo}', repo_manager.delete_repository)
app.router.add_get('/api/repositories/{repo}/tree', repo_manager.browse_tree)
app.router.add_get('/api/repositories/{repo}/file', repo_manager.get_file_content)
app.router.add_get('/api/repositories/{repo}/commits', repo_manager.get_commits)

Best Practices and Patterns

Application Structure

project/
├── app/
│   ├── __init__.py
│   ├── main.py          # Application entry point
│   ├── routes.py        # Route definitions
│   ├── handlers.py      # Request handlers
│   ├── middleware.py    # Custom middleware
│   ├── models.py        # Pydantic models
│   ├── database.py      # Database connections
│   └── utils.py         # Utility functions
├── tests/
│   ├── __init__.py
│   ├── conftest.py      # Test fixtures
│   ├── test_handlers.py
│   └── test_integration.py
├── requirements.txt
├── pyproject.toml
└── README.md

Configuration Management

from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    model_config = ConfigDict(
        env_file='.env',
        env_file_encoding='utf-8',
        case_sensitive=False
    )

    # Server
    host: str = '127.0.0.1'
    port: int = 8080
    debug: bool = False

    # Database
    database_url: str
    database_pool_size: int = 10

    # Security
    secret_key: str
    jwt_algorithm: str = 'HS256'
    jwt_expiration: int = 3600

    # External APIs
    external_api_url: Optional[str] = None
    external_api_key: Optional[str] = None

settings = Settings()

Graceful Shutdown

from aiohttp import web
import asyncio
import signal

class Application:
    def __init__(self):
        self.app = web.Application()
        self.cleanup_tasks = []

    async def startup(self):
        # Initialize resources
        pass

    async def cleanup(self):
        # Cleanup resources
        for task in self.cleanup_tasks:
            await task

    async def shutdown(self, app):
        # Close database connections
        # Close HTTP sessions
        # Wait for background tasks
        await self.cleanup()

    def run(self):
        self.app.on_startup.append(lambda app: self.startup())
        self.app.on_cleanup.append(self.shutdown)

        web.run_app(
            self.app,
            host='127.0.0.1',
            port=8080,
            shutdown_timeout=60.0
        )

Summary

This guide covers:

  • Python 3.13 modern features and type hints
  • aiohttp 3.13+ client and server development
  • Complete authentication patterns (Basic, Bearer, API Key, OAuth2)
  • Pydantic 2.12+ validation
  • WebSocket implementation
  • Comprehensive testing with pytest-aiohttp
  • Git protocol integration
  • Repository management system
  • Performance optimization
  • Production-ready patterns

Key Takeaways:

  1. Always reuse ClientSession across requests
  2. Use type hints and Pydantic for validation
  3. Implement proper error handling and middleware
  4. Write comprehensive tests with pytest-aiohttp
  5. Follow async/await patterns consistently
  6. Optimize connection pooling and timeouts
  7. Handle cleanup and graceful shutdown properly

Guide Version: 1.0 Last Updated: November 2025 Compatible with: Python 3.13.3, aiohttp 3.13.2, pytest-aiohttp 1.1.0, pydantic 2.12.3