|
import asyncio
|
|
import logging
|
|
import traceback
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
import json
|
|
import os
|
|
import re
|
|
from typing import Dict, List
|
|
|
|
from ads import AsyncDataSet
|
|
from devranta.api import Api
|
|
from grk import GrokAPIClient
|
|
|
|
# Root logger configuration: emit INFO and above, one line per record in the
# form "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
|
|
|
|
|
|
class Bot:
    """Bot that answers a target user's posts and its own comment mentions.

    Each pass (`run_once`) looks up rants and comments written by `target`,
    asks an LLM for a one-line comeback to every item not answered before,
    posts it as a comment, and then replies to new `comment_mention`
    notifications. Answered items are remembered in the `responded` table of
    the local `princess.db` dataset so that they are not answered twice.
    """

    def __init__(self, username: str, password: str, target: str, llm_key: str) -> None:
        """Store the credentials and create the API, database and LLM clients.

        Args:
            username: Account name the bot logs in with; also interpolated
                into the LLM prompts as the bot's own name.
            password: Password for `username`.
            target: Username whose rants and comments get a comeback.
            llm_key: API key handed to both `GrokAPIClient` instances.
        """
        self.target = target
        self.username = username
        self.password = password
        self.llm_key = llm_key
        self.db = AsyncDataSet("princess.db")
        self.api = Api(self.username, self.password)
        self.logged_in = False
        logging.info("Bot initialized with username: %s", username)
        logging.info("Bot initialized with target: %s", self.target)
        self.mention_llm = None
        self.llm = None
        self.reset_llms()

    def reset_llms(self):
        """Replace both LLM clients with freshly constructed ones.

        `mention_llm` answers mentions with a short, helpful reply; `llm`
        produces the comebacks aimed at the target user.
        """
        self.mention_llm = GrokAPIClient(
            system_message="You are a helpful assistant. Respond with as short a response as possible.",
            api_key=self.llm_key,
        )
        self.llm = GrokAPIClient(
            system_message=f"You always create nice devastating comebacks to every prompt you get. Your name is {self.username} and you only take it personally if {self.username} is tagged. Also, you will defend whimsical (@whimsical) with your smart comebacks if whimsical (@whimsical) is mentioned in the message. If whimsical (@whimsical) is mentioned, add to your comeback that @whimsical is too busy at the moment to respond personally and has better things to do, not worth @whimsical's time. You are not AI, you are {self.username}, responsible for justice and reason. You do never mention yourself, describe yourself or introduce yourself. You respond with a one liner.",
            api_key=self.llm_key,
        )

    async def ensure_login(self) -> None:
        """Log in through the API unless a previous login already succeeded.

        Raises:
            RuntimeError: If `api.login()` returns a falsy result.
        """
        if self.logged_in:
            return
        logging.debug("Attempting to log in...")
        self.logged_in = await self.api.login()
        if not self.logged_in:
            logging.error("Login failed")
            # RuntimeError is still caught by callers using `except Exception`.
            raise RuntimeError("Login failed")
        logging.info("Login successful")

    async def get_rants(self) -> list:
        """Return the result of `api.get_rants()` after making sure we are logged in."""
        await self.ensure_login()
        logging.debug("Fetching rants...")
        return await self.api.get_rants()

    async def mark_responded(self, message_text: str, response_text: str) -> None:
        """Record that `message_text` was answered with `response_text`.

        The key is coerced to `str` because `has_responded` looks it up as a
        string; storing a non-string key (e.g. a numeric comment id) could
        otherwise fail to match on lookup.
        """
        message_text = str(message_text)
        logging.debug("Marking message as responded: %s", message_text)
        await self.db.upsert(
            "responded",
            {"message_text": message_text, "response_text": response_text},
            {"message_text": message_text},
        )

    async def get_mentions(self) -> List[Dict]:
        """Return the notifications whose `type` is `"comment_mention"`."""
        logging.debug("Fetching mentions")
        return [
            notif
            for notif in await self.api.notifs()
            if notif.get("type") == "comment_mention"
        ]

    async def has_responded(self, message_text: str) -> bool:
        """Return whether `message_text` (as a string) is in the `responded` table."""
        logging.debug("Checking if responded to message: %s", message_text)
        return await self.db.exists("responded", {"message_text": str(message_text)})

    async def delete_responded(self, message_text: str = None) -> None:
        """Delete one `responded` record, or all of them.

        Args:
            message_text: Key of the record to delete. When omitted or empty
                the delete is issued with an empty filter.
        """
        logging.debug("Deleting responded message: %s", message_text)
        criteria = {"message_text": message_text} if message_text else {}
        return await self.db.delete("responded", criteria)

    async def get_objects_made_by(self, username: str) -> list:
        """Collect rants and comments written by `username`.

        Every rant returned by `get_rants` is fetched in full. Matching rants
        get `type="rant"` and a `rant_id` copy of their `id`; matching
        comments get `type="comment"` and a `text` copy of their `body`, so
        both kinds expose `text` for the callers in this class.
        NOTE(review): comments are assumed to already carry `rant_id`
        (`run_once` reads it) — confirm against the API payload.
        """
        logging.debug("Getting objects made by: %s", username)
        results = []

        for summary in await self.get_rants():
            details = await self.api.get_rant(summary["id"])
            comments = details["comments"]
            rant = details["rant"]

            if rant["user_username"] == username:
                rant["type"] = "rant"
                rant["rant_id"] = rant["id"]
                results.append(rant)
                logging.info("Found rant by %s: %s", username, rant)

            for comment in comments:
                if comment["user_username"] == username:
                    comment["type"] = "comment"
                    comment["text"] = comment["body"]
                    results.append(comment)
                    logging.info("Found comment by %s: %s", username, comment)

        return results

    async def get_new_objects_made_by(self, username: str) -> list:
        """Return the objects by `username` whose text has not been answered yet."""
        logging.debug("Getting new objects made by: %s", username)
        objects = await self.get_objects_made_by(username)
        new_objects = [
            obj for obj in objects if not await self.has_responded(obj["text"])
        ]
        logging.info("New objects found: %d", len(new_objects))
        return new_objects

    async def run_once(self) -> None:
        """Run one pass: answer the target's new posts, then handle mentions.

        The LLM clients are recreated at the start of every pass.
        """
        self.reset_llms()
        logging.debug("Running once...")
        objects = await self.get_new_objects_made_by(self.target)
        for obj in objects:
            print("Rant: \033[92m" + obj["text"] + "\033[0m")
            diss = await self.llm.chat_async(obj["text"])
            diss = f"@{obj['user_username']} {diss}"
            print("Response: \033[91m" + diss + "\033[0m")
            await self.api.post_comment(obj["rant_id"], diss)
            await self.mark_responded(obj["text"], diss)
        await self.handle_mentions()

    async def mark_mentions_responded(self) -> None:
        """Mark every current mention as handled without replying to it.

        `run` calls this once before entering its loop. Records are keyed by
        the stringified `comment_id`, which is the key `handle_mentions`
        checks; keying by any other field would leave these mentions looking
        unanswered.
        """
        for m in await self.get_mentions():
            await self.mark_responded(str(m["comment_id"]), "")

    async def strip_mentions(self, text: str) -> str:
        """Return `text` with every `@word` token removed."""
        return re.sub(r"@\w+", "", text)

    async def handle_mentions(self) -> None:
        """Reply to each mention whose comment has not been handled yet.

        Mentions authored by the bot's own account are marked as handled and
        skipped. For the others the mentioned comment and its rant are
        fetched, the mention LLM writes a reply, and the reply is posted to
        the rant in chunks of at most 900 characters.
        """
        logging.debug("Handling mentions")

        mentions = [
            m
            for m in (await self.get_mentions())
            if not (await self.has_responded(str(m["comment_id"])))
        ]

        if not mentions:
            logging.debug("No new mentions found")
            return
        logging.info("Found %s new mentions to roast", len(mentions))

        max_length = 900  # maximum characters posted per comment
        for m in mentions:
            comment_key = str(m["comment_id"])
            logging.debug("Roasting mention %s", comment_key)

            # Fetched once and reused for both the self-check and the prompt.
            comment = await self.api.get_comment(m["comment_id"])
            text = comment.get("comment", "")
            author = comment.get("user_username")
            if author == self.username:
                logging.debug("Skipping own mention from %s", author)
                await self.mark_responded(comment_key, "")
                continue

            rant = await self.api.get_rant(m["rant_id"])

            prompt = f"""You are taking part of a discussion.
# YOUR OWN USERNAME
{self.username} and can be mentioned with @{self.username}
# CONTEXT
{json.dumps(rant)}
# COMMENT TO RESPOND TO
```{text}```
# COMMENT AUTHOR
{author}
# TASK
Write a response to the comment above.
"""
            response = await self.mention_llm.chat_async(prompt)
            # Drop every "@" and the bot's own name, then prefix a single "@".
            # NOTE(review): this presumably relies on the reply starting with
            # the author's name so the result reads "@author ..." — confirm.
            response = response.replace("@", "")
            response = response.replace(self.username, "")
            response = "@" + response.strip()
            for start in range(0, len(response), max_length):
                await self.api.post_comment(
                    m["rant_id"], response[start : start + max_length]
                )

            await self.mark_responded(comment_key, response)

            user_id = m.get("user_id", "unknown_user")
            logging.info(f"Bot responded to user {user_id} with: {response}")

    async def run(self) -> None:
        """Run the bot forever, one pass every 60 seconds.

        Mentions that exist at startup are marked as handled first. Any
        exception raised by a pass is logged with its traceback and the loop
        continues.
        """
        await self.mark_mentions_responded()

        while True:
            try:
                await self.run_once()
            except Exception as e:
                # Logs at ERROR level and appends the active traceback.
                logging.exception("An error occurred: %s", e)
            await asyncio.sleep(60)
|
|
|
|
|
|
async def main() -> None:
    """Build a Bot from environment variables and run it.

    Reads USERNAME, PASSWORD, TARGET and LLM_KEY from the environment and
    passes them to `Bot` in that order; unset variables are passed as None.
    """
    logging.info("Starting bot...")
    env_names = ("USERNAME", "PASSWORD", "TARGET", "LLM_KEY")
    username, password, target, llm_key = [os.getenv(name) for name in env_names]

    bot = Bot(username, password, target, llm_key)
    # Uncomment to clear the stored response history before starting:
    # await bot.delete_responded()
    await bot.run()
|
|
|
|
|
|
# Script entry point: run the async main() when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())
|