import asyncio
import json
import pathlib
import time
import uuid
from aiohttp import web
from app.app import Application as BaseApplication
from rwebgui.component import Component
class EvalBox(Component):
async def on_change(self, value):
try:
if value and value.strip().endswith("="):
value = value.strip()[:-1]
try:
result = eval(value)
value = value + "= " + str(result)
await self.set_attr("value", value)
except:
pass
except AttributeError:
print(value)
return value
class Button(Component):
async def on_click(self, event):
component = self.app.search
await component.set_attr("value", "Woeiii")
class Button1(Component):
async def on_click(self, event):
field = self.app.search
await field.toggle()
value = await field.get_style("display", "block")
await self.set_attr("innerText", value)
class RandomString(Component):
async def task_random(self):
import random
rand_bytes = [random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(15)]
random_data = "".join(rand_bytes)
while True:
remember = random_data[0]
random_data = random_data[1:] + remember
await self.set_attr("innerHTML", random_data)
await asyncio.sleep(0.01)
class Counter(Component):
async def task_test(self):
while True:
await asyncio.sleep(10)
print("Slow task")
async def task_increment(self):
if not self.value:
self.value = 0
while True:
try:
self.value = int(self.value)
except:
self.value = 0
self.value += 1
await self.set_attr("value", self.value)
await asyncio.sleep(1)
class GPT(Component):
class Children:
prompt = Component
answer = Component
class submit(Component):
async def trigger(self, id_, event, data):
print("GOGOG", event, data)
return await super().trigger(id_, event, data)
async def on_click(self, data):
from xmlrpc.client import ServerProxy
client = ServerProxy("https://api.molodetz.nl/rpc")
prompt = await self.app.prompt.get_attr("value")
print(prompt)
exit(0)
await self.answer.set_attr("value", client.gpt4o(prompt))
return {"event_id": data["event_id"], "success": True}
class SpeedMeter(Component):
def __init__(self, app, id_, description=None, ws=None):
self.time_start = time.time()
self.bytes_received = 0
super().__init__(app, id_, description, ws)
async def task_update(self):
while True:
bytes_received = self.bytes_received
self.bytes_received = 0
await self.set_attr("value", f"{bytes_received / 1000} kb/s")
await asyncio.sleep(1)
async def trigger(self, id_, event, data):
super().trigger(id_, event, data)
print("JAA")
self.bytes_received += len(json.dumps(data))
class App(Component):
class Children:
eval_box = EvalBox
search = Component
teller1 = Counter
teller2 = Counter
teller3 = Counter
teller4 = Counter
teller5 = Counter
teller6 = Counter
teller7 = Counter
link1 = Button
random1 = RandomString
speed = SpeedMeter
toggle = Button1
gpt = GPT
_service = None
class Application(BaseApplication):
def __init__(self):
self.location = pathlib.Path(__file__).parent
self.location_static = self.location.joinpath("static")
self.template_path = self.location.joinpath("templates")
super().__init__(template_path=self.template_path)
self.router.add_static("/static", self.location_static)
self.router.add_get("/", self.index_handler)
self.router.add_get("/ws/{uuid}", self.websocket_handler)
async def websocket_handler(self, request):
uuid_value = request.match_info["uuid"]
try:
uuid_obj = uuid.UUID(uuid_value)
except ValueError:
return web.Response(text="Invalid UUID", status=400)
ws = web.WebSocketResponse()
await ws.prepare(request)
component = App(self, "app", ws=ws)
await component.service()
return ws
async def index_handler(self, request):
return await self.render_template("index.html", request, {})