import imghdr
import random
import requests
from typing import Optional, Dict, Any
# Realistic User-Agents
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59",
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (Android 11; Mobile; rv:68.0) Gecko/68.0 Firefox/88.0",
]
def get_default_headers():
"""Get default realistic headers with variations."""
accept_languages = [
"en-US,en;q=0.5",
"en-US,en;q=0.9",
"en-GB,en;q=0.5",
"en-US,en;q=0.5;fr;q=0.3",
]
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": random.choice(accept_languages),
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
# Sometimes add Cache-Control
if random.random() < 0.3:
headers["Cache-Control"] = "no-cache"
# Sometimes add Referer
if random.random() < 0.2:
headers["Referer"] = "https://www.google.com/"
return headers
def http_fetch(url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
"""Fetch content from an HTTP URL.
Args:
url: The URL to fetch.
headers: Optional HTTP headers.
Returns:
Dict with status and content.
"""
try:
default_headers = get_default_headers()
if headers:
default_headers.update(headers)
response = requests.get(url, headers=default_headers, timeout=30)
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
content_type = response.headers.get("Content-Type", "").lower()
if "text" in content_type or "json" in content_type or "xml" in content_type:
content = response.text
return {"status": "success", "content": content[:10000]}
else:
content = response.content
return {"status": "success", "content": content}
except requests.exceptions.RequestException as e:
return {"status": "error", "error": str(e)}
def download_to_file(
source_url: str, destination_path: str, headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Download content from an HTTP URL to a file.
Args:
source_url: The URL to download from.
destination_path: The path to save the downloaded content.
headers: Optional HTTP headers.
Returns:
Dict with status, downloaded_from, and downloaded_to on success, or status and error on failure.
This function can be used for binary files like images as well.
"""
try:
default_headers = get_default_headers()
if headers:
default_headers.update(headers)
response = requests.get(source_url, headers=default_headers, stream=True, timeout=60)
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
with open(destination_path, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
content_type = response.headers.get("Content-Type", "").lower()
if content_type.startswith("image/"):
img_type = imghdr.what(destination_path)
if img_type is None:
return {
"status": "success",
"downloaded_from": source_url,
"downloaded_to": destination_path,
"is_valid_image": False,
"warning": "Downloaded content is not a valid image, consider finding a different source.",
}
else:
return {
"status": "success",
"downloaded_from": source_url,
"downloaded_to": destination_path,
"is_valid_image": True,
}
else:
return {
"status": "success",
"downloaded_from": source_url,
"downloaded_to": destination_path,
}
except requests.exceptions.RequestException as e:
return {"status": "error", "error": str(e)}
def _perform_search(
base_url: str, query: str, params: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
try:
default_headers = get_default_headers()
search_params = {"query": query}
if params:
search_params.update(params)
response = requests.get(base_url, headers=default_headers, params=search_params, timeout=30)
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
return {"status": "success", "content": response.json()}
except requests.exceptions.RequestException as e:
return {"status": "error", "error": str(e)}
def web_search(query: str) -> Dict[str, Any]:
"""Perform a web search.
Args:
query: Search query.
Returns:
Dict with status and search results.
"""
base_url = "https://search.molodetz.nl/search"
return _perform_search(base_url, query)
def web_search_news(query: str) -> Dict[str, Any]:
"""Perform a web search for news.
Args:
query: Search query for news.
Returns:
Dict with status and news search results.
"""
base_url = "https://search.molodetz.nl/search"
return _perform_search(base_url, query)