Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/module.py: 11%
245 statements
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
1import copy
2import inspect
3import types
4import typing as t
5import logging
6from viur.core import db, errors, current, utils
7from viur.core.config import conf
10class Method:
11 """
12 Abstraction wrapper for any public available method.
13 """
15 @classmethod
16 def ensure(cls, func: t.Callable | "Method") -> "Method":
17 """
18 Ensures the provided `func` parameter is either a Method already, or turns it
19 into a Method. This is done to avoid stacking Method objects, which may create
20 unwanted results.
21 """
22 if isinstance(func, Method):
23 return func
25 return cls(func)
27 def __init__(self, func: t.Callable):
28 # Content
29 self._func = func
30 self.__name__ = func.__name__
31 self._instance = None
33 # Attributes
34 self.exposed = None # None = unexposed, True = exposed, False = internal exposed
35 self.ssl = False
36 self.methods = ("GET", "POST", "HEAD")
37 self.seo_language_map = None
39 # Inspection
40 self.signature = inspect.signature(self._func)
42 # Guards
43 self.skey = None
44 self.access = None
46 def __get__(self, obj, objtype=None):
47 """
48 This binds the Method to an object.
50 To do it, the Method instance is copied and equipped with the individual _instance member.
51 """
52 if obj:
53 bound = copy.copy(self)
54 bound._instance = obj
55 return bound
57 return self
59 def __call__(self, *args, **kwargs):
60 """
61 Calls the method with given args and kwargs.
63 Prepares and filters argument values from args and kwargs regarding self._func's signature and type annotations,
64 if present.
66 Method objects normally wrap functions which are externally exposed. Therefore, any arguments passed from the
67 client are str-values, and are automatically parsed when equipped with type-annotations.
69 This preparation of arguments therefore inspects the target function as follows
70 - incoming values are parsed to their particular type, if type annotations are present
71 - parameters in *args and **kwargs are being checked against their signature; only relevant values are being
72 passed, anything else is thrown away.
73 - execution of guard configurations from @skey and @access, if present
74 """
76 if trace := conf.debug.trace:
77 logging.debug(f"calling {self._func=} with raw {args=}, {kwargs=}")
79 def parse_value_by_annotation(annotation: type, name: str, value: str | list | tuple) -> t.Any:
80 """
81 Tries to parse a value according to a given type.
82 May be called recursively to handle unions, lists and tuples as well.
83 """
84 # simple types
85 if annotation is str:
86 return str(value)
87 elif annotation is int:
88 return int(value)
89 elif annotation is float:
90 return float(value)
91 elif annotation is bool:
92 return utils.parse.bool(value)
93 elif annotation is types.NoneType:
94 return None
96 # complex types
97 origin_type = t.get_origin(annotation)
99 if origin_type is list and len(annotation.__args__) == 1:
100 if not isinstance(value, list):
101 value = [value]
103 return [parse_value_by_annotation(annotation.__args__[0], name, item) for item in value]
105 elif origin_type is tuple and len(annotation.__args__) == 1:
106 if not isinstance(value, tuple):
107 value = (value, )
109 return tuple(parse_value_by_annotation(annotation.__args__[0], name, item) for item in value)
111 elif origin_type is t.Literal:
112 if not any(value == str(literal) for literal in annotation.__args__):
113 raise errors.NotAcceptable(f"Expecting any of {annotation.__args__} for {name}")
115 return value
117 elif origin_type is t.Union or isinstance(annotation, types.UnionType):
118 for i, sub_annotation in enumerate(annotation.__args__):
119 try:
120 return parse_value_by_annotation(sub_annotation, name, value)
121 except ValueError:
122 if i == len(annotation.__args__) - 1:
123 raise
125 elif annotation is db.Key:
126 if isinstance(value, db.Key):
127 return value
129 return parse_value_by_annotation(int | str, name, value)
131 raise errors.NotAcceptable(f"Unhandled type {annotation=} for {name}={value!r}")
133 # examine parameters
134 args_iter = iter(args)
136 parsed_args = []
137 parsed_kwargs = {}
138 varargs = []
139 varkwargs = False
141 for i, (param_name, param) in enumerate(self.signature.parameters.items()):
142 if self._instance and i == 0 and param_name == "self":
143 continue
145 param_type = param.annotation if param.annotation is not param.empty else None
146 param_required = param.default is param.empty
148 # take positional parameters first
149 if param.kind in (
150 inspect.Parameter.POSITIONAL_OR_KEYWORD,
151 inspect.Parameter.POSITIONAL_ONLY
152 ):
153 try:
154 value = next(args_iter)
156 if param_type:
157 value = parse_value_by_annotation(param_type, param_name, value)
159 parsed_args.append(value)
160 continue
161 except StopIteration:
162 pass
164 # otherwise take kwargs or variadics
165 if (
166 param.kind in (
167 inspect.Parameter.POSITIONAL_OR_KEYWORD,
168 inspect.Parameter.KEYWORD_ONLY
169 )
170 and param_name in kwargs
171 ):
172 value = kwargs.pop(param_name)
174 if param_type:
175 value = parse_value_by_annotation(param_type, param_name, value)
177 parsed_kwargs[param_name] = value
179 elif param.kind == inspect.Parameter.VAR_POSITIONAL:
180 varargs = list(args_iter)
181 elif param.kind == inspect.Parameter.VAR_KEYWORD:
182 varkwargs = True
183 elif param_required:
184 if self.skey and param_name == self.skey["forward_payload"]:
185 continue
187 raise errors.NotAcceptable(f"Missing required parameter {param_name!r}")
189 # Here's a short clarification on the variables used here:
190 #
191 # - parsed_args = tuple of (the type-parsed) arguments that have been assigned based on the signature
192 # - parsed_kwargs = dict of (the type-parsed) keyword arguments that have been assigned based on the signature
193 # - args = either parsed_args, or parsed_args + remaining args if the function accepts *args
194 # - kwargs = either parsed_kwars, or parsed_kwargs | remaining kwargs if the function accepts **kwargs
195 # - varargs = indicator that the args also contain variable args (*args)
196 # - varkwards = indicator that variable kwargs (**kwargs) are also contained in the kwargs
197 #
199 # Extend args to any varargs, and redefine args
200 args = tuple(parsed_args + varargs)
202 # always take "skey"-parameter name, when configured, as parsed_kwargs
203 if self.skey and self.skey["name"] in kwargs:
204 parsed_kwargs[self.skey["name"]] = kwargs.pop(self.skey["name"])
206 # When varkwargs are accepted, merge parsed_kwargs and kwargs, otherwise just use parsed_kwargs
207 if varkwargs := varkwargs and bool(kwargs):
208 kwargs = parsed_kwargs | kwargs
209 else:
210 kwargs = parsed_kwargs
212 # Trace message for final call configuration
213 if trace := conf.debug.trace:
214 logging.debug(f"calling {self._func=} with cleaned {args=}, {kwargs=}")
216 # evaluate skey guard setting?
217 if self.skey and not current.request.get().skey_checked: # skey guardiance is only required once per request
218 if trace:
219 logging.debug(f"@skey {self.skey=}")
221 security_key = kwargs.pop(self.skey["name"], "")
223 # validation is necessary?
224 if allow_empty := self.skey["allow_empty"]:
225 # allow_empty can be callable, to detect programmatically
226 if callable(allow_empty):
227 required = not allow_empty(args, kwargs)
228 # or allow_empty can be a sequence of allowed keys
229 elif isinstance(allow_empty, (list, tuple)):
230 required = any(k for k in kwargs.keys() if k not in allow_empty)
231 # otherwise, varargs or varkwargs may not be empty.
232 else:
233 required = varargs or varkwargs or security_key
234 if trace:
235 logging.debug(f"@skey {required=} because either {varargs=} or {varkwargs=} or {security_key=}")
236 else:
237 required = True
239 if required:
240 if trace:
241 logging.debug(f"@skey wanted, validating {security_key!r}")
243 from viur.core import securitykey
244 payload = securitykey.validate(security_key, **self.skey["extra_kwargs"])
245 current.request.get().skey_checked = True
247 if not payload or (self.skey["validate"] and not self.skey["validate"](payload)):
248 raise errors.PreconditionFailed(
249 self.skey["message"] or f"Missing or invalid parameter {self.skey['name']!r}"
250 )
252 if self.skey["forward_payload"]:
253 kwargs |= {self.skey["forward_payload"]: payload}
255 # evaluate access guard setting?
256 if self.access:
257 user = current.user.get()
259 if trace := conf.debug.trace:
260 logging.debug(f"@access {user=} {self.access=}")
262 if not user:
263 if offer_login := self.access["offer_login"]:
264 raise errors.Redirect(offer_login if isinstance(offer_login, str) else "/user/login")
266 raise errors.Unauthorized(self.access["message"]) if self.access["message"] else errors.Unauthorized()
268 ok = "root" in user["access"]
270 if not ok and self.access["access"]:
271 for acc in self.access["access"]:
272 if trace:
273 logging.debug(f"@access checking {acc=}")
275 # Callable directly tests access
276 if callable(acc):
277 if acc():
278 ok = True
279 break
281 continue
283 # Otherwise, check for access rights
284 if isinstance(acc, str):
285 acc = (acc, )
287 assert isinstance(acc, (tuple, list, set))
289 if all(a in user["access"] for a in acc):
290 ok = True
291 break
293 if trace:
294 logging.debug(f"@access {ok=}")
296 if not ok:
297 raise errors.Forbidden(self.access["message"]) if self.access["message"] else errors.Forbidden()
299 # call with instance when provided
300 if self._instance:
301 return self._func(self._instance, *args, **kwargs)
303 return self._func(*args, **kwargs)
305 def describe(self) -> dict:
306 """
307 Describes the Method with a
308 """
309 return_doc = t.get_type_hints(self._func).get("return")
311 ret = {
312 "args": {
313 param.name: {
314 "type": str(param.annotation) if param.annotation is not inspect.Parameter.empty else None,
315 "default": str(param.default) if param.default is not inspect.Parameter.empty else None,
316 }
317 for param in self.signature.parameters.values()
318 },
319 "returns": str(return_doc).strip() if return_doc else None,
320 "accepts": self.methods,
321 "docs": self._func.__doc__.strip() if self._func.__doc__ else None,
322 "aliases": tuple(self.seo_language_map.keys()) if self.seo_language_map else None,
323 }
325 if self.skey:
326 ret["skey"] = self.skey["name"]
328 if self.access:
329 ret["access"] = [str(access) for access in self.access["access"]] # must be a list to be JSON-serializable
331 return ret
333 def register(self, target: dict, name: str, language: str | None = None):
334 """
335 Registers the Method under `name` and eventually some customized SEO-name for the provided language
336 """
337 if self.exposed is None:
338 return
340 target[name] = self
342 # reassign for SEO mapping as well
343 if self.seo_language_map:
344 for lang in tuple(self.seo_language_map.keys()) if not language else (language, ):
345 if translated_name := self.seo_language_map.get(lang):
346 target[translated_name] = self
349class Module:
350 """
351 This is the root module prototype that serves a minimal module in the ViUR system without any other bindings.
352 """
354 handler: str | t.Callable = None
355 """
356 This is the module's handler, respectively its type.
357 Use the @property-decorator in specific Modules to construct the handler's value dynamically.
358 A module without a handler setting cannot be described, so cannot be handled by admin-tools.
359 """
361 accessRights: tuple[str] = None
362 """
363 If set, a tuple of access rights (like add, edit, delete) that this module supports.
365 These will be prefixed on instance startup with the actual module name (becoming file-add, file-edit etc)
366 and registered in ``conf.user.access_rights`` so these will be available on the access bone in user/add
367 or user/edit.
368 """
370 roles: dict = {}
371 r"""
372 Allows to specify role settings for a module.
374 Defaults to no role definition, which ignores the module entirely in the role-system.
375 In this case, access rights can still be set individually on the user's access bone.
377 A "*" wildcard can either be used as key or as value to allow for "all roles", or "all rights".
379 .. code-block:: python
381 # Example
382 roles = {
383 "*": "view", # Any role may only "view"
384 "editor": ("add", "edit"), # Role "editor" may "add" or "edit", but not "delete"
385 "admin": "*", # Role "admin" can do everything
386 }
388 """
390 seo_language_map: dict[str: str] = {}
391 r"""
392 The module name is the first part of a URL.
393 SEO-identifiers have to be set as class-attribute ``seoLanguageMap`` of type ``dict[str, str]`` in the module.
394 It maps a *language* to the according *identifier*.
396 .. code-block:: python
397 :name: module seo-map
398 :caption: modules/myorders.py
399 :emphasize-lines: 4-7
401 from viur.core.prototypes import List
403 class MyOrders(List):
404 seo_language_map = {
405 "de": "bestellungen",
406 "en": "orders",
407 }
409 By default the module would be available under */myorders*, the lowercase module name.
410 With the defined :attr:`seoLanguageMap`, it will become available as */de/bestellungen* and */en/orders*.
412 Great, this part is now user and robot friendly :)
413 """
415 adminInfo: dict[str, t.Any] | t.Callable = None
416 """
417 This is a ``dict`` holding the information necessary for the Vi/Admin to handle this module.
419 name: ``str``
420 Human-readable module name that will be shown in the admin tool.
422 handler: ``str`` (``list``, ``tree`` or ``singleton``):
423 Allows to override the handler provided by the module. Set this only when *really* necessary,
424 otherwise it can be left out and is automatically injected by the Module's prototype.
426 icon: ``str``
427 (Optional) Either the Shoelace icon library name or a path relative to the project's deploy folder
428 (e.g. /static/icons/viur.svg) for the icon used in the admin tool for this module.
430 columns: ``List[str]``
431 (Optional) List of columns (bone names) that are displayed by default.
432 Used only by the List handler.
434 filter: ``Dict[str, str]``
435 (Optional) Dictionary of additional parameters that will be send along when
436 fetching entities from the server. Can be used to filter the entities being displayed on the
437 client-side.
439 display: ``str`` ("default", "hidden" or "group")
440 (Optional) "hidden" will hide the module in the admin tool's main bar.
441 (itwill not be accessible directly, however it's registered with the frontend so it can be used in a
442 relational bone). "group" will show this module in the main bar, but it will not be clickable.
443 Clicking it will just try to expand it (assuming there are additional views defined).
445 preview: ``Union[str, Dict[str, str]]``
446 (Optional) A url that will be opened in a new tab and is expected to display
447 the entity selected in the table. Can be “/{{module}}/view/{{key}}", with {{module}} and {{key}} getting
448 replaced as needed. If more than one preview-url is needed, supply a dictionary where the key is
449 the URL and the value the description shown to the user.
451 views: ``List[Dict[str, t.Any]]``
452 (Optional) List of nested adminInfo like dictionaries. Used to define
453 additional views on the module. Useful f.e. for an order module, where you want separate list of
454 "payed orders", "unpayed orders", "orders waiting for shipment", etc. If such views are defined,
455 the top-level entry in the menu bar will expand if clicked, revealing these additional filters.
457 actions: ``List[str]``
458 (Optional) List of actions supported by this modules. Actions can be defined by
459 the frontend (like "add", "edit", "delete" or "preview"); it can be an action defined by a plugin
460 loaded by the frontend; or it can be a so called "server side action" (see "customActions" below)
462 customActions: ``Dict[str, dict]``
463 (Optional) A mapping of names of server-defined actions that can be used
464 in the ``actions`` list above to their definition dictionary. See .... for more details.
466 disabledActions: ``List[str, dict]``
467 (Optional) A list of disabled actions. The frontend will inject default actions like add or edit
468 even if they're not listed in actions. Listing them here will prevent that. It's up to the frontend
469 to decide if that action won't be visible at all or it's button just being disabled.
471 sortIndex: ``int``
472 (Optional) Defines the order in which the modules will appear in the main bar in
473 ascrending order.
475 indexedBones: ``List[str]``
476 (Optional) List of bones, for which an (composite?) index exists in this
477 view. This allows the fronted to signal the user that a given list can be sorted or filtered by this
478 bone. If no additional filters are enforced by the
479 :meth:`listFilter<viur.core.prototypes.list.listFilter>` and ``filter`` is not set, this should be
480 all bones which are marked as indexed.
482 changeInvalidates: ``List[str]``
483 (Optional) A list of module-names which depend on the entities handled
484 from this module. This allows the frontend to invalidate any caches in these depended modules if the
485 data in this module changes. Example: This module may be a list-module handling the file_rootNode
486 entities for the file module, so a edit/add/deletion action on this module should be reflected in the
487 rootNode-selector in the file-module itself. In this case, this property should be set to ``["file"]``.
489 moduleGroup: ``str``
490 (Optional) If set, should be a key of a moduleGroup defined in .... .
492 editViews: ``Dict[str, t.Any]``
493 (Optional) If set, will embed another list-widget in the edit forms for
494 a given entity. See .... for more details.
496 If this is a function, it must take no parameters and return the dictionary as shown above. This
497 can be used to customize the appearance of the Vi/Admin to individual users.
498 """
500 def __init__(self, moduleName: str, modulePath: str, *args, **kwargs):
501 self.render = None # will be set to the appropriate render instance at runtime
502 self._cached_description = None # caching used by describe()
503 self.moduleName = moduleName # Name of this module (usually it's class name, e.g. "file")
504 self.modulePath = modulePath # Path to this module in URL-routing (e.g. "json/file")
506 if self.handler and self.accessRights:
507 for right in self.accessRights:
508 right = f"{self.moduleName}-{right}"
510 # fixme: Turn conf.user.access_rights into a set.
511 if right not in conf.user.access_rights:
512 conf.user.access_rights.append(right)
514 # Collect methods and (sub)modules
515 self._methods = {}
516 self._modules = {}
517 self._update_methods()
519 def _update_methods(self):
520 """
521 Internal function to update methods and submodules.
522 This function should only be called when member attributes are dynamically modified by the module.
523 """
524 self._methods.clear()
525 self._modules.clear()
527 for key in dir(self):
528 if key[0] == "_":
529 continue
530 if isinstance(getattr(self.__class__, key, None), property):
531 continue
533 prop = getattr(self, key)
535 if isinstance(prop, Method):
536 self._methods[key] = prop
537 elif isinstance(prop, Module):
538 self._modules[key] = prop
540 def describe(self) -> dict | None:
541 """
542 Meta description of this module.
543 """
544 # Use cached description?
545 if isinstance(self._cached_description, dict):
546 return self._cached_description
548 # Retrieve handler
549 if not (handler := self.handler):
550 return None
552 # Default description
553 ret = {
554 "name": self.__class__.__name__,
555 "handler": ".".join((handler, self.__class__.__name__.lower())),
556 "methods": {
557 name: method.describe() for name, method in self._methods.items()
558 },
559 }
561 # Extend indexes, if available
562 # todo: This must be handled by SkelModule
563 if indexes := getattr(self, "indexes", None):
564 ret["indexes"] = indexes
566 # Merge adminInfo if present
567 if admin_info := self.adminInfo() if callable(self.adminInfo) else self.adminInfo:
568 assert isinstance(admin_info, dict), \
569 f"adminInfo can either be a dict or a callable returning a dict, but got {type(admin_info)}"
570 ret |= admin_info
572 # Cache description for later re-use.
573 if self._cached_description is not False:
574 self._cached_description = ret
576 return ret
578 def register(self, target: dict, render: object):
579 """
580 Registers this module's public functions to a given resolver.
581 This function is executed on start-up, and can be sub-classed.
582 """
583 # connect instance to render
584 self.render = render
586 # Map module under SEO-mapped name, if available.
587 if self.seo_language_map:
588 for lang in conf.i18n.available_languages or [conf.i18n.default_language]:
589 # Map the module under each translation
590 if translated_module_name := self.seo_language_map.get(lang):
591 translated_module = target.setdefault(translated_module_name, {})
593 # Map module methods to the previously determined target
594 for name, method in self._methods.items():
595 method.register(translated_module, name, lang)
597 conf.i18n.language_module_map[self.moduleName] = self.seo_language_map
599 # Map the module also under it's original name
600 if self.moduleName != "index":
601 target = target.setdefault(self.moduleName, {})
603 # Map module methods to the previously determined target
604 for name, method in self._methods.items():
605 method.register(target, name)
607 # Register sub modules
608 for name, module in self._modules.items():
609 module.register(target, self.render)