commit 85f954249acfe5f836e6df0a08b8020e61a712cf
Author: retoor <retoor@molodetz.nl>
Date:   Tue Apr 1 10:33:50 2025 +0200

    Initial commit.

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1d17dae
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.venv
diff --git a/client.py b/client.py
new file mode 100644
index 0000000..a693e2a
--- /dev/null
+++ b/client.py
@@ -0,0 +1,86 @@
+# This script requires the aiohttp Python library to function.
+
+import asyncio
+import aiohttp
+import json
+import logging
+import argparse
+from urllib.parse import urlparse, urlunparse
+
+# Default values
+DEFAULT_CONCURRENCY = 4
+DEFAULT_OLLAMA_URL = 'http://localhost:11434'
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+async def websocket_client(url: str, ollama_url: str) -> None:
+    async with aiohttp.ClientSession() as session:
+        try:
+            async with session.ws_connect(f'{url}/publish') as ws:
+                # Announce the locally available models to the relay first.
+                async with session.get(f'{ollama_url}/api/tags') as response:
+                    if response.status != 200:
+                        logging.error(f"Failed to fetch models: {response.status}")
+                        return
+                    models = await response.json()
+                    await ws.send_json(models)
+
+                async for msg in ws:
+                    if msg.type == aiohttp.WSMsgType.TEXT:
+                        data = msg.json()
+                        logging.info(f"Received data: {data}")
+                        request_id = data['request_id']
+                        # Rebuild the local Ollama URL with the requested path.
+                        api_url = urlunparse(urlparse(ollama_url)._replace(path=data['path']))
+
+                        async with session.post(api_url, json=data['data']) as response:
+                            if response.status != 200:
+                                logging.error(f"Failed to post data: {response.status}")
+                                # Send a terminating frame so the relay does not wait forever.
+                                await ws.send_json(dict(request_id=request_id, data=dict(error=f'Upstream status {response.status}', done=True)))
+                                continue
+                            logging.info("Streaming response.")
+                            async for line in response.content:
+                                chunk = json.loads(line.decode('utf-8'))
+                                await ws.send_json(dict(
+                                    request_id=request_id,
+                                    data=chunk
+                                ))
+                            logging.info("Response complete.")
+                    elif msg.type == aiohttp.WSMsgType.ERROR:
+                        logging.error("WebSocket error occurred.")
+                        break
+        except aiohttp.ClientError as e:
+            logging.error(f"Client error occurred: {e}")
+        except Exception as e:
+            logging.error(f"An unexpected error occurred: {e}")
+
+async def main(concurrency: int, ollama_url: str) -> None:
+    url = 'https://ollama.molodetz.nl'
+
+    while True:
+        # Coroutine objects cannot be awaited twice, so build a fresh batch per attempt.
+        tasks = [websocket_client(url, ollama_url) for _ in range(concurrency)]
+        try:
+            await asyncio.gather(*tasks)
+        except Exception as e:
+            logging.error(f"Connection error: {e}")
+        await asyncio.sleep(1)  # Wait before retrying
+
+def validate_url(url: str) -> bool:
+    parsed = urlparse(url)
+    return all([parsed.scheme, parsed.netloc])
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description='WebSocket Client for Ollama API')
+    parser.add_argument('--concurrency', type=int, default=DEFAULT_CONCURRENCY,
+                        help='Number of concurrent WebSocket connections (default: 4)')
+    parser.add_argument('--ollama_url', type=str, default=DEFAULT_OLLAMA_URL,
+                        help='Ollama API URL (default: http://localhost:11434)')
+
+    args = parser.parse_args()
+
+    if not validate_url(args.ollama_url):
+        logging.error(f"Invalid Ollama URL: {args.ollama_url}")
+        exit(1)
+
+    asyncio.run(main(args.concurrency, args.ollama_url))
diff --git a/index.html b/index.html
new file mode 100644
index 0000000..528a9aa
--- /dev/null
+++ b/index.html
@@ -0,0 +1,57 @@
+<!doctype html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <meta name="color-scheme" content="light dark">
+    <link
+      rel="stylesheet"
href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.yellow.min.css" + > + <title>Ollama Crowd-Funded Server</title> + </head> + <body> + <main class="container"> + <h1>Ollama Crowd-Funded Server</h1> + <p> + Welcome to the Ollama Crowd-Funded Server. You can use this URL with the official Ollama JavaScript or Python clients to communicate with an Ollama server. The Ollama servers are generously provided by individuals. + </p> + <h2>Using this Ollama Server</h2> + <p> + Simply use the original client! The only difference is the URL. + </p> + <code> + <pre> +from ollama import Client +client = Client( + host="https://ollama.molodetz.nl" +) + +messages = [] + +def chat(message): + if message: + messages.append({'role': 'user', 'content': message}) + content = '' + for response in client.chat(model='qwen2.5-coder:0.5b', messages=messages, stream=True): + content += response.message.content + print(response.message.content, end='', flush=True) + messages.append({'role': 'assistant', 'content': content}) + print("") + +while True: + message = input("You: ") + chat(message) + </pre> + </code> + <h2>Donate Your Resources</h2> + <p> + You can contribute your resources to the server by using the following script: + </p> + <code><pre> +#client.py + </pre> + </code> + </main> + </body> +</html> diff --git a/server.py b/server.py new file mode 100644 index 0000000..e9b754e --- /dev/null +++ b/server.py @@ -0,0 +1,142 @@ +# Written by retoor@molodetz.nl + +# This code creates a server using asyncio and aiohttp that manages websocket and HTTP connections to forward messages between them. + +# Used Imports: asyncio, aiohttp + +# The MIT License (MIT) +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+
+import asyncio
+import json
+import pathlib
+import uuid
+
+from aiohttp import web
+
+
+class OllamaServer:
+    def __init__(self, ws, models):
+        self.ws = ws
+        self.queues = {}
+        self.models = models
+        print("New OllamaServer created")
+        print(self.model_names)
+
+    @property
+    def model_names(self):
+        return [model['name'] for model in self.models]
+
+    async def forward_to_http(self, request_id, message):
+        if request_id not in self.queues:
+            self.queues[request_id] = asyncio.Queue()
+        await self.queues[request_id].put(message)
+
+    async def forward_to_websocket(self, request_id, message, path):
+        self.queues[request_id] = asyncio.Queue()
+        await self.ws.send_json(dict(request_id=request_id, data=message, path=path))
+
+        try:
+            while True:
+                chunk = await self.queues[request_id].get()
+                yield chunk
+                if chunk['done']:
+                    break
+        finally:
+            # Drop the queue once the stream has finished so it cannot leak.
+            self.queues.pop(request_id, None)
+
+    async def serve(self):
+        # Currently unused: websocket_handler drives the same receive loop itself.
+        async for msg in self.ws:
+            if msg.type == web.WSMsgType.TEXT:
+                data = msg.json()
+                request_id = data['request_id']
+                await self.forward_to_http(request_id, data['data'])
+            elif msg.type == web.WSMsgType.ERROR:
+                break
+
+
+class ServerManager:
+    def __init__(self):
+        self.servers = []
+
+    def add_server(self, server):
+        self.servers.append(server)
+
+    def remove_server(self, server):
+        self.servers.remove(server)
+
+    async def forward_to_websocket(self, request_id, message, path):
+        # Round-robin: take the first server and rotate it to the back.
+        server = self.servers.pop(0)
+        self.servers.append(server)
+        async for msg in server.forward_to_websocket(request_id, message, path):
+            yield msg
+
+
+server_manager = ServerManager()
+
+
+async def websocket_handler(request):
+    ws = web.WebSocketResponse()
+    await ws.prepare(request)
+
+    # The first frame a donor sends is its model list ({"models": [...]}).
+    models = await ws.receive_json()
+
+    server = OllamaServer(ws, models['models'])
+    server_manager.add_server(server)
+
+    try:
+        async for msg in ws:
+            if msg.type == web.WSMsgType.TEXT:
+                data = msg.json()
+                await server.forward_to_http(data['request_id'], data['data'])
+            elif msg.type == web.WSMsgType.ERROR:
+                print(f'WebSocket connection closed with exception: {ws.exception()}')
+    finally:
+        # Always take a dropped donor out of the rotation.
+        server_manager.remove_server(server)
+    return ws
+
+
+async def http_handler(request):
+    request_id = str(uuid.uuid4())
+    try:
+        data = await request.json()
+    except ValueError:
+        return web.Response(status=400)
+
+    if not server_manager.servers:
+        return web.Response(status=503, text='No Ollama servers are connected.')
+
+    resp = web.StreamResponse(headers={'Content-Type': 'application/x-ndjson'})
+    resp.enable_chunked_encoding()
+    await resp.prepare(request)
+    async for result in server_manager.forward_to_websocket(request_id, data, path=request.path):
+        await resp.write(json.dumps(result).encode() + b'\n')
+    await resp.write_eof()
+    return resp
+
+
+async def index_handler(request):
+    index_template = pathlib.Path("index.html").read_text()
+    client_py = pathlib.Path("client.py").read_text()
+    # Inline client.py into the landing page at the #client.py marker.
+    index_template = index_template.replace("#client.py", client_py)
+    return web.Response(text=index_template, content_type="text/html")
+
+
+app = web.Application()
+
+app.router.add_get("/", index_handler)
+app.router.add_route('GET', '/publish', websocket_handler)
+app.router.add_route('POST', '/api/chat', http_handler)
+
+if __name__ == '__main__':
+    web.run_app(app, port=8080)
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..66e4293
--- /dev/null
+++ b/test.py
@@ -0,0 +1,59 @@
+from ollama import Client
+
+client = Client(
+    # host="https://ollama.molodetz.nl",
+    host='http://localhost:8080',
+    headers={'x-some-header': 'some-value'}
+)
+
+def times_two(nr_1: int) -> int:
+    return nr_1 * 2
+
+available_functions = {
+    'times_two': times_two
+}
+
+messages = []
+
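+# Informal note: chat_stream() exercises plain streaming through the relay,
+# while chat() exercises tool calling. Each entry in
+# response.message.tool_calls carries tool.function.name and
+# tool.function.arguments, e.g. name 'times_two' with arguments {'nr_1': 2}
+# (example values are hypothetical).
+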
+def chat_stream(message):
+    if message:
+        messages.append({'role': 'user', 'content': message})
+    content = ''
+    for response in client.chat(model='qwen2.5-coder:0.5b', messages=messages, stream=True):
+        content += response.message.content
+        print(response.message.content, end='', flush=True)
+    messages.append({'role': 'assistant', 'content': content})
+    print("")
+
+def chat(message, stream=False):
+    if stream:
+        return chat_stream(message)
+    if message:
+        messages.append({'role': 'user', 'content': message})
+    response = client.chat(model='qwen2.5:3b', messages=messages,
+                           tools=[times_two])
+    if response.message.tool_calls:
+        # Add the tool-call message so the model can use the results below.
+        messages.append(response.message)
+        # There may be multiple tool calls in the response.
+        for tool in response.message.tool_calls:
+            if function_to_call := available_functions.get(tool.function.name):
+                print('Calling function:', tool.function.name)
+                print('Arguments:', tool.function.arguments)
+                output = function_to_call(**tool.function.arguments)
+                print('Function output:', output)
+                # Append one tool result per call, not just the last one.
+                messages.append({'role': 'tool', 'content': str(output), 'name': tool.function.name})
+            else:
+                print('Function', tool.function.name, 'not found')
+
+        # Get the final response from the model with the function outputs.
+        return chat(None)
+    return response.message.content
+
+
+# Repeatedly streams the same prompt: a simple smoke test for the relay.
+while True:
+    chat_stream("A farmer and a sheep are standing on one side of a river. There is a boat with enough room for one human and one animal. How can the farmer get across the river with the sheep in the fewest number of trips?")
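+
+# Usage sketch (assumptions: all files in one directory, the aiohttp and
+# ollama packages installed, and a local Ollama on http://localhost:11434):
+#   python server.py   # starts a relay on http://localhost:8080
+#   python client.py   # donates your local Ollama (the relay URL is hardcoded in main())
+#   python test.py     # chats through the relay at http://localhost:8080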