import asyncio
from collections import defaultdict
from typing import Dict
class LockManager:
def __init__(self):
self._locks: Dict[str, asyncio.Lock] = {}
self._master_lock = asyncio.Lock()
async def get_lock(self, identifier: str) -> asyncio.Lock:
async with self._master_lock:
if identifier not in self._locks:
self._locks[identifier] = asyncio.Lock()
return self._locks[identifier]
async def cleanup_unused_locks(self):
async with self._master_lock:
to_remove = [key for key, lock in self._locks.items() if not lock.locked()]
for key in to_remove:
del self._locks[key]
_global_lock_manager = LockManager()
def get_lock_manager() -> LockManager:
return _global_lock_manager