Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%
715 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-07 19:28 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-07 19:28 +0000
1import abc
2import datetime
3import enum
4import functools
5import hashlib
6import hmac
7import json
8import logging
9import secrets
10import warnings
11import user_agents
13import pyotp
14import base64
15import dataclasses
16import typing as t
17from google.auth.transport import requests
18from google.oauth2 import id_token
20from viur.core import (
21 conf, current, db, email, errors, i18n,
22 securitykey, session, skeleton, tasks, utils, Module
23)
24from viur.core.decorators import *
25from viur.core.bones import *
26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password
27from viur.core.prototypes.list import List
28from viur.core.ratelimit import RateLimit
29from viur.core.securityheaders import extendCsp
@functools.total_ordering
class Status(enum.Enum):
    """Status enum for a user.

    Has backwards compatibility to be comparable with non-enum values
    (e.g. ``Status.ACTIVE == 10`` or ``Status.DISABLED < 10``).
    Will be removed with viur-core 4.0.0

    Members are ordered by their numeric value; the remaining rich comparison
    operators are derived from ``__eq__`` and ``__lt__`` by
    :func:`functools.total_ordering`.
    """

    UNSET = 0  # Status is unset
    WAITING_FOR_EMAIL_VERIFICATION = 1  # Waiting for email verification
    WAITING_FOR_ADMIN_VERIFICATION = 2  # Waiting for verification through admin
    DISABLED = 5  # Account disabled
    ACTIVE = 10  # Active

    def __eq__(self, other):
        """Compare against another member (by identity) or against a raw value."""
        if isinstance(other, Status):
            return self is other
        return self.value == other

    def __lt__(self, other):
        """Order against another member or a raw value, using the numeric value."""
        if isinstance(other, Status):
            # enum.Enum implements no ordering of its own, so delegating to
            # super() would yield NotImplemented and finally a TypeError.
            return self.value < other.value
        return self.value < other

    def __hash__(self):
        """Hash by value, so that ``a == b`` implies ``hash(a) == hash(b)``.

        Defining ``__eq__`` implicitly sets ``__hash__`` to None; it has to be
        restored explicitly to keep members usable in sets and as dict keys.
        """
        return hash(self.value)
class UserSkel(skeleton.Skeleton):
    """
    Skeleton describing a user entry.

    Besides the bones declared here, the configured authentication providers
    may attach further bones when the skeleton is created (see ``__new__``).
    """
    kindName = "user"  # this assignment is required, as this Skeleton is defined in viur-core (see #604)

    name = EmailBone(
        descr="E-Mail",
        required=True,
        readOnly=True,
        caseSensitive=False,
        searchable=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
    )

    firstname = StringBone(
        descr="Firstname",
        searchable=True,
    )

    lastname = StringBone(
        descr="Lastname",
        searchable=True,
    )

    roles = SelectBone(
        descr=i18n.translate("viur.user.bone.roles", defaultText="Roles"),
        values=conf.user.roles,
        required=True,
        multiple=True,
        # fixme: This is generally broken in VIUR! See #776 for details.
        # vfunc=lambda values:
        #   i18n.translate(
        #       "user.bone.roles.invalid",
        #       defaultText="Invalid role setting: 'custom' can only be set alone.")
        #   if "custom" in values and len(values) > 1 else None,
        defaultValue=list(conf.user.roles.keys())[:1],
    )

    access = SelectBone(
        descr=i18n.translate("viur.user.bone.access", defaultText="Access rights"),
        type_suffix="access",
        values=lambda: {
            right: i18n.translate(f"viur.modules.user.accessright.{right}", defaultText=right)
            for right in sorted(conf.user.access_rights)
        },
        multiple=True,
        params={
            "readonlyIf": "'custom' not in roles"  # if "custom" is not in roles, "access" is managed by the role system
        }
    )

    status = SelectBone(
        descr="Account status",
        values=Status,
        defaultValue=Status.ACTIVE,
        required=True,
    )

    lastlogin = DateBone(
        descr="Last Login",
        readOnly=True,
    )

    admin_config = JsonBone(  # This bone stores settings from the vi
        descr="Config for the User",
        visible=False
    )

    def __new__(cls, *args, **kwargs):
        """
        Creates the skeleton after every configured primary and second factor
        authentication provider had the chance to patch its bones into the class.

        The bone map is regenerated afterwards, so the patched bones are part of it.
        """
        provider_groups = (
            ("authenticationProviders", UserPrimaryAuthentication),
            ("secondFactorProviders", UserSecondFactorAuthentication),
        )

        for attribute, expected_base in provider_groups:
            # the provider list is looked up per group, right before it is processed
            for provider in getattr(conf.main_app.vi.user, attribute):
                assert issubclass(provider, expected_base)
                provider.patch_user_skel(cls)

        cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
        return super().__new__(cls, *args, **kwargs)

    @classmethod
    def write(cls, skel, *args, **kwargs):
        """
        Writes the skeleton; derives ``access`` from ``roles`` beforehand.

        When roles are set and "custom" is not among them, the access rights are
        collected from the role defaults of the user module and from the ``roles``
        setting of every module attached to ``conf.main_app.vi``.
        With "custom" (or no roles at all), ``access`` is left as it is.
        """
        assigned_roles = skel["roles"]

        if assigned_roles and "custom" not in assigned_roles:
            vi = conf.main_app.vi
            granted = set()

            for role in assigned_roles:
                # defaults the user module defines for this role
                granted |= vi.user.get_role_defaults(role)

                # evaluate the role-settings of all modules
                for name in dir(vi):
                    if name.startswith("_"):
                        continue

                    module = getattr(vi, name)
                    if not isinstance(module, Module):
                        continue

                    role_settings = getattr(module, "roles", None) or {}
                    # a specific role entry wins over the "*" wildcard entry
                    rights = role_settings.get(role, role_settings.get("*", ()))

                    # a single right is wrapped, so it can be iterated below
                    if not isinstance(rights, (tuple, list)):
                        rights = (rights, )

                    if "*" in rights:
                        # wildcard right: grant everything the module offers
                        granted.update(f"{name}-{right}" for right in module.accessRights)
                        continue

                    for right in rights:
                        if right not in module.accessRights:
                            continue

                        granted.add(f"{name}-{right}")

                        # special case: "edit" and "delete" actions require "view" as well!
                        if right in ("edit", "delete") and "view" in module.accessRights:
                            granted.add(f"{name}-view")

            skel["access"] = list(granted)

        return super().write(skel, *args, **kwargs)
class UserAuthentication(Module, abc.ABC):
    """
    Abstract base of all authentication handlers attached to the user module.

    Concrete handlers provide ``METHOD_NAME`` (usually as a plain class attribute)
    and may override :meth:`can_handle` and :meth:`patch_user_skel`.
    """

    @property
    @abc.abstractmethod
    def METHOD_NAME(self) -> str:
        """
        Define a unique method name for this authentication.
        """
        ...

    def __init__(self, moduleName, modulePath, userModule):
        """
        :param moduleName: Forwarded to the ``Module`` constructor.
        :param modulePath: Forwarded to the ``Module`` constructor.
        :param userModule: The user module this handler belongs to;
            kept as ``self._user_module``.
        """
        super().__init__(moduleName, modulePath)
        self._user_module = userModule

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """
        Tells whether this handler can be used for the given user skeleton.
        The default implementation accepts every user.
        """
        return True

    @classmethod
    def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
        """
        Allows for an UserAuthentication to patch the UserSkel
        class with additional bones which are required for
        the implemented authentication method.

        The default implementation adds nothing.
        """
        ...
class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
    """Base class every primary authentication method derives from."""

    # registration is switched off unless a handler enables it
    registrationEnabled = False

    @abc.abstractmethod
    def login(self, *args, **kwargs):
        """Entry point of the login; has to be provided by each concrete handler."""
        ...

    def next_or_finish(self, skel: skeleton.SkeletonInstance):
        """
        Called after a step of the authentication succeeded.

        Custom authentications may override this to insert further steps,
        e.g. forcing a password change after the first login. By default, the
        flow is handed over to the user module together with the user's key.
        """
        continue_flow = self._user_module.continueAuthenticationFlow
        return continue_flow(self, skel["key"])
class UserPassword(UserPrimaryAuthentication):
    """
    Primary authentication by e-mail address and password.

    Provides login, a three-step password recovery, e-mail verification and
    (optional) self-registration.
    """
    METHOD_NAME = "X-VIUR-AUTH-User-Password"

    registrationEmailVerificationRequired = True
    registrationAdminVerificationRequired = True

    verifySuccessTemplate = "user_verify_success"
    verifyEmailAddressMail = "user_verify_address"
    verifyFailedTemplate = "user_verify_failed"
    passwordRecoveryTemplate = "user_passwordrecover"
    passwordRecoveryMail = "user_password_recovery"
    passwordRecoverySuccessTemplate = "user_passwordrecover_success"
    passwordRecoveryStep1Template = "user_passwordrecover_step1"
    passwordRecoveryStep2Template = "user_passwordrecover_step2"
    passwordRecoveryStep3Template = "user_passwordrecover_step3"

    # The default rate-limit for password recovery (10 tries each 15 minutes)
    passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")

    # Limit (invalid) login-retries to once per 5 seconds
    loginRateLimit = RateLimit("user.login", 12, 1, "ip")

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by a PasswordBone.
        """
        skel_cls.password = PasswordBone(
            readOnly=True,
            visible=False,
            params={
                "category": "Authentication",
            }
        )

    class LoginSkel(skeleton.RelSkel):
        """Skeleton rendered as the login form."""
        name = EmailBone(
            descr="E-Mail",
            required=True,
            caseSensitive=False,
        )
        password = PasswordBone(
            required=True,
            test_threshold=0,
        )

    class LostPasswordStep1Skel(skeleton.RelSkel):
        """Password recovery, step 1: asks for the e-mail address of the account."""
        name = EmailBone(
            descr="E-Mail",
            required=True,
        )

    class LostPasswordStep2Skel(skeleton.RelSkel):
        """Password recovery, step 2: asks for the recovery key."""
        recovery_key = StringBone(
            descr="Recovery Key",
            required=True,
            params={
                "tooltip": i18n.translate(
                    key="viur.modules.user.userpassword.lostpasswordstep2.recoverykey",
                    defaultText="Please enter the validation key you've received via e-mail.",
                    hint="Shown when the user needs more than 15 minutes to paste the key",
                ),
            }
        )

    class LostPasswordStep3Skel(skeleton.RelSkel):
        """Password recovery, step 3: asks for the new password."""
        # send the recovery key again, in case the password is rejected by some reason.
        recovery_key = StringBone(
            descr="Recovery Key",
            visible=False,
            readOnly=True,
        )

        password = PasswordBone(
            descr="New Password",
            required=True,
            params={
                "tooltip": i18n.translate(
                    key="viur.modules.user.userpassword.lostpasswordstep3.password",
                    defaultText="Please enter a new password for your account.",
                ),
            }
        )

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def login(self, *, name: str | None = None, password: str | None = None, **kwargs):
        """
        Logs a user in by name and password.

        Without name or password, the login form is rendered. A failed attempt
        decrements the login quota and renders the form again. On success, an
        outdated password hash is upgraded and the authentication flow continues
        through :meth:`next_or_finish`.

        :param name: The user name (e-mail address); compared case-insensitively.
        :param password: The password in plain text.
        """
        if not name or not password:
            return self._user_module.render.login(self.LoginSkel(), action="login")

        self.loginRateLimit.assertQuotaIsAvailable()

        # query for the username. The query might find another user, but the name is being checked for equality below
        name = name.lower().strip()
        user_skel = self._user_module.baseSkel()
        user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel

        # extract password hash from raw database entity (skeleton access blocks it)
        password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {}
        iterations = password_data.get("iterations", 1001)  # remember iterations; old password hashes used 1001
        # the hash is computed even if no user was found (using a dummy salt)
        password_hash = encode_password(password, password_data.get("salt", "-invalid-"), iterations)["pwhash"]

        # now check if the username matches
        is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())

        # next, check if the password hash matches
        is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash)

        if not is_okay:
            self.loginRateLimit.decrementQuota()  # Only failed login attempts will count to the quota
            return self._user_module.render.login(self.LoginSkel(), action="login")

        # check if iterations are below current security standards, and update if necessary.
        if iterations < PBKDF2_DEFAULT_ITERATIONS:
            logging.info(f"Update password hash for user {name}.")
            # re-hash the password with more iterations
            # FIXME: This must be done within a transaction!
            user_skel["password"] = password  # will be hashed on serialize
            user_skel.write(update_relations=False)

        return self.next_or_finish(user_skel)

    @exposed
    def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs):
        """
        This implements a password recovery process which lets users set a new password for their account,
        after validating a recovery key sent by email.

        The process is as following:

        - The user enters his email address
        - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode
        - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
          and send a link with the code. It runs as a deferredTask so we don't leak the information if a user
          account exists.
        - If the user received his email, he can click on the link and set a new password for his account.

        To prevent automated attacks, the submitting calls require a valid security key (skey), and calls to
        this function are limited by ``passwordRecoveryRateLimit``. (One complete recovery process consists
        of two calls).

        :param recovery_key: The recovery key received by e-mail; None during the first step.
        :param skey: Security key protecting the form submission.
        :raises: :exc:`viur.core.errors.PreconditionFailed`, if the skey or the recovery key is invalid.
        :raises: :exc:`viur.core.errors.NotFound`, if the user is missing or not active.
        """
        self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
        current_request = current.request.get()

        if recovery_key is None:
            # This is the first step, where we ask for the username of the account we'll going to reset the password on
            skel = self.LostPasswordStep1Skel()

            if not current_request.isPostRequest or not skel.fromClient(kwargs):
                return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template)

            # validate security key
            if not securitykey.validate(skey):
                raise errors.PreconditionFailed()

            self.passwordRecoveryRateLimit.decrementQuota()

            recovery_key = securitykey.create(
                duration=datetime.timedelta(minutes=15),
                key_length=conf.security.password_recovery_key_length,
                user_name=skel["name"].lower(),
                session_bound=False,
            )

            # Send the code in background.
            # A client is not obliged to send a User-Agent header; fall back to an empty string.
            self.sendUserPasswordRecoveryCode(
                skel["name"], recovery_key, current_request.request.headers.get("User-Agent", "")
            )

            # step 2 is only an action-skel, and can be ignored by a direct link in the
            # e-mail previously sent. It depends on the implementation of the specific project.
            return self._user_module.render.edit(
                self.LostPasswordStep2Skel(),
                tpl=self.passwordRecoveryStep2Template,
            )

        # in step 3
        skel = self.LostPasswordStep3Skel()
        skel["recovery_key"] = recovery_key  # resend the recovery key again, in case the fromClient() fails.

        # check for any input; Render input-form again when incomplete.
        if not skel.fromClient(kwargs) or not current_request.isPostRequest:
            return self._user_module.render.edit(
                skel=skel,
                tpl=self.passwordRecoveryStep3Template,
            )

        # validate security key
        if not securitykey.validate(skey):
            raise errors.PreconditionFailed()

        if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
            raise errors.PreconditionFailed(
                i18n.translate(
                    key="viur.modules.user.passwordrecovery.keyexpired",
                    defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
                    hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
                )
            )

        self.passwordRecoveryRateLimit.decrementQuota()

        # If we made it here, the key was correct, so we'd hopefully have a valid user for this
        user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()

        if not user_skel:
            raise errors.NotFound(
                i18n.translate(
                    key="viur.modules.user.passwordrecovery.usernotfound",
                    defaultText="There is no account with this name",
                    hint="We cant find an account with that name (Should never happen)"
                )
            )

        # If the account is locked or not yet validated, abort the process.
        if not self._user_module.is_active(user_skel):
            raise errors.NotFound(
                i18n.translate(
                    key="viur.modules.user.passwordrecovery.accountlocked",
                    defaultText="This account is currently locked. You cannot change its password.",
                    hint="Attempted password recovery on a locked account"
                )
            )

        # Update the password, save the user and show the success-template
        user_skel["password"] = skel["password"]
        user_skel.write(update_relations=False)

        return self._user_module.render.view(
            None,
            tpl=self.passwordRecoverySuccessTemplate,
        )

    @tasks.CallDeferred
    def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
        """
        Sends the given recovery code to the user given in user_name. This function runs deferred
        so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the
        code by email (assuming we have working email delivery), but this can be overridden to send it
        by SMS or other means.

        Nothing is sent when no user with that name exists.

        :param user_name: Name (e-mail address) of the user, as entered in the recovery form.
        :param recovery_key: The recovery key to be delivered.
        :param user_agent: The raw User-Agent string of the requesting client; device, operating system
            and browser extracted from it are passed on to the e-mail template.
        """
        # Every other lookup on "name.idx" in this module is done with a lower-cased name;
        # the address is normalized here as well, otherwise a mixed-case input never matches.
        query = self._user_module.viewSkel().all().filter("name.idx =", user_name.lower())

        if user_skel := query.getSkel():
            user_agent = user_agents.parse(user_agent)
            email.send_email(
                tpl=self.passwordRecoveryMail,
                skel=user_skel,
                dests=[user_name],
                recovery_key=recovery_key,
                user_agent={
                    "device": user_agent.get_device(),
                    "os": user_agent.get_os(),
                    "browser": user_agent.get_browser()
                }
            )

    @exposed
    @skey(forward_payload="data", session_bound=False)
    def verify(self, data):
        """
        Handles the e-mail verification of a newly registered user.

        The user referenced by ``data["user_key"]`` is read and updated within a transaction:
        the status becomes WAITING_FOR_ADMIN_VERIFICATION when an admin verification is
        required, ACTIVE otherwise.

        :param data: Expected to be a dict holding ``user_key``
            (presumably the payload stored along with the security key - verify against the decorator).
        :returns: The rendered success template, or the failure template if ``data`` is unusable
            or the user cannot be read.
        """
        def transact(key):
            skel = self._user_module.editSkel()
            if not key or not skel.read(key):
                return None

            skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \
                if self.registrationAdminVerificationRequired else Status.ACTIVE

            skel.write(update_relations=False)
            return skel

        if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))):
            return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)

        return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate)

    def canAdd(self) -> bool:
        """Registration is allowed exactly when ``registrationEnabled`` is set."""
        return self.registrationEnabled

    def addSkel(self):
        """
        Prepare the add-Skel for rendering.
        Currently only calls self._user_module.addSkel() and sets skel["status"] depending on
        self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired

        :return: viur.core.skeleton.Skeleton
        """
        skel = self._user_module.addSkel()

        if self.registrationEmailVerificationRequired:
            defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION
        elif self.registrationAdminVerificationRequired:
            defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION
        else:  # No further verification required
            defaultStatusValue = Status.ACTIVE

        skel.status.readOnly = True
        skel["status"] = defaultStatusValue

        if "password" in skel:
            skel.password.required = True  # The user will have to set a password

        return skel

    @force_ssl
    @exposed
    @skey(allow_empty=True)
    def add(self, *args, **kwargs):
        """
        Allows guests to register a new account if self.registrationEnabled is set to true

        .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`

        :returns: The rendered, added object of the entry, eventually with error hints.

        :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
        :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
        """
        if not self.canAdd():
            raise errors.Unauthorized()

        skel = self.addSkel()

        if (
            not kwargs  # no data supplied
            or not current.request.get().isPostRequest  # bail out if not using POST-method
            or not skel.fromClient(kwargs)  # failure on reading into the bones
            or utils.parse.bool(kwargs.get("bounce"))  # review before adding
        ):
            # render the skeleton in the version it could as far as it could be read.
            return self._user_module.render.add(skel)

        self._user_module.onAdd(skel)
        skel.write()

        if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION:
            # The user will have to verify his email-address. Create a skey and send it to his address
            skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False,
                                      user_key=utils.normalizeKey(skel["key"]),
                                      name=skel["name"])
            skel.skey = BaseBone(descr="Skey")
            skel["skey"] = skey
            # NOTE(review): the mail template is read from the user module, not from
            # self.verifyEmailAddressMail - confirm this is intended.
            email.send_email(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel)

        self._user_module.onAdded(skel)  # Call onAdded on our parent user module
        return self._user_module.render.addSuccess(skel)
class GoogleAccount(UserPrimaryAuthentication):
    """
    Primary authentication using a Google account (Google ID token).
    """
    METHOD_NAME = "X-VIUR-AUTH-Google-Account"

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by a bones required by Google Auth
        """
        skel_cls.uid = StringBone(
            descr="Google UserID",
            required=False,
            readOnly=True,
            unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
            params={
                "category": "Authentication",
            }
        )

        skel_cls.sync = BooleanBone(
            descr="Sync user data with OAuth-based services",
            defaultValue=True,
            params={
                "category": "Authentication",
                # the fragments are concatenated implicitly, so each one carries its own trailing space
                "tooltip":
                    "If set, user data like firstname and lastname is automatically kept "
                    "synchronous with the information stored at the OAuth service provider "
                    "(e.g. Google Login)."
            }
        )

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def login(self, token: str | None = None, *args, **kwargs):
        """
        Logs a user in by a Google ID token.

        Without a token, the HTML page triggering the Google login is returned.
        With a token, it is verified against ``conf.user.google_client_id``; the user is
        looked up by the Google user id first, then by e-mail address. Unknown users are
        created when registration is enabled, or when the token's hosted domain is listed
        in ``conf.user.google_gsuite_domains``.

        :param token: The ID token issued by Google.
        :raises: :exc:`viur.core.errors.PreconditionFailed`, if no Google client id is configured.
        :raises: :exc:`viur.core.errors.Forbidden`, if the user is unknown and may not register.
        :raises: :exc:`ValueError`, if the token was not issued by Google.
        :raises: :exc:`RuntimeError`, if the user could not be written.
        """
        if not conf.user.google_client_id:
            raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")

        if not token:
            request = current.request.get()
            request.response.headers["Content-Type"] = "text/html"
            if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
                # We have to allow popups here
                request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"

            file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
            with open(file_path) as file:
                tpl_string = file.read()

            # FIXME: Use Jinja2 for rendering?
            tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
            extendCsp({
                "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
                "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
            })
            return tpl_string

        user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
        if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
            raise ValueError("Invalid issuer")

        # Token looks valid :)
        uid = user_info["sub"]
        # deliberately not called "email", which is the name of the imported e-mail module
        email_address = user_info["email"]

        base_skel = self._user_module.baseSkel()
        update = False
        if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
            # We'll try again - checking if there's already an user with that email
            if not (user_skel := base_skel.all().filter("name.idx =", email_address.lower()).getSkel()):
                # Still no luck - it's a completely new user
                if not self.registrationEnabled:
                    if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
                        logging.debug(f"Google user is from allowed {domain} - adding account")
                    else:
                        logging.debug(f"Google user is from {domain} - denying registration")
                        raise errors.Forbidden("Registration for new users is disabled")

                user_skel = base_skel
                user_skel["uid"] = uid
                user_skel["name"] = email_address
                update = True

        # Take user information from Google, if wanted!
        if user_skel["sync"]:
            for target, source in {
                "name": email_address,
                "firstname": user_info.get("given_name"),
                "lastname": user_info.get("family_name"),
            }.items():

                if user_skel[target] != source:
                    user_skel[target] = source
                    update = True

        if update:
            # The write must not live inside an assert statement:
            # asserts are removed when Python runs with -O, and the user would never be saved.
            if not user_skel.write():
                raise RuntimeError("Writing the user skeleton failed")

        return self.next_or_finish(user_skel)
class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
    """Base class every second factor derives from."""

    MAX_RETRY = 3

    # Template used to enter the one-time password on login
    second_factor_login_template = "user_login_secondfactor"

    @property
    @abc.abstractmethod
    def ACTION_NAME(self) -> str:
        """Action name of this factor; it is used as a path-segment."""
        ...

    @property
    @abc.abstractmethod
    def NAME(self) -> str:
        """Name of this factor, as shown in templates."""
        ...

    def __init__(self, moduleName, modulePath, _user_module):
        """
        Sets up the handler and pre-computes its URLs below ``self.modulePath``.

        :param moduleName: Forwarded to the base class constructor.
        :param modulePath: Forwarded to the base class constructor.
        :param _user_module: The user module this second factor belongs to.
        """
        super().__init__(moduleName, modulePath, _user_module)
        self.start_url = f"{self.modulePath}/start"
        self.add_url = f"{self.modulePath}/add"
        self.action_url = f"{self.modulePath}/{self.ACTION_NAME}"
class TimeBasedOTP(UserSecondFactorAuthentication):
    """
    Second factor based on time-based one-time passwords, configured per user
    through the ``otp_*`` bones patched into the UserSkel.
    """
    METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"
    WINDOW_SIZE = 5
    ACTION_NAME = "otp"
    NAME = "Time based Otp"
    second_factor_login_template = "user_login_secondfactor"

    @dataclasses.dataclass
    class OtpConfig:
        """
        Describes the OTP token algorithm of one user; instances are handed
        around inside TimeBasedOTP as configuration.
        """
        secret: str
        timedrift: float = 0.0
        algorithm: t.Literal["sha1", "sha256"] = "sha1"
        interval: int = 60

    class OtpSkel(skeleton.RelSkel):
        """
        Skeleton rendered as the form asking for the OTP token.
        """
        otptoken = NumericBone(
            descr="Token",
            required=True,
            max=999999,
            min=0,
        )

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Equips the UserSkel with the bones the time-based OTP needs:
        serial, secret and time drift.
        """
        category = {"category": "Second Factor Authentication"}

        # One-Time Password Verification
        skel_cls.otp_serial = StringBone(
            descr="OTP serial",
            searchable=True,
            params=dict(category),
        )

        skel_cls.otp_secret = CredentialBone(
            descr="OTP secret",
            params=dict(category),
        )

        skel_cls.otp_timedrift = NumericBone(
            descr="OTP time drift",
            readOnly=True,
            defaultValue=0,
            params=dict(category),
        )

    def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
        """
        Builds the token configuration from the raw database entity of the given user.

        :returns: An OtpConfig, or None if the user has no ``otp_secret`` stored.
        """
        otp_secret = skel.dbEntity.get("otp_secret")
        if not otp_secret:
            return None

        return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0)

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """
        This second factor is usable exactly when a configuration exists for the user.
        """
        return bool(self.get_config(skel))

    @exposed
    def start(self):
        """
        Prepares the OTP login for the current session.

        The token configuration of the previously authenticated user is stored in the
        session as ``_otp_user`` (a dict), then the token form is rendered.
        """
        user_key = current.session.get().get("possible_user_key")
        if not user_key:
            raise errors.PreconditionFailed(
                "Second factor can only be triggered after successful primary authentication."
            )

        user_skel = self._user_module.baseSkel()
        if not user_skel.read(user_key):
            raise errors.NotFound("The previously authenticated user is gone.")

        config = self.get_config(user_skel)
        if not config:
            raise errors.PreconditionFailed("This second factor is not available for the user")

        # plain dict, as it is going to live in the session
        otp_user_conf = {
            "key": str(user_key),
            **dataclasses.asdict(config),
        }

        current_session = current.session.get()
        current_session["_otp_user"] = otp_user_conf
        current_session.markChanged()

        return self._user_module.render.edit(
            self.OtpSkel(),
            params={
                "name": i18n.translate(self.NAME),
                "action_name": self.ACTION_NAME,
                "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
            },
            tpl=self.second_factor_login_template
        )

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def otp(self, *args, **kwargs):
        """
        Validates the token entered by the client against the configuration
        stored in the session by :meth:`start`.

        A wrong or unreadable token increases the attempt counter and renders the form
        again; more than ``MAX_RETRY`` counted attempts are refused.
        On success, the session entry is removed, a changed time drift is
        recorded and the user module is told that the second factor succeeded.
        """
        session = current.session.get()
        otp_user_conf = session.get("_otp_user")
        if not otp_user_conf:
            raise errors.PreconditionFailed("No OTP process started in this session")

        # Refuse when the number of verification attempts is used up
        attempts = otp_user_conf.get("attempts") or 0
        if attempts > self.MAX_RETRY:
            raise errors.Forbidden("Maximum amount of authentication retries exceeded")

        # The token is read through the skeleton, to obtain a valid value
        skel = self.OtpSkel()
        res = None
        if skel.fromClient(kwargs):
            # If valid, verify() returns the current timedrift index for this hardware OTP.
            res = self.verify(
                otp=skel["otptoken"],
                secret=otp_user_conf["secret"],
                algorithm=otp_user_conf.get("algorithm") or "sha1",
                interval=otp_user_conf.get("interval") or 60,
                timedrift=otp_user_conf.get("timedrift") or 0.0,
                valid_window=self.WINDOW_SIZE
            )

        # Caution: the index 0 is a valid result, therefore the explicit check for None!
        if res is None:
            otp_user_conf["attempts"] = attempts + 1
            session.markChanged()
            skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
            return self._user_module.render.edit(
                skel,
                name=i18n.translate(self.NAME),
                action_name=self.ACTION_NAME,
                action_url=f"{self.modulePath}/{self.ACTION_NAME}",
                tpl=self.second_factor_login_template
            )

        # The OTP user config is no longer needed in the session
        user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
        del session["_otp_user"]
        session.markChanged()

        # Did the time drift of the OTP device change?
        drift_delta = float(res) - otp_user_conf["timedrift"]
        if abs(drift_delta) > 2:
            # The time-drift change accumulates to more than 2 minutes (for interval==60):
            # update clock-drift value accordingly
            self.updateTimeDrift(user_key, drift_delta)

        # Continue with authentication
        return self._user_module.secondFactorSucceeded(self, user_key)

    @staticmethod
    def verify(
        otp: str | int,
        secret: str,
        algorithm: str = "sha1",
        interval: int = 60,
        timedrift: float = 0.0,
        for_time: datetime.datetime | None = None,
        valid_window: int = 0,
    ) -> int | None:
        """
        Verifies the OTP passed in against the current time OTP.

        This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which
        the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown
        on the hardware token generator. This can be used to store the time drift of a given token generator.

        :param otp: the OTP token to check against
        :param secret: The OTP secret, given as a hex string
        :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256)
        :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). In
            pyotp, default is 30!
        :param timedrift: The known timedrift (old index) of the hardware OTP generator
        :param for_time: Time to check OTP at (defaults to now)
        :param valid_window: extends the validity to this many counter ticks before and after the current one
        :returns: The index where verification succeeded, None otherwise
        :raises: :exc:`viur.core.errors.NotImplemented`, if the algorithm is not supported.
        """
        # pick the hashing digest
        supported_digests = {
            "sha1": hashlib.sha1,
            "sha256": hashlib.sha256,
        }
        digest = supported_digests.get(algorithm)

        if not digest:
            raise errors.NotImplemented(f"{algorithm=} is not implemented")

        if for_time is None:
            for_time = datetime.datetime.now()

        # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index
        timedrift = round(timedrift)
        # the secret is re-encoded from hex to base32 before it is handed to pyotp
        secret = base64.b32encode(bytes.fromhex(secret)).decode()
        otp = str(otp).zfill(6)  # fill with zeros in front

        totp = pyotp.TOTP(secret, digest=digest, interval=interval)

        if not valid_window:
            # no window: only the tick at the known drift is accepted
            return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None

        first_offset = timedrift - valid_window
        last_offset = timedrift + valid_window
        for offset in range(first_offset, last_offset + 1):
            if hmac.compare_digest(otp, str(totp.at(for_time, offset))):
                return offset

        return None

    def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
        """
        Updates the clock-drift value stored for a user.

        The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew
        it out of bounds; the change per call is limited to the range of -0.3 to 0.3.

        :param user_key: For which user should the update occur
        :param idx: How many steps before/behind was that token
        """

        # FIXME: The callback in viur-core must be improved, to accept user_skel

        def transaction(user_key, idx):
            user = db.Get(user_key)

            # anything that is not a float is treated as "no drift known yet"
            known_drift = user.get("otp_timedrift")
            if not isinstance(known_drift, float):
                known_drift = 0.0

            user["otp_timedrift"] = known_drift + min(max(0.1 * idx, -0.3), 0.3)
            db.Put(user)

        db.RunInTransaction(transaction, user_key, idx)
class AuthenticatorOTP(UserSecondFactorAuthentication):
    """
    This class handles the second factor for authenticator apps (Authy and so on).

    The TOTP secret is generated here, kept in the session while the user confirms it with a first
    valid token, and afterwards stored in the ``otp_app_secret`` field of the user entry.
    """
    METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"

    second_factor_add_template = "user_secondfactor_add"
    """Template to configure (add) a new TOTP"""

    ACTION_NAME = "authenticator_otp"
    """Action name provided for *otp_template* on login"""

    NAME = "Authenticator App"

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def add(self, otp=None):
        """
        Configure the authenticator app for the current user.

        We try to read the otp_app_secret from the current session. When this fails we generate a new one
        and store it in the session.

        Without an ``otp`` the configuration form (including the provisioning URI) is rendered.
        With an ``otp`` the token is verified against the secret from the session; on success the secret
        is written to the user entry and the success template is rendered, otherwise the form is
        rendered again.

        :param otp: The token generated by the authenticator app, or None to request the form.
        """
        current_session = current.session.get()

        if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
            otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
            current_session["_maybe_otp_app_secret"] = otp_app_secret
            current_session.markChanged()

        # Show the form when nothing was submitted yet, or show it again when the token was wrong.
        if otp is None or not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
            return self._user_module.render.second_factor_add(
                tpl=self.second_factor_add_template,
                action_name=self.ACTION_NAME,
                name=i18n.translate(self.NAME),
                add_url=self.add_url,
                otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret),  # to add errors
            )

        # Now we can set the otp_app_secret to the current user and render the success template
        AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
        return self._user_module.render.second_factor_add_success(
            action_name=self.ACTION_NAME,
            name=i18n.translate(self.NAME),
        )

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """
        We can only handle the second factor if we have stored an otp_app_secret before.

        :param skel: Skeleton of the user; its ``dbEntity`` is inspected.
        :returns: True if a non-empty ``otp_app_secret`` is stored in the entity.
        """
        stored_secret = skel.dbEntity.get("otp_app_secret", "")
        return bool(stored_secret)

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by bones required by Authenticator App

        :param skel_cls: The skeleton class that receives the ``otp_app_secret`` bone.
        """
        # Authenticator OTP Apps (like Authy)
        skel_cls.otp_app_secret = CredentialBone(
            descr="OTP Secret (App-Key)",
            params={
                "category": "Second Factor Authentication",
            }
        )

    @classmethod
    def set_otp_app_secret(cls, otp_app_secret=None):
        """
        Write a new OTP secret into the entry of the current user.

        :param otp_app_secret: The secret to store.
        :raises errors.PreconditionFailed: If no secret is provided.
        :raises errors.Unauthorized: If there is no current user.
        :raises errors.NotFound: If the entity of the current user cannot be loaded.
        """
        if otp_app_secret is None:
            logging.error("No 'otp_app_secret' is provided")
            raise errors.PreconditionFailed("No 'otp_app_secret' is provided")

        cuser = current.user.get()
        if not cuser:
            raise errors.Unauthorized()

        def transaction(user_key):
            user = db.Get(user_key)
            if not user:
                raise errors.NotFound()

            user["otp_app_secret"] = otp_app_secret
            db.Put(user)

        db.RunInTransaction(transaction, cuser["key"])

    @classmethod
    def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
        """
        Build the provisioning URI for the given secret and the current user.

        :param otp_app_secret: The secret to encode into the URI.
        :return: an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
        :raises errors.Unauthorized: If there is no current user.
        """
        cuser = current.user.get()
        if not cuser:
            raise errors.Unauthorized()

        issuer = conf.user.otp_issuer
        if not issuer:
            # Fall back to the project id when no issuer is configured
            logging.warning(
                f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
            issuer = conf.instance.project_id

        totp = pyotp.TOTP(otp_app_secret)
        return totp.provisioning_uri(name=cuser["name"], issuer_name=issuer)

    @classmethod
    def generate_otp_app_secret(cls) -> str:
        """
        Generate a new OTP secret.

        :return: the random secret as produced by ``pyotp.random_base32()``
        """
        return pyotp.random_base32()

    @classmethod
    def verify_otp(cls, otp: str | int, secret: str) -> bool:
        """
        Check a token against a secret using ``pyotp.TOTP`` with its default settings.

        :param otp: The token to check.
        :param secret: The secret the token must belong to.
        :returns: The result of ``pyotp.TOTP(secret).verify(otp)``.
        """
        totp = pyotp.TOTP(secret)
        return totp.verify(otp)

    @exposed
    def start(self):
        """
        Start the second factor login: reset the attempt counter in the session and render the
        token form.
        """
        cur_session = current.session.get()
        cur_session["_otp_user"] = {"attempts": 0}
        cur_session.markChanged()

        return self._user_module.render.edit(
            TimeBasedOTP.OtpSkel(),
            params={
                "name": i18n.translate(self.NAME),
                "action_name": self.ACTION_NAME,
                "action_url": self.action_url,
            },
            tpl=self.second_factor_login_template,
        )

    @exposed
    @force_ssl
    @skey
    def authenticator_otp(self, **kwargs):
        """
        We verify the otp here with the secret we stored before.

        On success the login flow is continued through ``secondFactorSucceeded`` of the user module.
        On failure the attempt counter in the session is increased and the form is rendered again
        with an error attached.

        :raises errors.PreconditionFailed: If the session holds no ``possible_user_key``, no OTP process
            was started, the submitted data is invalid, or the user has no ``otp_app_secret``.
        :raises errors.Forbidden: If the attempt counter exceeds ``MAX_RETRY``.
        :raises errors.NotFound: If the user entity cannot be loaded.
        """
        cur_session = current.session.get()

        # "possible_user_key" is written by User.continueAuthenticationFlow after the primary login.
        # A request without it must be rejected cleanly instead of failing with a KeyError.
        if not (possible_user_key := cur_session.get("possible_user_key")):
            raise errors.PreconditionFailed("No primary authentication succeeded in this session")

        user_key = db.Key(self._user_module.kindName, possible_user_key)

        if not (otp_user_conf := cur_session.get("_otp_user")):
            raise errors.PreconditionFailed("No OTP process started in this session")

        # Check if maximum second factor verification attempts
        # (MAX_RETRY is not defined in this class; presumably inherited from the base class)
        if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
            raise errors.Forbidden("Maximum amount of authentication retries exceeded")

        if not (user := db.Get(user_key)):
            raise errors.NotFound()

        # A user without a stored secret cannot be verified; avoid a KeyError on the entity.
        if not (otp_app_secret := user.get("otp_app_secret")):
            raise errors.PreconditionFailed("Authenticator OTP is not configured for this user")

        skel = TimeBasedOTP.OtpSkel()
        if not skel.fromClient(kwargs):
            raise errors.PreconditionFailed()

        otp_token = str(skel["otptoken"]).zfill(6)  # restore leading zeros

        if AuthenticatorOTP.verify_otp(otp=otp_token, secret=otp_app_secret):
            return self._user_module.secondFactorSucceeded(self, user_key)

        # Wrong token: count the attempt and show the form again
        otp_user_conf["attempts"] = attempts + 1
        cur_session.markChanged()
        skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
        return self._user_module.render.edit(
            skel,
            name=i18n.translate(self.NAME),
            action_name=self.ACTION_NAME,
            action_url=self.action_url,
            tpl=self.second_factor_login_template,
        )
1122class User(List):
1123 """
1124 The User module is used to manage and authenticate users in a ViUR system.
1126 It is used in almost any ViUR project, but ViUR can also function without any user capabilites.
1127 """
1129 kindName = "user"
1130 addTemplate = "user_add"
1131 addSuccessTemplate = "user_add_success"
1132 lostPasswordTemplate = "user_lostpassword"
1133 verifyEmailAddressMail = "user_verify_address"
1134 passwordRecoveryMail = "user_password_recovery"
1136 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
1137 None, (
1138 UserPassword,
1139 conf.user.google_client_id and GoogleAccount,
1140 )
1141 ))
1142 """
1143 Specifies primary authentication providers that are made available
1144 as sub-modules under `user/auth_<classname>`. They might require
1145 customization or configuration.
1146 """
1148 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
1149 TimeBasedOTP,
1150 AuthenticatorOTP,
1151 )
1152 """
1153 Specifies secondary authentication providers that are made available
1154 as sub-modules under `user/f2_<classname>`. They might require
1155 customization or configuration, which is determined during the
1156 login-process depending on the user that wants to login.
1157 """
1159 validAuthenticationMethods = tuple(filter(
1160 None, (
1161 (UserPassword, AuthenticatorOTP),
1162 (UserPassword, TimeBasedOTP),
1163 (UserPassword, None),
1164 (GoogleAccount, None) if conf.user.google_client_id else None,
1165 )
1166 ))
1167 """
1168 Specifies the possible combinations of primary- and secondary factor
1169 login methos.
1171 GoogleLogin defaults to no second factor, as the Google Account can be
1172 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
1173 handled when there is a user-dependent configuration available.
1174 """
1176 msg_missing_second_factor = "Second factor required but not configured for this user."
1178 secondFactorTimeWindow = datetime.timedelta(minutes=10)
1180 default_order = "name.idx"
1182 adminInfo = {
1183 "icon": "person-fill",
1184 "actions": [
1185 "trigger_kick",
1186 "trigger_takeover",
1187 ],
1188 "customActions": {
1189 "trigger_kick": {
1190 "name": i18n.translate(
1191 key="viur.modules.user.customActions.kick",
1192 defaultText="Kick user",
1193 hint="Title of the kick user function"
1194 ),
1195 "icon": "trash2-fill",
1196 "access": ["root"],
1197 "action": "fetch",
1198 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}",
1199 "confirm": i18n.translate(
1200 key="viur.modules.user.customActions.kick.confirm",
1201 defaultText="Do you really want to drop all sessions of the selected user from the system?",
1202 ),
1203 "success": i18n.translate(
1204 key="viur.modules.user.customActions.kick.success",
1205 defaultText="Sessions of the user are being invalidated.",
1206 ),
1207 },
1208 "trigger_takeover": {
1209 "name": i18n.translate(
1210 key="viur.modules.user.customActions.takeover",
1211 defaultText="Take-over user",
1212 hint="Title of the take user over function"
1213 ),
1214 "icon": "file-person-fill",
1215 "access": ["root"],
1216 "action": "fetch",
1217 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}",
1218 "confirm": i18n.translate(
1219 key="viur.modules.user.customActions.takeover.confirm",
1220 defaultText="Do you really want to replace your current user session by a "
1221 "user session of the selected user?",
1222 ),
1223 "success": i18n.translate(
1224 key="viur.modules.user.customActions.takeover.success",
1225 defaultText="You're now know as the selected user!",
1226 ),
1227 "then": "reload-vi",
1228 },
1229 },
1230 }
1232 roles = {
1233 "admin": "*",
1234 }
1236 def __init__(self, moduleName, modulePath):
1237 for provider in self.authenticationProviders:
1238 assert issubclass(provider, UserPrimaryAuthentication)
1239 name = f"auth_{provider.__name__.lower()}"
1240 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1242 for provider in self.secondFactorProviders:
1243 assert issubclass(provider, UserSecondFactorAuthentication)
1244 name = f"f2_{provider.__name__.lower()}"
1245 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1247 super().__init__(moduleName, modulePath)
1249 def get_role_defaults(self, role: str) -> set[str]:
1250 """
1251 Returns a set of default access rights for a given role.
1253 Defaults to "admin" usage for any role > "user"
1254 and "scriptor" usage for "admin" role.
1255 """
1256 ret = set()
1258 if role in ("viewer", "editor", "admin"):
1259 ret.add("admin")
1261 if role == "admin":
1262 ret.add("scriptor")
1264 return ret
1266 def addSkel(self):
1267 skel = super().addSkel().clone()
1268 user = current.user.get()
1269 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])):
1270 skel.status.readOnly = True
1271 skel["status"] = Status.UNSET
1272 skel.status.visible = False
1273 skel.access.readOnly = True
1274 skel["access"] = []
1275 skel.access.visible = False
1276 else:
1277 # An admin tries to add a new user.
1278 skel.status.readOnly = False
1279 skel.status.visible = True
1280 skel.access.readOnly = False
1281 skel.access.visible = True
1283 if "password" in skel:
1284 # Unlock and require a password
1285 skel.password.required = True
1286 skel.password.visible = True
1287 skel.password.readOnly = False
1289 skel.name.readOnly = False # Don't enforce readonly name in user/add
1290 return skel
1292 def editSkel(self, *args, **kwargs):
1293 skel = super().editSkel().clone()
1295 if "password" in skel:
1296 skel.password.required = False
1297 skel.password.visible = True
1298 skel.password.readOnly = False
1300 user = current.user.get()
1302 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only
1303 skel.name.readOnly = lockFields
1304 skel.access.readOnly = lockFields
1305 skel.status.readOnly = lockFields
1307 return skel
1309 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
1310 return getattr(self, f"f2_{cls.__name__.lower()}")
1312 def getCurrentUser(self):
1313 session = current.session.get()
1315 if session and session.loaded and (user := session.get("user")):
1316 skel = self.baseSkel()
1317 skel.setEntity(user)
1318 return skel
1320 return None
1322 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
1323 """
1324 Continue authentication flow when primary authentication succeeded.
1325 """
1326 skel = self.baseSkel()
1328 if not skel.read(user_key):
1329 raise errors.NotFound("User was not found.")
1331 if not provider.can_handle(skel):
1332 raise errors.Forbidden("User is not allowed to use this primary login method.")
1334 session = current.session.get()
1335 session["possible_user_key"] = user_key.id_or_name
1336 session["_secondFactorStart"] = utils.utcNow()
1337 session.markChanged()
1339 second_factor_providers = []
1341 for auth_provider, second_factor in self.validAuthenticationMethods:
1342 if isinstance(provider, auth_provider):
1343 if second_factor is not None:
1344 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
1345 if second_factor_provider_instance.can_handle(skel):
1346 second_factor_providers.append(second_factor_provider_instance)
1347 else:
1348 second_factor_providers.append(None)
1350 if len(second_factor_providers) > 1 and None in second_factor_providers:
1351 # We have a second factor. So we can get rid of the None
1352 second_factor_providers.pop(second_factor_providers.index(None))
1354 if len(second_factor_providers) == 0:
1355 raise errors.NotAcceptable(self.msg_missing_second_factor)
1356 elif len(second_factor_providers) == 1:
1357 if second_factor_providers[0] is None:
1358 # We allow sign-in without a second factor
1359 return self.authenticateUser(user_key)
1360 # We have only one second factor we don't need the choice template
1361 return second_factor_providers[0].start(user_key)
1363 # In case there is more than one second factor, let the user select a method.
1364 return self.render.second_factor_choice(second_factors=second_factor_providers)
1366 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
1367 """
1368 Continue authentication flow when secondary authentication succeeded.
1369 """
1370 session = current.session.get()
1371 if session["possible_user_key"] != user_key.id_or_name:
1372 raise errors.Forbidden()
1374 # Assert that the second factor verification finished in time
1375 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow:
1376 raise errors.RequestTimeout()
1378 return self.authenticateUser(user_key)
1380 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None:
1381 """
1382 Hookable check if a user is defined as "active" and can login.
1384 :param skel: The UserSkel of the user who wants to login.
1385 """
1386 if "status" in skel:
1387 status = skel["status"]
1388 if not isinstance(status, (Status, int)):
1389 try:
1390 status = int(status)
1391 except ValueError:
1392 status = Status.UNSET
1394 return status >= Status.ACTIVE.value
1396 return None
1398 def authenticateUser(self, key: db.Key, **kwargs):
1399 """
1400 Performs Log-In for the current session and the given user key.
1402 This resets the current session: All fields not explicitly marked as persistent
1403 by conf.user.session_persistent_fields_on_login are gone afterwards.
1405 :param key: The (DB-)Key of the user we shall authenticate
1406 """
1407 skel = self.baseSkel()
1408 if not skel.read(key):
1409 raise ValueError(f"Unable to authenticate unknown user {key}")
1411 # Verify that this user account is active
1412 if not self.is_active(skel):
1413 raise errors.Forbidden("The user is disabled and cannot be authenticated.")
1415 # Update session for user
1416 session = current.session.get()
1417 # Remember persistent fields...
1418 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login}
1419 session.reset()
1420 # and copy them over to the new session
1421 session |= take_over
1423 # Update session, user and request
1424 session["user"] = skel.dbEntity
1426 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
1427 current.user.set(self.getCurrentUser())
1429 self.onLogin(skel)
1431 return self.render.loginSucceeded(**kwargs)
1433 @exposed
1434 @skey
1435 def logout(self, *args, **kwargs):
1436 """
1437 Implements the logout action. It also terminates the current session (all keys not listed
1438 in viur.session_persistent_fields_on_logout will be lost).
1439 """
1440 if not (user := current.user.get()):
1441 raise errors.Unauthorized()
1443 self.onLogout(user)
1445 session = current.session.get()
1446 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}:
1447 session.reset()
1448 session |= take_over
1449 else:
1450 session.clear()
1451 current.user.set(None) # set user to none in context var
1452 return self.render.logoutSuccess()
1454 @exposed
1455 def login(self, *args, **kwargs):
1456 return self.render.loginChoices([
1457 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1458 for primary, secondary in self.validAuthenticationMethods
1459 ])
1461 def onLogin(self, skel: skeleton.SkeletonInstance):
1462 """
1463 Hook to be called on user login.
1464 """
1465 # Update the lastlogin timestamp (if available!)
1466 if "lastlogin" in skel:
1467 now = utils.utcNow()
1469 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??)
1470 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)):
1471 skel["lastlogin"] = now
1472 skel.write(update_relations=False)
1474 logging.info(f"""User {skel["name"]} logged in""")
1476 def onLogout(self, skel: skeleton.SkeletonInstance):
1477 """
1478 Hook to be called on user logout.
1479 """
1480 logging.info(f"""User {skel["name"]} logged out""")
1482 @exposed
1483 def view(self, key: db.Key | int | str = "self", *args, **kwargs):
1484 """
1485 Allow a special key "self" to reference the current user.
1487 By default, any authenticated user can view its own user entry,
1488 to obtain access rights and any specific user information.
1489 This behavior is defined in the customized `canView` function,
1490 which is overwritten by the User-module.
1492 The rendered skeleton can be modified or restriced by specifying
1493 a customized view-skeleton.
1494 """
1495 if key == "self":
1496 if user := current.user.get():
1497 key = user["key"]
1498 else:
1499 raise errors.Unauthorized("Cannot view 'self' with unknown user")
1501 return super().view(key, *args, **kwargs)
1503 def canView(self, skel) -> bool:
1504 if user := current.user.get():
1505 if skel["key"] == user["key"]:
1506 return True
1508 if "root" in user["access"] or "user-view" in user["access"]:
1509 return True
1511 return False
1513 @exposed
1514 @skey(allow_empty=True)
1515 def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
1516 """
1517 Allow a special key "self" to reference the current user.
1519 This modification will only allow to use "self" as a key;
1520 The specific access right to let the user edit itself must
1521 still be customized.
1523 The rendered and editable skeleton can be modified or restriced
1524 by specifying a customized edit-skeleton.
1525 """
1526 if key == "self":
1527 if user := current.user.get():
1528 key = user["key"]
1529 else:
1530 raise errors.Unauthorized("Cannot edit 'self' with unknown user")
1532 return super().edit(key, *args, **kwargs)
1534 @exposed
1535 def getAuthMethods(self, *args, **kwargs):
1536 """Inform tools like Viur-Admin which authentication to use"""
1537 # FIXME: This is almost the same code as in index()...
1538 # FIXME: VIUR4: The entire function should be removed!
1539 # TODO: Align result with index(), so that primary and secondary login is presented.
1540 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!")
1542 res = [
1543 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1544 for primary, secondary in self.validAuthenticationMethods
1545 ]
1547 return json.dumps(res)
1549 @exposed
1550 @skey
1551 def trigger(self, action: str, key: str):
1552 current.request.get().response.headers["Content-Type"] = "application/json"
1554 # Check for provided access right definition (equivalent to client-side check), fallback to root!
1555 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", )
1556 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)):
1557 raise errors.Unauthorized()
1559 skel = self.baseSkel()
1560 if not skel.read(key):
1561 raise errors.NotFound()
1563 match action:
1564 case "takeover":
1565 self.authenticateUser(skel["key"])
1567 case "kick":
1568 session.killSessionByUser(skel["key"])
1570 case _:
1571 raise errors.NotImplemented(f"Action {action!r} not implemented")
1573 return json.dumps("OKAY")
1575 def onEdited(self, skel):
1576 super().onEdited(skel)
1578 # In case the user is set to inactive, kill all sessions
1579 if self.is_active(skel) is False:
1580 session.killSessionByUser(skel["key"])
1582 def onDeleted(self, skel):
1583 super().onDeleted(skel)
1584 # Invalidate all sessions of that user
1585 session.killSessionByUser(skel["key"])
@tasks.StartupTask
def createNewUserIfNotExists():
    """
    Create a new admin user, if the user database is empty.

    Nothing happens unless the ``vi`` application has a ``user`` module that is a :class:`User`
    instance, offers ``addSkel`` and ``validAuthenticationMethods``, and accepts
    :class:`UserPassword` as one of its primary login methods.

    The generated credentials are written to the log and sent to the admins by e-mail.
    """
    user_module = getattr(conf.main_app.vi, "user", None)

    usable = (
        user_module
        and isinstance(user_module, User)
        and "addSkel" in dir(user_module)
        and "validAuthenticationMethods" in dir(user_module)
        # UserPassword must be one of the primary login methods
        and any(
            issubclass(primary, UserPassword)
            for primary, *_ in user_module.validAuthenticationMethods
        )
    )
    if not usable:
        return

    if db.Query(user_module.addSkel().kindName).getEntry():
        return  # There is at least one user in the database already

    # Ensure we have the full skeleton
    addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)()

    uname = f"""admin@{conf.instance.project_id}.appspot.com"""
    pw = utils.string.random(13)

    addSkel["name"] = uname
    addSkel["status"] = Status.ACTIVE  # Ensure it's enabled right away
    addSkel["access"] = ["root"]
    addSkel["password"] = pw

    try:
        addSkel.write()
    except Exception as e:
        logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
        logging.exception(e)
        return

    msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"

    logging.warning(msg)
    email.send_email_to_admins("New ViUR password", msg)
1626# DEPRECATED ATTRIBUTES HANDLING
1628def __getattr__(attr):
1629 match attr:
1630 case "userSkel":
1631 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
1632 warnings.warn(msg, DeprecationWarning, stacklevel=2)
1633 logging.warning(msg)
1634 return UserSkel
1636 return super(__import__(__name__).__class__).__getattr__(attr)