# retoor
import asyncio
import json
import logging
import time
from collections import OrderedDict

from database import DatabaseManager
from devranta.api import Api, Rant

SEARCH_TERMS = [
    "python", "javascript", "java", "csharp", "golang", "rust",
    "react", "angular", "vue", "node", "docker", "kubernetes",
    "linux", "windows", "macos", "git", "github", "gitlab",
    "sql", "mongodb", "redis", "api", "rest", "graphql",
    "bug", "error", "crash", "debug", "fix", "issue",
    "manager", "deadline", "meeting", "standup", "agile", "scrum",
    "frontend", "backend", "fullstack", "devops", "cloud", "aws",
    "typescript", "php", "ruby", "swift", "kotlin", "flutter",
]


class BoundedSeenSet:
    def __init__(self, maxsize: int = 100000):
        self._set = OrderedDict()
        self._maxsize = maxsize

    def add(self, item: int):
        if item in self._set:
            self._set.move_to_end(item)
        else:
            self._set[item] = None
            if len(self._set) > self._maxsize:
                self._set.popitem(last=False)

    def __contains__(self, item: int) -> bool:
        return item in self._set

    def __len__(self) -> int:
        return len(self._set)

    def clear(self):
        self._set.clear()


class DevRantCrawler:
    def __init__(
        self, api: Api, db: DatabaseManager, rant_consumers: int, user_consumers: int
    ):
        self.api = api
        self.db = db
        self.rant_queue = asyncio.Queue(maxsize=1000000)
        self.user_queue = asyncio.Queue(maxsize=1000000)
        self.shutdown_event = asyncio.Event()
        self.num_rant_consumers = rant_consumers
        self.num_user_consumers = user_consumers
        self.seen_rant_ids = BoundedSeenSet(maxsize=100000)
        self.seen_user_ids = BoundedSeenSet(maxsize=100000)
        self._recent_skip = 0
        self._top_skip = 0
        self._algo_skip = 0
        self._search_term_index = 0
        self.stats = {
            "rants_processed": 0,
            "rants_added_to_db": 0,
            "comments_added_to_db": 0,
            "users_processed": 0,
            "users_added_to_db": 0,
            "api_errors": 0,
            "producer_loops": 0,
            "end_of_feed_hits": 0,
            "rants_queued": 0,
            "users_queued": 0,
        }

    async def _queue_user_if_new(self, user_id: int):
        if user_id in self.seen_user_ids:
            return
        self.seen_user_ids.add(user_id)
        if not await self.db.user_exists(user_id):
            await self.user_queue.put(user_id)
            self.stats["users_queued"] += 1

    async def _queue_rant_if_new(self, rant_obj: Rant):
        rant_id = rant_obj["id"]
        if rant_id in self.seen_rant_ids:
            return
        self.seen_rant_ids.add(rant_id)
        if not await self.db.rant_exists(rant_id):
            await self.db.add_rant(rant_obj)
            self.stats["rants_added_to_db"] += 1
            await self.rant_queue.put(rant_id)
            self.stats["rants_queued"] += 1

    async def _initial_seed(self):
        logging.info("Starting initial seeder to re-ignite crawling process...")
        user_ids = await self.db.get_random_user_ids(limit=2000)
        if not user_ids:
            logging.info(
                "Seeder found no existing users. Crawler will start from scratch."
            )
            return
        for user_id in user_ids:
            if user_id not in self.seen_user_ids:
                self.seen_user_ids.add(user_id)
                await self.user_queue.put(user_id)
                self.stats["users_queued"] += 1
        logging.info(
            f"Seeder finished: Queued {len(user_ids)} users to kickstart exploration."
        )
    async def _save_state(self):
        state = {
            "recent_skip": self._recent_skip,
            "top_skip": self._top_skip,
            "algo_skip": self._algo_skip,
            "search_term_index": self._search_term_index,
            "last_saved": int(time.time()),
        }
        await self.db.save_crawler_state("producer_state", json.dumps(state))
        logging.debug("Crawler state saved.")

    async def _load_state(self):
        state_json = await self.db.load_crawler_state("producer_state")
        if state_json:
            try:
                state = json.loads(state_json)
                self._recent_skip = state.get("recent_skip", 0)
                self._top_skip = state.get("top_skip", 0)
                self._algo_skip = state.get("algo_skip", 0)
                self._search_term_index = state.get("search_term_index", 0)
                logging.info(f"Loaded crawler state: {state}")
            except json.JSONDecodeError:
                logging.warning("Failed to decode crawler state, starting fresh.")

    async def _state_saver(self):
        logging.info("State saver started.")
        while not self.shutdown_event.is_set():
            await asyncio.sleep(60)
            await self._save_state()

    async def _rant_producer(self):
        logging.info("Recent rant producer started.")
        consecutive_empty_responses = 0
        while not self.shutdown_event.is_set():
            try:
                logging.debug(
                    f"Recent producer: Fetching rants with skip={self._recent_skip}..."
                )
                rants = await self.api.get_rants(
                    sort="recent", limit=50, skip=self._recent_skip
                )
                self.stats["producer_loops"] += 1
                if not rants:
                    consecutive_empty_responses += 1
                    logging.debug(
                        f"Recent producer: Feed returned empty. Consecutive empty hits: {consecutive_empty_responses}."
                    )
                    if consecutive_empty_responses >= 5:
                        self.stats["end_of_feed_hits"] += 1
                        logging.info(
                            "Recent producer: End of feed likely reached. Pausing for 15 minutes before reset."
                        )
                        await asyncio.sleep(900)
                        self._recent_skip = 0
                        consecutive_empty_responses = 0
                    else:
                        await asyncio.sleep(10)
                    continue
                consecutive_empty_responses = 0
                new_rants_found = 0
                for rant in rants:
                    await self._queue_rant_if_new(rant)
                    new_rants_found += 1
                logging.debug(
                    f"Recent producer: Processed {new_rants_found} rants from feed."
                )
                self._recent_skip += len(rants)
                await asyncio.sleep(2)
            except Exception as e:
                logging.error(
                    f"Recent producer: Unhandled exception: {e}. Retrying in 60s."
                )
                self.stats["api_errors"] += 1
                await asyncio.sleep(60)

    async def _top_rant_producer(self):
        logging.info("Top rant producer started.")
        while not self.shutdown_event.is_set():
            try:
                logging.debug(f"Top producer: Fetching rants with skip={self._top_skip}...")
                rants = await self.api.get_rants(sort="top", limit=50, skip=self._top_skip)
                if not rants:
                    logging.info("Top producer: End of feed reached. Resetting after 1 hour.")
                    self._top_skip = 0
                    await asyncio.sleep(3600)
                    continue
                for rant in rants:
                    await self._queue_rant_if_new(rant)
                logging.debug(f"Top producer: Processed {len(rants)} rants.")
                self._top_skip += len(rants)
                await asyncio.sleep(5)
            except Exception as e:
                logging.error(f"Top producer: Unhandled exception: {e}. Retrying in 60s.")
                self.stats["api_errors"] += 1
                await asyncio.sleep(60)

    async def _algo_rant_producer(self):
        logging.info("Algo rant producer started.")
        while not self.shutdown_event.is_set():
            try:
                logging.debug(f"Algo producer: Fetching rants with skip={self._algo_skip}...")
                rants = await self.api.get_rants(sort="algo", limit=50, skip=self._algo_skip)
                if not rants:
                    logging.info("Algo producer: End of feed reached. Resetting after 1 hour.")
                    self._algo_skip = 0
                    await asyncio.sleep(3600)
                    continue
                for rant in rants:
                    await self._queue_rant_if_new(rant)
                logging.debug(f"Algo producer: Processed {len(rants)} rants.")
                self._algo_skip += len(rants)
                await asyncio.sleep(5)
            except Exception as e:
                logging.error(f"Algo producer: Unhandled exception: {e}. Retrying in 60s.")
                self.stats["api_errors"] += 1
                await asyncio.sleep(60)

    async def _search_producer(self):
        logging.info("Search producer started.")
        while not self.shutdown_event.is_set():
            try:
                term = SEARCH_TERMS[self._search_term_index % len(SEARCH_TERMS)]
                logging.debug(f"Search producer: Searching for '{term}'...")
                rants = await self.api.search(term)
                for rant in rants:
                    await self._queue_rant_if_new(rant)
                logging.debug(f"Search producer: Found {len(rants)} rants for '{term}'.")
                self._search_term_index += 1
                await asyncio.sleep(30)
            except Exception as e:
                logging.error(f"Search producer: Unhandled exception: {e}. Retrying in 60s.")
                self.stats["api_errors"] += 1
                await asyncio.sleep(60)

    async def _rant_consumer(self, worker_id: int):
        logging.info(f"Rant consumer #{worker_id} started.")
        while not self.shutdown_event.is_set():
            try:
                rant_id = await asyncio.wait_for(self.rant_queue.get(), timeout=5.0)
                logging.debug(
                    f"Rant consumer #{worker_id}: Processing rant ID {rant_id}."
                )
                rant_details = await self.api.get_rant(rant_id)
                if not rant_details or not rant_details.get("success"):
                    logging.warning(
                        f"Rant consumer #{worker_id}: Failed to fetch details for rant {rant_id}."
                    )
                    self.rant_queue.task_done()
                    continue
                await self._queue_user_if_new(rant_details["rant"]["user_id"])
                comments = rant_details.get("comments", [])
                for comment in comments:
                    await self.db.add_comment(comment)
                    self.stats["comments_added_to_db"] += 1
                    await self._queue_user_if_new(comment["user_id"])
                logging.debug(
                    f"Rant consumer #{worker_id}: Finished processing rant {rant_id}, found {len(comments)} comments."
                )
                self.stats["rants_processed"] += 1
                self.rant_queue.task_done()
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logging.error(f"Rant consumer #{worker_id}: Unhandled exception: {e}")
                try:
                    self.rant_queue.task_done()
                except ValueError:
                    pass

    async def _user_consumer(self, worker_id: int):
        logging.info(f"User consumer #{worker_id} started.")
        while not self.shutdown_event.is_set():
            try:
                user_id = await asyncio.wait_for(self.user_queue.get(), timeout=5.0)
                logging.debug(
                    f"User consumer #{worker_id}: Processing user ID {user_id}."
                )
                profile = await self.api.get_profile(user_id)
                if not profile:
                    logging.warning(
                        f"User consumer #{worker_id}: Could not fetch profile for user {user_id}."
                    )
                    self.user_queue.task_done()
                    continue
                await self.db.add_user(profile, user_id)
                self.stats["users_added_to_db"] += 1
                rants_found_on_profile = 0
                content_sections = profile.get("content", {}).get("content", {})
                for section_name in ["rants", "upvoted", "favorites"]:
                    for rant_obj in content_sections.get(section_name, []):
                        await self._queue_rant_if_new(rant_obj)
                        rants_found_on_profile += 1
                logging.debug(
                    f"User consumer #{worker_id}: Finished user {user_id}, found and queued {rants_found_on_profile} associated rants."
                )
                self.stats["users_processed"] += 1
                self.user_queue.task_done()
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logging.error(f"User consumer #{worker_id}: Unhandled exception: {e}")
                try:
                    self.user_queue.task_done()
                except ValueError:
                    pass

    async def _stats_reporter(self):
        logging.info("Stats reporter started.")
        while not self.shutdown_event.is_set():
            await asyncio.sleep(15)
            logging.info(
                f"[STATS] Rants Q'd/Proc: {self.stats['rants_queued']}/{self.stats['rants_processed']} | "
                f"Users Q'd/Proc: {self.stats['users_queued']}/{self.stats['users_processed']} | "
                f"Comments DB: {self.stats['comments_added_to_db']} | "
                f"Queues (R/U): {self.rant_queue.qsize()}/{self.user_queue.qsize()} | "
                f"API Errors: {self.stats['api_errors']}"
            )

    async def run(self):
        logging.info("Exhaustive crawler starting...")
        await self._load_state()
        await self._initial_seed()
        logging.info("Starting main producer and consumer tasks...")
        tasks = []
        try:
            tasks.append(asyncio.create_task(self._rant_producer()))
            tasks.append(asyncio.create_task(self._top_rant_producer()))
            tasks.append(asyncio.create_task(self._algo_rant_producer()))
            tasks.append(asyncio.create_task(self._search_producer()))
            tasks.append(asyncio.create_task(self._stats_reporter()))
            tasks.append(asyncio.create_task(self._state_saver()))
            for i in range(self.num_rant_consumers):
                tasks.append(asyncio.create_task(self._rant_consumer(i + 1)))
            for i in range(self.num_user_consumers):
                tasks.append(asyncio.create_task(self._user_consumer(i + 1)))
            await asyncio.gather(*tasks, return_exceptions=True)
        except asyncio.CancelledError:
            logging.info("Crawler run cancelled.")
        finally:
            await self.shutdown()

    async def shutdown(self):
        if self.shutdown_event.is_set():
            return
        logging.info("Shutting down... sending signal to all tasks.")
        self.shutdown_event.set()
        await self._save_state()
        logging.info("Waiting for queues to empty... Press Ctrl+C again to force exit.")
        try:
            await asyncio.wait_for(self.rant_queue.join(), timeout=30)
            await asyncio.wait_for(self.user_queue.join(), timeout=30)
        except (asyncio.TimeoutError, asyncio.CancelledError):
            logging.warning("Could not empty queues in time, proceeding with shutdown.")
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        logging.info("All tasks cancelled.")
        logging.info(f"--- FINAL STATS ---\n{self.stats}")
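

# A minimal, hypothetical entry point sketching how the crawler might be wired up.
# The constructors of Api and DatabaseManager are not defined in this module, so
# the no-argument calls below are assumptions; the real classes may require
# credentials, a database path, or other configuration.
async def _main():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )
    api = Api()  # assumption: actual arguments depend on devranta.api.Api
    db = DatabaseManager()  # assumption: actual arguments depend on database.DatabaseManager
    crawler = DevRantCrawler(api=api, db=db, rant_consumers=5, user_consumers=5)
    # run() starts all producer/consumer tasks and calls shutdown() in its finally block.
    await crawler.run()


if __name__ == "__main__":
    try:
        asyncio.run(_main())
    except KeyboardInterrupt:
        # Ctrl+C cancels the main task; cleanup is handled inside run()/shutdown().
        pass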