from fastapi import APIRouter, Request, UploadFile, File, Form, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
import dataset
import json
import aiohttp
import feedparser
import asyncio
from datetime import datetime
import time
import trafilatura
router = APIRouter()
templates = Jinja2Templates(directory="templates")
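# dataset creates the SQLite tables (feeds, articles, newspapers, sync_logs) lazily on first write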
db = dataset.connect('sqlite:///feeds.db')
# Browser-like headers so feed servers don't reject the default aiohttp User-Agent
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
async def fetch_article_content(session, url):
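    """Download an article page and return its main text via trafilatura; empty string on failure."""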
try:
async with session.get(url, headers=HEADERS, timeout=aiohttp.ClientTimeout(total=10)) as response:
html = await response.text()
clean_text = trafilatura.extract(html)
return clean_text if clean_text else ""
    except Exception:
        return ""
async def perform_sync():
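    """Fetch all configured feeds concurrently, store new articles, and build a newspaper.

    A newspaper row is created immediately and updated after every feed, so whatever
    was collected before the 5-minute timeout is preserved. Returns summary statistics.
    """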
feeds_table = db['feeds']
articles_table = db['articles']
feeds = list(feeds_table.all())
total_feeds = len(feeds)
completed = 0
total_articles_added = 0
total_articles_processed = 0
failed_feeds = 0
new_articles = []
start_time = time.time()
timeout = 300 # 5 minutes
    # Create the newspaper row up front so partial results are kept if the sync times out
newspapers_table = db['newspapers']
sync_time = datetime.now().isoformat()
newspaper_id = newspapers_table.insert({
'created_at': sync_time,
'article_count': 0,
'articles_json': json.dumps([])
})
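    # Cap concurrent connections so large feed lists don't overwhelm remote hosts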
connector = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = []
for feed in feeds:
tasks.append(fetch_feed(session, feed['url'], feed['name']))
for coro in asyncio.as_completed(tasks):
elapsed = time.time() - start_time
if elapsed >= timeout:
break
feed_url, feed_name, content, error = await coro
completed += 1
if error:
failed_feeds += 1
continue
parsed = feedparser.parse(content)
for entry in parsed.entries:
total_articles_processed += 1
article_link = entry.get('link', '')
# Extract clean text content
clean_text = await fetch_article_content(session, article_link) if article_link else ""
article_data = {
'feed_name': feed_name,
'feed_url': feed_url,
'title': entry.get('title', ''),
'link': article_link,
'description': entry.get('description', '') or entry.get('summary', ''),
'content': clean_text,
'published': entry.get('published', '') or entry.get('updated', ''),
'author': entry.get('author', ''),
'guid': entry.get('id', '') or entry.get('link', ''),
'created_at': datetime.now().isoformat(),
'last_synchronized': datetime.now().isoformat()
}
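                # Only collect/count articles we haven't seen before; the upsert below still refreshes existing rows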
existing = articles_table.find_one(guid=article_data['guid'])
if not existing:
new_articles.append(article_data)
total_articles_added += 1
articles_table.upsert(article_data, ['guid'])
            feed_row = next(f for f in feeds if f['url'] == feed_url)
            feeds_table.update({
                'id': feed_row['id'],
                'last_synced': datetime.now().isoformat()
            }, ['id'])
# Update newspaper immediately after each feed
newspapers_table.update({
'id': newspaper_id,
'article_count': len(new_articles),
'articles_json': json.dumps(new_articles)
}, ['id'])
elapsed_total = time.time() - start_time
avg_req_per_sec = completed / elapsed_total if elapsed_total > 0 else 0
# Record sync log with detailed statistics
sync_logs_table = db['sync_logs']
sync_logs_table.insert({
'sync_time': sync_time,
'total_feeds': total_feeds,
'completed_feeds': completed,
'failed_feeds': failed_feeds,
'total_articles_processed': total_articles_processed,
'new_articles': total_articles_added,
'elapsed_seconds': round(elapsed_total, 2),
'avg_req_per_sec': round(avg_req_per_sec, 2),
'timed_out': elapsed_total >= timeout
})
return {
'total_feeds': total_feeds,
'total_articles': total_articles_added,
'elapsed': elapsed_total,
'new_articles': new_articles
}
async def run_sync_task():
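    """Run a full feed synchronization (thin wrapper around perform_sync)."""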
return await perform_sync()
@router.get("/config", response_class=HTMLResponse)
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@router.get("/manage", response_class=HTMLResponse)
async def manage_list(request: Request):
feeds_table = db['feeds']
feeds = list(feeds_table.all())
return templates.TemplateResponse("manage_list.html", {
"request": request,
"feeds": feeds,
"total": len(feeds)
})
@router.get("/manage/create", response_class=HTMLResponse)
async def manage_create_page(request: Request):
return templates.TemplateResponse("manage_create.html", {"request": request})
@router.post("/manage/create")
async def manage_create(
name: str = Form(...),
url: str = Form(...),
type: str = Form(...),
category: str = Form(...)
):
feeds_table = db['feeds']
feeds_table.insert({
'name': name,
'url': url,
'type': type,
'category': category
})
return RedirectResponse(url="/manage", status_code=303)
@router.get("/manage/upload", response_class=HTMLResponse)
async def manage_upload_page(request: Request):
return templates.TemplateResponse("manage_upload.html", {"request": request})
@router.post("/manage/upload")
async def manage_upload(file: UploadFile = File(...)):
content = await file.read()
feeds_data = json.loads(content)
feeds_table = db['feeds']
for feed in feeds_data:
feeds_table.upsert(feed, ['url'])
return RedirectResponse(url="/manage", status_code=303)
@router.get("/manage/edit/{feed_id}", response_class=HTMLResponse)
async def manage_edit_page(request: Request, feed_id: int):
feeds_table = db['feeds']
feed = feeds_table.find_one(id=feed_id)
return templates.TemplateResponse("manage_edit.html", {
"request": request,
"feed": feed
})
@router.post("/manage/edit/{feed_id}")
async def manage_edit(
feed_id: int,
name: str = Form(...),
url: str = Form(...),
type: str = Form(...),
category: str = Form(...)
):
feeds_table = db['feeds']
feeds_table.update({
'id': feed_id,
'name': name,
'url': url,
'type': type,
'category': category
}, ['id'])
return RedirectResponse(url="/manage", status_code=303)
@router.post("/manage/delete/{feed_id}")
async def manage_delete(feed_id: int):
feeds_table = db['feeds']
feeds_table.delete(id=feed_id)
return RedirectResponse(url="/manage", status_code=303)
@router.get("/manage/sync", response_class=HTMLResponse)
async def manage_sync_page(request: Request):
    feeds_table = db['feeds']
    total_feeds = feeds_table.count()
    articles_table = db['articles']
    total_articles = articles_table.count()
return templates.TemplateResponse("manage_sync.html", {
"request": request,
"total_feeds": total_feeds,
"total_articles": total_articles
})
async def fetch_feed(session, feed_url, feed_name):
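    """Fetch a feed's raw body; returns (feed_url, feed_name, content, error) with error=None on success."""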
try:
async with session.get(feed_url, headers=HEADERS, timeout=aiohttp.ClientTimeout(total=30)) as response:
content = await response.text()
return feed_url, feed_name, content, None
except Exception as e:
return feed_url, feed_name, None, str(e)
@router.websocket("/ws/sync")
async def websocket_sync(websocket: WebSocket):
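    """Run a feed sync over a WebSocket, streaming progress events
    (start, fetching, parsed, error, timeout, complete) to the client."""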
await websocket.accept()
try:
feeds_table = db['feeds']
articles_table = db['articles']
feeds = list(feeds_table.all())
total_feeds = len(feeds)
completed = 0
total_articles_added = 0
new_articles = []
start_time = time.time()
timeout = 300 # 5 minutes
await websocket.send_json({
"type": "start",
"total": total_feeds,
"message": f"Starting synchronization of {total_feeds} feeds (5 minute timeout)..."
})
        # Create the newspaper row up front so partial results are kept if the sync times out
newspapers_table = db['newspapers']
sync_time = datetime.now().isoformat()
newspaper_id = newspapers_table.insert({
'created_at': sync_time,
'article_count': 0,
'articles_json': json.dumps([])
})
connector = aiohttp.TCPConnector(limit=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = []
for feed in feeds:
tasks.append(fetch_feed(session, feed['url'], feed['name']))
for coro in asyncio.as_completed(tasks):
elapsed = time.time() - start_time
if elapsed >= timeout:
await websocket.send_json({
"type": "timeout",
"message": "5 minute timeout reached. Newspaper already contains all articles found so far...",
"completed": completed,
"total": total_feeds
})
break
feed_url, feed_name, content, error = await coro
completed += 1
req_per_sec = completed / elapsed if elapsed > 0 else 0
if error:
await websocket.send_json({
"type": "error",
"feed": feed_name,
"url": feed_url,
"error": error,
"completed": completed,
"total": total_feeds,
"req_per_sec": round(req_per_sec, 2)
})
continue
await websocket.send_json({
"type": "fetching",
"feed": feed_name,
"url": feed_url,
"completed": completed,
"total": total_feeds,
"req_per_sec": round(req_per_sec, 2)
})
parsed = feedparser.parse(content)
articles_count = 0
for entry in parsed.entries:
article_data = {
'feed_name': feed_name,
'feed_url': feed_url,
'title': entry.get('title', ''),
'link': entry.get('link', ''),
'description': entry.get('description', '') or entry.get('summary', ''),
'published': entry.get('published', '') or entry.get('updated', ''),
'author': entry.get('author', ''),
'guid': entry.get('id', '') or entry.get('link', ''),
'created_at': datetime.now().isoformat(),
'last_synchronized': datetime.now().isoformat()
}
existing = articles_table.find_one(guid=article_data['guid'])
if not existing:
new_articles.append(article_data)
articles_count += 1
articles_table.upsert(article_data, ['guid'])
total_articles_added += articles_count
                feed_row = next(f for f in feeds if f['url'] == feed_url)
                feeds_table.update({
                    'id': feed_row['id'],
                    'last_synced': datetime.now().isoformat()
                }, ['id'])
# Update newspaper immediately after each feed
newspapers_table.update({
'id': newspaper_id,
'article_count': len(new_articles),
'articles_json': json.dumps(new_articles)
}, ['id'])
await websocket.send_json({
"type": "parsed",
"feed": feed_name,
"articles": articles_count,
"completed": completed,
"total": total_feeds,
"total_articles": total_articles_added,
"req_per_sec": round(req_per_sec, 2)
})
elapsed_total = time.time() - start_time
# Record sync log
sync_logs_table = db['sync_logs']
sync_logs_table.insert({
'sync_time': sync_time,
'total_feeds': total_feeds,
'new_articles': total_articles_added,
'elapsed_seconds': round(elapsed_total, 2)
})
await websocket.send_json({
"type": "complete",
"total_feeds": total_feeds,
"total_articles": total_articles_added,
"elapsed": round(elapsed_total, 2),
"avg_req_per_sec": round(total_feeds / elapsed_total if elapsed_total > 0 else 0, 2)
})
except WebSocketDisconnect:
pass
    except Exception as e:
        # Best-effort error report; the socket may already be closed
        try:
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })
        except Exception:
            pass
@router.get("/newspapers", response_class=HTMLResponse)
async def newspapers_list(request: Request):
newspapers_table = db['newspapers']
newspapers = list(newspapers_table.all())
newspapers.reverse()
return templates.TemplateResponse("newspapers_list.html", {
"request": request,
"newspapers": newspapers
})
@router.get("/sync-logs", response_class=HTMLResponse)
async def sync_logs_list(request: Request):
sync_logs_table = db['sync_logs']
sync_logs = list(sync_logs_table.all())
sync_logs.reverse()
return templates.TemplateResponse("sync_logs_list.html", {
"request": request,
"sync_logs": sync_logs
})
@router.get("/newspaper/{newspaper_id}", response_class=HTMLResponse)
async def newspaper_view(request: Request, newspaper_id: int):
newspapers_table = db['newspapers']
newspaper = newspapers_table.find_one(id=newspaper_id)
if not newspaper:
return RedirectResponse(url="/newspapers")
articles = json.loads(newspaper['articles_json'])
return templates.TemplateResponse("newspaper_view.html", {
"request": request,
"newspaper": newspaper,
"articles": articles
})
@router.get("/", response_class=HTMLResponse)
async def newspaper_latest(request: Request):
    # Load the most recent newspaper, if any
    rows = db.query("SELECT * FROM newspapers ORDER BY id DESC LIMIT 1")
    newspaper = next(iter(rows), None)
if not newspaper:
return RedirectResponse(url="/newspapers")
articles = json.loads(newspaper['articles_json'])
return templates.TemplateResponse("newspaper_view.html", {
"request": request,
"newspaper": newspaper,
"articles": articles
})