Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/module.py: 11%

256 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-07 19:28 +0000

1import copy 

2import enum 

3import functools 

4import inspect 

5import types 

6import typing as t 

7import logging 

8from viur.core import db, errors, current, utils 

9from viur.core.config import conf 

10 

11 

12class Method: 

13 """ 

14 Abstraction wrapper for any public available method. 

15 """ 

16 

17 @classmethod 

18 def ensure(cls, func: t.Callable | "Method") -> "Method": 

19 """ 

20 Ensures the provided `func` parameter is either a Method already, or turns it 

21 into a Method. This is done to avoid stacking Method objects, which may create 

22 unwanted results. 

23 """ 

24 if isinstance(func, Method): 

25 return func 

26 

27 return cls(func) 

28 

29 def __init__(self, func: t.Callable): 

30 # Content 

31 self._func = func 

32 self.__name__ = func.__name__ 

33 self._instance = None 

34 

35 # Attributes 

36 self.exposed = None # None = unexposed, True = exposed, False = internal exposed 

37 self.ssl = False 

38 self.methods = ("GET", "POST", "HEAD", "OPTIONS") 

39 self.seo_language_map = None 

40 self.cors_allow_headers = None 

41 

42 # Inspection 

43 self.signature = inspect.signature(self._func) 

44 

45 # Guards 

46 self.skey = None 

47 self.access = None 

48 

49 def __get__(self, obj, objtype=None): 

50 """ 

51 This binds the Method to an object. 

52 

53 To do it, the Method instance is copied and equipped with the individual _instance member. 

54 """ 

55 if obj: 

56 bound = copy.copy(self) 

57 bound._instance = obj 

58 return bound 

59 

60 return self 

61 

62 def __call__(self, *args, **kwargs): 

63 """ 

64 Calls the method with given args and kwargs. 

65 

66 Prepares and filters argument values from args and kwargs regarding self._func's signature and type annotations, 

67 if present. 

68 

69 Method objects normally wrap functions which are externally exposed. Therefore, any arguments passed from the 

70 client are str-values, and are automatically parsed when equipped with type-annotations. 

71 

72 This preparation of arguments therefore inspects the target function as follows 

73 - incoming values are parsed to their particular type, if type annotations are present 

74 - parameters in *args and **kwargs are being checked against their signature; only relevant values are being 

75 passed, anything else is thrown away. 

76 - execution of guard configurations from @skey and @access, if present 

77 """ 

78 

79 if trace := conf.debug.trace: 

80 logging.debug(f"calling {self._func=} with raw {args=}, {kwargs=}") 

81 

82 def parse_value_by_annotation(annotation: type, name: str, value: str | list | tuple) -> t.Any: 

83 """ 

84 Tries to parse a value according to a given type. 

85 May be called recursively to handle unions, lists and tuples as well. 

86 """ 

87 # logging.debug(f"{annotation=} | {name=} | {value=}") 

88 

89 # simple types 

90 if annotation is str: 

91 return str(value) 

92 elif annotation is int: 

93 return int(value) 

94 elif annotation is float: 

95 return float(value) 

96 elif annotation is bool: 

97 return utils.parse.bool(value) 

98 elif annotation is types.NoneType: 

99 return None 

100 

101 # complex types 

102 origin_type = t.get_origin(annotation) 

103 

104 if origin_type is list and len(annotation.__args__) == 1: 

105 if not isinstance(value, list): 

106 value = [value] 

107 

108 return [parse_value_by_annotation(annotation.__args__[0], name, item) for item in value] 

109 

110 elif origin_type is tuple and len(annotation.__args__) == 1: 

111 if not isinstance(value, tuple): 

112 value = (value, ) 

113 

114 return tuple(parse_value_by_annotation(annotation.__args__[0], name, item) for item in value) 

115 

116 elif origin_type is t.Literal: 

117 if not any(value == str(literal) for literal in annotation.__args__): 

118 raise errors.NotAcceptable(f"Expecting any of {annotation.__args__} for {name}") 

119 

120 return value 

121 

122 elif origin_type is t.Union or isinstance(annotation, types.UnionType): 

123 for i, sub_annotation in enumerate(annotation.__args__): 

124 try: 

125 return parse_value_by_annotation(sub_annotation, name, value) 

126 except ValueError: 

127 if i == len(annotation.__args__) - 1: 

128 raise 

129 

130 elif annotation is db.Key: 

131 if isinstance(value, db.Key): 

132 return value 

133 

134 return parse_value_by_annotation(int | str, name, value) 

135 

136 elif isinstance(annotation, enum.EnumMeta): 

137 try: 

138 return annotation(value) 

139 except ValueError as exc: 

140 for value_, member in annotation._value2member_map_.items(): 

141 if str(value) == str(value_): # Do a string comparison, it could be a IntEnum 

142 return member 

143 raise errors.NotAcceptable(f"{' '.join(exc.args)} for {name}") from exc 

144 

145 raise errors.NotAcceptable(f"Unhandled type {annotation=} for {name}={value!r}") 

146 

147 # examine parameters 

148 args_iter = iter(args) 

149 

150 parsed_args = [] 

151 parsed_kwargs = {} 

152 varargs = [] 

153 varkwargs = False 

154 

155 for i, (param_name, param) in enumerate(self.signature.parameters.items()): 

156 if self._instance and i == 0 and param_name == "self": 

157 continue 

158 

159 param_type = param.annotation if param.annotation is not param.empty else None 

160 param_required = param.default is param.empty 

161 

162 # take positional parameters first 

163 if param.kind in ( 

164 inspect.Parameter.POSITIONAL_OR_KEYWORD, 

165 inspect.Parameter.POSITIONAL_ONLY 

166 ): 

167 try: 

168 value = next(args_iter) 

169 

170 if param_type: 

171 value = parse_value_by_annotation(param_type, param_name, value) 

172 

173 parsed_args.append(value) 

174 continue 

175 except StopIteration: 

176 pass 

177 

178 # otherwise take kwargs or variadics 

179 if ( 

180 param.kind in ( 

181 inspect.Parameter.POSITIONAL_OR_KEYWORD, 

182 inspect.Parameter.KEYWORD_ONLY 

183 ) 

184 and param_name in kwargs 

185 ): 

186 value = kwargs.pop(param_name) 

187 

188 if param_type: 

189 value = parse_value_by_annotation(param_type, param_name, value) 

190 

191 parsed_kwargs[param_name] = value 

192 

193 elif param.kind == inspect.Parameter.VAR_POSITIONAL: 

194 varargs = list(args_iter) 

195 elif param.kind == inspect.Parameter.VAR_KEYWORD: 

196 varkwargs = True 

197 elif param_required: 

198 if self.skey and param_name == self.skey["forward_payload"]: 

199 continue 

200 

201 raise errors.NotAcceptable(f"Missing required parameter {param_name!r}") 

202 

203 # Here's a short clarification on the variables used here: 

204 # 

205 # - parsed_args = tuple of (the type-parsed) arguments that have been assigned based on the signature 

206 # - parsed_kwargs = dict of (the type-parsed) keyword arguments that have been assigned based on the signature 

207 # - args = either parsed_args, or parsed_args + remaining args if the function accepts *args 

208 # - kwargs = either parsed_kwars, or parsed_kwargs | remaining kwargs if the function accepts **kwargs 

209 # - varargs = indicator that the args also contain variable args (*args) 

210 # - varkwards = indicator that variable kwargs (**kwargs) are also contained in the kwargs 

211 # 

212 

213 # Extend args to any varargs, and redefine args 

214 args = tuple(parsed_args + varargs) 

215 

216 # always take "skey"-parameter name, when configured, as parsed_kwargs 

217 if self.skey and self.skey["name"] in kwargs: 

218 parsed_kwargs[self.skey["name"]] = kwargs.pop(self.skey["name"]) 

219 

220 # When varkwargs are accepted, merge parsed_kwargs and kwargs, otherwise just use parsed_kwargs 

221 if varkwargs := varkwargs and bool(kwargs): 

222 kwargs = parsed_kwargs | kwargs 

223 else: 

224 kwargs = parsed_kwargs 

225 

226 # Trace message for final call configuration 

227 if trace := conf.debug.trace: 

228 logging.debug(f"calling {self._func=} with cleaned {args=}, {kwargs=}") 

229 

230 # evaluate skey guard setting? 

231 if self.skey and not current.request.get().skey_checked: # skey guardiance is only required once per request 

232 if trace: 

233 logging.debug(f"@skey {self.skey=}") 

234 

235 security_key = kwargs.pop(self.skey["name"], "") 

236 

237 # validation is necessary? 

238 if allow_empty := self.skey["allow_empty"]: 

239 # allow_empty can be callable, to detect programmatically 

240 if callable(allow_empty): 

241 required = not allow_empty(args, kwargs) 

242 # or allow_empty can be a sequence of allowed keys 

243 elif isinstance(allow_empty, (list, tuple)): 

244 required = any(k for k in kwargs.keys() if k not in allow_empty) 

245 # otherwise, varargs or varkwargs may not be empty. 

246 else: 

247 required = varargs or varkwargs or security_key 

248 if trace: 

249 logging.debug(f"@skey {required=} because either {varargs=} or {varkwargs=} or {security_key=}") 

250 else: 

251 required = True 

252 

253 if required: 

254 if trace: 

255 logging.debug(f"@skey wanted, validating {security_key!r}") 

256 

257 from viur.core import securitykey 

258 payload = securitykey.validate(security_key, **self.skey["extra_kwargs"]) 

259 current.request.get().skey_checked = True 

260 

261 if not payload or (self.skey["validate"] and not self.skey["validate"](payload)): 

262 raise errors.PreconditionFailed( 

263 self.skey["message"] or f"Missing or invalid parameter {self.skey['name']!r}" 

264 ) 

265 

266 if self.skey["forward_payload"]: 

267 kwargs |= {self.skey["forward_payload"]: payload} 

268 

269 # evaluate access guard setting? 

270 if self.access: 

271 user = current.user.get() 

272 

273 if trace := conf.debug.trace: 

274 logging.debug(f"@access {user=} {self.access=}") 

275 

276 if not user: 

277 if offer_login := self.access["offer_login"]: 

278 raise errors.Redirect(offer_login if isinstance(offer_login, str) else "/user/login") 

279 

280 raise errors.Unauthorized(self.access["message"]) if self.access["message"] else errors.Unauthorized() 

281 

282 ok = "root" in user["access"] 

283 

284 if not ok and self.access["access"]: 

285 for acc in self.access["access"]: 

286 if trace: 

287 logging.debug(f"@access checking {acc=}") 

288 

289 # Callable directly tests access 

290 if callable(acc): 

291 if acc(): 

292 ok = True 

293 break 

294 

295 continue 

296 

297 # Otherwise, check for access rights 

298 if isinstance(acc, str): 

299 acc = (acc, ) 

300 

301 assert isinstance(acc, (tuple, list, set)) 

302 

303 if all(a in user["access"] for a in acc): 

304 ok = True 

305 break 

306 

307 if trace: 

308 logging.debug(f"@access {ok=}") 

309 

310 if not ok: 

311 raise errors.Forbidden(self.access["message"]) if self.access["message"] else errors.Forbidden() 

312 

313 # call with instance when provided 

314 if self._instance: 

315 return self._func(self._instance, *args, **kwargs) 

316 

317 return self._func(*args, **kwargs) 

318 

319 def describe(self) -> dict: 

320 """ 

321 Describes the Method with a 

322 """ 

323 return_doc = t.get_type_hints(self._func).get("return") 

324 

325 ret = { 

326 "args": { 

327 param.name: { 

328 "type": str(param.annotation) if param.annotation is not inspect.Parameter.empty else None, 

329 "default": str(param.default) if param.default is not inspect.Parameter.empty else None, 

330 } 

331 for param in self.signature.parameters.values() 

332 }, 

333 "returns": str(return_doc).strip() if return_doc else None, 

334 "accepts": self.methods, 

335 "docs": self._func.__doc__.strip() if self._func.__doc__ else None, 

336 "aliases": tuple(self.seo_language_map.keys()) if self.seo_language_map else None, 

337 } 

338 

339 if self.skey: 

340 ret["skey"] = self.skey["name"] 

341 

342 if self.access: 

343 ret["access"] = [str(access) for access in self.access["access"]] # must be a list to be JSON-serializable 

344 

345 return ret 

346 

347 def register(self, target: dict, name: str, language: str | None = None): 

348 """ 

349 Registers the Method under `name` and eventually some customized SEO-name for the provided language 

350 """ 

351 if self.exposed is None: 

352 return 

353 

354 target[name] = self 

355 

356 # reassign for SEO mapping as well 

357 if self.seo_language_map: 

358 for lang in tuple(self.seo_language_map.keys()) if not language else (language, ): 

359 if translated_name := self.seo_language_map.get(lang): 

360 target[translated_name] = self 

361 

362 

363class Module: 

364 """ 

365 This is the root module prototype that serves a minimal module in the ViUR system without any other bindings. 

366 """ 

367 

368 handler: str | t.Callable = None 

369 """ 

370 This is the module's handler, respectively its type. 

371 Use the @property-decorator in specific Modules to construct the handler's value dynamically. 

372 A module without a handler setting cannot be described, so cannot be handled by admin-tools. 

373 """ 

374 

375 accessRights: tuple[str] = None 

376 """ 

377 If set, a tuple of access rights (like add, edit, delete) that this module supports. 

378 

379 These will be prefixed on instance startup with the actual module name (becoming file-add, file-edit etc) 

380 and registered in ``conf.user.access_rights`` so these will be available on the access bone in user/add 

381 or user/edit. 

382 """ 

383 

384 roles: dict = {} 

385 r""" 

386 Allows to specify role settings for a module. 

387 

388 Defaults to no role definition, which ignores the module entirely in the role-system. 

389 In this case, access rights can still be set individually on the user's access bone. 

390 

391 A "*" wildcard can either be used as key or as value to allow for "all roles", or "all rights". 

392 

393 .. code-block:: python 

394 

395 # Example 

396 roles = { 

397 "*": "view", # Any role may only "view" 

398 "editor": ("add", "edit"), # Role "editor" may "add" or "edit", but not "delete" 

399 "admin": "*", # Role "admin" can do everything 

400 } 

401 

402 """ 

403 

404 seo_language_map: dict[str: str] = {} 

405 r""" 

406 The module name is the first part of a URL. 

407 SEO-identifiers have to be set as class-attribute ``seoLanguageMap`` of type ``dict[str, str]`` in the module. 

408 It maps a *language* to the according *identifier*. 

409 

410 .. code-block:: python 

411 :name: module seo-map 

412 :caption: modules/myorders.py 

413 :emphasize-lines: 4-7 

414 

415 from viur.core.prototypes import List 

416 

417 class MyOrders(List): 

418 seo_language_map = { 

419 "de": "bestellungen", 

420 "en": "orders", 

421 } 

422 

423 By default the module would be available under */myorders*, the lowercase module name. 

424 With the defined :attr:`seoLanguageMap`, it will become available as */de/bestellungen* and */en/orders*. 

425 

426 Great, this part is now user and robot friendly :) 

427 """ 

428 

429 adminInfo: dict[str, t.Any] | t.Callable = None 

430 """ 

431 This is a ``dict`` holding the information necessary for the Vi/Admin to handle this module. 

432 

433 name: ``str`` 

434 Human-readable module name that will be shown in the admin tool. 

435 

436 handler: ``str`` (``list``, ``tree`` or ``singleton``): 

437 Allows to override the handler provided by the module. Set this only when *really* necessary, 

438 otherwise it can be left out and is automatically injected by the Module's prototype. 

439 

440 icon: ``str`` 

441 (Optional) Either the Shoelace icon library name or a path relative to the project's deploy folder 

442 (e.g. /static/icons/viur.svg) for the icon used in the admin tool for this module. 

443 

444 columns: ``List[str]`` 

445 (Optional) List of columns (bone names) that are displayed by default. 

446 Used only by the List handler. 

447 

448 filter: ``Dict[str, str]`` 

449 (Optional) Dictionary of additional parameters that will be send along when 

450 fetching entities from the server. Can be used to filter the entities being displayed on the 

451 client-side. 

452 

453 display: ``str`` ("default", "hidden" or "group") 

454 (Optional) "hidden" will hide the module in the admin tool's main bar. 

455 (itwill not be accessible directly, however it's registered with the frontend so it can be used in a 

456 relational bone). "group" will show this module in the main bar, but it will not be clickable. 

457 Clicking it will just try to expand it (assuming there are additional views defined). 

458 

459 preview: ``Union[str, Dict[str, str]]`` 

460 (Optional) A url that will be opened in a new tab and is expected to display 

461 the entity selected in the table. Can be “/{{module}}/view/{{key}}", with {{module}} and {{key}} getting 

462 replaced as needed. If more than one preview-url is needed, supply a dictionary where the key is 

463 the URL and the value the description shown to the user. 

464 

465 views: ``List[Dict[str, t.Any]]`` 

466 (Optional) List of nested adminInfo like dictionaries. Used to define 

467 additional views on the module. Useful f.e. for an order module, where you want separate list of 

468 "payed orders", "unpayed orders", "orders waiting for shipment", etc. If such views are defined, 

469 the top-level entry in the menu bar will expand if clicked, revealing these additional filters. 

470 

471 actions: ``List[str]`` 

472 (Optional) List of actions supported by this modules. Actions can be defined by 

473 the frontend (like "add", "edit", "delete" or "preview"); it can be an action defined by a plugin 

474 loaded by the frontend; or it can be a so called "server side action" (see "customActions" below) 

475 

476 customActions: ``Dict[str, dict]`` 

477 (Optional) A mapping of names of server-defined actions that can be used 

478 in the ``actions`` list above to their definition dictionary. See .... for more details. 

479 

480 disabledActions: ``List[str, dict]`` 

481 (Optional) A list of disabled actions. The frontend will inject default actions like add or edit 

482 even if they're not listed in actions. Listing them here will prevent that. It's up to the frontend 

483 to decide if that action won't be visible at all or it's button just being disabled. 

484 

485 sortIndex: ``int`` 

486 (Optional) Defines the order in which the modules will appear in the main bar in 

487 ascrending order. 

488 

489 indexedBones: ``List[str]`` 

490 (Optional) List of bones, for which an (composite?) index exists in this 

491 view. This allows the fronted to signal the user that a given list can be sorted or filtered by this 

492 bone. If no additional filters are enforced by the 

493 :meth:`listFilter<viur.core.prototypes.list.listFilter>` and ``filter`` is not set, this should be 

494 all bones which are marked as indexed. 

495 

496 changeInvalidates: ``List[str]`` 

497 (Optional) A list of module-names which depend on the entities handled 

498 from this module. This allows the frontend to invalidate any caches in these depended modules if the 

499 data in this module changes. Example: This module may be a list-module handling the file_rootNode 

500 entities for the file module, so a edit/add/deletion action on this module should be reflected in the 

501 rootNode-selector in the file-module itself. In this case, this property should be set to ``["file"]``. 

502 

503 moduleGroup: ``str`` 

504 (Optional) If set, should be a key of a moduleGroup defined in .... . 

505 

506 editViews: ``Dict[str, t.Any]`` 

507 (Optional) If set, will embed another list-widget in the edit forms for 

508 a given entity. See .... for more details. 

509 

510 If this is a function, it must take no parameters and return the dictionary as shown above. This 

511 can be used to customize the appearance of the Vi/Admin to individual users. 

512 """ 

513 

514 def __init__(self, moduleName: str, modulePath: str, *args, **kwargs): 

515 self.render = None # will be set to the appropriate render instance at runtime 

516 self._cached_description = None # caching used by describe() 

517 self.moduleName = moduleName # Name of this module (usually it's class name, e.g. "file") 

518 self.modulePath = modulePath # Path to this module in URL-routing (e.g. "json/file") 

519 

520 if self.handler and self.accessRights: 

521 for right in self.accessRights: 

522 right = f"{self.moduleName}-{right}" 

523 

524 # fixme: Turn conf.user.access_rights into a set. 

525 if right not in conf.user.access_rights: 

526 conf.user.access_rights.append(right) 

527 

528 # Collect methods and (sub)modules 

529 self._methods = {} 

530 self._modules = {} 

531 self._update_methods() 

532 

533 def _update_methods(self): 

534 """ 

535 Internal function to update methods and submodules. 

536 This function should only be called when member attributes are dynamically modified by the module. 

537 """ 

538 self._methods.clear() 

539 self._modules.clear() 

540 

541 for key in dir(self): 

542 if key[0] == "_": 

543 continue 

544 if isinstance(getattr(self.__class__, key, None), (property, functools.cached_property)): 

545 continue 

546 

547 prop = getattr(self, key) 

548 

549 if isinstance(prop, Method): 

550 self._methods[key] = prop 

551 elif isinstance(prop, Module): 

552 self._modules[key] = prop 

553 

554 def describe(self) -> dict | None: 

555 """ 

556 Meta description of this module. 

557 """ 

558 # Use cached description? 

559 if isinstance(self._cached_description, dict): 

560 return self._cached_description 

561 

562 # Retrieve handler 

563 if not (handler := self.handler): 

564 return None 

565 

566 # Default description 

567 ret = { 

568 "name": self.__class__.__name__, 

569 "handler": ".".join((handler, self.__class__.__name__.lower())), 

570 "methods": { 

571 name: method.describe() for name, method in self._methods.items() 

572 }, 

573 } 

574 

575 # Extend indexes, if available 

576 # todo: This must be handled by SkelModule 

577 if indexes := getattr(self, "indexes", None): 

578 ret["indexes"] = indexes 

579 

580 # Merge adminInfo if present 

581 if admin_info := self.adminInfo() if callable(self.adminInfo) else self.adminInfo: 

582 assert isinstance(admin_info, dict), \ 

583 f"adminInfo can either be a dict or a callable returning a dict, but got {type(admin_info)}" 

584 ret |= admin_info 

585 

586 # Cache description for later re-use. 

587 if self._cached_description is not False: 

588 self._cached_description = ret 

589 

590 return ret 

591 

592 def register(self, target: dict, render: object): 

593 """ 

594 Registers this module's public functions to a given resolver. 

595 This function is executed on start-up, and can be sub-classed. 

596 """ 

597 # connect instance to render 

598 self.render = render 

599 

600 # Map module under SEO-mapped name, if available. 

601 if self.seo_language_map: 

602 for lang in conf.i18n.available_languages or [conf.i18n.default_language]: 

603 # Map the module under each translation 

604 if translated_module_name := self.seo_language_map.get(lang): 

605 translated_module = target.setdefault(translated_module_name, {}) 

606 

607 # Map module methods to the previously determined target 

608 for name, method in self._methods.items(): 

609 method.register(translated_module, name, lang) 

610 

611 conf.i18n.language_module_map[self.moduleName] = self.seo_language_map 

612 

613 # Map the module also under it's original name 

614 if self.moduleName != "index": 

615 target = target.setdefault(self.moduleName, {}) 

616 

617 # Map module methods to the previously determined target 

618 for name, method in self._methods.items(): 

619 method.register(target, name) 

620 

621 # Register sub modules 

622 for name, module in self._modules.items(): 

623 module.register(target, self.render)