395 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			395 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|  | # SPDX-License-Identifier: AGPL-3.0-or-later | ||
|  | # pylint: disable=too-few-public-methods,missing-module-docstring | ||
|  | 
 | ||
|  | from __future__ import annotations | ||
|  | 
 | ||
|  | __all__ = ["PluginInfo", "Plugin", "PluginStorage"] | ||
|  | 
 | ||
|  | import abc | ||
|  | import importlib | ||
|  | import logging | ||
|  | import pathlib | ||
|  | import types | ||
|  | import typing | ||
|  | import warnings | ||
|  | 
 | ||
|  | from dataclasses import dataclass, field | ||
|  | 
 | ||
|  | import flask | ||
|  | 
 | ||
|  | import searx | ||
|  | from searx.utils import load_module | ||
|  | from searx.extended_types import SXNG_Request | ||
|  | from searx.result_types import Result | ||
|  | 
 | ||
|  | 
 | ||
|  | if typing.TYPE_CHECKING: | ||
|  |     from searx.search import SearchWithPlugins | ||
|  | 
 | ||
|  | 
 | ||
|  | _default = pathlib.Path(__file__).parent | ||
|  | log: logging.Logger = logging.getLogger("searx.plugins") | ||
|  | 
 | ||
|  | 
 | ||
|  | @dataclass | ||
|  | class PluginInfo: | ||
|  |     """Object that holds informations about a *plugin*, these infos are shown to
 | ||
|  |     the user in the Preferences menu. | ||
|  | 
 | ||
|  |     To be able to translate the information into other languages, the text must | ||
|  |     be written in English and translated with :py:obj:`flask_babel.gettext`. | ||
|  |     """
 | ||
|  | 
 | ||
|  |     id: str | ||
|  |     """The ID-selector in HTML/CSS `#<id>`.""" | ||
|  | 
 | ||
|  |     name: str | ||
|  |     """Name of the *plugin*.""" | ||
|  | 
 | ||
|  |     description: str | ||
|  |     """Short description of the *answerer*.""" | ||
|  | 
 | ||
|  |     preference_section: typing.Literal["general", "ui", "privacy", "query"] | None = "general" | ||
|  |     """Section (tab/group) in the preferences where this plugin is shown to the
 | ||
|  |     user. | ||
|  | 
 | ||
|  |     The value ``query`` is reserved for plugins that are activated via a | ||
|  |     *keyword* as part of a search query, see: | ||
|  | 
 | ||
|  |     - :py:obj:`PluginInfo.examples` | ||
|  |     - :py:obj:`Plugin.keywords` | ||
|  | 
 | ||
|  |     Those plugins are shown in the preferences in tab *Special Queries*. | ||
|  |     """
 | ||
|  | 
 | ||
|  |     examples: list[str] = field(default_factory=list) | ||
|  |     """List of short examples of the usage / of query terms.""" | ||
|  | 
 | ||
|  |     keywords: list[str] = field(default_factory=list) | ||
|  |     """See :py:obj:`Plugin.keywords`""" | ||
|  | 
 | ||
|  | 
 | ||
|  | class Plugin(abc.ABC): | ||
|  |     """Abstract base class of all Plugins.""" | ||
|  | 
 | ||
|  |     id: typing.ClassVar[str] | ||
|  |     """The ID (suffix) in the HTML form.""" | ||
|  | 
 | ||
|  |     default_on: typing.ClassVar[bool] | ||
|  |     """Plugin is enabled/disabled by default.""" | ||
|  | 
 | ||
|  |     keywords: list[str] = [] | ||
|  |     """Keywords in the search query that activate the plugin.  The *keyword* is
 | ||
|  |     the first word in a search query.  If a plugin should be executed regardless | ||
|  |     of the search query, the list of keywords should be empty (which is also the | ||
|  |     default in the base class for Plugins)."""
 | ||
|  | 
 | ||
|  |     log: logging.Logger | ||
|  |     """A logger object, is automatically initialized when calling the
 | ||
|  |     constructor (if not already set in the subclass)."""
 | ||
|  | 
 | ||
|  |     info: PluginInfo | ||
|  |     """Informations about the *plugin*, see :py:obj:`PluginInfo`.""" | ||
|  | 
 | ||
|  |     def __init__(self) -> None: | ||
|  |         super().__init__() | ||
|  | 
 | ||
|  |         for attr in ["id", "default_on"]: | ||
|  |             if getattr(self, attr, None) is None: | ||
|  |                 raise NotImplementedError(f"plugin {self} is missing attribute {attr}") | ||
|  | 
 | ||
|  |         if not self.id: | ||
|  |             self.id = f"{self.__class__.__module__}.{self.__class__.__name__}" | ||
|  |         if not getattr(self, "log", None): | ||
|  |             self.log = log.getChild(self.id) | ||
|  | 
 | ||
|  |     def __hash__(self) -> int: | ||
|  |         """The hash value is used in :py:obj:`set`, for example, when an object
 | ||
|  |         is added to the set.  The hash value is also used in other contexts, | ||
|  |         e.g. when checking for equality to identify identical plugins from | ||
|  |         different sources (name collisions)."""
 | ||
|  | 
 | ||
|  |         return id(self) | ||
|  | 
 | ||
|  |     def __eq__(self, other): | ||
|  |         """py:obj:`Plugin` objects are equal if the hash values of the two
 | ||
|  |         objects are equal."""
 | ||
|  | 
 | ||
|  |         return hash(self) == hash(other) | ||
|  | 
 | ||
|  |     def init(self, app: flask.Flask) -> bool:  # pylint: disable=unused-argument | ||
|  |         """Initialization of the plugin, the return value decides whether this
 | ||
|  |         plugin is active or not.  Initialization only takes place once, at the | ||
|  |         time the WEB application is set up.  The base methode always returns | ||
|  |         ``True``, the methode can be overwritten in the inheritances, | ||
|  | 
 | ||
|  |         - ``True`` plugin is active | ||
|  |         - ``False`` plugin is inactive | ||
|  |         """
 | ||
|  |         return True | ||
|  | 
 | ||
|  |     # pylint: disable=unused-argument | ||
|  |     def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool: | ||
|  |         """Runs BEFORE the search request and returns a boolean:
 | ||
|  | 
 | ||
|  |         - ``True`` to continue the search | ||
|  |         - ``False`` to stop the search | ||
|  |         """
 | ||
|  |         return True | ||
|  | 
 | ||
|  |     def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool: | ||
|  |         """Runs for each result of each engine and returns a boolean:
 | ||
|  | 
 | ||
|  |         - ``True`` to keep the result | ||
|  |         - ``False`` to remove the result from the result list | ||
|  | 
 | ||
|  |         The ``result`` can be modified to the needs. | ||
|  | 
 | ||
|  |         .. hint:: | ||
|  | 
 | ||
|  |            If :py:obj:`Result.url` is modified, :py:obj:`Result.parsed_url` must | ||
|  |            be changed accordingly: | ||
|  | 
 | ||
|  |            .. code:: python | ||
|  | 
 | ||
|  |               result["parsed_url"] = urlparse(result["url"]) | ||
|  |         """
 | ||
|  |         return True | ||
|  | 
 | ||
|  |     def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | typing.Sequence[Result]: | ||
|  |         """Runs AFTER the search request.  Can return a list of :py:obj:`Result`
 | ||
|  |         objects to be added to the final result list."""
 | ||
|  |         return | ||
|  | 
 | ||
|  | 
 | ||
|  | class ModulePlugin(Plugin): | ||
|  |     """A wrapper class for legacy *plugins*.
 | ||
|  | 
 | ||
|  |     .. note:: | ||
|  | 
 | ||
|  |        For internal use only! | ||
|  | 
 | ||
|  |     In a module plugin, the follwing names are mapped: | ||
|  | 
 | ||
|  |     - `module.query_keywords` --> :py:obj:`Plugin.keywords` | ||
|  |     - `module.plugin_id` --> :py:obj:`Plugin.id` | ||
|  |     - `module.logger` --> :py:obj:`Plugin.log` | ||
|  |     """
 | ||
|  | 
 | ||
|  |     _required_attrs = (("name", str), ("description", str), ("default_on", bool)) | ||
|  | 
 | ||
|  |     def __init__(self, mod: types.ModuleType): | ||
|  |         """In case of missing attributes in the module or wrong types are given,
 | ||
|  |         a :py:obj:`TypeError` exception is raised."""
 | ||
|  | 
 | ||
|  |         self.module = mod | ||
|  |         self.id = getattr(self.module, "plugin_id", self.module.__name__) | ||
|  |         self.log = logging.getLogger(self.module.__name__) | ||
|  |         self.keywords = getattr(self.module, "query_keywords", []) | ||
|  | 
 | ||
|  |         for attr, attr_type in self._required_attrs: | ||
|  |             if not hasattr(self.module, attr): | ||
|  |                 msg = f"missing attribute {attr}, cannot load plugin" | ||
|  |                 self.log.critical(msg) | ||
|  |                 raise TypeError(msg) | ||
|  |             if not isinstance(getattr(self.module, attr), attr_type): | ||
|  |                 msg = f"attribute {attr} is not of type {attr_type}" | ||
|  |                 self.log.critical(msg) | ||
|  |                 raise TypeError(msg) | ||
|  | 
 | ||
|  |         self.default_on = mod.default_on | ||
|  |         self.info = PluginInfo( | ||
|  |             id=self.id, | ||
|  |             name=self.module.name, | ||
|  |             description=self.module.description, | ||
|  |             preference_section=getattr(self.module, "preference_section", None), | ||
|  |             examples=getattr(self.module, "query_examples", []), | ||
|  |             keywords=self.keywords, | ||
|  |         ) | ||
|  | 
 | ||
|  |         # monkeypatch module | ||
|  |         self.module.logger = self.log  # type: ignore | ||
|  | 
 | ||
|  |         super().__init__() | ||
|  | 
 | ||
|  |     def init(self, app: flask.Flask) -> bool: | ||
|  |         if not hasattr(self.module, "init"): | ||
|  |             return True | ||
|  |         return self.module.init(app) | ||
|  | 
 | ||
|  |     def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool: | ||
|  |         if not hasattr(self.module, "pre_search"): | ||
|  |             return True | ||
|  |         return self.module.pre_search(request, search) | ||
|  | 
 | ||
|  |     def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool: | ||
|  |         if not hasattr(self.module, "on_result"): | ||
|  |             return True | ||
|  |         return self.module.on_result(request, search, result) | ||
|  | 
 | ||
|  |     def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | list[Result]: | ||
|  |         if not hasattr(self.module, "post_search"): | ||
|  |             return None | ||
|  |         return self.module.post_search(request, search) | ||
|  | 
 | ||
|  | 
 | ||
|  | class PluginStorage: | ||
|  |     """A storage for managing the *plugins* of SearXNG.""" | ||
|  | 
 | ||
|  |     plugin_list: set[Plugin] | ||
|  |     """The list of :py:obj:`Plugins` in this storage.""" | ||
|  | 
 | ||
|  |     legacy_plugins = [ | ||
|  |         "ahmia_filter", | ||
|  |         "calculator", | ||
|  |         "hostnames", | ||
|  |         "oa_doi_rewrite", | ||
|  |         "tor_check", | ||
|  |         "tracker_url_remover", | ||
|  |         "unit_converter", | ||
|  |     ] | ||
|  |     """Internal plugins implemented in the legacy style (as module / deprecated!).""" | ||
|  | 
 | ||
|  |     def __init__(self): | ||
|  |         self.plugin_list = set() | ||
|  | 
 | ||
|  |     def __iter__(self): | ||
|  | 
 | ||
|  |         yield from self.plugin_list | ||
|  | 
 | ||
|  |     def __len__(self): | ||
|  |         return len(self.plugin_list) | ||
|  | 
 | ||
|  |     @property | ||
|  |     def info(self) -> list[PluginInfo]: | ||
|  |         return [p.info for p in self.plugin_list] | ||
|  | 
 | ||
|  |     def load_builtins(self): | ||
|  |         """Load plugin modules from:
 | ||
|  | 
 | ||
|  |         - the python packages in :origin:`searx/plugins` and | ||
|  |         - the external plugins from :ref:`settings plugins`. | ||
|  |         """
 | ||
|  | 
 | ||
|  |         for f in _default.iterdir(): | ||
|  | 
 | ||
|  |             if f.name.startswith("_"): | ||
|  |                 continue | ||
|  | 
 | ||
|  |             if f.stem not in self.legacy_plugins: | ||
|  |                 self.register_by_fqn(f"searx.plugins.{f.stem}.SXNGPlugin") | ||
|  |                 continue | ||
|  | 
 | ||
|  |             # for backward compatibility | ||
|  |             mod = load_module(f.name, str(f.parent)) | ||
|  |             self.register(ModulePlugin(mod)) | ||
|  | 
 | ||
|  |         for fqn in searx.get_setting("plugins"):  # type: ignore | ||
|  |             self.register_by_fqn(fqn) | ||
|  | 
 | ||
|  |     def register(self, plugin: Plugin): | ||
|  |         """Register a :py:obj:`Plugin`.  In case of name collision (if two
 | ||
|  |         plugins have same ID) a :py:obj:`KeyError` exception is raised. | ||
|  |         """
 | ||
|  | 
 | ||
|  |         if plugin in self.plugin_list: | ||
|  |             msg = f"name collision '{plugin.id}'" | ||
|  |             plugin.log.critical(msg) | ||
|  |             raise KeyError(msg) | ||
|  | 
 | ||
|  |         self.plugin_list.add(plugin) | ||
|  |         plugin.log.debug("plugin has been loaded") | ||
|  | 
 | ||
|  |     def register_by_fqn(self, fqn: str): | ||
|  |         """Register a :py:obj:`Plugin` via its fully qualified class name (FQN).
 | ||
|  |         The FQNs of external plugins could be read from a configuration, for | ||
|  |         example, and registered using this method | ||
|  |         """
 | ||
|  | 
 | ||
|  |         mod_name, _, obj_name = fqn.rpartition('.') | ||
|  |         if not mod_name: | ||
|  |             # for backward compatibility | ||
|  |             code_obj = importlib.import_module(fqn) | ||
|  |         else: | ||
|  |             mod = importlib.import_module(mod_name) | ||
|  |             code_obj = getattr(mod, obj_name, None) | ||
|  | 
 | ||
|  |         if code_obj is None: | ||
|  |             msg = f"plugin {fqn} is not implemented" | ||
|  |             log.critical(msg) | ||
|  |             raise ValueError(msg) | ||
|  | 
 | ||
|  |         if isinstance(code_obj, types.ModuleType): | ||
|  |             # for backward compatibility | ||
|  |             warnings.warn( | ||
|  |                 f"plugin {fqn} is implemented in a legacy module / migrate to searx.plugins.Plugin", DeprecationWarning | ||
|  |             ) | ||
|  |             self.register(ModulePlugin(code_obj)) | ||
|  |             return | ||
|  | 
 | ||
|  |         self.register(code_obj()) | ||
|  | 
 | ||
|  |     def init(self, app: flask.Flask) -> None: | ||
|  |         """Calls the method :py:obj:`Plugin.init` of each plugin in this
 | ||
|  |         storage.  Depending on its return value, the plugin is removed from | ||
|  |         *this* storage or not."""
 | ||
|  | 
 | ||
|  |         for plg in self.plugin_list.copy(): | ||
|  |             if not plg.init(app): | ||
|  |                 self.plugin_list.remove(plg) | ||
|  | 
 | ||
|  |     def pre_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> bool: | ||
|  | 
 | ||
|  |         ret = True | ||
|  |         for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]: | ||
|  |             try: | ||
|  |                 ret = bool(plugin.pre_search(request=request, search=search)) | ||
|  |             except Exception:  # pylint: disable=broad-except | ||
|  |                 plugin.log.exception("Exception while calling pre_search") | ||
|  |                 continue | ||
|  |             if not ret: | ||
|  |                 # skip this search on the first False from a plugin | ||
|  |                 break | ||
|  |         return ret | ||
|  | 
 | ||
|  |     def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool: | ||
|  | 
 | ||
|  |         ret = True | ||
|  |         for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]: | ||
|  |             try: | ||
|  |                 ret = bool(plugin.on_result(request=request, search=search, result=result)) | ||
|  |             except Exception:  # pylint: disable=broad-except | ||
|  |                 plugin.log.exception("Exception while calling on_result") | ||
|  |                 continue | ||
|  |             if not ret: | ||
|  |                 # ignore this result item on the first False from a plugin | ||
|  |                 break | ||
|  | 
 | ||
|  |         return ret | ||
|  | 
 | ||
|  |     def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None: | ||
|  |         """Extend :py:obj:`search.result_container
 | ||
|  |         <searx.results.ResultContainer`> with result items from plugins listed | ||
|  |         in :py:obj:`search.user_plugins <SearchWithPlugins.user_plugins>`. | ||
|  |         """
 | ||
|  | 
 | ||
|  |         keyword = None | ||
|  |         for keyword in search.search_query.query.split(): | ||
|  |             if keyword: | ||
|  |                 break | ||
|  | 
 | ||
|  |         for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]: | ||
|  | 
 | ||
|  |             if plugin.keywords: | ||
|  |                 # plugin with keywords: skip plugin if no keyword match | ||
|  |                 if keyword and keyword not in plugin.keywords: | ||
|  |                     continue | ||
|  |             try: | ||
|  |                 results = plugin.post_search(request=request, search=search) or [] | ||
|  |             except Exception:  # pylint: disable=broad-except | ||
|  |                 plugin.log.exception("Exception while calling post_search") | ||
|  |                 continue | ||
|  | 
 | ||
|  |             # In case of *plugins* prefix ``plugin:`` is set, see searx.result_types.Result | ||
|  |             search.result_container.extend(f"plugin: {plugin.id}", results) |