from fastapi import FastAPI
from fastapi.templating import Jinja2Templates
import dataset
import asyncio
import chromadb
from chromadb.config import Settings

app = FastAPI(title="RSS Feed Manager")

# Database and ChromaDB setup (module-level so background tasks can share them)
db = dataset.connect('sqlite:///feeds.db')
chroma_client = chromadb.Client(
    Settings(is_persistent=True, persist_directory="chroma_db")
)
chroma_collection = chroma_client.get_or_create_collection(name="articles")

# Templates setup
templates = Jinja2Templates(directory="templates")

# Import routers (deferred until after `db` and `templates` exist,
# since the router module may import them from here)
from routers import router as manage_router, run_sync_task

app.include_router(manage_router)


async def hourly_sync_task():
    """Periodically fetches new articles from RSS feeds."""
    await asyncio.sleep(15)  # Brief delay so startup finishes before the first sync
    while True:
        print("Hourly Sync: Starting feed synchronization.")
        try:
            await run_sync_task()
            print("Hourly Sync: Feed synchronization finished.")
        except Exception as e:
            print(f"Error in hourly sync: {e}")
        await asyncio.sleep(3600)  # Wait 1 hour


async def chroma_sync_task():
    """
    A continuous background service that syncs the latest 100 articles,
    checking against ChromaDB to avoid duplicates.
    """
    print("Chroma Sync Service: Task started.")
    articles_table = db['articles']
    while True:
        try:
            print("Chroma Sync Service: Checking latest 100 articles from the database...")

            # 1. Fetch the 100 most recent articles from SQLite
            latest_articles = list(articles_table.find(order_by='-id', _limit=100))
            if not latest_articles:
                print("Chroma Sync Service: No articles in the database yet. Waiting...")
                await asyncio.sleep(10)
                continue

            # 2. Get the IDs to check against ChromaDB
            guids_to_check = [article['guid'] for article in latest_articles]

            # 3. Check which articles already exist in ChromaDB
            existing_chroma_docs = chroma_collection.get(ids=guids_to_check)
            existing_guids = set(existing_chroma_docs['ids'])

            # 4. Filter out the articles that are already synced
            articles_to_index = [
                article for article in latest_articles
                if article['guid'] not in existing_guids
            ]

            if articles_to_index:
                print(f"Chroma Sync Service: Found {len(articles_to_index)} new articles to index.")
                documents, metadatas, ids = [], [], []
                for article in articles_to_index:
                    doc_content = (
                        f"{article.get('title', '')}\n"
                        f"{article.get('description', '')}\n"
                        f"{article.get('content', '')}"
                    )
                    # Chroma metadata values must be scalars, so stringify everything
                    metadata = {key: str(value) for key, value in article.items()}
                    documents.append(doc_content)
                    metadatas.append(metadata)
                    ids.append(article['guid'])

                # 5. Index the new batch to ChromaDB
                chroma_collection.upsert(ids=ids, documents=documents, metadatas=metadatas)
                print(f"Chroma Sync Service: Successfully indexed {len(articles_to_index)} articles.")
            else:
                print("Chroma Sync Service: Latest 100 articles are already synced.")

            await asyncio.sleep(10)  # Wait 10 seconds before the next check
        except Exception as e:
            print(f"Error in Chroma sync service: {e}")
            await asyncio.sleep(30)  # Wait longer after an error


@app.on_event("startup")
async def startup_event():
    # Ensure tables exist
    db['feeds']
    db['articles']
    # Start background tasks
    print("Application startup: Initializing background tasks.")
    asyncio.create_task(hourly_sync_task())
    asyncio.create_task(chroma_sync_task())


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8592)
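
# ----------------------------------------------------------------------
# For reference: a minimal, hypothetical sketch of the companion
# `routers` module imported above. The real routers.py is not shown in
# this file, so everything below is an assumption about its interface:
# an APIRouter named `router` plus an async `run_sync_task()` that
# fetches each stored feed and upserts entries into `articles` keyed by
# a unique `guid` (the column both background tasks rely on). It is
# kept commented out so it cannot shadow the names imported at the top
# of this file; it would live in a separate routers.py.
#
#   import dataset
#   import feedparser  # assumed dependency for parsing RSS/Atom feeds
#   from fastapi import APIRouter
#
#   router = APIRouter()
#   db = dataset.connect('sqlite:///feeds.db')
#
#   async def run_sync_task():
#       """Fetch every feed and upsert its entries into `articles`."""
#       for feed in db['feeds']:
#           # feedparser.parse is blocking; a real implementation would
#           # offload it with asyncio.to_thread to avoid stalling the loop
#           parsed = feedparser.parse(feed['url'])
#           for entry in parsed.entries:
#               db['articles'].upsert({
#                   'guid': entry.get('id') or entry.get('link'),
#                   'title': entry.get('title', ''),
#                   'description': entry.get('summary', ''),
#                   'link': entry.get('link', ''),
#               }, ['guid'])
# ----------------------------------------------------------------------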