From 48b56921a4d2141be7f9ceef5a1afb72fce115c5 Mon Sep 17 00:00:00 2001
From: retoor
Date: Fri, 10 Oct 2025 05:06:13 +0200
Subject: [PATCH] Ducksearch

---
 aiducksearch.py | 143 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 143 insertions(+)
 create mode 100644 aiducksearch.py

diff --git a/aiducksearch.py b/aiducksearch.py
new file mode 100644
index 0000000..3826176
--- /dev/null
+++ b/aiducksearch.py
@@ -0,0 +1,143 @@
+from fastapi import FastAPI, HTTPException, Request
+from pydantic import BaseModel
+import urllib.request
+import urllib.parse
+from bs4 import BeautifulSoup
+import time
+import json
+import uvicorn
+import functools
+import os
+import aiohttp
+
+app = FastAPI()
+
+API_KEY = os.getenv("OPENROUTER_API_KEY")
+OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
+MODEL_NAME = "google/gemma-3-12b-it"
+
+
+async def prompt(q: str, p: str):
+    """Run the user prompt p over DuckDuckGo results for query q via OpenRouter."""
+    search_results = search_duckduckgo(q)
+    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
+    payload = {
+        "model": MODEL_NAME,
+        "messages": [
+            {
+                "role": "system",
+                "content": (
+                    "You will do with the data what the prompt expects from you. "
+                    'You respond in valid JSON format like {"response": ...}.'
+                    f"\n\n# DATA\n{search_results}"
+                ),
+            },
+            {"role": "user", "content": p},
+        ],
+    }
+    async with aiohttp.ClientSession() as session:
+        try:
+            async with session.post(OPENROUTER_URL, headers=headers, json=payload) as response:
+                response.raise_for_status()
+                result = await response.json()
+                raw_content = result["choices"][0]["message"]["content"]
+                # The model may wrap its JSON in prose; extract the outermost object.
+                json_start = raw_content.find('{')
+                json_end = raw_content.rfind('}')
+                json_content = raw_content[json_start:json_end + 1]
+                return json.loads(json_content)
+        except Exception as e:
+            return {"error": str(e)}
+
+
+CACHE = {}
+
+
+def manual_cache(expire: int):
+    """
+    Decorator that caches the result of a FastAPI route function,
+    keyed on the request path and query parameters.
+ """ + def decorator(func): + @functools.wraps(func) + async def wrapper(request: Request, *args, **kwargs): + sorted_params = sorted(request.query_params.items()) + key = f"{request.url.path}?{sorted_params}" + current_time = time.time() + if key in CACHE: + result, timestamp = CACHE[key] + if current_time - timestamp < expire: + print(f"✅ CACHE HIT for key: {key}") + return result + else: + print(f"⌛️ CACHE EXPIRED for key: {key}") + print(f"❌ CACHE MISS for key: {key}") + new_result = await func(request=request, *args, **kwargs) + CACHE[key] = (new_result, current_time) + return new_result + return wrapper + return decorator + +# + + +class SearchQuery(BaseModel): + query: str + +class FetchQuery(BaseModel): + url: str + +def web_request(url: str) -> str: + """Fetch and return content from a URL.""" + try: + request = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0 (compatible; AI Assistant)'}) + with urllib.request.urlopen(request) as response: + return response.read().decode('utf-8') + except Exception as error: + return f"Error: {str(error)}" + + +def search_duckduckgo(query: str) -> str: + """Search DuckDuckGo and extract snippets.""" + query_encoded = urllib.parse.quote(query) + url = f"https://duckduckgo.com/html/?q={query_encoded}" + content = web_request(url) + if "Error" in content: + return content + try: + soup = BeautifulSoup(content, 'html.parser') + results = [] + for result in soup.find_all('div', class_='result')[:5]: # Top 5 results + title_tag = result.find('h2', class_='result__title') + title = title_tag.get_text() if title_tag else "No title" + snippet_tag = result.find('a', class_='result__snippet') + snippet = snippet_tag.get_text() if snippet_tag else "No snippet" + link_tag = result.find('a', class_='result__url') + link = link_tag.get_text() if link_tag else "No link" + results.append({"Title": title, "Snippet": snippet, "Link": link}) + return json.dumps(results, indent=2, default=str) + except ImportError: + # If bs4 not available, fallback to grep-like extraction + lines = content.split('\n') + snippets = [line for line in lines if 'pony' in line.lower()][:10] + return "\n".join(snippets) or "No relevant snippets found." + +@app.get("/search") +@manual_cache(expire=60*60) +async def search_endpoint(query: str,p:str=None,request:Request=None): + if not query: + raise HTTPException(status_code=400, detail="Query parameter is required") + if p: + return await prompt(query,p) + return search_duckduckgo(query) + +@app.get("/fetch") +def fetch_endpoint(url: str): + if not url: + raise HTTPException(status_code=400, detail="URL parameter is required") + return web_request(url) + +if __name__ == "__main__": + uvicorn.run(app, host="localhost", port=8777) +