|  | # SPDX-License-Identifier: AGPL-3.0-or-later
 | 
						
						
						
							|  | # pylint: disable=missing-module-docstring, missing-class-docstring
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | import sys
 | 
						
						
						
							|  | from hashlib import sha256
 | 
						
						
						
							|  | from importlib import import_module
 | 
						
						
						
							|  | from os import listdir, makedirs, remove, stat, utime
 | 
						
						
						
							|  | from os.path import abspath, basename, dirname, exists, join
 | 
						
						
						
							|  | from shutil import copyfile
 | 
						
						
						
							|  | from pkgutil import iter_modules
 | 
						
						
						
							|  | from logging import getLogger
 | 
						
						
						
							|  | from typing import List, Tuple
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | from searx import logger, settings
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | class Plugin:  # pylint: disable=too-few-public-methods
 | 
						
						
						
							|  |     """This class is currently never initialized and only used for type hinting."""
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     id: str
 | 
						
						
						
							|  |     name: str
 | 
						
						
						
							|  |     description: str
 | 
						
						
						
							|  |     default_on: bool
 | 
						
						
						
							|  |     js_dependencies: Tuple[str]
 | 
						
						
						
							|  |     css_dependencies: Tuple[str]
 | 
						
						
						
							|  |     preference_section: str
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | logger = logger.getChild("plugins")
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | required_attrs = (
 | 
						
						
						
							|  |     # fmt: off
 | 
						
						
						
							|  |     ("name", str),
 | 
						
						
						
							|  |     ("description", str),
 | 
						
						
						
							|  |     ("default_on", bool)
 | 
						
						
						
							|  |     # fmt: on
 | 
						
						
						
							|  | )
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | optional_attrs = (
 | 
						
						
						
							|  |     # fmt: off
 | 
						
						
						
							|  |     ("js_dependencies", tuple),
 | 
						
						
						
							|  |     ("css_dependencies", tuple),
 | 
						
						
						
							|  |     ("preference_section", str),
 | 
						
						
						
							|  |     # fmt: on
 | 
						
						
						
							|  | )
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def sha_sum(filename):
 | 
						
						
						
							|  |     with open(filename, "rb") as f:
 | 
						
						
						
							|  |         file_content_bytes = f.read()
 | 
						
						
						
							|  |         return sha256(file_content_bytes).hexdigest()
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
 | 
						
						
						
							|  |     dep_path = join(base_path, resource_path)
 | 
						
						
						
							|  |     file_name = basename(dep_path)
 | 
						
						
						
							|  |     resource_path = join(target_dir, file_name)
 | 
						
						
						
							|  |     if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path):
 | 
						
						
						
							|  |         try:
 | 
						
						
						
							|  |             copyfile(dep_path, resource_path)
 | 
						
						
						
							|  |             # copy atime_ns and mtime_ns, so the weak ETags (generated by
 | 
						
						
						
							|  |             # the HTTP server) do not change
 | 
						
						
						
							|  |             dep_stat = stat(dep_path)
 | 
						
						
						
							|  |             utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))
 | 
						
						
						
							|  |         except IOError:
 | 
						
						
						
							|  |             logger.critical("failed to copy plugin resource {0} for plugin {1}".format(file_name, name))
 | 
						
						
						
							|  |             sys.exit(3)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # returning with the web path of the resource
 | 
						
						
						
							|  |     return join("plugins/external_plugins", plugin_dir, file_name)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def prepare_package_resources(plugin, plugin_module_name):
 | 
						
						
						
							|  |     plugin_base_path = dirname(abspath(plugin.__file__))
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     plugin_dir = plugin_module_name
 | 
						
						
						
							|  |     target_dir = join(settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir)
 | 
						
						
						
							|  |     try:
 | 
						
						
						
							|  |         makedirs(target_dir, exist_ok=True)
 | 
						
						
						
							|  |     except IOError:
 | 
						
						
						
							|  |         logger.critical("failed to create resource directory {0} for plugin {1}".format(target_dir, plugin_module_name))
 | 
						
						
						
							|  |         sys.exit(3)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     resources = []
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     if hasattr(plugin, "js_dependencies"):
 | 
						
						
						
							|  |         resources.extend(map(basename, plugin.js_dependencies))
 | 
						
						
						
							|  |         plugin.js_dependencies = [
 | 
						
						
						
							|  |             sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
 | 
						
						
						
							|  |             for x in plugin.js_dependencies
 | 
						
						
						
							|  |         ]
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     if hasattr(plugin, "css_dependencies"):
 | 
						
						
						
							|  |         resources.extend(map(basename, plugin.css_dependencies))
 | 
						
						
						
							|  |         plugin.css_dependencies = [
 | 
						
						
						
							|  |             sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
 | 
						
						
						
							|  |             for x in plugin.css_dependencies
 | 
						
						
						
							|  |         ]
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     for f in listdir(target_dir):
 | 
						
						
						
							|  |         if basename(f) not in resources:
 | 
						
						
						
							|  |             resource_path = join(target_dir, basename(f))
 | 
						
						
						
							|  |             try:
 | 
						
						
						
							|  |                 remove(resource_path)
 | 
						
						
						
							|  |             except IOError:
 | 
						
						
						
							|  |                 logger.critical(
 | 
						
						
						
							|  |                     "failed to remove unused resource file {0} for plugin {1}".format(resource_path, plugin_module_name)
 | 
						
						
						
							|  |                 )
 | 
						
						
						
							|  |                 sys.exit(3)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def load_plugin(plugin_module_name, external):
 | 
						
						
						
							|  |     # pylint: disable=too-many-branches
 | 
						
						
						
							|  |     try:
 | 
						
						
						
							|  |         plugin = import_module(plugin_module_name)
 | 
						
						
						
							|  |     except (
 | 
						
						
						
							|  |         SyntaxError,
 | 
						
						
						
							|  |         KeyboardInterrupt,
 | 
						
						
						
							|  |         SystemExit,
 | 
						
						
						
							|  |         SystemError,
 | 
						
						
						
							|  |         ImportError,
 | 
						
						
						
							|  |         RuntimeError,
 | 
						
						
						
							|  |     ) as e:
 | 
						
						
						
							|  |         logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
 | 
						
						
						
							|  |         sys.exit(3)
 | 
						
						
						
							|  |     except BaseException:
 | 
						
						
						
							|  |         logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
 | 
						
						
						
							|  |         return None
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # difference with searx: use module name instead of the user name
 | 
						
						
						
							|  |     plugin.id = plugin_module_name
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     #
 | 
						
						
						
							|  |     plugin.logger = getLogger(plugin_module_name)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     for plugin_attr, plugin_attr_type in required_attrs:
 | 
						
						
						
							|  |         if not hasattr(plugin, plugin_attr):
 | 
						
						
						
							|  |             logger.critical('%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr)
 | 
						
						
						
							|  |             sys.exit(3)
 | 
						
						
						
							|  |         attr = getattr(plugin, plugin_attr)
 | 
						
						
						
							|  |         if not isinstance(attr, plugin_attr_type):
 | 
						
						
						
							|  |             type_attr = str(type(attr))
 | 
						
						
						
							|  |             logger.critical(
 | 
						
						
						
							|  |                 '{1}: attribute "{0}" is of type {2}, must be of type {3}, cannot load plugin'.format(
 | 
						
						
						
							|  |                     plugin, plugin_attr, type_attr, plugin_attr_type
 | 
						
						
						
							|  |                 )
 | 
						
						
						
							|  |             )
 | 
						
						
						
							|  |             sys.exit(3)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     for plugin_attr, plugin_attr_type in optional_attrs:
 | 
						
						
						
							|  |         if not hasattr(plugin, plugin_attr) or not isinstance(getattr(plugin, plugin_attr), plugin_attr_type):
 | 
						
						
						
							|  |             setattr(plugin, plugin_attr, plugin_attr_type())
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     if not hasattr(plugin, "preference_section"):
 | 
						
						
						
							|  |         plugin.preference_section = "general"
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # query plugin
 | 
						
						
						
							|  |     if plugin.preference_section == "query":
 | 
						
						
						
							|  |         for plugin_attr in ("query_keywords", "query_examples"):
 | 
						
						
						
							|  |             if not hasattr(plugin, plugin_attr):
 | 
						
						
						
							|  |                 logger.critical('missing attribute "{0}", cannot load plugin: {1}'.format(plugin_attr, plugin))
 | 
						
						
						
							|  |                 sys.exit(3)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     if settings.get("enabled_plugins"):
 | 
						
						
						
							|  |         # searx compatibility: plugin.name in settings['enabled_plugins']
 | 
						
						
						
							|  |         plugin.default_on = plugin.name in settings["enabled_plugins"] or plugin.id in settings["enabled_plugins"]
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # copy resources if this is an external plugin
 | 
						
						
						
							|  |     if external:
 | 
						
						
						
							|  |         prepare_package_resources(plugin, plugin_module_name)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     logger.debug("%s: loaded", plugin_module_name)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     return plugin
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def load_and_initialize_plugin(plugin_module_name, external, init_args):
 | 
						
						
						
							|  |     plugin = load_plugin(plugin_module_name, external)
 | 
						
						
						
							|  |     if plugin and hasattr(plugin, 'init'):
 | 
						
						
						
							|  |         try:
 | 
						
						
						
							|  |             return plugin if plugin.init(*init_args) else None
 | 
						
						
						
							|  |         except Exception:  # pylint: disable=broad-except
 | 
						
						
						
							|  |             plugin.logger.exception("Exception while calling init, the plugin is disabled")
 | 
						
						
						
							|  |             return None
 | 
						
						
						
							|  |     return plugin
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | class PluginStore:
 | 
						
						
						
							|  |     def __init__(self):
 | 
						
						
						
							|  |         self.plugins: List[Plugin] = []
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     def __iter__(self):
 | 
						
						
						
							|  |         yield from self.plugins
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     def register(self, plugin):
 | 
						
						
						
							|  |         self.plugins.append(plugin)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
 | 
						
						
						
							|  |         ret = True
 | 
						
						
						
							|  |         for plugin in ordered_plugin_list:
 | 
						
						
						
							|  |             if hasattr(plugin, plugin_type):
 | 
						
						
						
							|  |                 try:
 | 
						
						
						
							|  |                     ret = getattr(plugin, plugin_type)(*args, **kwargs)
 | 
						
						
						
							|  |                     if not ret:
 | 
						
						
						
							|  |                         break
 | 
						
						
						
							|  |                 except Exception:  # pylint: disable=broad-except
 | 
						
						
						
							|  |                     plugin.logger.exception("Exception while calling %s", plugin_type)
 | 
						
						
						
							|  |         return ret
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | plugins = PluginStore()
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def plugin_module_names():
 | 
						
						
						
							|  |     yield_plugins = set()
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # embedded plugins
 | 
						
						
						
							|  |     for module in iter_modules(path=[dirname(__file__)]):
 | 
						
						
						
							|  |         yield (__name__ + "." + module.name, False)
 | 
						
						
						
							|  |         yield_plugins.add(module.name)
 | 
						
						
						
							|  |     # external plugins
 | 
						
						
						
							|  |     for module_name in settings['plugins']:
 | 
						
						
						
							|  |         if module_name not in yield_plugins:
 | 
						
						
						
							|  |             yield (module_name, True)
 | 
						
						
						
							|  |             yield_plugins.add(module_name)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def initialize(app):
 | 
						
						
						
							|  |     for module_name, external in plugin_module_names():
 | 
						
						
						
							|  |         plugin = load_and_initialize_plugin(module_name, external, (app, settings))
 | 
						
						
						
							|  |         if plugin:
 | 
						
						
						
							|  |             plugins.register(plugin)
 |