From 598cf3588684afd88d52c7594820dc8182de72a4 Mon Sep 17 00:00:00 2001 From: retoor Date: Sat, 26 Apr 2025 12:17:24 +0200 Subject: [PATCH] Update. --- client.py | 16 +++++----------- server.py | 37 ++++++++++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/client.py b/client.py index c91ebfe..908f33e 100644 --- a/client.py +++ b/client.py @@ -14,35 +14,29 @@ async def websocket_client(url: str, ollama_url: str) -> None: async with aiohttp.ClientSession() as session: try: async with session.ws_connect(f'{url}/publish') as ws: + logging.info("Fetching models.") async with session.get(f'{ollama_url}/api/tags') as response: if response.status != 200: logging.error(f"Failed to fetch models: {response.status}") return models = await response.json() await ws.send_json(models) - + logging.info("Published models to uberlama.") async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: data = msg.json() - logging.info(f"Received data: {data}") + logging.info(f"Received data: {data}.") request_id = data['request_id'] api_url = urlunparse(urlparse(ollama_url)._replace(path=data['path'])) async with session.post(api_url, json=data['data']) as response: - print(response) if response.status != 200: - logging.error(f"Failed to post data: {response.status}") + logging.error(f"Failed to post data: {response.status}.") continue logging.info(f"Streaming response.") async for msg in response.content: - #first_index = msg.find(b"{") - #msg = msg[first_index:] - #last_index = msg.rfind(b"}") - #msg = msg[:last_index+1] - #if not msg: - # continue - #msg = json.loads(msg.decode('utf-8')) + print(msg.decode()) await ws.send_json(dict( request_id=request_id, data=msg.decode() diff --git a/server.py b/server.py index 0d912bd..9732f49 100644 --- a/server.py +++ b/server.py @@ -31,18 +31,37 @@ class OllamaServer: if chunk: yield chunk if not chunk: - yield '\n' + yield '' + #yield '\n' print("CHUNK:", chunk) + try: + obj = json.loads(chunk) + if obj.get('done'): + break + except: + pass + + try: + if '"finish_reason":"stop"' in chunk: + break + except: + pass + + try: + if 'data: [DONE]' in chunk: + break + except: + pass #try: #yield json.loads(chunk) #except: # yield chunk - if not 'done' in chunk: - break - if 'stop' in chunk: - break - if chunk['done']: - break + #if not 'done' in chunk: + # break + #if 'stop' in chunk: + # break + #if chunk.get('done'): + # break async def serve(self): @@ -145,10 +164,10 @@ async def http_handler(request): except ValueError: return web.Response(status=400) # application/x-ndjson text/event-stream - if data['stream']: + if data.get('stream'): resp = web.StreamResponse(headers={'Content-Type': 'text/event-stream', 'Transfer-Encoding': 'chunked'}) else: - resp = web.StreamResponse(headers={'Content-Type': 'application/json', 'Transfer-Encoding': 'chunked'}) + resp = web.StreamResponse(headers={'Content-Type': 'application/json', 'Transfer-Encoding':'chunked'}) await resp.prepare(request) import json try: