Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/string.py: 33%
176 statements
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
1import datetime
2import logging
3import typing as t
4import warnings
5from numbers import Number
7from viur.core import current, db, utils
8from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity
10if t.TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true
11 from ..skeleton import SkeletonInstance
13DB_TYPE_INDEXED: t.TypeAlias = dict[t.Literal["val", "idx", "sort_idx"], str]
16class StringBone(BaseBone):
17 """
18 The "StringBone" represents a data field that contains text values.
19 """
20 type = "str"
22 def __init__(
23 self,
24 *,
25 caseSensitive: bool = True,
26 max_length: int | None = 254,
27 min_length: int | None = None,
28 natural_sorting: bool | t.Callable = False,
29 **kwargs
30 ):
31 """
32 Initializes a new StringBone.
34 :param caseSensitive: When filtering for values in this bone, should it be case-sensitive?
35 :param max_length: The maximum length allowed for values of this bone. Set to None for no limitation.
36 :param min_length: The minimum length allowed for values of this bone. Set to None for no limitation.
37 :param natural_sorting: Allows a more natural sorting
38 than the default sorting on the plain values.
39 This uses the .sort_idx property.
40 `True` enables sorting according to DIN 5007 Variant 2.
41 With passing a `callable`, a custom transformer method can be set
42 that creates the value for the index property.
43 :param kwargs: Inherited arguments from the BaseBone.
44 """
45 # fixme: Remove in viur-core >= 4
46 if "maxLength" in kwargs: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true
47 warnings.warn("maxLength parameter is deprecated, please use max_length",
48 DeprecationWarning, stacklevel=2)
49 max_length = kwargs.pop("maxLength")
50 super().__init__(**kwargs)
51 if max_length is not None and max_length <= 0: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true
52 raise ValueError("max_length must be a positive integer or None")
53 if min_length is not None and min_length <= 0: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true
54 raise ValueError("min_length must be a positive integer or None")
55 if min_length is not None and max_length is not None: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true
56 if min_length > max_length:
57 raise ValueError("min_length can't be greater than max_length")
58 self.caseSensitive = caseSensitive
59 self.max_length = max_length
60 self.min_length = min_length
61 if callable(natural_sorting): 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true
62 self.natural_sorting = natural_sorting
63 elif not isinstance(natural_sorting, bool): 63 ↛ 64line 63 didn't jump to line 64 because the condition on line 63 was never true
64 raise TypeError("natural_sorting must be a callable or boolean!")
65 elif not natural_sorting: 65 ↛ exitline 65 didn't return from function '__init__' because the condition on line 65 was always true
66 self.natural_sorting = None
67 # else: keep self.natural_sorting as is
69 def type_coerce_single_value(self, value: t.Any) -> str:
70 """Convert a value to a string (if not already)
72 Converts a value that is not a string into a string
73 if a meaningful conversion is possible (simple data types only).
74 """
75 if isinstance(value, str):
76 return value
77 elif isinstance(value, Number): 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true
78 return str(value)
79 elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 return value.isoformat()
81 elif isinstance(value, db.Key): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true
82 return value.to_legacy_urlsafe().decode("ASCII")
83 elif not value: # None or any other falsy value 83 ↛ 86line 83 didn't jump to line 86 because the condition on line 83 was always true
84 return self.getEmptyValue()
85 else:
86 raise ValueError(
87 f"Value {value} of type {type(value)} cannot be coerced for {type(self).__name__} {self.name}"
88 )
90 def singleValueSerialize(
91 self,
92 value: t.Any,
93 skel: "SkeletonInstance",
94 name: str,
95 parentIndexed: bool,
96 ) -> str | DB_TYPE_INDEXED:
97 """
98 Serializes a single value of this data field for storage in the database.
100 :param value: The value to serialize.
101 It should be a str value, if not it is forced with :meth:`type_coerce_single_value`.
102 :param skel: The skeleton instance that this data field belongs to.
103 :param name: The name of this data field.
104 :param parentIndexed: A boolean value indicating whether the parent object has an index on
105 this data field or not.
106 :return: The serialized value.
107 """
108 value = self.type_coerce_single_value(value)
109 if (not self.caseSensitive or self.natural_sorting) and parentIndexed:
110 serialized: DB_TYPE_INDEXED = {"val": value}
111 if not self.caseSensitive: 111 ↛ 113line 111 didn't jump to line 113 because the condition on line 111 was always true
112 serialized["idx"] = value.lower()
113 if self.natural_sorting: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true
114 serialized["sort_idx"] = self.natural_sorting(value)
115 return serialized
116 return value
118 def singleValueUnserialize(self, value: str | DB_TYPE_INDEXED) -> str:
119 """
120 Unserializes a single value of this data field from the database.
122 :param value: The serialized value to unserialize.
123 :return: The unserialized value.
124 """
125 if isinstance(value, dict) and "val" in value:
126 value = value["val"] # Process with the raw value
127 if value:
128 return str(value)
129 else:
130 return self.getEmptyValue()
132 def getEmptyValue(self) -> str:
133 """
134 Returns the empty value for this data field.
136 :return: An empty string.
137 """
138 return ""
140 def isEmpty(self, value):
141 """
142 Determines whether a value for this data field is empty or not.
144 :param value: The value to check for emptiness.
145 :return: A boolean value indicating whether the value is empty or not.
146 """
147 if not value:
148 return True
150 return not bool(str(value).strip())
152 def isInvalid(self, value: t.Any) -> str | None:
153 """
154 Returns None if the value would be valid for
155 this bone, an error-message otherwise.
156 """
157 if self.max_length is not None and len(value) > self.max_length: 157 ↛ 158line 157 didn't jump to line 158 because the condition on line 157 was never true
158 return "Maximum length exceeded"
159 if self.min_length is not None and len(value) < self.min_length: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true
160 return "Minimum length not reached"
161 return None
163 def singleValueFromClient(self, value, skel, bone_name, client_data):
164 """
165 Returns None and the escaped value if the value would be valid for
166 this bone, otherwise the empty value and an error-message.
167 """
169 if not (err := self.isInvalid(str(value))):
170 return utils.string.escape(value, self.max_length), None
172 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]
174 def buildDBFilter(
175 self,
176 name: str,
177 skel: "SkeletonInstance",
178 dbFilter: db.Query,
179 rawFilter: dict,
180 prefix: t.Optional[str] = None
181 ) -> db.Query:
182 """
183 Builds and returns a database filter for this data field based on the provided raw filter data.
185 :param name: The name of this data field.
186 :param skel: The skeleton instance that this data field belongs to.
187 :param dbFilter: The database filter to add query clauses to.
188 :param rawFilter: A dictionary containing the raw filter data for this data field.
189 :param prefix: An optional prefix to add to the query clause.
190 :return: The database filter with the added query clauses.
191 """
192 if name not in rawFilter and not any(
193 [(x.startswith(name + "$") or x.startswith(name + ".")) for x in rawFilter.keys()]
194 ):
195 return super().buildDBFilter(name, skel, dbFilter, rawFilter, prefix)
197 if not self.languages:
198 namefilter = name
199 else:
200 lang = None
201 for key in rawFilter.keys():
202 if key.startswith(f"{name}."):
203 langStr = key.replace(f"{name}.", "")
204 if langStr in self.languages:
205 lang = langStr
206 break
207 if not lang:
208 lang = current.language.get() # currentSession.getLanguage()
209 if not lang or not lang in self.languages:
210 lang = self.languages[0]
211 namefilter = f"{name}.{lang}"
213 if name + "$lk" in rawFilter: # Do a prefix-match
214 if not self.caseSensitive:
215 dbFilter.filter((prefix or "") + namefilter + ".idx >=", str(rawFilter[name + "$lk"]).lower())
216 dbFilter.filter((prefix or "") + namefilter + ".idx <",
217 str(rawFilter[name + "$lk"] + u"\ufffd").lower())
218 else:
219 dbFilter.filter((prefix or "") + namefilter + " >=", str(rawFilter[name + "$lk"]))
220 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lk"] + u"\ufffd"))
222 if name + "$gt" in rawFilter: # All entries after
223 if not self.caseSensitive:
224 dbFilter.filter((prefix or "") + namefilter + ".idx >", str(rawFilter[name + "$gt"]).lower())
225 else:
226 dbFilter.filter((prefix or "") + namefilter + " >", str(rawFilter[name + "$gt"]))
228 if name + "$lt" in rawFilter: # All entries before
229 if not self.caseSensitive:
230 dbFilter.filter((prefix or "") + namefilter + ".idx <", str(rawFilter[name + "$lt"]).lower())
231 else:
232 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lt"]))
234 if name in rawFilter: # Normal, strict match
235 if not self.caseSensitive:
236 dbFilter.filter((prefix or "") + namefilter + ".idx", str(rawFilter[name]).lower())
237 else:
238 dbFilter.filter((prefix or "") + namefilter, str(rawFilter[name]))
240 return dbFilter
242 def buildDBSort(
243 self,
244 name: str,
245 skel: "SkeletonInstance",
246 dbFilter: db.Query,
247 rawFilter: dict
248 ) -> t.Optional[db.Query]:
249 """
250 Build a DB sort based on the specified name and a raw filter.
252 :param name: The name of the attribute to sort by.
253 :param skel: A SkeletonInstance object.
254 :param dbFilter: A Query object representing the current DB filter.
255 :param rawFilter: A dictionary containing the raw filter.
256 :return: The Query object with the specified sort applied.
257 """
258 if ((orderby := rawFilter.get("orderby"))
259 and (orderby == name
260 or (isinstance(orderby, str) and orderby.startswith(f"{name}.") and self.languages))):
261 if self.languages:
262 lang = None
263 if orderby.startswith(f"{name}."):
264 lng = orderby.replace(f"{name}.", "")
265 if lng in self.languages:
266 lang = lng
267 if lang is None:
268 lang = current.language.get()
269 if not lang or lang not in self.languages:
270 lang = self.languages[0]
271 prop = f"{name}.{lang}"
272 else:
273 prop = name
274 if self.natural_sorting:
275 prop += ".sort_idx"
276 elif not self.caseSensitive:
277 prop += ".idx"
279 # fixme: VIUR4 replace theses stupid numbers defining a sort-order by a meaningful keys
280 sorting = {
281 "1": db.SortOrder.Descending,
282 "2": db.SortOrder.InvertedAscending,
283 "3": db.SortOrder.InvertedDescending,
284 }.get(rawFilter.get("orderdir"), db.SortOrder.Ascending)
285 order = (prop, sorting)
286 inEqFilter = [x for x in dbFilter.queries.filters.keys() # FIXME: This will break on multi queries
287 if (">" in x[-3:] or "<" in x[-3:] or "!=" in x[-4:])]
288 if inEqFilter:
289 inEqFilter = inEqFilter[0][: inEqFilter[0].find(" ")]
290 if inEqFilter != order[0]:
291 logging.warning(f"I fixed you query! Impossible ordering changed to {inEqFilter}, {order[0]}")
292 dbFilter.order(inEqFilter, order)
293 else:
294 dbFilter.order(order)
295 else:
296 dbFilter.order(order)
297 return dbFilter
299 def natural_sorting(self, value: str | None) -> str | None:
300 """Implements a default natural sorting transformer.
302 The sorting is according to DIN 5007 Variant 2
303 and sets ö and oe, etc. equal.
304 """
305 if value is None:
306 return None
307 assert isinstance(value, str)
308 if not self.caseSensitive:
309 value = value.lower()
311 # DIN 5007 Variant 2
312 return value.translate(str.maketrans({
313 "ö": "oe",
314 "Ö": "Oe",
315 "ü": "ue",
316 "Ü": "Ue",
317 "ä": "ae",
318 "Ä": "Ae",
319 "ẞ": "SS",
320 }))
322 def getSearchTags(self, skel: "SkeletonInstance", name: str) -> set[str]:
323 """
324 Returns a set of lowercased words that represent searchable tags for the given bone.
326 :param skel: The skeleton instance being searched.
327 :param name: The name of the bone to generate tags for.
329 :return: A set of lowercased words representing searchable tags.
330 """
331 result = set()
332 for idx, lang, value in self.iter_bone_value(skel, name):
333 if value is None:
334 continue
335 for line in str(value).splitlines(): # TODO: Can a StringBone be multiline?
336 for word in line.split(" "):
337 result.add(word.lower())
338 return result
340 def getUniquePropertyIndexValues(self, skel: "SkeletonInstance", name: str) -> list[str]:
341 """
342 Returns a list of unique index values for a given property name.
344 :param skel: The skeleton instance.
345 :param name: The name of the property.
346 :return: A list of unique index values for the property.
347 :raises NotImplementedError: If the StringBone has languages and the implementation
348 for this case is not yet defined.
349 """
350 if self.languages:
351 # Not yet implemented as it's unclear if we should keep each language distinct or not
352 raise NotImplementedError()
354 return super().getUniquePropertyIndexValues(skel, name)
356 def refresh(self, skel: "SkeletonInstance", bone_name: str) -> None:
357 super().refresh(skel, bone_name)
359 # TODO: duplicate code, this is the same iteration logic as in NumericBone
360 new_value = {}
361 for _, lang, value in self.iter_bone_value(skel, bone_name):
362 new_value.setdefault(lang, []).append(self.type_coerce_single_value(value))
364 if not self.multiple:
365 # take the first one
366 new_value = {lang: values[0] for lang, values in new_value.items() if values}
368 if self.languages:
369 skel[bone_name] = new_value
370 elif not self.languages:
371 # just the value(s) with None language
372 skel[bone_name] = new_value.get(None, [] if self.multiple else self.getEmptyValue())
374 def structure(self) -> dict:
375 ret = super().structure() | {
376 "maxlength": self.max_length,
377 "minlength": self.min_length
378 }
379 return ret