|
import imghdr
|
|
import json
|
|
import random
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
|
|
import json
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
# Realistic User-Agents
|
|
USER_AGENTS = [
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
|
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
|
|
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
|
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59",
|
|
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
|
|
"Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1",
|
|
"Mozilla/5.0 (Android 11; Mobile; rv:68.0) Gecko/68.0 Firefox/88.0",
|
|
]
|
|
|
|
def get_default_headers():
|
|
"""Get default realistic headers with variations."""
|
|
accept_languages = [
|
|
"en-US,en;q=0.5",
|
|
"en-US,en;q=0.9",
|
|
"en-GB,en;q=0.5",
|
|
"en-US,en;q=0.5;fr;q=0.3",
|
|
]
|
|
headers = {
|
|
"User-Agent": random.choice(USER_AGENTS),
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
|
|
"Accept-Language": random.choice(accept_languages),
|
|
"Accept-Encoding": "gzip, deflate, br",
|
|
"DNT": "1",
|
|
"Connection": "keep-alive",
|
|
"Upgrade-Insecure-Requests": "1",
|
|
}
|
|
# Sometimes add Cache-Control
|
|
if random.random() < 0.3:
|
|
headers["Cache-Control"] = "no-cache"
|
|
# Sometimes add Referer
|
|
if random.random() < 0.2:
|
|
headers["Referer"] = "https://www.google.com/"
|
|
return headers
|
|
|
|
|
|
def http_fetch(url, headers=None):
|
|
"""Fetch content from an HTTP URL.
|
|
|
|
Args:
|
|
url: The URL to fetch.
|
|
headers: Optional HTTP headers.
|
|
|
|
Returns:
|
|
Dict with status and content.
|
|
"""
|
|
try:
|
|
request = urllib.request.Request(url)
|
|
default_headers = get_default_headers()
|
|
if headers:
|
|
default_headers.update(headers)
|
|
for header_key, header_value in default_headers.items():
|
|
request.add_header(header_key, header_value)
|
|
with urllib.request.urlopen(request) as response:
|
|
content = response.read().decode("utf-8")
|
|
return {"status": "success", "content": content[:10000]}
|
|
except Exception as exception:
|
|
return {"status": "error", "error": str(exception)}
|
|
|
|
def download_to_file(source_url, destination_path, headers=None):
|
|
"""Download content from an HTTP URL to a file.
|
|
|
|
Args:
|
|
source_url: The URL to download from.
|
|
destination_path: The path to save the downloaded content.
|
|
headers: Optional HTTP headers.
|
|
|
|
Returns:
|
|
Dict with status, downloaded_from, and downloaded_to on success, or status and error on failure.
|
|
|
|
This function can be used for binary files like images as well.
|
|
"""
|
|
try:
|
|
request = urllib.request.Request(source_url)
|
|
default_headers = get_default_headers()
|
|
if headers:
|
|
default_headers.update(headers)
|
|
for header_key, header_value in default_headers.items():
|
|
request.add_header(header_key, header_value)
|
|
with urllib.request.urlopen(request) as response:
|
|
content = response.read()
|
|
with open(destination_path, 'wb') as file:
|
|
file.write(content)
|
|
content_type = response.headers.get('Content-Type', '').lower()
|
|
if content_type.startswith('image/'):
|
|
img_type = imghdr.what(destination_path)
|
|
if img_type is None:
|
|
return {"status": "success", "downloaded_from": source_url, "downloaded_to": destination_path, "is_valid_image": False, "warning": "Downloaded content is not a valid image, consider finding a different source."}
|
|
else:
|
|
return {"status": "success", "downloaded_from": source_url, "downloaded_to": destination_path, "is_valid_image": True}
|
|
else:
|
|
return {"status": "success", "downloaded_from": source_url, "downloaded_to": destination_path}
|
|
except Exception as exception:
|
|
return {"status": "error", "error": str(exception)}
|
|
|
|
|
|
def _perform_search(base_url, query, params=None):
|
|
try:
|
|
encoded_query = urllib.parse.quote(query)
|
|
full_url = f"{base_url}?query={encoded_query}"
|
|
request = urllib.request.Request(full_url)
|
|
default_headers = get_default_headers()
|
|
for header_key, header_value in default_headers.items():
|
|
request.add_header(header_key, header_value)
|
|
with urllib.request.urlopen(request) as response:
|
|
content = response.read().decode("utf-8")
|
|
return {"status": "success", "content": json.loads(content)}
|
|
except Exception as exception:
|
|
return {"status": "error", "error": str(exception)}
|
|
|
|
|
|
def web_search(query):
|
|
"""Perform a web search.
|
|
|
|
Args:
|
|
query: Search query.
|
|
|
|
Returns:
|
|
Dict with status and search results.
|
|
"""
|
|
base_url = "https://search.molodetz.nl/search"
|
|
return _perform_search(base_url, query)
|
|
|
|
|
|
def web_search_news(query):
|
|
"""Perform a web search for news.
|
|
|
|
Args:
|
|
query: Search query for news.
|
|
|
|
Returns:
|
|
Dict with status and news search results.
|
|
"""
|
|
base_url = "https://search.molodetz.nl/search"
|
|
return _perform_search(base_url, query)
|