Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%

715 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-07 19:28 +0000

1import abc 

2import datetime 

3import enum 

4import functools 

5import hashlib 

6import hmac 

7import json 

8import logging 

9import secrets 

10import warnings 

11import user_agents 

12 

13import pyotp 

14import base64 

15import dataclasses 

16import typing as t 

17from google.auth.transport import requests 

18from google.oauth2 import id_token 

19 

20from viur.core import ( 

21 conf, current, db, email, errors, i18n, 

22 securitykey, session, skeleton, tasks, utils, Module 

23) 

24from viur.core.decorators import * 

25from viur.core.bones import * 

26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

27from viur.core.prototypes.list import List 

28from viur.core.ratelimit import RateLimit 

29from viur.core.securityheaders import extendCsp 

30 

31 

32@functools.total_ordering 

33class Status(enum.Enum): 

34 """Status enum for a user 

35 

36 Has backwards compatibility to be comparable with non-enum values. 

37 Will be removed with viur-core 4.0.0 

38 """ 

39 

40 UNSET = 0 # Status is unset 

41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification 

42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin 

43 DISABLED = 5 # Account disabled 

44 ACTIVE = 10 # Active 

45 

46 def __eq__(self, other): 

47 if isinstance(other, Status): 

48 return super().__eq__(other) 

49 return self.value == other 

50 

51 def __lt__(self, other): 

52 if isinstance(other, Status): 

53 return super().__lt__(other) 

54 return self.value < other 

55 

56 

57class UserSkel(skeleton.Skeleton): 

58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604) 

59 

60 name = EmailBone( 

61 descr="E-Mail", 

62 required=True, 

63 readOnly=True, 

64 caseSensitive=False, 

65 searchable=True, 

66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"), 

67 ) 

68 

69 firstname = StringBone( 

70 descr="Firstname", 

71 searchable=True, 

72 ) 

73 

74 lastname = StringBone( 

75 descr="Lastname", 

76 searchable=True, 

77 ) 

78 

79 roles = SelectBone( 

80 descr=i18n.translate("viur.user.bone.roles", defaultText="Roles"), 

81 values=conf.user.roles, 

82 required=True, 

83 multiple=True, 

84 # fixme: This is generally broken in VIUR! See #776 for details. 

85 # vfunc=lambda values: 

86 # i18n.translate( 

87 # "user.bone.roles.invalid", 

88 # defaultText="Invalid role setting: 'custom' can only be set alone.") 

89 # if "custom" in values and len(values) > 1 else None, 

90 defaultValue=list(conf.user.roles.keys())[:1], 

91 ) 

92 

93 access = SelectBone( 

94 descr=i18n.translate("viur.user.bone.access", defaultText="Access rights"), 

95 type_suffix="access", 

96 values=lambda: { 

97 right: i18n.translate(f"viur.modules.user.accessright.{right}", defaultText=right) 

98 for right in sorted(conf.user.access_rights) 

99 }, 

100 multiple=True, 

101 params={ 

102 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system 

103 } 

104 ) 

105 

106 status = SelectBone( 

107 descr="Account status", 

108 values=Status, 

109 defaultValue=Status.ACTIVE, 

110 required=True, 

111 ) 

112 

113 lastlogin = DateBone( 

114 descr="Last Login", 

115 readOnly=True, 

116 ) 

117 

118 admin_config = JsonBone( # This bone stores settings from the vi 

119 descr="Config for the User", 

120 visible=False 

121 ) 

122 

123 def __new__(cls, *args, **kwargs): 

124 """ 

125 Constructor for the UserSkel-class, with the capability 

126 to dynamically add bones required for the configured 

127 authentication methods. 

128 """ 

129 for provider in conf.main_app.vi.user.authenticationProviders: 

130 assert issubclass(provider, UserPrimaryAuthentication) 

131 provider.patch_user_skel(cls) 

132 

133 for provider in conf.main_app.vi.user.secondFactorProviders: 

134 assert issubclass(provider, UserSecondFactorAuthentication) 

135 provider.patch_user_skel(cls) 

136 

137 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls) 

138 return super().__new__(cls, *args, **kwargs) 

139 

140 @classmethod 

141 def write(cls, skel, *args, **kwargs): 

142 # Roles 

143 if skel["roles"] and "custom" not in skel["roles"]: 

144 # Collect access rights through rules 

145 access = set() 

146 

147 for role in skel["roles"]: 

148 # Get default access for this role 

149 access |= conf.main_app.vi.user.get_role_defaults(role) 

150 

151 # Go through all modules and evaluate available role-settings 

152 for name in dir(conf.main_app.vi): 

153 if name.startswith("_"): 

154 continue 

155 

156 module = getattr(conf.main_app.vi, name) 

157 if not isinstance(module, Module): 

158 continue 

159 

160 roles = getattr(module, "roles", None) or {} 

161 rights = roles.get(role, roles.get("*", ())) 

162 

163 # Convert role into tuple if it's not 

164 if not isinstance(rights, (tuple, list)): 

165 rights = (rights, ) 

166 

167 if "*" in rights: 

168 for right in module.accessRights: 

169 access.add(f"{name}-{right}") 

170 else: 

171 for right in rights: 

172 if right in module.accessRights: 

173 access.add(f"{name}-{right}") 

174 

175 # special case: "edit" and "delete" actions require "view" as well! 

176 if right in ("edit", "delete") and "view" in module.accessRights: 

177 access.add(f"{name}-view") 

178 

179 skel["access"] = list(access) 

180 

181 return super().write(skel, *args, **kwargs) 

182 

183 

184class UserAuthentication(Module, abc.ABC): 

185 @property 

186 @abc.abstractstaticmethod 

187 def METHOD_NAME() -> str: 

188 """ 

189 Define a unique method name for this authentication. 

190 """ 

191 ... 

192 

193 def __init__(self, moduleName, modulePath, userModule): 

194 super().__init__(moduleName, modulePath) 

195 self._user_module = userModule 

196 

197 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

198 return True 

199 

200 @classmethod 

201 def patch_user_skel(cls, skel_cls: skeleton.Skeleton): 

202 """ 

203 Allows for an UserAuthentication to patch the UserSkel 

204 class with additional bones which are required for 

205 the implemented authentication method. 

206 """ 

207 ... 

208 

209 

210class UserPrimaryAuthentication(UserAuthentication, abc.ABC): 

211 """Abstract class for all primary authentication methods.""" 

212 registrationEnabled = False 

213 

214 @abc.abstractmethod 

215 def login(self, *args, **kwargs): 

216 ... 

217 

218 def next_or_finish(self, skel: skeleton.SkeletonInstance): 

219 """ 

220 Hook that is called whenever a part of the authentication was successful. 

221 It allows to perform further steps in custom authentications, 

222 e.g. change a password after first login. 

223 """ 

224 return self._user_module.continueAuthenticationFlow(self, skel["key"]) 

225 

226 

227class UserPassword(UserPrimaryAuthentication): 

228 METHOD_NAME = "X-VIUR-AUTH-User-Password" 

229 

230 registrationEmailVerificationRequired = True 

231 registrationAdminVerificationRequired = True 

232 

233 verifySuccessTemplate = "user_verify_success" 

234 verifyEmailAddressMail = "user_verify_address" 

235 verifyFailedTemplate = "user_verify_failed" 

236 passwordRecoveryTemplate = "user_passwordrecover" 

237 passwordRecoveryMail = "user_password_recovery" 

238 passwordRecoverySuccessTemplate = "user_passwordrecover_success" 

239 passwordRecoveryStep1Template = "user_passwordrecover_step1" 

240 passwordRecoveryStep2Template = "user_passwordrecover_step2" 

241 passwordRecoveryStep3Template = "user_passwordrecover_step3" 

242 

243 # The default rate-limit for password recovery (10 tries each 15 minutes) 

244 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip") 

245 

246 # Limit (invalid) login-retries to once per 5 seconds 

247 loginRateLimit = RateLimit("user.login", 12, 1, "ip") 

248 

249 @classmethod 

250 def patch_user_skel(cls, skel_cls): 

251 """ 

252 Modifies the UserSkel to be equipped by a PasswordBone. 

253 """ 

254 skel_cls.password = PasswordBone( 

255 readOnly=True, 

256 visible=False, 

257 params={ 

258 "category": "Authentication", 

259 } 

260 ) 

261 

262 class LoginSkel(skeleton.RelSkel): 

263 name = EmailBone( 

264 descr="E-Mail", 

265 required=True, 

266 caseSensitive=False, 

267 ) 

268 password = PasswordBone( 

269 required=True, 

270 test_threshold=0, 

271 ) 

272 

273 class LostPasswordStep1Skel(skeleton.RelSkel): 

274 name = EmailBone( 

275 descr="E-Mail", 

276 required=True, 

277 ) 

278 

279 class LostPasswordStep2Skel(skeleton.RelSkel): 

280 recovery_key = StringBone( 

281 descr="Recovery Key", 

282 required=True, 

283 params={ 

284 "tooltip": i18n.translate( 

285 key="viur.modules.user.userpassword.lostpasswordstep2.recoverykey", 

286 defaultText="Please enter the validation key you've received via e-mail.", 

287 hint="Shown when the user needs more than 15 minutes to paste the key", 

288 ), 

289 } 

290 ) 

291 

292 class LostPasswordStep3Skel(skeleton.RelSkel): 

293 # send the recovery key again, in case the password is rejected by some reason. 

294 recovery_key = StringBone( 

295 descr="Recovery Key", 

296 visible=False, 

297 readOnly=True, 

298 ) 

299 

300 password = PasswordBone( 

301 descr="New Password", 

302 required=True, 

303 params={ 

304 "tooltip": i18n.translate( 

305 key="viur.modules.user.userpassword.lostpasswordstep3.password", 

306 defaultText="Please enter a new password for your account.", 

307 ), 

308 } 

309 ) 

310 

311 @exposed 

312 @force_ssl 

313 @skey(allow_empty=True) 

314 def login(self, *, name: str | None = None, password: str | None = None, **kwargs): 

315 if not name or not password: 

316 return self._user_module.render.login(self.LoginSkel(), action="login") 

317 

318 self.loginRateLimit.assertQuotaIsAvailable() 

319 

320 # query for the username. The query might find another user, but the name is being checked for equality below 

321 name = name.lower().strip() 

322 user_skel = self._user_module.baseSkel() 

323 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel 

324 

325 # extract password hash from raw database entity (skeleton access blocks it) 

326 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {} 

327 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001 

328 password_hash = encode_password(password, password_data.get("salt", "-invalid-"), iterations)["pwhash"] 

329 

330 # now check if the username matches 

331 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode()) 

332 

333 # next, check if the password hash matches 

334 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash) 

335 

336 if not is_okay: 

337 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota 

338 return self._user_module.render.login(self.LoginSkel(), action="login") 

339 

340 # check if iterations are below current security standards, and update if necessary. 

341 if iterations < PBKDF2_DEFAULT_ITERATIONS: 

342 logging.info(f"Update password hash for user {name}.") 

343 # re-hash the password with more iterations 

344 # FIXME: This must be done within a transaction! 

345 user_skel["password"] = password # will be hashed on serialize 

346 user_skel.write(update_relations=False) 

347 

348 return self.next_or_finish(user_skel) 

349 

350 @exposed 

351 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs): 

352 """ 

353 This implements a password recovery process which lets users set a new password for their account, 

354 after validating a recovery key sent by email. 

355 

356 The process is as following: 

357 

358 - The user enters his email adress 

359 - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode 

360 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name 

361 and send a link with the code . It runs as a deferredTask so we don't leak the information if a user 

362 account exists. 

363 - If the user received his email, he can click on the link and set a new password for his account. 

364 

365 To prevent automated attacks, the fist step is guarded by a captcha and we limited calls to this function 

366 to 10 actions per 15 minutes. (One complete recovery process consists of two calls). 

367 """ 

368 self.passwordRecoveryRateLimit.assertQuotaIsAvailable() 

369 current_request = current.request.get() 

370 

371 if recovery_key is None: 

372 # This is the first step, where we ask for the username of the account we'll going to reset the password on 

373 skel = self.LostPasswordStep1Skel() 

374 

375 if not current_request.isPostRequest or not skel.fromClient(kwargs): 

376 return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template) 

377 

378 # validate security key 

379 if not securitykey.validate(skey): 

380 raise errors.PreconditionFailed() 

381 

382 self.passwordRecoveryRateLimit.decrementQuota() 

383 

384 recovery_key = securitykey.create( 

385 duration=datetime.timedelta(minutes=15), 

386 key_length=conf.security.password_recovery_key_length, 

387 user_name=skel["name"].lower(), 

388 session_bound=False, 

389 ) 

390 

391 # Send the code in background 

392 self.sendUserPasswordRecoveryCode( 

393 skel["name"], recovery_key, current_request.request.headers["User-Agent"] 

394 ) 

395 

396 # step 2 is only an action-skel, and can be ignored by a direct link in the 

397 # e-mail previously sent. It depends on the implementation of the specific project. 

398 return self._user_module.render.edit( 

399 self.LostPasswordStep2Skel(), 

400 tpl=self.passwordRecoveryStep2Template, 

401 ) 

402 

403 # in step 3 

404 skel = self.LostPasswordStep3Skel() 

405 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails. 

406 

407 # check for any input; Render input-form again when incomplete. 

408 if not skel.fromClient(kwargs) or not current_request.isPostRequest: 

409 return self._user_module.render.edit( 

410 skel=skel, 

411 tpl=self.passwordRecoveryStep3Template, 

412 ) 

413 

414 # validate security key 

415 if not securitykey.validate(skey): 

416 raise errors.PreconditionFailed() 

417 

418 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)): 

419 raise errors.PreconditionFailed( 

420 i18n.translate( 

421 key="viur.modules.user.passwordrecovery.keyexpired", 

422 defaultText="The recovery key is expired or invalid. Please start the recovery process again.", 

423 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key." 

424 ) 

425 ) 

426 

427 self.passwordRecoveryRateLimit.decrementQuota() 

428 

429 # If we made it here, the key was correct, so we'd hopefully have a valid user for this 

430 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel() 

431 

432 if not user_skel: 

433 raise errors.NotFound( 

434 i18n.translate( 

435 key="viur.modules.user.passwordrecovery.usernotfound", 

436 defaultText="There is no account with this name", 

437 hint="We cant find an account with that name (Should never happen)" 

438 ) 

439 ) 

440 

441 # If the account is locked or not yet validated, abort the process. 

442 if not self._user_module.is_active(user_skel): 

443 raise errors.NotFound( 

444 i18n.translate( 

445 key="viur.modules.user.passwordrecovery.accountlocked", 

446 defaultText="This account is currently locked. You cannot change its password.", 

447 hint="Attempted password recovery on a locked account" 

448 ) 

449 ) 

450 

451 # Update the password, save the user, reset his session and show the success-template 

452 user_skel["password"] = skel["password"] 

453 user_skel.write(update_relations=False) 

454 

455 return self._user_module.render.view( 

456 None, 

457 tpl=self.passwordRecoverySuccessTemplate, 

458 ) 

459 

460 @tasks.CallDeferred 

461 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None: 

462 """ 

463 Sends the given recovery code to the user given in userName. This function runs deferred 

464 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the 

465 code by email (assuming we have working email delivery), but this can be overridden to send it 

466 by SMS or other means. We'll also update the changedate for this user, so no more than one code 

467 can be send to any given user in four hours. 

468 """ 

469 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel(): 

470 user_agent = user_agents.parse(user_agent) 

471 email.send_email( 

472 tpl=self.passwordRecoveryMail, 

473 skel=user_skel, 

474 dests=[user_name], 

475 recovery_key=recovery_key, 

476 user_agent={ 

477 "device": user_agent.get_device(), 

478 "os": user_agent.get_os(), 

479 "browser": user_agent.get_browser() 

480 } 

481 ) 

482 

483 @exposed 

484 @skey(forward_payload="data", session_bound=False) 

485 def verify(self, data): 

486 def transact(key): 

487 skel = self._user_module.editSkel() 

488 if not key or not skel.read(key): 

489 return None 

490 

491 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \ 

492 if self.registrationAdminVerificationRequired else Status.ACTIVE 

493 

494 skel.write(update_relations=False) 

495 return skel 

496 

497 if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))): 

498 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate) 

499 

500 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate) 

501 

502 def canAdd(self) -> bool: 

503 return self.registrationEnabled 

504 

505 def addSkel(self): 

506 """ 

507 Prepare the add-Skel for rendering. 

508 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on 

509 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired 

510 :return: viur.core.skeleton.Skeleton 

511 """ 

512 skel = self._user_module.addSkel() 

513 

514 if self.registrationEmailVerificationRequired: 

515 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION 

516 elif self.registrationAdminVerificationRequired: 

517 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION 

518 else: # No further verification required 

519 defaultStatusValue = Status.ACTIVE 

520 

521 skel.status.readOnly = True 

522 skel["status"] = defaultStatusValue 

523 

524 if "password" in skel: 

525 skel.password.required = True # The user will have to set a password 

526 

527 return skel 

528 

529 @force_ssl 

530 @exposed 

531 @skey(allow_empty=True) 

532 def add(self, *args, **kwargs): 

533 """ 

534 Allows guests to register a new account if self.registrationEnabled is set to true 

535 

536 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd` 

537 

538 :returns: The rendered, added object of the entry, eventually with error hints. 

539 

540 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

541 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

542 """ 

543 if not self.canAdd(): 

544 raise errors.Unauthorized() 

545 skel = self.addSkel() 

546 if ( 

547 not kwargs # no data supplied 

548 or not current.request.get().isPostRequest # bail out if not using POST-method 

549 or not skel.fromClient(kwargs) # failure on reading into the bones 

550 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

551 ): 

552 # render the skeleton in the version it could as far as it could be read. 

553 return self._user_module.render.add(skel) 

554 self._user_module.onAdd(skel) 

555 skel.write() 

556 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION: 

557 # The user will have to verify his email-address. Create a skey and send it to his address 

558 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False, 

559 user_key=utils.normalizeKey(skel["key"]), 

560 name=skel["name"]) 

561 skel.skey = BaseBone(descr="Skey") 

562 skel["skey"] = skey 

563 email.send_email(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel) 

564 self._user_module.onAdded(skel) # Call onAdded on our parent user module 

565 return self._user_module.render.addSuccess(skel) 

566 

567 

568class GoogleAccount(UserPrimaryAuthentication): 

569 METHOD_NAME = "X-VIUR-AUTH-Google-Account" 

570 

571 @classmethod 

572 def patch_user_skel(cls, skel_cls): 

573 """ 

574 Modifies the UserSkel to be equipped by a bones required by Google Auth 

575 """ 

576 skel_cls.uid = StringBone( 

577 descr="Google UserID", 

578 required=False, 

579 readOnly=True, 

580 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"), 

581 params={ 

582 "category": "Authentication", 

583 } 

584 ) 

585 

586 skel_cls.sync = BooleanBone( 

587 descr="Sync user data with OAuth-based services", 

588 defaultValue=True, 

589 params={ 

590 "category": "Authentication", 

591 "tooltip": 

592 "If set, user data like firstname and lastname is automatically kept" 

593 "synchronous with the information stored at the OAuth service provider" 

594 "(e.g. Google Login)." 

595 } 

596 ) 

597 

598 @exposed 

599 @force_ssl 

600 @skey(allow_empty=True) 

601 def login(self, token: str | None = None, *args, **kwargs): 

602 if not conf.user.google_client_id: 

603 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!") 

604 

605 if not token: 

606 request = current.request.get() 

607 request.response.headers["Content-Type"] = "text/html" 

608 if request.response.headers.get("cross-origin-opener-policy") == "same-origin": 

609 # We have to allow popups here 

610 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups" 

611 

612 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html") 

613 with open(file_path) as file: 

614 tpl_string = file.read() 

615 

616 # FIXME: Use Jinja2 for rendering? 

617 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id) 

618 extendCsp({ 

619 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="], 

620 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="] 

621 }) 

622 return tpl_string 

623 

624 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id) 

625 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}: 

626 raise ValueError("Invalid issuer") 

627 

628 # Token looks valid :) 

629 uid = user_info["sub"] 

630 email = user_info["email"] 

631 

632 base_skel = self._user_module.baseSkel() 

633 update = False 

634 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()): 

635 # We'll try again - checking if there's already an user with that email 

636 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()): 

637 # Still no luck - it's a completely new user 

638 if not self.registrationEnabled: 

639 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains: 

640 logging.debug(f"Google user is from allowed {domain} - adding account") 

641 else: 

642 logging.debug(f"Google user is from {domain} - denying registration") 

643 raise errors.Forbidden("Registration for new users is disabled") 

644 

645 user_skel = base_skel 

646 user_skel["uid"] = uid 

647 user_skel["name"] = email 

648 update = True 

649 

650 # Take user information from Google, if wanted! 

651 if user_skel["sync"]: 

652 for target, source in { 

653 "name": email, 

654 "firstname": user_info.get("given_name"), 

655 "lastname": user_info.get("family_name"), 

656 }.items(): 

657 

658 if user_skel[target] != source: 

659 user_skel[target] = source 

660 update = True 

661 

662 if update: 

663 assert user_skel.write() 

664 

665 return self.next_or_finish(user_skel) 

666 

667 

668class UserSecondFactorAuthentication(UserAuthentication, abc.ABC): 

669 """Abstract class for all second factors.""" 

670 MAX_RETRY = 3 

671 second_factor_login_template = "user_login_secondfactor" 

672 """Template to enter the TOPT on login""" 

673 

674 @property 

675 @abc.abstractmethod 

676 def NAME(self) -> str: 

677 """Name for this factor for templates.""" 

678 ... 

679 

680 @property 

681 @abc.abstractmethod 

682 def ACTION_NAME(self) -> str: 

683 """The action name for this factor, used as path-segment.""" 

684 ... 

685 

686 def __init__(self, moduleName, modulePath, _user_module): 

687 super().__init__(moduleName, modulePath, _user_module) 

688 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}" 

689 self.add_url = f"{self.modulePath}/add" 

690 self.start_url = f"{self.modulePath}/start" 

691 

692 

693class TimeBasedOTP(UserSecondFactorAuthentication): 

694 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP" 

695 WINDOW_SIZE = 5 

696 ACTION_NAME = "otp" 

697 NAME = "Time based Otp" 

698 second_factor_login_template = "user_login_secondfactor" 

699 

700 @dataclasses.dataclass 

701 class OtpConfig: 

702 """ 

703 This dataclass is used to provide an interface for a OTP token 

704 algorithm description that is passed within the TimeBasedOTP 

705 class for configuration. 

706 """ 

707 secret: str 

708 timedrift: float = 0.0 

709 algorithm: t.Literal["sha1", "sha256"] = "sha1" 

710 interval: int = 60 

711 

712 class OtpSkel(skeleton.RelSkel): 

713 """ 

714 This is the Skeleton used to ask for the OTP token. 

715 """ 

716 otptoken = NumericBone( 

717 descr="Token", 

718 required=True, 

719 max=999999, 

720 min=0, 

721 ) 

722 

723 @classmethod 

724 def patch_user_skel(cls, skel_cls): 

725 """ 

726 Modifies the UserSkel to be equipped by a bones required by Timebased OTP 

727 """ 

728 # One-Time Password Verification 

729 skel_cls.otp_serial = StringBone( 

730 descr="OTP serial", 

731 searchable=True, 

732 params={ 

733 "category": "Second Factor Authentication", 

734 } 

735 ) 

736 

737 skel_cls.otp_secret = CredentialBone( 

738 descr="OTP secret", 

739 params={ 

740 "category": "Second Factor Authentication", 

741 } 

742 ) 

743 

744 skel_cls.otp_timedrift = NumericBone( 

745 descr="OTP time drift", 

746 readOnly=True, 

747 defaultValue=0, 

748 params={ 

749 "category": "Second Factor Authentication", 

750 } 

751 ) 

752 

753 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None: 

754 """ 

755 Returns an instance of self.OtpConfig with a provided token configuration, 

756 or None when there is no appropriate configuration of this second factor handler available. 

757 """ 

758 

759 if otp_secret := skel.dbEntity.get("otp_secret"): 

760 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0) 

761 

762 return None 

763 

764 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

765 """ 

766 Specified whether the second factor authentication can be handled by the given user or not. 

767 """ 

768 return bool(self.get_config(skel)) 

769 

770 @exposed 

771 def start(self): 

772 """ 

773 Configures OTP login for the current session. 

774 

775 A special otp_user_conf has to be specified as a dict, which is stored into the session. 

776 """ 

777 session = current.session.get() 

778 

779 if not (user_key := session.get("possible_user_key")): 

780 raise errors.PreconditionFailed( 

781 "Second factor can only be triggered after successful primary authentication." 

782 ) 

783 

784 user_skel = self._user_module.baseSkel() 

785 if not user_skel.read(user_key): 

786 raise errors.NotFound("The previously authenticated user is gone.") 

787 

788 if not (otp_user_conf := self.get_config(user_skel)): 

789 raise errors.PreconditionFailed("This second factor is not available for the user") 

790 

791 otp_user_conf = { 

792 "key": str(user_key), 

793 } | dataclasses.asdict(otp_user_conf) 

794 

795 session = current.session.get() 

796 session["_otp_user"] = otp_user_conf 

797 session.markChanged() 

798 

799 return self._user_module.render.edit( 

800 self.OtpSkel(), 

801 params={ 

802 "name": i18n.translate(self.NAME), 

803 "action_name": self.ACTION_NAME, 

804 "action_url": f"{self.modulePath}/{self.ACTION_NAME}", 

805 }, 

806 tpl=self.second_factor_login_template 

807 ) 

808 

809 @exposed 

810 @force_ssl 

811 @skey(allow_empty=True) 

812 def otp(self, *args, **kwargs): 

813 """ 

814 Performs the second factor validation and interaction with the client. 

815 """ 

816 session = current.session.get() 

817 if not (otp_user_conf := session.get("_otp_user")): 

818 raise errors.PreconditionFailed("No OTP process started in this session") 

819 

820 # Check if maximum second factor verification attempts 

821 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

822 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

823 

824 # Read the OTP token via the skeleton, to obtain a valid value 

825 skel = self.OtpSkel() 

826 if skel.fromClient(kwargs): 

827 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP. 

828 res = self.verify( 

829 otp=skel["otptoken"], 

830 secret=otp_user_conf["secret"], 

831 algorithm=otp_user_conf.get("algorithm") or "sha1", 

832 interval=otp_user_conf.get("interval") or 60, 

833 timedrift=otp_user_conf.get("timedrift") or 0.0, 

834 valid_window=self.WINDOW_SIZE 

835 ) 

836 else: 

837 res = None 

838 

839 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0! 

840 if res is None: 

841 otp_user_conf["attempts"] = attempts + 1 

842 session.markChanged() 

843 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

844 return self._user_module.render.edit( 

845 skel, 

846 name=i18n.translate(self.NAME), 

847 action_name=self.ACTION_NAME, 

848 action_url=f"{self.modulePath}/{self.ACTION_NAME}", 

849 tpl=self.second_factor_login_template 

850 ) 

851 

852 # Remove otp user config from session 

853 user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName) 

854 del session["_otp_user"] 

855 session.markChanged() 

856 

857 # Check if the OTP device has a time drift 

858 

859 timedriftchange = float(res) - otp_user_conf["timedrift"] 

860 if abs(timedriftchange) > 2: 

861 # The time-drift change accumulates to more than 2 minutes (for interval==60): 

862 # update clock-drift value accordingly 

863 self.updateTimeDrift(user_key, timedriftchange) 

864 

865 # Continue with authentication 

866 return self._user_module.secondFactorSucceeded(self, user_key) 

867 

868 @staticmethod 

869 def verify( 

870 otp: str | int, 

871 secret: str, 

872 algorithm: str = "sha1", 

873 interval: int = 60, 

874 timedrift: float = 0.0, 

875 for_time: datetime.datetime | None = None, 

876 valid_window: int = 0, 

877 ) -> int | None: 

878 """ 

879 Verifies the OTP passed in against the current time OTP. 

880 

881 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which 

882 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown 

883 on the hardware token generator. This can be used to store the time drift of a given token generator. 

884 

885 :param otp: the OTP token to check against 

886 :param secret: The OTP secret 

887 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256) 

888 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). In 

889 pyotp, default is 30! 

890 :param timedrift: The known timedrift (old index) of the hardware OTP generator 

891 :param for_time: Time to check OTP at (defaults to now) 

892 :param valid_window: extends the validity to this many counter ticks before and after the current one 

893 :returns: The index where verification succeeded, None otherwise 

894 """ 

895 # get the hashing digest 

896 digest = { 

897 "sha1": hashlib.sha1, 

898 "sha256": hashlib.sha256, 

899 }.get(algorithm) 

900 

901 if not digest: 

902 raise errors.NotImplemented(f"{algorithm=} is not implemented") 

903 

904 if for_time is None: 

905 for_time = datetime.datetime.now() 

906 

907 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index 

908 timedrift = round(timedrift) 

909 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret 

910 otp = str(otp).zfill(6) # fill with zeros in front 

911 

912 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}") 

913 totp = pyotp.TOTP(secret, digest=digest, interval=interval) 

914 

915 if valid_window: 

916 for offset in range(timedrift - valid_window, timedrift + valid_window + 1): 

917 token = str(totp.at(for_time, offset)) 

918 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}") 

919 if hmac.compare_digest(otp, token): 

920 return offset 

921 

922 return None 

923 

924 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None 

925 

926 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None: 

927 """ 

928 Updates the clock-drift value. 

929 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew 

930 it out of bounds. Maximum change per call is 0.3 minutes. 

931 :param user_key: For which user should the update occour 

932 :param idx: How many steps before/behind was that token 

933 :return: 

934 """ 

935 

936 # FIXME: The callback in viur-core must be improved, to accept user_skel 

937 

938 def transaction(user_key, idx): 

939 user = db.Get(user_key) 

940 if not isinstance(user.get("otp_timedrift"), float): 

941 user["otp_timedrift"] = 0.0 

942 user["otp_timedrift"] += min(max(0.1 * idx, -0.3), 0.3) 

943 db.Put(user) 

944 

945 db.RunInTransaction(transaction, user_key, idx) 

946 

947 

948class AuthenticatorOTP(UserSecondFactorAuthentication): 

949 """ 

950 This class handles the second factor for apps like authy and so on 

951 """ 

952 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP" 

953 

954 second_factor_add_template = "user_secondfactor_add" 

955 """Template to configure (add) a new TOPT""" 

956 

957 ACTION_NAME = "authenticator_otp" 

958 """Action name provided for *otp_template* on login""" 

959 

960 NAME = "Authenticator App" 

961 

962 @exposed 

963 @force_ssl 

964 @skey(allow_empty=True) 

965 def add(self, otp=None): 

966 """ 

967 We try to read the otp_app_secret form the current session. When this fails we generate a new one and store 

968 it in the session. 

969 

970 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store 

971 the otp_app_secret from the session in the user entry. 

972 """ 

973 current_session = current.session.get() 

974 

975 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")): 

976 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret() 

977 current_session["_maybe_otp_app_secret"] = otp_app_secret 

978 current_session.markChanged() 

979 

980 if otp is None: 

981 return self._user_module.render.second_factor_add( 

982 tpl=self.second_factor_add_template, 

983 action_name=self.ACTION_NAME, 

984 name=i18n.translate(self.NAME), 

985 add_url=self.add_url, 

986 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) 

987 else: 

988 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret): 

989 return self._user_module.render.second_factor_add( 

990 tpl=self.second_factor_add_template, 

991 action_name=self.ACTION_NAME, 

992 name=i18n.translate(self.NAME), 

993 add_url=self.add_url, 

994 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors 

995 

996 # Now we can set the otp_app_secret to the current User and render der Success-template 

997 AuthenticatorOTP.set_otp_app_secret(otp_app_secret) 

998 return self._user_module.render.second_factor_add_success( 

999 action_name=self.ACTION_NAME, 

1000 name=i18n.translate(self.NAME), 

1001 ) 

1002 

1003 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

1004 """ 

1005 We can only handle the second factor if we have stored an otp_app_secret before. 

1006 """ 

1007 return bool(skel.dbEntity.get("otp_app_secret", "")) 

1008 

1009 @classmethod 

1010 def patch_user_skel(cls, skel_cls): 

1011 """ 

1012 Modifies the UserSkel to be equipped by bones required by Authenticator App 

1013 """ 

1014 # Authenticator OTP Apps (like Authy) 

1015 skel_cls.otp_app_secret = CredentialBone( 

1016 descr="OTP Secret (App-Key)", 

1017 params={ 

1018 "category": "Second Factor Authentication", 

1019 } 

1020 ) 

1021 

1022 @classmethod 

1023 def set_otp_app_secret(cls, otp_app_secret=None): 

1024 """ 

1025 Write a new OTP Token in the current user entry. 

1026 """ 

1027 if otp_app_secret is None: 

1028 logging.error("No 'otp_app_secret' is provided") 

1029 raise errors.PreconditionFailed("No 'otp_app_secret' is provided") 

1030 if not (cuser := current.user.get()): 

1031 raise errors.Unauthorized() 

1032 

1033 def transaction(user_key): 

1034 if not (user := db.Get(user_key)): 

1035 raise errors.NotFound() 

1036 user["otp_app_secret"] = otp_app_secret 

1037 db.Put(user) 

1038 

1039 db.RunInTransaction(transaction, cuser["key"]) 

1040 

1041 @classmethod 

1042 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str: 

1043 """ 

1044 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example 

1045 """ 

1046 if not (cuser := current.user.get()): 

1047 raise errors.Unauthorized() 

1048 if not (issuer := conf.user.otp_issuer): 

1049 logging.warning( 

1050 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}") 

1051 issuer = conf.instance.project_id 

1052 

1053 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer) 

1054 

1055 @classmethod 

1056 def generate_otp_app_secret(cls) -> str: 

1057 """ 

1058 Generate a new OTP Secret 

1059 :return an otp 

1060 """ 

1061 return pyotp.random_base32() 

1062 

1063 @classmethod 

1064 def verify_otp(cls, otp: str | int, secret: str) -> bool: 

1065 return pyotp.TOTP(secret).verify(otp) 

1066 

1067 @exposed 

1068 def start(self): 

1069 otp_user_conf = {"attempts": 0} 

1070 session = current.session.get() 

1071 session["_otp_user"] = otp_user_conf 

1072 session.markChanged() 

1073 return self._user_module.render.edit( 

1074 TimeBasedOTP.OtpSkel(), 

1075 params={ 

1076 "name": i18n.translate(self.NAME), 

1077 "action_name": self.ACTION_NAME, 

1078 "action_url": self.action_url, 

1079 }, 

1080 tpl=self.second_factor_login_template, 

1081 ) 

1082 

1083 @exposed 

1084 @force_ssl 

1085 @skey 

1086 def authenticator_otp(self, **kwargs): 

1087 """ 

1088 We verify the otp here with the secret we stored before. 

1089 """ 

1090 session = current.session.get() 

1091 user_key = db.Key(self._user_module.kindName, session["possible_user_key"]) 

1092 

1093 if not (otp_user_conf := session.get("_otp_user")): 

1094 raise errors.PreconditionFailed("No OTP process started in this session") 

1095 

1096 # Check if maximum second factor verification attempts 

1097 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

1098 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

1099 

1100 if not (user := db.Get(user_key)): 

1101 raise errors.NotFound() 

1102 

1103 skel = TimeBasedOTP.OtpSkel() 

1104 if not skel.fromClient(kwargs): 

1105 raise errors.PreconditionFailed() 

1106 otp_token = str(skel["otptoken"]).zfill(6) 

1107 

1108 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]): 

1109 return self._user_module.secondFactorSucceeded(self, user_key) 

1110 otp_user_conf["attempts"] = attempts + 1 

1111 session.markChanged() 

1112 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

1113 return self._user_module.render.edit( 

1114 skel, 

1115 name=i18n.translate(self.NAME), 

1116 action_name=self.ACTION_NAME, 

1117 action_url=self.action_url, 

1118 tpl=self.second_factor_login_template, 

1119 ) 

1120 

1121 

1122class User(List): 

1123 """ 

1124 The User module is used to manage and authenticate users in a ViUR system. 

1125 

1126 It is used in almost any ViUR project, but ViUR can also function without any user capabilites. 

1127 """ 

1128 

1129 kindName = "user" 

1130 addTemplate = "user_add" 

1131 addSuccessTemplate = "user_add_success" 

1132 lostPasswordTemplate = "user_lostpassword" 

1133 verifyEmailAddressMail = "user_verify_address" 

1134 passwordRecoveryMail = "user_password_recovery" 

1135 

1136 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter( 

1137 None, ( 

1138 UserPassword, 

1139 conf.user.google_client_id and GoogleAccount, 

1140 ) 

1141 )) 

1142 """ 

1143 Specifies primary authentication providers that are made available 

1144 as sub-modules under `user/auth_<classname>`. They might require 

1145 customization or configuration. 

1146 """ 

1147 

1148 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = ( 

1149 TimeBasedOTP, 

1150 AuthenticatorOTP, 

1151 ) 

1152 """ 

1153 Specifies secondary authentication providers that are made available 

1154 as sub-modules under `user/f2_<classname>`. They might require 

1155 customization or configuration, which is determined during the 

1156 login-process depending on the user that wants to login. 

1157 """ 

1158 

1159 validAuthenticationMethods = tuple(filter( 

1160 None, ( 

1161 (UserPassword, AuthenticatorOTP), 

1162 (UserPassword, TimeBasedOTP), 

1163 (UserPassword, None), 

1164 (GoogleAccount, None) if conf.user.google_client_id else None, 

1165 ) 

1166 )) 

1167 """ 

1168 Specifies the possible combinations of primary- and secondary factor 

1169 login methos. 

1170 

1171 GoogleLogin defaults to no second factor, as the Google Account can be 

1172 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only 

1173 handled when there is a user-dependent configuration available. 

1174 """ 

1175 

1176 msg_missing_second_factor = "Second factor required but not configured for this user." 

1177 

1178 secondFactorTimeWindow = datetime.timedelta(minutes=10) 

1179 

1180 default_order = "name.idx" 

1181 

1182 adminInfo = { 

1183 "icon": "person-fill", 

1184 "actions": [ 

1185 "trigger_kick", 

1186 "trigger_takeover", 

1187 ], 

1188 "customActions": { 

1189 "trigger_kick": { 

1190 "name": i18n.translate( 

1191 key="viur.modules.user.customActions.kick", 

1192 defaultText="Kick user", 

1193 hint="Title of the kick user function" 

1194 ), 

1195 "icon": "trash2-fill", 

1196 "access": ["root"], 

1197 "action": "fetch", 

1198 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}", 

1199 "confirm": i18n.translate( 

1200 key="viur.modules.user.customActions.kick.confirm", 

1201 defaultText="Do you really want to drop all sessions of the selected user from the system?", 

1202 ), 

1203 "success": i18n.translate( 

1204 key="viur.modules.user.customActions.kick.success", 

1205 defaultText="Sessions of the user are being invalidated.", 

1206 ), 

1207 }, 

1208 "trigger_takeover": { 

1209 "name": i18n.translate( 

1210 key="viur.modules.user.customActions.takeover", 

1211 defaultText="Take-over user", 

1212 hint="Title of the take user over function" 

1213 ), 

1214 "icon": "file-person-fill", 

1215 "access": ["root"], 

1216 "action": "fetch", 

1217 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}", 

1218 "confirm": i18n.translate( 

1219 key="viur.modules.user.customActions.takeover.confirm", 

1220 defaultText="Do you really want to replace your current user session by a " 

1221 "user session of the selected user?", 

1222 ), 

1223 "success": i18n.translate( 

1224 key="viur.modules.user.customActions.takeover.success", 

1225 defaultText="You're now know as the selected user!", 

1226 ), 

1227 "then": "reload-vi", 

1228 }, 

1229 }, 

1230 } 

1231 

1232 roles = { 

1233 "admin": "*", 

1234 } 

1235 

1236 def __init__(self, moduleName, modulePath): 

1237 for provider in self.authenticationProviders: 

1238 assert issubclass(provider, UserPrimaryAuthentication) 

1239 name = f"auth_{provider.__name__.lower()}" 

1240 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1241 

1242 for provider in self.secondFactorProviders: 

1243 assert issubclass(provider, UserSecondFactorAuthentication) 

1244 name = f"f2_{provider.__name__.lower()}" 

1245 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1246 

1247 super().__init__(moduleName, modulePath) 

1248 

1249 def get_role_defaults(self, role: str) -> set[str]: 

1250 """ 

1251 Returns a set of default access rights for a given role. 

1252 

1253 Defaults to "admin" usage for any role > "user" 

1254 and "scriptor" usage for "admin" role. 

1255 """ 

1256 ret = set() 

1257 

1258 if role in ("viewer", "editor", "admin"): 

1259 ret.add("admin") 

1260 

1261 if role == "admin": 

1262 ret.add("scriptor") 

1263 

1264 return ret 

1265 

1266 def addSkel(self): 

1267 skel = super().addSkel().clone() 

1268 user = current.user.get() 

1269 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])): 

1270 skel.status.readOnly = True 

1271 skel["status"] = Status.UNSET 

1272 skel.status.visible = False 

1273 skel.access.readOnly = True 

1274 skel["access"] = [] 

1275 skel.access.visible = False 

1276 else: 

1277 # An admin tries to add a new user. 

1278 skel.status.readOnly = False 

1279 skel.status.visible = True 

1280 skel.access.readOnly = False 

1281 skel.access.visible = True 

1282 

1283 if "password" in skel: 

1284 # Unlock and require a password 

1285 skel.password.required = True 

1286 skel.password.visible = True 

1287 skel.password.readOnly = False 

1288 

1289 skel.name.readOnly = False # Don't enforce readonly name in user/add 

1290 return skel 

1291 

1292 def editSkel(self, *args, **kwargs): 

1293 skel = super().editSkel().clone() 

1294 

1295 if "password" in skel: 

1296 skel.password.required = False 

1297 skel.password.visible = True 

1298 skel.password.readOnly = False 

1299 

1300 user = current.user.get() 

1301 

1302 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only 

1303 skel.name.readOnly = lockFields 

1304 skel.access.readOnly = lockFields 

1305 skel.status.readOnly = lockFields 

1306 

1307 return skel 

1308 

1309 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication: 

1310 return getattr(self, f"f2_{cls.__name__.lower()}") 

1311 

1312 def getCurrentUser(self): 

1313 session = current.session.get() 

1314 

1315 if session and session.loaded and (user := session.get("user")): 

1316 skel = self.baseSkel() 

1317 skel.setEntity(user) 

1318 return skel 

1319 

1320 return None 

1321 

1322 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key): 

1323 """ 

1324 Continue authentication flow when primary authentication succeeded. 

1325 """ 

1326 skel = self.baseSkel() 

1327 

1328 if not skel.read(user_key): 

1329 raise errors.NotFound("User was not found.") 

1330 

1331 if not provider.can_handle(skel): 

1332 raise errors.Forbidden("User is not allowed to use this primary login method.") 

1333 

1334 session = current.session.get() 

1335 session["possible_user_key"] = user_key.id_or_name 

1336 session["_secondFactorStart"] = utils.utcNow() 

1337 session.markChanged() 

1338 

1339 second_factor_providers = [] 

1340 

1341 for auth_provider, second_factor in self.validAuthenticationMethods: 

1342 if isinstance(provider, auth_provider): 

1343 if second_factor is not None: 

1344 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor) 

1345 if second_factor_provider_instance.can_handle(skel): 

1346 second_factor_providers.append(second_factor_provider_instance) 

1347 else: 

1348 second_factor_providers.append(None) 

1349 

1350 if len(second_factor_providers) > 1 and None in second_factor_providers: 

1351 # We have a second factor. So we can get rid of the None 

1352 second_factor_providers.pop(second_factor_providers.index(None)) 

1353 

1354 if len(second_factor_providers) == 0: 

1355 raise errors.NotAcceptable(self.msg_missing_second_factor) 

1356 elif len(second_factor_providers) == 1: 

1357 if second_factor_providers[0] is None: 

1358 # We allow sign-in without a second factor 

1359 return self.authenticateUser(user_key) 

1360 # We have only one second factor we don't need the choice template 

1361 return second_factor_providers[0].start(user_key) 

1362 

1363 # In case there is more than one second factor, let the user select a method. 

1364 return self.render.second_factor_choice(second_factors=second_factor_providers) 

1365 

1366 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key): 

1367 """ 

1368 Continue authentication flow when secondary authentication succeeded. 

1369 """ 

1370 session = current.session.get() 

1371 if session["possible_user_key"] != user_key.id_or_name: 

1372 raise errors.Forbidden() 

1373 

1374 # Assert that the second factor verification finished in time 

1375 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow: 

1376 raise errors.RequestTimeout() 

1377 

1378 return self.authenticateUser(user_key) 

1379 

1380 def is_active(self, skel: skeleton.SkeletonInstance) -> bool | None: 

1381 """ 

1382 Hookable check if a user is defined as "active" and can login. 

1383 

1384 :param skel: The UserSkel of the user who wants to login. 

1385 """ 

1386 if "status" in skel: 

1387 status = skel["status"] 

1388 if not isinstance(status, (Status, int)): 

1389 try: 

1390 status = int(status) 

1391 except ValueError: 

1392 status = Status.UNSET 

1393 

1394 return status >= Status.ACTIVE.value 

1395 

1396 return None 

1397 

1398 def authenticateUser(self, key: db.Key, **kwargs): 

1399 """ 

1400 Performs Log-In for the current session and the given user key. 

1401 

1402 This resets the current session: All fields not explicitly marked as persistent 

1403 by conf.user.session_persistent_fields_on_login are gone afterwards. 

1404 

1405 :param key: The (DB-)Key of the user we shall authenticate 

1406 """ 

1407 skel = self.baseSkel() 

1408 if not skel.read(key): 

1409 raise ValueError(f"Unable to authenticate unknown user {key}") 

1410 

1411 # Verify that this user account is active 

1412 if not self.is_active(skel): 

1413 raise errors.Forbidden("The user is disabled and cannot be authenticated.") 

1414 

1415 # Update session for user 

1416 session = current.session.get() 

1417 # Remember persistent fields... 

1418 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login} 

1419 session.reset() 

1420 # and copy them over to the new session 

1421 session |= take_over 

1422 

1423 # Update session, user and request 

1424 session["user"] = skel.dbEntity 

1425 

1426 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key 

1427 current.user.set(self.getCurrentUser()) 

1428 

1429 self.onLogin(skel) 

1430 

1431 return self.render.loginSucceeded(**kwargs) 

1432 

1433 @exposed 

1434 @skey 

1435 def logout(self, *args, **kwargs): 

1436 """ 

1437 Implements the logout action. It also terminates the current session (all keys not listed 

1438 in viur.session_persistent_fields_on_logout will be lost). 

1439 """ 

1440 if not (user := current.user.get()): 

1441 raise errors.Unauthorized() 

1442 

1443 self.onLogout(user) 

1444 

1445 session = current.session.get() 

1446 if take_over := {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout}: 

1447 session.reset() 

1448 session |= take_over 

1449 else: 

1450 session.clear() 

1451 current.user.set(None) # set user to none in context var 

1452 return self.render.logoutSuccess() 

1453 

1454 @exposed 

1455 def login(self, *args, **kwargs): 

1456 return self.render.loginChoices([ 

1457 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1458 for primary, secondary in self.validAuthenticationMethods 

1459 ]) 

1460 

1461 def onLogin(self, skel: skeleton.SkeletonInstance): 

1462 """ 

1463 Hook to be called on user login. 

1464 """ 

1465 # Update the lastlogin timestamp (if available!) 

1466 if "lastlogin" in skel: 

1467 now = utils.utcNow() 

1468 

1469 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??) 

1470 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)): 

1471 skel["lastlogin"] = now 

1472 skel.write(update_relations=False) 

1473 

1474 logging.info(f"""User {skel["name"]} logged in""") 

1475 

1476 def onLogout(self, skel: skeleton.SkeletonInstance): 

1477 """ 

1478 Hook to be called on user logout. 

1479 """ 

1480 logging.info(f"""User {skel["name"]} logged out""") 

1481 

1482 @exposed 

1483 def view(self, key: db.Key | int | str = "self", *args, **kwargs): 

1484 """ 

1485 Allow a special key "self" to reference the current user. 

1486 

1487 By default, any authenticated user can view its own user entry, 

1488 to obtain access rights and any specific user information. 

1489 This behavior is defined in the customized `canView` function, 

1490 which is overwritten by the User-module. 

1491 

1492 The rendered skeleton can be modified or restriced by specifying 

1493 a customized view-skeleton. 

1494 """ 

1495 if key == "self": 

1496 if user := current.user.get(): 

1497 key = user["key"] 

1498 else: 

1499 raise errors.Unauthorized("Cannot view 'self' with unknown user") 

1500 

1501 return super().view(key, *args, **kwargs) 

1502 

1503 def canView(self, skel) -> bool: 

1504 if user := current.user.get(): 

1505 if skel["key"] == user["key"]: 

1506 return True 

1507 

1508 if "root" in user["access"] or "user-view" in user["access"]: 

1509 return True 

1510 

1511 return False 

1512 

1513 @exposed 

1514 @skey(allow_empty=True) 

1515 def edit(self, key: db.Key | int | str = "self", *args, **kwargs): 

1516 """ 

1517 Allow a special key "self" to reference the current user. 

1518 

1519 This modification will only allow to use "self" as a key; 

1520 The specific access right to let the user edit itself must 

1521 still be customized. 

1522 

1523 The rendered and editable skeleton can be modified or restriced 

1524 by specifying a customized edit-skeleton. 

1525 """ 

1526 if key == "self": 

1527 if user := current.user.get(): 

1528 key = user["key"] 

1529 else: 

1530 raise errors.Unauthorized("Cannot edit 'self' with unknown user") 

1531 

1532 return super().edit(key, *args, **kwargs) 

1533 

1534 @exposed 

1535 def getAuthMethods(self, *args, **kwargs): 

1536 """Inform tools like Viur-Admin which authentication to use""" 

1537 # FIXME: This is almost the same code as in index()... 

1538 # FIXME: VIUR4: The entire function should be removed! 

1539 # TODO: Align result with index(), so that primary and secondary login is presented. 

1540 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!") 

1541 

1542 res = [ 

1543 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1544 for primary, secondary in self.validAuthenticationMethods 

1545 ] 

1546 

1547 return json.dumps(res) 

1548 

1549 @exposed 

1550 @skey 

1551 def trigger(self, action: str, key: str): 

1552 current.request.get().response.headers["Content-Type"] = "application/json" 

1553 

1554 # Check for provided access right definition (equivalent to client-side check), fallback to root! 

1555 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", ) 

1556 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)): 

1557 raise errors.Unauthorized() 

1558 

1559 skel = self.baseSkel() 

1560 if not skel.read(key): 

1561 raise errors.NotFound() 

1562 

1563 match action: 

1564 case "takeover": 

1565 self.authenticateUser(skel["key"]) 

1566 

1567 case "kick": 

1568 session.killSessionByUser(skel["key"]) 

1569 

1570 case _: 

1571 raise errors.NotImplemented(f"Action {action!r} not implemented") 

1572 

1573 return json.dumps("OKAY") 

1574 

1575 def onEdited(self, skel): 

1576 super().onEdited(skel) 

1577 

1578 # In case the user is set to inactive, kill all sessions 

1579 if self.is_active(skel) is False: 

1580 session.killSessionByUser(skel["key"]) 

1581 

1582 def onDeleted(self, skel): 

1583 super().onDeleted(skel) 

1584 # Invalidate all sessions of that user 

1585 session.killSessionByUser(skel["key"]) 

1586 

1587 

1588@tasks.StartupTask 

1589def createNewUserIfNotExists(): 

1590 """ 

1591 Create a new Admin user, if the userDB is empty 

1592 """ 

1593 if ( 

1594 (user_module := getattr(conf.main_app.vi, "user", None)) 

1595 and isinstance(user_module, User) 

1596 and "addSkel" in dir(user_module) 

1597 and "validAuthenticationMethods" in dir(user_module) 

1598 # UserPassword must be one of the primary login methods 

1599 and any( 

1600 issubclass(provider[0], UserPassword) 

1601 for provider in user_module.validAuthenticationMethods 

1602 ) 

1603 ): 

1604 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database 

1605 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton 

1606 uname = f"""admin@{conf.instance.project_id}.appspot.com""" 

1607 pw = utils.string.random(13) 

1608 addSkel["name"] = uname 

1609 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away 

1610 addSkel["access"] = ["root"] 

1611 addSkel["password"] = pw 

1612 

1613 try: 

1614 addSkel.write() 

1615 except Exception as e: 

1616 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}") 

1617 logging.exception(e) 

1618 return 

1619 

1620 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}" 

1621 

1622 logging.warning(msg) 

1623 email.send_email_to_admins("New ViUR password", msg) 

1624 

1625 

1626# DEPRECATED ATTRIBUTES HANDLING 

1627 

1628def __getattr__(attr): 

1629 match attr: 

1630 case "userSkel": 

1631 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!" 

1632 warnings.warn(msg, DeprecationWarning, stacklevel=2) 

1633 logging.warning(msg) 

1634 return UserSkel 

1635 

1636 return super(__import__(__name__).__class__).__getattr__(attr)