diff --git a/.gitignore b/.gitignore index 1a9f6cf..04bc632 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,40 @@ -.venv -.history -__pycache__ -*.pyc +# retoor + +# Python +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +dist/ +build/ +.eggs/ + +# Virtual environments +.venv/ +venv/ + +# Environment .env + +# IDE +.history/ +.idea/ +.vscode/ +*.swp + +# C build artifacts +*.o +*.so +*.a +*.c +*.h + +# Databases *.db -examples/crawler/devrant.sqlite-shm -examples/crawler/devrant.sqlite-wal -examples/crawler/devrant.sqlite -examples/crawler/.venv -examples/crawler/__pycache__ +*.sqlite +*.sqlite-shm +*.sqlite-wal + +# OS +.DS_Store +Thumbs.db diff --git a/README.md b/README.md index e2a856b..68d13f4 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,45 @@ # devRanta -devRanta is the best async devRant client written in Python. Authentication is only needed for half of the functionality; thus, the username and password are optional parameters when constructing the main class of this package (Api). You can find the latest packages in tar and wheel format [here](https://retoor.molodetz.nl/retoor/devranta/packages). + +Author: retoor + +An asynchronous Python client for the devRant API. Authentication is only required for write operations; read-only endpoints work without credentials. Packages available in tar and wheel format [here](https://retoor.molodetz.nl/retoor/devranta/packages). + ## Running ``` make run ``` ## Testing -Tests are only made for methods not requireing authentication. -I do not see value in mocking requests. + +Tests cover methods not requiring authentication. + ``` make test ``` -## How to use -Implementation: -``` + +## Usage + +```python from devranta.api import Api -api = Api(username="optional!", password="optional!") + +api = Api(username="optional", password="optional") + async def list_rants(): async for rant in api.get_rants(): print(rant["user_username"], ":", rant["text"]) ``` -See [tests](src/devranta/tests.py) for [examples](src/devranta/tests.py) on how to use. + +See [tests](src/devranta/tests.py) for additional examples. + +## Examples + +| Example | Description | +|---------|-------------| +| [crawler](examples/crawler/) | Asynchronous data collection with producer-consumer architecture | +| [princess](examples/princess/) | Automated response bot with LLM integration | + # devRant API Documentation -For people wanting to build their own client. -TODO: document responses. + +Reference for building custom clients. ## Base URL `https://devrant.com/api` ## Authentication diff --git a/examples/crawler/Makefile b/examples/crawler/Makefile index 2c74caf..0fe3775 100644 --- a/examples/crawler/Makefile +++ b/examples/crawler/Makefile @@ -1,10 +1,10 @@ +# retoor .PHONY: all env install run clean all: env install run env: python3 -m venv .venv - . install: . .venv/bin/activate && pip install -r requirements.txt diff --git a/examples/crawler/README.md b/examples/crawler/README.md index d6e85ac..f46729e 100644 --- a/examples/crawler/README.md +++ b/examples/crawler/README.md @@ -1,34 +1,122 @@ -# Example Crawler Project +# devRant Exhaustive Crawler -This is a simple example crawler project. Follow the instructions below to set up and run the crawler. +Author: retoor -## Setup +An asynchronous crawler for comprehensive data collection from the devRant platform. Implements a producer-consumer architecture with multiple discovery strategies to maximize content coverage. -1. Clone the repository or copy the project files to your local machine. -2. 
Make sure you have Python 3 installed. +## SSL Note + +The devRant API SSL certificate is expired. This crawler disables SSL verification to maintain connectivity. This is handled automatically by the API client. + +## Architecture + +The crawler employs four concurrent producers feeding into worker pools: + +| Producer | Strategy | Interval | +|----------|----------|----------| +| Recent | Paginate through recent rants | 2s | +| Top | Paginate through top-rated rants | 5s | +| Algo | Paginate through algorithm-sorted rants | 5s | +| Search | Cycle through 48 programming-related search terms | 30s | + +Worker pools process discovered content: +- 10 rant consumers fetch rant details and extract comments +- 5 user consumers fetch profiles and discover associated rants + +Discovery graph: rants reveal users, users reveal more rants (from their profile, upvoted, favorites). + +## Data Storage + +Uses SQLite via the dataset library with: +- Batched writes (100 items or 5s interval) +- Automatic upsert for deduplication +- Indexes on user_id, created_time, rant_id +- State persistence for resume capability + +### Schema + +**rants**: id, user_id, text, score, created_time, num_comments, attached_image_url, tags, link, vote_state, user_username, user_score + +**comments**: id, rant_id, user_id, body, score, created_time, vote_state, user_username, user_score + +**users**: id, username, score, about, location, created_time, skills, github, website + +**crawler_state**: Persists producer positions (skip values, search term index) ## Usage -1. Open a terminal in the project directory. -2. Run `make` to set up the environment, install dependencies, and start the crawler: +### Quick Start ```bash make ``` -This will create a virtual environment, install the package in editable mode from the parent directory, and run the main script. +This creates a virtual environment, installs dependencies, and starts the crawler. + +### Manual Setup + +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -e ../../. +pip install -r requirements.txt +python main.py +``` + +### Stopping + +Press `Ctrl+C` for graceful shutdown. The crawler will: +1. Save current state to database +2. Wait up to 30 seconds for queues to drain +3. Flush remaining batched writes + +### Resuming + +Simply run again. The crawler loads saved state and continues from where it stopped. + +## Configuration + +Edit `main.py` to adjust: + +```python +DB_FILE = "devrant.sqlite" +CONCURRENT_RANT_CONSUMERS = 10 +CONCURRENT_USER_CONSUMERS = 5 +BATCH_SIZE = 100 +FLUSH_INTERVAL = 5.0 +``` + +## Output + +The crawler logs statistics every 15 seconds: + +``` +[STATS] Rants Q'd/Proc: 1250/1200 | Users Q'd/Proc: 450/400 | Comments DB: 5600 | Queues (R/U): 50/50 | API Errors: 0 +``` ## Cleanup -To remove the virtual environment, run: - ```bash make clean ``` -## Notes +Removes the virtual environment. Database file (`devrant.sqlite`) is preserved. -- The project installs the package with `-e ../../.` to include the parent package `devranta` in editable mode. -- Ensure that the parent package is correctly set up in the directory structure. +## Requirements -Happy crawling! 
\ No newline at end of file +- Python 3.10+ +- dataset +- aiohttp (via parent devranta package) + +## File Structure + +``` +crawler/ +├── main.py # Entry point, configuration +├── crawler.py # Producer-consumer implementation +├── database.py # Dataset wrapper with batch queue +├── requirements.txt # Dependencies +├── Makefile # Build automation +├── .venv/ # Virtual environment (created on first run) +└── devrant.sqlite # SQLite database (created on first run) +``` diff --git a/examples/crawler/crawler.py b/examples/crawler/crawler.py index e6fc14f..025dd8d 100644 --- a/examples/crawler/crawler.py +++ b/examples/crawler/crawler.py @@ -1,12 +1,50 @@ +# retoor import asyncio +import json import logging +import time +from collections import OrderedDict from typing import Set from database import DatabaseManager - from devranta.api import Api, Rant +SEARCH_TERMS = [ + "python", "javascript", "java", "csharp", "golang", "rust", + "react", "angular", "vue", "node", "docker", "kubernetes", + "linux", "windows", "macos", "git", "github", "gitlab", + "sql", "mongodb", "redis", "api", "rest", "graphql", + "bug", "error", "crash", "debug", "fix", "issue", + "manager", "deadline", "meeting", "standup", "agile", "scrum", + "frontend", "backend", "fullstack", "devops", "cloud", "aws", + "typescript", "php", "ruby", "swift", "kotlin", "flutter", +] + + +class BoundedSeenSet: + def __init__(self, maxsize: int = 100000): + self._set = OrderedDict() + self._maxsize = maxsize + + def add(self, item: int): + if item in self._set: + self._set.move_to_end(item) + else: + self._set[item] = None + if len(self._set) > self._maxsize: + self._set.popitem(last=False) + + def __contains__(self, item: int) -> bool: + return item in self._set + + def __len__(self) -> int: + return len(self._set) + + def clear(self): + self._set.clear() + + class DevRantCrawler: def __init__( self, api: Api, db: DatabaseManager, rant_consumers: int, user_consumers: int @@ -20,8 +58,14 @@ class DevRantCrawler: self.num_rant_consumers = rant_consumers self.num_user_consumers = user_consumers - self.seen_rant_ids: Set[int] = set() - self.seen_user_ids: Set[int] = set() + self.seen_rant_ids = BoundedSeenSet(maxsize=100000) + self.seen_user_ids = BoundedSeenSet(maxsize=100000) + + self._recent_skip = 0 + self._top_skip = 0 + self._algo_skip = 0 + self._search_term_index = 0 + self.stats = { "rants_processed": 0, "rants_added_to_db": 0, @@ -74,29 +118,58 @@ class DevRantCrawler: f"Seeder finished: Queued {len(user_ids)} users to kickstart exploration." 
) + async def _save_state(self): + state = { + "recent_skip": self._recent_skip, + "top_skip": self._top_skip, + "algo_skip": self._algo_skip, + "search_term_index": self._search_term_index, + "last_saved": int(time.time()), + } + await self.db.save_crawler_state("producer_state", json.dumps(state)) + logging.debug("Crawler state saved.") + + async def _load_state(self): + state_json = await self.db.load_crawler_state("producer_state") + if state_json: + try: + state = json.loads(state_json) + self._recent_skip = state.get("recent_skip", 0) + self._top_skip = state.get("top_skip", 0) + self._algo_skip = state.get("algo_skip", 0) + self._search_term_index = state.get("search_term_index", 0) + logging.info(f"Loaded crawler state: {state}") + except json.JSONDecodeError: + logging.warning("Failed to decode crawler state, starting fresh.") + + async def _state_saver(self): + logging.info("State saver started.") + while not self.shutdown_event.is_set(): + await asyncio.sleep(60) + await self._save_state() + async def _rant_producer(self): - logging.info("Rant producer started.") - skip = 0 + logging.info("Recent rant producer started.") consecutive_empty_responses = 0 while not self.shutdown_event.is_set(): try: - logging.info(f"Producer: Fetching rants with skip={skip}...") - rants = await self.api.get_rants(sort="recent", limit=50, skip=skip) + logging.debug(f"Recent producer: Fetching rants with skip={self._recent_skip}...") + rants = await self.api.get_rants(sort="recent", limit=50, skip=self._recent_skip) self.stats["producer_loops"] += 1 if not rants: consecutive_empty_responses += 1 - logging.info( - f"Producer: Feed returned empty. Consecutive empty hits: {consecutive_empty_responses}." + logging.debug( + f"Recent producer: Feed returned empty. Consecutive empty hits: {consecutive_empty_responses}." ) if consecutive_empty_responses >= 5: self.stats["end_of_feed_hits"] += 1 logging.info( - "Producer: End of feed likely reached. Pausing for 15 minutes before reset." + "Recent producer: End of feed likely reached. Pausing for 15 minutes before reset." ) await asyncio.sleep(900) - skip = 0 + self._recent_skip = 0 consecutive_empty_responses = 0 else: await asyncio.sleep(10) @@ -108,25 +181,98 @@ class DevRantCrawler: await self._queue_rant_if_new(rant) new_rants_found += 1 - logging.info( - f"Producer: Processed {new_rants_found} rants from feed. Total queued: {self.stats['rants_queued']}." + logging.debug( + f"Recent producer: Processed {new_rants_found} rants from feed." ) - skip += len(rants) + self._recent_skip += len(rants) await asyncio.sleep(2) except Exception as e: - logging.critical( - f"Producer: Unhandled exception: {e}. Retrying in 60s." + logging.error( + f"Recent producer: Unhandled exception: {e}. Retrying in 60s." ) self.stats["api_errors"] += 1 await asyncio.sleep(60) + async def _top_rant_producer(self): + logging.info("Top rant producer started.") + + while not self.shutdown_event.is_set(): + try: + logging.debug(f"Top producer: Fetching rants with skip={self._top_skip}...") + rants = await self.api.get_rants(sort="top", limit=50, skip=self._top_skip) + + if not rants: + logging.info("Top producer: End of feed reached. Resetting after 1 hour.") + self._top_skip = 0 + await asyncio.sleep(3600) + continue + + for rant in rants: + await self._queue_rant_if_new(rant) + + logging.debug(f"Top producer: Processed {len(rants)} rants.") + self._top_skip += len(rants) + await asyncio.sleep(5) + + except Exception as e: + logging.error(f"Top producer: Unhandled exception: {e}. 
Retrying in 60s.") + self.stats["api_errors"] += 1 + await asyncio.sleep(60) + + async def _algo_rant_producer(self): + logging.info("Algo rant producer started.") + + while not self.shutdown_event.is_set(): + try: + logging.debug(f"Algo producer: Fetching rants with skip={self._algo_skip}...") + rants = await self.api.get_rants(sort="algo", limit=50, skip=self._algo_skip) + + if not rants: + logging.info("Algo producer: End of feed reached. Resetting after 1 hour.") + self._algo_skip = 0 + await asyncio.sleep(3600) + continue + + for rant in rants: + await self._queue_rant_if_new(rant) + + logging.debug(f"Algo producer: Processed {len(rants)} rants.") + self._algo_skip += len(rants) + await asyncio.sleep(5) + + except Exception as e: + logging.error(f"Algo producer: Unhandled exception: {e}. Retrying in 60s.") + self.stats["api_errors"] += 1 + await asyncio.sleep(60) + + async def _search_producer(self): + logging.info("Search producer started.") + + while not self.shutdown_event.is_set(): + try: + term = SEARCH_TERMS[self._search_term_index % len(SEARCH_TERMS)] + logging.debug(f"Search producer: Searching for '{term}'...") + rants = await self.api.search(term) + + for rant in rants: + await self._queue_rant_if_new(rant) + + logging.debug(f"Search producer: Found {len(rants)} rants for '{term}'.") + self._search_term_index += 1 + await asyncio.sleep(30) + + except Exception as e: + logging.error(f"Search producer: Unhandled exception: {e}. Retrying in 60s.") + self.stats["api_errors"] += 1 + await asyncio.sleep(60) + async def _rant_consumer(self, worker_id: int): logging.info(f"Rant consumer #{worker_id} started.") while not self.shutdown_event.is_set(): try: - rant_id = await self.rant_queue.get() - logging.info( + rant_id = await asyncio.wait_for(self.rant_queue.get(), timeout=5.0) + logging.debug( f"Rant consumer #{worker_id}: Processing rant ID {rant_id}." ) @@ -146,22 +292,27 @@ class DevRantCrawler: self.stats["comments_added_to_db"] += 1 await self._queue_user_if_new(comment["user_id"]) - logging.info( + logging.debug( f"Rant consumer #{worker_id}: Finished processing rant {rant_id}, found {len(comments)} comments." ) self.stats["rants_processed"] += 1 self.rant_queue.task_done() + except asyncio.TimeoutError: + continue except Exception as e: logging.error(f"Rant consumer #{worker_id}: Unhandled exception: {e}") - self.rant_queue.task_done() + try: + self.rant_queue.task_done() + except ValueError: + pass async def _user_consumer(self, worker_id: int): logging.info(f"User consumer #{worker_id} started.") while not self.shutdown_event.is_set(): try: - user_id = await self.user_queue.get() - logging.info( + user_id = await asyncio.wait_for(self.user_queue.get(), timeout=5.0) + logging.debug( f"User consumer #{worker_id}: Processing user ID {user_id}." ) @@ -183,14 +334,20 @@ class DevRantCrawler: await self._queue_rant_if_new(rant_obj) rants_found_on_profile += 1 - logging.info( + logging.debug( f"User consumer #{worker_id}: Finished user {user_id}, found and queued {rants_found_on_profile} associated rants." 
) self.stats["users_processed"] += 1 self.user_queue.task_done() + + except asyncio.TimeoutError: + continue except Exception as e: logging.error(f"User consumer #{worker_id}: Unhandled exception: {e}") - self.user_queue.task_done() + try: + self.user_queue.task_done() + except ValueError: + pass async def _stats_reporter(self): logging.info("Stats reporter started.") @@ -206,13 +363,18 @@ class DevRantCrawler: async def run(self): logging.info("Exhaustive crawler starting...") + await self._load_state() await self._initial_seed() logging.info("Starting main producer and consumer tasks...") tasks = [] try: tasks.append(asyncio.create_task(self._rant_producer())) + tasks.append(asyncio.create_task(self._top_rant_producer())) + tasks.append(asyncio.create_task(self._algo_rant_producer())) + tasks.append(asyncio.create_task(self._search_producer())) tasks.append(asyncio.create_task(self._stats_reporter())) + tasks.append(asyncio.create_task(self._state_saver())) for i in range(self.num_rant_consumers): tasks.append(asyncio.create_task(self._rant_consumer(i + 1))) @@ -232,6 +394,8 @@ class DevRantCrawler: logging.info("Shutting down... sending signal to all tasks.") self.shutdown_event.set() + await self._save_state() + logging.info("Waiting for queues to empty... Press Ctrl+C again to force exit.") try: await asyncio.wait_for(self.rant_queue.join(), timeout=30) diff --git a/examples/crawler/database.py b/examples/crawler/database.py index dbcafbf..e55f1ba 100644 --- a/examples/crawler/database.py +++ b/examples/crawler/database.py @@ -1,130 +1,226 @@ +# retoor +import asyncio +import json import logging -from typing import List +from typing import Any, Dict, List, Optional -import aiosqlite +import dataset from devranta.api import Comment, Rant, UserProfile class DatabaseManager: - def __init__(self, db_path: str): + def __init__(self, db_path: str, batch_size: int = 100, flush_interval: float = 5.0): self.db_path = db_path - self._conn: aiosqlite.Connection | None = None + self.batch_size = batch_size + self.flush_interval = flush_interval + self._db: Optional[dataset.Database] = None + self._rant_batch: List[Dict[str, Any]] = [] + self._comment_batch: List[Dict[str, Any]] = [] + self._user_batch: List[Dict[str, Any]] = [] + self._flush_task: Optional[asyncio.Task] = None + self._lock = asyncio.Lock() async def __aenter__(self): logging.info(f"Connecting to database at {self.db_path}...") - self._conn = await aiosqlite.connect(self.db_path) - await self._conn.execute("PRAGMA journal_mode=WAL;") - await self._conn.execute("PRAGMA foreign_keys=ON;") - await self.create_tables() + self._db = dataset.connect( + f"sqlite:///{self.db_path}?check_same_thread=False", + engine_kwargs={"connect_args": {"check_same_thread": False}} + ) + await self._create_indexes() + self._flush_task = asyncio.create_task(self._periodic_flush()) logging.info("Database connection successful.") return self async def __aexit__(self, exc_type, exc_val, exc_tb): - if self._conn: - await self._conn.close() - logging.info("Database connection closed.") + if self._flush_task: + self._flush_task.cancel() + try: + await self._flush_task + except asyncio.CancelledError: + pass + await self.flush_all() + if self._db: + self._db.close() + logging.info("Database connection closed.") - async def create_tables(self): - logging.info("Ensuring database tables exist...") - await self._conn.executescript( - """ - CREATE TABLE IF NOT EXISTS users ( - id INTEGER PRIMARY KEY, - username TEXT NOT NULL UNIQUE, - score INTEGER, - about TEXT, - 
location TEXT, - created_time INTEGER, - skills TEXT, - github TEXT, - website TEXT - ); - CREATE TABLE IF NOT EXISTS rants ( - id INTEGER PRIMARY KEY, - user_id INTEGER, - text TEXT, - score INTEGER, - created_time INTEGER, - num_comments INTEGER - ); - CREATE TABLE IF NOT EXISTS comments ( - id INTEGER PRIMARY KEY, - rant_id INTEGER, - user_id INTEGER, - body TEXT, - score INTEGER, - created_time INTEGER - ); - """ - ) - await self._conn.commit() - logging.info("Table schema verified.") + async def _create_indexes(self): + def _sync_create(): + self._db.query("CREATE INDEX IF NOT EXISTS idx_rants_user_id ON rants(user_id)") + self._db.query("CREATE INDEX IF NOT EXISTS idx_rants_created_time ON rants(created_time)") + self._db.query("CREATE INDEX IF NOT EXISTS idx_comments_rant_id ON comments(rant_id)") + self._db.query("CREATE INDEX IF NOT EXISTS idx_comments_user_id ON comments(user_id)") + self._db.query("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)") + await asyncio.to_thread(_sync_create) + logging.info("Database indexes verified.") + + async def _periodic_flush(self): + while True: + await asyncio.sleep(self.flush_interval) + await self.flush_all() + + async def flush_all(self): + async with self._lock: + await self._flush_rants() + await self._flush_comments() + await self._flush_users() + + async def _flush_rants(self): + if not self._rant_batch: + return + batch = self._rant_batch.copy() + self._rant_batch.clear() + + def _sync_insert(): + table = self._db["rants"] + for rant in batch: + table.upsert(rant, ["id"]) + + await asyncio.to_thread(_sync_insert) + logging.debug(f"Flushed {len(batch)} rants to database") + + async def _flush_comments(self): + if not self._comment_batch: + return + batch = self._comment_batch.copy() + self._comment_batch.clear() + + def _sync_insert(): + table = self._db["comments"] + for comment in batch: + table.upsert(comment, ["id"]) + + await asyncio.to_thread(_sync_insert) + logging.debug(f"Flushed {len(batch)} comments to database") + + async def _flush_users(self): + if not self._user_batch: + return + batch = self._user_batch.copy() + self._user_batch.clear() + + def _sync_insert(): + table = self._db["users"] + for user in batch: + table.upsert(user, ["id"]) + + await asyncio.to_thread(_sync_insert) + logging.debug(f"Flushed {len(batch)} users to database") + + def _transform_rant(self, rant: Rant) -> Dict[str, Any]: + attached_image = rant.get("attached_image") + image_url = None + if isinstance(attached_image, dict): + image_url = attached_image.get("url") + elif isinstance(attached_image, str): + image_url = attached_image + + tags = rant.get("tags", []) + tags_str = json.dumps(tags) if tags else None + + return { + "id": rant["id"], + "user_id": rant["user_id"], + "text": rant["text"], + "score": rant["score"], + "created_time": rant["created_time"], + "num_comments": rant["num_comments"], + "attached_image_url": image_url, + "tags": tags_str, + "link": rant.get("link"), + "vote_state": rant.get("vote_state"), + "user_username": rant.get("user_username"), + "user_score": rant.get("user_score"), + } + + def _transform_comment(self, comment: Comment) -> Dict[str, Any]: + return { + "id": comment["id"], + "rant_id": comment["rant_id"], + "user_id": comment["user_id"], + "body": comment["body"], + "score": comment["score"], + "created_time": comment["created_time"], + "vote_state": comment.get("vote_state"), + "user_username": comment.get("user_username"), + "user_score": comment.get("user_score"), + } + + def 
_transform_user(self, user: UserProfile, user_id: int) -> Dict[str, Any]: + return { + "id": user_id, + "username": user["username"], + "score": user["score"], + "about": user.get("about"), + "location": user.get("location"), + "created_time": user.get("created_time"), + "skills": user.get("skills"), + "github": user.get("github"), + "website": user.get("website"), + } async def add_rant(self, rant: Rant): - await self._conn.execute( - "INSERT OR IGNORE INTO rants (id, user_id, text, score, created_time, num_comments) VALUES (?, ?, ?, ?, ?, ?)", - ( - rant["id"], - rant["user_id"], - rant["text"], - rant["score"], - rant["created_time"], - rant["num_comments"], - ), - ) - await self._conn.commit() + async with self._lock: + self._rant_batch.append(self._transform_rant(rant)) + if len(self._rant_batch) >= self.batch_size: + await self._flush_rants() async def add_comment(self, comment: Comment): - await self._conn.execute( - "INSERT OR IGNORE INTO comments (id, rant_id, user_id, body, score, created_time) VALUES (?, ?, ?, ?, ?, ?)", - ( - comment["id"], - comment["rant_id"], - comment["user_id"], - comment["body"], - comment["score"], - comment["created_time"], - ), - ) - await self._conn.commit() + async with self._lock: + self._comment_batch.append(self._transform_comment(comment)) + if len(self._comment_batch) >= self.batch_size: + await self._flush_comments() async def add_user(self, user: UserProfile, user_id: int): - await self._conn.execute( - "INSERT OR IGNORE INTO users (id, username, score, about, location, created_time, skills, github, website) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", - ( - user_id, - user["username"], - user["score"], - user["about"], - user["location"], - user["created_time"], - user["skills"], - user["github"], - user["website"], - ), - ) - await self._conn.commit() + async with self._lock: + self._user_batch.append(self._transform_user(user, user_id)) + if len(self._user_batch) >= self.batch_size: + await self._flush_users() async def rant_exists(self, rant_id: int) -> bool: - async with self._conn.execute( - "SELECT 1 FROM rants WHERE id = ? LIMIT 1", (rant_id,) - ) as cursor: - return await cursor.fetchone() is not None + def _sync_check(): + table = self._db["rants"] + return table.find_one(id=rant_id) is not None + return await asyncio.to_thread(_sync_check) async def user_exists(self, user_id: int) -> bool: - async with self._conn.execute( - "SELECT 1 FROM users WHERE id = ? LIMIT 1", (user_id,) - ) as cursor: - return await cursor.fetchone() is not None + def _sync_check(): + table = self._db["users"] + return table.find_one(id=user_id) is not None + return await asyncio.to_thread(_sync_check) async def get_random_user_ids(self, limit: int) -> List[int]: - logging.info( - f"Fetching up to {limit} random user IDs from database for seeding..." - ) - query = "SELECT id FROM users ORDER BY RANDOM() LIMIT ?" 
- async with self._conn.execute(query, (limit,)) as cursor: - rows = await cursor.fetchall() - user_ids = [row[0] for row in rows] - logging.info(f"Found {len(user_ids)} user IDs to seed.") - return user_ids + logging.info(f"Fetching up to {limit} random user IDs from database for seeding...") + + def _sync_fetch(): + result = self._db.query(f"SELECT id FROM users ORDER BY RANDOM() LIMIT {limit}") + return [row["id"] for row in result] + + user_ids = await asyncio.to_thread(_sync_fetch) + logging.info(f"Found {len(user_ids)} user IDs to seed.") + return user_ids + + async def get_all_rant_ids(self) -> List[int]: + def _sync_fetch(): + result = self._db.query("SELECT id FROM rants") + return [row["id"] for row in result] + return await asyncio.to_thread(_sync_fetch) + + async def get_all_user_ids(self) -> List[int]: + def _sync_fetch(): + result = self._db.query("SELECT id FROM users") + return [row["id"] for row in result] + return await asyncio.to_thread(_sync_fetch) + + async def save_crawler_state(self, key: str, value: str): + def _sync_save(): + table = self._db["crawler_state"] + table.upsert({"key": key, "value": value}, ["key"]) + await asyncio.to_thread(_sync_save) + + async def load_crawler_state(self, key: str) -> Optional[str]: + def _sync_load(): + table = self._db["crawler_state"] + row = table.find_one(key=key) + return row["value"] if row else None + return await asyncio.to_thread(_sync_load) diff --git a/examples/crawler/main.py b/examples/crawler/main.py index 18c6c6f..1198175 100644 --- a/examples/crawler/main.py +++ b/examples/crawler/main.py @@ -1,45 +1,47 @@ -# main.py +# retoor import asyncio import logging import signal from crawler import DevRantCrawler from database import DatabaseManager - from devranta.api import Api -# --- Configuration --- + DB_FILE = "devrant.sqlite" -CONCURRENT_RANT_CONSUMERS = 10 # How many rants to process at once -CONCURRENT_USER_CONSUMERS = 5 # How many user profiles to fetch at once +CONCURRENT_RANT_CONSUMERS = 10 +CONCURRENT_USER_CONSUMERS = 5 +BATCH_SIZE = 100 +FLUSH_INTERVAL = 5.0 async def main(): - """Initializes and runs the crawler.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) - api = Api() - - async with DatabaseManager(DB_FILE) as db: - crawler = DevRantCrawler( - api=api, - db=db, - rant_consumers=CONCURRENT_RANT_CONSUMERS, - user_consumers=CONCURRENT_USER_CONSUMERS, - ) - - # Set up a signal handler for graceful shutdown on Ctrl+C - loop = asyncio.get_running_loop() - for sig in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler( - sig, lambda s=sig: asyncio.create_task(crawler.shutdown()) + async with Api() as api: + async with DatabaseManager( + DB_FILE, + batch_size=BATCH_SIZE, + flush_interval=FLUSH_INTERVAL, + ) as db: + crawler = DevRantCrawler( + api=api, + db=db, + rant_consumers=CONCURRENT_RANT_CONSUMERS, + user_consumers=CONCURRENT_USER_CONSUMERS, ) - await crawler.run() + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler( + sig, lambda s=sig: asyncio.create_task(crawler.shutdown()) + ) + + await crawler.run() if __name__ == "__main__": diff --git a/examples/crawler/requirements.txt b/examples/crawler/requirements.txt index efa2b38..caba867 100644 --- a/examples/crawler/requirements.txt +++ b/examples/crawler/requirements.txt @@ -1 +1 @@ -aiosqlite +dataset diff --git a/examples/princess/README.md b/examples/princess/README.md index 739b579..963bfce 100644 --- 
a/examples/princess/README.md +++ b/examples/princess/README.md @@ -1,42 +1,55 @@ -# Princess Bot - Usage and Configuration Guide +# Princess Bot + +Author: retoor + +An automated social media interaction bot for the devRant platform. Monitors a target user's posts and generates LLM-powered responses. ## Overview -Princess.py is an automated social media interaction bot designed to monitor and respond to specific user-generated content (rants and comments) on a platform. It fetches new posts made by a target user, generates witty or devastating responses using a language model, and keeps track of responded messages to avoid duplicates. +Princess Bot monitors rants and comments from a specified user on devRant, generates contextual responses using the Grok language model, and posts replies automatically. The bot maintains state to prevent duplicate responses. -The bot operates continuously, periodically checking for new content and replying accordingly. +## Architecture ---- +The bot operates on a polling model with the following components: -## How It Works +| Component | Description | +|-----------|-------------| +| Api | devRant API client for authentication and content retrieval | +| GrokAPIClient | LLM integration for response generation | +| AsyncDataSet | Async SQLite wrapper for state persistence | -1. **Initialization**: The bot initializes with user credentials, target username, and API keys. -2. **Login**: It logs into the platform via the provided API. -3. **Content Monitoring**: It fetches recent rants and comments made by the target user. -4. **Response Generation**: For new content (not responded to before), it generates a response using a language model (GrokAPIClient). -5. **Response Posting**: It prints the content and the generated reply. -6. **Tracking**: It records responded messages in a local database to prevent duplicate responses. -7. **Loop**: It repeats this process every 60 seconds. +## Usage ---- +### Quick Start -## Configuration +```bash +make +``` -The script uses a `.env` file to manage sensitive credentials and configurable properties. Below are the supported environment variables: +This creates a virtual environment, installs dependencies, and starts the bot. -### Required Environment Variables +### Manual Setup -| Property | Description | Example | -|----------------------|----------------------------------------------------------|-------------------------------------------| -| `USERNAME` | Your platform username. | `my_username` | -| `PASSWORD` | Your platform password. | `my_password` | -| `TARGET` | The username of the user to monitor. | `target_user` | -| `LLM_KEY` | API key for the language model (Grok API). | `your-grok-api-key` | +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -e ../../. +pip install -r requirements.txt +python princess.py +``` -## Setup Instructions +### Configuration -1. **Create a `.env` file** in the same directory as `princess.py`. -2. **Add the required variables** with your credentials and target info: +Create a `.env` file with the following variables: + +| Variable | Description | +|----------|-------------| +| `USERNAME` | devRant account username | +| `PASSWORD` | devRant account password | +| `TARGET` | Username of the user to monitor | +| `LLM_KEY` | API key for Grok language model | + +Example: ```env USERNAME=your_username @@ -45,35 +58,42 @@ TARGET=target_username LLM_KEY=your_grok_api_key ``` -3. 
**Install dependencies** (if not already installed): +### Stopping + +Press `Ctrl+C` to terminate the bot. + +## Data Storage + +Uses SQLite via AsyncDataSet with: + +- Responded message tracking for deduplication +- Persistent state across restarts + +## Requirements + +- Python 3.10+ +- python-dotenv +- aiosqlite +- aiohttp (via parent devranta package) + +## Cleanup ```bash -pip install python-dotenv +make clean ``` -4. **Run the script**: +Removes the virtual environment. Database file (`princess.db`) is preserved. + +## File Structure -```bash -python princess.py ``` - ---- - -## Notes - -- The bot stores responded messages in a local SQLite database (`princess.db`) to avoid duplicate responses. -- It runs indefinitely, checking for new content every 60 seconds. -- Make sure your API keys and credentials are kept secure and not shared publicly. - ---- - -## Summary - -Princess.py is a social media response bot that: - -- Monitors a specific user's posts. -- Generates witty responses using a language model. -- Keeps track of responses to prevent duplicates. -- Runs continuously with minimal setup. - - +princess/ +├── princess.py # Main bot implementation +├── ads.py # AsyncDataSet database wrapper +├── grk.py # Grok API client +├── requirements.txt # Dependencies +├── Makefile # Build automation +├── .env # Configuration (create manually) +├── .venv/ # Virtual environment (created on first run) +└── princess.db # SQLite database (created on first run) +``` diff --git a/src/devranta/api.py b/src/devranta/api.py index 0c10ccf..6e25e25 100644 --- a/src/devranta/api.py +++ b/src/devranta/api.py @@ -1,5 +1,7 @@ +# retoor from __future__ import annotations +import ssl from enum import Enum from typing import Any, Dict, List, Literal, Optional, TypedDict, Union @@ -119,7 +121,40 @@ class Api: self.user_id: Optional[int] = None self.token_id: Optional[int] = None self.token_key: Optional[str] = None - self.session: Optional[aiohttp.ClientSession] = None + self._session: Optional[aiohttp.ClientSession] = None + self._owns_session: bool = False + + async def __aenter__(self): + """Async context manager entry - creates shared HTTP session.""" + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + connector = aiohttp.TCPConnector(ssl=ssl_context) + self._session = aiohttp.ClientSession(connector=connector) + self._owns_session = True + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit - closes shared HTTP session.""" + await self.close() + + async def _get_session(self) -> aiohttp.ClientSession: + """Returns or creates a shared HTTP session for connection reuse.""" + if self._session is None: + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + connector = aiohttp.TCPConnector(ssl=ssl_context) + self._session = aiohttp.ClientSession(connector=connector) + self._owns_session = True + return self._session + + async def close(self): + """Closes the HTTP session if owned by this instance.""" + if self._session and self._owns_session: + await self._session.close() + self._session = None + self._owns_session = False def patch_auth( self, request_dict: Optional[Dict[str, Any]] = None @@ -177,25 +212,25 @@ class Api: """ if not self.username or not self.password: raise Exception("No authentication details supplied.") - async with aiohttp.ClientSession() as session: - response = await session.post( - 
url=self.patch_url("users/auth-token"), - data={ - "username": self.username, - "password": self.password, - "app": self.app_id, - }, - ) - obj: LoginResponse = await response.json() - if not obj.get("success"): - return False - self.auth = obj.get("auth_token") - if not self.auth: - return False - self.user_id = self.auth.get("user_id") - self.token_id = self.auth.get("id") - self.token_key = self.auth.get("key") - return bool(self.auth) + session = await self._get_session() + response = await session.post( + url=self.patch_url("users/auth-token"), + data={ + "username": self.username, + "password": self.password, + "app": self.app_id, + }, + ) + obj: LoginResponse = await response.json() + if not obj.get("success"): + return False + self.auth = obj.get("auth_token") + if not self.auth: + return False + self.user_id = self.auth.get("user_id") + self.token_id = self.auth.get("id") + self.token_key = self.auth.get("key") + return bool(self.auth) async def ensure_login(self) -> bool: """Ensures the user is logged in before making a request.""" @@ -224,20 +259,20 @@ class Api: } ``` """ - async with aiohttp.ClientSession() as session: - response = await session.post( - url=self.patch_url(f"users"), - data=self.patch_auth( - { - "email": email, - "username": username, - "password": password, - "plat": 3, - } - ), - ) - obj = await response.json() - return obj.get("success", False) + session = await self._get_session() + response = await session.post( + url=self.patch_url("users"), + data=self.patch_auth( + { + "email": email, + "username": username, + "password": password, + "plat": 3, + } + ), + ) + obj = await response.json() + return obj.get("success", False) async def get_comments_from_user(self, username: str) -> List[Comment]: """ @@ -270,13 +305,13 @@ class Api: """ if not await self.ensure_login(): return False - async with aiohttp.ClientSession() as session: - response = await session.post( - url=self.patch_url(f"devrant/rants/{rant_id}/comments"), - data=self.patch_auth({"comment": comment, "plat": 2}), - ) - obj = await response.json() - return obj.get("success", False) + session = await self._get_session() + response = await session.post( + url=self.patch_url(f"devrant/rants/{rant_id}/comments"), + data=self.patch_auth({"comment": comment, "plat": 2}), + ) + obj = await response.json() + return obj.get("success", False) async def get_comment(self, id_: int) -> Optional[Comment]: """ @@ -288,12 +323,12 @@ class Api: Returns: Optional[Comment]: A dictionary representing the comment, or None if not found. 
""" - async with aiohttp.ClientSession() as session: - response = await session.get( - url=self.patch_url(f"comments/{id_}"), params=self.patch_auth() - ) - obj = await response.json() - return obj.get("comment") if obj.get("success") else None + session = await self._get_session() + response = await session.get( + url=self.patch_url(f"comments/{id_}"), params=self.patch_auth() + ) + obj = await response.json() + return obj.get("comment") if obj.get("success") else None async def delete_comment(self, id_: int) -> bool: """ @@ -307,12 +342,12 @@ class Api: """ if not await self.ensure_login(): return False - async with aiohttp.ClientSession() as session: - response = await session.delete( - url=self.patch_url(f"comments/{id_}"), params=self.patch_auth() - ) - obj = await response.json() - return obj.get("success", False) + session = await self._get_session() + response = await session.delete( + url=self.patch_url(f"comments/{id_}"), params=self.patch_auth() + ) + obj = await response.json() + return obj.get("success", False) async def get_profile(self, id_: int) -> Optional[UserProfile]: """ @@ -324,12 +359,12 @@ class Api: Returns: Optional[UserProfile]: A dictionary with the user's profile data. """ - async with aiohttp.ClientSession() as session: - response = await session.get( - url=self.patch_url(f"users/{id_}"), params=self.patch_auth() - ) - obj = await response.json() - return obj.get("profile") if obj.get("success") else None + session = await self._get_session() + response = await session.get( + url=self.patch_url(f"users/{id_}"), params=self.patch_auth() + ) + obj = await response.json() + return obj.get("profile") if obj.get("success") else None async def search(self, term: str) -> List[Rant]: """ @@ -341,13 +376,13 @@ class Api: Returns: List[Rant]: A list of rant objects from the search results. """ - async with aiohttp.ClientSession() as session: - response = await session.get( - url=self.patch_url("devrant/search"), - params=self.patch_auth({"term": term}), - ) - obj = await response.json() - return obj.get("results", []) if obj.get("success") else [] + session = await self._get_session() + response = await session.get( + url=self.patch_url("devrant/search"), + params=self.patch_auth({"term": term}), + ) + obj = await response.json() + return obj.get("results", []) if obj.get("success") else [] async def get_rant(self, id: int) -> Dict[str, Any]: """ @@ -359,12 +394,12 @@ class Api: Returns: Dict[str, Any]: The full API response object. """ - async with aiohttp.ClientSession() as session: - response = await session.get( - self.patch_url(f"devrant/rants/{id}"), - params=self.patch_auth(), - ) - return await response.json() + session = await self._get_session() + response = await session.get( + self.patch_url(f"devrant/rants/{id}"), + params=self.patch_auth(), + ) + return await response.json() async def get_rants( self, sort: str = "recent", limit: int = 20, skip: int = 0 @@ -380,13 +415,13 @@ class Api: Returns: List[Rant]: A list of rant objects. 
""" - async with aiohttp.ClientSession() as session: - response = await session.get( - url=self.patch_url("devrant/rants"), - params=self.patch_auth({"sort": sort, "limit": limit, "skip": skip}), - ) - obj = await response.json() - return obj.get("rants", []) if obj.get("success") else [] + session = await self._get_session() + response = await session.get( + url=self.patch_url("devrant/rants"), + params=self.patch_auth({"sort": sort, "limit": limit, "skip": skip}), + ) + obj = await response.json() + return obj.get("rants", []) if obj.get("success") else [] async def get_user_id(self, username: str) -> Optional[int]: """ @@ -398,13 +433,13 @@ class Api: Returns: Optional[int]: The user's ID, or None if not found. """ - async with aiohttp.ClientSession() as session: - response = await session.get( - url=self.patch_url("get-user-id"), - params=self.patch_auth({"username": username}), - ) - obj = await response.json() - return obj.get("user_id") if obj.get("success") else None + session = await self._get_session() + response = await session.get( + url=self.patch_url("get-user-id"), + params=self.patch_auth({"username": username}), + ) + obj = await response.json() + return obj.get("user_id") if obj.get("success") else None async def mentions(self) -> List[Notification]: """ @@ -431,13 +466,13 @@ class Api: """ if not await self.ensure_login(): return False - async with aiohttp.ClientSession() as session: - response = await session.post( - url=self.patch_url(f"comments/{comment_id}"), - data=self.patch_auth({"comment": comment}), - ) - obj = await response.json() - return obj.get("success", False) + session = await self._get_session() + response = await session.post( + url=self.patch_url(f"comments/{comment_id}"), + data=self.patch_auth({"comment": comment}), + ) + obj = await response.json() + return obj.get("success", False) async def vote_rant( self, rant_id: int, vote: Literal[-1, 0, 1], reason: Optional[VoteReason] = None @@ -455,15 +490,15 @@ class Api: """ if not await self.ensure_login(): return False - async with aiohttp.ClientSession() as session: - response = await session.post( - url=self.patch_url(f"devrant/rants/{rant_id}/vote"), - data=self.patch_auth( - {"vote": vote, "reason": reason.value if reason else None} - ), - ) - obj = await response.json() - return obj.get("success", False) + session = await self._get_session() + response = await session.post( + url=self.patch_url(f"devrant/rants/{rant_id}/vote"), + data=self.patch_auth( + {"vote": vote, "reason": reason.value if reason else None} + ), + ) + obj = await response.json() + return obj.get("success", False) async def vote_comment( self, @@ -484,15 +519,15 @@ class Api: """ if not await self.ensure_login(): return False - async with aiohttp.ClientSession() as session: - response = await session.post( - url=self.patch_url(f"comments/{comment_id}/vote"), - data=self.patch_auth( - {"vote": vote, "reason": reason.value if reason else None} - ), - ) - obj = await response.json() - return obj.get("success", False) + session = await self._get_session() + response = await session.post( + url=self.patch_url(f"comments/{comment_id}/vote"), + data=self.patch_auth( + {"vote": vote, "reason": reason.value if reason else None} + ), + ) + obj = await response.json() + return obj.get("success", False) async def notifs(self) -> List[Notification]: """ @@ -503,9 +538,9 @@ class Api: """ if not await self.ensure_login(): return [] - async with aiohttp.ClientSession() as session: - response = await session.get( - 
url=self.patch_url("users/me/notif-feed"), params=self.patch_auth() - ) - obj = await response.json() - return obj.get("data", {}).get("items", []) + session = await self._get_session() + response = await session.get( + url=self.patch_url("users/me/notif-feed"), params=self.patch_auth() + ) + obj = await response.json() + return obj.get("data", {}).get("items", [])
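For reference, a minimal usage sketch of the patched `Api` client follows; it is not part of the diff. It exercises the shared-session async context manager added in `src/devranta/api.py` (`__aenter__`/`__aexit__`/`_get_session`), the same pattern the crawler's `main.py` now uses with `async with Api() as api:`. It assumes the `devranta` package from this repository is importable, and it iterates `get_rants()` with a plain `for`, since the patched method returns a list of rant dicts rather than an async generator; the `user_username` and `text` fields follow the rant schema used by the crawler.

```python
# Minimal usage sketch of the patched Api (illustrative, not part of the patch).
# Assumes the devranta package from this repository is installed (pip install -e .).
import asyncio

from devranta.api import Api


async def main():
    # __aenter__ creates one shared aiohttp session (with SSL verification
    # disabled, as the patch does) and __aexit__ closes it, so all requests
    # reuse a single connection pool instead of opening a session per call.
    async with Api() as api:  # credentials are optional for read-only endpoints
        # get_rants() returns a plain list of rant dicts.
        rants = await api.get_rants(sort="recent", limit=10, skip=0)
        for rant in rants:
            print(rant["user_username"], ":", rant["text"][:80])

        # search() likewise returns a list of rant dicts.
        results = await api.search("python")
        print(f"search('python') returned {len(results)} rants")


if __name__ == "__main__":
    asyncio.run(main())
```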