Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%
705 statements
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
1import abc
2import datetime
3import enum
4import functools
5import hashlib
6import hmac
7import json
8import logging
9import secrets
10import warnings
11import user_agents
13import pyotp
14import base64
15import dataclasses
16import typing as t
17from google.auth.transport import requests
18from google.oauth2 import id_token
20from viur.core import (
21 conf, current, db, email, errors, i18n,
22 securitykey, session, skeleton, tasks, utils, Module
23)
24from viur.core.decorators import *
25from viur.core.bones import *
26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password
27from viur.core.prototypes.list import List
28from viur.core.ratelimit import RateLimit
29from viur.core.securityheaders import extendCsp
32@functools.total_ordering
33class Status(enum.Enum):
34 """Status enum for a user
36 Has backwards compatibility to be comparable with non-enum values.
37 Will be removed with viur-core 4.0.0
38 """
40 UNSET = 0 # Status is unset
41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification
42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin
43 DISABLED = 5 # Account disabled
44 ACTIVE = 10 # Active
46 def __eq__(self, other):
47 if isinstance(other, Status):
48 return super().__eq__(other)
49 return self.value == other
51 def __lt__(self, other):
52 if isinstance(other, Status):
53 return super().__lt__(other)
54 return self.value < other
57class UserSkel(skeleton.Skeleton):
58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604)
60 name = EmailBone(
61 descr="E-Mail",
62 required=True,
63 readOnly=True,
64 caseSensitive=False,
65 searchable=True,
66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
67 )
69 firstname = StringBone(
70 descr="Firstname",
71 searchable=True,
72 )
74 lastname = StringBone(
75 descr="Lastname",
76 searchable=True,
77 )
79 roles = SelectBone(
80 descr=i18n.translate("viur.user.bone.roles", defaultText="Roles"),
81 values=conf.user.roles,
82 required=True,
83 multiple=True,
84 # fixme: This is generally broken in VIUR! See #776 for details.
85 # vfunc=lambda values:
86 # i18n.translate(
87 # "user.bone.roles.invalid",
88 # defaultText="Invalid role setting: 'custom' can only be set alone.")
89 # if "custom" in values and len(values) > 1 else None,
90 defaultValue=list(conf.user.roles.keys())[:1],
91 )
93 access = SelectBone(
94 descr=i18n.translate("viur.user.bone.access", defaultText="Access rights"),
95 type_suffix="access",
96 values=lambda: {
97 right: i18n.translate(f"server.modules.user.accessright.{right}", defaultText=right)
98 for right in sorted(conf.user.access_rights)
99 },
100 multiple=True,
101 params={
102 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system
103 }
104 )
106 status = SelectBone(
107 descr="Account status",
108 values=Status,
109 defaultValue=Status.ACTIVE,
110 required=True,
111 )
113 lastlogin = DateBone(
114 descr="Last Login",
115 readOnly=True,
116 )
118 admin_config = JsonBone( # This bone stores settings from the vi
119 descr="Config for the User",
120 visible=False
121 )
123 def __new__(cls):
124 """
125 Constructor for the UserSkel-class, with the capability
126 to dynamically add bones required for the configured
127 authentication methods.
128 """
129 for provider in conf.main_app.vi.user.authenticationProviders:
130 assert issubclass(provider, UserPrimaryAuthentication)
131 provider.patch_user_skel(cls)
133 for provider in conf.main_app.vi.user.secondFactorProviders:
134 assert issubclass(provider, UserSecondFactorAuthentication)
135 provider.patch_user_skel(cls)
137 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
138 return super().__new__(cls)
140 @classmethod
141 def toDB(cls, skel, *args, **kwargs):
142 # Roles
143 if skel["roles"] and "custom" not in skel["roles"]:
144 # Collect access rights through rules
145 access = set()
147 for role in skel["roles"]:
148 # Get default access for this role
149 access |= conf.main_app.vi.user.get_role_defaults(role)
151 # Go through all modules and evaluate available role-settings
152 for name in dir(conf.main_app.vi):
153 if name.startswith("_"):
154 continue
156 module = getattr(conf.main_app.vi, name)
157 if not isinstance(module, Module):
158 continue
160 roles = getattr(module, "roles", None) or {}
161 rights = roles.get(role, roles.get("*", ()))
163 # Convert role into tuple if it's not
164 if not isinstance(rights, (tuple, list)):
165 rights = (rights, )
167 if "*" in rights:
168 for right in module.accessRights:
169 access.add(f"{name}-{right}")
170 else:
171 for right in rights:
172 if right in module.accessRights:
173 access.add(f"{name}-{right}")
175 # special case: "edit" and "delete" actions require "view" as well!
176 if right in ("edit", "delete") and "view" in module.accessRights:
177 access.add(f"{name}-view")
179 skel["access"] = list(access)
181 return super().toDB(skel, *args, **kwargs)
184class UserAuthentication(Module, abc.ABC):
185 @property
186 @abc.abstractstaticmethod
187 def METHOD_NAME() -> str:
188 """
189 Define a unique method name for this authentication.
190 """
191 ...
193 def __init__(self, moduleName, modulePath, userModule):
194 super().__init__(moduleName, modulePath)
195 self._user_module = userModule
197 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
198 return True
200 @classmethod
201 def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
202 """
203 Allows for an UserAuthentication to patch the UserSkel
204 class with additional bones which are required for
205 the implemented authentication method.
206 """
207 ...
210class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
211 """Abstract class for all primary authentication methods."""
212 registrationEnabled = False
214 @abc.abstractmethod
215 def login(self, *args, **kwargs):
216 ...
218 def next_or_finish(self, skel: skeleton.SkeletonInstance):
219 """
220 Hook that is called whenever a part of the authentication was successful.
221 It allows to perform further steps in custom authentications,
222 e.g. change a password after first login.
223 """
224 return self._user_module.continueAuthenticationFlow(self, skel["key"])
227class UserPassword(UserPrimaryAuthentication):
228 METHOD_NAME = "X-VIUR-AUTH-User-Password"
230 registrationEmailVerificationRequired = True
231 registrationAdminVerificationRequired = True
233 verifySuccessTemplate = "user_verify_success"
234 verifyEmailAddressMail = "user_verify_address"
235 verifyFailedTemplate = "user_verify_failed"
236 passwordRecoveryTemplate = "user_passwordrecover"
237 passwordRecoveryMail = "user_password_recovery"
238 passwordRecoverySuccessTemplate = "user_passwordrecover_success"
239 passwordRecoveryStep1Template = "user_passwordrecover_step1"
240 passwordRecoveryStep2Template = "user_passwordrecover_step2"
241 passwordRecoveryStep3Template = "user_passwordrecover_step3"
243 # The default rate-limit for password recovery (10 tries each 15 minutes)
244 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")
246 # Limit (invalid) login-retries to once per 5 seconds
247 loginRateLimit = RateLimit("user.login", 12, 1, "ip")
249 @classmethod
250 def patch_user_skel(cls, skel_cls):
251 """
252 Modifies the UserSkel to be equipped by a PasswordBone.
253 """
254 skel_cls.password = PasswordBone(
255 readOnly=True,
256 visible=False,
257 params={
258 "category": "Authentication",
259 }
260 )
262 class LoginSkel(skeleton.RelSkel):
263 name = EmailBone(
264 descr="E-Mail",
265 required=True,
266 caseSensitive=False,
267 )
268 password = PasswordBone(
269 required=True,
270 test_threshold=0,
271 )
273 class LostPasswordStep1Skel(skeleton.RelSkel):
274 name = EmailBone(
275 descr="E-Mail",
276 required=True,
277 )
279 class LostPasswordStep2Skel(skeleton.RelSkel):
280 recovery_key = StringBone(
281 descr="Recovery Key",
282 required=True,
283 params={
284 "tooltip": i18n.translate(
285 key="viur.modules.user.userpassword.lostpasswordstep2.recoverykey",
286 defaultText="Please enter the validation key you've received via e-mail.",
287 hint="Shown when the user needs more than 15 minutes to paste the key",
288 ),
289 }
290 )
292 class LostPasswordStep3Skel(skeleton.RelSkel):
293 # send the recovery key again, in case the password is rejected by some reason.
294 recovery_key = StringBone(
295 descr="Recovery Key",
296 visible=False,
297 readOnly=True,
298 )
300 password = PasswordBone(
301 descr="New Password",
302 required=True,
303 params={
304 "tooltip": i18n.translate(
305 key="viur.modules.user.userpassword.lostpasswordstep3.password",
306 defaultText="Please enter a new password for your account.",
307 ),
308 }
309 )
311 @exposed
312 @force_ssl
313 @skey(allow_empty=True)
314 def login(self, *, name: str | None = None, password: str | None = None, **kwargs):
315 if current.user.get(): # User is already logged in, nothing to do.
316 return self._user_module.render.loginSucceeded()
318 if not name or not password:
319 return self._user_module.render.login(self.LoginSkel(), action="login")
321 self.loginRateLimit.assertQuotaIsAvailable()
323 # query for the username. The query might find another user, but the name is being checked for equality below
324 name = name.lower().strip()
325 user_skel = self._user_module.baseSkel()
326 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel
328 # extract password hash from raw database entity (skeleton access blocks it)
329 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {}
330 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001
331 password_hash = encode_password(password, password_data.get("salt", "-invalid-"), iterations)["pwhash"]
333 # now check if the username matches
334 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())
336 # next, check if the password hash matches
337 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash)
339 # next, check if the user account is active
340 is_okay &= (user_skel["status"] or 0) >= Status.ACTIVE.value
342 if not is_okay:
343 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota
344 skel = self.LoginSkel()
345 return self._user_module.render.login(
346 skel,
347 action="login",
348 loginFailed=True, # FIXME: Is this still being used?
349 accountStatus=user_skel["status"] # FIXME: Is this still being used?
350 )
352 # check if iterations are below current security standards, and update if necessary.
353 if iterations < PBKDF2_DEFAULT_ITERATIONS:
354 logging.info(f"Update password hash for user {name}.")
355 # re-hash the password with more iterations
356 # FIXME: This must be done within a transaction!
357 user_skel["password"] = password # will be hashed on serialize
358 user_skel.toDB(update_relations=False)
360 return self.next_or_finish(user_skel)
362 @exposed
363 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs):
364 """
365 This implements a password recovery process which lets users set a new password for their account,
366 after validating a recovery key sent by email.
368 The process is as following:
370 - The user enters his email adress
371 - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode
372 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
373 and send a link with the code . It runs as a deferredTask so we don't leak the information if a user
374 account exists.
375 - If the user received his email, he can click on the link and set a new password for his account.
377 To prevent automated attacks, the fist step is guarded by a captcha and we limited calls to this function
378 to 10 actions per 15 minutes. (One complete recovery process consists of two calls).
379 """
380 self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
381 current_request = current.request.get()
383 if recovery_key is None:
384 # This is the first step, where we ask for the username of the account we'll going to reset the password on
385 skel = self.LostPasswordStep1Skel()
387 if not current_request.isPostRequest or not skel.fromClient(kwargs):
388 return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template)
390 # validate security key
391 if not securitykey.validate(skey):
392 raise errors.PreconditionFailed()
394 self.passwordRecoveryRateLimit.decrementQuota()
396 recovery_key = securitykey.create(
397 duration=datetime.timedelta(minutes=15),
398 key_length=conf.security.password_recovery_key_length,
399 user_name=skel["name"].lower(),
400 session_bound=False,
401 )
403 # Send the code in background
404 self.sendUserPasswordRecoveryCode(
405 skel["name"], recovery_key, current_request.request.headers["User-Agent"]
406 )
408 # step 2 is only an action-skel, and can be ignored by a direct link in the
409 # e-mail previously sent. It depends on the implementation of the specific project.
410 return self._user_module.render.edit(
411 self.LostPasswordStep2Skel(),
412 tpl=self.passwordRecoveryStep2Template,
413 )
415 # in step 3
416 skel = self.LostPasswordStep3Skel()
417 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails.
419 # check for any input; Render input-form again when incomplete.
420 if not skel.fromClient(kwargs) or not current_request.isPostRequest:
421 return self._user_module.render.edit(
422 skel=skel,
423 tpl=self.passwordRecoveryStep3Template,
424 )
426 # validate security key
427 if not securitykey.validate(skey):
428 raise errors.PreconditionFailed()
430 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
431 raise errors.PreconditionFailed(
432 i18n.translate(
433 key="viur.modules.user.passwordrecovery.keyexpired",
434 defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
435 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
436 )
437 )
439 self.passwordRecoveryRateLimit.decrementQuota()
441 # If we made it here, the key was correct, so we'd hopefully have a valid user for this
442 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()
444 if not user_skel:
445 raise errors.NotFound(
446 i18n.translate(
447 key="viur.modules.user.passwordrecovery.usernotfound",
448 defaultText="There is no account with this name",
449 hint="We cant find an account with that name (Should never happen)"
450 )
451 )
453 if user_skel["status"] != Status.ACTIVE: # The account is locked or not yet validated. Abort the process.
454 raise errors.NotFound(
455 i18n.translate(
456 key="viur.modules.user.passwordrecovery.accountlocked",
457 defaultText="This account is currently locked. You cannot change it's password.",
458 hint="Attempted password recovery on a locked account"
459 )
460 )
462 # Update the password, save the user, reset his session and show the success-template
463 user_skel["password"] = skel["password"]
464 user_skel.toDB(update_relations=False)
466 return self._user_module.render.view(
467 None,
468 tpl=self.passwordRecoverySuccessTemplate,
469 )
471 @tasks.CallDeferred
472 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
473 """
474 Sends the given recovery code to the user given in userName. This function runs deferred
475 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the
476 code by email (assuming we have working email delivery), but this can be overridden to send it
477 by SMS or other means. We'll also update the changedate for this user, so no more than one code
478 can be send to any given user in four hours.
479 """
480 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel():
481 user_agent = user_agents.parse(user_agent)
482 email.sendEMail(
483 tpl=self.passwordRecoveryMail,
484 skel=user_skel,
485 dests=[user_name],
486 recovery_key=recovery_key,
487 user_agent={
488 "device": user_agent.get_device(),
489 "os": user_agent.get_os(),
490 "browser": user_agent.get_browser()
491 }
492 )
494 @exposed
495 @skey(forward_payload="data", session_bound=False)
496 def verify(self, data):
497 def transact(key):
498 skel = self._user_module.editSkel()
499 if not key or not skel.fromDB(key):
500 return None
501 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \
502 if self.registrationAdminVerificationRequired else Status.ACTIVE
504 skel.toDB(update_relations=False)
505 return skel
507 if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))):
508 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)
510 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate)
512 def canAdd(self) -> bool:
513 return self.registrationEnabled
515 def addSkel(self):
516 """
517 Prepare the add-Skel for rendering.
518 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on
519 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired
520 :return: viur.core.skeleton.Skeleton
521 """
522 skel = self._user_module.addSkel()
524 if self.registrationEmailVerificationRequired:
525 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION
526 elif self.registrationAdminVerificationRequired:
527 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION
528 else: # No further verification required
529 defaultStatusValue = Status.ACTIVE
531 skel.status.readOnly = True
532 skel["status"] = defaultStatusValue
534 if "password" in skel:
535 skel.password.required = True # The user will have to set a password
537 return skel
539 @force_ssl
540 @exposed
541 @skey(allow_empty=True)
542 def add(self, *args, **kwargs):
543 """
544 Allows guests to register a new account if self.registrationEnabled is set to true
546 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`
548 :returns: The rendered, added object of the entry, eventually with error hints.
550 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
551 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
552 """
553 if not self.canAdd():
554 raise errors.Unauthorized()
555 skel = self.addSkel()
556 if (
557 not kwargs # no data supplied
558 or not current.request.get().isPostRequest # bail out if not using POST-method
559 or not skel.fromClient(kwargs) # failure on reading into the bones
560 or utils.parse.bool(kwargs.get("bounce")) # review before adding
561 ):
562 # render the skeleton in the version it could as far as it could be read.
563 return self._user_module.render.add(skel)
564 self._user_module.onAdd(skel)
565 skel.toDB()
566 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION:
567 # The user will have to verify his email-address. Create a skey and send it to his address
568 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False,
569 user_key=utils.normalizeKey(skel["key"]),
570 name=skel["name"])
571 skel.skey = BaseBone(descr="Skey")
572 skel["skey"] = skey
573 email.sendEMail(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel)
574 self._user_module.onAdded(skel) # Call onAdded on our parent user module
575 return self._user_module.render.addSuccess(skel)
578class GoogleAccount(UserPrimaryAuthentication):
579 METHOD_NAME = "X-VIUR-AUTH-Google-Account"
581 @classmethod
582 def patch_user_skel(cls, skel_cls):
583 """
584 Modifies the UserSkel to be equipped by a bones required by Google Auth
585 """
586 skel_cls.uid = StringBone(
587 descr="Google UserID",
588 required=False,
589 readOnly=True,
590 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
591 params={
592 "category": "Authentication",
593 }
594 )
596 skel_cls.sync = BooleanBone(
597 descr="Sync user data with OAuth-based services",
598 defaultValue=True,
599 params={
600 "category": "Authentication",
601 "tooltip":
602 "If set, user data like firstname and lastname is automatically kept"
603 "synchronous with the information stored at the OAuth service provider"
604 "(e.g. Google Login)."
605 }
606 )
608 @exposed
609 @force_ssl
610 @skey(allow_empty=True)
611 def login(self, token: str | None = None, *args, **kwargs):
612 # FIXME: Check if already logged in
613 if not conf.user.google_client_id:
614 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")
616 if not token:
617 request = current.request.get()
618 request.response.headers["Content-Type"] = "text/html"
619 if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
620 # We have to allow popups here
621 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"
623 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
624 with open(file_path) as file:
625 tpl_string = file.read()
627 # FIXME: Use Jinja2 for rendering?
628 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
629 extendCsp({
630 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
631 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
632 })
633 return tpl_string
635 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
636 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
637 raise ValueError("Invalid issuer")
639 # Token looks valid :)
640 uid = user_info["sub"]
641 email = user_info["email"]
643 base_skel = self._user_module.baseSkel()
644 update = False
645 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
646 # We'll try again - checking if there's already an user with that email
647 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()):
648 # Still no luck - it's a completely new user
649 if not self.registrationEnabled:
650 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
651 logging.debug(f"Google user is from allowed {domain} - adding account")
652 else:
653 logging.debug(f"Google user is from {domain} - denying registration")
654 raise errors.Forbidden("Registration for new users is disabled")
656 user_skel = base_skel
657 user_skel["uid"] = uid
658 user_skel["name"] = email
659 update = True
661 # Take user information from Google, if wanted!
662 if user_skel["sync"]:
663 for target, source in {
664 "name": email,
665 "firstname": user_info.get("given_name"),
666 "lastname": user_info.get("family_name"),
667 }.items():
669 if user_skel[target] != source:
670 user_skel[target] = source
671 update = True
673 if update:
674 assert user_skel.toDB()
676 return self.next_or_finish(user_skel)
679class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
680 """Abstract class for all second factors."""
681 MAX_RETRY = 3
682 second_factor_login_template = "user_login_secondfactor"
683 """Template to enter the TOPT on login"""
685 @property
686 @abc.abstractmethod
687 def NAME(self) -> str:
688 """Name for this factor for templates."""
689 ...
691 @property
692 @abc.abstractmethod
693 def ACTION_NAME(self) -> str:
694 """The action name for this factor, used as path-segment."""
695 ...
697 def __init__(self, moduleName, modulePath, _user_module):
698 super().__init__(moduleName, modulePath, _user_module)
699 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}"
700 self.add_url = f"{self.modulePath}/add"
701 self.start_url = f"{self.modulePath}/start"
704class TimeBasedOTP(UserSecondFactorAuthentication):
705 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"
706 WINDOW_SIZE = 5
707 ACTION_NAME = "otp"
708 NAME = "Time based Otp"
709 second_factor_login_template = "user_login_secondfactor"
711 @dataclasses.dataclass
712 class OtpConfig:
713 """
714 This dataclass is used to provide an interface for a OTP token
715 algorithm description that is passed within the TimeBasedOTP
716 class for configuration.
717 """
718 secret: str
719 timedrift: float = 0.0
720 algorithm: t.Literal["sha1", "sha256"] = "sha1"
721 interval: int = 60
723 class OtpSkel(skeleton.RelSkel):
724 """
725 This is the Skeleton used to ask for the OTP token.
726 """
727 otptoken = NumericBone(
728 descr="Token",
729 required=True,
730 max=999999,
731 min=0,
732 )
734 @classmethod
735 def patch_user_skel(cls, skel_cls):
736 """
737 Modifies the UserSkel to be equipped by a bones required by Timebased OTP
738 """
739 # One-Time Password Verification
740 skel_cls.otp_serial = StringBone(
741 descr="OTP serial",
742 searchable=True,
743 params={
744 "category": "Second Factor Authentication",
745 }
746 )
748 skel_cls.otp_secret = CredentialBone(
749 descr="OTP secret",
750 params={
751 "category": "Second Factor Authentication",
752 }
753 )
755 skel_cls.otp_timedrift = NumericBone(
756 descr="OTP time drift",
757 readOnly=True,
758 defaultValue=0,
759 params={
760 "category": "Second Factor Authentication",
761 }
762 )
764 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
765 """
766 Returns an instance of self.OtpConfig with a provided token configuration,
767 or None when there is no appropriate configuration of this second factor handler available.
768 """
770 if otp_secret := skel.dbEntity.get("otp_secret"):
771 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0)
773 return None
775 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
776 """
777 Specified whether the second factor authentication can be handled by the given user or not.
778 """
779 return bool(self.get_config(skel))
781 @exposed
782 def start(self):
783 """
784 Configures OTP login for the current session.
786 A special otp_user_conf has to be specified as a dict, which is stored into the session.
787 """
788 session = current.session.get()
790 if not (user_key := session.get("possible_user_key")):
791 raise errors.PreconditionFailed(
792 "Second factor can only be triggered after successful primary authentication."
793 )
795 user_skel = self._user_module.baseSkel()
796 if not user_skel.fromDB(user_key):
797 raise errors.NotFound("The previously authenticated user is gone.")
799 if not (otp_user_conf := self.get_config(user_skel)):
800 raise errors.PreconditionFailed("This second factor is not available for the user")
802 otp_user_conf = {
803 "key": str(user_key),
804 } | dataclasses.asdict(otp_user_conf)
806 session = current.session.get()
807 session["_otp_user"] = otp_user_conf
808 session.markChanged()
810 return self._user_module.render.edit(
811 self.OtpSkel(),
812 params={
813 "name": i18n.translate(self.NAME),
814 "action_name": self.ACTION_NAME,
815 "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
816 },
817 tpl=self.second_factor_login_template
818 )
820 @exposed
821 @force_ssl
822 @skey(allow_empty=True)
823 def otp(self, *args, **kwargs):
824 """
825 Performs the second factor validation and interaction with the client.
826 """
827 session = current.session.get()
828 if not (otp_user_conf := session.get("_otp_user")):
829 raise errors.PreconditionFailed("No OTP process started in this session")
831 # Check if maximum second factor verification attempts
832 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
833 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
835 # Read the OTP token via the skeleton, to obtain a valid value
836 skel = self.OtpSkel()
837 if skel.fromClient(kwargs):
838 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP.
839 res = self.verify(
840 otp=skel["otptoken"],
841 secret=otp_user_conf["secret"],
842 algorithm=otp_user_conf.get("algorithm") or "sha1",
843 interval=otp_user_conf.get("interval") or 60,
844 timedrift=otp_user_conf.get("timedrift") or 0.0,
845 valid_window=self.WINDOW_SIZE
846 )
847 else:
848 res = None
850 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0!
851 if res is None:
852 otp_user_conf["attempts"] = attempts + 1
853 session.markChanged()
854 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
855 return self._user_module.render.edit(
856 skel,
857 name=i18n.translate(self.NAME),
858 action_name=self.ACTION_NAME,
859 action_url=f"{self.modulePath}/{self.ACTION_NAME}",
860 tpl=self.second_factor_login_template
861 )
863 # Remove otp user config from session
864 user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
865 del session["_otp_user"]
866 session.markChanged()
868 # Check if the OTP device has a time drift
870 timedriftchange = float(res) - otp_user_conf["timedrift"]
871 if abs(timedriftchange) > 2:
872 # The time-drift change accumulates to more than 2 minutes (for interval==60):
873 # update clock-drift value accordingly
874 self.updateTimeDrift(user_key, timedriftchange)
876 # Continue with authentication
877 return self._user_module.secondFactorSucceeded(self, user_key)
879 @staticmethod
880 def verify(
881 otp: str | int,
882 secret: str,
883 algorithm: str = "sha1",
884 interval: int = 60,
885 timedrift: float = 0.0,
886 for_time: datetime.datetime | None = None,
887 valid_window: int = 0,
888 ) -> int | None:
889 """
890 Verifies the OTP passed in against the current time OTP.
892 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which
893 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown
894 on the hardware token generator. This can be used to store the time drift of a given token generator.
896 :param otp: the OTP token to check against
897 :param secret: The OTP secret
898 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256)
899 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). In
900 pyotp, default is 30!
901 :param timedrift: The known timedrift (old index) of the hardware OTP generator
902 :param for_time: Time to check OTP at (defaults to now)
903 :param valid_window: extends the validity to this many counter ticks before and after the current one
904 :returns: The index where verification succeeded, None otherwise
905 """
906 # get the hashing digest
907 digest = {
908 "sha1": hashlib.sha1,
909 "sha256": hashlib.sha256,
910 }.get(algorithm)
912 if not digest:
913 raise errors.NotImplemented(f"{algorithm=} is not implemented")
915 if for_time is None:
916 for_time = datetime.datetime.now()
918 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index
919 timedrift = round(timedrift)
920 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret
921 otp = str(otp).zfill(6) # fill with zeros in front
923 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}")
924 totp = pyotp.TOTP(secret, digest=digest, interval=interval)
926 if valid_window:
927 for offset in range(timedrift - valid_window, timedrift + valid_window + 1):
928 token = str(totp.at(for_time, offset))
929 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}")
930 if hmac.compare_digest(otp, token):
931 return offset
933 return None
935 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None
937 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
938 """
939 Updates the clock-drift value.
940 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew
941 it out of bounds. Maximum change per call is 0.3 minutes.
942 :param user_key: For which user should the update occour
943 :param idx: How many steps before/behind was that token
944 :return:
945 """
947 # FIXME: The callback in viur-core must be improved, to accept user_skel
949 def transaction(user_key, idx):
950 user = db.Get(user_key)
951 if not isinstance(user.get("otp_timedrift"), float):
952 user["otp_timedrift"] = 0.0
953 user["otp_timedrift"] += min(max(0.1 * idx, -0.3), 0.3)
954 db.Put(user)
956 db.RunInTransaction(transaction, user_key, idx)
959class AuthenticatorOTP(UserSecondFactorAuthentication):
960 """
961 This class handles the second factor for apps like authy and so on
962 """
963 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"
965 second_factor_add_template = "user_secondfactor_add"
966 """Template to configure (add) a new TOPT"""
968 ACTION_NAME = "authenticator_otp"
969 """Action name provided for *otp_template* on login"""
971 NAME = "Authenticator App"
973 @exposed
974 @force_ssl
975 @skey(allow_empty=True)
976 def add(self, otp=None):
977 """
978 We try to read the otp_app_secret form the current session. When this fails we generate a new one and store
979 it in the session.
981 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store
982 the otp_app_secret from the session in the user entry.
983 """
984 current_session = current.session.get()
986 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
987 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
988 current_session["_maybe_otp_app_secret"] = otp_app_secret
989 current_session.markChanged()
991 if otp is None:
992 return self._user_module.render.second_factor_add(
993 tpl=self.second_factor_add_template,
994 action_name=self.ACTION_NAME,
995 name=i18n.translate(self.NAME),
996 add_url=self.add_url,
997 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))
998 else:
999 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
1000 return self._user_module.render.second_factor_add(
1001 tpl=self.second_factor_add_template,
1002 action_name=self.ACTION_NAME,
1003 name=i18n.translate(self.NAME),
1004 add_url=self.add_url,
1005 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors
1007 # Now we can set the otp_app_secret to the current User and render der Success-template
1008 AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
1009 return self._user_module.render.second_factor_add_success(
1010 action_name=self.ACTION_NAME,
1011 name=i18n.translate(self.NAME),
1012 )
1014 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
1015 """
1016 We can only handle the second factor if we have stored an otp_app_secret before.
1017 """
1018 return bool(skel.dbEntity.get("otp_app_secret", ""))
1020 @classmethod
1021 def patch_user_skel(cls, skel_cls):
1022 """
1023 Modifies the UserSkel to be equipped by bones required by Authenticator App
1024 """
1025 # Authenticator OTP Apps (like Authy)
1026 skel_cls.otp_app_secret = CredentialBone(
1027 descr="OTP Secret (App-Key)",
1028 params={
1029 "category": "Second Factor Authentication",
1030 }
1031 )
1033 @classmethod
1034 def set_otp_app_secret(cls, otp_app_secret=None):
1035 """
1036 Write a new OTP Token in the current user entry.
1037 """
1038 if otp_app_secret is None:
1039 logging.error("No 'otp_app_secret' is provided")
1040 raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
1041 if not (cuser := current.user.get()):
1042 raise errors.Unauthorized()
1044 def transaction(user_key):
1045 if not (user := db.Get(user_key)):
1046 raise errors.NotFound()
1047 user["otp_app_secret"] = otp_app_secret
1048 db.Put(user)
1050 db.RunInTransaction(transaction, cuser["key"])
1052 @classmethod
1053 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
1054 """
1055 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
1056 """
1057 if not (cuser := current.user.get()):
1058 raise errors.Unauthorized()
1059 if not (issuer := conf.user.otp_issuer):
1060 logging.warning(
1061 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
1062 issuer = conf.instance.project_id
1064 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)
1066 @classmethod
1067 def generate_otp_app_secret(cls) -> str:
1068 """
1069 Generate a new OTP Secret
1070 :return an otp
1071 """
1072 return pyotp.random_base32()
1074 @classmethod
1075 def verify_otp(cls, otp: str | int, secret: str) -> bool:
1076 return pyotp.TOTP(secret).verify(otp)
1078 @exposed
1079 def start(self):
1080 otp_user_conf = {"attempts": 0}
1081 session = current.session.get()
1082 session["_otp_user"] = otp_user_conf
1083 session.markChanged()
1084 return self._user_module.render.edit(
1085 TimeBasedOTP.OtpSkel(),
1086 params={
1087 "name": i18n.translate(self.NAME),
1088 "action_name": self.ACTION_NAME,
1089 "action_url": self.action_url,
1090 },
1091 tpl=self.second_factor_login_template,
1092 )
1094 @exposed
1095 @force_ssl
1096 @skey
1097 def authenticator_otp(self, **kwargs):
1098 """
1099 We verify the otp here with the secret we stored before.
1100 """
1101 session = current.session.get()
1102 user_key = db.Key(self._user_module.kindName, session["possible_user_key"])
1104 if not (otp_user_conf := session.get("_otp_user")):
1105 raise errors.PreconditionFailed("No OTP process started in this session")
1107 # Check if maximum second factor verification attempts
1108 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
1109 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
1111 if not (user := db.Get(user_key)):
1112 raise errors.NotFound()
1114 skel = TimeBasedOTP.OtpSkel()
1115 if not skel.fromClient(kwargs):
1116 raise errors.PreconditionFailed()
1117 otp_token = str(skel["otptoken"]).zfill(6)
1119 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]):
1120 return self._user_module.secondFactorSucceeded(self, user_key)
1121 otp_user_conf["attempts"] = attempts + 1
1122 session.markChanged()
1123 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
1124 return self._user_module.render.edit(
1125 skel,
1126 name=i18n.translate(self.NAME),
1127 action_name=self.ACTION_NAME,
1128 action_url=self.action_url,
1129 tpl=self.second_factor_login_template,
1130 )
1133class User(List):
1134 kindName = "user"
1135 addTemplate = "user_add"
1136 addSuccessTemplate = "user_add_success"
1137 lostPasswordTemplate = "user_lostpassword"
1138 verifyEmailAddressMail = "user_verify_address"
1139 passwordRecoveryMail = "user_password_recovery"
1141 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
1142 None, (
1143 UserPassword,
1144 conf.user.google_client_id and GoogleAccount,
1145 )
1146 ))
1147 """
1148 Specifies primary authentication providers that are made available
1149 as sub-modules under `user/auth_<classname>`. They might require
1150 customization or configuration.
1151 """
1153 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
1154 TimeBasedOTP,
1155 AuthenticatorOTP,
1156 )
1157 """
1158 Specifies secondary authentication providers that are made available
1159 as sub-modules under `user/f2_<classname>`. They might require
1160 customization or configuration, which is determined during the
1161 login-process depending on the user that wants to login.
1162 """
1164 validAuthenticationMethods = tuple(filter(
1165 None, (
1166 (UserPassword, AuthenticatorOTP),
1167 (UserPassword, TimeBasedOTP),
1168 (UserPassword, None),
1169 (GoogleAccount, None) if conf.user.google_client_id else None,
1170 )
1171 ))
1172 """
1173 Specifies the possible combinations of primary- and secondary factor
1174 login methos.
1176 GoogleLogin defaults to no second factor, as the Google Account can be
1177 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
1178 handled when there is a user-dependent configuration available.
1179 """
1181 msg_missing_second_factor = "Second factor required but not configured for this user."
1183 secondFactorTimeWindow = datetime.timedelta(minutes=10)
1185 default_order = "name.idx"
1187 adminInfo = {
1188 "icon": "person-fill",
1189 "actions": [
1190 "trigger_kick",
1191 "trigger_takeover",
1192 ],
1193 "customActions": {
1194 "trigger_kick": {
1195 "name": i18n.translate(
1196 key="viur.modules.user.customActions.kick",
1197 defaultText="Kick user",
1198 hint="Title of the kick user function"
1199 ),
1200 "icon": "trash2-fill",
1201 "access": ["root"],
1202 "action": "fetch",
1203 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}",
1204 "confirm": i18n.translate(
1205 key="viur.modules.user.customActions.kick.confirm",
1206 defaultText="Do you really want to drop all sessions of the selected user from the system?",
1207 ),
1208 "success": i18n.translate(
1209 key="viur.modules.user.customActions.kick.success",
1210 defaultText="Sessions of the user are being invalidated.",
1211 ),
1212 },
1213 "trigger_takeover": {
1214 "name": i18n.translate(
1215 key="viur.modules.user.customActions.takeover",
1216 defaultText="Take-over user",
1217 hint="Title of the take user over function"
1218 ),
1219 "icon": "file-person-fill",
1220 "access": ["root"],
1221 "action": "fetch",
1222 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}",
1223 "confirm": i18n.translate(
1224 key="viur.modules.user.customActions.takeover.confirm",
1225 defaultText="Do you really want to replace your current user session by a "
1226 "user session of the selected user?",
1227 ),
1228 "success": i18n.translate(
1229 key="viur.modules.user.customActions.takeover.success",
1230 defaultText="You're now know as the selected user!",
1231 ),
1232 "then": "reload-vi",
1233 },
1234 },
1235 }
1237 roles = {
1238 "admin": "*",
1239 }
1241 def __init__(self, moduleName, modulePath):
1242 for provider in self.authenticationProviders:
1243 assert issubclass(provider, UserPrimaryAuthentication)
1244 name = f"auth_{provider.__name__.lower()}"
1245 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1247 for provider in self.secondFactorProviders:
1248 assert issubclass(provider, UserSecondFactorAuthentication)
1249 name = f"f2_{provider.__name__.lower()}"
1250 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1252 super().__init__(moduleName, modulePath)
1254 def get_role_defaults(self, role: str) -> set[str]:
1255 """
1256 Returns a set of default access rights for a given role.
1257 """
1258 if role in ("viewer", "editor", "admin"):
1259 return {"admin"}
1261 return set()
1263 def addSkel(self):
1264 skel = super().addSkel().clone()
1265 user = current.user.get()
1266 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])):
1267 skel.status.readOnly = True
1268 skel["status"] = Status.UNSET
1269 skel.status.visible = False
1270 skel.access.readOnly = True
1271 skel["access"] = []
1272 skel.access.visible = False
1273 else:
1274 # An admin tries to add a new user.
1275 skel.status.readOnly = False
1276 skel.status.visible = True
1277 skel.access.readOnly = False
1278 skel.access.visible = True
1280 if "password" in skel:
1281 # Unlock and require a password
1282 skel.password.required = True
1283 skel.password.visible = True
1284 skel.password.readOnly = False
1286 skel.name.readOnly = False # Don't enforce readonly name in user/add
1287 return skel
1289 def editSkel(self, *args, **kwargs):
1290 skel = super().editSkel().clone()
1292 if "password" in skel:
1293 skel.password.required = False
1294 skel.password.visible = True
1295 skel.password.readOnly = False
1297 user = current.user.get()
1299 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only
1300 skel.name.readOnly = lockFields
1301 skel.access.readOnly = lockFields
1302 skel.status.readOnly = lockFields
1304 return skel
1306 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
1307 return getattr(self, f"f2_{cls.__name__.lower()}")
1309 def getCurrentUser(self):
1310 session = current.session.get()
1312 if session and (user := session.get("user")):
1313 skel = self.baseSkel()
1314 skel.setEntity(user)
1315 return skel
1317 return None
1319 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
1320 """
1321 Continue authentication flow when primary authentication succeeded.
1322 """
1323 skel = self.baseSkel()
1325 if not skel.fromDB(user_key):
1326 raise errors.NotFound("User was not found.")
1328 if not provider.can_handle(skel):
1329 raise errors.Forbidden("User is not allowed to use this primary login method.")
1331 session = current.session.get()
1332 session["possible_user_key"] = user_key.id_or_name
1333 session["_secondFactorStart"] = utils.utcNow()
1334 session.markChanged()
1336 second_factor_providers = []
1338 for auth_provider, second_factor in self.validAuthenticationMethods:
1339 if isinstance(provider, auth_provider):
1340 if second_factor is not None:
1341 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
1342 if second_factor_provider_instance.can_handle(skel):
1343 second_factor_providers.append(second_factor_provider_instance)
1344 else:
1345 second_factor_providers.append(None)
1347 if len(second_factor_providers) > 1 and None in second_factor_providers:
1348 # We have a second factor. So we can get rid of the None
1349 second_factor_providers.pop(second_factor_providers.index(None))
1351 if len(second_factor_providers) == 0:
1352 raise errors.NotAcceptable(self.msg_missing_second_factor)
1353 elif len(second_factor_providers) == 1:
1354 if second_factor_providers[0] is None:
1355 # We allow sign-in without a second factor
1356 return self.authenticateUser(user_key)
1357 # We have only one second factor we don't need the choice template
1358 return second_factor_providers[0].start(user_key)
1360 # In case there is more than one second factor, let the user select a method.
1361 return self.render.second_factor_choice(second_factors=second_factor_providers)
1363 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
1364 """
1365 Continue authentication flow when secondary authentication succeeded.
1366 """
1367 session = current.session.get()
1368 if session["possible_user_key"] != user_key.id_or_name:
1369 raise errors.Forbidden()
1371 # Assert that the second factor verification finished in time
1372 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow:
1373 raise errors.RequestTimeout()
1375 return self.authenticateUser(user_key)
1377 def authenticateUser(self, key: db.Key, **kwargs):
1378 """
1379 Performs Log-In for the current session and the given user key.
1381 This resets the current session: All fields not explicitly marked as persistent
1382 by conf.user.session_persistent_fields_on_login are gone afterwards.
1384 :param key: The (DB-)Key of the user we shall authenticate
1385 """
1386 skel = self.baseSkel()
1387 if not skel.fromDB(key):
1388 raise ValueError(f"Unable to authenticate unknown user {key}")
1390 # Verify that this user account is active
1391 if skel["status"] < Status.ACTIVE.value:
1392 raise errors.Forbidden("The user is disabled and cannot be authenticated.")
1394 # Update session for user
1395 session = current.session.get()
1396 # Remember persistent fields...
1397 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login}
1398 session.reset()
1399 # and copy them over to the new session
1400 session |= take_over
1402 # Update session, user and request
1403 session["user"] = skel.dbEntity
1405 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
1406 current.user.set(self.getCurrentUser())
1408 self.onLogin(skel)
1410 return self.render.loginSucceeded(**kwargs)
1412 @exposed
1413 @skey
1414 def logout(self, *args, **kwargs):
1415 """
1416 Implements the logout action. It also terminates the current session (all keys not listed
1417 in viur.session_persistent_fields_on_logout will be lost).
1418 """
1419 if not (user := current.user.get()):
1420 raise errors.Unauthorized()
1422 self.onLogout(user)
1424 session = current.session.get()
1425 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}
1426 session.reset()
1427 session |= take_over
1428 current.user.set(None) # set user to none in context var
1429 return self.render.logoutSuccess()
1431 @exposed
1432 def login(self, *args, **kwargs):
1433 return self.render.loginChoices([
1434 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1435 for primary, secondary in self.validAuthenticationMethods
1436 ])
1438 def onLogin(self, skel: skeleton.SkeletonInstance):
1439 """
1440 Hook to be called on user login.
1441 """
1442 # Update the lastlogin timestamp (if available!)
1443 if "lastlogin" in skel:
1444 now = utils.utcNow()
1446 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??)
1447 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)):
1448 skel["lastlogin"] = now
1449 skel.toDB(update_relations=False)
1451 logging.info(f"""User {skel["name"]} logged in""")
1453 def onLogout(self, skel: skeleton.SkeletonInstance):
1454 """
1455 Hook to be called on user logout.
1456 """
1457 logging.info(f"""User {skel["name"]} logged out""")
1459 @exposed
1460 def view(self, key: db.Key | int | str = "self", *args, **kwargs):
1461 """
1462 Allow a special key "self" to reference the current user.
1464 By default, any authenticated user can view its own user entry,
1465 to obtain access rights and any specific user information.
1466 This behavior is defined in the customized `canView` function,
1467 which is overwritten by the User-module.
1469 The rendered skeleton can be modified or restriced by specifying
1470 a customized view-skeleton.
1471 """
1472 if key == "self":
1473 if user := current.user.get():
1474 key = user["key"]
1475 else:
1476 raise errors.Unauthorized("Cannot view 'self' with unknown user")
1478 return super().view(key, *args, **kwargs)
1480 def canView(self, skel) -> bool:
1481 if user := current.user.get():
1482 if skel["key"] == user["key"]:
1483 return True
1485 if "root" in user["access"] or "user-view" in user["access"]:
1486 return True
1488 return False
1490 @exposed
1491 @skey(allow_empty=True)
1492 def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
1493 """
1494 Allow a special key "self" to reference the current user.
1496 This modification will only allow to use "self" as a key;
1497 The specific access right to let the user edit itself must
1498 still be customized.
1500 The rendered and editable skeleton can be modified or restriced
1501 by specifying a customized edit-skeleton.
1502 """
1503 if key == "self":
1504 if user := current.user.get():
1505 key = user["key"]
1506 else:
1507 raise errors.Unauthorized("Cannot edit 'self' with unknown user")
1509 return super().edit(key, *args, **kwargs)
1511 @exposed
1512 def getAuthMethods(self, *args, **kwargs):
1513 """Inform tools like Viur-Admin which authentication to use"""
1514 # FIXME: This is almost the same code as in index()...
1515 # FIXME: VIUR4: The entire function should be removed!
1516 # TODO: Align result with index(), so that primary and secondary login is presented.
1517 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!")
1519 res = [
1520 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1521 for primary, secondary in self.validAuthenticationMethods
1522 ]
1524 return json.dumps(res)
1526 @exposed
1527 @skey
1528 def trigger(self, action: str, key: str):
1529 current.request.get().response.headers["Content-Type"] = "application/json"
1531 # Check for provided access right definition (equivalent to client-side check), fallback to root!
1532 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", )
1533 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)):
1534 raise errors.Unauthorized()
1536 skel = self.baseSkel()
1537 if not skel.fromDB(key):
1538 raise errors.NotFound()
1540 match action:
1541 case "takeover":
1542 self.authenticateUser(skel["key"])
1544 case "kick":
1545 session.killSessionByUser(skel["key"])
1547 case _:
1548 raise errors.NotImplemented(f"Action {action!r} not implemented")
1550 return json.dumps("OKAY")
1552 def onEdited(self, skel):
1553 super().onEdited(skel)
1554 # In case the user is set to inactive, kill all sessions
1555 if "status" in skel and skel["status"] < Status.ACTIVE.value:
1556 session.killSessionByUser(skel["key"])
1558 def onDeleted(self, skel):
1559 super().onDeleted(skel)
1560 # Invalidate all sessions of that user
1561 session.killSessionByUser(skel["key"])
1564@tasks.StartupTask
1565def createNewUserIfNotExists():
1566 """
1567 Create a new Admin user, if the userDB is empty
1568 """
1569 if (
1570 (user_module := getattr(conf.main_app.vi, "user", None))
1571 and isinstance(user_module, User)
1572 and "addSkel" in dir(user_module)
1573 and "validAuthenticationMethods" in dir(user_module)
1574 # UserPassword must be one of the primary login methods
1575 and any(
1576 issubclass(provider[0], UserPassword)
1577 for provider in user_module.validAuthenticationMethods
1578 )
1579 ):
1580 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database
1581 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton
1582 uname = f"""admin@{conf.instance.project_id}.appspot.com"""
1583 pw = utils.string.random(13)
1584 addSkel["name"] = uname
1585 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away
1586 addSkel["access"] = ["root"]
1587 addSkel["password"] = pw
1589 try:
1590 addSkel.toDB()
1591 except Exception as e:
1592 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
1593 logging.exception(e)
1594 return
1596 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"
1598 logging.warning(msg)
1599 email.sendEMailToAdmins("New ViUR password", msg)
1602# DEPRECATED ATTRIBUTES HANDLING
1604def __getattr__(attr):
1605 match attr:
1606 case "userSkel":
1607 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
1608 warnings.warn(msg, DeprecationWarning, stacklevel=2)
1609 logging.warning(msg)
1610 return UserSkel
1612 return super(__import__(__name__).__class__).__getattr__(attr)