This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Written by retoor@molodetz.nl
# This FastAPI application provides a backend API for a note-taking service called Ada Notes. The application allows for creating, listing, and managing notes, as well as handling file uploads and serving a frontend application for the service.
# The external libraries used in this code include FastAPI for the web framework, Uvicorn as the ASGI server, Dataset for database operations, and additional packages for multipart file handling.
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import asyncio
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import dataset
from datetime import datetime
from uuid import uuid4
import os
from typing import List, Optional, Dict, Any
import pathlib
from pathlib import Path
from contextlib import asynccontextmanager
BASE_DIR = Path(__file__).parent
DB_URL = "sqlite:///" + str(BASE_DIR / "notes.db")
UPLOAD_DIR = pathlib.Path(".").joinpath("uploads")
FRONTEND_DIR = BASE_DIR / "frontend"
FRONTEND_INDEX = FRONTEND_DIR / "index.html"
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
_semaphore = asyncio.Semaphore(10)
@asynccontextmanager
async def db_session(transaction=False):
if transaction:
async with _semaphore:
db_ = dataset.connect(DB_URL)
db_.begin()
try:
yield db_
finally:
db_.commit()
db_.close()
else:
db_ = dataset.connect(DB_URL)
try:
yield db_
finally:
db_.close()
app = FastAPI(title="Ada Notes API", version="1.1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount("/static", StaticFiles(directory=UPLOAD_DIR), name="static")
if pathlib.Path(FRONTEND_DIR).exists():
app.mount("/assets", StaticFiles(directory=FRONTEND_DIR), name="assets")
@app.on_event("startup")
async def init_search_index():
db = dataset.connect(DB_URL)
# Create the FTS5 virtual table over notes.title and notes.body
db.query("""
CREATE VIRTUAL TABLE IF NOT EXISTS notes_fts
USING fts5(title, body, content='notes', content_rowid='id');
""")
# Triggers to keep it in sync
db.query("""
CREATE TRIGGER IF NOT EXISTS notes_ai AFTER INSERT ON notes
BEGIN
INSERT INTO notes_fts(rowid, title, body)
VALUES (new.id, new.title, new.body);
END;
""")
db.query("""
CREATE TRIGGER IF NOT EXISTS notes_ad AFTER DELETE ON notes
BEGIN
DELETE FROM notes_fts WHERE rowid = old.id;
END;
""")
db.query("""
CREATE TRIGGER IF NOT EXISTS notes_au AFTER UPDATE ON notes
BEGIN
UPDATE notes_fts SET title = new.title, body = new.body
WHERE rowid = old.id;
END;
""")
db.close()
@app.get("/api/search")
async def search_notes():
q = request.args.get("q", "")
tag = request.args.get("tag", "")
async with db_session() as db:
# Step 1: fulltext match → candidate note IDs
fts_rows = list(db.query("SELECT note_id FROM notes_fts WHERE notes_fts MATCH :query", query=q))
note_ids = {r["note_id"] for r in fts_rows}
if not note_ids:
return [] # early exit no matches
# Step 2: optional tag filtering (intersection)
if tag:
tag = [t.strip() for t in tag if t.strip()]
if not tag:
raise HTTPException(400, "Tag filter provided but empty after stripping")
tagged_ids = {
nt["note_id"]
for t in tag
for nt in db['note_tags'].find(tag=t)
}
note_ids &= tagged_ids
# Fetch & serialize
rows = [db['notes'].find_one(id=nid) for nid in note_ids]
rows.sort(key=lambda r: r["updated_at"], reverse=True)
return [await _serialize_note(r) for r in rows if r]
async def _serialize_note(row: Dict[str, Any]) -> Dict[str, Any]:
if not row:
return {}
note_id = row["id"]
async with db_session() as db:
atts = list(db['attachments'].find(note_id=note_id))
tags = [rt["tag"] for rt in db['note_tags'].find(note_id=note_id)]
return {
"id": note_id,
"title": row.get("title", ""),
"body": row.get("body", ""),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
"attachments": atts,
"tags": tags,
}
@app.get("/api/notes")
async def list_notes(tag: Optional[str] = None, search: Optional[str] = None):
"""
List notes. Supports:
- ?tag=foo to filter by tag
- ?search=term to full-text-search title+body
"""
async with db_session() as db:
if search:
# FTS5 MATCH query
rows = list(db.query(
"SELECT notes.* FROM notes_fts "
"JOIN notes ON notes_fts.rowid = notes.id "
"WHERE notes_fts MATCH :q",
q=search
))
elif tag:
note_ids = [nt["note_id"] for nt in db['note_tags'].find(tag=tag)]
rows = [db['notes'].find_one(id=nid) for nid in note_ids]
else:
rows = list(db['notes'].all())
# sort & serialize
rows = [r for r in rows if r]
rows.sort(key=lambda r: r["created_at"], reverse=True)
return [await _serialize_note(r) for r in rows]
@app.post("/api/notes")
async def create_note(payload: Dict[str, Any]):
return await _upsert_note(None, payload)
@app.put("/api/notes/{note_id}")
async def update_note(note_id: int, payload: Dict[str, Any]):
async with db_session() as db:
if not db['notes'].find_one(id=note_id):
raise HTTPException(404, "Note not found")
return await _upsert_note(note_id, payload)
async def _upsert_note(note_id: Optional[int], payload: Dict[str, Any]):
async with db_session() as db:
title = payload.get("title", "").strip()
body = payload.get("body", "")
tags: List[str] = payload.get("tags", [])
atts: List[Dict[str, str]] = payload.get("attachments", [])
now = datetime.utcnow().isoformat()
if note_id is None:
note_id = db['notes'].insert({"title": title, "body": body, "created_at": now, "updated_at": now})
else:
db['notes'].update({"id": note_id, "title": title, "body": body, "updated_at": now}, ["id"])
db['attachments'].delete(note_id=note_id)
db['note_tags'].delete(note_id=note_id)
db.query("DELETE FROM notes_fts WHERE note_id = :nid", nid=note_id)
# (Re)insert into FTS table
db.query(
"INSERT INTO notes_fts (title, body, note_id) VALUES (:title, :body, :nid)",
title=title,
body=body,
nid=note_id,
)
for t in tags:
t = t.strip()
if not t:
continue
if not db['tags'].find_one(name=t):
db['tags'].insert({"name": t})
db['note_tags'].insert({"note_id": note_id, "tag": t})
for att in atts:
db['attachments'].insert({"note_id": note_id, "url": att.get("url"), "type": att.get("type", "file")})
return await _serialize_note(db['notes'].find_one(id=note_id))
@app.get("/api/tags")
async def list_tags():
async with db_session() as db:
return [{"name": row["name"]} for row in db['tags'].all()]
@app.post("/api/upload")
async def upload(file: UploadFile = File(...)):
filename = f"{uuid4().hex}_{file.filename}"
filepath = UPLOAD_DIR.joinpath(filename)
with filepath.open("wb") as buffer:
while chunk := await file.read(1024 * 1024):
buffer.write(chunk)
content_type = file.content_type or "application/octet-stream"
ftype = "image" if content_type.startswith("image/") else "file"
return {"url": f"/static/{filename}", "type": ftype}
@app.get("/", response_class=FileResponse, include_in_schema=False)
async def index():
if not pathlib.Path(FRONTEND_INDEX).exists():
raise HTTPException(status_code=404, detail="index.html not found. Place your frontend build in the 'frontend' directory.")
return FileResponse(FRONTEND_INDEX, media_type="text/html")
@app.get("/api/health")
async def health():
return {"status": "ok"}