Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/bones/string.py: 38%
162 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-07 19:28 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-07 19:28 +0000
1import datetime
2import functools
3import logging
4import string
5import typing as t
6import warnings
7from numbers import Number
9from viur.core import current, db, utils
10from viur.core.bones.base import BaseBone, ReadFromClientError, ReadFromClientErrorSeverity
12if t.TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true
13 from ..skeleton import SkeletonInstance
15DB_TYPE_INDEXED: t.TypeAlias = dict[t.Literal["val", "idx", "sort_idx"], str]
18class StringBone(BaseBone):
19 """
20 The "StringBone" represents a data field that contains text values.
21 """
22 type = "str"
24 def __init__(
25 self,
26 *,
27 caseSensitive: bool = True,
28 max_length: int | None = 254,
29 min_length: int | None = None,
30 natural_sorting: bool | t.Callable = False,
31 **kwargs
32 ):
33 """
34 Initializes a new StringBone.
36 :param caseSensitive: When filtering for values in this bone, should it be case-sensitive?
37 :param max_length: The maximum length allowed for values of this bone. Set to None for no limitation.
38 :param min_length: The minimum length allowed for values of this bone. Set to None for no limitation.
39 :param natural_sorting: Allows a more natural sorting
40 than the default sorting on the plain values.
41 This uses the .sort_idx property.
42 `True` enables sorting according to DIN 5007 Variant 2.
43 With passing a `callable`, a custom transformer method can be set
44 that creates the value for the index property.
45 :param kwargs: Inherited arguments from the BaseBone.
46 """
47 # fixme: Remove in viur-core >= 4
48 if "maxLength" in kwargs: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true
49 warnings.warn("maxLength parameter is deprecated, please use max_length",
50 DeprecationWarning, stacklevel=2)
51 max_length = kwargs.pop("maxLength")
52 super().__init__(**kwargs)
53 if max_length is not None and max_length <= 0: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true
54 raise ValueError("max_length must be a positive integer or None")
55 if min_length is not None and min_length <= 0: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true
56 raise ValueError("min_length must be a positive integer or None")
57 if min_length is not None and max_length is not None: 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true
58 if min_length > max_length:
59 raise ValueError("min_length can't be greater than max_length")
60 self.caseSensitive = caseSensitive
61 self.max_length = max_length
62 self.min_length = min_length
63 if callable(natural_sorting): 63 ↛ 64line 63 didn't jump to line 64 because the condition on line 63 was never true
64 self.natural_sorting = natural_sorting
65 elif not isinstance(natural_sorting, bool): 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true
66 raise TypeError("natural_sorting must be a callable or boolean!")
67 elif not natural_sorting: 67 ↛ exitline 67 didn't return from function '__init__' because the condition on line 67 was always true
68 self.natural_sorting = None
69 # else: keep self.natural_sorting as is
71 def type_coerce_single_value(self, value: t.Any) -> str:
72 """Convert a value to a string (if not already)
74 Converts a value that is not a string into a string
75 if a meaningful conversion is possible (simple data types only).
76 """
77 if isinstance(value, str):
78 return value
79 elif isinstance(value, Number): 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 return str(value)
81 elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true
82 return value.isoformat()
83 elif isinstance(value, db.Key): 83 ↛ 84line 83 didn't jump to line 84 because the condition on line 83 was never true
84 return value.to_legacy_urlsafe().decode("ASCII")
85 elif not value: # None or any other falsy value 85 ↛ 88line 85 didn't jump to line 88 because the condition on line 85 was always true
86 return self.getEmptyValue()
87 else:
88 raise ValueError(
89 f"Value {value} of type {type(value)} cannot be coerced for {type(self).__name__} {self.name}"
90 )
92 def singleValueSerialize(
93 self,
94 value: t.Any,
95 skel: "SkeletonInstance",
96 name: str,
97 parentIndexed: bool,
98 ) -> str | DB_TYPE_INDEXED:
99 """
100 Serializes a single value of this data field for storage in the database.
102 :param value: The value to serialize.
103 It should be a str value, if not it is forced with :meth:`type_coerce_single_value`.
104 :param skel: The skeleton instance that this data field belongs to.
105 :param name: The name of this data field.
106 :param parentIndexed: A boolean value indicating whether the parent object has an index on
107 this data field or not.
108 :return: The serialized value.
109 """
110 value = self.type_coerce_single_value(value)
111 if (not self.caseSensitive or self.natural_sorting) and parentIndexed:
112 serialized: DB_TYPE_INDEXED = {"val": value}
113 if not self.caseSensitive: 113 ↛ 115line 113 didn't jump to line 115 because the condition on line 113 was always true
114 serialized["idx"] = value.lower()
115 if self.natural_sorting: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true
116 serialized["sort_idx"] = self.natural_sorting(value)
117 return serialized
118 return value
120 def singleValueUnserialize(self, value: str | DB_TYPE_INDEXED) -> str:
121 """
122 Unserializes a single value of this data field from the database.
124 :param value: The serialized value to unserialize.
125 :return: The unserialized value.
126 """
127 if isinstance(value, dict) and "val" in value:
128 value = value["val"] # Process with the raw value
129 if value:
130 return str(value)
131 else:
132 return self.getEmptyValue()
134 def getEmptyValue(self) -> str:
135 """
136 Returns the empty value for this data field.
138 :return: An empty string.
139 """
140 return ""
142 def isEmpty(self, value):
143 """
144 Determines whether a value for this data field is empty or not.
146 :param value: The value to check for emptiness.
147 :return: A boolean value indicating whether the value is empty or not.
148 """
149 if not value:
150 return True
152 return not bool(str(value).strip())
154 def isInvalid(self, value: t.Any) -> str | None:
155 """
156 Returns None if the value would be valid for
157 this bone, an error-message otherwise.
158 """
159 if self.max_length is not None and len(value) > self.max_length: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true
160 return "Maximum length exceeded"
161 if self.min_length is not None and len(value) < self.min_length: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true
162 return "Minimum length not reached"
163 return None
165 def singleValueFromClient(self, value, skel, bone_name, client_data):
166 """
167 Returns None and the escaped value if the value would be valid for
168 this bone, otherwise the empty value and an error-message.
169 """
171 if not (err := self.isInvalid(str(value))):
172 return utils.string.escape(value, self.max_length), None
174 return self.getEmptyValue(), [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, err)]
176 def buildDBFilter(
177 self,
178 name: str,
179 skel: "SkeletonInstance",
180 dbFilter: db.Query,
181 rawFilter: dict,
182 prefix: t.Optional[str] = None
183 ) -> db.Query:
184 """
185 Builds and returns a database filter for this data field based on the provided raw filter data.
187 :param name: The name of this data field.
188 :param skel: The skeleton instance that this data field belongs to.
189 :param dbFilter: The database filter to add query clauses to.
190 :param rawFilter: A dictionary containing the raw filter data for this data field.
191 :param prefix: An optional prefix to add to the query clause.
192 :return: The database filter with the added query clauses.
193 """
194 if name not in rawFilter and not any(
195 [(x.startswith(name + "$") or x.startswith(name + ".")) for x in rawFilter.keys()]
196 ):
197 return super().buildDBFilter(name, skel, dbFilter, rawFilter, prefix)
199 if not self.languages:
200 namefilter = name
201 else:
202 lang = None
203 for key in rawFilter.keys():
204 if key.startswith(f"{name}."):
205 langStr = key.replace(f"{name}.", "")
206 if langStr in self.languages:
207 lang = langStr
208 break
209 if not lang:
210 lang = current.language.get() # currentSession.getLanguage()
211 if not lang or not lang in self.languages:
212 lang = self.languages[0]
213 namefilter = f"{name}.{lang}"
215 if name + "$lk" in rawFilter: # Do a prefix-match
216 if not self.caseSensitive:
217 dbFilter.filter((prefix or "") + namefilter + ".idx >=", str(rawFilter[name + "$lk"]).lower())
218 dbFilter.filter((prefix or "") + namefilter + ".idx <",
219 str(rawFilter[name + "$lk"] + u"\ufffd").lower())
220 else:
221 dbFilter.filter((prefix or "") + namefilter + " >=", str(rawFilter[name + "$lk"]))
222 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lk"] + u"\ufffd"))
224 if name + "$gt" in rawFilter: # All entries after
225 if not self.caseSensitive:
226 dbFilter.filter((prefix or "") + namefilter + ".idx >", str(rawFilter[name + "$gt"]).lower())
227 else:
228 dbFilter.filter((prefix or "") + namefilter + " >", str(rawFilter[name + "$gt"]))
230 if name + "$lt" in rawFilter: # All entries before
231 if not self.caseSensitive:
232 dbFilter.filter((prefix or "") + namefilter + ".idx <", str(rawFilter[name + "$lt"]).lower())
233 else:
234 dbFilter.filter((prefix or "") + namefilter + " <", str(rawFilter[name + "$lt"]))
236 if name in rawFilter: # Normal, strict match
237 if not self.caseSensitive:
238 dbFilter.filter((prefix or "") + namefilter + ".idx", str(rawFilter[name]).lower())
239 else:
240 dbFilter.filter((prefix or "") + namefilter, str(rawFilter[name]))
242 return dbFilter
244 def buildDBSort(
245 self,
246 name: str,
247 skel: 'SkeletonInstance',
248 query: db.Query,
249 params: dict,
250 postfix: str = "",
251 ) -> t.Optional[db.Query]:
252 return super().buildDBSort(
253 name, skel, query, params,
254 postfix=".sort_idx" if self.natural_sorting else ".idx" if not self.caseSensitive else postfix
255 )
257 def natural_sorting(self, value: str | None) -> str | None:
258 """Implements a default natural sorting transformer.
260 The sorting is according to DIN 5007 Variant 2
261 and sets ö and oe, etc. equal.
262 """
263 if value is None:
264 return None
265 assert isinstance(value, str)
266 if not self.caseSensitive:
267 value = value.lower()
269 # DIN 5007 Variant 2
270 return value.translate(str.maketrans({
271 "ö": "oe",
272 "Ö": "Oe",
273 "ü": "ue",
274 "Ü": "Ue",
275 "ä": "ae",
276 "Ä": "Ae",
277 "ẞ": "SS",
278 }))
280 def getSearchTags(self, skel: "SkeletonInstance", name: str) -> set[str]:
281 """
282 Returns a set of lowercased words that represent searchable tags for the given bone.
284 :param skel: The skeleton instance being searched.
285 :param name: The name of the bone to generate tags for.
287 :return: A set of lowercased words representing searchable tags.
288 """
289 result = set()
290 for idx, lang, value in self.iter_bone_value(skel, name):
291 if value is None:
292 continue
293 for line in str(value).splitlines(): # TODO: Can a StringBone be multiline?
294 for word in line.split(" "):
295 result.add(word.lower())
296 return result
298 def getUniquePropertyIndexValues(self, skel: "SkeletonInstance", name: str) -> list[str]:
299 """
300 Returns a list of unique index values for a given property name.
302 :param skel: The skeleton instance.
303 :param name: The name of the property.
304 :return: A list of unique index values for the property.
305 :raises NotImplementedError: If the StringBone has languages and the implementation
306 for this case is not yet defined.
307 """
308 if self.languages:
309 # Not yet implemented as it's unclear if we should keep each language distinct or not
310 raise NotImplementedError()
312 if not self.caseSensitive and (value := skel[name]) is not None:
313 if self.multiple:
314 value = [v.lower() for v in value]
315 else:
316 value = value.lower()
317 return self._hashValueForUniquePropertyIndex(value)
319 return super().getUniquePropertyIndexValues(skel, name)
321 def refresh(self, skel: "SkeletonInstance", bone_name: str) -> None:
322 super().refresh(skel, bone_name)
324 # TODO: duplicate code, this is the same iteration logic as in NumericBone
325 new_value = {}
326 for _, lang, value in self.iter_bone_value(skel, bone_name):
327 new_value.setdefault(lang, []).append(self.type_coerce_single_value(value))
329 if not self.multiple:
330 # take the first one
331 new_value = {lang: values[0] for lang, values in new_value.items() if values}
333 if self.languages:
334 skel[bone_name] = new_value
335 elif not self.languages:
336 # just the value(s) with None language
337 skel[bone_name] = new_value.get(None, [] if self.multiple else self.getEmptyValue())
339 def structure(self) -> dict:
340 ret = super().structure() | {
341 "maxlength": self.max_length,
342 "minlength": self.min_length
343 }
344 return ret
346 @classmethod
347 def v_func_valid_chars(cls, valid_chars: t.Iterable = string.printable) -> t.Callable:
348 """
349 Returns a function that takes a string and checks whether it contains valid characters.
350 If all characters of the string are valid, it returns None, and succeeds.
351 If invalid characters are present, it returns an appropriate error message.
353 :param valid_chars: An iterable of valid characters.
354 :return: A function that takes a string and check whether it contains valid characters.
356 Example for digits only:
357 .. code-block:: python
358 str_bone = StringBone(vfunc=StringBone.v_func_valid_chars(string.digits))
359 """
361 def v_func(valid_chars_intern, value):
362 if any(char not in valid_chars_intern for char in value):
363 return "Not all letters are available in the charset"
365 return functools.partial(v_func, valid_chars)