Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/module.py: 11%

244 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-03 13:41 +0000

1import copy 

2import inspect 

3import types 

4import typing as t 

5import logging 

6from viur.core import db, errors, current, utils 

7from viur.core.config import conf 

8 

9 

10class Method: 

11 """ 

12 Abstraction wrapper for any public available method. 

13 """ 

14 

15 @classmethod 

16 def ensure(cls, func: t.Callable | "Method") -> "Method": 

17 """ 

18 Ensures the provided `func` parameter is either a Method already, or turns it 

19 into a Method. This is done to avoid stacking Method objects, which may create 

20 unwanted results. 

21 """ 

22 if isinstance(func, Method): 

23 return func 

24 

25 return cls(func) 

26 

27 def __init__(self, func: t.Callable): 

28 # Content 

29 self._func = func 

30 self.__name__ = func.__name__ 

31 self._instance = None 

32 

33 # Attributes 

34 self.exposed = None # None = unexposed, True = exposed, False = internal exposed 

35 self.ssl = False 

36 self.methods = ("GET", "POST", "HEAD") 

37 self.seo_language_map = None 

38 

39 # Inspection 

40 self.signature = inspect.signature(self._func) 

41 

42 # Guards 

43 self.skey = None 

44 self.access = None 

45 

46 def __get__(self, obj, objtype=None): 

47 """ 

48 This binds the Method to an object. 

49 

50 To do it, the Method instance is copied and equipped with the individual _instance member. 

51 """ 

52 if obj: 

53 bound = copy.copy(self) 

54 bound._instance = obj 

55 return bound 

56 

57 return self 

58 

59 def __call__(self, *args, **kwargs): 

60 """ 

61 Calls the method with given args and kwargs. 

62 

63 Prepares and filters argument values from args and kwargs regarding self._func's signature and type annotations, 

64 if present. 

65 

66 Method objects normally wrap functions which are externally exposed. Therefore, any arguments passed from the 

67 client are str-values, and are automatically parsed when equipped with type-annotations. 

68 

69 This preparation of arguments therefore inspects the target function as follows 

70 - incoming values are parsed to their particular type, if type annotations are present 

71 - parameters in *args and **kwargs are being checked against their signature; only relevant values are being 

72 passed, anything else is thrown away. 

73 - execution of guard configurations from @skey and @access, if present 

74 """ 

75 

76 if trace := conf.debug.trace: 

77 logging.debug(f"calling {self._func=} with raw {args=}, {kwargs=}") 

78 

79 def parse_value_by_annotation(annotation: type, name: str, value: str | list | tuple) -> t.Any: 

80 """ 

81 Tries to parse a value according to a given type. 

82 May be called recursively to handle unions, lists and tuples as well. 

83 """ 

84 # simple types 

85 if annotation is str: 

86 return str(value) 

87 elif annotation is int: 

88 return int(value) 

89 elif annotation is float: 

90 return float(value) 

91 elif annotation is bool: 

92 return utils.parse.bool(value) 

93 elif annotation is types.NoneType: 

94 return None 

95 

96 # complex types 

97 origin_type = t.get_origin(annotation) 

98 

99 if origin_type is list and len(annotation.__args__) == 1: 

100 if not isinstance(value, list): 

101 value = [value] 

102 

103 return [parse_value_by_annotation(annotation.__args__[0], name, item) for item in value] 

104 

105 elif origin_type is tuple and len(annotation.__args__) == 1: 

106 if not isinstance(value, tuple): 

107 value = (value, ) 

108 

109 return tuple(parse_value_by_annotation(annotation.__args__[0], name, item) for item in value) 

110 

111 elif origin_type is t.Literal: 

112 if not any(value == str(literal) for literal in annotation.__args__): 

113 raise errors.NotAcceptable(f"Expecting any of {annotation.__args__} for {name}") 

114 

115 return value 

116 

117 elif origin_type is t.Union or isinstance(annotation, types.UnionType): 

118 for i, sub_annotation in enumerate(annotation.__args__): 

119 try: 

120 return parse_value_by_annotation(sub_annotation, name, value) 

121 except ValueError: 

122 if i == len(annotation.__args__) - 1: 

123 raise 

124 

125 elif annotation is db.Key: 

126 if isinstance(value, db.Key): 

127 return value 

128 

129 return parse_value_by_annotation(int | str, name, value) 

130 

131 raise errors.NotAcceptable(f"Unhandled type {annotation=} for {name}={value!r}") 

132 

133 # examine parameters 

134 args_iter = iter(args) 

135 

136 parsed_args = [] 

137 parsed_kwargs = {} 

138 varargs = [] 

139 varkwargs = False 

140 

141 for i, (param_name, param) in enumerate(self.signature.parameters.items()): 

142 if self._instance and i == 0 and param_name == "self": 

143 continue 

144 

145 param_type = param.annotation if param.annotation is not param.empty else None 

146 param_required = param.default is param.empty 

147 

148 # take positional parameters first 

149 if param.kind in ( 

150 inspect.Parameter.POSITIONAL_OR_KEYWORD, 

151 inspect.Parameter.POSITIONAL_ONLY 

152 ): 

153 try: 

154 value = next(args_iter) 

155 

156 if param_type: 

157 value = parse_value_by_annotation(param_type, param_name, value) 

158 

159 parsed_args.append(value) 

160 continue 

161 except StopIteration: 

162 pass 

163 

164 # otherwise take kwargs or variadics 

165 if ( 

166 param.kind in ( 

167 inspect.Parameter.POSITIONAL_OR_KEYWORD, 

168 inspect.Parameter.KEYWORD_ONLY 

169 ) 

170 and param_name in kwargs 

171 ): 

172 value = kwargs.pop(param_name) 

173 

174 if param_type: 

175 value = parse_value_by_annotation(param_type, param_name, value) 

176 

177 parsed_kwargs[param_name] = value 

178 

179 elif param.kind == inspect.Parameter.VAR_POSITIONAL: 

180 varargs = list(args_iter) 

181 elif param.kind == inspect.Parameter.VAR_KEYWORD: 

182 varkwargs = True 

183 elif param_required: 

184 if self.skey and param_name == self.skey["forward_payload"]: 

185 continue 

186 

187 raise errors.NotAcceptable(f"Missing required parameter {param_name!r}") 

188 

189 # Here's a short clarification on the variables used here: 

190 # 

191 # - parsed_args = tuple of (the type-parsed) arguments that have been assigned based on the signature 

192 # - parsed_kwargs = dict of (the type-parsed) keyword arguments that have been assigned based on the signature 

193 # - args = either parsed_args, or parsed_args + remaining args if the function accepts *args 

194 # - kwargs = either parsed_kwars, or parsed_kwargs | remaining kwargs if the function accepts **kwargs 

195 # - varargs = indicator that the args also contain variable args (*args) 

196 # - varkwards = indicator that variable kwargs (**kwargs) are also contained in the kwargs 

197 # 

198 

199 # Extend args to any varargs, and redefine args 

200 args = tuple(parsed_args + varargs) 

201 

202 # always take "skey"-parameter name, when configured, as parsed_kwargs 

203 if self.skey and self.skey["name"] in kwargs: 

204 parsed_kwargs[self.skey["name"]] = kwargs.pop(self.skey["name"]) 

205 

206 # When varkwargs are accepted, merge parsed_kwargs and kwargs, otherwise just use parsed_kwargs 

207 if varkwargs := varkwargs and bool(kwargs): 

208 kwargs = parsed_kwargs | kwargs 

209 else: 

210 kwargs = parsed_kwargs 

211 

212 # Trace message for final call configuration 

213 if trace := conf.debug.trace: 

214 logging.debug(f"calling {self._func=} with cleaned {args=}, {kwargs=}") 

215 

216 # evaluate skey guard setting? 

217 if self.skey and not current.request.get().skey_checked: # skey guardiance is only required once per request 

218 if trace: 

219 logging.debug(f"@skey {self.skey=}") 

220 

221 security_key = kwargs.pop(self.skey["name"], "") 

222 

223 # validation is necessary? 

224 if allow_empty := self.skey["allow_empty"]: 

225 # allow_empty can be callable, to detect programmatically 

226 if callable(allow_empty): 

227 required = not allow_empty(args, kwargs) 

228 # or allow_empty can be a sequence of allowed keys 

229 elif isinstance(allow_empty, (list, tuple)): 

230 required = any(k for k in kwargs.keys() if k not in allow_empty) 

231 # otherwise, varargs or varkwargs may not be empty. 

232 else: 

233 required = varargs or varkwargs or security_key 

234 if trace: 

235 logging.debug(f"@skey {required=} because either {varargs=} or {varkwargs=} or {security_key=}") 

236 else: 

237 required = True 

238 

239 if required: 

240 if trace: 

241 logging.debug(f"@skey wanted, validating {security_key!r}") 

242 

243 from viur.core import securitykey 

244 payload = securitykey.validate(security_key, **self.skey["extra_kwargs"]) 

245 current.request.get().skey_checked = True 

246 

247 if not payload or (self.skey["validate"] and not self.skey["validate"](payload)): 

248 raise errors.PreconditionFailed( 

249 self.skey["message"] or f"Missing or invalid parameter {self.skey['name']!r}" 

250 ) 

251 

252 if self.skey["forward_payload"]: 

253 kwargs |= {self.skey["forward_payload"]: payload} 

254 

255 # evaluate access guard setting? 

256 if self.access: 

257 user = current.user.get() 

258 

259 if trace := conf.debug.trace: 

260 logging.debug(f"@access {user=} {self.access=}") 

261 

262 if not user: 

263 if offer_login := self.access["offer_login"]: 

264 raise errors.Redirect(offer_login if isinstance(offer_login, str) else "/user/login") 

265 

266 raise errors.Unauthorized(self.access["message"]) if self.access["message"] else errors.Unauthorized() 

267 

268 ok = False 

269 for acc in self.access["access"]: 

270 if trace: 

271 logging.debug(f"@access checking {acc=}") 

272 

273 # Callable directly tests access 

274 if callable(acc): 

275 if acc(): 

276 ok = True 

277 break 

278 

279 continue 

280 

281 # Otherwise, check for access rights 

282 if isinstance(acc, str): 

283 acc = (acc, ) 

284 

285 assert isinstance(acc, (tuple, list, set)) 

286 

287 if not set(acc).difference(user["access"]): 

288 ok = True 

289 break 

290 

291 if trace: 

292 logging.debug(f"@access {ok=}") 

293 

294 if not ok: 

295 raise errors.Forbidden(self.access["message"]) if self.access["message"] else errors.Forbidden() 

296 

297 # call with instance when provided 

298 if self._instance: 

299 return self._func(self._instance, *args, **kwargs) 

300 

301 return self._func(*args, **kwargs) 

302 

303 def describe(self) -> dict: 

304 """ 

305 Describes the Method with a 

306 """ 

307 return_doc = t.get_type_hints(self._func).get("return") 

308 

309 ret = { 

310 "args": { 

311 param.name: { 

312 "type": str(param.annotation) if param.annotation is not inspect.Parameter.empty else None, 

313 "default": str(param.default) if param.default is not inspect.Parameter.empty else None, 

314 } 

315 for param in self.signature.parameters.values() 

316 }, 

317 "returns": str(return_doc).strip() if return_doc else None, 

318 "accepts": self.methods, 

319 "docs": self._func.__doc__.strip() if self._func.__doc__ else None, 

320 "aliases": tuple(self.seo_language_map.keys()) if self.seo_language_map else None, 

321 } 

322 

323 if self.skey: 

324 ret["skey"] = self.skey["name"] 

325 

326 if self.access: 

327 ret["access"] = [str(access) for access in self.access["access"]] # must be a list to be JSON-serializable 

328 

329 return ret 

330 

331 def register(self, target: dict, name: str, language: str | None = None): 

332 """ 

333 Registers the Method under `name` and eventually some customized SEO-name for the provided language 

334 """ 

335 if self.exposed is None: 

336 return 

337 

338 target[name] = self 

339 

340 # reassign for SEO mapping as well 

341 if self.seo_language_map: 

342 for lang in tuple(self.seo_language_map.keys()) if not language else (language, ): 

343 if translated_name := self.seo_language_map.get(lang): 

344 target[translated_name] = self 

345 

346 

347class Module: 

348 """ 

349 This is the root module prototype that serves a minimal module in the ViUR system without any other bindings. 

350 """ 

351 

352 handler: str | t.Callable = None 

353 """ 

354 This is the module's handler, respectively its type. 

355 Use the @property-decorator in specific Modules to construct the handler's value dynamically. 

356 A module without a handler setting cannot be described, so cannot be handled by admin-tools. 

357 """ 

358 

359 accessRights: tuple[str] = None 

360 """ 

361 If set, a tuple of access rights (like add, edit, delete) that this module supports. 

362 

363 These will be prefixed on instance startup with the actual module name (becoming file-add, file-edit etc) 

364 and registered in ``conf.user.access_rights`` so these will be available on the access bone in user/add 

365 or user/edit. 

366 """ 

367 

368 roles: dict = {} 

369 r""" 

370 Allows to specify role settings for a module. 

371 

372 Defaults to no role definition, which ignores the module entirely in the role-system. 

373 In this case, access rights can still be set individually on the user's access bone. 

374 

375 A "*" wildcard can either be used as key or as value to allow for "all roles", or "all rights". 

376 

377 .. code-block:: python 

378 

379 # Example 

380 roles = { 

381 "*": "view", # Any role may only "view" 

382 "editor": ("add", "edit"), # Role "editor" may "add" or "edit", but not "delete" 

383 "admin": "*", # Role "admin" can do everything 

384 } 

385 

386 """ 

387 

388 seo_language_map: dict[str: str] = {} 

389 r""" 

390 The module name is the first part of a URL. 

391 SEO-identifiers have to be set as class-attribute ``seoLanguageMap`` of type ``dict[str, str]`` in the module. 

392 It maps a *language* to the according *identifier*. 

393 

394 .. code-block:: python 

395 :name: module seo-map 

396 :caption: modules/myorders.py 

397 :emphasize-lines: 4-7 

398 

399 from viur.core.prototypes import List 

400 

401 class MyOrders(List): 

402 seo_language_map = { 

403 "de": "bestellungen", 

404 "en": "orders", 

405 } 

406 

407 By default the module would be available under */myorders*, the lowercase module name. 

408 With the defined :attr:`seoLanguageMap`, it will become available as */de/bestellungen* and */en/orders*. 

409 

410 Great, this part is now user and robot friendly :) 

411 """ 

412 

413 adminInfo: dict[str, t.Any] | t.Callable = None 

414 """ 

415 This is a ``dict`` holding the information necessary for the Vi/Admin to handle this module. 

416 

417 name: ``str`` 

418 Human-readable module name that will be shown in the admin tool. 

419 

420 handler: ``str`` (``list``, ``tree`` or ``singleton``): 

421 Allows to override the handler provided by the module. Set this only when *really* necessary, 

422 otherwise it can be left out and is automatically injected by the Module's prototype. 

423 

424 icon: ``str`` 

425 (Optional) Either the Shoelace icon library name or a path relative to the project's deploy folder 

426 (e.g. /static/icons/viur.svg) for the icon used in the admin tool for this module. 

427 

428 columns: ``List[str]`` 

429 (Optional) List of columns (bone names) that are displayed by default. 

430 Used only by the List handler. 

431 

432 filter: ``Dict[str, str]`` 

433 (Optional) Dictionary of additional parameters that will be send along when 

434 fetching entities from the server. Can be used to filter the entities being displayed on the 

435 client-side. 

436 

437 display: ``str`` ("default", "hidden" or "group") 

438 (Optional) "hidden" will hide the module in the admin tool's main bar. 

439 (itwill not be accessible directly, however it's registered with the frontend so it can be used in a 

440 relational bone). "group" will show this module in the main bar, but it will not be clickable. 

441 Clicking it will just try to expand it (assuming there are additional views defined). 

442 

443 preview: ``Union[str, Dict[str, str]]`` 

444 (Optional) A url that will be opened in a new tab and is expected to display 

445 the entity selected in the table. Can be “/{{module}}/view/{{key}}", with {{module}} and {{key}} getting 

446 replaced as needed. If more than one preview-url is needed, supply a dictionary where the key is 

447 the URL and the value the description shown to the user. 

448 

449 views: ``List[Dict[str, t.Any]]`` 

450 (Optional) List of nested adminInfo like dictionaries. Used to define 

451 additional views on the module. Useful f.e. for an order module, where you want separate list of 

452 "payed orders", "unpayed orders", "orders waiting for shipment", etc. If such views are defined, 

453 the top-level entry in the menu bar will expand if clicked, revealing these additional filters. 

454 

455 actions: ``List[str]`` 

456 (Optional) List of actions supported by this modules. Actions can be defined by 

457 the frontend (like "add", "edit", "delete" or "preview"); it can be an action defined by a plugin 

458 loaded by the frontend; or it can be a so called "server side action" (see "customActions" below) 

459 

460 customActions: ``Dict[str, dict]`` 

461 (Optional) A mapping of names of server-defined actions that can be used 

462 in the ``actions`` list above to their definition dictionary. See .... for more details. 

463 

464 disabledActions: ``List[str, dict]`` 

465 (Optional) A list of disabled actions. The frontend will inject default actions like add or edit 

466 even if they're not listed in actions. Listing them here will prevent that. It's up to the frontend 

467 to decide if that action won't be visible at all or it's button just being disabled. 

468 

469 sortIndex: ``int`` 

470 (Optional) Defines the order in which the modules will appear in the main bar in 

471 ascrending order. 

472 

473 indexedBones: ``List[str]`` 

474 (Optional) List of bones, for which an (composite?) index exists in this 

475 view. This allows the fronted to signal the user that a given list can be sorted or filtered by this 

476 bone. If no additional filters are enforced by the 

477 :meth:`listFilter<viur.core.prototypes.list.listFilter>` and ``filter`` is not set, this should be 

478 all bones which are marked as indexed. 

479 

480 changeInvalidates: ``List[str]`` 

481 (Optional) A list of module-names which depend on the entities handled 

482 from this module. This allows the frontend to invalidate any caches in these depended modules if the 

483 data in this module changes. Example: This module may be a list-module handling the file_rootNode 

484 entities for the file module, so a edit/add/deletion action on this module should be reflected in the 

485 rootNode-selector in the file-module itself. In this case, this property should be set to ``["file"]``. 

486 

487 moduleGroup: ``str`` 

488 (Optional) If set, should be a key of a moduleGroup defined in .... . 

489 

490 editViews: ``Dict[str, t.Any]`` 

491 (Optional) If set, will embed another list-widget in the edit forms for 

492 a given entity. See .... for more details. 

493 

494 If this is a function, it must take no parameters and return the dictionary as shown above. This 

495 can be used to customize the appearance of the Vi/Admin to individual users. 

496 """ 

497 

498 def __init__(self, moduleName: str, modulePath: str, *args, **kwargs): 

499 self.render = None # will be set to the appropriate render instance at runtime 

500 self._cached_description = None # caching used by describe() 

501 self.moduleName = moduleName # Name of this module (usually it's class name, e.g. "file") 

502 self.modulePath = modulePath # Path to this module in URL-routing (e.g. "json/file") 

503 

504 if self.handler and self.accessRights: 

505 for right in self.accessRights: 

506 right = f"{self.moduleName}-{right}" 

507 

508 # fixme: Turn conf.user.access_rights into a set. 

509 if right not in conf.user.access_rights: 

510 conf.user.access_rights.append(right) 

511 

512 # Collect methods and (sub)modules 

513 self._methods = {} 

514 self._modules = {} 

515 self._update_methods() 

516 

517 def _update_methods(self): 

518 """ 

519 Internal function to update methods and submodules. 

520 This function should only be called when member attributes are dynamically modified by the module. 

521 """ 

522 self._methods.clear() 

523 self._modules.clear() 

524 

525 for key in dir(self): 

526 if key[0] == "_": 

527 continue 

528 if isinstance(getattr(self.__class__, key, None), property): 

529 continue 

530 

531 prop = getattr(self, key) 

532 

533 if isinstance(prop, Method): 

534 self._methods[key] = prop 

535 elif isinstance(prop, Module): 

536 self._modules[key] = prop 

537 

538 def describe(self) -> dict | None: 

539 """ 

540 Meta description of this module. 

541 """ 

542 # Use cached description? 

543 if isinstance(self._cached_description, dict): 

544 return self._cached_description 

545 

546 # Retrieve handler 

547 if not (handler := self.handler): 

548 return None 

549 

550 # Default description 

551 ret = { 

552 "name": self.__class__.__name__, 

553 "handler": ".".join((handler, self.__class__.__name__.lower())), 

554 "methods": { 

555 name: method.describe() for name, method in self._methods.items() 

556 }, 

557 } 

558 

559 # Extend indexes, if available 

560 # todo: This must be handled by SkelModule 

561 if indexes := getattr(self, "indexes", None): 

562 ret["indexes"] = indexes 

563 

564 # Merge adminInfo if present 

565 if admin_info := self.adminInfo() if callable(self.adminInfo) else self.adminInfo: 

566 assert isinstance(admin_info, dict), \ 

567 f"adminInfo can either be a dict or a callable returning a dict, but got {type(admin_info)}" 

568 ret |= admin_info 

569 

570 # Cache description for later re-use. 

571 if self._cached_description is not False: 

572 self._cached_description = ret 

573 

574 return ret 

575 

576 def register(self, target: dict, render: object): 

577 """ 

578 Registers this module's public functions to a given resolver. 

579 This function is executed on start-up, and can be sub-classed. 

580 """ 

581 # connect instance to render 

582 self.render = render 

583 

584 # Map module under SEO-mapped name, if available. 

585 if self.seo_language_map: 

586 for lang in conf.i18n.available_languages or [conf.i18n.default_language]: 

587 # Map the module under each translation 

588 if translated_module_name := self.seo_language_map.get(lang): 

589 translated_module = target.setdefault(translated_module_name, {}) 

590 

591 # Map module methods to the previously determined target 

592 for name, method in self._methods.items(): 

593 method.register(translated_module, name, lang) 

594 

595 conf.i18n.language_module_map[self.moduleName] = self.seo_language_map 

596 

597 # Map the module also under it's original name 

598 if self.moduleName != "index": 

599 target = target.setdefault(self.moduleName, {}) 

600 

601 # Map module methods to the previously determined target 

602 for name, method in self._methods.items(): 

603 method.register(target, name) 

604 

605 # Register sub modules 

606 for name, module in self._modules.items(): 

607 module.register(target, self.render)