Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%
705 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-03 13:41 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-03 13:41 +0000
1import abc
2import datetime
3import enum
4import functools
5import hashlib
6import hmac
7import json
8import logging
9import secrets
10import warnings
11import user_agents
13import pyotp
14import base64
15import dataclasses
16import typing as t
17from google.auth.transport import requests
18from google.oauth2 import id_token
20from viur.core import (
21 conf, current, db, email, errors, i18n,
22 securitykey, session, skeleton, tasks, utils, Module
23)
24from viur.core.decorators import *
25from viur.core.bones import *
26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password
27from viur.core.prototypes.list import List
28from viur.core.ratelimit import RateLimit
29from viur.core.securityheaders import extendCsp
32@functools.total_ordering
33class Status(enum.Enum):
34 """Status enum for a user
36 Has backwards compatibility to be comparable with non-enum values.
37 Will be removed with viur-core 4.0.0
38 """
40 UNSET = 0 # Status is unset
41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification
42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin
43 DISABLED = 5 # Account disabled
44 ACTIVE = 10 # Active
46 def __eq__(self, other):
47 if isinstance(other, Status):
48 return super().__eq__(other)
49 return self.value == other
51 def __lt__(self, other):
52 if isinstance(other, Status):
53 return super().__lt__(other)
54 return self.value < other
57class UserSkel(skeleton.Skeleton):
58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604)
60 name = EmailBone(
61 descr="E-Mail",
62 required=True,
63 readOnly=True,
64 caseSensitive=False,
65 searchable=True,
66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
67 )
69 firstname = StringBone(
70 descr="Firstname",
71 searchable=True,
72 )
74 lastname = StringBone(
75 descr="Lastname",
76 searchable=True,
77 )
79 roles = SelectBone(
80 descr=i18n.translate("viur.user.bone.roles", defaultText="Roles"),
81 values=conf.user.roles,
82 required=True,
83 multiple=True,
84 # fixme: This is generally broken in VIUR! See #776 for details.
85 # vfunc=lambda values:
86 # i18n.translate(
87 # "user.bone.roles.invalid",
88 # defaultText="Invalid role setting: 'custom' can only be set alone.")
89 # if "custom" in values and len(values) > 1 else None,
90 defaultValue=list(conf.user.roles.keys())[:1],
91 )
93 access = SelectBone(
94 descr=i18n.translate("viur.user.bone.access", defaultText="Access rights"),
95 values=lambda: {
96 right: i18n.translate(f"server.modules.user.accessright.{right}", defaultText=right)
97 for right in sorted(conf.user.access_rights)
98 },
99 multiple=True,
100 params={
101 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system
102 }
103 )
105 status = SelectBone(
106 descr="Account status",
107 values=Status,
108 defaultValue=Status.ACTIVE,
109 required=True,
110 )
112 lastlogin = DateBone(
113 descr="Last Login",
114 readOnly=True,
115 )
117 admin_config = JsonBone( # This bone stores settings from the vi
118 descr="Config for the User",
119 visible=False
120 )
122 def __new__(cls):
123 """
124 Constructor for the UserSkel-class, with the capability
125 to dynamically add bones required for the configured
126 authentication methods.
127 """
128 for provider in conf.main_app.vi.user.authenticationProviders:
129 assert issubclass(provider, UserPrimaryAuthentication)
130 provider.patch_user_skel(cls)
132 for provider in conf.main_app.vi.user.secondFactorProviders:
133 assert issubclass(provider, UserSecondFactorAuthentication)
134 provider.patch_user_skel(cls)
136 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
137 return super().__new__(cls)
139 @classmethod
140 def toDB(cls, skel, *args, **kwargs):
141 # Roles
142 if skel["roles"] and "custom" not in skel["roles"]:
143 # Collect access rights through rules
144 access = set()
146 for role in skel["roles"]:
147 # Get default access for this role
148 access |= conf.main_app.vi.user.get_role_defaults(role)
150 # Go through all modules and evaluate available role-settings
151 for name in dir(conf.main_app.vi):
152 if name.startswith("_"):
153 continue
155 module = getattr(conf.main_app.vi, name)
156 if not isinstance(module, Module):
157 continue
159 roles = getattr(module, "roles", None) or {}
160 rights = roles.get(role, roles.get("*", ()))
162 # Convert role into tuple if it's not
163 if not isinstance(rights, (tuple, list)):
164 rights = (rights, )
166 if "*" in rights:
167 for right in module.accessRights:
168 access.add(f"{name}-{right}")
169 else:
170 for right in rights:
171 if right in module.accessRights:
172 access.add(f"{name}-{right}")
174 # special case: "edit" and "delete" actions require "view" as well!
175 if right in ("edit", "delete") and "view" in module.accessRights:
176 access.add(f"{name}-view")
178 skel["access"] = list(access)
180 return super().toDB(skel, *args, **kwargs)
183class UserAuthentication(Module, abc.ABC):
184 @property
185 @abc.abstractstaticmethod
186 def METHOD_NAME() -> str:
187 """
188 Define a unique method name for this authentication.
189 """
190 ...
192 def __init__(self, moduleName, modulePath, userModule):
193 super().__init__(moduleName, modulePath)
194 self._user_module = userModule
196 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
197 return True
199 @classmethod
200 def patch_user_skel(cls, skel_cls: skeleton.Skeleton):
201 """
202 Allows for an UserAuthentication to patch the UserSkel
203 class with additional bones which are required for
204 the implemented authentication method.
205 """
206 ...
209class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
210 """Abstract class for all primary authentication methods."""
211 registrationEnabled = False
213 @abc.abstractmethod
214 def login(self, *args, **kwargs):
215 ...
217 def next_or_finish(self, skel: skeleton.SkeletonInstance):
218 """
219 Hook that is called whenever a part of the authentication was successful.
220 It allows to perform further steps in custom authentications,
221 e.g. change a password after first login.
222 """
223 return self._user_module.continueAuthenticationFlow(self, skel["key"])
226class UserPassword(UserPrimaryAuthentication):
227 METHOD_NAME = "X-VIUR-AUTH-User-Password"
229 registrationEmailVerificationRequired = True
230 registrationAdminVerificationRequired = True
232 verifySuccessTemplate = "user_verify_success"
233 verifyEmailAddressMail = "user_verify_address"
234 verifyFailedTemplate = "user_verify_failed"
235 passwordRecoveryTemplate = "user_passwordrecover"
236 passwordRecoveryMail = "user_password_recovery"
237 passwordRecoverySuccessTemplate = "user_passwordrecover_success"
238 passwordRecoveryStep1Template = "user_passwordrecover_step1"
239 passwordRecoveryStep2Template = "user_passwordrecover_step2"
240 passwordRecoveryStep3Template = "user_passwordrecover_step3"
242 # The default rate-limit for password recovery (10 tries each 15 minutes)
243 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")
245 # Limit (invalid) login-retries to once per 5 seconds
246 loginRateLimit = RateLimit("user.login", 12, 1, "ip")
248 @classmethod
249 def patch_user_skel(cls, skel_cls):
250 """
251 Modifies the UserSkel to be equipped by a PasswordBone.
252 """
253 skel_cls.password = PasswordBone(
254 readOnly=True,
255 visible=False,
256 params={
257 "category": "Authentication",
258 }
259 )
261 class LoginSkel(skeleton.RelSkel):
262 name = EmailBone(
263 descr="E-Mail",
264 required=True,
265 caseSensitive=False,
266 )
267 password = PasswordBone(
268 required=True,
269 test_threshold=0,
270 )
272 class LostPasswordStep1Skel(skeleton.RelSkel):
273 name = EmailBone(
274 descr="E-Mail",
275 required=True,
276 )
278 class LostPasswordStep2Skel(skeleton.RelSkel):
279 recovery_key = StringBone(
280 descr="Recovery Key",
281 required=True,
282 params={
283 "tooltip": i18n.translate(
284 key="viur.modules.user.userpassword.lostpasswordstep2.recoverykey",
285 defaultText="Please enter the validation key you've received via e-mail.",
286 hint="Shown when the user needs more than 15 minutes to paste the key",
287 ),
288 }
289 )
291 class LostPasswordStep3Skel(skeleton.RelSkel):
292 # send the recovery key again, in case the password is rejected by some reason.
293 recovery_key = StringBone(
294 descr="Recovery Key",
295 visible=False,
296 readOnly=True,
297 )
299 password = PasswordBone(
300 descr="New Password",
301 required=True,
302 params={
303 "tooltip": i18n.translate(
304 key="viur.modules.user.userpassword.lostpasswordstep3.password",
305 defaultText="Please enter a new password for your account.",
306 ),
307 }
308 )
310 @exposed
311 @force_ssl
312 @skey(allow_empty=True)
313 def login(self, *, name: str | None = None, password: str | None = None, **kwargs):
314 if current.user.get(): # User is already logged in, nothing to do.
315 return self._user_module.render.loginSucceeded()
317 if not name or not password:
318 return self._user_module.render.login(self.LoginSkel(), action="login")
320 self.loginRateLimit.assertQuotaIsAvailable()
322 # query for the username. The query might find another user, but the name is being checked for equality below
323 name = name.lower().strip()
324 user_skel = self._user_module.baseSkel()
325 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel
327 # extract password hash from raw database entity (skeleton access blocks it)
328 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {}
329 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001
330 password_hash = encode_password(password, password_data.get("salt", "-invalid-"), iterations)["pwhash"]
332 # now check if the username matches
333 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())
335 # next, check if the password hash matches
336 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash)
338 # next, check if the user account is active
339 is_okay &= (user_skel["status"] or 0) >= Status.ACTIVE.value
341 if not is_okay:
342 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota
343 skel = self.LoginSkel()
344 return self._user_module.render.login(
345 skel,
346 action="login",
347 loginFailed=True, # FIXME: Is this still being used?
348 accountStatus=user_skel["status"] # FIXME: Is this still being used?
349 )
351 # check if iterations are below current security standards, and update if necessary.
352 if iterations < PBKDF2_DEFAULT_ITERATIONS:
353 logging.info(f"Update password hash for user {name}.")
354 # re-hash the password with more iterations
355 # FIXME: This must be done within a transaction!
356 user_skel["password"] = password # will be hashed on serialize
357 user_skel.toDB(update_relations=False)
359 return self.next_or_finish(user_skel)
361 @exposed
362 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs):
363 """
364 This implements a password recovery process which lets users set a new password for their account,
365 after validating a recovery key sent by email.
367 The process is as following:
369 - The user enters his email adress
370 - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode
371 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
372 and send a link with the code . It runs as a deferredTask so we don't leak the information if a user
373 account exists.
374 - If the user received his email, he can click on the link and set a new password for his account.
376 To prevent automated attacks, the fist step is guarded by a captcha and we limited calls to this function
377 to 10 actions per 15 minutes. (One complete recovery process consists of two calls).
378 """
379 self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
380 current_request = current.request.get()
382 if recovery_key is None:
383 # This is the first step, where we ask for the username of the account we'll going to reset the password on
384 skel = self.LostPasswordStep1Skel()
386 if not current_request.isPostRequest or not skel.fromClient(kwargs):
387 return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template)
389 # validate security key
390 if not securitykey.validate(skey):
391 raise errors.PreconditionFailed()
393 self.passwordRecoveryRateLimit.decrementQuota()
395 recovery_key = securitykey.create(
396 duration=datetime.timedelta(minutes=15),
397 key_length=conf.security.password_recovery_key_length,
398 user_name=skel["name"].lower(),
399 session_bound=False,
400 )
402 # Send the code in background
403 self.sendUserPasswordRecoveryCode(
404 skel["name"], recovery_key, current_request.request.headers["User-Agent"]
405 )
407 # step 2 is only an action-skel, and can be ignored by a direct link in the
408 # e-mail previously sent. It depends on the implementation of the specific project.
409 return self._user_module.render.edit(
410 self.LostPasswordStep2Skel(),
411 tpl=self.passwordRecoveryStep2Template,
412 )
414 # in step 3
415 skel = self.LostPasswordStep3Skel()
416 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails.
418 # check for any input; Render input-form again when incomplete.
419 if not skel.fromClient(kwargs) or not current_request.isPostRequest:
420 return self._user_module.render.edit(
421 skel=skel,
422 tpl=self.passwordRecoveryStep3Template,
423 )
425 # validate security key
426 if not securitykey.validate(skey):
427 raise errors.PreconditionFailed()
429 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
430 raise errors.PreconditionFailed(
431 i18n.translate(
432 key="viur.modules.user.passwordrecovery.keyexpired",
433 defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
434 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
435 )
436 )
438 self.passwordRecoveryRateLimit.decrementQuota()
440 # If we made it here, the key was correct, so we'd hopefully have a valid user for this
441 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()
443 if not user_skel:
444 raise errors.NotFound(
445 i18n.translate(
446 key="viur.modules.user.passwordrecovery.usernotfound",
447 defaultText="There is no account with this name",
448 hint="We cant find an account with that name (Should never happen)"
449 )
450 )
452 if user_skel["status"] != Status.ACTIVE: # The account is locked or not yet validated. Abort the process.
453 raise errors.NotFound(
454 i18n.translate(
455 key="viur.modules.user.passwordrecovery.accountlocked",
456 defaultText="This account is currently locked. You cannot change it's password.",
457 hint="Attempted password recovery on a locked account"
458 )
459 )
461 # Update the password, save the user, reset his session and show the success-template
462 user_skel["password"] = skel["password"]
463 user_skel.toDB(update_relations=False)
465 return self._user_module.render.view(
466 None,
467 tpl=self.passwordRecoverySuccessTemplate,
468 )
470 @tasks.CallDeferred
471 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
472 """
473 Sends the given recovery code to the user given in userName. This function runs deferred
474 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the
475 code by email (assuming we have working email delivery), but this can be overridden to send it
476 by SMS or other means. We'll also update the changedate for this user, so no more than one code
477 can be send to any given user in four hours.
478 """
479 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel():
480 user_agent = user_agents.parse(user_agent)
481 email.sendEMail(
482 tpl=self.passwordRecoveryMail,
483 skel=user_skel,
484 dests=[user_name],
485 recovery_key=recovery_key,
486 user_agent={
487 "device": user_agent.get_device(),
488 "os": user_agent.get_os(),
489 "browser": user_agent.get_browser()
490 }
491 )
493 @exposed
494 @skey(forward_payload="data", session_bound=False)
495 def verify(self, data):
496 def transact(key):
497 skel = self._user_module.editSkel()
498 if not key or not skel.fromDB(key):
499 return None
500 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \
501 if self.registrationAdminVerificationRequired else Status.ACTIVE
503 skel.toDB(update_relations=False)
504 return skel
506 if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))):
507 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)
509 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate)
511 def canAdd(self) -> bool:
512 return self.registrationEnabled
514 def addSkel(self):
515 """
516 Prepare the add-Skel for rendering.
517 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on
518 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired
519 :return: viur.core.skeleton.Skeleton
520 """
521 skel = self._user_module.addSkel()
523 if self.registrationEmailVerificationRequired:
524 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION
525 elif self.registrationAdminVerificationRequired:
526 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION
527 else: # No further verification required
528 defaultStatusValue = Status.ACTIVE
530 skel.status.readOnly = True
531 skel["status"] = defaultStatusValue
533 if "password" in skel:
534 skel.password.required = True # The user will have to set a password
536 return skel
538 @force_ssl
539 @exposed
540 @skey(allow_empty=True)
541 def add(self, *args, **kwargs):
542 """
543 Allows guests to register a new account if self.registrationEnabled is set to true
545 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`
547 :returns: The rendered, added object of the entry, eventually with error hints.
549 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
550 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
551 """
552 if not self.canAdd():
553 raise errors.Unauthorized()
554 skel = self.addSkel()
555 if (
556 not kwargs # no data supplied
557 or not current.request.get().isPostRequest # bail out if not using POST-method
558 or not skel.fromClient(kwargs) # failure on reading into the bones
559 or utils.parse.bool(kwargs.get("bounce")) # review before adding
560 ):
561 # render the skeleton in the version it could as far as it could be read.
562 return self._user_module.render.add(skel)
563 self._user_module.onAdd(skel)
564 skel.toDB()
565 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION:
566 # The user will have to verify his email-address. Create a skey and send it to his address
567 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False,
568 user_key=utils.normalizeKey(skel["key"]),
569 name=skel["name"])
570 skel.skey = BaseBone(descr="Skey")
571 skel["skey"] = skey
572 email.sendEMail(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel)
573 self._user_module.onAdded(skel) # Call onAdded on our parent user module
574 return self._user_module.render.addSuccess(skel)
577class GoogleAccount(UserPrimaryAuthentication):
578 METHOD_NAME = "X-VIUR-AUTH-Google-Account"
580 @classmethod
581 def patch_user_skel(cls, skel_cls):
582 """
583 Modifies the UserSkel to be equipped by a bones required by Google Auth
584 """
585 skel_cls.uid = StringBone(
586 descr="Google UserID",
587 required=False,
588 readOnly=True,
589 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
590 params={
591 "category": "Authentication",
592 }
593 )
595 skel_cls.sync = BooleanBone(
596 descr="Sync user data with OAuth-based services",
597 defaultValue=True,
598 params={
599 "category": "Authentication",
600 "tooltip":
601 "If set, user data like firstname and lastname is automatically kept"
602 "synchronous with the information stored at the OAuth service provider"
603 "(e.g. Google Login)."
604 }
605 )
607 @exposed
608 @force_ssl
609 @skey(allow_empty=True)
610 def login(self, token: str | None = None, *args, **kwargs):
611 # FIXME: Check if already logged in
612 if not conf.user.google_client_id:
613 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")
615 if not token:
616 request = current.request.get()
617 request.response.headers["Content-Type"] = "text/html"
618 if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
619 # We have to allow popups here
620 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"
622 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
623 with open(file_path) as file:
624 tpl_string = file.read()
626 # FIXME: Use Jinja2 for rendering?
627 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
628 extendCsp({
629 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
630 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
631 })
632 return tpl_string
634 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
635 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
636 raise ValueError("Invalid issuer")
638 # Token looks valid :)
639 uid = user_info["sub"]
640 email = user_info["email"]
642 base_skel = self._user_module.baseSkel()
643 update = False
644 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
645 # We'll try again - checking if there's already an user with that email
646 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()):
647 # Still no luck - it's a completely new user
648 if not self.registrationEnabled:
649 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
650 logging.debug(f"Google user is from allowed {domain} - adding account")
651 else:
652 logging.debug(f"Google user is from {domain} - denying registration")
653 raise errors.Forbidden("Registration for new users is disabled")
655 user_skel = base_skel
656 user_skel["uid"] = uid
657 user_skel["name"] = email
658 update = True
660 # Take user information from Google, if wanted!
661 if user_skel["sync"]:
662 for target, source in {
663 "name": email,
664 "firstname": user_info.get("given_name"),
665 "lastname": user_info.get("family_name"),
666 }.items():
668 if user_skel[target] != source:
669 user_skel[target] = source
670 update = True
672 if update:
673 assert user_skel.toDB()
675 return self.next_or_finish(user_skel)
678class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
679 """Abstract class for all second factors."""
680 MAX_RETRY = 3
681 second_factor_login_template = "user_login_secondfactor"
682 """Template to enter the TOPT on login"""
684 @property
685 @abc.abstractmethod
686 def NAME(self) -> str:
687 """Name for this factor for templates."""
688 ...
690 @property
691 @abc.abstractmethod
692 def ACTION_NAME(self) -> str:
693 """The action name for this factor, used as path-segment."""
694 ...
696 def __init__(self, moduleName, modulePath, _user_module):
697 super().__init__(moduleName, modulePath, _user_module)
698 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}"
699 self.add_url = f"{self.modulePath}/add"
700 self.start_url = f"{self.modulePath}/start"
703class TimeBasedOTP(UserSecondFactorAuthentication):
704 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP"
705 WINDOW_SIZE = 5
706 ACTION_NAME = "otp"
707 NAME = "Time based Otp"
708 second_factor_login_template = "user_login_secondfactor"
710 @dataclasses.dataclass
711 class OtpConfig:
712 """
713 This dataclass is used to provide an interface for a OTP token
714 algorithm description that is passed within the TimeBasedOTP
715 class for configuration.
716 """
717 secret: str
718 timedrift: float = 0.0
719 algorithm: t.Literal["sha1", "sha256"] = "sha1"
720 interval: int = 60
722 class OtpSkel(skeleton.RelSkel):
723 """
724 This is the Skeleton used to ask for the OTP token.
725 """
726 otptoken = NumericBone(
727 descr="Token",
728 required=True,
729 max=999999,
730 min=0,
731 )
733 @classmethod
734 def patch_user_skel(cls, skel_cls):
735 """
736 Modifies the UserSkel to be equipped by a bones required by Timebased OTP
737 """
738 # One-Time Password Verification
739 skel_cls.otp_serial = StringBone(
740 descr="OTP serial",
741 searchable=True,
742 params={
743 "category": "Second Factor Authentication",
744 }
745 )
747 skel_cls.otp_secret = CredentialBone(
748 descr="OTP secret",
749 params={
750 "category": "Second Factor Authentication",
751 }
752 )
754 skel_cls.otp_timedrift = NumericBone(
755 descr="OTP time drift",
756 readOnly=True,
757 defaultValue=0,
758 params={
759 "category": "Second Factor Authentication",
760 }
761 )
763 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
764 """
765 Returns an instance of self.OtpConfig with a provided token configuration,
766 or None when there is no appropriate configuration of this second factor handler available.
767 """
769 if otp_secret := skel.dbEntity.get("otp_secret"):
770 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0)
772 return None
774 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
775 """
776 Specified whether the second factor authentication can be handled by the given user or not.
777 """
778 return bool(self.get_config(skel))
780 @exposed
781 def start(self):
782 """
783 Configures OTP login for the current session.
785 A special otp_user_conf has to be specified as a dict, which is stored into the session.
786 """
787 session = current.session.get()
789 if not (user_key := session.get("possible_user_key")):
790 raise errors.PreconditionFailed(
791 "Second factor can only be triggered after successful primary authentication."
792 )
794 user_skel = self._user_module.baseSkel()
795 if not user_skel.fromDB(user_key):
796 raise errors.NotFound("The previously authenticated user is gone.")
798 if not (otp_user_conf := self.get_config(user_skel)):
799 raise errors.PreconditionFailed("This second factor is not available for the user")
801 otp_user_conf = {
802 "key": str(user_key),
803 } | dataclasses.asdict(otp_user_conf)
805 session = current.session.get()
806 session["_otp_user"] = otp_user_conf
807 session.markChanged()
809 return self._user_module.render.edit(
810 self.OtpSkel(),
811 params={
812 "name": i18n.translate(self.NAME),
813 "action_name": self.ACTION_NAME,
814 "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
815 },
816 tpl=self.second_factor_login_template
817 )
819 @exposed
820 @force_ssl
821 @skey(allow_empty=True)
822 def otp(self, *args, **kwargs):
823 """
824 Performs the second factor validation and interaction with the client.
825 """
826 session = current.session.get()
827 if not (otp_user_conf := session.get("_otp_user")):
828 raise errors.PreconditionFailed("No OTP process started in this session")
830 # Check if maximum second factor verification attempts
831 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
832 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
834 # Read the OTP token via the skeleton, to obtain a valid value
835 skel = self.OtpSkel()
836 if skel.fromClient(kwargs):
837 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP.
838 res = self.verify(
839 otp=skel["otptoken"],
840 secret=otp_user_conf["secret"],
841 algorithm=otp_user_conf.get("algorithm") or "sha1",
842 interval=otp_user_conf.get("interval") or 60,
843 timedrift=otp_user_conf.get("timedrift") or 0.0,
844 valid_window=self.WINDOW_SIZE
845 )
846 else:
847 res = None
849 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0!
850 if res is None:
851 otp_user_conf["attempts"] = attempts + 1
852 session.markChanged()
853 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
854 return self._user_module.render.edit(
855 skel,
856 name=i18n.translate(self.NAME),
857 action_name=self.ACTION_NAME,
858 action_url=f"{self.modulePath}/{self.ACTION_NAME}",
859 tpl=self.second_factor_login_template
860 )
862 # Remove otp user config from session
863 user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
864 del session["_otp_user"]
865 session.markChanged()
867 # Check if the OTP device has a time drift
869 timedriftchange = float(res) - otp_user_conf["timedrift"]
870 if abs(timedriftchange) > 2:
871 # The time-drift change accumulates to more than 2 minutes (for interval==60):
872 # update clock-drift value accordingly
873 self.updateTimeDrift(user_key, timedriftchange)
875 # Continue with authentication
876 return self._user_module.secondFactorSucceeded(self, user_key)
878 @staticmethod
879 def verify(
880 otp: str | int,
881 secret: str,
882 algorithm: str = "sha1",
883 interval: int = 60,
884 timedrift: float = 0.0,
885 for_time: datetime.datetime | None = None,
886 valid_window: int = 0,
887 ) -> int | None:
888 """
889 Verifies the OTP passed in against the current time OTP.
891 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which
892 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown
893 on the hardware token generator. This can be used to store the time drift of a given token generator.
895 :param otp: the OTP token to check against
896 :param secret: The OTP secret
897 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256)
898 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). In
899 pyotp, default is 30!
900 :param timedrift: The known timedrift (old index) of the hardware OTP generator
901 :param for_time: Time to check OTP at (defaults to now)
902 :param valid_window: extends the validity to this many counter ticks before and after the current one
903 :returns: The index where verification succeeded, None otherwise
904 """
905 # get the hashing digest
906 digest = {
907 "sha1": hashlib.sha1,
908 "sha256": hashlib.sha256,
909 }.get(algorithm)
911 if not digest:
912 raise errors.NotImplemented(f"{algorithm=} is not implemented")
914 if for_time is None:
915 for_time = datetime.datetime.now()
917 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index
918 timedrift = round(timedrift)
919 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret
920 otp = str(otp).zfill(6) # fill with zeros in front
922 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}")
923 totp = pyotp.TOTP(secret, digest=digest, interval=interval)
925 if valid_window:
926 for offset in range(timedrift - valid_window, timedrift + valid_window + 1):
927 token = str(totp.at(for_time, offset))
928 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}")
929 if hmac.compare_digest(otp, token):
930 return offset
932 return None
934 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None
936 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
937 """
938 Updates the clock-drift value.
939 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew
940 it out of bounds. Maximum change per call is 0.3 minutes.
941 :param user_key: For which user should the update occour
942 :param idx: How many steps before/behind was that token
943 :return:
944 """
946 # FIXME: The callback in viur-core must be improved, to accept user_skel
948 def transaction(user_key, idx):
949 user = db.Get(user_key)
950 if not isinstance(user.get("otp_timedrift"), float):
951 user["otp_timedrift"] = 0.0
952 user["otp_timedrift"] += min(max(0.1 * idx, -0.3), 0.3)
953 db.Put(user)
955 db.RunInTransaction(transaction, user_key, idx)
958class AuthenticatorOTP(UserSecondFactorAuthentication):
959 """
960 This class handles the second factor for apps like authy and so on
961 """
962 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"
964 second_factor_add_template = "user_secondfactor_add"
965 """Template to configure (add) a new TOPT"""
967 ACTION_NAME = "authenticator_otp"
968 """Action name provided for *otp_template* on login"""
970 NAME = "Authenticator App"
972 @exposed
973 @force_ssl
974 @skey(allow_empty=True)
975 def add(self, otp=None):
976 """
977 We try to read the otp_app_secret form the current session. When this fails we generate a new one and store
978 it in the session.
980 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store
981 the otp_app_secret from the session in the user entry.
982 """
983 current_session = current.session.get()
985 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
986 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
987 current_session["_maybe_otp_app_secret"] = otp_app_secret
988 current_session.markChanged()
990 if otp is None:
991 return self._user_module.render.second_factor_add(
992 tpl=self.second_factor_add_template,
993 action_name=self.ACTION_NAME,
994 name=i18n.translate(self.NAME),
995 add_url=self.add_url,
996 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))
997 else:
998 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
999 return self._user_module.render.second_factor_add(
1000 tpl=self.second_factor_add_template,
1001 action_name=self.ACTION_NAME,
1002 name=i18n.translate(self.NAME),
1003 add_url=self.add_url,
1004 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors
1006 # Now we can set the otp_app_secret to the current User and render der Success-template
1007 AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
1008 return self._user_module.render.second_factor_add_success(
1009 action_name=self.ACTION_NAME,
1010 name=i18n.translate(self.NAME),
1011 )
1013 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
1014 """
1015 We can only handle the second factor if we have stored an otp_app_secret before.
1016 """
1017 return bool(skel.dbEntity.get("otp_app_secret", ""))
1019 @classmethod
1020 def patch_user_skel(cls, skel_cls):
1021 """
1022 Modifies the UserSkel to be equipped by bones required by Authenticator App
1023 """
1024 # Authenticator OTP Apps (like Authy)
1025 skel_cls.otp_app_secret = CredentialBone(
1026 descr="OTP Secret (App-Key)",
1027 params={
1028 "category": "Second Factor Authentication",
1029 }
1030 )
1032 @classmethod
1033 def set_otp_app_secret(cls, otp_app_secret=None):
1034 """
1035 Write a new OTP Token in the current user entry.
1036 """
1037 if otp_app_secret is None:
1038 logging.error("No 'otp_app_secret' is provided")
1039 raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
1040 if not (cuser := current.user.get()):
1041 raise errors.Unauthorized()
1043 def transaction(user_key):
1044 if not (user := db.Get(user_key)):
1045 raise errors.NotFound()
1046 user["otp_app_secret"] = otp_app_secret
1047 db.Put(user)
1049 db.RunInTransaction(transaction, cuser["key"])
1051 @classmethod
1052 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
1053 """
1054 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
1055 """
1056 if not (cuser := current.user.get()):
1057 raise errors.Unauthorized()
1058 if not (issuer := conf.user.otp_issuer):
1059 logging.warning(
1060 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
1061 issuer = conf.instance.project_id
1063 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)
1065 @classmethod
1066 def generate_otp_app_secret(cls) -> str:
1067 """
1068 Generate a new OTP Secret
1069 :return an otp
1070 """
1071 return pyotp.random_base32()
1073 @classmethod
1074 def verify_otp(cls, otp: str | int, secret: str) -> bool:
1075 return pyotp.TOTP(secret).verify(otp)
1077 @exposed
1078 def start(self):
1079 otp_user_conf = {"attempts": 0}
1080 session = current.session.get()
1081 session["_otp_user"] = otp_user_conf
1082 session.markChanged()
1083 return self._user_module.render.edit(
1084 TimeBasedOTP.OtpSkel(),
1085 params={
1086 "name": i18n.translate(self.NAME),
1087 "action_name": self.ACTION_NAME,
1088 "action_url": self.action_url,
1089 },
1090 tpl=self.second_factor_login_template,
1091 )
1093 @exposed
1094 @force_ssl
1095 @skey
1096 def authenticator_otp(self, **kwargs):
1097 """
1098 We verify the otp here with the secret we stored before.
1099 """
1100 session = current.session.get()
1101 user_key = db.Key(self._user_module.kindName, session["possible_user_key"])
1103 if not (otp_user_conf := session.get("_otp_user")):
1104 raise errors.PreconditionFailed("No OTP process started in this session")
1106 # Check if maximum second factor verification attempts
1107 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
1108 raise errors.Forbidden("Maximum amount of authentication retries exceeded")
1110 if not (user := db.Get(user_key)):
1111 raise errors.NotFound()
1113 skel = TimeBasedOTP.OtpSkel()
1114 if not skel.fromClient(kwargs):
1115 raise errors.PreconditionFailed()
1116 otp_token = str(skel["otptoken"]).zfill(6)
1118 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]):
1119 return self._user_module.secondFactorSucceeded(self, user_key)
1120 otp_user_conf["attempts"] = attempts + 1
1121 session.markChanged()
1122 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
1123 return self._user_module.render.edit(
1124 skel,
1125 name=i18n.translate(self.NAME),
1126 action_name=self.ACTION_NAME,
1127 action_url=self.action_url,
1128 tpl=self.second_factor_login_template,
1129 )
1132class User(List):
1133 kindName = "user"
1134 addTemplate = "user_add"
1135 addSuccessTemplate = "user_add_success"
1136 lostPasswordTemplate = "user_lostpassword"
1137 verifyEmailAddressMail = "user_verify_address"
1138 passwordRecoveryMail = "user_password_recovery"
1140 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter(
1141 None, (
1142 UserPassword,
1143 conf.user.google_client_id and GoogleAccount,
1144 )
1145 ))
1146 """
1147 Specifies primary authentication providers that are made available
1148 as sub-modules under `user/auth_<classname>`. They might require
1149 customization or configuration.
1150 """
1152 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = (
1153 TimeBasedOTP,
1154 AuthenticatorOTP,
1155 )
1156 """
1157 Specifies secondary authentication providers that are made available
1158 as sub-modules under `user/f2_<classname>`. They might require
1159 customization or configuration, which is determined during the
1160 login-process depending on the user that wants to login.
1161 """
1163 validAuthenticationMethods = tuple(filter(
1164 None, (
1165 (UserPassword, AuthenticatorOTP),
1166 (UserPassword, TimeBasedOTP),
1167 (UserPassword, None),
1168 (GoogleAccount, None) if conf.user.google_client_id else None,
1169 )
1170 ))
1171 """
1172 Specifies the possible combinations of primary- and secondary factor
1173 login methos.
1175 GoogleLogin defaults to no second factor, as the Google Account can be
1176 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only
1177 handled when there is a user-dependent configuration available.
1178 """
1180 msg_missing_second_factor = "Second factor required but not configured for this user."
1182 secondFactorTimeWindow = datetime.timedelta(minutes=10)
1184 default_order = "name.idx"
1186 adminInfo = {
1187 "icon": "person-fill",
1188 "actions": [
1189 "trigger_kick",
1190 "trigger_takeover",
1191 ],
1192 "customActions": {
1193 "trigger_kick": {
1194 "name": i18n.translate(
1195 key="viur.modules.user.customActions.kick",
1196 defaultText="Kick user",
1197 hint="Title of the kick user function"
1198 ),
1199 "icon": "trash2-fill",
1200 "access": ["root"],
1201 "action": "fetch",
1202 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}",
1203 "confirm": i18n.translate(
1204 key="viur.modules.user.customActions.kick.confirm",
1205 defaultText="Do you really want to drop all sessions of the selected user from the system?",
1206 ),
1207 "success": i18n.translate(
1208 key="viur.modules.user.customActions.kick.success",
1209 defaultText="Sessions of the user are being invalidated.",
1210 ),
1211 },
1212 "trigger_takeover": {
1213 "name": i18n.translate(
1214 key="viur.modules.user.customActions.takeover",
1215 defaultText="Take-over user",
1216 hint="Title of the take user over function"
1217 ),
1218 "icon": "file-person-fill",
1219 "access": ["root"],
1220 "action": "fetch",
1221 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}",
1222 "confirm": i18n.translate(
1223 key="viur.modules.user.customActions.takeover.confirm",
1224 defaultText="Do you really want to replace your current user session by a "
1225 "user session of the selected user?",
1226 ),
1227 "success": i18n.translate(
1228 key="viur.modules.user.customActions.takeover.success",
1229 defaultText="You're now know as the selected user!",
1230 ),
1231 "then": "reload-vi",
1232 },
1233 },
1234 }
1236 roles = {
1237 "admin": "*",
1238 }
1240 def __init__(self, moduleName, modulePath):
1241 for provider in self.authenticationProviders:
1242 assert issubclass(provider, UserPrimaryAuthentication)
1243 name = f"auth_{provider.__name__.lower()}"
1244 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1246 for provider in self.secondFactorProviders:
1247 assert issubclass(provider, UserSecondFactorAuthentication)
1248 name = f"f2_{provider.__name__.lower()}"
1249 setattr(self, name, provider(name, f"{modulePath}/{name}", self))
1251 super().__init__(moduleName, modulePath)
1253 def get_role_defaults(self, role: str) -> set[str]:
1254 """
1255 Returns a set of default access rights for a given role.
1256 """
1257 if role in ("viewer", "editor", "admin"):
1258 return {"admin"}
1260 return set()
1262 def addSkel(self):
1263 skel = super().addSkel().clone()
1264 user = current.user.get()
1265 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])):
1266 skel.status.readOnly = True
1267 skel["status"] = Status.UNSET
1268 skel.status.visible = False
1269 skel.access.readOnly = True
1270 skel["access"] = []
1271 skel.access.visible = False
1272 else:
1273 # An admin tries to add a new user.
1274 skel.status.readOnly = False
1275 skel.status.visible = True
1276 skel.access.readOnly = False
1277 skel.access.visible = True
1279 if "password" in skel:
1280 # Unlock and require a password
1281 skel.password.required = True
1282 skel.password.visible = True
1283 skel.password.readOnly = False
1285 skel.name.readOnly = False # Don't enforce readonly name in user/add
1286 return skel
1288 def editSkel(self, *args, **kwargs):
1289 skel = super().editSkel().clone()
1291 if "password" in skel:
1292 skel.password.required = False
1293 skel.password.visible = True
1294 skel.password.readOnly = False
1296 user = current.user.get()
1298 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only
1299 skel.name.readOnly = lockFields
1300 skel.access.readOnly = lockFields
1301 skel.status.readOnly = lockFields
1303 return skel
1305 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication:
1306 return getattr(self, f"f2_{cls.__name__.lower()}")
1308 def getCurrentUser(self):
1309 session = current.session.get()
1311 if session and (user := session.get("user")):
1312 skel = self.baseSkel()
1313 skel.setEntity(user)
1314 return skel
1316 return None
1318 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key):
1319 """
1320 Continue authentication flow when primary authentication succeeded.
1321 """
1322 skel = self.baseSkel()
1324 if not skel.fromDB(user_key):
1325 raise errors.NotFound("User was not found.")
1327 if not provider.can_handle(skel):
1328 raise errors.Forbidden("User is not allowed to use this primary login method.")
1330 session = current.session.get()
1331 session["possible_user_key"] = user_key.id_or_name
1332 session["_secondFactorStart"] = utils.utcNow()
1333 session.markChanged()
1335 second_factor_providers = []
1337 for auth_provider, second_factor in self.validAuthenticationMethods:
1338 if isinstance(provider, auth_provider):
1339 if second_factor is not None:
1340 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor)
1341 if second_factor_provider_instance.can_handle(skel):
1342 second_factor_providers.append(second_factor_provider_instance)
1343 else:
1344 second_factor_providers.append(None)
1346 if len(second_factor_providers) > 1 and None in second_factor_providers:
1347 # We have a second factor. So we can get rid of the None
1348 second_factor_providers.pop(second_factor_providers.index(None))
1350 if len(second_factor_providers) == 0:
1351 raise errors.NotAcceptable(self.msg_missing_second_factor)
1352 elif len(second_factor_providers) == 1:
1353 if second_factor_providers[0] is None:
1354 # We allow sign-in without a second factor
1355 return self.authenticateUser(user_key)
1356 # We have only one second factor we don't need the choice template
1357 return second_factor_providers[0].start(user_key)
1359 # In case there is more than one second factor, let the user select a method.
1360 return self.render.second_factor_choice(second_factors=second_factor_providers)
1362 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key):
1363 """
1364 Continue authentication flow when secondary authentication succeeded.
1365 """
1366 session = current.session.get()
1367 if session["possible_user_key"] != user_key.id_or_name:
1368 raise errors.Forbidden()
1370 # Assert that the second factor verification finished in time
1371 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow:
1372 raise errors.RequestTimeout()
1374 return self.authenticateUser(user_key)
1376 def authenticateUser(self, key: db.Key, **kwargs):
1377 """
1378 Performs Log-In for the current session and the given user key.
1380 This resets the current session: All fields not explicitly marked as persistent
1381 by conf.user.session_persistent_fields_on_login are gone afterwards.
1383 :param key: The (DB-)Key of the user we shall authenticate
1384 """
1385 skel = self.baseSkel()
1386 if not skel.fromDB(key):
1387 raise ValueError(f"Unable to authenticate unknown user {key}")
1389 # Verify that this user account is active
1390 if skel["status"] < Status.ACTIVE.value:
1391 raise errors.Forbidden("The user is disabled and cannot be authenticated.")
1393 # Update session for user
1394 session = current.session.get()
1395 # Remember persistent fields...
1396 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login}
1397 session.reset()
1398 # and copy them over to the new session
1399 session |= take_over
1401 # Update session, user and request
1402 session["user"] = skel.dbEntity
1404 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key
1405 current.user.set(self.getCurrentUser())
1407 self.onLogin(skel)
1409 return self.render.loginSucceeded(**kwargs)
1411 @exposed
1412 @skey
1413 def logout(self, *args, **kwargs):
1414 """
1415 Implements the logout action. It also terminates the current session (all keys not listed
1416 in viur.session_persistent_fields_on_logout will be lost).
1417 """
1418 if not (user := current.user.get()):
1419 raise errors.Unauthorized()
1421 self.onLogout(user)
1423 session = current.session.get()
1424 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}
1425 session.reset()
1426 session |= take_over
1427 current.user.set(None) # set user to none in context var
1428 return self.render.logoutSuccess()
1430 @exposed
1431 def login(self, *args, **kwargs):
1432 return self.render.loginChoices([
1433 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1434 for primary, secondary in self.validAuthenticationMethods
1435 ])
1437 def onLogin(self, skel: skeleton.SkeletonInstance):
1438 """
1439 Hook to be called on user login.
1440 """
1441 # Update the lastlogin timestamp (if available!)
1442 if "lastlogin" in skel:
1443 now = utils.utcNow()
1445 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??)
1446 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)):
1447 skel["lastlogin"] = now
1448 skel.toDB(update_relations=False)
1450 logging.info(f"""User {skel["name"]} logged in""")
1452 def onLogout(self, skel: skeleton.SkeletonInstance):
1453 """
1454 Hook to be called on user logout.
1455 """
1456 logging.info(f"""User {skel["name"]} logged out""")
1458 @exposed
1459 def view(self, key: db.Key | int | str = "self", *args, **kwargs):
1460 """
1461 Allow a special key "self" to reference the current user.
1463 By default, any authenticated user can view its own user entry,
1464 to obtain access rights and any specific user information.
1465 This behavior is defined in the customized `canView` function,
1466 which is overwritten by the User-module.
1468 The rendered skeleton can be modified or restriced by specifying
1469 a customized view-skeleton.
1470 """
1471 if key == "self":
1472 if user := current.user.get():
1473 key = user["key"]
1474 else:
1475 raise errors.Unauthorized("Cannot view 'self' with unknown user")
1477 return super().view(key, *args, **kwargs)
1479 def canView(self, skel) -> bool:
1480 if user := current.user.get():
1481 if skel["key"] == user["key"]:
1482 return True
1484 if "root" in user["access"] or "user-view" in user["access"]:
1485 return True
1487 return False
1489 @exposed
1490 @skey(allow_empty=True)
1491 def edit(self, key: db.Key | int | str = "self", *args, **kwargs):
1492 """
1493 Allow a special key "self" to reference the current user.
1495 This modification will only allow to use "self" as a key;
1496 The specific access right to let the user edit itself must
1497 still be customized.
1499 The rendered and editable skeleton can be modified or restriced
1500 by specifying a customized edit-skeleton.
1501 """
1502 if key == "self":
1503 if user := current.user.get():
1504 key = user["key"]
1505 else:
1506 raise errors.Unauthorized("Cannot edit 'self' with unknown user")
1508 return super().edit(key, *args, **kwargs)
1510 @exposed
1511 def getAuthMethods(self, *args, **kwargs):
1512 """Inform tools like Viur-Admin which authentication to use"""
1513 # FIXME: This is almost the same code as in index()...
1514 # FIXME: VIUR4: The entire function should be removed!
1515 # TODO: Align result with index(), so that primary and secondary login is presented.
1516 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!")
1518 res = [
1519 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None)
1520 for primary, secondary in self.validAuthenticationMethods
1521 ]
1523 return json.dumps(res)
1525 @exposed
1526 @skey
1527 def trigger(self, action: str, key: str):
1528 current.request.get().response.headers["Content-Type"] = "application/json"
1530 # Check for provided access right definition (equivalent to client-side check), fallback to root!
1531 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", )
1532 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)):
1533 raise errors.Unauthorized()
1535 skel = self.baseSkel()
1536 if not skel.fromDB(key):
1537 raise errors.NotFound()
1539 match action:
1540 case "takeover":
1541 self.authenticateUser(skel["key"])
1543 case "kick":
1544 session.killSessionByUser(skel["key"])
1546 case _:
1547 raise errors.NotImplemented(f"Action {action!r} not implemented")
1549 return json.dumps("OKAY")
1551 def onEdited(self, skel):
1552 super().onEdited(skel)
1553 # In case the user is set to inactive, kill all sessions
1554 if "status" in skel and skel["status"] < Status.ACTIVE.value:
1555 session.killSessionByUser(skel["key"])
1557 def onDeleted(self, skel):
1558 super().onDeleted(skel)
1559 # Invalidate all sessions of that user
1560 session.killSessionByUser(skel["key"])
1563@tasks.StartupTask
1564def createNewUserIfNotExists():
1565 """
1566 Create a new Admin user, if the userDB is empty
1567 """
1568 if (
1569 (user_module := getattr(conf.main_app.vi, "user", None))
1570 and isinstance(user_module, User)
1571 and "addSkel" in dir(user_module)
1572 and "validAuthenticationMethods" in dir(user_module)
1573 # UserPassword must be one of the primary login methods
1574 and any(
1575 issubclass(provider[0], UserPassword)
1576 for provider in user_module.validAuthenticationMethods
1577 )
1578 ):
1579 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database
1580 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton
1581 uname = f"""admin@{conf.instance.project_id}.appspot.com"""
1582 pw = utils.string.random(13)
1583 addSkel["name"] = uname
1584 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away
1585 addSkel["access"] = ["root"]
1586 addSkel["password"] = pw
1588 try:
1589 addSkel.toDB()
1590 except Exception as e:
1591 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
1592 logging.exception(e)
1593 return
1595 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"
1597 logging.warning(msg)
1598 email.sendEMailToAdmins("New ViUR password", msg)
1601# DEPRECATED ATTRIBUTES HANDLING
1603def __getattr__(attr):
1604 match attr:
1605 case "userSkel":
1606 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
1607 warnings.warn(msg, DeprecationWarning, stacklevel=2)
1608 logging.warning(msg)
1609 return UserSkel
1611 return super(__import__(__name__).__class__).__getattr__(attr)