|  | # SPDX-License-Identifier: AGPL-3.0-or-later
 | 
						
						
						
							|  | """Calculate mathematical expressions using :py:obj`ast.parse` (mode="eval").
 | 
						
						
						
							|  | """
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | from __future__ import annotations
 | 
						
						
						
							|  | from typing import Callable
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | import ast
 | 
						
						
						
							|  | import re
 | 
						
						
						
							|  | import operator
 | 
						
						
						
							|  | import multiprocessing
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | import babel
 | 
						
						
						
							|  | import babel.numbers
 | 
						
						
						
							|  | from flask_babel import gettext
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | from searx.result_types import EngineResults
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | name = "Basic Calculator"
 | 
						
						
						
							|  | description = gettext("Calculate mathematical expressions via the search bar")
 | 
						
						
						
							|  | default_on = True
 | 
						
						
						
							|  | preference_section = 'general'
 | 
						
						
						
							|  | plugin_id = 'calculator'
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | operators: dict[type, Callable] = {
 | 
						
						
						
							|  |     ast.Add: operator.add,
 | 
						
						
						
							|  |     ast.Sub: operator.sub,
 | 
						
						
						
							|  |     ast.Mult: operator.mul,
 | 
						
						
						
							|  |     ast.Div: operator.truediv,
 | 
						
						
						
							|  |     ast.Pow: operator.pow,
 | 
						
						
						
							|  |     ast.BitXor: operator.xor,
 | 
						
						
						
							|  |     ast.USub: operator.neg,
 | 
						
						
						
							|  | }
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | # with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
 | 
						
						
						
							|  | # the old behavior "fork") but it will not solve the core problem of fork, nor
 | 
						
						
						
							|  | # will it remove the deprecation warnings in py3.12 & py3.13.  Issue is
 | 
						
						
						
							|  | # ddiscussed here: https://github.com/searxng/searxng/issues/4159
 | 
						
						
						
							|  | mp_fork = multiprocessing.get_context("fork")
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def _eval_expr(expr):
 | 
						
						
						
							|  |     """
 | 
						
						
						
							|  |     >>> _eval_expr('2^6')
 | 
						
						
						
							|  |     64
 | 
						
						
						
							|  |     >>> _eval_expr('2**6')
 | 
						
						
						
							|  |     64
 | 
						
						
						
							|  |     >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
 | 
						
						
						
							|  |     -5.0
 | 
						
						
						
							|  |     """
 | 
						
						
						
							|  |     try:
 | 
						
						
						
							|  |         return _eval(ast.parse(expr, mode='eval').body)
 | 
						
						
						
							|  |     except ZeroDivisionError:
 | 
						
						
						
							|  |         # This is undefined
 | 
						
						
						
							|  |         return ""
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def _eval(node):
 | 
						
						
						
							|  |     if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
 | 
						
						
						
							|  |         return node.value
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     if isinstance(node, ast.BinOp):
 | 
						
						
						
							|  |         return operators[type(node.op)](_eval(node.left), _eval(node.right))
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     if isinstance(node, ast.UnaryOp):
 | 
						
						
						
							|  |         return operators[type(node.op)](_eval(node.operand))
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     raise TypeError(node)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def handler(q: multiprocessing.Queue, func, args, **kwargs):  # pylint:disable=invalid-name
 | 
						
						
						
							|  |     try:
 | 
						
						
						
							|  |         q.put(func(*args, **kwargs))
 | 
						
						
						
							|  |     except:
 | 
						
						
						
							|  |         q.put(None)
 | 
						
						
						
							|  |         raise
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def timeout_func(timeout, func, *args, **kwargs):
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     que = mp_fork.Queue()
 | 
						
						
						
							|  |     p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
 | 
						
						
						
							|  |     p.start()
 | 
						
						
						
							|  |     p.join(timeout=timeout)
 | 
						
						
						
							|  |     ret_val = None
 | 
						
						
						
							|  |     # pylint: disable=used-before-assignment,undefined-variable
 | 
						
						
						
							|  |     if not p.is_alive():
 | 
						
						
						
							|  |         ret_val = que.get()
 | 
						
						
						
							|  |     else:
 | 
						
						
						
							|  |         logger.debug("terminate function after timeout is exceeded")  # type: ignore
 | 
						
						
						
							|  |         p.terminate()
 | 
						
						
						
							|  |     p.join()
 | 
						
						
						
							|  |     p.close()
 | 
						
						
						
							|  |     return ret_val
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  | def post_search(request, search) -> EngineResults:
 | 
						
						
						
							|  |     results = EngineResults()
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # only show the result of the expression on the first page
 | 
						
						
						
							|  |     if search.search_query.pageno > 1:
 | 
						
						
						
							|  |         return results
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     query = search.search_query.query
 | 
						
						
						
							|  |     # in order to avoid DoS attacks with long expressions, ignore long expressions
 | 
						
						
						
							|  |     if len(query) > 100:
 | 
						
						
						
							|  |         return results
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # replace commonly used math operators with their proper Python operator
 | 
						
						
						
							|  |     query = query.replace("x", "*").replace(":", "/")
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # use UI language
 | 
						
						
						
							|  |     ui_locale = babel.Locale.parse(request.preferences.get_value('locale'), sep='-')
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # parse the number system in a localized way
 | 
						
						
						
							|  |     def _decimal(match: re.Match) -> str:
 | 
						
						
						
							|  |         val = match.string[match.start() : match.end()]
 | 
						
						
						
							|  |         val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
 | 
						
						
						
							|  |         return str(val)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     decimal = ui_locale.number_symbols["latn"]["decimal"]
 | 
						
						
						
							|  |     group = ui_locale.number_symbols["latn"]["group"]
 | 
						
						
						
							|  |     query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # only numbers and math operators are accepted
 | 
						
						
						
							|  |     if any(str.isalpha(c) for c in query):
 | 
						
						
						
							|  |         return results
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # in python, powers are calculated via **
 | 
						
						
						
							|  |     query_py_formatted = query.replace("^", "**")
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     # Prevent the runtime from being longer than 50 ms
 | 
						
						
						
							|  |     res = timeout_func(0.05, _eval_expr, query_py_formatted)
 | 
						
						
						
							|  |     if res is None or res == "":
 | 
						
						
						
							|  |         return results
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     res = babel.numbers.format_decimal(res, locale=ui_locale)
 | 
						
						
						
							|  |     results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
 | 
						
						
						
							|  | 
 | 
						
						
						
							|  |     return results
 |