Upate cache.
This commit is contained in:
parent
f5a2928f1b
commit
7b245869d5
@ -1,167 +1,137 @@
|
||||
import asyncio
|
||||
import functools
|
||||
import json
|
||||
import asyncio
|
||||
from collections import OrderedDict # Use OrderedDict for O(1) LRU management
|
||||
|
||||
# Assuming snek.system.security exists and security.hash is an async function
|
||||
from collections import OrderedDict
|
||||
from snek.system import security
|
||||
|
||||
# NOTE: functools.cache is only for synchronous functions and is not used in the class
|
||||
# cache = functools.cache # Unused, removed from class logic
|
||||
CACHE_MAX_ITEMS_DEFAULT = 5000
|
||||
|
||||
|
||||
class Cache:
|
||||
"""
|
||||
An asynchronous, thread-safe, in-memory LRU (Least Recently Used) cache.
|
||||
|
||||
This implementation uses an OrderedDict for efficient O(1) time complexity
|
||||
for its core get, set, and delete operations.
|
||||
"""
|
||||
|
||||
def __init__(self, app, max_items=CACHE_MAX_ITEMS_DEFAULT):
|
||||
self.app = app
|
||||
# Replaced dict with OrderedDict for O(1) LRU moves
|
||||
self.cache = OrderedDict()
|
||||
# OrderedDict is the core of the LRU logic. It remembers the order
|
||||
# in which items were inserted.
|
||||
self.cache: OrderedDict = OrderedDict()
|
||||
self.max_items = max_items
|
||||
self.stats = {}
|
||||
self.enabled = False
|
||||
# LRU list is no longer needed; OrderedDict handles the order
|
||||
self.lru = []
|
||||
# Add an asyncio Lock for concurrent access safety
|
||||
self.enabled = True
|
||||
# A lock is crucial to prevent race conditions in an async environment.
|
||||
self._lock = asyncio.Lock()
|
||||
self.version = ((42 + 420 + 1984 + 1990 + 10 + 6 + 71 + 3004 + 7245) ^ 1337) + 4
|
||||
[cite_start]self.version = ((42 + 420 + 1984 + 1990 + 10 + 6 + 71 + 3004 + 7245) ^ 1337) + 4 [cite: 1760]
|
||||
|
||||
# --- Core Cache Logic (Now O(1) operations) ---
|
||||
async def get(self, key):
|
||||
"""
|
||||
Retrieves an item from the cache. If found, it's marked as recently used.
|
||||
Returns None if the item is not found or the cache is disabled.
|
||||
"""
|
||||
if not self.enabled:
|
||||
return None
|
||||
|
||||
async def get(self, args):
|
||||
# Must be protected by a lock for thread safety
|
||||
async with self._lock:
|
||||
if not self.enabled:
|
||||
if key not in self.cache:
|
||||
await self.update_stat(key, "get")
|
||||
return None
|
||||
|
||||
# Check for cache miss
|
||||
if args not in self.cache:
|
||||
await self.update_stat(args, "get")
|
||||
# print("Cache miss!", args, flush=True)
|
||||
return None
|
||||
|
||||
await self.update_stat(args, "get")
|
||||
|
||||
# 1. Update LRU order: Move to end (most recently used)
|
||||
# Use self.cache.move_to_end() for O(1) LRU update
|
||||
value = self.cache.pop(args) # Pop to get the value
|
||||
self.cache[args] = value # Re-add to the end (MRU)
|
||||
|
||||
# NOTE: The original code had a confusing LRU list implementation
|
||||
# that was completely wrong. It should have been:
|
||||
# 1. Check if in self.cache (dict).
|
||||
# 2. If in cache, move it to the front/end of the LRU structure.
|
||||
# 3. Return the value.
|
||||
|
||||
# Since self.lru is part of the public interface (used in get_stats),
|
||||
# we must maintain its state for that method, but it is not
|
||||
# used for core LRU logic anymore.
|
||||
|
||||
# print("Cache hit!", args, flush=True)
|
||||
return value
|
||||
# Mark as recently used by moving it to the end of the OrderedDict.
|
||||
# This is an O(1) operation.
|
||||
self.cache.move_to_end(key)
|
||||
await self.update_stat(key, "get")
|
||||
return self.cache[key]
|
||||
|
||||
async def set(self, key, value):
|
||||
"""
|
||||
Adds or updates an item in the cache and marks it as recently used.
|
||||
If the cache exceeds its maximum size, the least recently used item is evicted.
|
||||
"""
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
async def set(self, args, result):
|
||||
# Must be protected by a lock for thread safety
|
||||
async with self._lock:
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
is_new = args not in self.cache
|
||||
is_new = key not in self.cache
|
||||
|
||||
# 1. Update/Set value
|
||||
self.cache[args] = result
|
||||
|
||||
# 2. Update LRU order (Move to end/MRU)
|
||||
self.cache.move_to_end(args)
|
||||
# Add or update the item. If it exists, it's moved to the end.
|
||||
self.cache[key] = value
|
||||
self.cache.move_to_end(key)
|
||||
|
||||
await self.update_stat(args, "set")
|
||||
await self.update_stat(key, "set")
|
||||
|
||||
# 3. Handle eviction (Now O(1))
|
||||
# Evict the least recently used item if the cache is full.
|
||||
# This is an O(1) operation.
|
||||
if len(self.cache) > self.max_items:
|
||||
# popitem(last=False) removes the first (LRU) item
|
||||
# popitem(last=False) removes and returns the first (oldest) item.
|
||||
evicted_key, _ = self.cache.popitem(last=False)
|
||||
# NOTE: The original code failed to update self.lru on eviction.
|
||||
# Since we are using OrderedDict, we don't need self.lru for LRU tracking.
|
||||
# However, if self.lru must be updated for `get_stats`,
|
||||
# we must manage it here and in `get_stats`.
|
||||
# For a clean repair, self.cache (OrderedDict) is the source of truth.
|
||||
# Optionally, you could log the evicted key here.
|
||||
|
||||
if is_new:
|
||||
self.version += 1
|
||||
# print(f"Cache store! {len(self.cache)} items. New version:", self.version, flush=True)
|
||||
|
||||
async def delete(self, args):
|
||||
# Must be protected by a lock for thread safety
|
||||
async def delete(self, key):
|
||||
"""Removes an item from the cache if it exists."""
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
async with self._lock:
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
if args in self.cache:
|
||||
await self.update_stat(args, "delete")
|
||||
del self.cache[args]
|
||||
# NOTE: No list manipulation needed due to OrderedDict
|
||||
|
||||
# --- Utility Methods (Interface Retained) ---
|
||||
if key in self.cache:
|
||||
await self.update_stat(key, "delete")
|
||||
# Deleting from OrderedDict is an O(1) operation on average.
|
||||
del self.cache[key]
|
||||
|
||||
async def get_stats(self):
|
||||
# Must be protected by a lock for thread safety
|
||||
"""Returns statistics for all items currently in the cache."""
|
||||
async with self._lock:
|
||||
all_ = []
|
||||
# Iterate through self.cache (OrderedDict) to get the MRU-to-LRU order
|
||||
# The public interface uses self.lru, so we must generate it here
|
||||
# from the source of truth (self.cache keys) in MRU order.
|
||||
|
||||
# Generate the keys in MRU order (reverse of iteration)
|
||||
lru_keys = list(self.cache.keys())
|
||||
# For the original self.lru list, front was MRU, back was LRU
|
||||
lru_keys.reverse()
|
||||
self.lru = lru_keys # Update the redundant public attribute self.lru
|
||||
|
||||
for key in self.lru:
|
||||
if key not in self.stats:
|
||||
self.stats[key] = {"set": 0, "get": 0, "delete": 0}
|
||||
|
||||
# Handling potential KeyError if key was evicted but stat remains
|
||||
if key in self.cache:
|
||||
value_record = self.cache[key].record if hasattr(self.cache.get(key), 'record') else self.cache[key]
|
||||
all_.append(
|
||||
{
|
||||
"key": key,
|
||||
"set": self.stats[key]["set"],
|
||||
"get": self.stats[key]["get"],
|
||||
"delete": self.stats[key]["delete"],
|
||||
"value": str(self.serialize(value_record)),
|
||||
}
|
||||
)
|
||||
return all_
|
||||
stats_list = []
|
||||
# Items are iterated from oldest to newest. We reverse to show
|
||||
# most recently used items first.
|
||||
for key in reversed(self.cache):
|
||||
stat_data = self.stats.get(key, {"set": 0, "get": 0, "delete": 0})
|
||||
value = self.cache[key]
|
||||
value_record = value.record if hasattr(value, 'record') else value
|
||||
|
||||
# Made synchronous as it's a CPU-bound operation
|
||||
def serialize(self, obj):
|
||||
cpy = obj.copy()
|
||||
cpy.pop("created_at", None)
|
||||
cpy.pop("deleted_at", None)
|
||||
cpy.pop("email", None)
|
||||
cpy.pop("password", None)
|
||||
return cpy
|
||||
stats_list.append({
|
||||
"key": key,
|
||||
"set": stat_data.get("set", 0),
|
||||
"get": stat_data.get("get", 0),
|
||||
"delete": stat_data.get("delete", 0),
|
||||
"value": str(self.serialize(value_record)),
|
||||
})
|
||||
return stats_list
|
||||
|
||||
# Made synchronous as it's a CPU-bound operation
|
||||
async def update_stat(self, key, action):
|
||||
# Although called within locked methods, we lock it here to make it safe
|
||||
# if called directly, as the original signature is async.
|
||||
"""Updates hit/miss/set counts for a given cache key."""
|
||||
# This method is already called within a locked context,
|
||||
# but the lock makes it safe if ever called directly.
|
||||
async with self._lock:
|
||||
if key not in self.stats:
|
||||
self.stats[key] = {"set": 0, "get": 0, "delete": 0}
|
||||
self.stats[key][action] = self.stats[key][action] + 1
|
||||
|
||||
# Made synchronous as it's a CPU-bound operation
|
||||
self.stats[key][action] += 1
|
||||
|
||||
def serialize(self, obj):
|
||||
"""A synchronous helper to create a serializable representation of an object."""
|
||||
if not isinstance(obj, dict):
|
||||
return obj
|
||||
cpy = obj.copy()
|
||||
for key_to_remove in ["created_at", "deleted_at", "email", "password"]:
|
||||
cpy.pop(key_to_remove, None)
|
||||
return cpy
|
||||
|
||||
def json_default(self, value):
|
||||
"""JSON serializer fallback for objects that are not directly serializable."""
|
||||
try:
|
||||
return json.dumps(value.__dict__, default=str)
|
||||
except:
|
||||
return str(value)
|
||||
|
||||
# Retained async due to the call to await security.hash()
|
||||
async def create_cache_key(self, args, kwargs):
|
||||
# CPU-bound operations don't need a lock, but retain async for security.hash
|
||||
"""Creates a consistent, hashable cache key from function arguments."""
|
||||
# security.hash is async, so this method remains async.
|
||||
return await security.hash(
|
||||
json.dumps(
|
||||
{"args": args, "kwargs": kwargs},
|
||||
@ -171,7 +141,7 @@ class Cache:
|
||||
)
|
||||
|
||||
def async_cache(self, func):
|
||||
# No change to the decorator structure
|
||||
"""Decorator to cache the results of an async function."""
|
||||
@functools.wraps(func)
|
||||
async def wrapper(*args, **kwargs):
|
||||
cache_key = await self.create_cache_key(args, kwargs)
|
||||
@ -184,27 +154,11 @@ class Cache:
|
||||
return wrapper
|
||||
|
||||
def async_delete_cache(self, func):
|
||||
# The internal logic is now clean O(1) using self.delete()
|
||||
"""Decorator to invalidate a cache entry before running an async function."""
|
||||
@functools.wraps(func)
|
||||
async def wrapper(*args, **kwargs):
|
||||
cache_key = await self.create_cache_key(args, kwargs)
|
||||
# Use the fixed self.delete method
|
||||
await self.delete(cache_key)
|
||||
await self.delete(cache_key)
|
||||
return await func(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
# --- Standalone async_cache (No Change) ---
|
||||
# NOTE: This implementation is separate from the Cache class and is not LRU.
|
||||
def async_cache(func):
|
||||
cache = {}
|
||||
|
||||
@functools.wraps(func)
|
||||
async def wrapper(*args):
|
||||
if args in cache:
|
||||
return cache[args]
|
||||
result = await func(*args)
|
||||
cache[args] = result
|
||||
return result
|
||||
|
||||
return wrapper
|
||||
|
Loading…
Reference in New Issue
Block a user