from fastapi import APIRouter, Request, UploadFile, File, Form, WebSocket, WebSocketDisconnect, Query
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
import dataset
import json
import aiohttp
import feedparser
import asyncio
from datetime import datetime
import time
import trafilatura
import chromadb
from chromadb.config import Settings # Import Settings
# Router on which every endpoint in this module is registered.
router = APIRouter()

# Jinja2 template loader rooted at the local "templates" directory.
templates = Jinja2Templates(directory="templates")

# SQLite database, accessed through the `dataset` wrapper.
db = dataset.connect("sqlite:///feeds.db")

# ChromaDB setup: a persistent client stored in the 'chroma_db' directory and
# the single "articles" collection used for indexing and search.
chroma_client = chromadb.PersistentClient(path="chroma_db")
chroma_collection = chroma_client.get_or_create_collection(name="articles")
# Browser-like request headers passed to the outgoing HTTP requests made by
# the fetch helpers in this module.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/91.0.4472.124 Safari/537.36"
    ),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,"
        "image/webp,*/*;q=0.8"
    ),
    "Accept-Language": "en-US,en;q=0.5",
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}
async def fetch_article_content(session, url):
    """Download *url* and return its extracted main text.

    Args:
        session: object exposing an aiohttp-style ``get()`` that returns an
            async context manager yielding a response with ``text()``.
        url: address of the article page.

    Returns:
        The text produced by ``trafilatura.extract``; ``""`` when extraction
        yields nothing or when the download/extraction fails for any reason.
    """
    try:
        async with session.get(
            url, headers=HEADERS, timeout=aiohttp.ClientTimeout(total=10)
        ) as response:
            html = await response.text()
        clean_text = trafilatura.extract(html)
    except Exception:
        # Best-effort fetch: a broken article page must not abort the sync.
        # `Exception` (rather than a bare `except`) lets task cancellation
        # and KeyboardInterrupt propagate instead of being swallowed.
        return ""
    return clean_text or ""
async def perform_sync():
    """Synchronize every configured feed and assemble a new newspaper.

    All rows of the ``feeds`` table are downloaded concurrently (at most 10
    connections). As each download finishes, its entries are parsed, the
    linked article text is fetched, and each entry is upserted into the
    ``articles`` table and into the ChromaDB collection. Entries whose guid
    was not yet present in ``articles`` are also collected into a
    ``newspapers`` row that is created up front and rewritten after every
    feed. A summary row is inserted into ``sync_logs`` at the end.

    No further feeds are picked up once 300 seconds have elapsed; the
    deadline is only checked between feeds, and downloads still pending at
    that point are cancelled.

    Returns:
        dict: ``total_feeds`` (feeds configured), ``total_articles`` (number
        of new articles), ``elapsed`` (seconds, float) and ``new_articles``
        (list of the new article dicts).
    """
    feeds_table = db['feeds']
    articles_table = db['articles']
    newspapers_table = db['newspapers']
    sync_logs_table = db['sync_logs']

    feeds = list(feeds_table.all())
    total_feeds = len(feeds)
    # URL -> feed id; the first feed with a given URL wins, which matches a
    # front-to-back scan of the feed list.
    feed_ids = {}
    for feed in feeds:
        feed_ids.setdefault(feed['url'], feed['id'])

    completed = 0
    total_articles_added = 0
    total_articles_processed = 0
    failed_feeds = 0
    new_articles = []
    timed_out = False
    start_time = time.time()
    timeout = 300  # 5 minutes

    # Create the newspaper immediately so partial results are visible while
    # the sync is still running.
    sync_time = datetime.now().isoformat()
    newspaper_id = newspapers_table.insert({
        'created_at': sync_time,
        'article_count': 0,
        'articles_json': json.dumps([])
    })

    connector = aiohttp.TCPConnector(limit=10)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Explicit tasks so that unfinished downloads can be cancelled.
        tasks = [
            asyncio.ensure_future(fetch_feed(session, feed['url'], feed['name']))
            for feed in feeds
        ]
        try:
            for coro in asyncio.as_completed(tasks):
                if time.time() - start_time >= timeout:
                    timed_out = True
                    break
                feed_url, feed_name, content, error = await coro
                completed += 1
                if error:
                    failed_feeds += 1
                    continue

                parsed = feedparser.parse(content)
                for entry in parsed.entries:
                    total_articles_processed += 1
                    article_link = entry.get('link', '')
                    guid = entry.get('id', '') or article_link
                    if not guid:
                        # Without an id or link there is no stable key: all
                        # such entries would overwrite one another in the
                        # articles table and in ChromaDB.
                        continue

                    # Extract clean text content
                    clean_text = (
                        await fetch_article_content(session, article_link)
                        if article_link else ""
                    )
                    now = datetime.now().isoformat()
                    article_data = {
                        'feed_name': feed_name,
                        'feed_url': feed_url,
                        'title': entry.get('title', ''),
                        'link': article_link,
                        'description': entry.get('description', '') or entry.get('summary', ''),
                        'content': clean_text,
                        'published': entry.get('published', '') or entry.get('updated', ''),
                        'author': entry.get('author', ''),
                        'guid': guid,
                        'created_at': now,
                        'last_synchronized': now
                    }

                    # Only articles not seen before go into the newspaper.
                    if not articles_table.find_one(guid=guid):
                        new_articles.append(article_data)
                        total_articles_added += 1
                    articles_table.upsert(article_data, ['guid'])

                    # Index the article to ChromaDB
                    doc_content = (
                        f"{article_data['title']}\n"
                        f"{article_data['description']}\n"
                        f"{article_data['content']}"
                    )
                    metadata = {key: str(value) for key, value in article_data.items()}
                    chroma_collection.upsert(
                        documents=[doc_content],
                        metadatas=[metadata],
                        ids=[guid]
                    )

                feeds_table.update({
                    'id': feed_ids[feed_url],
                    'last_synced': datetime.now().isoformat()
                }, ['id'])

                # Update newspaper immediately after each feed
                newspapers_table.update({
                    'id': newspaper_id,
                    'article_count': len(new_articles),
                    'articles_json': json.dumps(new_articles)
                }, ['id'])
        finally:
            # Stop downloads that are still running (timeout or error) and
            # wait for them to settle before the session is closed.
            pending = [task for task in tasks if not task.done()]
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

    elapsed_total = time.time() - start_time
    avg_req_per_sec = completed / elapsed_total if elapsed_total > 0 else 0

    # Record sync log with detailed statistics
    sync_logs_table.insert({
        'sync_time': sync_time,
        'total_feeds': total_feeds,
        'completed_feeds': completed,
        'failed_feeds': failed_feeds,
        'total_articles_processed': total_articles_processed,
        'new_articles': total_articles_added,
        'elapsed_seconds': round(elapsed_total, 2),
        'avg_req_per_sec': round(avg_req_per_sec, 2),
        'timed_out': timed_out
    })

    return {
        'total_feeds': total_feeds,
        'total_articles': total_articles_added,
        'elapsed': elapsed_total,
        'new_articles': new_articles
    }
async def run_sync_task():
    """Run one feed synchronization and return its summary.

    Delegates to ``perform_sync()`` and passes its return value through.
    """
    summary = await perform_sync()
    return summary
@router.get("/config", response_class=HTMLResponse)
async def index(request: Request):
    """Render the ``index.html`` template for ``GET /config``."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@router.get("/manage", response_class=HTMLResponse)
async def manage_list(request: Request):
    """Render ``manage_list.html`` with every feed and the feed count."""
    all_feeds = [*db['feeds'].all()]
    context = {
        "request": request,
        "feeds": all_feeds,
        "total": len(all_feeds),
    }
    return templates.TemplateResponse("manage_list.html", context)
@router.get("/manage/create", response_class=HTMLResponse)
async def manage_create_page(request: Request):
    """Render the ``manage_create.html`` template (feed creation form)."""
    context = {"request": request}
    return templates.TemplateResponse("manage_create.html", context)
@router.post("/manage/create")
async def manage_create(
    name: str = Form(...),
    url: str = Form(...),
    type: str = Form(...),
    category: str = Form(...)
):
    """Insert a feed built from the submitted form fields.

    Responds with a 303 redirect to ``/manage``.
    """
    new_feed = dict(name=name, url=url, type=type, category=category)
    db['feeds'].insert(new_feed)
    return RedirectResponse(url="/manage", status_code=303)
@router.get("/manage/upload", response_class=HTMLResponse)
async def manage_upload_page(request: Request):
    """Render the ``manage_upload.html`` template (feed upload form)."""
    context = {"request": request}
    return templates.TemplateResponse("manage_upload.html", context)
@router.post("/manage/upload")
async def manage_upload(file: UploadFile = File(...)):
    """Import feeds from an uploaded JSON file.

    The file must contain a JSON list of objects, each with a non-empty
    string ``url``. Every object is upserted into the ``feeds`` table keyed
    on ``url``. The whole payload is validated before anything is written,
    so a bad file never results in a partial import.

    Returns:
        A 303 redirect to ``/manage`` on success, or a 400 JSON response
        describing the problem when the file is not usable.
    """
    def reject(message):
        return JSONResponse(
            status_code=400,
            content={"status": "error", "message": message},
        )

    raw = await file.read()
    try:
        feeds_data = json.loads(raw)
    except ValueError:
        # Covers both malformed JSON and undecodable bytes.
        return reject("Uploaded file is not valid JSON.")

    if not isinstance(feeds_data, list):
        return reject("Uploaded JSON must be a list of feed objects.")
    for position, feed in enumerate(feeds_data):
        url = feed.get('url') if isinstance(feed, dict) else None
        if not isinstance(url, str) or not url:
            # 'url' is the upsert key, so it must be present and non-empty.
            return reject(f"Feed at index {position} must be an object with a non-empty 'url'.")

    feeds_table = db['feeds']
    for feed in feeds_data:
        feeds_table.upsert(feed, ['url'])
    return RedirectResponse(url="/manage", status_code=303)
@router.get("/manage/edit/{feed_id}", response_class=HTMLResponse)
async def manage_edit_page(request: Request, feed_id: int):
    """Render ``manage_edit.html`` for the feed with id *feed_id*.

    When no such feed exists the client is redirected to ``/manage`` rather
    than being shown the edit template with an empty feed.
    """
    feed = db['feeds'].find_one(id=feed_id)
    if not feed:
        return RedirectResponse(url="/manage")
    return templates.TemplateResponse("manage_edit.html", {
        "request": request,
        "feed": feed
    })
@router.post("/manage/edit/{feed_id}")
async def manage_edit(
    feed_id: int,
    name: str = Form(...),
    url: str = Form(...),
    type: str = Form(...),
    category: str = Form(...)
):
    """Overwrite the editable fields of feed *feed_id* with the form values.

    Responds with a 303 redirect to ``/manage``.
    """
    changes = dict(id=feed_id, name=name, url=url, type=type, category=category)
    # 'id' is the match key; the remaining fields are the new values.
    db['feeds'].update(changes, ['id'])
    return RedirectResponse(url="/manage", status_code=303)
@router.post("/manage/delete/{feed_id}")
async def manage_delete(feed_id: int):
    """Delete the feed with id *feed_id*, then redirect (303) to ``/manage``."""
    db['feeds'].delete(id=feed_id)
    back_to_list = RedirectResponse(url="/manage", status_code=303)
    return back_to_list
@router.get("/manage/sync", response_class=HTMLResponse)
async def manage_sync_page(request: Request):
    """Render ``manage_sync.html`` with the current feed and article counts."""
    # Let the database count instead of loading every row into memory.
    total_feeds = db['feeds'].count()
    total_articles = db['articles'].count()
    return templates.TemplateResponse("manage_sync.html", {
        "request": request,
        "total_feeds": total_feeds,
        "total_articles": total_articles
    })
async def fetch_feed(session, feed_url, feed_name):
    """Download one feed document without ever raising on failure.

    Args:
        session: object exposing an aiohttp-style ``get()`` that returns an
            async context manager yielding a response.
        feed_url: address of the feed.
        feed_name: display name, passed through unchanged to the result.

    Returns:
        tuple: ``(feed_url, feed_name, content, error)``. On success
        ``content`` is the response body text and ``error`` is ``None``; on
        any failure (including an HTTP error status) ``content`` is ``None``
        and ``error`` is the exception message.
    """
    try:
        async with session.get(
            feed_url, headers=HEADERS, timeout=aiohttp.ClientTimeout(total=30)
        ) as response:
            # Report 4xx/5xx as a failed feed instead of handing an error
            # page to the feed parser.
            response.raise_for_status()
            content = await response.text()
    except Exception as e:
        return feed_url, feed_name, None, str(e)
    return feed_url, feed_name, content, None
@router.websocket("/ws/sync")
async def websocket_sync(websocket: WebSocket):
    """Run a feed synchronization and stream its progress over a WebSocket.

    After accepting the connection, all rows of the ``feeds`` table are
    downloaded concurrently (at most 10 connections). Every entry of every
    feed is upserted into the ``articles`` table and the ChromaDB collection;
    entries whose guid was not yet present are also collected into a
    ``newspapers`` row that is created up front and rewritten after each
    feed. Unlike ``perform_sync`` this path does not download article bodies.

    Messages sent to the client (JSON, discriminated by ``type``):
    ``start``, then per feed either ``error`` or ``fetching`` followed by
    ``parsed``, optionally ``timeout`` once 300 seconds have elapsed, and
    finally ``complete``. An unexpected exception is reported as an
    ``error`` message carrying only ``message``.
    """
    await websocket.accept()
    try:
        feeds_table = db['feeds']
        articles_table = db['articles']
        newspapers_table = db['newspapers']
        sync_logs_table = db['sync_logs']

        feeds = list(feeds_table.all())
        total_feeds = len(feeds)
        # URL -> feed id; the first feed with a given URL wins, which matches
        # a front-to-back scan of the feed list.
        feed_ids = {}
        for feed in feeds:
            feed_ids.setdefault(feed['url'], feed['id'])

        completed = 0
        failed_feeds = 0
        total_articles_processed = 0
        total_articles_added = 0
        new_articles = []
        timed_out = False
        start_time = time.time()
        timeout = 300  # 5 minutes

        await websocket.send_json({
            "type": "start",
            "total": total_feeds,
            "message": f"Starting synchronization of {total_feeds} feeds (5 minute timeout)..."
        })

        # Create newspaper immediately at start
        sync_time = datetime.now().isoformat()
        newspaper_id = newspapers_table.insert({
            'created_at': sync_time,
            'article_count': 0,
            'articles_json': json.dumps([])
        })

        connector = aiohttp.TCPConnector(limit=10)
        async with aiohttp.ClientSession(connector=connector) as session:
            # Explicit tasks so that unfinished downloads can be cancelled.
            tasks = [
                asyncio.ensure_future(fetch_feed(session, feed['url'], feed['name']))
                for feed in feeds
            ]
            try:
                for coro in asyncio.as_completed(tasks):
                    elapsed = time.time() - start_time
                    if elapsed >= timeout:
                        timed_out = True
                        await websocket.send_json({
                            "type": "timeout",
                            "message": "5 minute timeout reached. Newspaper already contains all articles found so far...",
                            "completed": completed,
                            "total": total_feeds
                        })
                        break

                    feed_url, feed_name, content, error = await coro
                    completed += 1
                    req_per_sec = completed / elapsed if elapsed > 0 else 0

                    if error:
                        failed_feeds += 1
                        await websocket.send_json({
                            "type": "error",
                            "feed": feed_name,
                            "url": feed_url,
                            "error": error,
                            "completed": completed,
                            "total": total_feeds,
                            "req_per_sec": round(req_per_sec, 2)
                        })
                        continue

                    await websocket.send_json({
                        "type": "fetching",
                        "feed": feed_name,
                        "url": feed_url,
                        "completed": completed,
                        "total": total_feeds,
                        "req_per_sec": round(req_per_sec, 2)
                    })

                    parsed = feedparser.parse(content)
                    articles_count = 0
                    for entry in parsed.entries:
                        total_articles_processed += 1
                        article_link = entry.get('link', '')
                        guid = entry.get('id', '') or article_link
                        if not guid:
                            # Without an id or link there is no stable key:
                            # all such entries would overwrite one another.
                            continue

                        now = datetime.now().isoformat()
                        article_data = {
                            'feed_name': feed_name,
                            'feed_url': feed_url,
                            'title': entry.get('title', ''),
                            'link': article_link,
                            'description': entry.get('description', '') or entry.get('summary', ''),
                            'published': entry.get('published', '') or entry.get('updated', ''),
                            'author': entry.get('author', ''),
                            'guid': guid,
                            'created_at': now,
                            'last_synchronized': now
                        }

                        # Only articles not seen before go into the newspaper.
                        if not articles_table.find_one(guid=guid):
                            new_articles.append(article_data)
                            articles_count += 1
                        articles_table.upsert(article_data, ['guid'])

                        # Index the article to ChromaDB
                        doc_content = f"{article_data['title']}\n{article_data['description']}"
                        # Exclude large content from metadata
                        metadata = {
                            key: str(value)
                            for key, value in article_data.items()
                            if key != 'content'
                        }
                        chroma_collection.upsert(
                            documents=[doc_content],
                            metadatas=[metadata],
                            ids=[guid]
                        )

                    total_articles_added += articles_count

                    feeds_table.update({
                        'id': feed_ids[feed_url],
                        'last_synced': datetime.now().isoformat()
                    }, ['id'])

                    # Update newspaper immediately after each feed
                    newspapers_table.update({
                        'id': newspaper_id,
                        'article_count': len(new_articles),
                        'articles_json': json.dumps(new_articles)
                    }, ['id'])

                    await websocket.send_json({
                        "type": "parsed",
                        "feed": feed_name,
                        "articles": articles_count,
                        "completed": completed,
                        "total": total_feeds,
                        "total_articles": total_articles_added,
                        "req_per_sec": round(req_per_sec, 2)
                    })
            finally:
                # Stop downloads that are still running (timeout, disconnect
                # or error) and wait for them to settle before the session
                # is closed.
                pending = [task for task in tasks if not task.done()]
                for task in pending:
                    task.cancel()
                if pending:
                    await asyncio.gather(*pending, return_exceptions=True)

        elapsed_total = time.time() - start_time
        # Rate over the feeds actually completed; after a timeout this is
        # lower than the configured feed count.
        avg_req_per_sec = completed / elapsed_total if elapsed_total > 0 else 0

        # Record sync log, with the same statistics perform_sync() stores.
        sync_logs_table.insert({
            'sync_time': sync_time,
            'total_feeds': total_feeds,
            'completed_feeds': completed,
            'failed_feeds': failed_feeds,
            'total_articles_processed': total_articles_processed,
            'new_articles': total_articles_added,
            'elapsed_seconds': round(elapsed_total, 2),
            'avg_req_per_sec': round(avg_req_per_sec, 2),
            'timed_out': timed_out
        })

        await websocket.send_json({
            "type": "complete",
            "total_feeds": total_feeds,
            "total_articles": total_articles_added,
            "elapsed": round(elapsed_total, 2),
            "avg_req_per_sec": round(avg_req_per_sec, 2)
        })
    except WebSocketDisconnect:
        # Client went away; nothing left to report to.
        pass
    except Exception as e:
        try:
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })
        except (WebSocketDisconnect, RuntimeError):
            # The socket is already closed, so the error cannot be delivered.
            pass
## --- API Endpoints ---
@router.post("/api/sync-to-chroma", tags=["API"], status_code=200)
async def sync_all_articles_to_chroma():
    """
    Manually synchronizes all articles from the SQLite database to the ChromaDB vector store.
    This is useful for initializing the search index with existing data.

    Rows without a guid are skipped because the guid is used as the ChromaDB
    id. NULL columns are indexed as empty strings rather than the literal
    text "None".

    Returns a JSON body with ``status`` ("noop" when the articles table is
    empty, otherwise "success") and a human-readable ``message``.
    """
    all_articles = list(db['articles'].all())
    if not all_articles:
        return JSONResponse(content={"status": "noop", "message": "No articles in the database to sync."})

    def as_text(value):
        # Metadata values must be str, int, float, or bool; store everything
        # as text and map NULL to "".
        return "" if value is None else str(value)

    def as_document(article):
        # The document is what ChromaDB searches against: title, description
        # and content joined by newlines.
        parts = (article.get('title'), article.get('description'), article.get('content'))
        return "\n".join(as_text(part) for part in parts)

    indexable = [article for article in all_articles if article.get('guid')]

    # Build and upsert one batch at a time so only a single batch of
    # documents/metadata is held in addition to the rows themselves.
    batch_size = 100
    for start in range(0, len(indexable), batch_size):
        batch = indexable[start:start + batch_size]
        chroma_collection.upsert(
            ids=[str(article['guid']) for article in batch],
            documents=[as_document(article) for article in batch],
            metadatas=[
                {key: as_text(value) for key, value in article.items()}
                for article in batch
            ]
        )

    return JSONResponse(content={
        "status": "success",
        "message": f"Successfully indexed {len(indexable)} articles to ChromaDB."
    })
@router.get("/api/search", tags=["API"])
async def search_articles(
    q: str = Query(None, description="The search term to query for."),
    limit: int = Query(20, ge=1, le=100, description="The maximum number of results to return."),
    page: int = Query(1, ge=1, description="The page number for paginated results (used when 'q' is not provided).")
):
    """
    Searches for articles within the ChromaDB vector store.
    - If **q** is provided, performs a similarity search based on the query text
      and returns up to **limit** results, each being the stored metadata plus
      a ``distance`` field.
    - If **q** is not provided, returns page **page** of the stored article
      metadata, **limit** items per page, in the order ChromaDB returns them.
    """
    if q:
        # Perform a similarity search
        results = chroma_collection.query(
            query_texts=[q],
            n_results=limit,
            include=['metadatas', 'distances']
        )
        # Format results into a cleaner list of objects
        formatted_results = []
        if results and results.get('ids', [[]])[0]:
            # One result list per query text; only one query text was sent.
            hits = zip(results['metadatas'][0], results['distances'][0])
            for metadata, distance in hits:
                # Copy so the response object is independent of the result
                # set, and tolerate entries stored without metadata.
                res = dict(metadata or {})
                res['distance'] = distance
                formatted_results.append(res)
        return JSONResponse(content={"results": formatted_results})

    # Return a paginated list of articles; `limit` doubles as the page size
    # (its default of 20 matches the previous fixed page size).
    offset = (page - 1) * limit
    results = chroma_collection.get(
        limit=limit,
        offset=offset,
        include=['metadatas']
    )
    return JSONResponse(content={"results": results['metadatas']})
## --- HTML Page Routes ---
@router.get("/newspapers", response_class=HTMLResponse)
async def newspapers_list(request: Request):
    """Render ``newspapers_list.html`` with all newspapers, order reversed.

    The rows come back from ``all()`` and are handed to the template in the
    opposite order.
    """
    rows = list(db['newspapers'].all())
    context = {
        "request": request,
        "newspapers": rows[::-1],
    }
    return templates.TemplateResponse("newspapers_list.html", context)
@router.get("/sync-logs", response_class=HTMLResponse)
async def sync_logs_list(request: Request):
    """Render ``sync_logs_list.html`` with all sync logs, order reversed.

    The rows come back from ``all()`` and are handed to the template in the
    opposite order.
    """
    rows = list(db['sync_logs'].all())
    context = {
        "request": request,
        "sync_logs": rows[::-1],
    }
    return templates.TemplateResponse("sync_logs_list.html", context)
@router.get("/newspaper/{newspaper_id}", response_class=HTMLResponse)
async def newspaper_view(request: Request, newspaper_id: int):
    """Render ``newspaper_view.html`` for the newspaper with id *newspaper_id*.

    The newspaper's ``articles_json`` column is decoded and passed to the
    template as ``articles``. An unknown id redirects to ``/newspapers``.
    """
    newspaper = db['newspapers'].find_one(id=newspaper_id)
    if newspaper:
        context = {
            "request": request,
            "newspaper": newspaper,
            "articles": json.loads(newspaper['articles_json']),
        }
        return templates.TemplateResponse("newspaper_view.html", context)
    return RedirectResponse(url="/newspapers")
@router.get("/", response_class=HTMLResponse)
async def newspaper_latest(request: Request):
    """Render the front page from the most recent newspapers.

    Up to 10 newspapers are read newest-first (by id) and their articles are
    accumulated until more than 30 have been gathered; those are then
    rendered with ``newspaper_view.html`` under the newest newspaper. When
    there are no newspapers, or they never add up to more than 30 articles,
    the client is redirected to ``/newspapers``.
    """
    # find() yields nothing when the table does not exist yet, so a fresh
    # database ends up at the redirect instead of raising.
    newspapers = list(db['newspapers'].find(order_by='-id', _limit=10))
    if not newspapers:
        return RedirectResponse(url="/newspapers")

    first_newspaper = newspapers[0]
    articles = []
    # NOTE(review): the source's indentation was lost; this nesting (check
    # the threshold after each newspaper and return as soon as it is passed)
    # is the reading assumed here - confirm against the original file.
    for newspaper in newspapers:
        articles += json.loads(newspaper['articles_json'])
        if len(articles) > 30:
            for article in articles:
                for key, value in article.items():
                    # NOTE(review): as written this removes every space from
                    # every field; the literal may originally have been a
                    # different whitespace character - confirm.
                    article[key] = str(value).strip().replace(' ', '')
            return templates.TemplateResponse("newspaper_view.html", {
                "request": request,
                "newspaper": first_newspaper,
                "articles": articles
            })
    return RedirectResponse(url="/newspapers")