Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/module.py: 11%
256 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-07 19:28 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-07 19:28 +0000
1import copy
2import enum
3import functools
4import inspect
5import types
6import typing as t
7import logging
8from viur.core import db, errors, current, utils
9from viur.core.config import conf
12class Method:
13 """
14 Abstraction wrapper for any public available method.
15 """
17 @classmethod
18 def ensure(cls, func: t.Callable | "Method") -> "Method":
19 """
20 Ensures the provided `func` parameter is either a Method already, or turns it
21 into a Method. This is done to avoid stacking Method objects, which may create
22 unwanted results.
23 """
24 if isinstance(func, Method):
25 return func
27 return cls(func)
29 def __init__(self, func: t.Callable):
30 # Content
31 self._func = func
32 self.__name__ = func.__name__
33 self._instance = None
35 # Attributes
36 self.exposed = None # None = unexposed, True = exposed, False = internal exposed
37 self.ssl = False
38 self.methods = ("GET", "POST", "HEAD", "OPTIONS")
39 self.seo_language_map = None
40 self.cors_allow_headers = None
42 # Inspection
43 self.signature = inspect.signature(self._func)
45 # Guards
46 self.skey = None
47 self.access = None
49 def __get__(self, obj, objtype=None):
50 """
51 This binds the Method to an object.
53 To do it, the Method instance is copied and equipped with the individual _instance member.
54 """
55 if obj:
56 bound = copy.copy(self)
57 bound._instance = obj
58 return bound
60 return self
62 def __call__(self, *args, **kwargs):
63 """
64 Calls the method with given args and kwargs.
66 Prepares and filters argument values from args and kwargs regarding self._func's signature and type annotations,
67 if present.
69 Method objects normally wrap functions which are externally exposed. Therefore, any arguments passed from the
70 client are str-values, and are automatically parsed when equipped with type-annotations.
72 This preparation of arguments therefore inspects the target function as follows
73 - incoming values are parsed to their particular type, if type annotations are present
74 - parameters in *args and **kwargs are being checked against their signature; only relevant values are being
75 passed, anything else is thrown away.
76 - execution of guard configurations from @skey and @access, if present
77 """
79 if trace := conf.debug.trace:
80 logging.debug(f"calling {self._func=} with raw {args=}, {kwargs=}")
82 def parse_value_by_annotation(annotation: type, name: str, value: str | list | tuple) -> t.Any:
83 """
84 Tries to parse a value according to a given type.
85 May be called recursively to handle unions, lists and tuples as well.
86 """
87 # logging.debug(f"{annotation=} | {name=} | {value=}")
89 # simple types
90 if annotation is str:
91 return str(value)
92 elif annotation is int:
93 return int(value)
94 elif annotation is float:
95 return float(value)
96 elif annotation is bool:
97 return utils.parse.bool(value)
98 elif annotation is types.NoneType:
99 return None
101 # complex types
102 origin_type = t.get_origin(annotation)
104 if origin_type is list and len(annotation.__args__) == 1:
105 if not isinstance(value, list):
106 value = [value]
108 return [parse_value_by_annotation(annotation.__args__[0], name, item) for item in value]
110 elif origin_type is tuple and len(annotation.__args__) == 1:
111 if not isinstance(value, tuple):
112 value = (value, )
114 return tuple(parse_value_by_annotation(annotation.__args__[0], name, item) for item in value)
116 elif origin_type is t.Literal:
117 if not any(value == str(literal) for literal in annotation.__args__):
118 raise errors.NotAcceptable(f"Expecting any of {annotation.__args__} for {name}")
120 return value
122 elif origin_type is t.Union or isinstance(annotation, types.UnionType):
123 for i, sub_annotation in enumerate(annotation.__args__):
124 try:
125 return parse_value_by_annotation(sub_annotation, name, value)
126 except ValueError:
127 if i == len(annotation.__args__) - 1:
128 raise
130 elif annotation is db.Key:
131 if isinstance(value, db.Key):
132 return value
134 return parse_value_by_annotation(int | str, name, value)
136 elif isinstance(annotation, enum.EnumMeta):
137 try:
138 return annotation(value)
139 except ValueError as exc:
140 for value_, member in annotation._value2member_map_.items():
141 if str(value) == str(value_): # Do a string comparison, it could be a IntEnum
142 return member
143 raise errors.NotAcceptable(f"{' '.join(exc.args)} for {name}") from exc
145 raise errors.NotAcceptable(f"Unhandled type {annotation=} for {name}={value!r}")
147 # examine parameters
148 args_iter = iter(args)
150 parsed_args = []
151 parsed_kwargs = {}
152 varargs = []
153 varkwargs = False
155 for i, (param_name, param) in enumerate(self.signature.parameters.items()):
156 if self._instance and i == 0 and param_name == "self":
157 continue
159 param_type = param.annotation if param.annotation is not param.empty else None
160 param_required = param.default is param.empty
162 # take positional parameters first
163 if param.kind in (
164 inspect.Parameter.POSITIONAL_OR_KEYWORD,
165 inspect.Parameter.POSITIONAL_ONLY
166 ):
167 try:
168 value = next(args_iter)
170 if param_type:
171 value = parse_value_by_annotation(param_type, param_name, value)
173 parsed_args.append(value)
174 continue
175 except StopIteration:
176 pass
178 # otherwise take kwargs or variadics
179 if (
180 param.kind in (
181 inspect.Parameter.POSITIONAL_OR_KEYWORD,
182 inspect.Parameter.KEYWORD_ONLY
183 )
184 and param_name in kwargs
185 ):
186 value = kwargs.pop(param_name)
188 if param_type:
189 value = parse_value_by_annotation(param_type, param_name, value)
191 parsed_kwargs[param_name] = value
193 elif param.kind == inspect.Parameter.VAR_POSITIONAL:
194 varargs = list(args_iter)
195 elif param.kind == inspect.Parameter.VAR_KEYWORD:
196 varkwargs = True
197 elif param_required:
198 if self.skey and param_name == self.skey["forward_payload"]:
199 continue
201 raise errors.NotAcceptable(f"Missing required parameter {param_name!r}")
203 # Here's a short clarification on the variables used here:
204 #
205 # - parsed_args = tuple of (the type-parsed) arguments that have been assigned based on the signature
206 # - parsed_kwargs = dict of (the type-parsed) keyword arguments that have been assigned based on the signature
207 # - args = either parsed_args, or parsed_args + remaining args if the function accepts *args
208 # - kwargs = either parsed_kwars, or parsed_kwargs | remaining kwargs if the function accepts **kwargs
209 # - varargs = indicator that the args also contain variable args (*args)
210 # - varkwards = indicator that variable kwargs (**kwargs) are also contained in the kwargs
211 #
213 # Extend args to any varargs, and redefine args
214 args = tuple(parsed_args + varargs)
216 # always take "skey"-parameter name, when configured, as parsed_kwargs
217 if self.skey and self.skey["name"] in kwargs:
218 parsed_kwargs[self.skey["name"]] = kwargs.pop(self.skey["name"])
220 # When varkwargs are accepted, merge parsed_kwargs and kwargs, otherwise just use parsed_kwargs
221 if varkwargs := varkwargs and bool(kwargs):
222 kwargs = parsed_kwargs | kwargs
223 else:
224 kwargs = parsed_kwargs
226 # Trace message for final call configuration
227 if trace := conf.debug.trace:
228 logging.debug(f"calling {self._func=} with cleaned {args=}, {kwargs=}")
230 # evaluate skey guard setting?
231 if self.skey and not current.request.get().skey_checked: # skey guardiance is only required once per request
232 if trace:
233 logging.debug(f"@skey {self.skey=}")
235 security_key = kwargs.pop(self.skey["name"], "")
237 # validation is necessary?
238 if allow_empty := self.skey["allow_empty"]:
239 # allow_empty can be callable, to detect programmatically
240 if callable(allow_empty):
241 required = not allow_empty(args, kwargs)
242 # or allow_empty can be a sequence of allowed keys
243 elif isinstance(allow_empty, (list, tuple)):
244 required = any(k for k in kwargs.keys() if k not in allow_empty)
245 # otherwise, varargs or varkwargs may not be empty.
246 else:
247 required = varargs or varkwargs or security_key
248 if trace:
249 logging.debug(f"@skey {required=} because either {varargs=} or {varkwargs=} or {security_key=}")
250 else:
251 required = True
253 if required:
254 if trace:
255 logging.debug(f"@skey wanted, validating {security_key!r}")
257 from viur.core import securitykey
258 payload = securitykey.validate(security_key, **self.skey["extra_kwargs"])
259 current.request.get().skey_checked = True
261 if not payload or (self.skey["validate"] and not self.skey["validate"](payload)):
262 raise errors.PreconditionFailed(
263 self.skey["message"] or f"Missing or invalid parameter {self.skey['name']!r}"
264 )
266 if self.skey["forward_payload"]:
267 kwargs |= {self.skey["forward_payload"]: payload}
269 # evaluate access guard setting?
270 if self.access:
271 user = current.user.get()
273 if trace := conf.debug.trace:
274 logging.debug(f"@access {user=} {self.access=}")
276 if not user:
277 if offer_login := self.access["offer_login"]:
278 raise errors.Redirect(offer_login if isinstance(offer_login, str) else "/user/login")
280 raise errors.Unauthorized(self.access["message"]) if self.access["message"] else errors.Unauthorized()
282 ok = "root" in user["access"]
284 if not ok and self.access["access"]:
285 for acc in self.access["access"]:
286 if trace:
287 logging.debug(f"@access checking {acc=}")
289 # Callable directly tests access
290 if callable(acc):
291 if acc():
292 ok = True
293 break
295 continue
297 # Otherwise, check for access rights
298 if isinstance(acc, str):
299 acc = (acc, )
301 assert isinstance(acc, (tuple, list, set))
303 if all(a in user["access"] for a in acc):
304 ok = True
305 break
307 if trace:
308 logging.debug(f"@access {ok=}")
310 if not ok:
311 raise errors.Forbidden(self.access["message"]) if self.access["message"] else errors.Forbidden()
313 # call with instance when provided
314 if self._instance:
315 return self._func(self._instance, *args, **kwargs)
317 return self._func(*args, **kwargs)
319 def describe(self) -> dict:
320 """
321 Describes the Method with a
322 """
323 return_doc = t.get_type_hints(self._func).get("return")
325 ret = {
326 "args": {
327 param.name: {
328 "type": str(param.annotation) if param.annotation is not inspect.Parameter.empty else None,
329 "default": str(param.default) if param.default is not inspect.Parameter.empty else None,
330 }
331 for param in self.signature.parameters.values()
332 },
333 "returns": str(return_doc).strip() if return_doc else None,
334 "accepts": self.methods,
335 "docs": self._func.__doc__.strip() if self._func.__doc__ else None,
336 "aliases": tuple(self.seo_language_map.keys()) if self.seo_language_map else None,
337 }
339 if self.skey:
340 ret["skey"] = self.skey["name"]
342 if self.access:
343 ret["access"] = [str(access) for access in self.access["access"]] # must be a list to be JSON-serializable
345 return ret
347 def register(self, target: dict, name: str, language: str | None = None):
348 """
349 Registers the Method under `name` and eventually some customized SEO-name for the provided language
350 """
351 if self.exposed is None:
352 return
354 target[name] = self
356 # reassign for SEO mapping as well
357 if self.seo_language_map:
358 for lang in tuple(self.seo_language_map.keys()) if not language else (language, ):
359 if translated_name := self.seo_language_map.get(lang):
360 target[translated_name] = self
363class Module:
364 """
365 This is the root module prototype that serves a minimal module in the ViUR system without any other bindings.
366 """
368 handler: str | t.Callable = None
369 """
370 This is the module's handler, respectively its type.
371 Use the @property-decorator in specific Modules to construct the handler's value dynamically.
372 A module without a handler setting cannot be described, so cannot be handled by admin-tools.
373 """
375 accessRights: tuple[str] = None
376 """
377 If set, a tuple of access rights (like add, edit, delete) that this module supports.
379 These will be prefixed on instance startup with the actual module name (becoming file-add, file-edit etc)
380 and registered in ``conf.user.access_rights`` so these will be available on the access bone in user/add
381 or user/edit.
382 """
384 roles: dict = {}
385 r"""
386 Allows to specify role settings for a module.
388 Defaults to no role definition, which ignores the module entirely in the role-system.
389 In this case, access rights can still be set individually on the user's access bone.
391 A "*" wildcard can either be used as key or as value to allow for "all roles", or "all rights".
393 .. code-block:: python
395 # Example
396 roles = {
397 "*": "view", # Any role may only "view"
398 "editor": ("add", "edit"), # Role "editor" may "add" or "edit", but not "delete"
399 "admin": "*", # Role "admin" can do everything
400 }
402 """
404 seo_language_map: dict[str: str] = {}
405 r"""
406 The module name is the first part of a URL.
407 SEO-identifiers have to be set as class-attribute ``seoLanguageMap`` of type ``dict[str, str]`` in the module.
408 It maps a *language* to the according *identifier*.
410 .. code-block:: python
411 :name: module seo-map
412 :caption: modules/myorders.py
413 :emphasize-lines: 4-7
415 from viur.core.prototypes import List
417 class MyOrders(List):
418 seo_language_map = {
419 "de": "bestellungen",
420 "en": "orders",
421 }
423 By default the module would be available under */myorders*, the lowercase module name.
424 With the defined :attr:`seoLanguageMap`, it will become available as */de/bestellungen* and */en/orders*.
426 Great, this part is now user and robot friendly :)
427 """
429 adminInfo: dict[str, t.Any] | t.Callable = None
430 """
431 This is a ``dict`` holding the information necessary for the Vi/Admin to handle this module.
433 name: ``str``
434 Human-readable module name that will be shown in the admin tool.
436 handler: ``str`` (``list``, ``tree`` or ``singleton``):
437 Allows to override the handler provided by the module. Set this only when *really* necessary,
438 otherwise it can be left out and is automatically injected by the Module's prototype.
440 icon: ``str``
441 (Optional) Either the Shoelace icon library name or a path relative to the project's deploy folder
442 (e.g. /static/icons/viur.svg) for the icon used in the admin tool for this module.
444 columns: ``List[str]``
445 (Optional) List of columns (bone names) that are displayed by default.
446 Used only by the List handler.
448 filter: ``Dict[str, str]``
449 (Optional) Dictionary of additional parameters that will be send along when
450 fetching entities from the server. Can be used to filter the entities being displayed on the
451 client-side.
453 display: ``str`` ("default", "hidden" or "group")
454 (Optional) "hidden" will hide the module in the admin tool's main bar.
455 (itwill not be accessible directly, however it's registered with the frontend so it can be used in a
456 relational bone). "group" will show this module in the main bar, but it will not be clickable.
457 Clicking it will just try to expand it (assuming there are additional views defined).
459 preview: ``Union[str, Dict[str, str]]``
460 (Optional) A url that will be opened in a new tab and is expected to display
461 the entity selected in the table. Can be “/{{module}}/view/{{key}}", with {{module}} and {{key}} getting
462 replaced as needed. If more than one preview-url is needed, supply a dictionary where the key is
463 the URL and the value the description shown to the user.
465 views: ``List[Dict[str, t.Any]]``
466 (Optional) List of nested adminInfo like dictionaries. Used to define
467 additional views on the module. Useful f.e. for an order module, where you want separate list of
468 "payed orders", "unpayed orders", "orders waiting for shipment", etc. If such views are defined,
469 the top-level entry in the menu bar will expand if clicked, revealing these additional filters.
471 actions: ``List[str]``
472 (Optional) List of actions supported by this modules. Actions can be defined by
473 the frontend (like "add", "edit", "delete" or "preview"); it can be an action defined by a plugin
474 loaded by the frontend; or it can be a so called "server side action" (see "customActions" below)
476 customActions: ``Dict[str, dict]``
477 (Optional) A mapping of names of server-defined actions that can be used
478 in the ``actions`` list above to their definition dictionary. See .... for more details.
480 disabledActions: ``List[str, dict]``
481 (Optional) A list of disabled actions. The frontend will inject default actions like add or edit
482 even if they're not listed in actions. Listing them here will prevent that. It's up to the frontend
483 to decide if that action won't be visible at all or it's button just being disabled.
485 sortIndex: ``int``
486 (Optional) Defines the order in which the modules will appear in the main bar in
487 ascrending order.
489 indexedBones: ``List[str]``
490 (Optional) List of bones, for which an (composite?) index exists in this
491 view. This allows the fronted to signal the user that a given list can be sorted or filtered by this
492 bone. If no additional filters are enforced by the
493 :meth:`listFilter<viur.core.prototypes.list.listFilter>` and ``filter`` is not set, this should be
494 all bones which are marked as indexed.
496 changeInvalidates: ``List[str]``
497 (Optional) A list of module-names which depend on the entities handled
498 from this module. This allows the frontend to invalidate any caches in these depended modules if the
499 data in this module changes. Example: This module may be a list-module handling the file_rootNode
500 entities for the file module, so a edit/add/deletion action on this module should be reflected in the
501 rootNode-selector in the file-module itself. In this case, this property should be set to ``["file"]``.
503 moduleGroup: ``str``
504 (Optional) If set, should be a key of a moduleGroup defined in .... .
506 editViews: ``Dict[str, t.Any]``
507 (Optional) If set, will embed another list-widget in the edit forms for
508 a given entity. See .... for more details.
510 If this is a function, it must take no parameters and return the dictionary as shown above. This
511 can be used to customize the appearance of the Vi/Admin to individual users.
512 """
514 def __init__(self, moduleName: str, modulePath: str, *args, **kwargs):
515 self.render = None # will be set to the appropriate render instance at runtime
516 self._cached_description = None # caching used by describe()
517 self.moduleName = moduleName # Name of this module (usually it's class name, e.g. "file")
518 self.modulePath = modulePath # Path to this module in URL-routing (e.g. "json/file")
520 if self.handler and self.accessRights:
521 for right in self.accessRights:
522 right = f"{self.moduleName}-{right}"
524 # fixme: Turn conf.user.access_rights into a set.
525 if right not in conf.user.access_rights:
526 conf.user.access_rights.append(right)
528 # Collect methods and (sub)modules
529 self._methods = {}
530 self._modules = {}
531 self._update_methods()
533 def _update_methods(self):
534 """
535 Internal function to update methods and submodules.
536 This function should only be called when member attributes are dynamically modified by the module.
537 """
538 self._methods.clear()
539 self._modules.clear()
541 for key in dir(self):
542 if key[0] == "_":
543 continue
544 if isinstance(getattr(self.__class__, key, None), (property, functools.cached_property)):
545 continue
547 prop = getattr(self, key)
549 if isinstance(prop, Method):
550 self._methods[key] = prop
551 elif isinstance(prop, Module):
552 self._modules[key] = prop
554 def describe(self) -> dict | None:
555 """
556 Meta description of this module.
557 """
558 # Use cached description?
559 if isinstance(self._cached_description, dict):
560 return self._cached_description
562 # Retrieve handler
563 if not (handler := self.handler):
564 return None
566 # Default description
567 ret = {
568 "name": self.__class__.__name__,
569 "handler": ".".join((handler, self.__class__.__name__.lower())),
570 "methods": {
571 name: method.describe() for name, method in self._methods.items()
572 },
573 }
575 # Extend indexes, if available
576 # todo: This must be handled by SkelModule
577 if indexes := getattr(self, "indexes", None):
578 ret["indexes"] = indexes
580 # Merge adminInfo if present
581 if admin_info := self.adminInfo() if callable(self.adminInfo) else self.adminInfo:
582 assert isinstance(admin_info, dict), \
583 f"adminInfo can either be a dict or a callable returning a dict, but got {type(admin_info)}"
584 ret |= admin_info
586 # Cache description for later re-use.
587 if self._cached_description is not False:
588 self._cached_description = ret
590 return ret
592 def register(self, target: dict, render: object):
593 """
594 Registers this module's public functions to a given resolver.
595 This function is executed on start-up, and can be sub-classed.
596 """
597 # connect instance to render
598 self.render = render
600 # Map module under SEO-mapped name, if available.
601 if self.seo_language_map:
602 for lang in conf.i18n.available_languages or [conf.i18n.default_language]:
603 # Map the module under each translation
604 if translated_module_name := self.seo_language_map.get(lang):
605 translated_module = target.setdefault(translated_module_name, {})
607 # Map module methods to the previously determined target
608 for name, method in self._methods.items():
609 method.register(translated_module, name, lang)
611 conf.i18n.language_module_map[self.moduleName] = self.seo_language_map
613 # Map the module also under it's original name
614 if self.moduleName != "index":
615 target = target.setdefault(self.moduleName, {})
617 # Map module methods to the previously determined target
618 for name, method in self._methods.items():
619 method.register(target, name)
621 # Register sub modules
622 for name, module in self._modules.items():
623 module.register(target, self.render)