from fastapi import APIRouter, Request, UploadFile, File, Form, WebSocket, WebSocketDisconnect, Query from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from fastapi.templating import Jinja2Templates import dataset import json import aiohttp import feedparser import asyncio from datetime import datetime import time import trafilatura import chromadb from chromadb.config import Settings # Import Settings router = APIRouter() templates = Jinja2Templates(directory="templates") db = dataset.connect('sqlite:///feeds.db') # ChromaDB setup # This creates a persistent database in the 'chroma_db' directory chroma_client = chroma_client = chromadb.PersistentClient(path="chroma_db") chroma_collection = chroma_client.get_or_create_collection(name="articles") # Browser-like headers HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1' } async def fetch_article_content(session, url): try: async with session.get(url, headers=HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response: html = await response.text() clean_text = trafilatura.extract(html) return clean_text if clean_text else "" except: return "" async def perform_sync(): feeds_table = db['feeds'] articles_table = db['articles'] feeds = list(feeds_table.all()) total_feeds = len(feeds) completed = 0 total_articles_added = 0 total_articles_processed = 0 failed_feeds = 0 new_articles = [] start_time = time.time() timeout = 300 # 5 minutes # Create newspaper immediately at start newspapers_table = db['newspapers'] sync_time = datetime.now().isoformat() newspaper_id = newspapers_table.insert({ 'created_at': sync_time, 'article_count': 0, 'articles_json': json.dumps([]) }) connector = aiohttp.TCPConnector(limit=10) async with aiohttp.ClientSession(connector=connector) as session: tasks = [] for feed in feeds: tasks.append(fetch_feed(session, feed['url'], feed['name'])) for coro in asyncio.as_completed(tasks): elapsed = time.time() - start_time if elapsed >= timeout: break feed_url, feed_name, content, error = await coro completed += 1 if error: failed_feeds += 1 continue parsed = feedparser.parse(content) for entry in parsed.entries: total_articles_processed += 1 article_link = entry.get('link', '') # Extract clean text content clean_text = await fetch_article_content(session, article_link) if article_link else "" article_data = { 'feed_name': feed_name, 'feed_url': feed_url, 'title': entry.get('title', ''), 'link': article_link, 'description': entry.get('description', '') or entry.get('summary', ''), 'content': clean_text, 'published': entry.get('published', '') or entry.get('updated', ''), 'author': entry.get('author', ''), 'guid': entry.get('id', '') or entry.get('link', ''), 'created_at': datetime.now().isoformat(), 'last_synchronized': datetime.now().isoformat() } existing = articles_table.find_one(guid=article_data['guid']) if not existing: new_articles.append(article_data) total_articles_added += 1 articles_table.upsert(article_data, ['guid']) # Index the article to ChromaDB doc_content = f"{article_data.get('title', '')}\n{article_data.get('description', '')}\n{article_data.get('content', '')}" metadata = {key: str(value) for key, value in article_data.items()} chroma_collection.upsert( documents=[doc_content], metadatas=[metadata], ids=[article_data['guid']] ) feeds_table.update({ 'id': [f for f in feeds if f['url'] == feed_url][0]['id'], 'last_synced': datetime.now().isoformat() }, ['id']) # Update newspaper immediately after each feed newspapers_table.update({ 'id': newspaper_id, 'article_count': len(new_articles), 'articles_json': json.dumps(new_articles) }, ['id']) elapsed_total = time.time() - start_time avg_req_per_sec = completed / elapsed_total if elapsed_total > 0 else 0 # Record sync log with detailed statistics sync_logs_table = db['sync_logs'] sync_logs_table.insert({ 'sync_time': sync_time, 'total_feeds': total_feeds, 'completed_feeds': completed, 'failed_feeds': failed_feeds, 'total_articles_processed': total_articles_processed, 'new_articles': total_articles_added, 'elapsed_seconds': round(elapsed_total, 2), 'avg_req_per_sec': round(avg_req_per_sec, 2), 'timed_out': elapsed_total >= timeout }) return { 'total_feeds': total_feeds, 'total_articles': total_articles_added, 'elapsed': elapsed_total, 'new_articles': new_articles } async def run_sync_task(): return await perform_sync() @router.get("/config", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @router.get("/manage", response_class=HTMLResponse) async def manage_list(request: Request): feeds_table = db['feeds'] feeds = list(feeds_table.all()) return templates.TemplateResponse("manage_list.html", { "request": request, "feeds": feeds, "total": len(feeds) }) @router.get("/manage/create", response_class=HTMLResponse) async def manage_create_page(request: Request): return templates.TemplateResponse("manage_create.html", {"request": request}) @router.post("/manage/create") async def manage_create( name: str = Form(...), url: str = Form(...), type: str = Form(...), category: str = Form(...) ): feeds_table = db['feeds'] feeds_table.insert({ 'name': name, 'url': url, 'type': type, 'category': category }) return RedirectResponse(url="/manage", status_code=303) @router.get("/manage/upload", response_class=HTMLResponse) async def manage_upload_page(request: Request): return templates.TemplateResponse("manage_upload.html", {"request": request}) @router.post("/manage/upload") async def manage_upload(file: UploadFile = File(...)): content = await file.read() feeds_data = json.loads(content) feeds_table = db['feeds'] for feed in feeds_data: feeds_table.upsert(feed, ['url']) return RedirectResponse(url="/manage", status_code=303) @router.get("/manage/edit/{feed_id}", response_class=HTMLResponse) async def manage_edit_page(request: Request, feed_id: int): feeds_table = db['feeds'] feed = feeds_table.find_one(id=feed_id) return templates.TemplateResponse("manage_edit.html", { "request": request, "feed": feed }) @router.post("/manage/edit/{feed_id}") async def manage_edit( feed_id: int, name: str = Form(...), url: str = Form(...), type: str = Form(...), category: str = Form(...) ): feeds_table = db['feeds'] feeds_table.update({ 'id': feed_id, 'name': name, 'url': url, 'type': type, 'category': category }, ['id']) return RedirectResponse(url="/manage", status_code=303) @router.post("/manage/delete/{feed_id}") async def manage_delete(feed_id: int): feeds_table = db['feeds'] feeds_table.delete(id=feed_id) return RedirectResponse(url="/manage", status_code=303) @router.get("/manage/sync", response_class=HTMLResponse) async def manage_sync_page(request: Request): feeds_table = db['feeds'] total_feeds = len(list(feeds_table.all())) articles_table = db['articles'] total_articles = len(list(articles_table.all())) return templates.TemplateResponse("manage_sync.html", { "request": request, "total_feeds": total_feeds, "total_articles": total_articles }) async def fetch_feed(session, feed_url, feed_name): try: async with session.get(feed_url, headers=HEADERS, timeout=aiohttp.ClientTimeout(total=30)) as response: content = await response.text() return feed_url, feed_name, content, None except Exception as e: return feed_url, feed_name, None, str(e) @router.websocket("/ws/sync") async def websocket_sync(websocket: WebSocket): await websocket.accept() try: feeds_table = db['feeds'] articles_table = db['articles'] feeds = list(feeds_table.all()) total_feeds = len(feeds) completed = 0 total_articles_added = 0 new_articles = [] start_time = time.time() timeout = 300 # 5 minutes await websocket.send_json({ "type": "start", "total": total_feeds, "message": f"Starting synchronization of {total_feeds} feeds (5 minute timeout)..." }) # Create newspaper immediately at start newspapers_table = db['newspapers'] sync_time = datetime.now().isoformat() newspaper_id = newspapers_table.insert({ 'created_at': sync_time, 'article_count': 0, 'articles_json': json.dumps([]) }) connector = aiohttp.TCPConnector(limit=10) async with aiohttp.ClientSession(connector=connector) as session: tasks = [] for feed in feeds: tasks.append(fetch_feed(session, feed['url'], feed['name'])) for coro in asyncio.as_completed(tasks): elapsed = time.time() - start_time if elapsed >= timeout: await websocket.send_json({ "type": "timeout", "message": "5 minute timeout reached. Newspaper already contains all articles found so far...", "completed": completed, "total": total_feeds }) break feed_url, feed_name, content, error = await coro completed += 1 req_per_sec = completed / elapsed if elapsed > 0 else 0 if error: await websocket.send_json({ "type": "error", "feed": feed_name, "url": feed_url, "error": error, "completed": completed, "total": total_feeds, "req_per_sec": round(req_per_sec, 2) }) continue await websocket.send_json({ "type": "fetching", "feed": feed_name, "url": feed_url, "completed": completed, "total": total_feeds, "req_per_sec": round(req_per_sec, 2) }) parsed = feedparser.parse(content) articles_count = 0 for entry in parsed.entries: article_data = { 'feed_name': feed_name, 'feed_url': feed_url, 'title': entry.get('title', ''), 'link': entry.get('link', ''), 'description': entry.get('description', '') or entry.get('summary', ''), 'published': entry.get('published', '') or entry.get('updated', ''), 'author': entry.get('author', ''), 'guid': entry.get('id', '') or entry.get('link', ''), 'created_at': datetime.now().isoformat(), 'last_synchronized': datetime.now().isoformat() } existing = articles_table.find_one(guid=article_data['guid']) if not existing: new_articles.append(article_data) articles_count += 1 articles_table.upsert(article_data, ['guid']) # Index the article to ChromaDB doc_content = f"{article_data.get('title', '')}\n{article_data.get('description', '')}" metadata = {key: str(value) for key, value in article_data.items() if key != 'content'} # Exclude large content from metadata chroma_collection.upsert( documents=[doc_content], metadatas=[metadata], ids=[article_data['guid']] ) total_articles_added += articles_count feeds_table.update({ 'id': [f for f in feeds if f['url'] == feed_url][0]['id'], 'last_synced': datetime.now().isoformat() }, ['id']) # Update newspaper immediately after each feed newspapers_table.update({ 'id': newspaper_id, 'article_count': len(new_articles), 'articles_json': json.dumps(new_articles) }, ['id']) await websocket.send_json({ "type": "parsed", "feed": feed_name, "articles": articles_count, "completed": completed, "total": total_feeds, "total_articles": total_articles_added, "req_per_sec": round(req_per_sec, 2) }) elapsed_total = time.time() - start_time # Record sync log sync_logs_table = db['sync_logs'] sync_logs_table.insert({ 'sync_time': sync_time, 'total_feeds': total_feeds, 'new_articles': total_articles_added, 'elapsed_seconds': round(elapsed_total, 2) }) await websocket.send_json({ "type": "complete", "total_feeds": total_feeds, "total_articles": total_articles_added, "elapsed": round(elapsed_total, 2), "avg_req_per_sec": round(total_feeds / elapsed_total if elapsed_total > 0 else 0, 2) }) except WebSocketDisconnect: pass except Exception as e: await websocket.send_json({ "type": "error", "message": str(e) }) ## --- API Endpoints --- @router.post("/api/sync-to-chroma", tags=["API"], status_code=200) async def sync_all_articles_to_chroma(): """ Manually synchronizes all articles from the SQLite database to the ChromaDB vector store. This is useful for initializing the search index with existing data. """ articles_table = db['articles'] all_articles = list(articles_table.all()) if not all_articles: return JSONResponse(content={"status": "noop", "message": "No articles in the database to sync."}) documents, metadatas, ids = [], [], [] for article in all_articles: # The document is what ChromaDB will search against. A combo of title and content is good. doc_content = f"{article.get('title', '')}\n{article.get('description', '')}\n{article.get('content', '')}" # Metadata must have values of type str, int, float, or bool. metadata = {key: str(value) for key, value in article.items()} documents.append(doc_content) metadatas.append(metadata) ids.append(article['guid']) # Upsert in batches to be memory-efficient batch_size = 100 for i in range(0, len(ids), batch_size): chroma_collection.upsert( ids=ids[i:i+batch_size], documents=documents[i:i+batch_size], metadatas=metadatas[i:i+batch_size] ) return JSONResponse(content={ "status": "success", "message": f"Successfully indexed {len(all_articles)} articles to ChromaDB." }) @router.get("/api/search", tags=["API"]) async def search_articles( q: str = Query(None, description="The search term to query for."), limit: int = Query(20, ge=1, le=100, description="The maximum number of results to return."), page: int = Query(1, ge=1, description="The page number for paginated results (used when 'q' is not provided).") ): """ Searches for articles within the ChromaDB vector store. - If **q** is provided, performs a similarity search based on the query text. - If **q** is not provided, returns a paginated list of all articles, sorted by insertion order. """ if q: # Perform a similarity search results = chroma_collection.query( query_texts=[q], n_results=limit, include=['metadatas', 'distances'] ) # Format results into a cleaner list of objects formatted_results = [] if results and results.get('ids', [[]])[0]: for i, doc_id in enumerate(results['ids'][0]): res = results['metadatas'][0][i] res['distance'] = results['distances'][0][i] formatted_results.append(res) return JSONResponse(content={"results": formatted_results}) else: # Return a paginated list of articles page_limit = 20 offset = (page - 1) * page_limit results = chroma_collection.get( limit=page_limit, offset=offset, include=['metadatas'] ) return JSONResponse(content={"results": results['metadatas']}) ## --- HTML Page Routes --- @router.get("/newspapers", response_class=HTMLResponse) async def newspapers_list(request: Request): newspapers_table = db['newspapers'] newspapers = list(newspapers_table.all()) newspapers.reverse() return templates.TemplateResponse("newspapers_list.html", { "request": request, "newspapers": newspapers }) @router.get("/sync-logs", response_class=HTMLResponse) async def sync_logs_list(request: Request): sync_logs_table = db['sync_logs'] sync_logs = list(sync_logs_table.all()) sync_logs.reverse() return templates.TemplateResponse("sync_logs_list.html", { "request": request, "sync_logs": sync_logs }) @router.get("/newspaper/{newspaper_id}", response_class=HTMLResponse) async def newspaper_view(request: Request, newspaper_id: int): newspapers_table = db['newspapers'] newspaper = newspapers_table.find_one(id=newspaper_id) if not newspaper: return RedirectResponse(url="/newspapers") articles = json.loads(newspaper['articles_json']) return templates.TemplateResponse("newspaper_view.html", { "request": request, "newspaper": newspaper, "articles": articles }) @router.get("/", response_class=HTMLResponse) async def newspaper_latest(request: Request): newspapers_table = db['newspapers'] newspaper = None try: newspapers= list(db.query("select * from newspapers order by id desc limit 10")) except IndexError: pass for newspaper in newspapers: articles = json.loads(newspaper['articles_json']) if articles: for article in articles: for key, value in article.items(): article[key] = str(value).strip() return templates.TemplateResponse("newspaper_view.html", { "request": request, "newspaper": newspaper, "articles": articles }) return RedirectResponse(url="/newspapers")