Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/key.py: 8%

102 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-03 13:41 +0000

1import copy 

2import logging 

3import typing as t 

4 

5from viur.core import db, utils 

6from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity 

7 

8 

class KeyBone(BaseBone):
    """
    The KeyBone is used for managing keys in the database. It provides various methods for validating,
    converting, and storing key values, as well as querying the database.
    Key management is crucial for maintaining relationships between entities in the database, and the
    KeyBone class helps ensure that keys are handled correctly and efficiently throughout the system.

    :param str descr: The description of the KeyBone.
    :param bool readOnly: Whether the KeyBone is read-only.
    :param bool visible: Whether the KeyBone is visible.
    :param Union[None, List[str]] allowed_kinds: The allowed entity kinds for the KeyBone.
    :param bool check: Whether to check for entity existence.
    """
    type = "key"

    def __init__(
        self,
        *,
        descr: str = "Key",
        readOnly: bool = True,  # default is readonly
        visible: bool = False,  # default is invisible
        allowed_kinds: None | list[str] = None,  # None allows for any kind
        check: bool = False,  # check for entity existence
        **kwargs
    ):
        super().__init__(descr=descr, readOnly=readOnly, visible=visible, defaultValue=None, **kwargs)
        self.allowed_kinds = allowed_kinds
        self.check = check

    def singleValueFromClient(self, value, skel, bone_name, client_data):
        """
        Convert a single client-supplied value into a database key.

        String input is stripped of surrounding whitespace first. If ``allowed_kinds`` is set,
        the value is resolved through ``db.keyHelper`` using the first kind as target and the
        remaining kinds as additional allowed kinds; otherwise the value is normalized via
        ``db.normalizeKey`` (parsing it with ``db.Key.from_legacy_urlsafe`` unless it already
        is a ``db.Key``). The result is then passed to ``isInvalid`` and, if ``check`` is
        enabled, looked up with ``db.Get``.

        :param value: The raw value to convert (a string or a ``db.Key``).
        :param skel: Not used by this implementation.
        :param bone_name: Not used by this implementation.
        :param client_data: Not used by this implementation.

        :return: A tuple ``(key, None)`` on success, or ``(empty value, [ReadFromClientError])``
            if the value could not be turned into an acceptable key.
        """
        # check for correct key
        if isinstance(value, str):
            value = value.strip()

        if self.allowed_kinds:
            try:
                key = db.keyHelper(value, self.allowed_kinds[0], self.allowed_kinds[1:])
            except ValueError as e:
                return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, e.args[0])]
        else:
            try:
                if isinstance(value, db.Key):
                    key = db.normalizeKey(value)
                else:
                    key = db.normalizeKey(db.Key.from_legacy_urlsafe(value))
            except Exception as exc:
                logging.exception(f"Failed to normalize {value}: {exc}")
                return self.getEmptyValue(), [
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        "The provided key is not a valid database key"
                    )
                ]

        # Check custom validity
        err = self.isInvalid(key)
        if err:
            return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

        if self.check:
            if db.Get(key) is None:
                return self.getEmptyValue(), [
                    ReadFromClientError(
                        ReadFromClientErrorSeverity.Invalid,
                        "The provided key does not exist"
                    )
                ]

        return key, None

    def unserialize(self, skel: 'viur.core.skeleton.SkeletonValues', name: str) -> bool:
        """
        This method is the inverse of :meth:`serialize`. It reads the key value from the datastore
        and populates the corresponding KeyBone in the Skeleton. The method converts the value from
        the datastore into an appropriate format for further use in the program.

        :param skel: The SkeletonValues instance this bone is a part of.
        :param name: The property name of this bone in the Skeleton (not the description).

        :return: A boolean value indicating whether the operation was successful. Returns True if
            the key value was successfully unserialized and added to the accessedValues of the
            Skeleton, and False otherwise.

        .. note:: The method contains an inner function, fixVals(val), which normalizes and
            validates the key values before populating the bone. Values that cannot be
            interpreted as a key become ``None``; a non-multiple bone whose stored list
            contains no valid key is unserialized as ``None``.
        """

        def fixVals(val):
            # Strings are parsed as legacy urlsafe keys; anything else that is not a db.Key is dropped.
            if isinstance(val, str):
                try:
                    # Use the same db-level normalizer as singleValueFromClient.
                    val = db.normalizeKey(db.Key.from_legacy_urlsafe(val))
                except Exception:
                    val = None
            elif not isinstance(val, db.Key):
                val = None
            return val

        if (
            name == "key"
            and isinstance(skel.dbEntity, db.Entity)
            and skel.dbEntity.key
            and not skel.dbEntity.key.is_partial
        ):
            # The bone named "key" mirrors the entity's own (complete) key.
            skel.accessedValues[name] = skel.dbEntity.key
            return True
        elif name in skel.dbEntity:
            val = skel.dbEntity[name]
            if isinstance(val, list):
                # Convert each entry exactly once and drop the ones that are not valid keys.
                val = [key for key in map(fixVals, val) if key]
            else:
                val = fixVals(val)

            # Coerce the stored shape to what the bone's `multiple` setting expects.
            if self.multiple and not isinstance(val, list):
                val = [val] if val else []
            elif not self.multiple and isinstance(val, list):
                # The list may be empty (nothing stored, or every entry was invalid).
                val = val[0] if val else None

            skel.accessedValues[name] = val
            return True
        return False

    def serialize(self, skel: 'SkeletonInstance', name: str, parentIndexed: bool) -> bool:
        """
        This method serializes the KeyBone into a format that can be written to the datastore. It
        converts the key value from the Skeleton object into a format suitable for storage in the
        datastore.

        :param skel: The SkeletonInstance this bone is a part of.
        :param name: The property name of this bone in the Skeleton (not the description).
        :param parentIndexed: A boolean value indicating whether the parent entity is indexed or not.

        :return: A boolean value indicating whether the operation was successful. Returns True if
            the key value was successfully serialized and added to the datastore entity, and False
            otherwise.

        .. note:: Key values are always indexed, so the method discards any exclusion from indexing
            for key values.
        """
        if name in skel.accessedValues:
            if name == "key":
                # The bone named "key" is written to the entity's key, not to a property.
                skel.dbEntity.key = skel.accessedValues["key"]
            else:
                skel.dbEntity[name] = skel.accessedValues[name]
                skel.dbEntity.exclude_from_indexes.discard(name)  # Keys can never be not indexed
            return True
        return False

    def buildDBFilter(
        self,
        name: str,
        skel: 'viur.core.skeleton.SkeletonInstance',
        dbFilter: db.Query,
        rawFilter: dict,
        prefix: t.Optional[str] = None
    ) -> db.Query:
        """
        This method parses the search filter specified by the client in their request and converts
        it into a format that can be understood by the datastore. It takes care of ignoring filters
        that do not target this bone and safely handles malformed data in the raw filter.

        :param name: The property name of this bone in the Skeleton (not the description).
        :param skel: The :class:`viur.core.skeleton.SkeletonInstance` this bone is a part of.
        :param dbFilter: The current :class:`viur.core.db.Query` instance the filters should be
            applied to.
        :param rawFilter: The dictionary of filters the client wants to have applied.
        :param prefix: An optional string to prepend to the filter key. Defaults to None.

        :return: The modified :class:`viur.core.db.Query`.

        :raises ValueError: If a list of keys is given but the query already holds a list of queries.
        :raises RuntimeError: If a key cannot be decoded or the filter cannot be applied.

        The method takes the following steps:

        #. Decodes the provided key(s) from the raw filter.
        #. If the filter contains a list of keys, it iterates through the list, creating a new
           filter for each key and appending it to the list of queries.
        #. If the filter contains a single key, it applies the filter directly to the query.
        #. In case of any invalid key or other issues, it raises a RuntimeError.
        """

        def _decodeKey(key):
            if isinstance(key, db.Key):
                return key
            else:
                try:
                    return db.Key.from_legacy_urlsafe(key)
                except Exception as e:
                    logging.exception(e)
                    logging.warning(f"Could not decode key {key}")
                    raise RuntimeError() from e

        if name not in rawFilter:
            # The client did not filter on this bone.
            return dbFilter

        # The bone named "key" filters on the datastore's special key property.
        prop = db.KEY_SPECIAL_PROPERTY if name == "key" else name
        filter_key = f"{prefix or ''}{prop} ="
        raw_value = rawFilter[name]

        if isinstance(raw_value, list):
            if isinstance(dbFilter.queries, list):
                raise ValueError("In-Filter already used!")
            elif dbFilter.queries is None:
                return dbFilter  # Query is already unsatisfiable

            base_query = dbFilter.queries
            # Build the replacement list locally so that a failing key does not leave
            # dbFilter with a partially populated list of queries.
            new_queries = []
            for key in raw_value:
                query = copy.deepcopy(base_query)
                try:
                    query.filters[filter_key] = _decodeKey(key)
                except Exception as exc:  # Invalid key or something
                    raise RuntimeError() from exc
                new_queries.append(query)
            dbFilter.queries = new_queries
        else:
            try:
                dbFilter.filter(filter_key, _decodeKey(raw_value))
            except Exception as exc:  # Invalid key or something
                raise RuntimeError() from exc

        return dbFilter