"""
Written in 2024 by retoor@molodetz.nl.
MIT license. Enjoy!
You'll need a paid OpenAI account with a named project in it, an API key for that project, and an assistant.
URLs to all these pages are listed in the class docstrings for convenience.
The API keys described in this document are fake but are in the correct format for educational purposes.
How to start:
- sudo apt install python3.12-venv python3-pip -y
- python3 -m venv .venv
- . .venv/bin/activate
- pip install openai
This file is to be used as part of your project or a standalone after doing
some modifications at the end of the file.
"""
import os
import sys

# Credentials are optional at import time: when this file is used as a library
# the caller passes them to Agent directly. Defining the names up front means
# they always exist (as None) instead of raising NameError later on.
API_KEY = None
ASSISTANT_ID = None

# Make an ``env.py`` in the current working directory importable.
sys.path.append(os.getcwd())

try:
    import env
except ImportError:
    # No env.py available; keep the None defaults. Only ImportError is
    # ignored, so Ctrl-C and genuine errors inside env.py are not hidden.
    pass
else:
    API_KEY = getattr(env, "API_KEY", None)
    ASSISTANT_ID = getattr(env, "ASSISTANT_ID", None)
import asyncio
import functools
from collections.abc import AsyncGenerator, Generator
from typing import Optional

from openai import OpenAI
class Agent:
    """
    One user session with an OpenAI assistant, with its own memory.

    Each instance creates its own thread through the OpenAI client and keeps a
    local copy of the conversation in ``self.messages``: a list of
    ``{"role": ..., "content": ...}`` dicts holding what the user said and what
    the assistant (agent) answered. Keep that list around and pass it to
    ``__init__`` later to continue where you left off.

    The OpenAI client is used synchronously here, so the async methods run its
    calls in a worker thread (``asyncio.to_thread``) to keep the event loop
    free for other tasks.

    Introduction to the API, for if you want to extend this class:
    https://platform.openai.com/docs/api-reference/introduction
    """

    # Run states after which a run will never reach "completed". Polling for
    # "completed" alone would loop forever once one of these is reported.
    _FAILED_RUN_STATUSES = frozenset({"failed", "cancelled", "expired", "incomplete"})

    def __init__(
        self, api_key: str, assistant_id: str, messages: Optional[list] = None
    ):
        """
        Create the client and open a new thread for this session.

        You can find and create API keys here:
        https://platform.openai.com/api-keys
        You can find assistant_id (agent_id) here. It is the id that starts
        with 'asst_', not your custom name:
        https://platform.openai.com/assistants/

        Messages are optional. They are used to seed the new thread and are
        kept as the message history, in the same format ``chat`` appends:
        [
            {"role": "user", "content": "What is the capital of France?"},
            {"role": "assistant", "content": "Paris."}
        ]
        A non-empty list is stored as-is (not copied), so ``chat`` appends to
        the caller's list.
        """
        self.assistant_id = assistant_id
        self.api_key = api_key
        self.client = OpenAI(api_key=self.api_key)
        self.messages = messages or []
        self.thread = self.client.beta.threads.create(messages=self.messages)

    async def dalle2(
        self, prompt: str, width: Optional[int] = 512, height: Optional[int] = 512
    ):
        """
        Generate one image with dall-e-2.

        In my opinion dall-e-2 produces unusual results.
        Sizes: 256x256, 512x512 or 1024x1024.

        Note the parameter order is (width, height); ``dalle3`` takes
        (height, width), so prefer keyword arguments.

        Returns whatever ``client.images.generate`` returns, unchanged.
        """
        return await asyncio.to_thread(
            self.client.images.generate,
            model="dall-e-2",
            prompt=prompt,
            n=1,
            size=f"{width}x{height}",
        )

    @property
    async def models(self):
        """
        List models as plain dicts (awaitable property: ``await agent.models``).

        That's more convenient than the original list method because this can
        be directly converted to json to be used in your front end or api.
        Each entry has the keys "id", "owned_by", "object" and "created".
        """

        def _collect() -> list:
            # Iterating the listing happens here, inside the worker thread, so
            # any requests made while iterating do not block the event loop.
            return [
                {
                    "id": model.id,
                    "owned_by": model.owned_by,
                    "object": model.object,
                    "created": model.created,
                }
                for model in self.client.models.list()
            ]

        return await asyncio.to_thread(_collect)

    async def dalle3(
        self, prompt: str, height: Optional[int] = 1024, width: Optional[int] = 1024
    ):
        """
        Generate one image with dall-e-3.

        Sadly only big sizes allowed. Is more pricy.
        Sizes: 1024x1024, 1792x1024, or 1024x1792.

        Note the parameter order is (height, width); ``dalle2`` takes
        (width, height), so prefer keyword arguments.

        Returns whatever ``client.images.generate`` returns, unchanged.
        """
        return await asyncio.to_thread(
            self.client.images.generate,
            model="dall-e-3",
            prompt=prompt,
            n=1,
            size=f"{width}x{height}",
        )

    def upload_file(self, file_name: str, purpose: str) -> str:
        """
        Upload a local file and return the id of the created file object.

        ``file_name`` is opened in binary mode; ``purpose`` is passed through
        to ``client.files.create``. This method is synchronous and blocks
        until the upload call returns.
        """
        with open(file_name, "rb") as file_fd:
            response = self.client.files.create(file=file_fd, purpose=purpose)
        return response.id

    async def chat(
        self, message: str, interval: Optional[float] = 0.2
    ) -> AsyncGenerator[Optional[str], None]:
        """
        Chat with the agent.

        This is an async generator. While the run is still busy it yields
        ``None`` every ``interval`` seconds so the caller can update the user
        with live status; the last item yielded is the assistant's reply as a
        string. It can be used fully async with other instances of this class.

        The user message and the reply are both appended to ``self.messages``
        for later use.

        Raises RuntimeError when the run ends in a state other than
        "completed" (failed, cancelled, expired or incomplete).
        """
        user_message = {"role": "user", "content": message}
        self.messages.append(user_message)

        threads = self.client.beta.threads
        await asyncio.to_thread(
            threads.messages.create,
            self.thread.id,
            role=user_message["role"],
            content=user_message["content"],
        )
        run = await asyncio.to_thread(
            threads.runs.create,
            thread_id=self.thread.id,
            assistant_id=self.assistant_id,
        )

        while run.status != "completed":
            if run.status in self._FAILED_RUN_STATUSES:
                raise RuntimeError(
                    f"Assistant run {run.id} ended with status {run.status!r}."
                )
            yield None
            await asyncio.sleep(interval)
            run = await asyncio.to_thread(
                threads.runs.retrieve, thread_id=self.thread.id, run_id=run.id
            )

        listing = await asyncio.to_thread(
            threads.messages.list, thread_id=self.thread.id
        )
        # Takes the first entry of the listing and its first content part.
        # NOTE(review): assumes the listing is newest-first and that part is
        # text; confirm if the assistant can return non-text content.
        reply = listing.data[0].content[0].text.value
        self.messages.append({"role": "assistant", "content": reply})
        yield str(reply)

    async def chatp(self, message: str) -> str:
        """
        Just like the regular chat function but with progress indication, and
        returns the reply string directly.

        Prints "Processing" followed by one dot per progress tick, then a
        newline. This is handy for interactive usage or for a process log.
        """
        print("Processing", end="", flush=True)
        reply = ""
        async for update in self.chat(message):
            if update is None:
                # Progress tick: the run is still busy.
                print(".", end="", flush=True)
                continue
            # The reply is the generator's final item, so the loop ends on its
            # own and the generator is fully consumed.
            reply = update
        print("")
        return reply

    async def read_line(self, ps: Optional[str] = "> "):
        """
        Non blocking read_line.

        Blocking read line can break web socket connections. That's why
        ``input`` runs in the default executor instead of on the event loop.
        ``ps`` is the prompt shown to the user.
        """
        loop = asyncio.get_running_loop()
        prompt_for_input = functools.partial(input, ps)
        return await loop.run_in_executor(None, prompt_for_input)

    async def cli(self):
        """
        Interactive client. Can be used on terminal by user or a different process.

        Reads a line, skips it when blank, sends it to the agent and prints the
        reply followed by an empty line. The bottom new line is so that a
        process can check for \\n\\n to check if it's end of response and
        there's nothing left to wait for, and thus can send the next prompt if
        the '>' shows.

        Ctrl-C or end of input (Ctrl-D / closed stdin) ends the session.
        """
        while True:
            try:
                message = await self.read_line("> ")
                if not message.strip():
                    continue
                response = await self.chatp(message)
                # chatp already returns the reply text itself.
                print(response)
                print("")
            except (KeyboardInterrupt, EOFError):
                print("Exiting..")
                break
async def main():
    """
    Example entry point.

    Builds an Agent from the module-level API_KEY and ASSISTANT_ID and hands
    control to its interactive command line client until that returns.
    """
    credentials = {"api_key": API_KEY, "assistant_id": ASSISTANT_ID}
    session = Agent(**credentials)
    # Start the interactive chat loop.
    await session.cli()
if __name__ == "__main__":
    # Only reached when the script is executed directly, not when it is imported.
    main_coroutine = main()
    asyncio.run(main_coroutine)