# Written by retoor@molodetz.nl # This FastAPI application provides a backend API for a note-taking service called Ada Notes. The application allows for creating, listing, and managing notes, as well as handling file uploads and serving a frontend application for the service. # The external libraries used in this code include FastAPI for the web framework, Uvicorn as the ASGI server, Dataset for database operations, and additional packages for multipart file handling. # MIT License # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. 
import asyncio
import os
import pathlib
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any
from uuid import uuid4

import dataset
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

# Paths derived from this file's location.
BASE_DIR = Path(__file__).parent
DB_URL = "sqlite:///" + str(BASE_DIR / "notes.db")
# NOTE: unlike the paths above, the upload directory is resolved against the
# process working directory, not BASE_DIR.
UPLOAD_DIR = pathlib.Path(".").joinpath("uploads")
FRONTEND_DIR = BASE_DIR / "frontend"
FRONTEND_INDEX = FRONTEND_DIR / "index.html"

UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Caps the number of transactional sessions that may be open at once.
_semaphore = asyncio.Semaphore(10)


@asynccontextmanager
async def db_session(transaction=False):
    """Yield a fresh ``dataset`` connection and close it afterwards.

    Args:
        transaction: When true, the session is guarded by the module-level
            semaphore and wrapped in an explicit transaction that is
            committed if the ``async with`` body finishes normally and
            rolled back if it raises.

    Yields:
        The connected ``dataset`` database object.
    """
    if not transaction:
        db_ = dataset.connect(DB_URL)
        try:
            yield db_
        finally:
            db_.close()
        return

    async with _semaphore:
        db_ = dataset.connect(DB_URL)
        try:
            db_.begin()
            try:
                yield db_
            except BaseException:
                # Never persist the partial writes of a failed (or cancelled)
                # body; undo them and let the error propagate.
                db_.rollback()
                raise
            else:
                db_.commit()
        finally:
            db_.close()


app = FastAPI(title="Ada Notes API", version="1.1.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Uploaded files are served from /static, the optional frontend build from /assets.
app.mount("/static", StaticFiles(directory=UPLOAD_DIR), name="static")
if pathlib.Path(FRONTEND_DIR).exists():
    app.mount("/assets", StaticFiles(directory=FRONTEND_DIR), name="assets")


async def ensure_tables_exist():
    """Create the ``notes``, ``tags`` and ``note_tags`` tables if missing."""
    schema = {
        "notes": """
            CREATE TABLE notes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                body TEXT,
                created_at TEXT,
                updated_at TEXT
            )
        """,
        "tags": """
            CREATE TABLE tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE
            )
        """,
        "note_tags": """
            CREATE TABLE note_tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                note_id INTEGER,
                tag TEXT,
                FOREIGN KEY(note_id) REFERENCES notes(id)
            )
        """,
    }
    async with db_session() as db:
        for table_name, ddl in schema.items():
            # Only issue the DDL for tables that are not there yet.
            if table_name not in db.tables:
                db.query(ddl)
@app.on_event("startup")
async def init_search_index():
    """Create the base tables, the FTS5 index and its sync triggers at startup.

    ``notes_fts`` is an external-content FTS5 table over ``notes.title`` and
    ``notes.body``. Such a table must be told the *old* column values when a
    row is removed or changed, so the triggers use the special ``'delete'``
    command instead of plain ``DELETE``/``UPDATE`` statements.
    """
    await ensure_tables_exist()
    db = dataset.connect(DB_URL)
    try:
        db.query("""
            CREATE VIRTUAL TABLE IF NOT EXISTS notes_fts
            USING fts5(title, body, content='notes', content_rowid='id');
        """)

        # Replace triggers left behind by earlier versions of this module so
        # the corrected definitions below also reach existing databases.
        for trigger_name in ("notes_ai", "notes_ad", "notes_au"):
            db.query(f"DROP TRIGGER IF EXISTS {trigger_name}")

        db.query("""
            CREATE TRIGGER IF NOT EXISTS notes_ai AFTER INSERT ON notes BEGIN
                INSERT INTO notes_fts(rowid, title, body)
                VALUES (new.id, new.title, new.body);
            END;
        """)
        db.query("""
            CREATE TRIGGER IF NOT EXISTS notes_ad AFTER DELETE ON notes BEGIN
                INSERT INTO notes_fts(notes_fts, rowid, title, body)
                VALUES ('delete', old.id, old.title, old.body);
            END;
        """)
        db.query("""
            CREATE TRIGGER IF NOT EXISTS notes_au AFTER UPDATE ON notes BEGIN
                INSERT INTO notes_fts(notes_fts, rowid, title, body)
                VALUES ('delete', old.id, old.title, old.body);
                INSERT INTO notes_fts(rowid, title, body)
                VALUES (new.id, new.title, new.body);
            END;
        """)

        # Re-derive the index from the notes table so that entries written
        # through the previous triggers are brought back in sync.
        db.query("INSERT INTO notes_fts(notes_fts) VALUES ('rebuild')")
    finally:
        db.close()


async def _serialize_note(row: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a ``notes`` row into the API representation of a note.

    Args:
        row: A row mapping with at least an ``id`` key; may carry a ``score``
            key when it comes from a full-text search.

    Returns:
        A dict with the note fields plus its ``attachments`` rows and ``tags``
        names, and ``score`` when present. An empty dict for a falsy ``row``.
    """
    if not row:
        return {}

    note_id = row["id"]
    async with db_session() as db:
        atts = list(db['attachments'].find(note_id=note_id))
        tags = [rt["tag"] for rt in db['note_tags'].find(note_id=note_id)]

    result = {
        "id": note_id,
        "title": row.get("title", ""),
        "body": row.get("body", ""),
        "created_at": row.get("created_at"),
        "updated_at": row.get("updated_at"),
        "attachments": atts,
        "tags": tags,
    }
    score = row.get("score")
    if score is not None:
        result["score"] = score
    return result


def _build_fts_query(text: Optional[str]) -> str:
    """Build an FTS5 prefix query from free-form user input.

    Every whitespace-separated term is emitted as a quoted string followed by
    ``*`` so that characters with a meaning in FTS5 query syntax are matched
    literally instead of raising a syntax error. Terms without any
    alphanumeric character are dropped.

    Returns:
        The query string, or ``""`` when nothing searchable remains.
    """
    if not text:
        return ""
    terms = [t for t in text.split() if any(ch.isalnum() for ch in t)]
    # Inside an FTS5 string a double quote is escaped by doubling it.
    return " ".join('"' + t.replace('"', '""') + '"*' for t in terms)


def _fts_search(db, fts_query: str, tag: Optional[str]) -> List[Dict[str, Any]]:
    """Run ``fts_query`` against ``notes_fts`` and return scored note rows.

    The optional tag filter is a subquery rather than a JOIN + GROUP BY, so a
    note that carries the same tag more than once still appears only once.
    """
    if tag:
        return list(db.query("""
            SELECT notes.*, bm25(notes_fts) AS score
            FROM notes_fts
            JOIN notes ON notes_fts.rowid = notes.id
            WHERE notes_fts MATCH :q
              AND notes.id IN (SELECT note_id FROM note_tags WHERE tag = :tag)
            ORDER BY score
        """, q=fts_query, tag=tag))
    return list(db.query("""
        SELECT notes.*, bm25(notes_fts) AS score
        FROM notes_fts
        JOIN notes ON notes_fts.rowid = notes.id
        WHERE notes_fts MATCH :q
        ORDER BY score
    """, q=fts_query))


@app.get("/api/search")
async def search_notes(q: str = "", tag: Optional[str] = None):
    """
    Full-text search with prefix matching and BM25 scoring.
    Optional tag filter. Returns an empty list when ``q`` has no searchable
    terms.
    """
    fts_query = _build_fts_query(q)
    if not fts_query:
        return []

    async with db_session() as db:
        rows = _fts_search(db, fts_query, tag)
    return [await _serialize_note(r) for r in rows]


@app.get("/api/notes")
async def list_notes(tag: Optional[str] = None, search: Optional[str] = None):
    """
    List notes. Supports:
    - ?tag=foo to filter by tag
    - ?search=term to full-text-search title+body with prefix & scoring

    Search results are ordered by score; plain listings are ordered by
    creation date, newest first. A blank ``search`` is treated as absent.
    """
    searching = bool(search and search.strip())
    fts_query = _build_fts_query(search) if searching else ""
    if searching and not fts_query:
        # The user searched, but for nothing that can be matched.
        return []

    async with db_session() as db:
        if fts_query:
            rows = _fts_search(db, fts_query, tag)
        else:
            if tag:
                # dict.fromkeys de-duplicates while keeping the first-seen order.
                note_ids = dict.fromkeys(
                    nt["note_id"] for nt in db['note_tags'].find(tag=tag)
                )
                candidates = (db['notes'].find_one(id=nid) for nid in note_ids)
                # Skip tag links whose note no longer exists.
                rows = [r for r in candidates if r]
            else:
                rows = list(db['notes'].all())
            # No FTS scoring here: sort by creation date, tolerating NULLs.
            rows.sort(key=lambda r: r.get("created_at") or "", reverse=True)

    return [await _serialize_note(r) for r in rows]


@app.post("/api/notes")
async def create_note(payload: Dict[str, Any]):
    """Create a note from ``payload`` and return its serialized form."""
    return await _upsert_note(None, payload)


@app.put("/api/notes/{note_id}")
async def update_note(note_id: int, payload: Dict[str, Any]):
    """Replace the note ``note_id`` with ``payload``.

    Raises:
        HTTPException: 404 when no note with that id exists.
    """
    async with db_session() as db:
        exists = db['notes'].find_one(id=note_id)
    if not exists:
        raise HTTPException(404, "Note not found")
    return await _upsert_note(note_id, payload)


async def _upsert_note(note_id: Optional[int], payload: Dict[str, Any]):
    """Insert a new note or overwrite an existing one, including tags/attachments.

    Args:
        note_id: ``None`` to create a note, otherwise the id of the note to
            overwrite.
        payload: Mapping with optional ``title``, ``body``, ``tags`` (list of
            strings) and ``attachments`` (list of dicts with ``url``/``type``).

    Returns:
        The serialized note as produced by ``_serialize_note``.
    """
    # Normalise the input before touching the database so malformed payloads
    # cannot fail halfway through the write.
    title = str(payload.get("title") or "").strip()
    body = payload.get("body") or ""
    raw_tags = payload.get("tags") or []
    stripped = (t.strip() for t in raw_tags if isinstance(t, str))
    tags: List[str] = list(dict.fromkeys(t for t in stripped if t))
    atts: List[Dict[str, str]] = [
        a for a in (payload.get("attachments") or []) if isinstance(a, dict)
    ]
    # Naive-UTC ISO string, the timestamp format this module has always stored.
    now = datetime.utcnow().isoformat()

    async with db_session() as db:
        # One transaction: the old tags/attachments are only dropped if their
        # replacements are written successfully.
        with db:
            if note_id is None:
                note_id = db['notes'].insert(
                    {"title": title, "body": body, "created_at": now, "updated_at": now}
                )
            else:
                db['notes'].update(
                    {"id": note_id, "title": title, "body": body, "updated_at": now},
                    ["id"],
                )
                db['attachments'].delete(note_id=note_id)
                db['note_tags'].delete(note_id=note_id)

            # The full-text index is maintained by the triggers on `notes`;
            # only tags and attachments need to be written here.
            for t in tags:
                if not db['tags'].find_one(name=t):
                    db['tags'].insert({"name": t})
                db['note_tags'].insert({"note_id": note_id, "tag": t})

            for att in atts:
                db['attachments'].insert(
                    {"note_id": note_id, "url": att.get("url"), "type": att.get("type", "file")}
                )

        row = db['notes'].find_one(id=note_id)

    return await _serialize_note(row)


@app.get("/api/tags")
async def list_tags():
    """Return every known tag as ``{"name": ...}``."""
    async with db_session() as db:
        return [{"name": row["name"]} for row in db['tags'].all()]


@app.post("/api/upload")
async def upload(file: UploadFile = File(...)):
    """Store an uploaded file under ``UPLOAD_DIR`` and return its public URL.

    Returns:
        ``{"url": "/static/<stored name>", "type": "image" | "file"}`` where
        the type is derived from the upload's content type.
    """
    # The client controls the filename: keep only its last path component
    # (either separator style) and drop NUL bytes so it is a plain file name.
    raw_name = (file.filename or "").replace("\x00", "")
    safe_name = raw_name.replace("\\", "/").rsplit("/", 1)[-1] or "upload"
    filename = f"{uuid4().hex}_{safe_name}"
    filepath = UPLOAD_DIR.joinpath(filename)

    with filepath.open("wb") as buffer:
        # Copy in 1 MiB chunks instead of loading the whole upload in memory.
        while chunk := await file.read(1024 * 1024):
            buffer.write(chunk)

    content_type = file.content_type or "application/octet-stream"
    ftype = "image" if content_type.startswith("image/") else "file"
    return {"url": f"/static/{filename}", "type": ftype}


@app.get("/", response_class=FileResponse, include_in_schema=False)
async def index():
    """Serve the frontend's ``index.html``.

    Raises:
        HTTPException: 404 when the frontend build is not present.
    """
    if not pathlib.Path(FRONTEND_INDEX).exists():
        raise HTTPException(
            status_code=404,
            detail="index.html not found. Place your frontend build in the 'frontend' directory.",
        )
    return FileResponse(FRONTEND_INDEX, media_type="text/html")


@app.get("/api/health")
async def health():
    """Liveness probe."""
    return {"status": "ok"}