Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/prototypes/tree.py: 0%

430 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-07 19:28 +0000

1import logging 

2import typing as t 

3from deprecated.sphinx import deprecated 

4from viur.core import utils, errors, db, current 

5from viur.core.decorators import * 

6from viur.core.bones import KeyBone, SortIndexBone 

7from viur.core.cache import flushCache 

8from viur.core.skeleton import Skeleton, SkeletonInstance 

9from viur.core.tasks import CallDeferred 

10from .skelmodule import SkelModule 

11 

12 

13SkelType = t.Literal["node", "leaf"] 

14 

15 

16class TreeSkel(Skeleton): 

17 parententry = KeyBone( 

18 descr="Parent", 

19 visible=False, 

20 readOnly=True, 

21 ) 

22 parentrepo = KeyBone( 

23 descr="BaseRepo", 

24 visible=False, 

25 readOnly=True, 

26 ) 

27 sortindex = SortIndexBone( 

28 visible=False, 

29 readOnly=True, 

30 ) 

31 

32 @classmethod 

33 def refresh(cls, skelValues): # ViUR2 Compatibility 

34 super().refresh(skelValues) 

35 if not skelValues["parententry"] and skelValues.dbEntity.get("parentdir"): # parentdir for viur2 compatibility 

36 skelValues["parententry"] = utils.normalizeKey( 

37 db.Key.from_legacy_urlsafe(skelValues.dbEntity["parentdir"])) 

38 

39 

40class Tree(SkelModule): 

41 """ 

42 Tree module prototype. 

43 

44 It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only. 

45 """ 

46 accessRights = ("add", "edit", "view", "delete", "manage") 

47 

48 nodeSkelCls = None 

49 leafSkelCls = None 

50 

51 default_order = "sortindex" 

52 

53 def __init__(self, moduleName, modulePath, *args, **kwargs): 

54 assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {self.__class__.__name__!r}" 

55 super().__init__(moduleName, modulePath, *args, **kwargs) 

56 

57 @property 

58 def handler(self): 

59 return "tree" if self.leafSkelCls else "tree.node" # either a tree or a tree with nodes only (former hierarchy) 

60 

61 def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]: 

62 """ 

63 Checks for correct skelType. 

64 

65 Either returns the type provided, or None in case it is invalid. 

66 """ 

67 skelType = skelType.lower() 

68 if skelType == "node" or (skelType == "leaf" and self.leafSkelCls): 

69 return skelType 

70 

71 return None 

72 

73 def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]: 

74 if not (skelType := self._checkSkelType(skelType)): 

75 raise ValueError("Unsupported skelType") 

76 

77 if skelType == "leaf": 

78 return self.leafSkelCls 

79 

80 return self.nodeSkelCls 

81 

82 def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

83 """ 

84 Return unmodified base skeleton for the given skelType. 

85 

86 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`, :func:`~baseSkel` 

87 """ 

88 return self._resolveSkelCls(skelType, *args, **kwargs)() 

89 

90 def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

91 """ 

92 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

93 for viewing an existing entry from the tree. 

94 

95 The default is a Skeleton instance returned by :func:`~baseSkel`. 

96 

97 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel` 

98 

99 :return: Returns a Skeleton instance for viewing an entry. 

100 """ 

101 return self.baseSkel(skelType, *args, **kwargs) 

102 

103 def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

104 """ 

105 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

106 for adding an entry to the tree. 

107 

108 The default is a Skeleton instance returned by :func:`~baseSkel`. 

109 

110 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

111 

112 :return: Returns a Skeleton instance for adding an entry. 

113 """ 

114 return self.baseSkel(skelType, *args, **kwargs) 

115 

116 def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

117 """ 

118 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

119 for editing an existing entry from the tree. 

120 

121 The default is a Skeleton instance returned by :func:`~baseSkel`. 

122 

123 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

124 

125 :return: Returns a Skeleton instance for editing an entry. 

126 """ 

127 return self.baseSkel(skelType, *args, **kwargs) 

128 

129 def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

130 """ 

131 Retrieve a new :class:`viur.core.skeleton.SkeletonInstance` that is used by the application 

132 for cloning an existing entry of the tree. 

133 

134 The default is a SkeletonInstance returned by :func:`~baseSkel`. 

135 

136 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

137 

138 :return: Returns a SkeletonInstance for cloning an entry. 

139 """ 

140 return self.baseSkel(skelType, *args, **kwargs) 

141 

142 def rootnodeSkel( 

143 self, 

144 *, 

145 identifier: str = "rep_module_repo", 

146 ensure: bool | dict | t.Callable[[SkeletonInstance], None] = False, 

147 ) -> SkeletonInstance: 

148 """ 

149 Retrieve a new :class:`viur.core.skeleton.SkeletonInstance` that is used by the application 

150 for rootnode entries. 

151 

152 The default is a SkeletonInstance returned by :func:`~baseSkel`, with a preset key created from identifier. 

153 

154 :param identifier: Unique identifier (name) for this rootnode. 

155 :param ensure: If provided, ensures that the skeleton is available, and created with optionally provided values. 

156 

157 :return: Returns a SkeletonInstance for handling root nodes. 

158 """ 

159 skel = self.baseSkel("node") 

160 

161 skel["key"] = db.Key(skel.kindName, identifier) 

162 skel["rootNode"] = True 

163 

164 if ensure not in (False, None): 

165 return skel.read(create=ensure) 

166 

167 return skel 

168 

169 @deprecated( 

170 version="3.7.0", 

171 reason="Use rootnodeSkel(ensure=True) instead.", 

172 action="always" 

173 ) 

174 def ensureOwnModuleRootNode(self) -> db.Entity: 

175 """ 

176 Ensures, that general root-node for the current module exists. 

177 If no root-node exists yet, it will be created. 

178 

179 :returns: The entity of the root-node. 

180 """ 

181 return self.rootnodeSkel(ensure=True).dbEntity 

182 

183 def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]: 

184 """ 

185 Default function for providing a list of root node items. 

186 This list is requested by several module-internal functions and *must* be 

187 overridden by a custom functionality. The default stub for this function 

188 returns an empty list. 

189 An example implementation could be the following: 

190 

191 .. code-block:: python 

192 

193 # Example 

194 def getAvailableRootNodes(self, *args, **kwargs): 

195 q = db.Query(self.rootKindName) 

196 ret = [{"key": str(e.key()), 

197 "name": e.get("name", str(e.key().id_or_name()))} #FIXME 

198 for e in q.run(limit=25)] 

199 return ret 

200 

201 :param args: Can be used in custom implementations. 

202 :param kwargs: Can be used in custom implementations. 

203 :return: Returns a list of dicts which must provide a "key" and a "name" entry with \ 

204 respective information. 

205 """ 

206 return [] 

207 

208 def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None: 

209 """ 

210 Returns the root-node for a given child. 

211 

212 :param key: Key of the child node entry. 

213 

214 :returns: The skeleton of the root-node. 

215 """ 

216 skel = self.nodeSkelCls() 

217 

218 while key: 

219 if not skel.read(key): 

220 return None 

221 

222 key = skel["parententry"] 

223 

224 return skel 

225 

226 @CallDeferred 

227 def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0): 

228 """ 

229 Recursively fixes the parentrepo key after a move operation. 

230 

231 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

232 

233 :param parentNode: URL-safe key of the node which children should be fixed. 

234 :param newRepoKey: URL-safe key of the new repository. 

235 :param depth: Safety level depth preventing infinitive loops. 

236 """ 

237 if depth > 99: 

238 logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo") 

239 logging.critical("Your data is corrupt!") 

240 logging.debug(f"{parentNode=}, {newRepoKey=}") 

241 return 

242 

243 def fixTxn(nodeKey, newRepoKey): 

244 node = db.Get(nodeKey) 

245 node["parentrepo"] = newRepoKey 

246 db.Put(node) 

247 

248 # Fix all nodes 

249 q = db.Query(self.viewSkel("node").kindName).filter("parententry =", parentNode) 

250 for repo in q.iter(): 

251 self.updateParentRepo(repo.key, newRepoKey, depth=depth + 1) 

252 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

253 

254 # Fix the leafs on this level 

255 if self.leafSkelCls: 

256 q = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", parentNode) 

257 for repo in q.iter(): 

258 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

259 

260 ## Internal exposed functions 

261 

262 @internal_exposed 

263 def pathToKey(self, key: db.Key): 

264 """ 

265 Returns the recursively expanded path through the Tree from the root-node to a 

266 requested node. 

267 :param key: Key of the destination *node*. 

268 :returns: An nested dictionary with information about all nodes in the path from root to the requested node. 

269 """ 

270 lastLevel = [] 

271 for x in range(0, 99): 

272 currentNodeSkel = self.viewSkel("node") 

273 if not currentNodeSkel.read(key): 

274 return [] # Either invalid key or listFilter prevented us from fetching anything 

275 if currentNodeSkel["parententry"] == currentNodeSkel["parentrepo"]: # We reached the top level 

276 break 

277 levelQry = self.viewSkel("node").all().filter("parententry =", currentNodeSkel["parententry"]) 

278 currentLevel = [{"skel": x, 

279 "active": x["key"] == currentNodeSkel["key"], 

280 "children": lastLevel if x["key"] == currentNodeSkel["key"] else []} 

281 for x in self.listFilter(levelQry).fetch(99)] 

282 assert currentLevel, "Got emtpy parent list?" 

283 lastLevel = currentLevel 

284 key = currentNodeSkel["parententry"] 

285 return lastLevel 

286 

287 ## External exposed functions 

288 

289 @exposed 

290 def index(self, skelType: SkelType = "node", parententry: t.Optional[db.Key | int | str] = None, **kwargs): 

291 if not parententry: 

292 repos = self.getAvailableRootNodes(**kwargs) 

293 match len(repos): 

294 case 0: 

295 raise errors.Unauthorized() 

296 case 1: 

297 parententry = repos[0]["key"] 

298 case _: 

299 raise errors.NotAcceptable(f"Missing required parameter {'parententry'!r}") 

300 

301 return self.list(skelType=skelType, parententry=parententry, **kwargs) 

302 

303 @exposed 

304 def listRootNodes(self, *args, **kwargs) -> t.Any: 

305 """ 

306 Renders a list of all available repositories for the current user using the 

307 modules default renderer. 

308 

309 :returns: The rendered representation of the available root-nodes. 

310 """ 

311 return self.render.listRootNodes(self.getAvailableRootNodes(*args, **kwargs)) 

312 

313 @exposed 

314 def list(self, skelType: SkelType, *args, **kwargs) -> t.Any: 

315 """ 

316 Prepares and renders a list of entries. 

317 

318 All supplied parameters are interpreted as filters for the elements displayed. 

319 

320 Unlike other module prototypes in ViUR, the access control in this function is performed 

321 by calling the function :func:`listFilter`, which updates the query-filter to match only 

322 elements which the user is allowed to see. 

323 

324 .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter` 

325 

326 :returns: The rendered list objects for the matching entries. 

327 

328 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

329 """ 

330 if not (skelType := self._checkSkelType(skelType)): 

331 raise errors.NotAcceptable("Invalid skelType provided.") 

332 

333 # The general access control is made via self.listFilter() 

334 if not (query := self.listFilter(self.viewSkel(skelType).all().mergeExternalFilter(kwargs))): 

335 raise errors.Unauthorized() 

336 

337 self._apply_default_order(query) 

338 return self.render.list(query.fetch()) 

339 

340 @exposed 

341 def structure(self, skelType: SkelType, action: t.Optional[str] = "view") -> t.Any: 

342 """ 

343 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set 

344 in each bone. 

345 

346 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

347 """ 

348 # FIXME: In ViUR > 3.7 this could also become dynamic (ActionSkel paradigm). 

349 match action: 

350 case "view": 

351 skel = self.viewSkel(skelType) 

352 if not self.canView(skelType, skel): 

353 raise errors.Unauthorized() 

354 

355 case "edit": 

356 skel = self.editSkel(skelType) 

357 if not self.canEdit(skelType, skel): 

358 raise errors.Unauthorized() 

359 

360 case "add": 

361 if not self.canAdd(skelType): 

362 raise errors.Unauthorized() 

363 

364 skel = self.addSkel(skelType) 

365 

366 case "clone": 

367 skel = self.cloneSkel(skelType) 

368 if not (self.canAdd(skelType) and self.canEdit(skelType, skel)): 

369 raise errors.Unauthorized() 

370 

371 case _: 

372 raise errors.NotImplemented(f"The action {action!r} is not implemented.") 

373 

374 return self.render.render(f"structure.{skelType}.{action}", skel) 

375 

376 @exposed 

377 def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

378 """ 

379 Prepares and renders a single entry for viewing. 

380 

381 The entry is fetched by its *key* and its *skelType*. 

382 The function performs several access control checks on the requested entity before it is rendered. 

383 

384 .. seealso:: :func:`canView`, :func:`onView` 

385 

386 :returns: The rendered representation of the requested entity. 

387 

388 :param skelType: May either be "node" or "leaf". 

389 :param key: URL-safe key of the parent. 

390 

391 :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided. 

392 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

393 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

394 """ 

395 if not (skelType := self._checkSkelType(skelType)): 

396 raise errors.NotAcceptable(f"Invalid skelType provided.") 

397 

398 skel = self.viewSkel(skelType) 

399 if not skel.read(key): 

400 raise errors.NotFound() 

401 

402 if not self.canView(skelType, skel): 

403 raise errors.Unauthorized() 

404 

405 self.onView(skelType, skel) 

406 return self.render.view(skel) 

407 

408 @exposed 

409 @force_ssl 

410 @skey(allow_empty=True) 

411 def add(self, skelType: SkelType, node: db.Key | int | str, *args, **kwargs) -> t.Any: 

412 """ 

413 Add a new entry with the given parent *node*, and render the entry, eventually with error notes 

414 on incorrect data. Data is taken by any other arguments in *kwargs*. 

415 

416 The function performs several access control checks on the requested entity before it is added. 

417 

418 .. seealso:: :func:`canAdd`, :func:`onAdd`, , :func:`onAdded` 

419 

420 :param skelType: Defines the type of the new entry and may either be "node" or "leaf". 

421 :param node: URL-safe key of the parent. 

422 

423 :returns: The rendered, added object of the entry, eventually with error hints. 

424 

425 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

426 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

427 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

428 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

429 """ 

430 if not (skelType := self._checkSkelType(skelType)): 

431 raise errors.NotAcceptable(f"Invalid skelType provided.") 

432 

433 skel = self.addSkel(skelType) 

434 parentNodeSkel = self.editSkel("node") 

435 if not parentNodeSkel.read(node): 

436 raise errors.NotFound("The provided parent node could not be found.") 

437 if not self.canAdd(skelType, parentNodeSkel): 

438 raise errors.Unauthorized() 

439 

440 skel["parententry"] = parentNodeSkel["key"] 

441 # parentrepo may not exist in parentNodeSkel as it may be an rootNode 

442 skel["parentrepo"] = parentNodeSkel["parentrepo"] or parentNodeSkel["key"] 

443 

444 if ( 

445 not kwargs # no data supplied 

446 or not current.request.get().isPostRequest # failure if not using POST-method 

447 or not skel.fromClient(kwargs) # failure on reading into the bones 

448 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

449 ): 

450 return self.render.add(skel) 

451 

452 self.onAdd(skelType, skel) 

453 skel.write() 

454 self.onAdded(skelType, skel) 

455 

456 return self.render.addSuccess(skel) 

457 

458 @force_ssl 

459 @force_post 

460 @exposed 

461 @skey 

462 @access("root") 

463 def add_or_edit(self, skelType: SkelType, key: db.Key | int | str, **kwargs) -> t.Any: 

464 """ 

465 This function is intended to be used by importers. 

466 Only "root"-users are allowed to use it. 

467 """ 

468 if not (skelType := self._checkSkelType(skelType)): 

469 raise errors.NotAcceptable("Invalid skelType provided.") 

470 

471 kind_name = self.nodeSkelCls.kindName if skelType == "node" else self.leafSkelCls.kindName 

472 

473 db_key = db.keyHelper(key, targetKind=kind_name, adjust_kind=kind_name) 

474 is_add = not bool(db.Get(db_key)) 

475 

476 if is_add: 

477 skel = self.addSkel(skelType) 

478 else: 

479 skel = self.editSkel(skelType) 

480 

481 skel["key"] = db_key 

482 

483 if ( 

484 not kwargs # no data supplied 

485 or not skel.fromClient(kwargs) # failure on reading into the bones 

486 ): 

487 # render the skeleton in the version it could as far as it could be read. 

488 return self.render.render("add_or_edit", skel) 

489 

490 if is_add: 

491 self.onAdd(skelType, skel) 

492 else: 

493 self.onEdit(skelType, skel) 

494 

495 skel.write() 

496 

497 if is_add: 

498 self.onAdded(skelType, skel) 

499 return self.render.addSuccess(skel) 

500 

501 self.onEdited(skelType, skel) 

502 return self.render.editSuccess(skel) 

503 

504 @exposed 

505 @force_ssl 

506 @skey(allow_empty=True) 

507 def edit(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

508 """ 

509 Modify an existing entry, and render the entry, eventually with error notes on incorrect data. 

510 Data is taken by any other arguments in *kwargs*. 

511 

512 The function performs several access control checks on the requested entity before it is added. 

513 

514 .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited` 

515 

516 :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf". 

517 :param key: URL-safe key of the item to be edited. 

518 

519 :returns: The rendered, modified object of the entry, eventually with error hints. 

520 

521 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

522 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

523 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

524 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

525 """ 

526 if not (skelType := self._checkSkelType(skelType)): 

527 raise errors.NotAcceptable(f"Invalid skelType provided.") 

528 

529 skel = self.editSkel(skelType) 

530 if not skel.read(key): 

531 raise errors.NotFound() 

532 

533 if not self.canEdit(skelType, skel): 

534 raise errors.Unauthorized() 

535 

536 if ( 

537 not kwargs # no data supplied 

538 or not current.request.get().isPostRequest # failure if not using POST-method 

539 or not skel.fromClient(kwargs, amend=True) # failure on reading into the bones 

540 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

541 ): 

542 return self.render.edit(skel) 

543 

544 self.onEdit(skelType, skel) 

545 skel.write() 

546 self.onEdited(skelType, skel) 

547 

548 return self.render.editSuccess(skel) 

549 

550 @exposed 

551 @force_ssl 

552 @force_post 

553 @skey 

554 def delete(self, skelType: SkelType, key: str, *args, **kwargs) -> t.Any: 

555 """ 

556 Deletes an entry or an directory (including its contents). 

557 

558 The function runs several access control checks on the data before it is deleted. 

559 

560 .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted` 

561 

562 :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf". 

563 :param key: URL-safe key of the item to be deleted. 

564 

565 :returns: The rendered, deleted object of the entry. 

566 

567 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

568 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

569 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

570 """ 

571 if not (skelType := self._checkSkelType(skelType)): 

572 raise errors.NotAcceptable(f"Invalid skelType provided.") 

573 

574 skel = self.editSkel(skelType) 

575 if not skel.read(key): 

576 raise errors.NotFound() 

577 

578 if not self.canDelete(skelType, skel): 

579 raise errors.Unauthorized() 

580 

581 if skelType == "node": 

582 self.deleteRecursive(skel["key"]) 

583 

584 self.onDelete(skelType, skel) 

585 skel.delete() 

586 self.onDeleted(skelType, skel) 

587 

588 return self.render.deleteSuccess(skel, skelType=skelType) 

589 

590 @CallDeferred 

591 def deleteRecursive(self, parentKey: str): 

592 """ 

593 Recursively processes a delete request. 

594 

595 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

596 

597 :param parentKey: URL-safe key of the node which children should be deleted. 

598 """ 

599 nodeKey = db.keyHelper(parentKey, self.viewSkel("node").kindName) 

600 if self.leafSkelCls: 

601 for leaf in db.Query(self.viewSkel("leaf").kindName).filter("parententry =", nodeKey).iter(): 

602 leafSkel = self.viewSkel("leaf") 

603 if not leafSkel.read(leaf.key): 

604 continue 

605 leafSkel.delete() 

606 for node in db.Query(self.viewSkel("node").kindName).filter("parententry =", nodeKey).iter(): 

607 self.deleteRecursive(node.key) 

608 nodeSkel = self.viewSkel("node") 

609 if not nodeSkel.read(node.key): 

610 continue 

611 nodeSkel.delete() 

612 

613 @exposed 

614 @force_ssl 

615 @force_post 

616 @skey 

617 def move(self, skelType: SkelType, key: db.Key | int | str, parentNode: str, *args, **kwargs) -> str: 

618 """ 

619 Move a node (including its contents) or a leaf to another node. 

620 

621 .. seealso:: :func:`canMove` 

622 

623 :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf". 

624 :param key: URL-safe key of the item to be moved. 

625 :param parentNode: URL-safe key of the destination node, which must be a node. 

626 :param skey: The CSRF security key. 

627 

628 :returns: The rendered, edited object of the entry. 

629 

630 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

631 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

632 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

633 """ 

634 if not (skelType := self._checkSkelType(skelType)): 

635 raise errors.NotAcceptable(f"Invalid skelType provided.") 

636 

637 skel = self.editSkel(skelType) # srcSkel - the skeleton to be moved 

638 parentNodeSkel = self.baseSkel("node") # destSkel - the node it should be moved into 

639 

640 if not skel.read(key): 

641 raise errors.NotFound("Cannot find entity to move") 

642 

643 if not parentNodeSkel.read(parentNode): 

644 parentNode = utils.normalizeKey(db.Key.from_legacy_urlsafe(parentNode)) 

645 

646 if parentNode.kind != parentNodeSkel.kindName: 

647 raise errors.NotFound( 

648 f"You provided a key of kind {parentNode.kind}, but require a {parentNodeSkel.kindName}." 

649 ) 

650 

651 raise errors.NotFound("Cannot find parentNode entity") 

652 

653 if not self.canMove(skelType, skel, parentNodeSkel): 

654 raise errors.Unauthorized() 

655 

656 if skel["key"] == parentNodeSkel["key"]: 

657 raise errors.NotAcceptable("Cannot move a node into itself") 

658 

659 ## Test for recursion 

660 currLevel = db.Get(parentNodeSkel["key"]) 

661 for _ in range(0, 99): 

662 if currLevel.key == skel["key"]: 

663 break 

664 if currLevel.get("rootNode") or currLevel.get("is_root_node"): 

665 # We reached a rootNode, so this is okay 

666 break 

667 currLevel = db.Get(currLevel["parententry"]) 

668 else: # We did not "break" - recursion-level exceeded or loop detected 

669 raise errors.NotAcceptable("Unable to find a root node in recursion?") 

670 

671 # Test if we try to move a rootNode 

672 # TODO: Remove "rootNode"-fallback with VIUR4 

673 if skel.dbEntity.get("is_root_node") or skel.dbEntity.get("rootNode"): 

674 raise errors.NotAcceptable("Can't move a rootNode to somewhere else") 

675 

676 currentParentRepo = skel["parentrepo"] 

677 skel["parententry"] = parentNodeSkel["key"] 

678 skel["parentrepo"] = parentNodeSkel["parentrepo"] # Fixme: Need to recursive fixing to parentrepo? 

679 if "sortindex" in kwargs: 

680 try: 

681 skel["sortindex"] = float(kwargs["sortindex"]) 

682 except: 

683 raise errors.PreconditionFailed() 

684 

685 self.onEdit(skelType, skel) 

686 skel.write() 

687 self.onEdited(skelType, skel) 

688 

689 # Ensure a changed parentRepo get's proagated 

690 if currentParentRepo != parentNodeSkel["parentrepo"]: 

691 self.updateParentRepo(key, parentNodeSkel["parentrepo"]) 

692 

693 return self.render.editSuccess(skel) 

694 

695 @exposed 

696 @force_ssl 

697 @skey(allow_empty=True) 

698 def clone(self, skelType: SkelType, key: db.Key | str | int, **kwargs): 

699 """ 

700 Clone an existing entry, and render the entry, eventually with error notes on incorrect data. 

701 Data is taken by any other arguments in *kwargs*. 

702 

703 The function performs several access control checks on the requested entity before it is added. 

704 

705 .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned` 

706 

707 :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf". 

708 :param key: URL-safe key of the item to be edited. 

709 

710 :returns: The cloned object of the entry, eventually with error hints. 

711 

712 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

713 :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found. 

714 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

715 """ 

716 

717 if not (skelType := self._checkSkelType(skelType)): 

718 raise errors.NotAcceptable(f"Invalid skelType provided.") 

719 

720 skel = self.cloneSkel(skelType) 

721 if not skel.read(key): 

722 raise errors.NotFound() 

723 

724 # a clone-operation is some kind of edit and add... 

725 if not (self.canEdit(skelType, skel) and self.canAdd(skelType, kwargs.get("parententry"))): 

726 raise errors.Unauthorized() 

727 

728 # Remember source skel and unset the key for clone operation! 

729 src_skel = skel 

730 skel = skel.clone() 

731 skel["key"] = None 

732 

733 # make parententry required and writeable when provided 

734 if "parententry" in kwargs: 

735 skel.parententry.readOnly = False 

736 skel.parententry.required = True 

737 else: 

738 _ = skel["parententry"] # TODO: because of accessedValues... 

739 

740 # make parentrepo required and writeable when provided 

741 if "parentrepo" in kwargs: 

742 skel.parentrepo.readOnly = False 

743 skel.parentrepo.required = True 

744 else: 

745 _ = skel["parentrepo"] # TODO: because of accessedValues... 

746 

747 # Check all required preconditions for clone 

748 if ( 

749 not kwargs # no data supplied 

750 or not current.request.get().isPostRequest # failure if not using POST-method 

751 or not skel.fromClient(kwargs) # failure on reading into the bones 

752 or utils.parse.bool(kwargs.get("bounce")) # review before changing 

753 ): 

754 return self.render.edit(skel, action="clone") 

755 

756 self.onClone(skelType, skel, src_skel=src_skel) 

757 assert skel.write() 

758 self.onCloned(skelType, skel, src_skel=src_skel) 

759 

760 return self.render.editSuccess(skel, action="cloneSuccess") 

761 

762 ## Default access control functions 

763 

764 def listFilter(self, query: db.Query) -> t.Optional[db.Query]: 

765 """ 

766 Access control function on item listing. 

767 

768 This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function, 

769 and is used to modify the provided filter parameter to match only items that the current user 

770 is allowed to see. 

771 

772 :param query: Query which should be altered. 

773 

774 :returns: The altered filter, or None if access is not granted. 

775 """ 

776 

777 if (user := current.user.get()) and (f"{self.moduleName}-view" in user["access"] or "root" in user["access"]): 

778 return query 

779 

780 return None 

781 

782 def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

783 """ 

784 Checks if the current user can view the given entry. 

785 Should be identical to what's allowed by listFilter. 

786 By default, `meth:listFilter` is used to determine what's allowed and whats not; but this 

787 method can be overridden for performance improvements (to eliminate that additional database access). 

788 :param skel: The entry we check for 

789 :return: True if the current session is authorized to view that entry, False otherwise 

790 """ 

791 query = self.viewSkel(skelType).all() 

792 

793 if key := skel["key"]: 

794 query.mergeExternalFilter({"key": key}) 

795 

796 query = self.listFilter(query) # Access control 

797 

798 if query is None or (key and not query.getEntry()): 

799 return False 

800 

801 return True 

802 

803 def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance] = None) -> bool: 

804 """ 

805 Access control function for adding permission. 

806 

807 Checks if the current user has the permission to add a new entry. 

808 

809 The default behavior is: 

810 - If no user is logged in, adding is generally refused. 

811 - If the user has "root" access, adding is generally allowed. 

812 - If the user has the modules "add" permission (module-add) enabled, adding is allowed. 

813 

814 It should be overridden for a module-specific behavior. 

815 

816 .. seealso:: :func:`add` 

817 

818 :param skelType: Defines the type of the node that should be added. 

819 :param parentNodeSkel: The parent node where a new entry should be added. 

820 

821 :returns: True, if adding entries is allowed, False otherwise. 

822 """ 

823 

824 if not (user := current.user.get()): 

825 return False 

826 # root user is always allowed. 

827 if user["access"] and "root" in user["access"]: 

828 return True 

829 # user with add-permission is allowed. 

830 if user and user["access"] and f"{self.moduleName}-add" in user["access"]: 

831 return True 

832 return False 

833 

834 def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

835 """ 

836 Access control function for modification permission. 

837 

838 Checks if the current user has the permission to edit an entry. 

839 

840 The default behavior is: 

841 - If no user is logged in, editing is generally refused. 

842 - If the user has "root" access, editing is generally allowed. 

843 - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed. 

844 

845 It should be overridden for a module-specific behavior. 

846 

847 .. seealso:: :func:`edit` 

848 

849 :param skelType: Defines the type of the node that should be edited. 

850 :param skel: The Skeleton that should be edited. 

851 

852 :returns: True, if editing entries is allowed, False otherwise. 

853 """ 

854 if not (user := current.user.get()): 

855 return False 

856 if user["access"] and "root" in user["access"]: 

857 return True 

858 if user and user["access"] and f"{self.moduleName}-edit" in user["access"]: 

859 return True 

860 return False 

861 

862 def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

863 """ 

864 Access control function for delete permission. 

865 

866 Checks if the current user has the permission to delete an entry. 

867 

868 The default behavior is: 

869 - If no user is logged in, deleting is generally refused. 

870 - If the user has "root" access, deleting is generally allowed. 

871 - If the user has the modules "deleting" permission (module-delete) enabled, \ 

872 deleting is allowed. 

873 

874 It should be overridden for a module-specific behavior. 

875 

876 :param skelType: Defines the type of the node that should be deleted. 

877 :param skel: The Skeleton that should be deleted. 

878 

879 .. seealso:: :func:`delete` 

880 

881 :returns: True, if deleting entries is allowed, False otherwise. 

882 """ 

883 if not (user := current.user.get()): 

884 return False 

885 if user["access"] and "root" in user["access"]: 

886 return True 

887 if user and user["access"] and f"{self.moduleName}-delete" in user["access"]: 

888 return True 

889 return False 

890 

891 def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool: 

892 """ 

893 Access control function for moving permission. 

894 

895 Checks if the current user has the permission to move an entry. 

896 

897 The default behavior is: 

898 - If no user is logged in, deleting is generally refused. 

899 - If the user has "root" access, deleting is generally allowed. 

900 - If the user has the modules "edit" permission (module-edit) enabled, \ 

901 moving is allowed. 

902 

903 It should be overridden for a module-specific behavior. 

904 

905 :param skelType: Defines the type of the node that shall be deleted. 

906 :param node: URL-safe key of the node to be moved. 

907 :param destNode: URL-safe key of the node where *node* should be moved to. 

908 

909 .. seealso:: :func:`move` 

910 

911 :returns: True, if deleting entries is allowed, False otherwise. 

912 """ 

913 if not (user := current.user.get()): 

914 return False 

915 if user["access"] and "root" in user["access"]: 

916 return True 

917 if user and user["access"] and f"{self.moduleName}-edit" in user["access"]: 

918 return True 

919 return False 

920 

921 ## Overridable eventhooks 

922 

923 def onAdd(self, skelType: SkelType, skel: SkeletonInstance): 

924 """ 

925 Hook function that is called before adding an entry. 

926 

927 It can be overridden for a module-specific behavior. 

928 

929 :param skelType: Defines the type of the node that shall be added. 

930 :param skel: The Skeleton that is going to be added. 

931 

932 .. seealso:: :func:`add`, :func:`onAdded` 

933 """ 

934 pass 

935 

936 def onAdded(self, skelType: SkelType, skel: SkeletonInstance): 

937 """ 

938 Hook function that is called after adding an entry. 

939 

940 It should be overridden for a module-specific behavior. 

941 The default is writing a log entry. 

942 

943 :param skelType: Defines the type of the node that has been added. 

944 :param skel: The Skeleton that has been added. 

945 

946 .. seealso:: :func:`add`, :func:`onAdd` 

947 """ 

948 logging.info(f"""Entry of kind {skelType!r} added: {skel["key"]!r}""") 

949 flushCache(kind=skel.kindName) 

950 if user := current.user.get(): 

951 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

952 

953 def onEdit(self, skelType: SkelType, skel: SkeletonInstance): 

954 """ 

955 Hook function that is called before editing an entry. 

956 

957 It can be overridden for a module-specific behavior. 

958 

959 :param skelType: Defines the type of the node that shall be edited. 

960 :param skel: The Skeleton that is going to be edited. 

961 

962 .. seealso:: :func:`edit`, :func:`onEdited` 

963 """ 

964 pass 

965 

966 def onEdited(self, skelType: SkelType, skel: SkeletonInstance): 

967 """ 

968 Hook function that is called after modifying an entry. 

969 

970 It should be overridden for a module-specific behavior. 

971 The default is writing a log entry. 

972 

973 :param skelType: Defines the type of the node that has been edited. 

974 :param skel: The Skeleton that has been modified. 

975 

976 .. seealso:: :func:`edit`, :func:`onEdit` 

977 """ 

978 logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""") 

979 flushCache(key=skel["key"]) 

980 if user := current.user.get(): 

981 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

982 

983 def onView(self, skelType: SkelType, skel: SkeletonInstance): 

984 """ 

985 Hook function that is called when viewing an entry. 

986 

987 It should be overridden for a module-specific behavior. 

988 The default is doing nothing. 

989 

990 :param skelType: Defines the type of the node that is viewed. 

991 :param skel: The Skeleton that is viewed. 

992 

993 .. seealso:: :func:`view` 

994 """ 

995 pass 

996 

997 def onDelete(self, skelType: SkelType, skel: SkeletonInstance): 

998 """ 

999 Hook function that is called before deleting an entry. 

1000 

1001 It can be overridden for a module-specific behavior. 

1002 

1003 :param skelType: Defines the type of the node that shall be deleted. 

1004 :param skel: The Skeleton that is going to be deleted. 

1005 

1006 .. seealso:: :func:`delete`, :func:`onDeleted` 

1007 """ 

1008 pass 

1009 

1010 def onDeleted(self, skelType: SkelType, skel: SkeletonInstance): 

1011 """ 

1012 Hook function that is called after deleting an entry. 

1013 

1014 It should be overridden for a module-specific behavior. 

1015 The default is writing a log entry. 

1016 

1017 ..warning: Saving the skeleton again will undo the deletion 

1018 (if the skeleton was a leaf or a node with no children). 

1019 

1020 :param skelType: Defines the type of the node that is deleted. 

1021 :param skel: The Skeleton that has been deleted. 

1022 

1023 .. seealso:: :func:`delete`, :func:`onDelete` 

1024 """ 

1025 logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""") 

1026 flushCache(key=skel["key"]) 

1027 if user := current.user.get(): 

1028 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

1029 

1030 def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance): 

1031 """ 

1032 Hook function that is called before cloning an entry. 

1033 

1034 It can be overwritten to a module-specific behavior. 

1035 

1036 :param skelType: Defines the type of the node that is cloned. 

1037 :param skel: The new SkeletonInstance that is being created. 

1038 :param src_skel: The source SkeletonInstance `skel` is cloned from. 

1039 

1040 .. seealso:: :func:`clone`, :func:`onCloned` 

1041 """ 

1042 pass 

1043 

1044 @CallDeferred 

1045 def _clone_recursive( 

1046 self, 

1047 skel_type: SkelType, 

1048 src_key: db.Key, 

1049 target_key: db.Key, 

1050 target_repo: db.Key, 

1051 cursor=None 

1052 ): 

1053 """ 

1054 Helper function which is used by default onCloned() to clone a recursive structure. 

1055 """ 

1056 assert (skel_type := self._checkSkelType(skel_type)) 

1057 

1058 logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}") 

1059 

1060 q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex") 

1061 q.setCursor(cursor) 

1062 

1063 count = 0 

1064 for skel in q.fetch(): 

1065 src_skel = skel 

1066 

1067 skel = skel.clone() 

1068 skel["key"] = None 

1069 skel["parententry"] = target_key 

1070 skel["parentrepo"] = target_repo 

1071 

1072 self.onClone(skel_type, skel, src_skel=src_skel) 

1073 logging.debug(f"copying {skel=}") # this logging _is_ needed, otherwise not all values are being written.. 

1074 assert skel.write() 

1075 self.onCloned(skel_type, skel, src_skel=src_skel) 

1076 count += 1 

1077 

1078 logging.debug(f"_clone_recursive {count=}") 

1079 

1080 if cursor := q.getCursor(): 

1081 self._clone_recursive(skel_type, src_key, target_key, target_repo, skel_type, cursor) 

1082 

1083 def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance): 

1084 """ 

1085 Hook function that is called after cloning an entry. 

1086 

1087 It can be overwritten to a module-specific behavior. 

1088 

1089 By default, when cloning a "node", this function calls :func:`_clone_recursive` 

1090 which recursively clones the entire structure below this node in the background. 

1091 If this is not wanted, or wanted by a specific setting, overwrite this function 

1092 without a super-call. 

1093 

1094 :param skelType: Defines the type of the node that is cloned. 

1095 :param skel: The new SkeletonInstance that was created. 

1096 :param src_skel: The source SkeletonInstance `skel` was cloned from. 

1097 

1098 .. seealso:: :func:`clone`, :func:`onClone` 

1099 """ 

1100 logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""") 

1101 flushCache(kind=skel.kindName) 

1102 

1103 if user := current.user.get(): 

1104 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

1105 

1106 # Clone entire structure below, in case this is a node. 

1107 if skelType == "node": 

1108 self._clone_recursive("node", src_skel["key"], skel["key"], skel["parentrepo"]) 

1109 

1110 if self.leafSkelCls: 

1111 self._clone_recursive("leaf", src_skel["key"], skel["key"], skel["parentrepo"]) 

1112 

1113 

1114Tree.vi = True 

1115Tree.admin = True