Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/prototypes/tree.py: 0%

385 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-16 22:16 +0000

1import logging 

2import typing as t 

3from viur.core import utils, errors, db, current 

4from viur.core.decorators import * 

5from viur.core.bones import KeyBone, SortIndexBone 

6from viur.core.cache import flushCache 

7from viur.core.skeleton import Skeleton, SkeletonInstance 

8from viur.core.tasks import CallDeferred 

9from .skelmodule import SkelModule, DEFAULT_ORDER_TYPE 

10 

11 

12SkelType = t.Literal["node", "leaf"] 

13 

14 

15class TreeSkel(Skeleton): 

16 parententry = KeyBone( 

17 descr="Parent", 

18 visible=False, 

19 readOnly=True, 

20 ) 

21 parentrepo = KeyBone( 

22 descr="BaseRepo", 

23 visible=False, 

24 readOnly=True, 

25 ) 

26 sortindex = SortIndexBone( 

27 visible=False, 

28 readOnly=True, 

29 ) 

30 

31 @classmethod 

32 def refresh(cls, skelValues): # ViUR2 Compatibility 

33 super().refresh(skelValues) 

34 if not skelValues["parententry"] and skelValues.dbEntity.get("parentdir"): # parentdir for viur2 compatibility 

35 skelValues["parententry"] = utils.normalizeKey( 

36 db.Key.from_legacy_urlsafe(skelValues.dbEntity["parentdir"])) 

37 

38 

39class Tree(SkelModule): 

40 """ 

41 Tree module prototype. 

42 

43 It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only. 

44 """ 

45 accessRights = ("add", "edit", "view", "delete", "manage") 

46 

47 nodeSkelCls = None 

48 leafSkelCls = None 

49 

50 default_order: DEFAULT_ORDER_TYPE = "sortindex" 

51 """ 

52 Allows to specify a default order for this module, which is applied when no other order is specified. 

53 

54 Setting a default_order might result in the requirement of additional indexes, which are being raised 

55 and must be specified. 

56 """ 

57 

58 def __init__(self, moduleName, modulePath, *args, **kwargs): 

59 assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {self.__class__.__name__!r}" 

60 super().__init__(moduleName, modulePath, *args, **kwargs) 

61 

62 @property 

63 def handler(self): 

64 return "tree" if self.leafSkelCls else "tree.node" # either a tree or a tree with nodes only (former hierarchy) 

65 

66 def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]: 

67 """ 

68 Checks for correct skelType. 

69 

70 Either returns the type provided, or None in case it is invalid. 

71 """ 

72 skelType = skelType.lower() 

73 if skelType == "node" or (skelType == "leaf" and self.leafSkelCls): 

74 return skelType 

75 

76 return None 

77 

78 def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]: 

79 if not (skelType := self._checkSkelType(skelType)): 

80 raise ValueError("Unsupported skelType") 

81 

82 if skelType == "leaf": 

83 return self.leafSkelCls 

84 

85 return self.nodeSkelCls 

86 

87 def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

88 """ 

89 Return unmodified base skeleton for the given skelType. 

90 

91 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`, :func:`~baseSkel` 

92 """ 

93 return self._resolveSkelCls(skelType, *args, **kwargs)() 

94 

95 def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

96 """ 

97 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

98 for viewing an existing entry from the tree. 

99 

100 The default is a Skeleton instance returned by :func:`~baseSkel`. 

101 

102 .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel` 

103 

104 :return: Returns a Skeleton instance for viewing an entry. 

105 """ 

106 return self.baseSkel(skelType, *args, **kwargs) 

107 

108 def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

109 """ 

110 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

111 for adding an entry to the tree. 

112 

113 The default is a Skeleton instance returned by :func:`~baseSkel`. 

114 

115 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

116 

117 :return: Returns a Skeleton instance for adding an entry. 

118 """ 

119 return self.baseSkel(skelType, *args, **kwargs) 

120 

121 def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

122 """ 

123 Retrieve a new instance of a :class:`viur.core.skeleton.Skeleton` that is used by the application 

124 for editing an existing entry from the tree. 

125 

126 The default is a Skeleton instance returned by :func:`~baseSkel`. 

127 

128 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

129 

130 :return: Returns a Skeleton instance for editing an entry. 

131 """ 

132 return self.baseSkel(skelType, *args, **kwargs) 

133 

134 def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance: 

135 """ 

136 Retrieve a new :class:`viur.core.skeleton.SkeletonInstance` that is used by the application 

137 for cloning an existing entry of the tree. 

138 

139 The default is a SkeletonInstance returned by :func:`~baseSkel`. 

140 

141 .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel` 

142 

143 :return: Returns a SkeletonInstance for cloning an entry. 

144 """ 

145 return self.baseSkel(skelType, *args, **kwargs) 

146 

147 def ensureOwnModuleRootNode(self) -> db.Entity: 

148 """ 

149 Ensures, that general root-node for the current module exists. 

150 If no root-node exists yet, it will be created. 

151 

152 :returns: The entity of the root-node. 

153 """ 

154 key = "rep_module_repo" 

155 kindName = self.viewSkel("node").kindName 

156 return db.GetOrInsert(db.Key(kindName, key), creationdate=utils.utcNow(), rootNode=1) 

157 

158 def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]: 

159 """ 

160 Default function for providing a list of root node items. 

161 This list is requested by several module-internal functions and *must* be 

162 overridden by a custom functionality. The default stub for this function 

163 returns an empty list. 

164 An example implementation could be the following: 

165 

166 .. code-block:: python 

167 

168 # Example 

169 def getAvailableRootNodes(self, *args, **kwargs): 

170 q = db.Query(self.rootKindName) 

171 ret = [{"key": str(e.key()), 

172 "name": e.get("name", str(e.key().id_or_name()))} #FIXME 

173 for e in q.run(limit=25)] 

174 return ret 

175 

176 :param args: Can be used in custom implementations. 

177 :param kwargs: Can be used in custom implementations. 

178 :return: Returns a list of dicts which must provide a "key" and a "name" entry with \ 

179 respective information. 

180 """ 

181 return [] 

182 

183 def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None: 

184 """ 

185 Returns the root-node for a given child. 

186 

187 :param key: Key of the child node entry. 

188 

189 :returns: The skeleton of the root-node. 

190 """ 

191 skel = self.nodeSkelCls() 

192 

193 while key: 

194 if not skel.fromDB(key): 

195 return None 

196 

197 key = skel["parententry"] 

198 

199 return skel 

200 

201 @CallDeferred 

202 def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0): 

203 """ 

204 Recursively fixes the parentrepo key after a move operation. 

205 

206 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

207 

208 :param parentNode: URL-safe key of the node which children should be fixed. 

209 :param newRepoKey: URL-safe key of the new repository. 

210 :param depth: Safety level depth preventing infinitive loops. 

211 """ 

212 if depth > 99: 

213 logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo") 

214 logging.critical("Your data is corrupt!") 

215 logging.debug(f"{parentNode=}, {newRepoKey=}") 

216 return 

217 

218 def fixTxn(nodeKey, newRepoKey): 

219 node = db.Get(nodeKey) 

220 node["parentrepo"] = newRepoKey 

221 db.Put(node) 

222 

223 # Fix all nodes 

224 q = db.Query(self.viewSkel("node").kindName).filter("parententry =", parentNode) 

225 for repo in q.iter(): 

226 self.updateParentRepo(repo.key, newRepoKey, depth=depth + 1) 

227 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

228 

229 # Fix the leafs on this level 

230 if self.leafSkelCls: 

231 q = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", parentNode) 

232 for repo in q.iter(): 

233 db.RunInTransaction(fixTxn, repo.key, newRepoKey) 

234 

235 ## Internal exposed functions 

236 

237 @internal_exposed 

238 def pathToKey(self, key: db.Key): 

239 """ 

240 Returns the recursively expanded path through the Tree from the root-node to a 

241 requested node. 

242 :param key: Key of the destination *node*. 

243 :returns: An nested dictionary with information about all nodes in the path from root to the requested node. 

244 """ 

245 lastLevel = [] 

246 for x in range(0, 99): 

247 currentNodeSkel = self.viewSkel("node") 

248 if not currentNodeSkel.fromDB(key): 

249 return [] # Either invalid key or listFilter prevented us from fetching anything 

250 if currentNodeSkel["parententry"] == currentNodeSkel["parentrepo"]: # We reached the top level 

251 break 

252 levelQry = self.viewSkel("node").all().filter("parententry =", currentNodeSkel["parententry"]) 

253 currentLevel = [{"skel": x, 

254 "active": x["key"] == currentNodeSkel["key"], 

255 "children": lastLevel if x["key"] == currentNodeSkel["key"] else []} 

256 for x in self.listFilter(levelQry).fetch(99)] 

257 assert currentLevel, "Got emtpy parent list?" 

258 lastLevel = currentLevel 

259 key = currentNodeSkel["parententry"] 

260 return lastLevel 

261 

262 ## External exposed functions 

263 

264 @exposed 

265 def listRootNodes(self, *args, **kwargs) -> t.Any: 

266 """ 

267 Renders a list of all available repositories for the current user using the 

268 modules default renderer. 

269 

270 :returns: The rendered representation of the available root-nodes. 

271 """ 

272 return self.render.listRootNodes(self.getAvailableRootNodes(*args, **kwargs)) 

273 

274 @exposed 

275 def list(self, skelType: SkelType, *args, **kwargs) -> t.Any: 

276 """ 

277 Prepares and renders a list of entries. 

278 

279 All supplied parameters are interpreted as filters for the elements displayed. 

280 

281 Unlike other module prototypes in ViUR, the access control in this function is performed 

282 by calling the function :func:`listFilter`, which updates the query-filter to match only 

283 elements which the user is allowed to see. 

284 

285 .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter` 

286 

287 :returns: The rendered list objects for the matching entries. 

288 

289 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

290 """ 

291 if not (skelType := self._checkSkelType(skelType)): 

292 raise errors.NotAcceptable("Invalid skelType provided.") 

293 

294 # The general access control is made via self.listFilter() 

295 query = self.listFilter(self.viewSkel(skelType).all().mergeExternalFilter(kwargs)) 

296 if query and query.queries and not isinstance(query.queries, list): 

297 # Apply default order when specified 

298 if self.default_order and not query.queries.orders and not current.request.get().kwargs.get("search"): 

299 # TODO: refactor: Duplicate code in prototypes.List 

300 if callable(default_order := self.default_order): 

301 default_order = default_order(query) 

302 

303 if isinstance(default_order, dict): 

304 logging.debug(f"Applying filter {default_order=}") 

305 query.mergeExternalFilter(default_order) 

306 

307 elif default_order: 

308 logging.debug(f"Applying {default_order=}") 

309 

310 # FIXME: This ugly test can be removed when there is type that abstracts SortOrders 

311 if ( 

312 isinstance(default_order, str) 

313 or ( 

314 isinstance(default_order, tuple) 

315 and len(default_order) == 2 

316 and isinstance(default_order[0], str) 

317 and isinstance(default_order[1], db.SortOrder) 

318 ) 

319 ): 

320 query.order(default_order) 

321 else: 

322 query.order(*default_order) 

323 

324 return self.render.list(query.fetch()) 

325 

326 raise errors.Unauthorized() 

327 

328 @exposed 

329 def structure(self, skelType: SkelType, *args, **kwargs) -> t.Any: 

330 """ 

331 :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set 

332 in each bone. 

333 

334 :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided. 

335 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

336 """ 

337 if not (skelType := self._checkSkelType(skelType)): 

338 raise errors.NotAcceptable(f"Invalid skelType provided.") 

339 skel = self.viewSkel(skelType) 

340 if not self.canAdd(skelType, None): # We can't use canView here as it would require passing a skeletonInstance. 

341 # As a fallback, we'll check if the user has the permissions to view at least one entry 

342 qry = self.listFilter(skel.all()) 

343 if not qry or not qry.getEntry(): 

344 raise errors.Unauthorized() 

345 return self.render.view(skel) 

346 

347 @exposed 

348 def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

349 """ 

350 Prepares and renders a single entry for viewing. 

351 

352 The entry is fetched by its *key* and its *skelType*. 

353 The function performs several access control checks on the requested entity before it is rendered. 

354 

355 .. seealso:: :func:`canView`, :func:`onView` 

356 

357 :returns: The rendered representation of the requested entity. 

358 

359 :param skelType: May either be "node" or "leaf". 

360 :param key: URL-safe key of the parent. 

361 

362 :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided. 

363 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

364 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

365 """ 

366 if not (skelType := self._checkSkelType(skelType)): 

367 raise errors.NotAcceptable(f"Invalid skelType provided.") 

368 

369 skel = self.viewSkel(skelType) 

370 if not skel.fromDB(key): 

371 raise errors.NotFound() 

372 

373 if not self.canView(skelType, skel): 

374 raise errors.Unauthorized() 

375 

376 self.onView(skelType, skel) 

377 return self.render.view(skel) 

378 

379 @exposed 

380 @force_ssl 

381 @skey(allow_empty=True) 

382 def add(self, skelType: SkelType, node: db.Key | int | str, *args, **kwargs) -> t.Any: 

383 """ 

384 Add a new entry with the given parent *node*, and render the entry, eventually with error notes 

385 on incorrect data. Data is taken by any other arguments in *kwargs*. 

386 

387 The function performs several access control checks on the requested entity before it is added. 

388 

389 .. seealso:: :func:`canAdd`, :func:`onAdd`, , :func:`onAdded` 

390 

391 :param skelType: Defines the type of the new entry and may either be "node" or "leaf". 

392 :param node: URL-safe key of the parent. 

393 

394 :returns: The rendered, added object of the entry, eventually with error hints. 

395 

396 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

397 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

398 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

399 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

400 """ 

401 if not (skelType := self._checkSkelType(skelType)): 

402 raise errors.NotAcceptable(f"Invalid skelType provided.") 

403 

404 skel = self.addSkel(skelType) 

405 parentNodeSkel = self.editSkel("node") 

406 if not parentNodeSkel.fromDB(node): 

407 raise errors.NotFound("The provided parent node could not be found.") 

408 if not self.canAdd(skelType, parentNodeSkel): 

409 raise errors.Unauthorized() 

410 

411 skel["parententry"] = parentNodeSkel["key"] 

412 # parentrepo may not exist in parentNodeSkel as it may be an rootNode 

413 skel["parentrepo"] = parentNodeSkel["parentrepo"] or parentNodeSkel["key"] 

414 

415 if ( 

416 not kwargs # no data supplied 

417 or not current.request.get().isPostRequest # failure if not using POST-method 

418 or not skel.fromClient(kwargs) # failure on reading into the bones 

419 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

420 ): 

421 return self.render.add(skel) 

422 

423 self.onAdd(skelType, skel) 

424 skel.toDB() 

425 self.onAdded(skelType, skel) 

426 

427 return self.render.addSuccess(skel) 

428 

429 @exposed 

430 @force_ssl 

431 @skey(allow_empty=True) 

432 def edit(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any: 

433 """ 

434 Modify an existing entry, and render the entry, eventually with error notes on incorrect data. 

435 Data is taken by any other arguments in *kwargs*. 

436 

437 The function performs several access control checks on the requested entity before it is added. 

438 

439 .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited` 

440 

441 :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf". 

442 :param key: URL-safe key of the item to be edited. 

443 

444 :returns: The rendered, modified object of the entry, eventually with error hints. 

445 

446 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

447 :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found. 

448 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

449 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

450 """ 

451 if not (skelType := self._checkSkelType(skelType)): 

452 raise errors.NotAcceptable(f"Invalid skelType provided.") 

453 

454 skel = self.editSkel(skelType) 

455 if not skel.fromDB(key): 

456 raise errors.NotFound() 

457 

458 if not self.canEdit(skelType, skel): 

459 raise errors.Unauthorized() 

460 

461 if ( 

462 not kwargs # no data supplied 

463 or not current.request.get().isPostRequest # failure if not using POST-method 

464 or not skel.fromClient(kwargs, amend=True) # failure on reading into the bones 

465 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

466 ): 

467 return self.render.edit(skel) 

468 

469 self.onEdit(skelType, skel) 

470 skel.toDB() 

471 self.onEdited(skelType, skel) 

472 

473 return self.render.editSuccess(skel) 

474 

475 @exposed 

476 @force_ssl 

477 @force_post 

478 @skey 

479 def delete(self, skelType: SkelType, key: str, *args, **kwargs) -> t.Any: 

480 """ 

481 Deletes an entry or an directory (including its contents). 

482 

483 The function runs several access control checks on the data before it is deleted. 

484 

485 .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted` 

486 

487 :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf". 

488 :param key: URL-safe key of the item to be deleted. 

489 

490 :returns: The rendered, deleted object of the entry. 

491 

492 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

493 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

494 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

495 """ 

496 if not (skelType := self._checkSkelType(skelType)): 

497 raise errors.NotAcceptable(f"Invalid skelType provided.") 

498 

499 skel = self.editSkel(skelType) 

500 if not skel.fromDB(key): 

501 raise errors.NotFound() 

502 

503 if not self.canDelete(skelType, skel): 

504 raise errors.Unauthorized() 

505 

506 if skelType == "node": 

507 self.deleteRecursive(skel["key"]) 

508 

509 self.onDelete(skelType, skel) 

510 skel.delete() 

511 self.onDeleted(skelType, skel) 

512 

513 return self.render.deleteSuccess(skel, skelType=skelType) 

514 

515 @CallDeferred 

516 def deleteRecursive(self, parentKey: str): 

517 """ 

518 Recursively processes a delete request. 

519 

520 This will delete all entries which are children of *nodeKey*, except *key* nodeKey. 

521 

522 :param parentKey: URL-safe key of the node which children should be deleted. 

523 """ 

524 nodeKey = db.keyHelper(parentKey, self.viewSkel("node").kindName) 

525 if self.leafSkelCls: 

526 for leaf in db.Query(self.viewSkel("leaf").kindName).filter("parententry =", nodeKey).iter(): 

527 leafSkel = self.viewSkel("leaf") 

528 if not leafSkel.fromDB(leaf.key): 

529 continue 

530 leafSkel.delete() 

531 for node in db.Query(self.viewSkel("node").kindName).filter("parententry =", nodeKey).iter(): 

532 self.deleteRecursive(node.key) 

533 nodeSkel = self.viewSkel("node") 

534 if not nodeSkel.fromDB(node.key): 

535 continue 

536 nodeSkel.delete() 

537 

538 @exposed 

539 @force_ssl 

540 @force_post 

541 @skey 

542 def move(self, skelType: SkelType, key: db.Key | int | str, parentNode: str, *args, **kwargs) -> str: 

543 """ 

544 Move a node (including its contents) or a leaf to another node. 

545 

546 .. seealso:: :func:`canMove` 

547 

548 :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf". 

549 :param key: URL-safe key of the item to be moved. 

550 :param parentNode: URL-safe key of the destination node, which must be a node. 

551 :param skey: The CSRF security key. 

552 

553 :returns: The rendered, edited object of the entry. 

554 

555 :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found. 

556 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

557 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

558 """ 

559 if not (skelType := self._checkSkelType(skelType)): 

560 raise errors.NotAcceptable(f"Invalid skelType provided.") 

561 

562 skel = self.editSkel(skelType) # srcSkel - the skeleton to be moved 

563 parentNodeSkel = self.baseSkel("node") # destSkel - the node it should be moved into 

564 

565 if not skel.fromDB(key): 

566 raise errors.NotFound("Cannot find entity to move") 

567 

568 if not parentNodeSkel.fromDB(parentNode): 

569 parentNode = utils.normalizeKey(db.Key.from_legacy_urlsafe(parentNode)) 

570 

571 if parentNode.kind != parentNodeSkel.kindName: 

572 raise errors.NotFound( 

573 f"You provided a key of kind {parentNode.kind}, but require a {parentNodeSkel.kindName}." 

574 ) 

575 

576 raise errors.NotFound("Cannot find parentNode entity") 

577 

578 if not self.canMove(skelType, skel, parentNodeSkel): 

579 raise errors.Unauthorized() 

580 

581 if skel["key"] == parentNodeSkel["key"]: 

582 raise errors.NotAcceptable("Cannot move a node into itself") 

583 

584 ## Test for recursion 

585 currLevel = db.Get(parentNodeSkel["key"]) 

586 for _ in range(0, 99): 

587 if currLevel.key == skel["key"]: 

588 break 

589 if currLevel.get("rootNode") or currLevel.get("is_root_node"): 

590 # We reached a rootNode, so this is okay 

591 break 

592 currLevel = db.Get(currLevel["parententry"]) 

593 else: # We did not "break" - recursion-level exceeded or loop detected 

594 raise errors.NotAcceptable("Unable to find a root node in recursion?") 

595 

596 # Test if we try to move a rootNode 

597 # TODO: Remove "rootNode"-fallback with VIUR4 

598 if skel.dbEntity.get("is_root_node") or skel.dbEntity.get("rootNode"): 

599 raise errors.NotAcceptable("Can't move a rootNode to somewhere else") 

600 

601 currentParentRepo = skel["parentrepo"] 

602 skel["parententry"] = parentNodeSkel["key"] 

603 skel["parentrepo"] = parentNodeSkel["parentrepo"] # Fixme: Need to recursive fixing to parentrepo? 

604 if "sortindex" in kwargs: 

605 try: 

606 skel["sortindex"] = float(kwargs["sortindex"]) 

607 except: 

608 raise errors.PreconditionFailed() 

609 

610 self.onEdit(skelType, skel) 

611 skel.toDB() 

612 self.onEdited(skelType, skel) 

613 

614 # Ensure a changed parentRepo get's proagated 

615 if currentParentRepo != parentNodeSkel["parentrepo"]: 

616 self.updateParentRepo(key, parentNodeSkel["parentrepo"]) 

617 

618 return self.render.editSuccess(skel) 

619 

620 @exposed 

621 @force_ssl 

622 @skey(allow_empty=True) 

623 def clone(self, skelType: SkelType, key: db.Key | str | int, **kwargs): 

624 """ 

625 Clone an existing entry, and render the entry, eventually with error notes on incorrect data. 

626 Data is taken by any other arguments in *kwargs*. 

627 

628 The function performs several access control checks on the requested entity before it is added. 

629 

630 .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned` 

631 

632 :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf". 

633 :param key: URL-safe key of the item to be edited. 

634 

635 :returns: The cloned object of the entry, eventually with error hints. 

636 

637 :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided. 

638 :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found. 

639 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

640 """ 

641 

642 if not (skelType := self._checkSkelType(skelType)): 

643 raise errors.NotAcceptable(f"Invalid skelType provided.") 

644 

645 skel = self.cloneSkel(skelType) 

646 if not skel.fromDB(key): 

647 raise errors.NotFound() 

648 

649 # a clone-operation is some kind of edit and add... 

650 if not (self.canEdit(skelType, skel) and self.canAdd(skelType, kwargs.get("parententry"))): 

651 raise errors.Unauthorized() 

652 

653 # Remember source skel and unset the key for clone operation! 

654 src_skel = skel 

655 skel = skel.clone() 

656 skel["key"] = None 

657 

658 # make parententry required and writeable when provided 

659 if "parententry" in kwargs: 

660 skel.parententry.readOnly = False 

661 skel.parententry.required = True 

662 else: 

663 _ = skel["parententry"] # TODO: because of accessedValues... 

664 

665 # make parentrepo required and writeable when provided 

666 if "parentrepo" in kwargs: 

667 skel.parentrepo.readOnly = False 

668 skel.parentrepo.required = True 

669 else: 

670 _ = skel["parentrepo"] # TODO: because of accessedValues... 

671 

672 # Check all required preconditions for clone 

673 if ( 

674 not kwargs # no data supplied 

675 or not current.request.get().isPostRequest # failure if not using POST-method 

676 or not skel.fromClient(kwargs) # failure on reading into the bones 

677 or utils.parse.bool(kwargs.get("bounce")) # review before changing 

678 ): 

679 return self.render.edit(skel, action="clone") 

680 

681 self.onClone(skelType, skel, src_skel=src_skel) 

682 assert skel.toDB() 

683 self.onCloned(skelType, skel, src_skel=src_skel) 

684 

685 return self.render.editSuccess(skel, action="cloneSuccess") 

686 

687 ## Default access control functions 

688 

689 def listFilter(self, query: db.Query) -> t.Optional[db.Query]: 

690 """ 

691 Access control function on item listing. 

692 

693 This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function, 

694 and is used to modify the provided filter parameter to match only items that the current user 

695 is allowed to see. 

696 

697 :param query: Query which should be altered. 

698 

699 :returns: The altered filter, or None if access is not granted. 

700 """ 

701 

702 if (user := current.user.get()) and (f"{self.moduleName}-view" in user["access"] or "root" in user["access"]): 

703 return query 

704 

705 return None 

706 

707 def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

708 """ 

709 Checks if the current user can view the given entry. 

710 Should be identical to what's allowed by listFilter. 

711 By default, `meth:listFilter` is used to determine what's allowed and whats not; but this 

712 method can be overridden for performance improvements (to eliminate that additional database access). 

713 :param skel: The entry we check for 

714 :return: True if the current session is authorized to view that entry, False otherwise 

715 """ 

716 queryObj = self.viewSkel(skelType).all().mergeExternalFilter({"key": skel["key"]}) 

717 queryObj = self.listFilter(queryObj) # Access control 

718 if queryObj is None: 

719 return False 

720 if not queryObj.getEntry(): 

721 return False 

722 return True 

723 

724 def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance]) -> bool: 

725 """ 

726 Access control function for adding permission. 

727 

728 Checks if the current user has the permission to add a new entry. 

729 

730 The default behavior is: 

731 - If no user is logged in, adding is generally refused. 

732 - If the user has "root" access, adding is generally allowed. 

733 - If the user has the modules "add" permission (module-add) enabled, adding is allowed. 

734 

735 It should be overridden for a module-specific behavior. 

736 

737 .. seealso:: :func:`add` 

738 

739 :param skelType: Defines the type of the node that should be added. 

740 :param parentNodeSkel: The parent node where a new entry should be added. 

741 

742 :returns: True, if adding entries is allowed, False otherwise. 

743 """ 

744 

745 if not (user := current.user.get()): 

746 return False 

747 # root user is always allowed. 

748 if user["access"] and "root" in user["access"]: 

749 return True 

750 # user with add-permission is allowed. 

751 if user and user["access"] and f"{self.moduleName}-add" in user["access"]: 

752 return True 

753 return False 

754 

755 def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

756 """ 

757 Access control function for modification permission. 

758 

759 Checks if the current user has the permission to edit an entry. 

760 

761 The default behavior is: 

762 - If no user is logged in, editing is generally refused. 

763 - If the user has "root" access, editing is generally allowed. 

764 - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed. 

765 

766 It should be overridden for a module-specific behavior. 

767 

768 .. seealso:: :func:`edit` 

769 

770 :param skelType: Defines the type of the node that should be edited. 

771 :param skel: The Skeleton that should be edited. 

772 

773 :returns: True, if editing entries is allowed, False otherwise. 

774 """ 

775 if not (user := current.user.get()): 

776 return False 

777 if user["access"] and "root" in user["access"]: 

778 return True 

779 if user and user["access"] and f"{self.moduleName}-edit" in user["access"]: 

780 return True 

781 return False 

782 

783 def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool: 

784 """ 

785 Access control function for delete permission. 

786 

787 Checks if the current user has the permission to delete an entry. 

788 

789 The default behavior is: 

790 - If no user is logged in, deleting is generally refused. 

791 - If the user has "root" access, deleting is generally allowed. 

792 - If the user has the modules "deleting" permission (module-delete) enabled, \ 

793 deleting is allowed. 

794 

795 It should be overridden for a module-specific behavior. 

796 

797 :param skelType: Defines the type of the node that should be deleted. 

798 :param skel: The Skeleton that should be deleted. 

799 

800 .. seealso:: :func:`delete` 

801 

802 :returns: True, if deleting entries is allowed, False otherwise. 

803 """ 

804 if not (user := current.user.get()): 

805 return False 

806 if user["access"] and "root" in user["access"]: 

807 return True 

808 if user and user["access"] and f"{self.moduleName}-delete" in user["access"]: 

809 return True 

810 return False 

811 

812 def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool: 

813 """ 

814 Access control function for moving permission. 

815 

816 Checks if the current user has the permission to move an entry. 

817 

818 The default behavior is: 

819 - If no user is logged in, deleting is generally refused. 

820 - If the user has "root" access, deleting is generally allowed. 

821 - If the user has the modules "edit" permission (module-edit) enabled, \ 

822 moving is allowed. 

823 

824 It should be overridden for a module-specific behavior. 

825 

826 :param skelType: Defines the type of the node that shall be deleted. 

827 :param node: URL-safe key of the node to be moved. 

828 :param destNode: URL-safe key of the node where *node* should be moved to. 

829 

830 .. seealso:: :func:`move` 

831 

832 :returns: True, if deleting entries is allowed, False otherwise. 

833 """ 

834 if not (user := current.user.get()): 

835 return False 

836 if user["access"] and "root" in user["access"]: 

837 return True 

838 if user and user["access"] and f"{self.moduleName}-edit" in user["access"]: 

839 return True 

840 return False 

841 

842 ## Overridable eventhooks 

843 

844 def onAdd(self, skelType: SkelType, skel: SkeletonInstance): 

845 """ 

846 Hook function that is called before adding an entry. 

847 

848 It can be overridden for a module-specific behavior. 

849 

850 :param skelType: Defines the type of the node that shall be added. 

851 :param skel: The Skeleton that is going to be added. 

852 

853 .. seealso:: :func:`add`, :func:`onAdded` 

854 """ 

855 pass 

856 

857 def onAdded(self, skelType: SkelType, skel: SkeletonInstance): 

858 """ 

859 Hook function that is called after adding an entry. 

860 

861 It should be overridden for a module-specific behavior. 

862 The default is writing a log entry. 

863 

864 :param skelType: Defines the type of the node that has been added. 

865 :param skel: The Skeleton that has been added. 

866 

867 .. seealso:: :func:`add`, :func:`onAdd` 

868 """ 

869 logging.info(f"""Entry of kind {skelType!r} added: {skel["key"]!r}""") 

870 flushCache(kind=skel.kindName) 

871 if user := current.user.get(): 

872 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

873 

874 def onEdit(self, skelType: SkelType, skel: SkeletonInstance): 

875 """ 

876 Hook function that is called before editing an entry. 

877 

878 It can be overridden for a module-specific behavior. 

879 

880 :param skelType: Defines the type of the node that shall be edited. 

881 :param skel: The Skeleton that is going to be edited. 

882 

883 .. seealso:: :func:`edit`, :func:`onEdited` 

884 """ 

885 pass 

886 

887 def onEdited(self, skelType: SkelType, skel: SkeletonInstance): 

888 """ 

889 Hook function that is called after modifying an entry. 

890 

891 It should be overridden for a module-specific behavior. 

892 The default is writing a log entry. 

893 

894 :param skelType: Defines the type of the node that has been edited. 

895 :param skel: The Skeleton that has been modified. 

896 

897 .. seealso:: :func:`edit`, :func:`onEdit` 

898 """ 

899 logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""") 

900 flushCache(key=skel["key"]) 

901 if user := current.user.get(): 

902 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

903 

904 def onView(self, skelType: SkelType, skel: SkeletonInstance): 

905 """ 

906 Hook function that is called when viewing an entry. 

907 

908 It should be overridden for a module-specific behavior. 

909 The default is doing nothing. 

910 

911 :param skelType: Defines the type of the node that is viewed. 

912 :param skel: The Skeleton that is viewed. 

913 

914 .. seealso:: :func:`view` 

915 """ 

916 pass 

917 

918 def onDelete(self, skelType: SkelType, skel: SkeletonInstance): 

919 """ 

920 Hook function that is called before deleting an entry. 

921 

922 It can be overridden for a module-specific behavior. 

923 

924 :param skelType: Defines the type of the node that shall be deleted. 

925 :param skel: The Skeleton that is going to be deleted. 

926 

927 .. seealso:: :func:`delete`, :func:`onDeleted` 

928 """ 

929 pass 

930 

931 def onDeleted(self, skelType: SkelType, skel: SkeletonInstance): 

932 """ 

933 Hook function that is called after deleting an entry. 

934 

935 It should be overridden for a module-specific behavior. 

936 The default is writing a log entry. 

937 

938 ..warning: Saving the skeleton again will undo the deletion 

939 (if the skeleton was a leaf or a node with no children). 

940 

941 :param skelType: Defines the type of the node that is deleted. 

942 :param skel: The Skeleton that has been deleted. 

943 

944 .. seealso:: :func:`delete`, :func:`onDelete` 

945 """ 

946 logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""") 

947 flushCache(key=skel["key"]) 

948 if user := current.user.get(): 

949 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

950 

951 def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance): 

952 """ 

953 Hook function that is called before cloning an entry. 

954 

955 It can be overwritten to a module-specific behavior. 

956 

957 :param skelType: Defines the type of the node that is cloned. 

958 :param skel: The new SkeletonInstance that is being created. 

959 :param src_skel: The source SkeletonInstance `skel` is cloned from. 

960 

961 .. seealso:: :func:`clone`, :func:`onCloned` 

962 """ 

963 pass 

964 

965 @CallDeferred 

966 def _clone_recursive( 

967 self, 

968 skel_type: SkelType, 

969 src_key: db.Key, 

970 target_key: db.Key, 

971 target_repo: db.Key, 

972 cursor=None 

973 ): 

974 """ 

975 Helper function which is used by default onCloned() to clone a recursive structure. 

976 """ 

977 assert (skel_type := self._checkSkelType(skel_type)) 

978 

979 logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}") 

980 

981 q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex") 

982 q.setCursor(cursor) 

983 

984 count = 0 

985 for skel in q.fetch(): 

986 src_skel = skel 

987 

988 skel = skel.clone() 

989 skel["key"] = None 

990 skel["parententry"] = target_key 

991 skel["parentrepo"] = target_repo 

992 

993 self.onClone(skel_type, skel, src_skel=src_skel) 

994 logging.debug(f"copying {skel=}") # this logging _is_ needed, otherwise not all values are being written.. 

995 assert skel.toDB() 

996 self.onCloned(skel_type, skel, src_skel=src_skel) 

997 count += 1 

998 

999 logging.debug(f"_clone_recursive {count=}") 

1000 

1001 if cursor := q.getCursor(): 

1002 self._clone_recursive(skel_type, src_key, target_key, target_repo, skel_type, cursor) 

1003 

1004 def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance): 

1005 """ 

1006 Hook function that is called after cloning an entry. 

1007 

1008 It can be overwritten to a module-specific behavior. 

1009 

1010 By default, when cloning a "node", this function calls :func:`_clone_recursive` 

1011 which recursively clones the entire structure below this node in the background. 

1012 If this is not wanted, or wanted by a specific setting, overwrite this function 

1013 without a super-call. 

1014 

1015 :param skelType: Defines the type of the node that is cloned. 

1016 :param skel: The new SkeletonInstance that was created. 

1017 :param src_skel: The source SkeletonInstance `skel` was cloned from. 

1018 

1019 .. seealso:: :func:`clone`, :func:`onClone` 

1020 """ 

1021 logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""") 

1022 flushCache(kind=skel.kindName) 

1023 

1024 if user := current.user.get(): 

1025 logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""") 

1026 

1027 # Clone entire structure below, in case this is a node. 

1028 if skelType == "node": 

1029 self._clone_recursive("node", src_skel["key"], skel["key"], skel["parentrepo"]) 

1030 

1031 if self.leafSkelCls: 

1032 self._clone_recursive("leaf", src_skel["key"], skel["key"], skel["parentrepo"]) 

1033 

1034 

1035Tree.vi = True 

1036Tree.admin = True