"""In-memory LRU cache and memoizing decorators for coroutine functions."""

import functools
import json

from snek.system import security

cache = functools.cache

CACHE_MAX_ITEMS_DEFAULT = 5000


class Cache:
    """Bounded key/value cache with least-recently-used eviction.

    Attributes:
        app: The object passed to the constructor; stored but not used by
            any method in this class.
        cache: Dict mapping cache keys to stored results.
        max_items: Maximum number of keys kept before the least recently
            used ones are evicted.
        lru: List of keys ordered from most recently used (index 0) to
            least recently used (last index).
        version: Integer incremented each time a key that was not yet in
            ``cache`` is stored.
    """

    def __init__(self, app, max_items=CACHE_MAX_ITEMS_DEFAULT):
        self.app = app
        self.cache = {}
        self.max_items = max_items
        self.lru = []
        # Initial version number; the expression is kept verbatim because
        # its meaning is not evident from this file.
        self.version = (
            (42 + 420 + 1984 + 1990 + 10 + 6 + 71 + 3004 + 7245) ^ 1337
        ) + 4

    def _forget(self, key):
        """Remove *key* from the recency list; return whether it was there."""
        try:
            self.lru.remove(key)
        except ValueError:  # list.remove/list.index signal absence this way
            return False
        return True

    def _evict_overflow(self):
        """Drop least recently used keys until at most ``max_items`` remain."""
        while len(self.lru) > self.max_items:
            # pop(..., None) tolerates a recency entry whose value is gone.
            self.cache.pop(self.lru.pop(), None)

    async def get(self, args):
        """Return the value stored under *args*, or ``None`` on a miss.

        A hit moves the key to the front of the recency list. A key that
        is tracked in ``lru`` but missing from ``cache`` is dropped from
        ``lru`` and reported as a miss.
        """
        tracked = self._forget(args)
        if not tracked or args not in self.cache:
            print("Cache miss!", args, flush=True)
            return None

        self.lru.insert(0, args)
        # Read the value before evicting so a shrunken max_items cannot
        # remove the entry that is about to be returned.
        value = self.cache[args]
        self._evict_overflow()
        print("Cache hit!", args, flush=True)
        return value

    def json_default(self, value):
        """Serialize an object ``json.dumps`` cannot handle natively.

        Returns the JSON encoding of ``value.__dict__`` (stringifying any
        nested unsupported values), or ``str(value)`` if that fails.
        """
        try:
            return json.dumps(value.__dict__, default=str)
        except Exception:
            # Deliberate best-effort fallback: objects without __dict__,
            # circular references, unsupported dict keys, and so on.
            return str(value)

    async def create_cache_key(self, args, kwargs):
        """Build a cache key from call arguments.

        The arguments are JSON-encoded with sorted keys (using
        ``json_default`` for unsupported values) and the resulting string
        is passed to ``security.hash``, whose awaited result is returned.
        """
        payload = json.dumps(
            {"args": args, "kwargs": kwargs},
            sort_keys=True,
            default=self.json_default,
        )
        return await security.hash(payload)

    async def set(self, args, result):
        """Store *result* under *args* and mark the key most recently used.

        Evicts least recently used keys if the cache exceeds ``max_items``
        and bumps ``version`` when the key was not previously stored.
        """
        is_new = args not in self.cache
        self.cache[args] = result

        self._forget(args)
        self.lru.insert(0, args)
        self._evict_overflow()

        if is_new:
            self.version += 1
            print("Cache store! New version:", self.version, flush=True)

    async def delete(self, args):
        """Remove *args* from the cache; do nothing if it is not stored."""
        self._forget(args)
        self.cache.pop(args, None)

    def async_cache(self, func):
        """Decorate coroutine function *func* so its results are cached.

        The key is derived from the call's positional and keyword
        arguments via ``create_cache_key``. A cached ``None`` is treated
        as a miss because ``get`` uses ``None`` to signal absence; every
        other cached value, including falsy ones, is returned as-is.
        """

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            cache_key = await self.create_cache_key(args, kwargs)
            cached = await self.get(cache_key)
            if cached is not None:
                return cached
            result = await func(*args, **kwargs)
            await self.set(cache_key, result)
            return result

        return wrapper

    def async_delete_cache(self, func):
        """Decorate coroutine function *func* to invalidate before running.

        The entry whose key matches the call's arguments (as computed by
        ``create_cache_key``) is removed, then *func* is awaited and its
        result returned.
        """

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            cache_key = await self.create_cache_key(args, kwargs)
            await self.delete(cache_key)
            return await func(*args, **kwargs)

        return wrapper


def async_cache(func):
    """Memoize coroutine function *func* on its positional arguments.

    Results are kept in a dict private to the decorated function, keyed by
    the ``args`` tuple (so the arguments must be hashable). The dict has
    no size limit and entries are never removed.
    """
    results = {}

    @functools.wraps(func)
    async def wrapper(*args):
        if args in results:
            return results[args]
        result = await func(*args)
        results[args] = result
        return result

    return wrapper