| 
									
										
										
										
											2021-01-05 11:24:39 +01:00
										 |  |  | # SPDX-License-Identifier: AGPL-3.0-or-later | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | import typing | 
					
						
							|  |  |  | import types | 
					
						
							|  |  |  | import functools | 
					
						
							|  |  |  | import itertools | 
					
						
							| 
									
										
										
										
											2021-01-17 16:14:16 +01:00
										 |  |  | import threading | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | from time import time | 
					
						
							|  |  |  | from urllib.parse import urlparse | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import re | 
					
						
							| 
									
										
										
										
											2021-01-19 21:26:04 +01:00
										 |  |  | from langdetect import detect_langs | 
					
						
							|  |  |  | from langdetect.lang_detect_exception import LangDetectException | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | import requests.exceptions | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from searx import poolrequests, logger | 
					
						
							|  |  |  | from searx.results import ResultContainer | 
					
						
							| 
									
										
										
										
											2021-01-05 11:24:39 +01:00
										 |  |  | from searx.search.models import SearchQuery, EngineRef | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | from searx.search.processors import EngineProcessor | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  | logger = logger.getChild('searx.search.checker') | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | HTML_TAGS = [ | 
					
						
							|  |  |  |     'embed', 'iframe', 'object', 'param', 'picture', 'source', 'svg', 'math', 'canvas', 'noscript', 'script', | 
					
						
							|  |  |  |     'del', 'ins', 'area', 'audio', 'img', 'map', 'track', 'video', 'a', 'abbr', 'b', 'bdi', 'bdo', 'br', 'cite', | 
					
						
							|  |  |  |     'code', 'data', 'dfn', 'em', 'i', 'kdb', 'mark', 'q', 'rb', 'rp', 'rt', 'rtc', 'ruby', 's', 'samp', 'small', | 
					
						
							|  |  |  |     'span', 'strong', 'sub', 'sup', 'time', 'u', 'var', 'wbr', 'style', 'blockquote', 'dd', 'div', 'dl', 'dt', | 
					
						
							|  |  |  |     'figcaption', 'figure', 'hr', 'li', 'ol', 'p', 'pre', 'ul', 'button', 'datalist', 'fieldset', 'form', 'input', | 
					
						
							|  |  |  |     'label', 'legend', 'meter', 'optgroup', 'option', 'output', 'progress', 'select', 'textarea', 'applet', | 
					
						
							|  |  |  |     'frame', 'frameset' | 
					
						
							|  |  |  | ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_check_no_html(): | 
					
						
							|  |  |  |     rep = ['<' + tag + '[^\>]*>' for tag in HTML_TAGS] | 
					
						
							|  |  |  |     rep += ['</' + tag + '>' for tag in HTML_TAGS] | 
					
						
							|  |  |  |     pattern = re.compile('|'.join(rep)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def f(text): | 
					
						
							|  |  |  |         return pattern.search(text.lower()) is None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return f | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | _check_no_html = get_check_no_html() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _is_url(url): | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         result = urlparse(url) | 
					
						
							|  |  |  |     except ValueError: | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  |     if result.scheme not in ('http', 'https'): | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  |     return True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @functools.lru_cache(maxsize=8192) | 
					
						
							|  |  |  | def _is_url_image(image_url): | 
					
						
							|  |  |  |     if not isinstance(image_url, str): | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if image_url.startswith('//'): | 
					
						
							|  |  |  |         image_url = 'https:' + image_url | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if image_url.startswith('data:'): | 
					
						
							|  |  |  |         return image_url.startswith('data:image/') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if not _is_url(image_url): | 
					
						
							|  |  |  |         return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     retry = 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     while retry > 0: | 
					
						
							|  |  |  |         a = time() | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             poolrequests.set_timeout_for_thread(10.0, time()) | 
					
						
							|  |  |  |             r = poolrequests.get(image_url, timeout=10.0, allow_redirects=True, headers={ | 
					
						
							|  |  |  |                 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0', | 
					
						
							|  |  |  |                 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', | 
					
						
							|  |  |  |                 'Accept-Language': 'en-US;q=0.5,en;q=0.3', | 
					
						
							|  |  |  |                 'Accept-Encoding': 'gzip, deflate, br', | 
					
						
							|  |  |  |                 'DNT': '1', | 
					
						
							|  |  |  |                 'Connection': 'keep-alive', | 
					
						
							|  |  |  |                 'Upgrade-Insecure-Requests': '1', | 
					
						
							|  |  |  |                 'Sec-GPC': '1', | 
					
						
							|  |  |  |                 'Cache-Control': 'max-age=0' | 
					
						
							|  |  |  |             }) | 
					
						
							|  |  |  |             if r.headers["content-type"].startswith('image/'): | 
					
						
							|  |  |  |                 return True | 
					
						
							|  |  |  |             return False | 
					
						
							|  |  |  |         except requests.exceptions.Timeout: | 
					
						
							|  |  |  |             logger.error('Timeout for %s: %i', image_url, int(time() - a)) | 
					
						
							|  |  |  |             retry -= 1 | 
					
						
							|  |  |  |         except requests.exceptions.RequestException: | 
					
						
							|  |  |  |             logger.exception('Exception for %s', image_url) | 
					
						
							|  |  |  |             return False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _search_query_to_dict(search_query: SearchQuery) -> typing.Dict[str, typing.Any]: | 
					
						
							|  |  |  |     return { | 
					
						
							|  |  |  |         'query': search_query.query, | 
					
						
							|  |  |  |         'lang': search_query.lang, | 
					
						
							|  |  |  |         'pageno': search_query.pageno, | 
					
						
							|  |  |  |         'safesearch': search_query.safesearch, | 
					
						
							|  |  |  |         'time_range': search_query.time_range, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _search_query_diff(sq1: SearchQuery, sq2: SearchQuery)\ | 
					
						
							|  |  |  |         -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]: | 
					
						
							|  |  |  |     param1 = _search_query_to_dict(sq1) | 
					
						
							|  |  |  |     param2 = _search_query_to_dict(sq2) | 
					
						
							|  |  |  |     common = {} | 
					
						
							|  |  |  |     diff = {} | 
					
						
							|  |  |  |     for k, value1 in param1.items(): | 
					
						
							|  |  |  |         value2 = param2[k] | 
					
						
							|  |  |  |         if value1 == value2: | 
					
						
							|  |  |  |             common[k] = value1 | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             diff[k] = (value1, value2) | 
					
						
							|  |  |  |     return (common, diff) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestResults: | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |     __slots__ = 'errors', 'logs', 'languages' | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self): | 
					
						
							|  |  |  |         self.errors: typing.Dict[str, typing.List[str]] = {} | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |         self.logs: typing.Dict[str, typing.List[typing.Any]] = {} | 
					
						
							|  |  |  |         self.languages: typing.Set[str] = set() | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |     def add_error(self, test, message, *args): | 
					
						
							|  |  |  |         # message to self.errors | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         errors_for_test = self.errors.setdefault(test, []) | 
					
						
							|  |  |  |         if message not in errors_for_test: | 
					
						
							|  |  |  |             errors_for_test.append(message) | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |         # (message, *args) to self.logs | 
					
						
							|  |  |  |         logs_for_test = self.logs.setdefault(test, []) | 
					
						
							|  |  |  |         if (message, *args) not in logs_for_test: | 
					
						
							|  |  |  |             logs_for_test.append((message, *args)) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |     def add_language(self, language): | 
					
						
							|  |  |  |         self.languages.add(language) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @property | 
					
						
							|  |  |  |     def succesfull(self): | 
					
						
							|  |  |  |         return len(self.errors) == 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __iter__(self): | 
					
						
							|  |  |  |         for test_name, errors in self.errors.items(): | 
					
						
							|  |  |  |             for error in sorted(errors): | 
					
						
							|  |  |  |                 yield (test_name, error) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class ResultContainerTests: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     __slots__ = 'test_name', 'search_query', 'result_container', 'languages', 'stop_test', 'test_results' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, | 
					
						
							|  |  |  |                  test_results: TestResults, | 
					
						
							|  |  |  |                  test_name: str, | 
					
						
							|  |  |  |                  search_query: SearchQuery, | 
					
						
							|  |  |  |                  result_container: ResultContainer): | 
					
						
							|  |  |  |         self.test_name = test_name | 
					
						
							|  |  |  |         self.search_query = search_query | 
					
						
							|  |  |  |         self.result_container = result_container | 
					
						
							|  |  |  |         self.languages: typing.Set[str] = set() | 
					
						
							|  |  |  |         self.test_results = test_results | 
					
						
							|  |  |  |         self.stop_test = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @property | 
					
						
							|  |  |  |     def result_urls(self): | 
					
						
							|  |  |  |         results = self.result_container.get_ordered_results() | 
					
						
							| 
									
										
										
										
											2021-03-25 09:37:37 +01:00
										 |  |  |         return [result['url'] for result in results if 'url' in result] | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |     def _record_error(self, message: str, *args) -> None: | 
					
						
							|  |  |  |         sq = _search_query_to_dict(self.search_query) | 
					
						
							|  |  |  |         sqstr = ' '.join(['{}={!r}'.format(k, v) for k, v in sq.items()]) | 
					
						
							|  |  |  |         self.test_results.add_error(self.test_name, message, *args, '(' + sqstr + ')') | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _add_language(self, text: str) -> typing.Optional[str]: | 
					
						
							| 
									
										
										
										
											2021-01-19 21:26:04 +01:00
										 |  |  |         try: | 
					
						
							|  |  |  |             r = detect_langs(str(text))  # pylint: disable=E1101 | 
					
						
							|  |  |  |         except LangDetectException: | 
					
						
							|  |  |  |             return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(r) > 0 and r[0].prob > 0.95: | 
					
						
							|  |  |  |             self.languages.add(r[0].lang) | 
					
						
							|  |  |  |             self.test_results.add_language(r[0].lang) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         return None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _check_result(self, result): | 
					
						
							|  |  |  |         if not _check_no_html(result.get('title', '')): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |             self._record_error('HTML in title', repr(result.get('title', ''))) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         if not _check_no_html(result.get('content', '')): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |             self._record_error('HTML in content', repr(result.get('content', ''))) | 
					
						
							| 
									
										
										
										
											2021-03-25 09:37:37 +01:00
										 |  |  |         if result.get('url') is None: | 
					
						
							|  |  |  |             self._record_error('url is None') | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |         self._add_language(result.get('title', '')) | 
					
						
							|  |  |  |         self._add_language(result.get('content', '')) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         template = result.get('template', 'default.html') | 
					
						
							|  |  |  |         if template == 'default.html': | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         if template == 'code.html': | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         if template == 'torrent.html': | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         if template == 'map.html': | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         if template == 'images.html': | 
					
						
							|  |  |  |             thumbnail_src = result.get('thumbnail_src') | 
					
						
							|  |  |  |             if thumbnail_src is not None: | 
					
						
							|  |  |  |                 if not _is_url_image(thumbnail_src): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |                     self._record_error('thumbnail_src URL is invalid', thumbnail_src) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |             elif not _is_url_image(result.get('img_src')): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |                 self._record_error('img_src URL is invalid', result.get('img_src')) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         if template == 'videos.html' and not _is_url_image(result.get('thumbnail')): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |             self._record_error('thumbnail URL is invalid', result.get('img_src')) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _check_results(self, results: list): | 
					
						
							|  |  |  |         for result in results: | 
					
						
							|  |  |  |             self._check_result(result) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _check_answers(self, answers): | 
					
						
							|  |  |  |         for answer in answers: | 
					
						
							|  |  |  |             if not _check_no_html(answer): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |                 self._record_error('HTML in answer', answer) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _check_infoboxes(self, infoboxes): | 
					
						
							|  |  |  |         for infobox in infoboxes: | 
					
						
							|  |  |  |             if not _check_no_html(infobox.get('content', '')): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |                 self._record_error('HTML in infobox content', infobox.get('content', '')) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |             self._add_language(infobox.get('content', '')) | 
					
						
							|  |  |  |             for attribute in infobox.get('attributes', {}): | 
					
						
							|  |  |  |                 if not _check_no_html(attribute.get('value', '')): | 
					
						
							| 
									
										
										
										
											2021-01-08 19:04:04 +01:00
										 |  |  |                     self._record_error('HTML in infobox attribute value', attribute.get('value', '')) | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def check_basic(self): | 
					
						
							|  |  |  |         if len(self.result_container.unresponsive_engines) > 0: | 
					
						
							|  |  |  |             for message in self.result_container.unresponsive_engines: | 
					
						
							|  |  |  |                 self._record_error(message[1] + ' ' + (message[2] or '')) | 
					
						
							|  |  |  |             self.stop_test = True | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         results = self.result_container.get_ordered_results() | 
					
						
							|  |  |  |         if len(results) > 0: | 
					
						
							|  |  |  |             self._check_results(results) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(self.result_container.answers) > 0: | 
					
						
							|  |  |  |             self._check_answers(self.result_container.answers) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(self.result_container.infoboxes) > 0: | 
					
						
							|  |  |  |             self._check_infoboxes(self.result_container.infoboxes) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def has_infobox(self): | 
					
						
							| 
									
										
										
										
											2021-01-05 11:24:39 +01:00
										 |  |  |         """Check the ResultContainer has at least one infobox""" | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         if len(self.result_container.infoboxes) == 0: | 
					
						
							|  |  |  |             self._record_error('No infobox') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def has_answer(self): | 
					
						
							| 
									
										
										
										
											2021-01-05 11:24:39 +01:00
										 |  |  |         """Check the ResultContainer has at least one answer""" | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         if len(self.result_container.answers) == 0: | 
					
						
							|  |  |  |             self._record_error('No answer') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def has_language(self, lang): | 
					
						
							| 
									
										
										
										
											2021-01-05 11:24:39 +01:00
										 |  |  |         """Check at least one title or content of the results is written in the `lang`.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Detected using pycld3, may be not accurate"""
 | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         if lang not in self.languages: | 
					
						
							|  |  |  |             self._record_error(lang + ' not found') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def not_empty(self): | 
					
						
							| 
									
										
										
										
											2021-01-05 11:24:39 +01:00
										 |  |  |         """Check the ResultContainer has at least one answer or infobox or result""" | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         result_types = set() | 
					
						
							|  |  |  |         results = self.result_container.get_ordered_results() | 
					
						
							|  |  |  |         if len(results) > 0: | 
					
						
							|  |  |  |             result_types.add('results') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(self.result_container.answers) > 0: | 
					
						
							|  |  |  |             result_types.add('answers') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(self.result_container.infoboxes) > 0: | 
					
						
							|  |  |  |             result_types.add('infoboxes') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if len(result_types) == 0: | 
					
						
							|  |  |  |             self._record_error('No result') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def one_title_contains(self, title: str): | 
					
						
							| 
									
										
										
										
											2021-01-05 11:24:39 +01:00
										 |  |  |         """Check one of the title contains `title` (case insensitive comparaison)""" | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         title = title.lower() | 
					
						
							|  |  |  |         for result in self.result_container.get_ordered_results(): | 
					
						
							|  |  |  |             if title in result['title'].lower(): | 
					
						
							|  |  |  |                 return | 
					
						
							|  |  |  |         self._record_error(('{!r} not found in the title'.format(title))) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class CheckerTests: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     __slots__ = 'test_results', 'test_name', 'result_container_tests_list' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, | 
					
						
							|  |  |  |                  test_results: TestResults, | 
					
						
							|  |  |  |                  test_name: str, | 
					
						
							|  |  |  |                  result_container_tests_list: typing.List[ResultContainerTests]): | 
					
						
							|  |  |  |         self.test_results = test_results | 
					
						
							|  |  |  |         self.test_name = test_name | 
					
						
							|  |  |  |         self.result_container_tests_list = result_container_tests_list | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def unique_results(self): | 
					
						
							| 
									
										
										
										
											2021-03-25 09:37:37 +01:00
										 |  |  |         """Check the results of each ResultContainer is unique""" | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |         urls_list = [rct.result_urls for rct in self.result_container_tests_list] | 
					
						
							|  |  |  |         if len(urls_list[0]) > 0: | 
					
						
							|  |  |  |             # results on the first page | 
					
						
							|  |  |  |             for i, urls_i in enumerate(urls_list): | 
					
						
							|  |  |  |                 for j, urls_j in enumerate(urls_list): | 
					
						
							|  |  |  |                     if i < j and urls_i == urls_j: | 
					
						
							|  |  |  |                         common, diff = _search_query_diff(self.result_container_tests_list[i].search_query, | 
					
						
							|  |  |  |                                                           self.result_container_tests_list[j].search_query) | 
					
						
							|  |  |  |                         common_str = ' '.join(['{}={!r}'.format(k, v) for k, v in common.items()]) | 
					
						
							|  |  |  |                         diff1_str = ', ' .join(['{}={!r}'.format(k, v1) for (k, (v1, v2)) in diff.items()]) | 
					
						
							|  |  |  |                         diff2_str = ', ' .join(['{}={!r}'.format(k, v2) for (k, (v1, v2)) in diff.items()]) | 
					
						
							|  |  |  |                         self.test_results.add_error(self.test_name, | 
					
						
							|  |  |  |                                                     'results are identitical for {} and {} ({})' | 
					
						
							|  |  |  |                                                     .format(diff1_str, diff2_str, common_str)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Checker: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     __slots__ = 'processor', 'tests', 'test_results' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, processor: EngineProcessor): | 
					
						
							|  |  |  |         self.processor = processor | 
					
						
							|  |  |  |         self.tests = self.processor.get_tests() | 
					
						
							|  |  |  |         self.test_results = TestResults() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @property | 
					
						
							|  |  |  |     def engineref_list(self): | 
					
						
							|  |  |  |         engine_name = self.processor.engine_name | 
					
						
							|  |  |  |         engine_category = self.processor.engine.categories[0] | 
					
						
							|  |  |  |         return [EngineRef(engine_name, engine_category)] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def search_query_matrix_iterator(engineref_list, matrix): | 
					
						
							|  |  |  |         p = [] | 
					
						
							|  |  |  |         for name, values in matrix.items(): | 
					
						
							|  |  |  |             if isinstance(values, (tuple, list)): | 
					
						
							|  |  |  |                 l = [(name, value) for value in values] | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 l = [(name, values)] | 
					
						
							|  |  |  |             p.append(l) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         for kwargs in itertools.product(*p): | 
					
						
							|  |  |  |             kwargs = {k: v for k, v in kwargs} | 
					
						
							|  |  |  |             query = kwargs['query'] | 
					
						
							|  |  |  |             params = dict(kwargs) | 
					
						
							|  |  |  |             del params['query'] | 
					
						
							|  |  |  |             yield SearchQuery(query, engineref_list, **params) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def call_test(self, obj, test_description): | 
					
						
							|  |  |  |         if isinstance(test_description, (tuple, list)): | 
					
						
							|  |  |  |             method, args = test_description[0], test_description[1:] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             method = test_description | 
					
						
							|  |  |  |             args = () | 
					
						
							|  |  |  |         if isinstance(method, str) and hasattr(obj, method): | 
					
						
							|  |  |  |             getattr(obj, method)(*args) | 
					
						
							|  |  |  |         elif isinstance(method, types.FunctionType): | 
					
						
							|  |  |  |             method(*args) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.test_results.add_error(obj.test_name, | 
					
						
							|  |  |  |                                         'method {!r} ({}) not found for {}' | 
					
						
							|  |  |  |                                         .format(method, method.__class__.__name__, obj.__class__.__name__)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def call_tests(self, obj, test_descriptions): | 
					
						
							|  |  |  |         for test_description in test_descriptions: | 
					
						
							|  |  |  |             self.call_test(obj, test_description) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def search(self, search_query: SearchQuery) -> ResultContainer: | 
					
						
							|  |  |  |         result_container = ResultContainer() | 
					
						
							|  |  |  |         engineref_category = search_query.engineref_list[0].category | 
					
						
							|  |  |  |         params = self.processor.get_params(search_query, engineref_category) | 
					
						
							|  |  |  |         if params is not None: | 
					
						
							| 
									
										
										
										
											2021-01-17 16:14:16 +01:00
										 |  |  |             with threading.RLock(): | 
					
						
							|  |  |  |                 self.processor.engine.stats['sent_search_count'] += 1 | 
					
						
							| 
									
										
										
										
											2020-12-24 09:28:16 +01:00
										 |  |  |             self.processor.search(search_query.query, params, result_container, time(), 5) | 
					
						
							|  |  |  |         return result_container | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_result_container_tests(self, test_name: str, search_query: SearchQuery) -> ResultContainerTests: | 
					
						
							|  |  |  |         result_container = self.search(search_query) | 
					
						
							|  |  |  |         result_container_check = ResultContainerTests(self.test_results, test_name, search_query, result_container) | 
					
						
							|  |  |  |         result_container_check.check_basic() | 
					
						
							|  |  |  |         return result_container_check | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_test(self, test_name): | 
					
						
							|  |  |  |         test_parameters = self.tests[test_name] | 
					
						
							|  |  |  |         search_query_list = list(Checker.search_query_matrix_iterator(self.engineref_list, test_parameters['matrix'])) | 
					
						
							|  |  |  |         rct_list = [self.get_result_container_tests(test_name, search_query) for search_query in search_query_list] | 
					
						
							|  |  |  |         stop_test = False | 
					
						
							|  |  |  |         if 'result_container' in test_parameters: | 
					
						
							|  |  |  |             for rct in rct_list: | 
					
						
							|  |  |  |                 stop_test = stop_test or rct.stop_test | 
					
						
							|  |  |  |                 if not rct.stop_test: | 
					
						
							|  |  |  |                     self.call_tests(rct, test_parameters['result_container']) | 
					
						
							|  |  |  |         if not stop_test: | 
					
						
							|  |  |  |             if 'test' in test_parameters: | 
					
						
							|  |  |  |                 checker_tests = CheckerTests(self.test_results, test_name, rct_list) | 
					
						
							|  |  |  |                 self.call_tests(checker_tests, test_parameters['test']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run(self): | 
					
						
							|  |  |  |         for test_name in self.tests: | 
					
						
							|  |  |  |             self.run_test(test_name) |