Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/string.py: 38%

162 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-07 19:28 +0000

1import datetime 

2import functools 

3import logging 

4import string 

5import typing as t 

6import warnings 

7from numbers import Number 

8 

9from viur.core import current, db, utils 

10from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity 

11 

12if t.TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true

13 from ..skeleton import SkeletonInstance 

14 

15DB_TYPE_INDEXED: t.TypeAlias = dict[t.Literal["val", "idx", "sort_idx"], str] 

16 

17 

18class StringBone(BaseBone): 

19 """ 

20 The "StringBone" represents a data field that contains text values. 

21 """ 

22 type = "str" 

23 

24 def __init__( 

25 self, 

26 *, 

27 caseSensitive: bool = True, 

28 max_length: int | None = 254, 

29 min_length: int | None = None, 

30 natural_sorting: bool | t.Callable = False, 

31 **kwargs 

32 ): 

33 """ 

34 Initializes a new StringBone. 

35 

36 :param caseSensitive: When filtering for values in this bone, should it be case-sensitive? 

37 :param max_length: The maximum length allowed for values of this bone. Set to None for no limitation. 

38 :param min_length: The minimum length allowed for values of this bone. Set to None for no limitation. 

39 :param natural_sorting: Allows a more natural sorting 

40 than the default sorting on the plain values. 

41 This uses the .sort_idx property. 

42 `True` enables sorting according to DIN 5007 Variant 2. 

43 With passing a `callable`, a custom transformer method can be set 

44 that creates the value for the index property. 

45 :param kwargs: Inherited arguments from the BaseBone. 

46 """ 

47 # fixme: Remove in viur-core >= 4 

48 if "maxLength" in kwargs: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 warnings.warn("maxLength parameter is deprecated, please use max_length", 

50 DeprecationWarning, stacklevel=2) 

51 max_length = kwargs.pop("maxLength") 

52 super().__init__(**kwargs) 

53 if max_length is not None and max_length <= 0: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true

54 raise ValueError("max_length must be a positive integer or None") 

55 if min_length is not None and min_length <= 0: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true

56 raise ValueError("min_length must be a positive integer or None") 

57 if min_length is not None and max_length is not None: 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true

58 if min_length > max_length: 

59 raise ValueError("min_length can't be greater than max_length") 

60 self.caseSensitive = caseSensitive 

61 self.max_length = max_length 

62 self.min_length = min_length 

63 if callable(natural_sorting): 63 ↛ 64line 63 didn't jump to line 64 because the condition on line 63 was never true

64 self.natural_sorting = natural_sorting 

65 elif not isinstance(natural_sorting, bool): 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true

66 raise TypeError("natural_sorting must be a callable or boolean!") 

67 elif not natural_sorting: 67 ↛ exitline 67 didn't return from function '__init__' because the condition on line 67 was always true

68 self.natural_sorting = None 

69 # else: keep self.natural_sorting as is 

70 

71 def type_coerce_single_value(self, value: t.Any) -> str: 

72 """Convert a value to a string (if not already) 

73 

74 Converts a value that is not a string into a string 

75 if a meaningful conversion is possible (simple data types only). 

76 """ 

77 if isinstance(value, str): 

78 return value 

79 elif isinstance(value, Number): 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true

80 return str(value) 

81 elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true

82 return value.isoformat() 

83 elif isinstance(value, db.Key): 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true

84 return value.to_legacy_urlsafe().decode("ASCII") 

85 elif not value: # None or any other falsy value 85 ↛ 88line 85 didn't jump to line 88 because the condition on line 85 was always true

86 return self.getEmptyValue() 

87 else: 

88 raise ValueError( 

89 f"Value {value} of type {type(value)} cannot be coerced for {type(self).__name__} {self.name}" 

90 ) 

91 

92 def singleValueSerialize( 

93 self, 

94 value: t.Any, 

95 skel: "SkeletonInstance", 

96 name: str, 

97 parentIndexed: bool, 

98 ) -> str | DB_TYPE_INDEXED: 

99 """ 

100 Serializes a single value of this data field for storage in the database. 

101 

102 :param value: The value to serialize. 

103 It should be a str value, if not it is forced with :meth:`type_coerce_single_value`. 

104 :param skel: The skeleton instance that this data field belongs to. 

105 :param name: The name of this data field. 

106 :param parentIndexed: A boolean value indicating whether the parent object has an index on 

107 this data field or not. 

108 :return: The serialized value. 

109 """ 

110 value = self.type_coerce_single_value(value) 

111 if (not self.caseSensitive or self.natural_sorting) and parentIndexed: 

112 serialized: DB_TYPE_INDEXED = {"val": value} 

113 if not self.caseSensitive: 113 ↛ 115line 113 didn't jump to line 115 because the condition on line 113 was always true

114 serialized["idx"] = value.lower() 

115 if self.natural_sorting: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true

116 serialized["sort_idx"] = self.natural_sorting(value) 

117 return serialized 

118 return value 

119 

120 def singleValueUnserialize(self, value: str | DB_TYPE_INDEXED) -> str: 

121 """ 

122 Unserializes a single value of this data field from the database. 

123 

124 :param value: The serialized value to unserialize. 

125 :return: The unserialized value. 

126 """ 

127 if isinstance(value, dict) and "val" in value: 

128 value = value["val"] # Process with the raw value 

129 if value: 

130 return str(value) 

131 else: 

132 return self.getEmptyValue() 

133 

134 def getEmptyValue(self) -> str: 

135 """ 

136 Returns the empty value for this data field. 

137 

138 :return: An empty string. 

139 """ 

140 return "" 

141 

142 def isEmpty(self, value): 

143 """ 

144 Determines whether a value for this data field is empty or not. 

145 

146 :param value: The value to check for emptiness. 

147 :return: A boolean value indicating whether the value is empty or not. 

148 """ 

149 if not value: 

150 return True 

151 

152 return not bool(str(value).strip()) 

153 

154 def isInvalid(self, value: t.Any) -> str | None: 

155 """ 

156 Returns None if the value would be valid for 

157 this bone, an error-message otherwise. 

158 """ 

159 if self.max_length is not None and len(value) > self.max_length: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true

160 return "Maximum length exceeded" 

161 if self.min_length is not None and len(value) < self.min_length: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true

162 return "Minimum length not reached" 

163 return None 

164 

165 def singleValueFromClient(self, value, skel, bone_name, client_data): 

166 """ 

167 Returns None and the escaped value if the value would be valid for 

168 this bone, otherwise the empty value and an error-message. 

169 """ 

170 

171 if not (err := self.isInvalid(str(value))): 

172 return utils.string.escape(value, self.max_length), None 

173 

174 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)] 

175 

176 def buildDBFilter( 

177 self, 

178 name: str, 

179 skel: "SkeletonInstance", 

180 dbFilter: db.Query, 

181 rawFilter: dict, 

182 prefix: t.Optional[str] = None 

183 ) -> db.Query: 

184 """ 

185 Builds and returns a database filter for this data field based on the provided raw filter data. 

186 

187 :param name: The name of this data field. 

188 :param skel: The skeleton instance that this data field belongs to. 

189 :param dbFilter: The database filter to add query clauses to. 

190 :param rawFilter: A dictionary containing the raw filter data for this data field. 

191 :param prefix: An optional prefix to add to the query clause. 

192 :return: The database filter with the added query clauses. 

193 """ 

194 if name not in rawFilter and not any( 

195 [(x.startswith(name + "$") or x.startswith(name + ".")) for x in rawFilter.keys()] 

196 ): 

197 return super().buildDBFilter(name, skel, dbFilter, rawFilter, prefix) 

198 

199 if not self.languages: 

200 namefilter = name 

201 else: 

202 lang = None 

203 for key in rawFilter.keys(): 

204 if key.startswith(f"{name}."): 

205 langStr = key.replace(f"{name}.", "") 

206 if langStr in self.languages: 

207 lang = langStr 

208 break 

209 if not lang: 

210 lang = current.language.get() # currentSession.getLanguage() 

211 if not lang or not lang in self.languages: 

212 lang = self.languages[0] 

213 namefilter = f"{name}.{lang}" 

214 

215 if name + "$lk" in rawFilter: # Do a prefix-match 

216 if not self.caseSensitive: 

217 dbFilter.filter((prefix or "") + namefilter + ".idx >=", str(rawFilter[name + "$lk"]).lower()) 

218 dbFilter.filter((prefix or "") + namefilter + ".idx <", 

219 str(rawFilter[name + "$lk"] + u"\ufffd").lower()) 

220 else: 

221 dbFilter.filter((prefix or "") + namefilter + " >=", str(rawFilter[name + "$lk"])) 

222 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lk"] + u"\ufffd")) 

223 

224 if name + "$gt" in rawFilter: # All entries after 

225 if not self.caseSensitive: 

226 dbFilter.filter((prefix or "") + namefilter + ".idx >", str(rawFilter[name + "$gt"]).lower()) 

227 else: 

228 dbFilter.filter((prefix or "") + namefilter + " >", str(rawFilter[name + "$gt"])) 

229 

230 if name + "$lt" in rawFilter: # All entries before 

231 if not self.caseSensitive: 

232 dbFilter.filter((prefix or "") + namefilter + ".idx <", str(rawFilter[name + "$lt"]).lower()) 

233 else: 

234 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lt"])) 

235 

236 if name in rawFilter: # Normal, strict match 

237 if not self.caseSensitive: 

238 dbFilter.filter((prefix or "") + namefilter + ".idx", str(rawFilter[name]).lower()) 

239 else: 

240 dbFilter.filter((prefix or "") + namefilter, str(rawFilter[name])) 

241 

242 return dbFilter 

243 

244 def buildDBSort( 

245 self, 

246 name: str, 

247 skel: 'SkeletonInstance', 

248 query: db.Query, 

249 params: dict, 

250 postfix: str = "", 

251 ) -> t.Optional[db.Query]: 

252 return super().buildDBSort( 

253 name, skel, query, params, 

254 postfix=".sort_idx" if self.natural_sorting else ".idx" if not self.caseSensitive else postfix 

255 ) 

256 

257 def natural_sorting(self, value: str | None) -> str | None: 

258 """Implements a default natural sorting transformer. 

259 

260 The sorting is according to DIN 5007 Variant 2 

261 and sets ö and oe, etc. equal. 

262 """ 

263 if value is None: 

264 return None 

265 assert isinstance(value, str) 

266 if not self.caseSensitive: 

267 value = value.lower() 

268 

269 # DIN 5007 Variant 2 

270 return value.translate(str.maketrans({ 

271 "ö": "oe", 

272 "Ö": "Oe", 

273 "ü": "ue", 

274 "Ü": "Ue", 

275 "ä": "ae", 

276 "Ä": "Ae", 

277 "ẞ": "SS", 

278 })) 

279 

280 def getSearchTags(self, skel: "SkeletonInstance", name: str) -> set[str]: 

281 """ 

282 Returns a set of lowercased words that represent searchable tags for the given bone. 

283 

284 :param skel: The skeleton instance being searched. 

285 :param name: The name of the bone to generate tags for. 

286 

287 :return: A set of lowercased words representing searchable tags. 

288 """ 

289 result = set() 

290 for idx, lang, value in self.iter_bone_value(skel, name): 

291 if value is None: 

292 continue 

293 for line in str(value).splitlines(): # TODO: Can a StringBone be multiline? 

294 for word in line.split(" "): 

295 result.add(word.lower()) 

296 return result 

297 

298 def getUniquePropertyIndexValues(self, skel: "SkeletonInstance", name: str) -> list[str]: 

299 """ 

300 Returns a list of unique index values for a given property name. 

301 

302 :param skel: The skeleton instance. 

303 :param name: The name of the property. 

304 :return: A list of unique index values for the property. 

305 :raises NotImplementedError: If the StringBone has languages and the implementation 

306 for this case is not yet defined. 

307 """ 

308 if self.languages: 

309 # Not yet implemented as it's unclear if we should keep each language distinct or not 

310 raise NotImplementedError() 

311 

312 if not self.caseSensitive and (value := skel[name]) is not None: 

313 if self.multiple: 

314 value = [v.lower() for v in value] 

315 else: 

316 value = value.lower() 

317 return self._hashValueForUniquePropertyIndex(value) 

318 

319 return super().getUniquePropertyIndexValues(skel, name) 

320 

321 def refresh(self, skel: "SkeletonInstance", bone_name: str) -> None: 

322 super().refresh(skel, bone_name) 

323 

324 # TODO: duplicate code, this is the same iteration logic as in NumericBone 

325 new_value = {} 

326 for _, lang, value in self.iter_bone_value(skel, bone_name): 

327 new_value.setdefault(lang, []).append(self.type_coerce_single_value(value)) 

328 

329 if not self.multiple: 

330 # take the first one 

331 new_value = {lang: values[0] for lang, values in new_value.items() if values} 

332 

333 if self.languages: 

334 skel[bone_name] = new_value 

335 elif not self.languages: 

336 # just the value(s) with None language 

337 skel[bone_name] = new_value.get(None, [] if self.multiple else self.getEmptyValue()) 

338 

339 def structure(self) -> dict: 

340 ret = super().structure() | { 

341 "maxlength": self.max_length, 

342 "minlength": self.min_length 

343 } 

344 return ret 

345 

346 @classmethod 

347 def v_func_valid_chars(cls, valid_chars: t.Iterable = string.printable) -> t.Callable: 

348 """ 

349 Returns a function that takes a string and checks whether it contains valid characters. 

350 If all characters of the string are valid, it returns None, and succeeds. 

351 If invalid characters are present, it returns an appropriate error message. 

352 

353 :param valid_chars: An iterable of valid characters. 

354 :return: A function that takes a string and check whether it contains valid characters. 

355 

356 Example for digits only: 

357 .. code-block:: python 

358 str_bone = StringBone(vfunc=StringBone.v_func_valid_chars(string.digits)) 

359 """ 

360 

361 def v_func(valid_chars_intern, value): 

362 if any(char not in valid_chars_intern for char in value): 

363 return "Not all letters are available in the charset" 

364 

365 return functools.partial(v_func, valid_chars)