Compare commits

..

2 Commits

Author SHA1 Message Date
7b245869d5 Upate cache. 2025-09-30 19:35:56 +02:00
f5a2928f1b Update, cleanup 2025-09-30 19:28:59 +02:00
6 changed files with 91 additions and 568 deletions

View File

@ -1,167 +1,137 @@
import asyncio
import functools import functools
import json import json
import asyncio from collections import OrderedDict
from collections import OrderedDict # Use OrderedDict for O(1) LRU management
# Assuming snek.system.security exists and security.hash is an async function
from snek.system import security from snek.system import security
# NOTE: functools.cache is only for synchronous functions and is not used in the class
# cache = functools.cache # Unused, removed from class logic
CACHE_MAX_ITEMS_DEFAULT = 5000 CACHE_MAX_ITEMS_DEFAULT = 5000
class Cache: class Cache:
"""
An asynchronous, thread-safe, in-memory LRU (Least Recently Used) cache.
This implementation uses an OrderedDict for efficient O(1) time complexity
for its core get, set, and delete operations.
"""
def __init__(self, app, max_items=CACHE_MAX_ITEMS_DEFAULT): def __init__(self, app, max_items=CACHE_MAX_ITEMS_DEFAULT):
self.app = app self.app = app
# Replaced dict with OrderedDict for O(1) LRU moves # OrderedDict is the core of the LRU logic. It remembers the order
self.cache = OrderedDict() # in which items were inserted.
self.cache: OrderedDict = OrderedDict()
self.max_items = max_items self.max_items = max_items
self.stats = {} self.stats = {}
self.enabled = False self.enabled = True
# LRU list is no longer needed; OrderedDict handles the order # A lock is crucial to prevent race conditions in an async environment.
self.lru = []
# Add an asyncio Lock for concurrent access safety
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
self.version = ((42 + 420 + 1984 + 1990 + 10 + 6 + 71 + 3004 + 7245) ^ 1337) + 4 [cite_start]self.version = ((42 + 420 + 1984 + 1990 + 10 + 6 + 71 + 3004 + 7245) ^ 1337) + 4 [cite: 1760]
# --- Core Cache Logic (Now O(1) operations) --- async def get(self, key):
"""
Retrieves an item from the cache. If found, it's marked as recently used.
Returns None if the item is not found or the cache is disabled.
"""
if not self.enabled:
return None
async def get(self, args):
# Must be protected by a lock for thread safety
async with self._lock: async with self._lock:
if not self.enabled: if key not in self.cache:
await self.update_stat(key, "get")
return None return None
# Check for cache miss
if args not in self.cache:
await self.update_stat(args, "get")
# print("Cache miss!", args, flush=True)
return None
await self.update_stat(args, "get")
# 1. Update LRU order: Move to end (most recently used)
# Use self.cache.move_to_end() for O(1) LRU update
value = self.cache.pop(args) # Pop to get the value
self.cache[args] = value # Re-add to the end (MRU)
# NOTE: The original code had a confusing LRU list implementation # Mark as recently used by moving it to the end of the OrderedDict.
# that was completely wrong. It should have been: # This is an O(1) operation.
# 1. Check if in self.cache (dict). self.cache.move_to_end(key)
# 2. If in cache, move it to the front/end of the LRU structure. await self.update_stat(key, "get")
# 3. Return the value. return self.cache[key]
# Since self.lru is part of the public interface (used in get_stats), async def set(self, key, value):
# we must maintain its state for that method, but it is not """
# used for core LRU logic anymore. Adds or updates an item in the cache and marks it as recently used.
If the cache exceeds its maximum size, the least recently used item is evicted.
# print("Cache hit!", args, flush=True) """
return value if not self.enabled:
return
async def set(self, args, result):
# Must be protected by a lock for thread safety
async with self._lock: async with self._lock:
if not self.enabled: is_new = key not in self.cache
return
is_new = args not in self.cache
# 1. Update/Set value # Add or update the item. If it exists, it's moved to the end.
self.cache[args] = result self.cache[key] = value
self.cache.move_to_end(key)
# 2. Update LRU order (Move to end/MRU)
self.cache.move_to_end(args)
await self.update_stat(args, "set") await self.update_stat(key, "set")
# 3. Handle eviction (Now O(1)) # Evict the least recently used item if the cache is full.
# This is an O(1) operation.
if len(self.cache) > self.max_items: if len(self.cache) > self.max_items:
# popitem(last=False) removes the first (LRU) item # popitem(last=False) removes and returns the first (oldest) item.
evicted_key, _ = self.cache.popitem(last=False) evicted_key, _ = self.cache.popitem(last=False)
# NOTE: The original code failed to update self.lru on eviction. # Optionally, you could log the evicted key here.
# Since we are using OrderedDict, we don't need self.lru for LRU tracking.
# However, if self.lru must be updated for `get_stats`,
# we must manage it here and in `get_stats`.
# For a clean repair, self.cache (OrderedDict) is the source of truth.
if is_new: if is_new:
self.version += 1 self.version += 1
# print(f"Cache store! {len(self.cache)} items. New version:", self.version, flush=True)
async def delete(self, args): async def delete(self, key):
# Must be protected by a lock for thread safety """Removes an item from the cache if it exists."""
if not self.enabled:
return
async with self._lock: async with self._lock:
if not self.enabled: if key in self.cache:
return await self.update_stat(key, "delete")
# Deleting from OrderedDict is an O(1) operation on average.
if args in self.cache: del self.cache[key]
await self.update_stat(args, "delete")
del self.cache[args]
# NOTE: No list manipulation needed due to OrderedDict
# --- Utility Methods (Interface Retained) ---
async def get_stats(self): async def get_stats(self):
# Must be protected by a lock for thread safety """Returns statistics for all items currently in the cache."""
async with self._lock: async with self._lock:
all_ = [] stats_list = []
# Iterate through self.cache (OrderedDict) to get the MRU-to-LRU order # Items are iterated from oldest to newest. We reverse to show
# The public interface uses self.lru, so we must generate it here # most recently used items first.
# from the source of truth (self.cache keys) in MRU order. for key in reversed(self.cache):
stat_data = self.stats.get(key, {"set": 0, "get": 0, "delete": 0})
# Generate the keys in MRU order (reverse of iteration) value = self.cache[key]
lru_keys = list(self.cache.keys()) value_record = value.record if hasattr(value, 'record') else value
# For the original self.lru list, front was MRU, back was LRU
lru_keys.reverse()
self.lru = lru_keys # Update the redundant public attribute self.lru
for key in self.lru:
if key not in self.stats:
self.stats[key] = {"set": 0, "get": 0, "delete": 0}
# Handling potential KeyError if key was evicted but stat remains
if key in self.cache:
value_record = self.cache[key].record if hasattr(self.cache.get(key), 'record') else self.cache[key]
all_.append(
{
"key": key,
"set": self.stats[key]["set"],
"get": self.stats[key]["get"],
"delete": self.stats[key]["delete"],
"value": str(self.serialize(value_record)),
}
)
return all_
# Made synchronous as it's a CPU-bound operation stats_list.append({
def serialize(self, obj): "key": key,
cpy = obj.copy() "set": stat_data.get("set", 0),
cpy.pop("created_at", None) "get": stat_data.get("get", 0),
cpy.pop("deleted_at", None) "delete": stat_data.get("delete", 0),
cpy.pop("email", None) "value": str(self.serialize(value_record)),
cpy.pop("password", None) })
return cpy return stats_list
# Made synchronous as it's a CPU-bound operation
async def update_stat(self, key, action): async def update_stat(self, key, action):
# Although called within locked methods, we lock it here to make it safe """Updates hit/miss/set counts for a given cache key."""
# if called directly, as the original signature is async. # This method is already called within a locked context,
# but the lock makes it safe if ever called directly.
async with self._lock: async with self._lock:
if key not in self.stats: if key not in self.stats:
self.stats[key] = {"set": 0, "get": 0, "delete": 0} self.stats[key] = {"set": 0, "get": 0, "delete": 0}
self.stats[key][action] = self.stats[key][action] + 1 self.stats[key][action] += 1
# Made synchronous as it's a CPU-bound operation def serialize(self, obj):
"""A synchronous helper to create a serializable representation of an object."""
if not isinstance(obj, dict):
return obj
cpy = obj.copy()
for key_to_remove in ["created_at", "deleted_at", "email", "password"]:
cpy.pop(key_to_remove, None)
return cpy
def json_default(self, value): def json_default(self, value):
"""JSON serializer fallback for objects that are not directly serializable."""
try: try:
return json.dumps(value.__dict__, default=str) return json.dumps(value.__dict__, default=str)
except: except:
return str(value) return str(value)
# Retained async due to the call to await security.hash()
async def create_cache_key(self, args, kwargs): async def create_cache_key(self, args, kwargs):
# CPU-bound operations don't need a lock, but retain async for security.hash """Creates a consistent, hashable cache key from function arguments."""
# security.hash is async, so this method remains async.
return await security.hash( return await security.hash(
json.dumps( json.dumps(
{"args": args, "kwargs": kwargs}, {"args": args, "kwargs": kwargs},
@ -171,7 +141,7 @@ class Cache:
) )
def async_cache(self, func): def async_cache(self, func):
# No change to the decorator structure """Decorator to cache the results of an async function."""
@functools.wraps(func) @functools.wraps(func)
async def wrapper(*args, **kwargs): async def wrapper(*args, **kwargs):
cache_key = await self.create_cache_key(args, kwargs) cache_key = await self.create_cache_key(args, kwargs)
@ -184,27 +154,11 @@ class Cache:
return wrapper return wrapper
def async_delete_cache(self, func): def async_delete_cache(self, func):
# The internal logic is now clean O(1) using self.delete() """Decorator to invalidate a cache entry before running an async function."""
@functools.wraps(func) @functools.wraps(func)
async def wrapper(*args, **kwargs): async def wrapper(*args, **kwargs):
cache_key = await self.create_cache_key(args, kwargs) cache_key = await self.create_cache_key(args, kwargs)
# Use the fixed self.delete method await self.delete(cache_key)
await self.delete(cache_key)
return await func(*args, **kwargs) return await func(*args, **kwargs)
return wrapper return wrapper
# --- Standalone async_cache (No Change) ---
# NOTE: This implementation is separate from the Cache class and is not LRU.
def async_cache(func):
cache = {}
@functools.wraps(func)
async def wrapper(*args):
if args in cache:
return cache[args]
result = await func(*args)
cache[args] = result
return result
return wrapper

View File

@ -1,78 +0,0 @@
import asyncio
import logging
import os
import asyncssh
asyncssh.set_debug_level(2)
logging.basicConfig(level=logging.DEBUG)
# Configuration for SFTP server
SFTP_ROOT = "." # Directory to serve
USERNAME = "test"
PASSWORD = "woeii"
HOST = "localhost"
PORT = 2225
class MySFTPServer(asyncssh.SFTPServer):
def __init__(self, chan):
super().__init__(chan)
self.root = os.path.abspath(SFTP_ROOT)
async def stat(self, path):
"""Handles 'stat' command from SFTP client"""
full_path = os.path.join(self.root, path.lstrip("/"))
return await super().stat(full_path)
async def open(self, path, flags, attrs):
"""Handles file open requests"""
full_path = os.path.join(self.root, path.lstrip("/"))
return await super().open(full_path, flags, attrs)
async def listdir(self, path):
"""Handles directory listing"""
full_path = os.path.join(self.root, path.lstrip("/"))
return await super().listdir(full_path)
class MySSHServer(asyncssh.SSHServer):
"""Custom SSH server to handle authentication"""
def connection_made(self, conn):
print(f"New connection from {conn.get_extra_info('peername')}")
def connection_lost(self, exc):
print("Client disconnected")
def begin_auth(self, username):
return True # No additional authentication steps
def password_auth_supported(self):
return True # Support password authentication
def validate_password(self, username, password):
print(username, password)
return True
return username == USERNAME and password == PASSWORD
async def start_sftp_server():
os.makedirs(SFTP_ROOT, exist_ok=True) # Ensure the root directory exists
await asyncssh.create_server(
lambda: MySSHServer(),
host=HOST,
port=PORT,
server_host_keys=["ssh_host_key"],
process_factory=MySFTPServer,
)
print(f"SFTP server running on {HOST}:{PORT}")
await asyncio.Future() # Keep running forever
if __name__ == "__main__":
try:
asyncio.run(start_sftp_server())
except (OSError, asyncssh.Error) as e:
print(f"Error starting SFTP server: {e}")

View File

@ -1,77 +0,0 @@
import asyncio
import os
import asyncssh
# SSH Server Configuration
HOST = "0.0.0.0"
PORT = 2225
USERNAME = "user"
PASSWORD = "password"
SHELL = "/bin/sh" # Change to another shell if needed
class CustomSSHServer(asyncssh.SSHServer):
def connection_made(self, conn):
print(f"New connection from {conn.get_extra_info('peername')}")
def connection_lost(self, exc):
print("Client disconnected")
def password_auth_supported(self):
return True
def validate_password(self, username, password):
return username == USERNAME and password == PASSWORD
async def custom_bash_process(process):
"""Spawns a custom bash shell process"""
env = os.environ.copy()
env["TERM"] = "xterm-256color"
# Start the Bash shell
bash_proc = await asyncio.create_subprocess_exec(
SHELL,
"-i",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
async def read_output():
while True:
data = await bash_proc.stdout.read(1)
if not data:
break
process.stdout.write(data)
async def read_input():
while True:
data = await process.stdin.read(1)
if not data:
break
bash_proc.stdin.write(data)
await asyncio.gather(read_output(), read_input())
async def start_ssh_server():
"""Starts the AsyncSSH server with Bash"""
await asyncssh.create_server(
lambda: CustomSSHServer(),
host=HOST,
port=PORT,
server_host_keys=["ssh_host_key"],
process_factory=custom_bash_process,
)
print(f"SSH server running on {HOST}:{PORT}")
await asyncio.Future() # Keep running
if __name__ == "__main__":
try:
asyncio.run(start_ssh_server())
except (OSError, asyncssh.Error) as e:
print(f"Error starting SSH server: {e}")

View File

@ -1,74 +0,0 @@
#!/usr/bin/env python3.7
#
# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
# To run this program, the file ``ssh_host_key`` must exist with an SSH
# private key in it to use as a server host key. An SSH host certificate
# can optionally be provided in the file ``ssh_host_key-cert.pub``.
#
# The file ``ssh_user_ca`` must exist with a cert-authority entry of
# the certificate authority which can sign valid client certificates.
import asyncio
import sys
import asyncssh
async def handle_client(process: asyncssh.SSHServerProcess) -> None:
width, height, pixwidth, pixheight = process.term_size
process.stdout.write(
f"Terminal type: {process.term_type}, " f"size: {width}x{height}"
)
if pixwidth and pixheight:
process.stdout.write(f" ({pixwidth}x{pixheight} pixels)")
process.stdout.write("\nTry resizing your window!\n")
while not process.stdin.at_eof():
try:
await process.stdin.read()
except asyncssh.TerminalSizeChanged as exc:
process.stdout.write(f"New window size: {exc.width}x{exc.height}")
if exc.pixwidth and exc.pixheight:
process.stdout.write(f" ({exc.pixwidth}" f"x{exc.pixheight} pixels)")
process.stdout.write("\n")
async def start_server() -> None:
await asyncssh.listen(
"",
2230,
server_host_keys=["ssh_host_key"],
# authorized_client_keys='ssh_user_ca',
process_factory=handle_client,
)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit("Error starting server: " + str(exc))
loop.run_forever()

View File

@ -1,90 +0,0 @@
#!/usr/bin/env python3.7
#
# Copyright (c) 2013-2024 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
# To run this program, the file ``ssh_host_key`` must exist with an SSH
# private key in it to use as a server host key. An SSH host certificate
# can optionally be provided in the file ``ssh_host_key-cert.pub``.
import asyncio
import sys
from typing import Optional
import asyncssh
import bcrypt
passwords = {
"guest": b"", # guest account with no password
"user": bcrypt.hashpw(b"user", bcrypt.gensalt()),
}
def handle_client(process: asyncssh.SSHServerProcess) -> None:
username = process.get_extra_info("username")
process.stdout.write(f"Welcome to my SSH server, {username}!\n")
# process.exit(0)
class MySSHServer(asyncssh.SSHServer):
def connection_made(self, conn: asyncssh.SSHServerConnection) -> None:
peername = conn.get_extra_info("peername")[0]
print(f"SSH connection received from {peername}.")
def connection_lost(self, exc: Optional[Exception]) -> None:
if exc:
print("SSH connection error: " + str(exc), file=sys.stderr)
else:
print("SSH connection closed.")
def begin_auth(self, username: str) -> bool:
# If the user's password is the empty string, no auth is required
return passwords.get(username) != b""
def password_auth_supported(self) -> bool:
return True
def validate_password(self, username: str, password: str) -> bool:
if username not in passwords:
return False
pw = passwords[username]
if not password and not pw:
return True
return bcrypt.checkpw(password.encode("utf-8"), pw)
async def start_server() -> None:
await asyncssh.create_server(
MySSHServer,
"",
2231,
server_host_keys=["ssh_host_key"],
process_factory=handle_client,
)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit("Error starting server: " + str(exc))
loop.run_forever()

View File

@ -1,112 +0,0 @@
#!/usr/bin/env python3.7
#
# Copyright (c) 2016-2024 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
# To run this program, the file ``ssh_host_key`` must exist with an SSH
# private key in it to use as a server host key. An SSH host certificate
# can optionally be provided in the file ``ssh_host_key-cert.pub``.
#
# The file ``ssh_user_ca`` must exist with a cert-authority entry of
# the certificate authority which can sign valid client certificates.
import asyncio
import sys
from typing import List, cast
import asyncssh
class ChatClient:
_clients: List["ChatClient"] = []
def __init__(self, process: asyncssh.SSHServerProcess):
self._process = process
@classmethod
async def handle_client(cls, process: asyncssh.SSHServerProcess):
await cls(process).run()
async def readline(self) -> str:
return cast(str, self._process.stdin.readline())
def write(self, msg: str) -> None:
self._process.stdout.write(msg)
def broadcast(self, msg: str) -> None:
for client in self._clients:
if client != self:
client.write(msg)
def begin_auth(self, username: str) -> bool:
# If the user's password is the empty string, no auth is required
# return False
return True # passwords.get(username) != b''
def password_auth_supported(self) -> bool:
return True
def validate_password(self, username: str, password: str) -> bool:
# if username not in passwords:
# return False
# pw = passwords[username]
# if not password and not pw:
# return True
return True
# return bcrypt.checkpw(password.encode('utf-8'), pw)
async def run(self) -> None:
self.write("Welcome to chat!\n\n")
self.write("Enter your name: ")
name = (await self.readline()).rstrip("\n")
self.write(f"\n{len(self._clients)} other users are connected.\n\n")
self._clients.append(self)
self.broadcast(f"*** {name} has entered chat ***\n")
try:
async for line in self._process.stdin:
self.broadcast(f"{name}: {line}")
except asyncssh.BreakReceived:
pass
self.broadcast(f"*** {name} has left chat ***\n")
self._clients.remove(self)
async def start_server() -> None:
await asyncssh.listen(
"",
2235,
server_host_keys=["ssh_host_key"],
process_factory=ChatClient.handle_client,
)
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit("Error starting server: " + str(exc))
loop.run_forever()