Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/module.py: 11%

245 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-16 22:16 +0000

1import copy 

2import inspect 

3import types 

4import typing as t 

5import logging 

6from viur.core import db, errors, current, utils 

7from viur.core.config import conf 

8 

9 

10class Method: 

11 """ 

12 Abstraction wrapper for any public available method. 

13 """ 

14 

15 @classmethod 

16 def ensure(cls, func: t.Callable | "Method") -> "Method": 

17 """ 

18 Ensures the provided `func` parameter is either a Method already, or turns it 

19 into a Method. This is done to avoid stacking Method objects, which may create 

20 unwanted results. 

21 """ 

22 if isinstance(func, Method): 

23 return func 

24 

25 return cls(func) 

26 

27 def __init__(self, func: t.Callable): 

28 # Content 

29 self._func = func 

30 self.__name__ = func.__name__ 

31 self._instance = None 

32 

33 # Attributes 

34 self.exposed = None # None = unexposed, True = exposed, False = internal exposed 

35 self.ssl = False 

36 self.methods = ("GET", "POST", "HEAD") 

37 self.seo_language_map = None 

38 

39 # Inspection 

40 self.signature = inspect.signature(self._func) 

41 

42 # Guards 

43 self.skey = None 

44 self.access = None 

45 

46 def __get__(self, obj, objtype=None): 

47 """ 

48 This binds the Method to an object. 

49 

50 To do it, the Method instance is copied and equipped with the individual _instance member. 

51 """ 

52 if obj: 

53 bound = copy.copy(self) 

54 bound._instance = obj 

55 return bound 

56 

57 return self 

58 

59 def __call__(self, *args, **kwargs): 

60 """ 

61 Calls the method with given args and kwargs. 

62 

63 Prepares and filters argument values from args and kwargs regarding self._func's signature and type annotations, 

64 if present. 

65 

66 Method objects normally wrap functions which are externally exposed. Therefore, any arguments passed from the 

67 client are str-values, and are automatically parsed when equipped with type-annotations. 

68 

69 This preparation of arguments therefore inspects the target function as follows 

70 - incoming values are parsed to their particular type, if type annotations are present 

71 - parameters in *args and **kwargs are being checked against their signature; only relevant values are being 

72 passed, anything else is thrown away. 

73 - execution of guard configurations from @skey and @access, if present 

74 """ 

75 

76 if trace := conf.debug.trace: 

77 logging.debug(f"calling {self._func=} with raw {args=}, {kwargs=}") 

78 

79 def parse_value_by_annotation(annotation: type, name: str, value: str | list | tuple) -> t.Any: 

80 """ 

81 Tries to parse a value according to a given type. 

82 May be called recursively to handle unions, lists and tuples as well. 

83 """ 

84 # simple types 

85 if annotation is str: 

86 return str(value) 

87 elif annotation is int: 

88 return int(value) 

89 elif annotation is float: 

90 return float(value) 

91 elif annotation is bool: 

92 return utils.parse.bool(value) 

93 elif annotation is types.NoneType: 

94 return None 

95 

96 # complex types 

97 origin_type = t.get_origin(annotation) 

98 

99 if origin_type is list and len(annotation.__args__) == 1: 

100 if not isinstance(value, list): 

101 value = [value] 

102 

103 return [parse_value_by_annotation(annotation.__args__[0], name, item) for item in value] 

104 

105 elif origin_type is tuple and len(annotation.__args__) == 1: 

106 if not isinstance(value, tuple): 

107 value = (value, ) 

108 

109 return tuple(parse_value_by_annotation(annotation.__args__[0], name, item) for item in value) 

110 

111 elif origin_type is t.Literal: 

112 if not any(value == str(literal) for literal in annotation.__args__): 

113 raise errors.NotAcceptable(f"Expecting any of {annotation.__args__} for {name}") 

114 

115 return value 

116 

117 elif origin_type is t.Union or isinstance(annotation, types.UnionType): 

118 for i, sub_annotation in enumerate(annotation.__args__): 

119 try: 

120 return parse_value_by_annotation(sub_annotation, name, value) 

121 except ValueError: 

122 if i == len(annotation.__args__) - 1: 

123 raise 

124 

125 elif annotation is db.Key: 

126 if isinstance(value, db.Key): 

127 return value 

128 

129 return parse_value_by_annotation(int | str, name, value) 

130 

131 raise errors.NotAcceptable(f"Unhandled type {annotation=} for {name}={value!r}") 

132 

133 # examine parameters 

134 args_iter = iter(args) 

135 

136 parsed_args = [] 

137 parsed_kwargs = {} 

138 varargs = [] 

139 varkwargs = False 

140 

141 for i, (param_name, param) in enumerate(self.signature.parameters.items()): 

142 if self._instance and i == 0 and param_name == "self": 

143 continue 

144 

145 param_type = param.annotation if param.annotation is not param.empty else None 

146 param_required = param.default is param.empty 

147 

148 # take positional parameters first 

149 if param.kind in ( 

150 inspect.Parameter.POSITIONAL_OR_KEYWORD, 

151 inspect.Parameter.POSITIONAL_ONLY 

152 ): 

153 try: 

154 value = next(args_iter) 

155 

156 if param_type: 

157 value = parse_value_by_annotation(param_type, param_name, value) 

158 

159 parsed_args.append(value) 

160 continue 

161 except StopIteration: 

162 pass 

163 

164 # otherwise take kwargs or variadics 

165 if ( 

166 param.kind in ( 

167 inspect.Parameter.POSITIONAL_OR_KEYWORD, 

168 inspect.Parameter.KEYWORD_ONLY 

169 ) 

170 and param_name in kwargs 

171 ): 

172 value = kwargs.pop(param_name) 

173 

174 if param_type: 

175 value = parse_value_by_annotation(param_type, param_name, value) 

176 

177 parsed_kwargs[param_name] = value 

178 

179 elif param.kind == inspect.Parameter.VAR_POSITIONAL: 

180 varargs = list(args_iter) 

181 elif param.kind == inspect.Parameter.VAR_KEYWORD: 

182 varkwargs = True 

183 elif param_required: 

184 if self.skey and param_name == self.skey["forward_payload"]: 

185 continue 

186 

187 raise errors.NotAcceptable(f"Missing required parameter {param_name!r}") 

188 

189 # Here's a short clarification on the variables used here: 

190 # 

191 # - parsed_args = tuple of (the type-parsed) arguments that have been assigned based on the signature 

192 # - parsed_kwargs = dict of (the type-parsed) keyword arguments that have been assigned based on the signature 

193 # - args = either parsed_args, or parsed_args + remaining args if the function accepts *args 

194 # - kwargs = either parsed_kwars, or parsed_kwargs | remaining kwargs if the function accepts **kwargs 

195 # - varargs = indicator that the args also contain variable args (*args) 

196 # - varkwards = indicator that variable kwargs (**kwargs) are also contained in the kwargs 

197 # 

198 

199 # Extend args to any varargs, and redefine args 

200 args = tuple(parsed_args + varargs) 

201 

202 # always take "skey"-parameter name, when configured, as parsed_kwargs 

203 if self.skey and self.skey["name"] in kwargs: 

204 parsed_kwargs[self.skey["name"]] = kwargs.pop(self.skey["name"]) 

205 

206 # When varkwargs are accepted, merge parsed_kwargs and kwargs, otherwise just use parsed_kwargs 

207 if varkwargs := varkwargs and bool(kwargs): 

208 kwargs = parsed_kwargs | kwargs 

209 else: 

210 kwargs = parsed_kwargs 

211 

212 # Trace message for final call configuration 

213 if trace := conf.debug.trace: 

214 logging.debug(f"calling {self._func=} with cleaned {args=}, {kwargs=}") 

215 

216 # evaluate skey guard setting? 

217 if self.skey and not current.request.get().skey_checked: # skey guardiance is only required once per request 

218 if trace: 

219 logging.debug(f"@skey {self.skey=}") 

220 

221 security_key = kwargs.pop(self.skey["name"], "") 

222 

223 # validation is necessary? 

224 if allow_empty := self.skey["allow_empty"]: 

225 # allow_empty can be callable, to detect programmatically 

226 if callable(allow_empty): 

227 required = not allow_empty(args, kwargs) 

228 # or allow_empty can be a sequence of allowed keys 

229 elif isinstance(allow_empty, (list, tuple)): 

230 required = any(k for k in kwargs.keys() if k not in allow_empty) 

231 # otherwise, varargs or varkwargs may not be empty. 

232 else: 

233 required = varargs or varkwargs or security_key 

234 if trace: 

235 logging.debug(f"@skey {required=} because either {varargs=} or {varkwargs=} or {security_key=}") 

236 else: 

237 required = True 

238 

239 if required: 

240 if trace: 

241 logging.debug(f"@skey wanted, validating {security_key!r}") 

242 

243 from viur.core import securitykey 

244 payload = securitykey.validate(security_key, **self.skey["extra_kwargs"]) 

245 current.request.get().skey_checked = True 

246 

247 if not payload or (self.skey["validate"] and not self.skey["validate"](payload)): 

248 raise errors.PreconditionFailed( 

249 self.skey["message"] or f"Missing or invalid parameter {self.skey['name']!r}" 

250 ) 

251 

252 if self.skey["forward_payload"]: 

253 kwargs |= {self.skey["forward_payload"]: payload} 

254 

255 # evaluate access guard setting? 

256 if self.access: 

257 user = current.user.get() 

258 

259 if trace := conf.debug.trace: 

260 logging.debug(f"@access {user=} {self.access=}") 

261 

262 if not user: 

263 if offer_login := self.access["offer_login"]: 

264 raise errors.Redirect(offer_login if isinstance(offer_login, str) else "/user/login") 

265 

266 raise errors.Unauthorized(self.access["message"]) if self.access["message"] else errors.Unauthorized() 

267 

268 ok = "root" in user["access"] 

269 

270 if not ok and self.access["access"]: 

271 for acc in self.access["access"]: 

272 if trace: 

273 logging.debug(f"@access checking {acc=}") 

274 

275 # Callable directly tests access 

276 if callable(acc): 

277 if acc(): 

278 ok = True 

279 break 

280 

281 continue 

282 

283 # Otherwise, check for access rights 

284 if isinstance(acc, str): 

285 acc = (acc, ) 

286 

287 assert isinstance(acc, (tuple, list, set)) 

288 

289 if all(a in user["access"] for a in acc): 

290 ok = True 

291 break 

292 

293 if trace: 

294 logging.debug(f"@access {ok=}") 

295 

296 if not ok: 

297 raise errors.Forbidden(self.access["message"]) if self.access["message"] else errors.Forbidden() 

298 

299 # call with instance when provided 

300 if self._instance: 

301 return self._func(self._instance, *args, **kwargs) 

302 

303 return self._func(*args, **kwargs) 

304 

305 def describe(self) -> dict: 

306 """ 

307 Describes the Method with a 

308 """ 

309 return_doc = t.get_type_hints(self._func).get("return") 

310 

311 ret = { 

312 "args": { 

313 param.name: { 

314 "type": str(param.annotation) if param.annotation is not inspect.Parameter.empty else None, 

315 "default": str(param.default) if param.default is not inspect.Parameter.empty else None, 

316 } 

317 for param in self.signature.parameters.values() 

318 }, 

319 "returns": str(return_doc).strip() if return_doc else None, 

320 "accepts": self.methods, 

321 "docs": self._func.__doc__.strip() if self._func.__doc__ else None, 

322 "aliases": tuple(self.seo_language_map.keys()) if self.seo_language_map else None, 

323 } 

324 

325 if self.skey: 

326 ret["skey"] = self.skey["name"] 

327 

328 if self.access: 

329 ret["access"] = [str(access) for access in self.access["access"]] # must be a list to be JSON-serializable 

330 

331 return ret 

332 

333 def register(self, target: dict, name: str, language: str | None = None): 

334 """ 

335 Registers the Method under `name` and eventually some customized SEO-name for the provided language 

336 """ 

337 if self.exposed is None: 

338 return 

339 

340 target[name] = self 

341 

342 # reassign for SEO mapping as well 

343 if self.seo_language_map: 

344 for lang in tuple(self.seo_language_map.keys()) if not language else (language, ): 

345 if translated_name := self.seo_language_map.get(lang): 

346 target[translated_name] = self 

347 

348 

349class Module: 

350 """ 

351 This is the root module prototype that serves a minimal module in the ViUR system without any other bindings. 

352 """ 

353 

354 handler: str | t.Callable = None 

355 """ 

356 This is the module's handler, respectively its type. 

357 Use the @property-decorator in specific Modules to construct the handler's value dynamically. 

358 A module without a handler setting cannot be described, so cannot be handled by admin-tools. 

359 """ 

360 

361 accessRights: tuple[str] = None 

362 """ 

363 If set, a tuple of access rights (like add, edit, delete) that this module supports. 

364 

365 These will be prefixed on instance startup with the actual module name (becoming file-add, file-edit etc) 

366 and registered in ``conf.user.access_rights`` so these will be available on the access bone in user/add 

367 or user/edit. 

368 """ 

369 

370 roles: dict = {} 

371 r""" 

372 Allows to specify role settings for a module. 

373 

374 Defaults to no role definition, which ignores the module entirely in the role-system. 

375 In this case, access rights can still be set individually on the user's access bone. 

376 

377 A "*" wildcard can either be used as key or as value to allow for "all roles", or "all rights". 

378 

379 .. code-block:: python 

380 

381 # Example 

382 roles = { 

383 "*": "view", # Any role may only "view" 

384 "editor": ("add", "edit"), # Role "editor" may "add" or "edit", but not "delete" 

385 "admin": "*", # Role "admin" can do everything 

386 } 

387 

388 """ 

389 

390 seo_language_map: dict[str: str] = {} 

391 r""" 

392 The module name is the first part of a URL. 

393 SEO-identifiers have to be set as class-attribute ``seoLanguageMap`` of type ``dict[str, str]`` in the module. 

394 It maps a *language* to the according *identifier*. 

395 

396 .. code-block:: python 

397 :name: module seo-map 

398 :caption: modules/myorders.py 

399 :emphasize-lines: 4-7 

400 

401 from viur.core.prototypes import List 

402 

403 class MyOrders(List): 

404 seo_language_map = { 

405 "de": "bestellungen", 

406 "en": "orders", 

407 } 

408 

409 By default the module would be available under */myorders*, the lowercase module name. 

410 With the defined :attr:`seoLanguageMap`, it will become available as */de/bestellungen* and */en/orders*. 

411 

412 Great, this part is now user and robot friendly :) 

413 """ 

414 

415 adminInfo: dict[str, t.Any] | t.Callable = None 

416 """ 

417 This is a ``dict`` holding the information necessary for the Vi/Admin to handle this module. 

418 

419 name: ``str`` 

420 Human-readable module name that will be shown in the admin tool. 

421 

422 handler: ``str`` (``list``, ``tree`` or ``singleton``): 

423 Allows to override the handler provided by the module. Set this only when *really* necessary, 

424 otherwise it can be left out and is automatically injected by the Module's prototype. 

425 

426 icon: ``str`` 

427 (Optional) Either the Shoelace icon library name or a path relative to the project's deploy folder 

428 (e.g. /static/icons/viur.svg) for the icon used in the admin tool for this module. 

429 

430 columns: ``List[str]`` 

431 (Optional) List of columns (bone names) that are displayed by default. 

432 Used only by the List handler. 

433 

434 filter: ``Dict[str, str]`` 

435 (Optional) Dictionary of additional parameters that will be send along when 

436 fetching entities from the server. Can be used to filter the entities being displayed on the 

437 client-side. 

438 

439 display: ``str`` ("default", "hidden" or "group") 

440 (Optional) "hidden" will hide the module in the admin tool's main bar. 

441 (itwill not be accessible directly, however it's registered with the frontend so it can be used in a 

442 relational bone). "group" will show this module in the main bar, but it will not be clickable. 

443 Clicking it will just try to expand it (assuming there are additional views defined). 

444 

445 preview: ``Union[str, Dict[str, str]]`` 

446 (Optional) A url that will be opened in a new tab and is expected to display 

447 the entity selected in the table. Can be “/{{module}}/view/{{key}}", with {{module}} and {{key}} getting 

448 replaced as needed. If more than one preview-url is needed, supply a dictionary where the key is 

449 the URL and the value the description shown to the user. 

450 

451 views: ``List[Dict[str, t.Any]]`` 

452 (Optional) List of nested adminInfo like dictionaries. Used to define 

453 additional views on the module. Useful f.e. for an order module, where you want separate list of 

454 "payed orders", "unpayed orders", "orders waiting for shipment", etc. If such views are defined, 

455 the top-level entry in the menu bar will expand if clicked, revealing these additional filters. 

456 

457 actions: ``List[str]`` 

458 (Optional) List of actions supported by this modules. Actions can be defined by 

459 the frontend (like "add", "edit", "delete" or "preview"); it can be an action defined by a plugin 

460 loaded by the frontend; or it can be a so called "server side action" (see "customActions" below) 

461 

462 customActions: ``Dict[str, dict]`` 

463 (Optional) A mapping of names of server-defined actions that can be used 

464 in the ``actions`` list above to their definition dictionary. See .... for more details. 

465 

466 disabledActions: ``List[str, dict]`` 

467 (Optional) A list of disabled actions. The frontend will inject default actions like add or edit 

468 even if they're not listed in actions. Listing them here will prevent that. It's up to the frontend 

469 to decide if that action won't be visible at all or it's button just being disabled. 

470 

471 sortIndex: ``int`` 

472 (Optional) Defines the order in which the modules will appear in the main bar in 

473 ascrending order. 

474 

475 indexedBones: ``List[str]`` 

476 (Optional) List of bones, for which an (composite?) index exists in this 

477 view. This allows the fronted to signal the user that a given list can be sorted or filtered by this 

478 bone. If no additional filters are enforced by the 

479 :meth:`listFilter<viur.core.prototypes.list.listFilter>` and ``filter`` is not set, this should be 

480 all bones which are marked as indexed. 

481 

482 changeInvalidates: ``List[str]`` 

483 (Optional) A list of module-names which depend on the entities handled 

484 from this module. This allows the frontend to invalidate any caches in these depended modules if the 

485 data in this module changes. Example: This module may be a list-module handling the file_rootNode 

486 entities for the file module, so a edit/add/deletion action on this module should be reflected in the 

487 rootNode-selector in the file-module itself. In this case, this property should be set to ``["file"]``. 

488 

489 moduleGroup: ``str`` 

490 (Optional) If set, should be a key of a moduleGroup defined in .... . 

491 

492 editViews: ``Dict[str, t.Any]`` 

493 (Optional) If set, will embed another list-widget in the edit forms for 

494 a given entity. See .... for more details. 

495 

496 If this is a function, it must take no parameters and return the dictionary as shown above. This 

497 can be used to customize the appearance of the Vi/Admin to individual users. 

498 """ 

499 

500 def __init__(self, moduleName: str, modulePath: str, *args, **kwargs): 

501 self.render = None # will be set to the appropriate render instance at runtime 

502 self._cached_description = None # caching used by describe() 

503 self.moduleName = moduleName # Name of this module (usually it's class name, e.g. "file") 

504 self.modulePath = modulePath # Path to this module in URL-routing (e.g. "json/file") 

505 

506 if self.handler and self.accessRights: 

507 for right in self.accessRights: 

508 right = f"{self.moduleName}-{right}" 

509 

510 # fixme: Turn conf.user.access_rights into a set. 

511 if right not in conf.user.access_rights: 

512 conf.user.access_rights.append(right) 

513 

514 # Collect methods and (sub)modules 

515 self._methods = {} 

516 self._modules = {} 

517 self._update_methods() 

518 

519 def _update_methods(self): 

520 """ 

521 Internal function to update methods and submodules. 

522 This function should only be called when member attributes are dynamically modified by the module. 

523 """ 

524 self._methods.clear() 

525 self._modules.clear() 

526 

527 for key in dir(self): 

528 if key[0] == "_": 

529 continue 

530 if isinstance(getattr(self.__class__, key, None), property): 

531 continue 

532 

533 prop = getattr(self, key) 

534 

535 if isinstance(prop, Method): 

536 self._methods[key] = prop 

537 elif isinstance(prop, Module): 

538 self._modules[key] = prop 

539 

540 def describe(self) -> dict | None: 

541 """ 

542 Meta description of this module. 

543 """ 

544 # Use cached description? 

545 if isinstance(self._cached_description, dict): 

546 return self._cached_description 

547 

548 # Retrieve handler 

549 if not (handler := self.handler): 

550 return None 

551 

552 # Default description 

553 ret = { 

554 "name": self.__class__.__name__, 

555 "handler": ".".join((handler, self.__class__.__name__.lower())), 

556 "methods": { 

557 name: method.describe() for name, method in self._methods.items() 

558 }, 

559 } 

560 

561 # Extend indexes, if available 

562 # todo: This must be handled by SkelModule 

563 if indexes := getattr(self, "indexes", None): 

564 ret["indexes"] = indexes 

565 

566 # Merge adminInfo if present 

567 if admin_info := self.adminInfo() if callable(self.adminInfo) else self.adminInfo: 

568 assert isinstance(admin_info, dict), \ 

569 f"adminInfo can either be a dict or a callable returning a dict, but got {type(admin_info)}" 

570 ret |= admin_info 

571 

572 # Cache description for later re-use. 

573 if self._cached_description is not False: 

574 self._cached_description = ret 

575 

576 return ret 

577 

578 def register(self, target: dict, render: object): 

579 """ 

580 Registers this module's public functions to a given resolver. 

581 This function is executed on start-up, and can be sub-classed. 

582 """ 

583 # connect instance to render 

584 self.render = render 

585 

586 # Map module under SEO-mapped name, if available. 

587 if self.seo_language_map: 

588 for lang in conf.i18n.available_languages or [conf.i18n.default_language]: 

589 # Map the module under each translation 

590 if translated_module_name := self.seo_language_map.get(lang): 

591 translated_module = target.setdefault(translated_module_name, {}) 

592 

593 # Map module methods to the previously determined target 

594 for name, method in self._methods.items(): 

595 method.register(translated_module, name, lang) 

596 

597 conf.i18n.language_module_map[self.moduleName] = self.seo_language_map 

598 

599 # Map the module also under it's original name 

600 if self.moduleName != "index": 

601 target = target.setdefault(self.moduleName, {}) 

602 

603 # Map module methods to the previously determined target 

604 for name, method in self._methods.items(): 

605 method.register(target, name) 

606 

607 # Register sub modules 

608 for name, module in self._modules.items(): 

609 module.register(target, self.render)