from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from bs4 import BeautifulSoup
import aiohttp
import functools
import json
import os
import time
import urllib.parse
import urllib.request
import uvicorn

app = FastAPI()

API_KEY = os.getenv("OPENROUTER_API_KEY")
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
MODEL_NAME = "google/gemma-3-12b-it"
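
# Note: OPENROUTER_API_KEY must be set in the environment; without it the
# prompt-assisted mode of /search (the `p` parameter) cannot authenticate.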


async def prompt(q, p):
    """Search DuckDuckGo for `q`, then ask the model to apply the prompt `p` to the results."""
    search_results = search_duckduckgo(q)

    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    system_msg = (
        "You will do with the data what the prompt expects from you. "
        'You respond in valid JSON format like {"response": <response_str>}.'
        f"\n\n# DATA\n{search_results}"
    )
    payload = {
        "model": MODEL_NAME,
        "messages": [{"role": "system", "content": system_msg}, {"role": "user", "content": p}],
    }

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(OPENROUTER_URL, headers=headers, json=payload) as response:
                response.raise_for_status()
                result = await response.json()
                raw_content = result["choices"][0]["message"]["content"]
                # Extract the outermost {...} block in case the model wraps its JSON in extra text.
                json_start = raw_content.find('{')
                json_end = raw_content.rfind('}')
                json_content = raw_content[json_start:json_end + 1]
                return json.loads(json_content)
        except Exception as e:
            return {"error": str(e)}


CACHE = {}


def manual_cache(expire: int):
    """
    A custom decorator to cache the result of a FastAPI route function
    based on path and query parameters.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            sorted_params = sorted(request.query_params.items())
            key = f"{request.url.path}?{sorted_params}"
            current_time = time.time()

            if key in CACHE:
                result, timestamp = CACHE[key]
                if current_time - timestamp < expire:
                    print(f"✅ CACHE HIT for key: {key}")
                    return result
                else:
                    print(f"⌛️ CACHE EXPIRED for key: {key}")

            print(f"❌ CACHE MISS for key: {key}")
            new_result = await func(request=request, *args, **kwargs)
            CACHE[key] = (new_result, current_time)
            return new_result
        return wrapper
    return decorator
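
# Usage sketch (hypothetical route, not part of this service): the decorated
# function must accept a `request: Request` parameter so the wrapper can build
# its path+query cache key.
#
#   @app.get("/cached-time")
#   @manual_cache(expire=300)
#   async def cached_time(request: Request = None):
#       return {"now": time.time()}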


# Request models (currently unused; both endpoints read plain query parameters).
class SearchQuery(BaseModel):
    query: str


class FetchQuery(BaseModel):
    url: str


def web_request(url: str) -> str:
    """Fetch and return content from a URL."""
    try:
        request = urllib.request.Request(
            url, headers={'User-Agent': 'Mozilla/5.0 (compatible; AI Assistant)'}
        )
        with urllib.request.urlopen(request) as response:
            return response.read().decode('utf-8')
    except Exception as error:
        return f"Error: {str(error)}"


def search_duckduckgo(query: str) -> str:
    """Search DuckDuckGo's HTML endpoint and extract result snippets."""
    query_encoded = urllib.parse.quote(query)
    url = f"https://duckduckgo.com/html/?q={query_encoded}"
    content = web_request(url)
    if "Error" in content:
        return content
    try:
        soup = BeautifulSoup(content, 'html.parser')
        results = []
        for result in soup.find_all('div', class_='result')[:5]:  # top 5 results
            title_tag = result.find('h2', class_='result__title')
            title = title_tag.get_text() if title_tag else "No title"
            snippet_tag = result.find('a', class_='result__snippet')
            snippet = snippet_tag.get_text() if snippet_tag else "No snippet"
            link_tag = result.find('a', class_='result__url')
            link = link_tag.get_text() if link_tag else "No link"
            results.append({"Title": title, "Snippet": snippet, "Link": link})
        return json.dumps(results, indent=2, default=str)
    except ImportError:
        # If bs4 is not available, fall back to a grep-like scan for lines mentioning the query.
        lines = content.split('\n')
        snippets = [line for line in lines if query.lower() in line.lower()][:10]
        return "\n".join(snippets) or "No relevant snippets found."


@app.get("/search")
@manual_cache(expire=60 * 60)  # cache /search responses for one hour
async def search_endpoint(query: str, p: str = None, request: Request = None):
    if not query:
        raise HTTPException(status_code=400, detail="Query parameter is required")
    if p:
        # When a prompt is supplied, run the search results through the model.
        return await prompt(query, p)
    return search_duckduckgo(query)


@app.get("/fetch")
def fetch_endpoint(url: str):
    if not url:
        raise HTTPException(status_code=400, detail="URL parameter is required")
    return web_request(url)
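
# Example requests (assuming the server is running locally on port 8777):
#   curl "http://localhost:8777/search?query=python+asyncio"
#   curl "http://localhost:8777/search?query=python+asyncio&p=Summarize+the+top+results"
#   curl "http://localhost:8777/fetch?url=https://example.com"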


if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8777)