# retoor <retoor@molodetz.nl>
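"""Exhaustive devRant crawler.

Several producers walk the recent, top and algo feeds and cycle through keyword
searches to discover rants. Rant consumers fetch full rant details plus comments,
and user consumers fetch profiles and re-queue the rants found on them. The feed
offsets are persisted periodically so a restart can resume where it left off.
"""
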
import asyncio
import json
import logging
import time
from collections import OrderedDict

from database import DatabaseManager
from devranta.api import Api, Rant
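
# Keywords cycled by the search producer to surface rants that the chronological
# feeds may never reach.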
SEARCH_TERMS = [
"python", "javascript", "java", "csharp", "golang", "rust",
"react", "angular", "vue", "node", "docker", "kubernetes",
"linux", "windows", "macos", "git", "github", "gitlab",
"sql", "mongodb", "redis", "api", "rest", "graphql",
"bug", "error", "crash", "debug", "fix", "issue",
"manager", "deadline", "meeting", "standup", "agile", "scrum",
"frontend", "backend", "fullstack", "devops", "cloud", "aws",
"typescript", "php", "ruby", "swift", "kotlin", "flutter",
]
class BoundedSeenSet:
def __init__(self, maxsize: int = 100000):
self._set = OrderedDict()
self._maxsize = maxsize
def add(self, item: int):
if item in self._set:
self._set.move_to_end(item)
else:
self._set[item] = None
if len(self._set) > self._maxsize:
self._set.popitem(last=False)
def __contains__(self, item: int) -> bool:
return item in self._set
def __len__(self) -> int:
return len(self._set)
def clear(self):
self._set.clear()
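
# Illustrative sketch (not part of the original module) of BoundedSeenSet's
# bounded eviction: re-adding an id moves it to the back of the order, and
# exceeding maxsize drops the oldest entry.
#
#     seen = BoundedSeenSet(maxsize=2)
#     seen.add(1); seen.add(2); seen.add(1)  # order is now 2, 1
#     seen.add(3)                            # evicts 2, the oldest entry
#     assert 1 in seen and 3 in seen and 2 not in seen
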
class DevRantCrawler:
def __init__(
self, api: Api, db: DatabaseManager, rant_consumers: int, user_consumers: int
):
self.api = api
self.db = db
self.rant_queue = asyncio.Queue(maxsize=1000000)
self.user_queue = asyncio.Queue(maxsize=1000000)
self.shutdown_event = asyncio.Event()
self.num_rant_consumers = rant_consumers
self.num_user_consumers = user_consumers
self.seen_rant_ids = BoundedSeenSet(maxsize=100000)
self.seen_user_ids = BoundedSeenSet(maxsize=100000)
self._recent_skip = 0
self._top_skip = 0
self._algo_skip = 0
self._search_term_index = 0
self.stats = {
"rants_processed": 0,
"rants_added_to_db": 0,
"comments_added_to_db": 0,
"users_processed": 0,
"users_added_to_db": 0,
"api_errors": 0,
"producer_loops": 0,
"end_of_feed_hits": 0,
"rants_queued": 0,
"users_queued": 0,
}
async def _queue_user_if_new(self, user_id: int):
if user_id in self.seen_user_ids:
return
self.seen_user_ids.add(user_id)
if not await self.db.user_exists(user_id):
await self.user_queue.put(user_id)
self.stats["users_queued"] += 1
async def _queue_rant_if_new(self, rant_obj: Rant):
rant_id = rant_obj["id"]
if rant_id in self.seen_rant_ids:
return
self.seen_rant_ids.add(rant_id)
if not await self.db.rant_exists(rant_id):
await self.db.add_rant(rant_obj)
self.stats["rants_added_to_db"] += 1
await self.rant_queue.put(rant_id)
self.stats["rants_queued"] += 1
async def _initial_seed(self):
logging.info("Starting initial seeder to re-ignite crawling process...")
user_ids = await self.db.get_random_user_ids(limit=2000)
if not user_ids:
logging.info(
"Seeder found no existing users. Crawler will start from scratch."
)
return
for user_id in user_ids:
if user_id not in self.seen_user_ids:
self.seen_user_ids.add(user_id)
await self.user_queue.put(user_id)
self.stats["users_queued"] += 1
logging.info(
f"Seeder finished: Queued {len(user_ids)} users to kickstart exploration."
)
async def _save_state(self):
state = {
"recent_skip": self._recent_skip,
"top_skip": self._top_skip,
"algo_skip": self._algo_skip,
"search_term_index": self._search_term_index,
"last_saved": int(time.time()),
}
await self.db.save_crawler_state("producer_state", json.dumps(state))
logging.debug("Crawler state saved.")
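
    # The "producer_state" blob saved above is a small JSON object holding
    # recent_skip, top_skip, algo_skip, search_term_index and last_saved, so a
    # restart can resume each feed at the offset it had reached.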
async def _load_state(self):
state_json = await self.db.load_crawler_state("producer_state")
if state_json:
try:
state = json.loads(state_json)
self._recent_skip = state.get("recent_skip", 0)
self._top_skip = state.get("top_skip", 0)
self._algo_skip = state.get("algo_skip", 0)
self._search_term_index = state.get("search_term_index", 0)
logging.info(f"Loaded crawler state: {state}")
except json.JSONDecodeError:
logging.warning("Failed to decode crawler state, starting fresh.")
async def _state_saver(self):
logging.info("State saver started.")
while not self.shutdown_event.is_set():
await asyncio.sleep(60)
await self._save_state()
async def _rant_producer(self):
logging.info("Recent rant producer started.")
consecutive_empty_responses = 0
while not self.shutdown_event.is_set():
try:
logging.debug(f"Recent producer: Fetching rants with skip={self._recent_skip}...")
rants = await self.api.get_rants(sort="recent", limit=50, skip=self._recent_skip)
self.stats["producer_loops"] += 1
if not rants:
consecutive_empty_responses += 1
logging.debug(
f"Recent producer: Feed returned empty. Consecutive empty hits: {consecutive_empty_responses}."
)
if consecutive_empty_responses >= 5:
self.stats["end_of_feed_hits"] += 1
logging.info(
"Recent producer: End of feed likely reached. Pausing for 15 minutes before reset."
)
await asyncio.sleep(900)
self._recent_skip = 0
consecutive_empty_responses = 0
else:
await asyncio.sleep(10)
continue
consecutive_empty_responses = 0
new_rants_found = 0
for rant in rants:
await self._queue_rant_if_new(rant)
new_rants_found += 1
logging.debug(
f"Recent producer: Processed {new_rants_found} rants from feed."
)
self._recent_skip += len(rants)
await asyncio.sleep(2)
except Exception as e:
logging.error(
f"Recent producer: Unhandled exception: {e}. Retrying in 60s."
)
self.stats["api_errors"] += 1
await asyncio.sleep(60)
async def _top_rant_producer(self):
logging.info("Top rant producer started.")
while not self.shutdown_event.is_set():
try:
logging.debug(f"Top producer: Fetching rants with skip={self._top_skip}...")
rants = await self.api.get_rants(sort="top", limit=50, skip=self._top_skip)
if not rants:
logging.info("Top producer: End of feed reached. Resetting after 1 hour.")
self._top_skip = 0
await asyncio.sleep(3600)
continue
for rant in rants:
await self._queue_rant_if_new(rant)
logging.debug(f"Top producer: Processed {len(rants)} rants.")
self._top_skip += len(rants)
await asyncio.sleep(5)
except Exception as e:
logging.error(f"Top producer: Unhandled exception: {e}. Retrying in 60s.")
self.stats["api_errors"] += 1
await asyncio.sleep(60)
async def _algo_rant_producer(self):
logging.info("Algo rant producer started.")
while not self.shutdown_event.is_set():
try:
logging.debug(f"Algo producer: Fetching rants with skip={self._algo_skip}...")
rants = await self.api.get_rants(sort="algo", limit=50, skip=self._algo_skip)
if not rants:
logging.info("Algo producer: End of feed reached. Resetting after 1 hour.")
self._algo_skip = 0
await asyncio.sleep(3600)
continue
for rant in rants:
await self._queue_rant_if_new(rant)
logging.debug(f"Algo producer: Processed {len(rants)} rants.")
self._algo_skip += len(rants)
await asyncio.sleep(5)
except Exception as e:
logging.error(f"Algo producer: Unhandled exception: {e}. Retrying in 60s.")
self.stats["api_errors"] += 1
await asyncio.sleep(60)
async def _search_producer(self):
logging.info("Search producer started.")
while not self.shutdown_event.is_set():
try:
term = SEARCH_TERMS[self._search_term_index % len(SEARCH_TERMS)]
logging.debug(f"Search producer: Searching for '{term}'...")
rants = await self.api.search(term)
for rant in rants:
await self._queue_rant_if_new(rant)
logging.debug(f"Search producer: Found {len(rants)} rants for '{term}'.")
self._search_term_index += 1
await asyncio.sleep(30)
except Exception as e:
logging.error(f"Search producer: Unhandled exception: {e}. Retrying in 60s.")
self.stats["api_errors"] += 1
await asyncio.sleep(60)
async def _rant_consumer(self, worker_id: int):
logging.info(f"Rant consumer #{worker_id} started.")
while not self.shutdown_event.is_set():
try:
rant_id = await asyncio.wait_for(self.rant_queue.get(), timeout=5.0)
logging.debug(
f"Rant consumer #{worker_id}: Processing rant ID {rant_id}."
)
rant_details = await self.api.get_rant(rant_id)
if not rant_details or not rant_details.get("success"):
logging.warning(
f"Rant consumer #{worker_id}: Failed to fetch details for rant {rant_id}."
)
self.rant_queue.task_done()
continue
await self._queue_user_if_new(rant_details["rant"]["user_id"])
comments = rant_details.get("comments", [])
for comment in comments:
await self.db.add_comment(comment)
self.stats["comments_added_to_db"] += 1
await self._queue_user_if_new(comment["user_id"])
logging.debug(
f"Rant consumer #{worker_id}: Finished processing rant {rant_id}, found {len(comments)} comments."
)
self.stats["rants_processed"] += 1
self.rant_queue.task_done()
except asyncio.TimeoutError:
continue
except Exception as e:
logging.error(f"Rant consumer #{worker_id}: Unhandled exception: {e}")
try:
self.rant_queue.task_done()
except ValueError:
pass
async def _user_consumer(self, worker_id: int):
logging.info(f"User consumer #{worker_id} started.")
while not self.shutdown_event.is_set():
try:
user_id = await asyncio.wait_for(self.user_queue.get(), timeout=5.0)
logging.debug(
f"User consumer #{worker_id}: Processing user ID {user_id}."
)
profile = await self.api.get_profile(user_id)
if not profile:
logging.warning(
f"User consumer #{worker_id}: Could not fetch profile for user {user_id}."
)
self.user_queue.task_done()
continue
await self.db.add_user(profile, user_id)
self.stats["users_added_to_db"] += 1
rants_found_on_profile = 0
content_sections = profile.get("content", {}).get("content", {})
for section_name in ["rants", "upvoted", "favorites"]:
for rant_obj in content_sections.get(section_name, []):
await self._queue_rant_if_new(rant_obj)
rants_found_on_profile += 1
logging.debug(
f"User consumer #{worker_id}: Finished user {user_id}, found and queued {rants_found_on_profile} associated rants."
)
self.stats["users_processed"] += 1
self.user_queue.task_done()
except asyncio.TimeoutError:
continue
except Exception as e:
logging.error(f"User consumer #{worker_id}: Unhandled exception: {e}")
try:
self.user_queue.task_done()
except ValueError:
pass
async def _stats_reporter(self):
logging.info("Stats reporter started.")
while not self.shutdown_event.is_set():
await asyncio.sleep(15)
logging.info(
f"[STATS] Rants Q'd/Proc: {self.stats['rants_queued']}/{self.stats['rants_processed']} | "
f"Users Q'd/Proc: {self.stats['users_queued']}/{self.stats['users_processed']} | "
f"Comments DB: {self.stats['comments_added_to_db']} | "
f"Queues (R/U): {self.rant_queue.qsize()}/{self.user_queue.qsize()} | "
f"API Errors: {self.stats['api_errors']}"
)
async def run(self):
logging.info("Exhaustive crawler starting...")
await self._load_state()
await self._initial_seed()
logging.info("Starting main producer and consumer tasks...")
tasks = []
try:
tasks.append(asyncio.create_task(self._rant_producer()))
tasks.append(asyncio.create_task(self._top_rant_producer()))
tasks.append(asyncio.create_task(self._algo_rant_producer()))
tasks.append(asyncio.create_task(self._search_producer()))
tasks.append(asyncio.create_task(self._stats_reporter()))
tasks.append(asyncio.create_task(self._state_saver()))
for i in range(self.num_rant_consumers):
tasks.append(asyncio.create_task(self._rant_consumer(i + 1)))
for i in range(self.num_user_consumers):
tasks.append(asyncio.create_task(self._user_consumer(i + 1)))
await asyncio.gather(*tasks, return_exceptions=True)
except asyncio.CancelledError:
logging.info("Crawler run cancelled.")
finally:
await self.shutdown()
async def shutdown(self):
if self.shutdown_event.is_set():
return
logging.info("Shutting down... sending signal to all tasks.")
self.shutdown_event.set()
await self._save_state()
logging.info("Waiting for queues to empty... Press Ctrl+C again to force exit.")
try:
await asyncio.wait_for(self.rant_queue.join(), timeout=30)
await asyncio.wait_for(self.user_queue.join(), timeout=30)
except (asyncio.TimeoutError, asyncio.CancelledError):
logging.warning("Could not empty queues in time, proceeding with shutdown.")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
logging.info("All tasks cancelled.")
logging.info(f"--- FINAL STATS ---\n{self.stats}")