Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/prototypes/tree.py: 0%
385 statements
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
1import logging
2import typing as t
3from viur.core import utils, errors, db, current
4from viur.core.decorators import *
5from viur.core.bones import KeyBone, SortIndexBone
6from viur.core.cache import flushCache
7from viur.core.skeleton import Skeleton, SkeletonInstance
8from viur.core.tasks import CallDeferred
9from .skelmodule import SkelModule, DEFAULT_ORDER_TYPE
# Closed set of skeleton kinds used to annotate the ``skelType`` parameters throughout this module.
SkelType = t.Literal["node", "leaf"]
class TreeSkel(Skeleton):
    """
    Base skeleton for entries of a tree module.

    Provides the bones that link an entry to its parent node and to its repository,
    plus a sort index.
    """

    # Key of the node this entry is attached to.
    parententry = KeyBone(
        descr="Parent",
        visible=False,
        readOnly=True,
    )

    # Key of the repository (root node) this entry belongs to.
    parentrepo = KeyBone(
        descr="BaseRepo",
        visible=False,
        readOnly=True,
    )

    # Position of the entry among its siblings.
    sortindex = SortIndexBone(
        visible=False,
        readOnly=True,
    )

    @classmethod
    def refresh(cls, skelValues):  # ViUR2 Compatibility
        """
        Refresh the skeleton values and migrate a legacy ``parentdir`` value.

        When ``parententry`` is empty but the underlying entity still carries a ``parentdir``
        (ViUR2 compatibility), that legacy key is converted and stored as ``parententry``.

        :param skelValues: The skeleton instance to refresh.
        """
        super().refresh(skelValues)

        if skelValues["parententry"]:
            return  # nothing to migrate

        legacy_parent = skelValues.dbEntity.get("parentdir")  # parentdir for viur2 compatibility
        if legacy_parent:
            skelValues["parententry"] = utils.normalizeKey(db.Key.from_legacy_urlsafe(legacy_parent))
39class Tree(SkelModule):
40 """
41 Tree module prototype.
43 It is used for hierarchical structures, either as a tree with nodes and leafs, or as a hierarchy with nodes only.
44 """
45 accessRights = ("add", "edit", "view", "delete", "manage")
47 nodeSkelCls = None
48 leafSkelCls = None
50 default_order: DEFAULT_ORDER_TYPE = "sortindex"
51 """
52 Allows to specify a default order for this module, which is applied when no other order is specified.
54 Setting a default_order might result in the requirement of additional indexes, which are being raised
55 and must be specified.
56 """
def __init__(self, moduleName, modulePath, *args, **kwargs):
    """
    Initialise the tree module.

    All arguments are forwarded unchanged to the parent initialiser.

    :param moduleName: Name of the module.
    :param modulePath: Path of the module.

    :raises: :exc:`AssertionError`, when no ``nodeSkelCls`` is configured on the class.
    """
    owner = self.__class__.__name__
    assert self.nodeSkelCls, f"Need to specify at least nodeSkelCls for {owner!r}"
    super().__init__(moduleName, modulePath, *args, **kwargs)
@property
def handler(self):
    """
    Name of the handler for this module.

    :returns: ``"tree"`` when a ``leafSkelCls`` is configured, otherwise ``"tree.node"``
        (a tree with nodes only, the former hierarchy).
    """
    if self.leafSkelCls:
        return "tree"

    return "tree.node"
def _checkSkelType(self, skelType: t.Any) -> t.Optional[SkelType]:
    """
    Checks for correct skelType.

    The value is compared case-insensitively. ``"leaf"`` is only accepted when this module
    defines a ``leafSkelCls``.

    :param skelType: The value to check. It may be of any type.

    :returns: The lower-cased type provided, or None in case it is invalid.
    """
    # A non-string can never name a valid type; report it as invalid instead of failing on .lower().
    if not isinstance(skelType, str):
        return None

    skelType = skelType.lower()
    if skelType == "node" or (skelType == "leaf" and self.leafSkelCls):
        return skelType

    return None
def _resolveSkelCls(self, skelType: SkelType, *args, **kwargs) -> t.Type[Skeleton]:
    """
    Map a skelType to the skeleton class configured for it.

    :param skelType: Either "node" or "leaf".

    :returns: ``leafSkelCls`` for "leaf", ``nodeSkelCls`` for "node".

    :raises: :exc:`ValueError`, when the skelType is not supported by this module.
    """
    checked = self._checkSkelType(skelType)
    if not checked:
        raise ValueError("Unsupported skelType")

    return self.leafSkelCls if checked == "leaf" else self.nodeSkelCls
def baseSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Create an unmodified base skeleton instance for the given skelType.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`viewSkel`

    :param skelType: Either "node" or "leaf".

    :returns: A new instance of the skeleton class resolved for *skelType*.
    """
    skel_cls = self._resolveSkelCls(skelType, *args, **kwargs)
    return skel_cls()
def viewSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used by the application for viewing an existing entry
    of the tree.

    By default this is whatever :func:`~baseSkel` returns.

    .. seealso:: :func:`addSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :return: Returns a Skeleton instance for viewing an entry.
    """
    view_skel = self.baseSkel(skelType, *args, **kwargs)
    return view_skel
def addSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used by the application for adding an entry to the tree.

    By default this is whatever :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :return: Returns a Skeleton instance for adding an entry.
    """
    add_skel = self.baseSkel(skelType, *args, **kwargs)
    return add_skel
def editSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the skeleton instance used by the application for editing an existing entry
    of the tree.

    By default this is whatever :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`addSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :return: Returns a Skeleton instance for editing an entry.
    """
    edit_skel = self.baseSkel(skelType, *args, **kwargs)
    return edit_skel
def cloneSkel(self, skelType: SkelType, *args, **kwargs) -> SkeletonInstance:
    """
    Provide the :class:`viur.core.skeleton.SkeletonInstance` used by the application for
    cloning an existing entry of the tree.

    By default this is whatever :func:`~baseSkel` returns.

    .. seealso:: :func:`viewSkel`, :func:`editSkel`, :func:`~baseSkel`

    :param skelType: Either "node" or "leaf".

    :return: Returns a SkeletonInstance for cloning an entry.
    """
    clone_skel = self.baseSkel(skelType, *args, **kwargs)
    return clone_skel
def ensureOwnModuleRootNode(self) -> db.Entity:
    """
    Make sure the general root-node of this module exists.

    The root-node lives in the kind of the node skeleton under the fixed name
    ``rep_module_repo``. It is fetched, or inserted when it does not exist yet.

    :returns: The entity of the root-node.
    """
    root_key = db.Key(self.viewSkel("node").kindName, "rep_module_repo")
    return db.GetOrInsert(root_key, creationdate=utils.utcNow(), rootNode=1)
def getAvailableRootNodes(self, *args, **kwargs) -> list[dict[t.Literal["name", "key"], str]]:
    """
    Default provider for the list of root node items.

    Several module-internal functions request this list, so it *must* be overridden
    with custom functionality. This default stub returns an empty list.

    An example implementation could be the following:

    .. code-block:: python

        # Example
        def getAvailableRootNodes(self, *args, **kwargs):
            q = db.Query(self.rootKindName)
            ret = [{"key": str(e.key()),
                "name": e.get("name", str(e.key().id_or_name()))} #FIXME
                for e in q.run(limit=25)]
            return ret

    :param args: Can be used in custom implementations.
    :param kwargs: Can be used in custom implementations.

    :return: Returns a list of dicts which must provide a "key" and a "name" entry
        with respective information.
    """
    no_root_nodes = []
    return no_root_nodes
def getRootNode(self, key: db.Key | str) -> SkeletonInstance | None:
    """
    Find the root-node above a given child by following ``parententry`` upwards.

    :param key: Key of the child node entry.

    :returns: The skeleton of the root-node, or None when an entry on the way up
        could not be read from the database.
    """
    skel = self.nodeSkelCls()
    current_key = key

    # Climb until an entry without a parent is loaded; that entry is the root.
    while current_key:
        if not skel.fromDB(current_key):
            return None

        current_key = skel["parententry"]

    return skel
@CallDeferred
def updateParentRepo(self, parentNode: str, newRepoKey: str, depth: int = 0):
    """
    Recursively fixes the parentrepo key after a move operation.

    Every node and (if the module has leafs) every leaf directly below *parentNode* gets
    its ``parentrepo`` set to *newRepoKey*; child nodes are processed recursively.

    :param parentNode: Key of the node which children should be fixed.
    :param newRepoKey: Key of the new repository.
    :param depth: Safety level depth preventing infinite loops.
    """
    if depth > 99:
        logging.critical(f"Maximum recursion depth reached in {self.updateParentRepo.__module__}/updateParentRepo")
        logging.critical("Your data is corrupt!")
        logging.debug(f"{parentNode=}, {newRepoKey=}")
        return

    def _assign_repo(entity_key, repo_key):
        # Executed inside a transaction: load, patch and store a single entity.
        entity = db.Get(entity_key)
        entity["parentrepo"] = repo_key
        db.Put(entity)

    # Fix all nodes
    node_kind = self.viewSkel("node").kindName
    for child in db.Query(node_kind).filter("parententry =", parentNode).iter():
        self.updateParentRepo(child.key, newRepoKey, depth=depth + 1)
        db.RunInTransaction(_assign_repo, child.key, newRepoKey)

    # Fix the leafs on this level
    if not self.leafSkelCls:
        return

    leaf_kind = self.viewSkel("leaf").kindName
    for leaf in db.Query(leaf_kind).filter("parententry =", parentNode).iter():
        db.RunInTransaction(_assign_repo, leaf.key, newRepoKey)
235 ## Internal exposed functions
@internal_exposed
def pathToKey(self, key: db.Key):
    """
    Returns the recursively expanded path through the Tree from the root-node to a
    requested node.

    Starting at *key*, the function walks upwards (at most 99 levels). For every level it
    lists the sibling nodes the current user may see, marks the one lying on the path as
    ``active`` and hangs the previously collected level below it as ``children``.

    :param key: Key of the destination *node*.

    :returns: A nested list of dicts (``skel``, ``active``, ``children``) describing all nodes
        in the path from root to the requested node. An empty list is returned when a node on
        the path cannot be read or the user is not allowed to list nodes.
    """
    lastLevel = []
    for _ in range(0, 99):
        currentNodeSkel = self.viewSkel("node")
        if not currentNodeSkel.fromDB(key):
            return []  # Either invalid key or listFilter prevented us from fetching anything

        if currentNodeSkel["parententry"] == currentNodeSkel["parentrepo"]:  # We reached the top level
            break

        levelQry = self.viewSkel("node").all().filter("parententry =", currentNodeSkel["parententry"])

        # listFilter() returns None when access is not granted; treat that as "no visible path"
        # instead of failing on None.fetch().
        levelQry = self.listFilter(levelQry)
        if levelQry is None:
            return []

        currentLevel = [
            {
                "skel": entry,
                "active": entry["key"] == currentNodeSkel["key"],
                "children": lastLevel if entry["key"] == currentNodeSkel["key"] else [],
            }
            for entry in levelQry.fetch(99)
        ]
        assert currentLevel, "Got emtpy parent list?"

        lastLevel = currentLevel
        key = currentNodeSkel["parententry"]

    return lastLevel
262 ## External exposed functions
@exposed
def listRootNodes(self, *args, **kwargs) -> t.Any:
    """
    Render the list of repositories returned by :func:`getAvailableRootNodes` using the
    module's default renderer.

    :returns: The rendered representation of the available root-nodes.
    """
    root_nodes = self.getAvailableRootNodes(*args, **kwargs)
    return self.render.listRootNodes(root_nodes)
@exposed
def list(self, skelType: SkelType, *args, **kwargs) -> t.Any:
    """
    Prepares and renders a list of entries.

    All supplied parameters are interpreted as filters for the elements displayed.

    Unlike other module prototypes in ViUR, the access control in this function is performed
    by calling the function :func:`listFilter`, which updates the query-filter to match only
    elements which the user is allowed to see.

    .. seealso:: :func:`listFilter`, :func:`viur.core.db.mergeExternalFilter`

    :param skelType: May either be "node" or "leaf".

    :returns: The rendered list objects for the matching entries.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    # The general access control is made via self.listFilter()
    query = self.listFilter(self.viewSkel(checked_type).all().mergeExternalFilter(kwargs))
    if not query or not query.queries or isinstance(query.queries, list):
        raise errors.Unauthorized()

    # Apply default order when specified, no explicit order is set and no search is requested
    wants_default_order = (
        self.default_order
        and not query.queries.orders
        and not current.request.get().kwargs.get("search")
    )
    if wants_default_order:
        # TODO: refactor: Duplicate code in prototypes.List
        default_order = self.default_order
        if callable(default_order):
            default_order = default_order(query)

        if isinstance(default_order, dict):
            logging.debug(f"Applying filter {default_order=}")
            query.mergeExternalFilter(default_order)

        elif default_order:
            logging.debug(f"Applying {default_order=}")

            # FIXME: This ugly test can be removed when there is type that abstracts SortOrders
            is_single_order = isinstance(default_order, str) or (
                isinstance(default_order, tuple)
                and len(default_order) == 2
                and isinstance(default_order[0], str)
                and isinstance(default_order[1], db.SortOrder)
            )
            if is_single_order:
                query.order(default_order)
            else:
                query.order(*default_order)

    return self.render.list(query.fetch())
@exposed
def structure(self, skelType: SkelType, *args, **kwargs) -> t.Any:
    """
    Render the structure of the skeleton as used in list/view.

    :param skelType: May either be "node" or "leaf".

    :returns: Returns the structure of our skeleton as used in list/view. Values are the defaultValues set
        in each bone.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.viewSkel(checked_type)

    # We can't use canView here as it would require passing a skeletonInstance.
    if not self.canAdd(checked_type, None):
        # As a fallback, we'll check if the user has the permissions to view at least one entry
        visible = self.listFilter(skel.all())
        if not (visible and visible.getEntry()):
            raise errors.Unauthorized()

    return self.render.view(skel)
@exposed
def view(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
    """
    Prepares and renders a single entry for viewing.

    The entry is looked up by its *skelType* and *key*, checked against :func:`canView`,
    passed to the :func:`onView` hook and finally rendered.

    .. seealso:: :func:`canView`, :func:`onView`

    :param skelType: May either be "node" or "leaf".
    :param key: Key of the entry to view.

    :returns: The rendered representation of the requested entity.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when an incorrect *skelType* is provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.viewSkel(checked_type)

    found = skel.fromDB(key)
    if not found:
        raise errors.NotFound()

    allowed = self.canView(checked_type, skel)
    if not allowed:
        raise errors.Unauthorized()

    self.onView(checked_type, skel)
    return self.render.view(skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def add(self, skelType: SkelType, node: db.Key | int | str, *args, **kwargs) -> t.Any:
    """
    Add a new entry below the parent *node* and render it, eventually with error notes
    on incorrect data. The entry's data is read from *kwargs*.

    Access is checked via :func:`canAdd` before anything is written.

    .. seealso:: :func:`canAdd`, :func:`onAdd`, :func:`onAdded`

    :param skelType: Defines the type of the new entry and may either be "node" or "leaf".
    :param node: Key of the parent.

    :returns: The rendered, added object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no valid *node* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.addSkel(checked_type)
    parentNodeSkel = self.editSkel("node")

    if not parentNodeSkel.fromDB(node):
        raise errors.NotFound("The provided parent node could not be found.")

    if not self.canAdd(checked_type, parentNodeSkel):
        raise errors.Unauthorized()

    parent_key = parentNodeSkel["key"]
    skel["parententry"] = parent_key
    # parentrepo may not exist in parentNodeSkel as it may be an rootNode
    skel["parentrepo"] = parentNodeSkel["parentrepo"] or parentNodeSkel["key"]

    # Any of these conditions means "show the form (again)" instead of writing the entry.
    show_form = (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs)  # failure on reading into the bones
        or utils.parse.bool(kwargs.get("bounce"))  # review before adding
    )
    if show_form:
        return self.render.add(skel)

    self.onAdd(checked_type, skel)
    skel.toDB()
    self.onAdded(checked_type, skel)

    return self.render.addSuccess(skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def edit(self, skelType: SkelType, key: db.Key | int | str, *args, **kwargs) -> t.Any:
    """
    Modify an existing entry and render it, eventually with error notes on incorrect data.
    The new data is read from *kwargs*.

    Access is checked via :func:`canEdit` before anything is written.

    .. seealso:: :func:`canEdit`, :func:`onEdit`, :func:`onEdited`

    :param skelType: Defines the type of the entry that should be modified and may either be "node" or "leaf".
    :param key: Key of the item to be edited.

    :returns: The rendered, modified object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(checked_type)

    found = skel.fromDB(key)
    if not found:
        raise errors.NotFound()

    allowed = self.canEdit(checked_type, skel)
    if not allowed:
        raise errors.Unauthorized()

    # Any of these conditions means "show the form (again)" instead of writing the entry.
    show_form = (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs, amend=True)  # failure on reading into the bones
        or utils.parse.bool(kwargs.get("bounce"))  # review before adding
    )
    if show_form:
        return self.render.edit(skel)

    self.onEdit(checked_type, skel)
    skel.toDB()
    self.onEdited(checked_type, skel)

    return self.render.editSuccess(skel)
@exposed
@force_ssl
@force_post
@skey
def delete(self, skelType: SkelType, key: str, *args, **kwargs) -> t.Any:
    """
    Deletes an entry or a directory (including its contents).

    Access is checked via :func:`canDelete`. For nodes, the removal of the contents is
    handed to :func:`deleteRecursive` before the node itself is deleted.

    .. seealso:: :func:`canDelete`, :func:`onDelete`, :func:`onDeleted`

    :param skelType: Defines the type of the entry that should be deleted and may either be "node" or "leaf".
    :param key: Key of the item to be deleted.

    :returns: The rendered, deleted object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    checked_type = self._checkSkelType(skelType)
    if not checked_type:
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(checked_type)

    found = skel.fromDB(key)
    if not found:
        raise errors.NotFound()

    allowed = self.canDelete(checked_type, skel)
    if not allowed:
        raise errors.Unauthorized()

    is_node = checked_type == "node"
    if is_node:
        self.deleteRecursive(skel["key"])

    self.onDelete(checked_type, skel)
    skel.delete()
    self.onDeleted(checked_type, skel)

    return self.render.deleteSuccess(skel, skelType=checked_type)
@CallDeferred
def deleteRecursive(self, parentKey: str):
    """
    Recursively processes a delete request.

    Deletes every leaf (if the module has leafs) and every node whose ``parententry`` is
    *parentKey*; child nodes are processed recursively. The entry identified by *parentKey*
    itself is not deleted here.

    :param parentKey: Key of the node which children should be deleted.
    """
    node_kind = self.viewSkel("node").kindName
    nodeKey = db.keyHelper(parentKey, node_kind)

    if self.leafSkelCls:
        leaf_query = db.Query(self.viewSkel("leaf").kindName).filter("parententry =", nodeKey)
        for leaf in leaf_query.iter():
            leafSkel = self.viewSkel("leaf")
            # Entries that cannot be loaded are skipped.
            if leafSkel.fromDB(leaf.key):
                leafSkel.delete()

    node_query = db.Query(self.viewSkel("node").kindName).filter("parententry =", nodeKey)
    for node in node_query.iter():
        self.deleteRecursive(node.key)
        nodeSkel = self.viewSkel("node")
        if nodeSkel.fromDB(node.key):
            nodeSkel.delete()
@exposed
@force_ssl
@force_post
@skey
def move(self, skelType: SkelType, key: db.Key | int | str, parentNode: str, *args, **kwargs) -> str:
    """
    Move a node (including its contents) or a leaf to another node.

    .. seealso:: :func:`canMove`

    :param skelType: Defines the type of the entry that should be moved and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be moved.
    :param parentNode: URL-safe key of the destination node, which must be a node.
    :param skey: The CSRF security key.

    An optional ``sortindex`` in *kwargs* is converted to float and stored on the moved entry.

    :returns: The rendered, edited object of the entry.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided, when the
        entry would be moved into itself or into one of its own descendants, when no root node
        is reachable from the destination, or when the entry is a root node.
    :raises: :exc:`viur.core.errors.NotFound`, when no entry with the given *key* was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified or
        ``sortindex`` is not a number.
    """
    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.editSkel(skelType)  # srcSkel - the skeleton to be moved
    parentNodeSkel = self.baseSkel("node")  # destSkel - the node it should be moved into

    if not skel.fromDB(key):
        raise errors.NotFound("Cannot find entity to move")

    if not parentNodeSkel.fromDB(parentNode):
        # Decode the key only to produce a more helpful error message.
        parentNode = utils.normalizeKey(db.Key.from_legacy_urlsafe(parentNode))

        if parentNode.kind != parentNodeSkel.kindName:
            raise errors.NotFound(
                f"You provided a key of kind {parentNode.kind}, but require a {parentNodeSkel.kindName}."
            )

        raise errors.NotFound("Cannot find parentNode entity")

    if not self.canMove(skelType, skel, parentNodeSkel):
        raise errors.Unauthorized()

    if skel["key"] == parentNodeSkel["key"]:
        raise errors.NotAcceptable("Cannot move a node into itself")

    ## Test for recursion
    currLevel = db.Get(parentNodeSkel["key"])
    for _ in range(0, 99):
        if currLevel.key == skel["key"]:
            # The entry to move is an ancestor of the destination; moving it there
            # would detach the subtree into a cycle, so this must be refused.
            raise errors.NotAcceptable("Cannot move a node into one of its own descendants")
        if currLevel.get("rootNode") or currLevel.get("is_root_node"):
            # We reached a rootNode, so this is okay
            break
        currLevel = db.Get(currLevel["parententry"])
    else:  # We did not "break" - recursion-level exceeded
        raise errors.NotAcceptable("Unable to find a root node in recursion?")

    # Test if we try to move a rootNode
    # TODO: Remove "rootNode"-fallback with VIUR4
    if skel.dbEntity.get("is_root_node") or skel.dbEntity.get("rootNode"):
        raise errors.NotAcceptable("Can't move a rootNode to somewhere else")

    currentParentRepo = skel["parentrepo"]
    # parentrepo may not exist in parentNodeSkel as it may be a rootNode (same fallback as in add())
    newParentRepo = parentNodeSkel["parentrepo"] or parentNodeSkel["key"]

    skel["parententry"] = parentNodeSkel["key"]
    skel["parentrepo"] = newParentRepo

    if "sortindex" in kwargs:
        try:
            skel["sortindex"] = float(kwargs["sortindex"])
        except (TypeError, ValueError):  # not convertible to a number
            raise errors.PreconditionFailed()

    self.onEdit(skelType, skel)
    skel.toDB()
    self.onEdited(skelType, skel)

    # Ensure a changed parentRepo gets propagated to the children of the moved entry.
    # The normalised entity key is used, as the children reference their parent by key.
    if currentParentRepo != newParentRepo:
        self.updateParentRepo(skel["key"], newParentRepo)

    return self.render.editSuccess(skel)
@exposed
@force_ssl
@skey(allow_empty=True)
def clone(self, skelType: SkelType, key: db.Key | str | int, **kwargs):
    """
    Clone an existing entry, and render the entry, eventually with error notes on incorrect data.
    Data is taken by any other arguments in *kwargs*.

    The function performs several access control checks on the requested entity before it is cloned.

    .. seealso:: :func:`canEdit`, :func:`canAdd`, :func:`onClone`, :func:`onCloned`

    :param skelType: Defines the type of the entry that should be cloned and may either be "node" or "leaf".
    :param key: URL-safe key of the item to be cloned.

    :returns: The cloned object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.NotAcceptable`, when no valid *skelType* was provided.
    :raises: :exc:`viur.core.errors.NotFound`, when no *entry* to clone from was found.
    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`AssertionError`, when writing the clone does not return a truthy result.
    """
    if not (skelType := self._checkSkelType(skelType)):
        raise errors.NotAcceptable("Invalid skelType provided.")

    skel = self.cloneSkel(skelType)
    if not skel.fromDB(key):
        raise errors.NotFound()

    # a clone-operation is some kind of edit and add...
    # NOTE(review): canAdd() is annotated to take a parent skeleton, but receives the raw
    # "parententry" request value here - confirm against overriding implementations.
    if not (self.canEdit(skelType, skel) and self.canAdd(skelType, kwargs.get("parententry"))):
        raise errors.Unauthorized()

    # Remember source skel and unset the key for clone operation!
    src_skel = skel
    skel = skel.clone()
    skel["key"] = None

    # make parententry required and writeable when provided
    if "parententry" in kwargs:
        skel.parententry.readOnly = False
        skel.parententry.required = True
    else:
        _ = skel["parententry"]  # TODO: because of accessedValues...

    # make parentrepo required and writeable when provided
    if "parentrepo" in kwargs:
        skel.parentrepo.readOnly = False
        skel.parentrepo.required = True
    else:
        _ = skel["parentrepo"]  # TODO: because of accessedValues...

    # Check all required preconditions for clone
    if (
        not kwargs  # no data supplied
        or not current.request.get().isPostRequest  # failure if not using POST-method
        or not skel.fromClient(kwargs)  # failure on reading into the bones
        or utils.parse.bool(kwargs.get("bounce"))  # review before changing
    ):
        return self.render.edit(skel, action="clone")

    self.onClone(skelType, skel, src_skel=src_skel)

    # The write must happen outside of the assert statement: asserts are removed when
    # Python runs with -O, which would silently skip saving the clone.
    written = skel.toDB()
    assert written, "Writing the cloned entry failed"

    self.onCloned(skelType, skel, src_skel=src_skel)

    return self.render.editSuccess(skel, action="cloneSuccess")
687 ## Default access control functions
def listFilter(self, query: db.Query) -> t.Optional[db.Query]:
    """
    Access control function on item listing.

    This function is invoked by the :func:`list` renderer and the related Jinja2 fetching function,
    and is used to modify the provided filter parameter to match only items that the current user
    is allowed to see.

    The default grants access to users holding either the module's "view" permission
    (module-view) or "root" access.

    :param query: Query which should be altered.

    :returns: The altered filter, or None if access is not granted.
    """
    if not (user := current.user.get()):
        return None

    # Guard against an empty/None access list, like the can*() checks do, so such a
    # user is denied instead of raising a TypeError on the membership test.
    access = user["access"]
    if access and (f"{self.moduleName}-view" in access or "root" in access):
        return query

    return None
def canView(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Checks if the current user can view the given entry.

    Should be identical to what's allowed by listFilter.
    By default, :meth:`listFilter` is used to determine what's allowed and what's not; but this
    method can be overridden for performance improvements (to eliminate that additional database access).

    :param skelType: Defines the type of the entry, either "node" or "leaf".
    :param skel: The entry we check for

    :return: True if the current session is authorized to view that entry, False otherwise
    """
    by_key = self.viewSkel(skelType).all().mergeExternalFilter({"key": skel["key"]})
    filtered = self.listFilter(by_key)  # Access control

    # Denied outright, or the filtered query does not yield the entry.
    return filtered is not None and bool(filtered.getEntry())
def canAdd(self, skelType: SkelType, parentNodeSkel: t.Optional[SkeletonInstance]) -> bool:
    """
    Access control function for adding permission.

    Decides whether the current user may add a new entry.

    The default behavior is:
    - If no user is logged in, adding is generally refused.
    - If the user has "root" access, adding is generally allowed.
    - If the user has the modules "add" permission (module-add) enabled, adding is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`add`

    :param skelType: Defines the type of the node that should be added.
    :param parentNodeSkel: The parent node where a new entry should be added.

    :returns: True, if adding entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    # root user is always allowed, otherwise the add-permission is required.
    return "root" in access or f"{self.moduleName}-add" in access
def canEdit(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for modification permission.

    Decides whether the current user may edit an entry.

    The default behavior is:
    - If no user is logged in, editing is generally refused.
    - If the user has "root" access, editing is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled, editing is allowed.

    It should be overridden for a module-specific behavior.

    .. seealso:: :func:`edit`

    :param skelType: Defines the type of the node that should be edited.
    :param skel: The Skeleton that should be edited.

    :returns: True, if editing entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    return "root" in access or f"{self.moduleName}-edit" in access
def canDelete(self, skelType: SkelType, skel: SkeletonInstance) -> bool:
    """
    Access control function for delete permission.

    Decides whether the current user may delete an entry.

    The default behavior is:
    - If no user is logged in, deleting is generally refused.
    - If the user has "root" access, deleting is generally allowed.
    - If the user has the modules "deleting" permission (module-delete) enabled,
      deleting is allowed.

    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that should be deleted.
    :param skel: The Skeleton that should be deleted.

    .. seealso:: :func:`delete`

    :returns: True, if deleting entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    return "root" in access or f"{self.moduleName}-delete" in access
def canMove(self, skelType: SkelType, node: SkeletonInstance, destNode: SkeletonInstance) -> bool:
    """
    Access control function for moving permission.

    Decides whether the current user may move an entry.

    The default behavior is:
    - If no user is logged in, moving is generally refused.
    - If the user has "root" access, moving is generally allowed.
    - If the user has the modules "edit" permission (module-edit) enabled,
      moving is allowed.

    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be moved.
    :param node: The skeleton of the entry to be moved.
    :param destNode: The skeleton of the node where *node* should be moved to.

    .. seealso:: :func:`move`

    :returns: True, if moving entries is allowed, False otherwise.
    """
    user = current.user.get()
    if not user:
        return False

    access = user["access"]
    if not access:
        return False

    return "root" in access or f"{self.moduleName}-edit" in access
842 ## Overridable eventhooks
def onAdd(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook invoked right before an entry is added.

    Does nothing by default; override it for module-specific behavior.

    :param skelType: Defines the type of the node that shall be added.
    :param skel: The Skeleton that is going to be added.

    .. seealso:: :func:`add`, :func:`onAdded`
    """
def onAdded(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook invoked right after an entry has been added.

    The default writes a log entry (including the acting user, if any) and flushes the
    cache for the skeleton's kind. Override it for module-specific behavior.

    :param skelType: Defines the type of the node that has been added.
    :param skel: The Skeleton that has been added.

    .. seealso:: :func:`add`, :func:`onAdd`
    """
    entry_key = skel["key"]
    logging.info(f"""Entry of kind {skelType!r} added: {entry_key!r}""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        user_name, user_key = user["name"], user["key"]
        logging.info(f"""User: {user_name!r} ({user_key!r})""")
def onEdit(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before editing an entry.

    The default implementation does nothing.
    It can be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be edited.
    :param skel: The Skeleton that is going to be edited.

    .. seealso:: :func:`edit`, :func:`onEdited`
    """
def onEdited(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after modifying an entry.

    The default implementation writes a log entry for the changed entry,
    flushes the cache for the entry's key and, if a user is logged in,
    logs that user's name and key.
    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that has been edited.
    :param skel: The Skeleton that has been modified.

    .. seealso:: :func:`edit`, :func:`onEdit`
    """
    logging.info(f"""Entry of kind {skelType!r} changed: {skel["key"]!r}""")
    flushCache(key=skel["key"])

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")
def onView(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called when viewing an entry.

    The default implementation does nothing.
    It should be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that is viewed.
    :param skel: The Skeleton that is viewed.

    .. seealso:: :func:`view`
    """
def onDelete(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called before deleting an entry.

    The default implementation does nothing.
    It can be overridden for a module-specific behavior.

    :param skelType: Defines the type of the node that shall be deleted.
    :param skel: The Skeleton that is going to be deleted.

    .. seealso:: :func:`delete`, :func:`onDeleted`
    """
def onDeleted(self, skelType: SkelType, skel: SkeletonInstance):
    """
    Hook function that is called after deleting an entry.

    The default implementation writes a log entry for the deleted entry,
    flushes the cache for the entry's key and, if a user is logged in,
    logs that user's name and key.
    It should be overridden for a module-specific behavior.

    .. warning:: Saving the skeleton again will undo the deletion
        (if the skeleton was a leaf or a node with no children).

    :param skelType: Defines the type of the node that is deleted.
    :param skel: The Skeleton that has been deleted.

    .. seealso:: :func:`delete`, :func:`onDelete`
    """
    logging.info(f"""Entry deleted: {skel["key"]!r} ({skelType!r})""")
    flushCache(key=skel["key"])

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")
def onClone(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called before cloning an entry.

    The default implementation does nothing.
    It can be overwritten to a module-specific behavior.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that is being created.
    :param src_skel: The source SkeletonInstance `skel` is cloned from.

    .. seealso:: :func:`clone`, :func:`onCloned`
    """
@CallDeferred
def _clone_recursive(
    self,
    skel_type: SkelType,
    src_key: db.Key,
    target_key: db.Key,
    target_repo: db.Key,
    cursor=None
):
    """
    Helper function which is used by default onCloned() to clone a recursive structure.

    Fetches the entries of kind *skel_type* whose ``parententry`` is *src_key* (ordered by ``sortindex``),
    and writes a copy of each of them below *target_key*. For every copy, :func:`onClone` is called before
    and :func:`onCloned` after writing. When the query provides a cursor after the batch was processed,
    the function calls itself again with that cursor to continue with the remaining entries.

    :param skel_type: Defines the type of the entries to be cloned; it is passed through ``_checkSkelType``.
    :param src_key: Key of the parent whose direct children are cloned.
    :param target_key: Key that is assigned as ``parententry`` to the cloned children.
    :param target_repo: Key that is assigned as ``parentrepo`` to the cloned children.
    :param cursor: Query cursor to continue from; None starts at the beginning.

    :raises AssertionError: If *skel_type* is rejected by ``_checkSkelType``, or writing a copy returns
        a falsy result.
    """
    # The calls are kept outside of the assert statements on purpose: with ``python -O`` asserts are removed,
    # which would otherwise also remove the skel_type normalization (and the database write further below).
    skel_type = self._checkSkelType(skel_type)
    assert skel_type, "Invalid skel_type"

    logging.debug(f"_clone_recursive {skel_type=}, {src_key=}, {target_key=}, {target_repo=}, {cursor=}")

    q = self.cloneSkel(skel_type).all().filter("parententry", src_key).order("sortindex")
    q.setCursor(cursor)

    count = 0
    for src_skel in q.fetch():
        skel = src_skel.clone()
        skel["key"] = None  # cleared so that the copy does not keep the key of its source
        skel["parententry"] = target_key
        skel["parentrepo"] = target_repo

        self.onClone(skel_type, skel, src_skel=src_skel)
        logging.debug(f"copying {skel=}")  # this logging _is_ needed, otherwise not all values are being written..
        written = skel.toDB()
        assert written, "Writing the cloned skeleton failed"
        self.onCloned(skel_type, skel, src_skel=src_skel)
        count += 1

    logging.debug(f"_clone_recursive {count=}")

    if next_cursor := q.getCursor():
        # Continue with the next batch. The arguments must match the signature exactly;
        # passing skel_type a second time would be one positional argument too many.
        self._clone_recursive(skel_type, src_key, target_key, target_repo, next_cursor)
def onCloned(self, skelType: SkelType, skel: SkeletonInstance, src_skel: SkeletonInstance):
    """
    Hook function that is called after cloning an entry.

    The default implementation writes a log entry, flushes the cache for the skeleton's kind and,
    if a user is logged in, logs that user's name and key.

    In addition, when a "node" was cloned, :func:`_clone_recursive` is called for the "node" entries
    below the source and, if the module defines a ``leafSkelCls``, for the "leaf" entries as well,
    so that the structure below the source node is cloned, too.
    If this is not wanted, or wanted by a specific setting, overwrite this function
    without a super-call.

    :param skelType: Defines the type of the node that is cloned.
    :param skel: The new SkeletonInstance that was created.
    :param src_skel: The source SkeletonInstance `skel` was cloned from.

    .. seealso:: :func:`clone`, :func:`onClone`
    """
    logging.info(f"""Entry cloned: {skel["key"]!r} ({skelType!r})""")
    flushCache(kind=skel.kindName)

    user = current.user.get()
    if user:
        logging.info(f"""User: {user["name"]!r} ({user["key"]!r})""")

    # Only nodes can have a structure below them which needs to be cloned as well.
    if skelType != "node":
        return

    self._clone_recursive("node", src_skel["key"], skel["key"], skel["parentrepo"])

    # Leafs are only cloned when the module defines a leaf skeleton.
    if self.leafSkelCls:
        self._clone_recursive("leaf", src_skel["key"], skel["key"], skel["parentrepo"])
# Set the `vi` and `admin` flags on the Tree prototype.
# NOTE(review): presumably these mark the module as available to the "vi" and "admin" renderers - confirm.
Tree.vi = Tree.admin = True