215 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			215 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								# SPDX-License-Identifier: AGPL-3.0-or-later
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import asyncio
							 | 
						||
| 
								 | 
							
								import logging
							 | 
						||
| 
								 | 
							
								import threading
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import httpcore
							 | 
						||
| 
								 | 
							
								import httpx
							 | 
						||
| 
								 | 
							
								from httpx_socks import AsyncProxyTransport
							 | 
						||
| 
								 | 
							
								from python_socks import parse_proxy_url
							 | 
						||
| 
								 | 
							
								import python_socks._errors
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from searx import logger
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Optional uvloop (support Python 3.6)
							 | 
						||
| 
								 | 
							
								# uvloop is an optional dependency (original note: "support Python 3.6").
								# When the import succeeds, uvloop.install() is called; when it fails the
								# ImportError is ignored and nothing else happens.
								try:
								    import uvloop
								except ImportError:
								    pass
								else:
								    uvloop.install()


								# Module logger, derived from the logger imported from searx.
								logger = logger.getChild('searx.http.client')

								# asyncio event loop used by this module; None until init() has provided one.
								LOOP = None

								# Keyword arguments shared by get_transport() and get_transport_for_socks_proxy().
								TRANSPORT_KWARGS = dict(
								    backend='asyncio',
								    trust_env=False,
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
								    """Drop every pooled connection that targets the origin of ``url``.

								    Each matching connection is first removed from ``connection_pool`` and then
								    closed. A ``httpcore.NetworkError`` raised while closing is logged as a
								    warning and does not prevent the remaining connections from being handled.

								    Note: relies on private httpcore helpers (``_utils.url_to_origin``,
								    ``_connections_for_origin`` and ``_remove_from_pool``).
								    """
								    origin = httpcore._utils.url_to_origin(url)
								    logger.debug('Drop connections for %r', origin)
								    for stale_connection in connection_pool._connections_for_origin(origin):
								        await connection_pool._remove_from_pool(stale_connection)
								        try:
								            await stale_connection.aclose()
								        except httpcore.NetworkError as close_error:
								            logger.warning('Error closing an existing connection', exc_info=close_error)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
								    """Transport that refuses every request it receives.

								    new_client() mounts it on ``http://`` when plain HTTP is disabled.
								    """

								    async def arequest(self, method, url, headers=None, stream=None, ext=None):
								        """Always raise ``httpcore.UnsupportedProtocol``; the arguments are ignored."""
								        message = "HTTP protocol is disabled"
								        raise httpcore.UnsupportedProtocol(message)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class AsyncProxyTransportFixed(AsyncProxyTransport):
								    """Fix httpx_socks.AsyncProxyTransport

								    * Map python_socks exceptions to httpcore.ProxyError
								    * Map OSError (for example socket.gaierror when DNS resolution fails)
								      to httpcore.NetworkError
								    * On httpcore.RemoteProtocolError: drop the pooled connections of the
								      requested origin and try the request one more time
								    * On any other httpcore.NetworkError / httpcore.ProtocolError: drop the
								      pooled connections of the requested origin and re-raise

								    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:

								    * self._keepalive_sweep()
								    * self._response_closed(self, connection)

								    Note: AsyncProxyTransport inherit from AsyncConnectionPool

								    Note: the API is going to change on httpx 0.18.0
								    see https://github.com/encode/httpx/pull/1522
								    """

								    async def arequest(self, method, url, headers=None, stream=None, ext=None):
								        """Send the request through the proxy, with at most two attempts.

								        Returns whatever the parent ``arequest`` returns.

								        Raises:
								            httpcore.ProxyError: a python_socks proxy error occurred.
								            httpcore.NetworkError: an OSError occurred, or httpcore reported a
								                network error.
								            httpcore.ProtocolError: httpcore reported a protocol error; this
								                includes httpcore.RemoteProtocolError once both attempts failed.
								        """
								        last_error = None
								        for _attempt in range(2):
								            try:
								                return await super().arequest(method, url, headers, stream, ext)
								            except (python_socks._errors.ProxyConnectionError,
								                    python_socks._errors.ProxyTimeoutError,
								                    python_socks._errors.ProxyError) as e:
								                raise httpcore.ProxyError(e) from e
								            except OSError as e:
								                # socket.gaierror when DNS resolution fails
								                raise httpcore.NetworkError(e) from e
								            except httpcore.RemoteProtocolError as e:
								                # in case of httpcore.RemoteProtocolError: Server disconnected
								                await close_connections_for_url(self, url)
								                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
								                # remember the error: it is re-raised if no attempt succeeds
								                last_error = e
								            except (httpcore.NetworkError, httpcore.ProtocolError):
								                # httpcore.WriteError on HTTP/2 connection leaves a new opened stream
								                # then each new request creates a new stream and raise the same WriteError
								                await close_connections_for_url(self, url)
								                raise
								        # Every attempt ended with RemoteProtocolError: propagate the last one
								        # instead of silently returning None to the caller.
								        raise last_error
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
								    """Fix httpx.AsyncHTTPTransport

								    * Map OSError (for example socket.gaierror when DNS resolution fails)
								      to httpcore.ConnectError
								    * On httpcore.CloseError or httpcore.RemoteProtocolError: drop the pooled
								      connections of the requested origin and try the request one more time
								    * On any other httpcore.ProtocolError / httpcore.NetworkError: drop the
								      pooled connections of the requested origin and re-raise
								    """

								    async def arequest(self, method, url, headers=None, stream=None, ext=None):
								        """Send the request, with at most two attempts.

								        Returns whatever the parent ``arequest`` returns.

								        Raises:
								            httpcore.ConnectError: an OSError occurred.
								            httpcore.CloseError, httpcore.RemoteProtocolError: both attempts
								                failed with one of these errors (the last one is raised).
								            httpcore.ProtocolError, httpcore.NetworkError: any other httpcore
								                protocol or network error, raised on first occurrence.
								        """
								        last_error = None
								        for _attempt in range(2):
								            try:
								                return await super().arequest(method, url, headers, stream, ext)
								            except OSError as e:
								                # socket.gaierror when DNS resolution fails
								                raise httpcore.ConnectError(e) from e
								            except httpcore.CloseError as e:
								                # httpcore.CloseError: [Errno 104] Connection reset by peer
								                # raised by _keepalive_sweep()
								                #   from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198  # noqa
								                await close_connections_for_url(self._pool, url)
								                logger.warning('httpcore.CloseError: retry', exc_info=e)
								                # remember the error: it is re-raised if no attempt succeeds
								                last_error = e
								            except httpcore.RemoteProtocolError as e:
								                # in case of httpcore.RemoteProtocolError: Server disconnected
								                await close_connections_for_url(self._pool, url)
								                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
								                # remember the error: it is re-raised if no attempt succeeds
								                last_error = e
								            except (httpcore.ProtocolError, httpcore.NetworkError):
								                await close_connections_for_url(self._pool, url)
								                raise
								        # Every attempt ended with a retryable error: propagate the last one
								        # instead of silently returning None to the caller.
								        raise last_error
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
								    """Build an AsyncProxyTransportFixed for a SOCKS ``proxy_url``.

								    ``limit`` must expose ``max_connections``, ``max_keepalive_connections`` and
								    ``keepalive_expiry``; the other arguments are forwarded unchanged. The
								    module-level ``LOOP`` and ``TRANSPORT_KWARGS`` are passed to the transport too.
								    """
								    # socks5h is supported for compatibility with requests:
								    # https://requests.readthedocs.io/en/master/user/advanced/#socks
								    #   socks5://   hostname is resolved on client side
								    #   socks5h://  hostname is resolved on proxy side
								    remote_dns_scheme = 'socks5h://'
								    rdns = proxy_url.startswith(remote_dns_scheme)
								    if rdns:
								        # parse_proxy_url() is given the socks5 scheme; remote DNS is requested via rdns
								        proxy_url = 'socks5://' + proxy_url[len(remote_dns_scheme):]

								    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)

								    return AsyncProxyTransportFixed(
								        proxy_type=proxy_type,
								        proxy_host=proxy_host,
								        proxy_port=proxy_port,
								        username=proxy_username,
								        password=proxy_password,
								        rdns=rdns,
								        loop=LOOP,
								        verify=verify,
								        http2=http2,
								        local_address=local_address,
								        max_connections=limit.max_connections,
								        max_keepalive_connections=limit.max_keepalive_connections,
								        keepalive_expiry=limit.keepalive_expiry,
								        retries=retries,
								        **TRANSPORT_KWARGS
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def get_transport(verify, http2, local_address, proxy_url, limit, retries):
								    """Build an AsyncHTTPTransportFixed.

								    A truthy ``proxy_url`` is wrapped in ``httpx._config.Proxy``; otherwise no
								    proxy is configured. The module-level ``TRANSPORT_KWARGS`` are passed as well.
								    """
								    proxy = httpx._config.Proxy(proxy_url) if proxy_url else None
								    return AsyncHTTPTransportFixed(
								        verify=verify,
								        http2=http2,
								        local_address=local_address,
								        proxy=proxy,
								        limits=limit,
								        retries=retries,
								        **TRANSPORT_KWARGS
								    )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def iter_proxies(proxies):
								    """Yield ``(pattern, proxy_url)`` pairs from a proxy configuration.

								    * a ``str`` yields the single pair ``('all://', proxies)``
								    * a ``dict`` yields its items in iteration order
								    * any other value yields nothing
								    """
								    # https://www.python-httpx.org/compatibility/#proxy-keys
								    if isinstance(proxies, str):
								        yield 'all://', proxies
								        return
								    if isinstance(proxies, dict):
								        yield from proxies.items()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def new_client(enable_http, verify, enable_http2,
								               max_connections, max_keepalive_connections, keepalive_expiry,
								               proxies, local_address, retries, max_redirects):
								    """Create an ``httpx.AsyncClient`` with one mounted transport per proxy pattern.

								    When ``enable_http`` is false, proxy patterns equal to ``'http'`` or starting
								    with ``'http://'`` are skipped and ``'http://'`` is mounted on
								    AsyncHTTPTransportNoHttp. The client's default transport is built without a proxy.
								    """
								    limit = httpx.Limits(
								        max_connections=max_connections,
								        max_keepalive_connections=max_keepalive_connections,
								        keepalive_expiry=keepalive_expiry,
								    )
								    socks_schemes = ('socks4://', 'socks5://', 'socks5h://')

								    # See https://www.python-httpx.org/advanced/#routing
								    mounts = {}
								    for pattern, proxy_url in iter_proxies(proxies):
								        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
								            continue
								        if proxy_url.startswith(socks_schemes):
								            build_transport = get_transport_for_socks_proxy
								        else:
								            build_transport = get_transport
								        mounts[pattern] = build_transport(verify, enable_http2, local_address, proxy_url, limit, retries)

								    if not enable_http:
								        mounts['http://'] = AsyncHTTPTransportNoHttp()

								    default_transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
								    return httpx.AsyncClient(transport=default_transport, mounts=mounts, max_redirects=max_redirects)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def init():
								    """Quiet the hpack loggers and start the module's background event loop.

								    The event loop is created and stored in the module-level ``LOOP`` before
								    the thread is started, so ``LOOP`` is already set when this function
								    returns. The loop then runs forever in a daemon thread named
								    ``asyncio_loop``.
								    """
								    global LOOP

								    # log: raise the level of the hpack loggers to WARNING
								    for logger_name in ('hpack.hpack', 'hpack.table'):
								        logging.getLogger(logger_name).setLevel(logging.WARNING)

								    # loop: assign LOOP here rather than inside the thread, otherwise code
								    # running right after init() could still read LOOP as None.
								    LOOP = asyncio.new_event_loop()

								    th = threading.Thread(
								        target=LOOP.run_forever,
								        name='asyncio_loop',
								        daemon=True,
								    )
								    th.start()


								init()
							 |