Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/string.py: 33%

176 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-03 13:41 +0000

import datetime
import logging
import typing as t
import warnings
from numbers import Number

from viur.core import current, db, utils
from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity

if t.TYPE_CHECKING:
    # Imported for annotations only; the name is referenced as a string
    # ("SkeletonInstance") in the signatures below.
    from ..skeleton import SkeletonInstance

# Shape of a serialized StringBone value when extra index properties are stored:
#   "val"      - the plain string value
#   "idx"      - lower-cased value (written when the bone is not case-sensitive)
#   "sort_idx" - natural-sorting key (written when natural sorting is enabled)
DB_TYPE_INDEXED: t.TypeAlias = dict[t.Literal["val", "idx", "sort_idx"], str]

14 

15 

class StringBone(BaseBone):
    """
    The "StringBone" represents a data field that contains text values.
    """
    type = "str"

    # Transliteration table for the default natural sorting (DIN 5007 Variant 2).
    # Built once at class creation instead of on every call.
    _NATURAL_SORTING_TABLE = str.maketrans({
        "ö": "oe",
        "Ö": "Oe",
        "ü": "ue",
        "Ü": "Ue",
        "ä": "ae",
        "Ä": "Ae",
        "ß": "ss",
        "ẞ": "SS",
    })

    def __init__(
        self,
        *,
        caseSensitive: bool = True,
        max_length: int | None = 254,
        min_length: int | None = None,
        natural_sorting: bool | t.Callable = False,
        **kwargs
    ):
        """
        Initializes a new StringBone.

        :param caseSensitive: When filtering for values in this bone, should it be case-sensitive?
        :param max_length: The maximum length allowed for values of this bone. Set to None for no limitation.
        :param min_length: The minimum length allowed for values of this bone. Set to None for no limitation.
        :param natural_sorting: Allows a more natural sorting
            than the default sorting on the plain values.
            This uses the .sort_idx property.
            `True` enables sorting according to DIN 5007 Variant 2.
            With passing a `callable`, a custom transformer method can be set
            that creates the value for the index property.
        :param kwargs: Inherited arguments from the BaseBone.
        :raises ValueError: If ``max_length`` or ``min_length`` is not positive,
            or if ``min_length`` is greater than ``max_length``.
        :raises TypeError: If ``natural_sorting`` is neither a callable nor a boolean.
        """
        # fixme: Remove in viur-core >= 4
        if "maxLength" in kwargs:
            warnings.warn("maxLength parameter is deprecated, please use max_length",
                          DeprecationWarning, stacklevel=2)
            max_length = kwargs.pop("maxLength")
        super().__init__(**kwargs)
        if max_length is not None and max_length <= 0:
            raise ValueError("max_length must be a positive integer or None")
        if min_length is not None and min_length <= 0:
            raise ValueError("min_length must be a positive integer or None")
        if min_length is not None and max_length is not None and min_length > max_length:
            raise ValueError("min_length can't be greater than max_length")
        self.caseSensitive = caseSensitive
        self.max_length = max_length
        self.min_length = min_length
        if callable(natural_sorting):
            # A custom transformer shadows the default method on this instance.
            self.natural_sorting = natural_sorting
        elif not isinstance(natural_sorting, bool):
            raise TypeError("natural_sorting must be a callable or boolean!")
        elif not natural_sorting:
            # Disabled: shadow the default method with a falsy value.
            self.natural_sorting = None
        # else (True): keep the class-level natural_sorting method as is

    def type_coerce_single_value(self, value: t.Any) -> str:
        """Convert a value to a string (if not already)

        Converts a value that is not a string into a string
        if a meaningful conversion is possible (simple data types only).

        :param value: The value to convert.
        :return: The string representation; the empty value for falsy input.
        :raises ValueError: If the value is truthy and of an unsupported type.
        """
        if isinstance(value, str):
            return value
        if isinstance(value, Number):
            return str(value)
        if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
            return value.isoformat()
        if isinstance(value, db.Key):
            return value.to_legacy_urlsafe().decode("ASCII")
        if not value:  # None or any other falsy value
            return self.getEmptyValue()
        raise ValueError(
            f"Value {value} of type {type(value)} cannot be coerced for {type(self).__name__} {self.name}"
        )

    def singleValueSerialize(
        self,
        value: t.Any,
        skel: "SkeletonInstance",
        name: str,
        parentIndexed: bool,
    ) -> str | DB_TYPE_INDEXED:
        """
        Serializes a single value of this data field for storage in the database.

        :param value: The value to serialize.
            It should be a str value, if not it is forced with :meth:`type_coerce_single_value`.
        :param skel: The skeleton instance that this data field belongs to.
        :param name: The name of this data field.
        :param parentIndexed: A boolean value indicating whether the parent object has an index on
            this data field or not.
        :return: The plain string, or a dict with "val" plus the "idx" / "sort_idx"
            helper values when the bone is indexed and case-insensitive and/or naturally sorted.
        """
        value = self.type_coerce_single_value(value)
        if (not self.caseSensitive or self.natural_sorting) and parentIndexed:
            serialized: DB_TYPE_INDEXED = {"val": value}
            if not self.caseSensitive:
                serialized["idx"] = value.lower()
            if self.natural_sorting:
                serialized["sort_idx"] = self.natural_sorting(value)
            return serialized
        return value

    def singleValueUnserialize(self, value: str | DB_TYPE_INDEXED) -> str:
        """
        Unserializes a single value of this data field from the database.

        :param value: The serialized value to unserialize.
        :return: The unserialized value.
        """
        if isinstance(value, dict) and "val" in value:
            value = value["val"]  # Process with the raw value
        if value:
            return str(value)
        return self.getEmptyValue()

    def getEmptyValue(self) -> str:
        """
        Returns the empty value for this data field.

        :return: An empty string.
        """
        return ""

    def isEmpty(self, value: t.Any) -> bool:
        """
        Determines whether a value for this data field is empty or not.

        :param value: The value to check for emptiness.
        :return: True if the value is falsy or consists of whitespace only.
        """
        if not value:
            return True

        return not str(value).strip()

    def isInvalid(self, value: t.Any) -> str | None:
        """
        Returns None if the value would be valid for
        this bone, an error-message otherwise.
        """
        if self.max_length is not None and len(value) > self.max_length:
            return "Maximum length exceeded"
        if self.min_length is not None and len(value) < self.min_length:
            return "Minimum length not reached"
        return None

    def singleValueFromClient(self, value, skel, bone_name, client_data):
        """
        Returns the escaped value and None if the value would be valid for
        this bone, otherwise the empty value and a list with one error.
        """
        if not (err := self.isInvalid(str(value))):
            return utils.string.escape(value, self.max_length), None

        return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]

    def buildDBFilter(
        self,
        name: str,
        skel: "SkeletonInstance",
        dbFilter: db.Query,
        rawFilter: dict,
        prefix: t.Optional[str] = None
    ) -> db.Query:
        """
        Builds and returns a database filter for this data field based on the provided raw filter data.

        Supported keys in ``rawFilter`` are ``name`` (strict match), ``name$lk`` (prefix match),
        ``name$gt`` and ``name$lt``. For bones with languages, a ``name.<lang>`` key selects
        the language to filter on.

        :param name: The name of this data field.
        :param skel: The skeleton instance that this data field belongs to.
        :param dbFilter: The database filter to add query clauses to.
        :param rawFilter: A dictionary containing the raw filter data for this data field.
        :param prefix: An optional prefix to add to the query clause.
        :return: The database filter with the added query clauses.
        """
        if name not in rawFilter and not any(
            key.startswith((f"{name}$", f"{name}.")) for key in rawFilter
        ):
            return super().buildDBFilter(name, skel, dbFilter, rawFilter, prefix)

        if not self.languages:
            namefilter = name
        else:
            lang = None
            lang_prefix = f"{name}."
            for key in rawFilter:
                if key.startswith(lang_prefix):
                    # Strip only the leading "<name>."; replace() would also
                    # remove later occurrences inside the key.
                    candidate = key.removeprefix(lang_prefix)
                    if candidate in self.languages:
                        lang = candidate
                        break
            if not lang:
                lang = current.language.get()  # currentSession.getLanguage()
                if not lang or lang not in self.languages:
                    lang = self.languages[0]
            namefilter = f"{name}.{lang}"

        # Case-insensitive bones are queried through their lower-cased ".idx" property.
        prop = (prefix or "") + namefilter
        if not self.caseSensitive:
            prop += ".idx"

        def normalize(raw: t.Any) -> str:
            """Stringify a raw filter value, lower-casing it for case-insensitive bones."""
            text = str(raw)
            return text if self.caseSensitive else text.lower()

        if (key := f"{name}$lk") in rawFilter:  # Do a prefix-match
            needle = normalize(rawFilter[key])
            dbFilter.filter(f"{prop} >=", needle)
            # Stringify first, then append the sentinel, so that non-str raw
            # values don't raise a TypeError on concatenation.
            dbFilter.filter(f"{prop} <", needle + "\ufffd")

        if (key := f"{name}$gt") in rawFilter:  # All entries after
            dbFilter.filter(f"{prop} >", normalize(rawFilter[key]))

        if (key := f"{name}$lt") in rawFilter:  # All entries before
            dbFilter.filter(f"{prop} <", normalize(rawFilter[key]))

        if name in rawFilter:  # Normal, strict match
            dbFilter.filter(prop, normalize(rawFilter[name]))

        return dbFilter

    def buildDBSort(
        self,
        name: str,
        skel: "SkeletonInstance",
        dbFilter: db.Query,
        rawFilter: dict
    ) -> t.Optional[db.Query]:
        """
        Build a DB sort based on the specified name and a raw filter.

        The sort is only applied if ``rawFilter["orderby"]`` addresses this bone
        (``name``, or ``name.<lang>`` for bones with languages).

        :param name: The name of the attribute to sort by.
        :param skel: A SkeletonInstance object.
        :param dbFilter: A Query object representing the current DB filter.
        :param rawFilter: A dictionary containing the raw filter.
        :return: The Query object with the specified sort applied.
        """
        orderby = rawFilter.get("orderby")
        addresses_this_bone = bool(orderby) and (
            orderby == name
            or (isinstance(orderby, str) and orderby.startswith(f"{name}.") and self.languages)
        )
        if not addresses_this_bone:
            return dbFilter

        if self.languages:
            lang = None
            lang_prefix = f"{name}."
            if orderby.startswith(lang_prefix):
                candidate = orderby.removeprefix(lang_prefix)
                if candidate in self.languages:
                    lang = candidate
            if lang is None:
                lang = current.language.get()
                if not lang or lang not in self.languages:
                    lang = self.languages[0]
            prop = f"{name}.{lang}"
        else:
            prop = name

        # Sort on the helper property written by singleValueSerialize, if any.
        if self.natural_sorting:
            prop += ".sort_idx"
        elif not self.caseSensitive:
            prop += ".idx"

        # fixme: VIUR4 replace theses stupid numbers defining a sort-order by a meaningful keys
        sorting = {
            "1": db.SortOrder.Descending,
            "2": db.SortOrder.InvertedAscending,
            "3": db.SortOrder.InvertedDescending,
        }.get(rawFilter.get("orderdir"), db.SortOrder.Ascending)
        order = (prop, sorting)

        # If the query already has an inequality filter on another property,
        # that property is put first in the sort order.
        inequality_filters = [
            key for key in dbFilter.queries.filters.keys()  # FIXME: This will break on multi queries
            if ">" in key[-3:] or "<" in key[-3:] or "!=" in key[-4:]
        ]
        if inequality_filters:
            first = inequality_filters[0]
            inequality_prop = first[: first.find(" ")]
            if inequality_prop != prop:
                logging.warning(f"I fixed you query! Impossible ordering changed to {inequality_prop}, {prop}")
                dbFilter.order(inequality_prop, order)
                return dbFilter

        dbFilter.order(order)
        return dbFilter

    def natural_sorting(self, value: str | None) -> str | None:
        """Implements a default natural sorting transformer.

        The sorting is according to DIN 5007 Variant 2
        and sets ö and oe, etc. equal (including ß and ss).

        :param value: The string to transform, or None.
        :return: The transliterated sort key (lower-cased when the bone is not
            case-sensitive), or None if ``value`` is None.
        """
        if value is None:
            return None
        assert isinstance(value, str)
        if not self.caseSensitive:
            value = value.lower()

        # DIN 5007 Variant 2
        return value.translate(self._NATURAL_SORTING_TABLE)

    def getSearchTags(self, skel: "SkeletonInstance", name: str) -> set[str]:
        """
        Returns a set of lowercased words that represent searchable tags for the given bone.

        :param skel: The skeleton instance being searched.
        :param name: The name of the bone to generate tags for.

        :return: A set of lowercased words representing searchable tags.
        """
        result = set()
        for _idx, _lang, value in self.iter_bone_value(skel, name):
            if value is None:
                continue
            # Whitespace split (covers line breaks too) never yields empty words,
            # unlike split(" ") on consecutive spaces.
            result.update(word.lower() for word in str(value).split())
        return result

    def getUniquePropertyIndexValues(self, skel: "SkeletonInstance", name: str) -> list[str]:
        """
        Returns a list of unique index values for a given property name.

        :param skel: The skeleton instance.
        :param name: The name of the property.
        :return: A list of unique index values for the property.
        :raises NotImplementedError: If the StringBone has languages and the implementation
            for this case is not yet defined.
        """
        if self.languages:
            # Not yet implemented as it's unclear if we should keep each language distinct or not
            raise NotImplementedError()

        return super().getUniquePropertyIndexValues(skel, name)

    def refresh(self, skel: "SkeletonInstance", bone_name: str) -> None:
        """
        Re-coerces all stored values of this bone to strings.

        :param skel: The skeleton instance holding the value.
        :param bone_name: The name of this bone inside the skeleton.
        """
        super().refresh(skel, bone_name)

        # TODO: duplicate code, this is the same iteration logic as in NumericBone
        new_value = {}
        for _, lang, value in self.iter_bone_value(skel, bone_name):
            new_value.setdefault(lang, []).append(self.type_coerce_single_value(value))

        if not self.multiple:
            # take the first one
            new_value = {lang: values[0] for lang, values in new_value.items() if values}

        if self.languages:
            skel[bone_name] = new_value
        else:
            # just the value(s) with None language
            skel[bone_name] = new_value.get(None, [] if self.multiple else self.getEmptyValue())

    def structure(self) -> dict:
        """
        Returns the structure of the base bone extended by the
        "maxlength" and "minlength" limits of this bone.
        """
        return super().structure() | {
            "maxlength": self.max_length,
            "minlength": self.min_length,
        }

378 } 

379 return ret