Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%

705 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-16 22:16 +0000

1import abc 

2import datetime 

3import enum 

4import functools 

5import hashlib 

6import hmac 

7import json 

8import logging 

9import secrets 

10import warnings 

11import user_agents 

12 

13import pyotp 

14import base64 

15import dataclasses 

16import typing as t 

17from google.auth.transport import requests 

18from google.oauth2 import id_token 

19 

20from viur.core import ( 

21 conf, current, db, email, errors, i18n, 

22 securitykey, session, skeleton, tasks, utils, Module 

23) 

24from viur.core.decorators import * 

25from viur.core.bones import * 

26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

27from viur.core.prototypes.list import List 

28from viur.core.ratelimit import RateLimit 

29from viur.core.securityheaders import extendCsp 

30 

31 

@functools.total_ordering
class Status(enum.Enum):
    """Status enum for a user.

    Has backwards compatibility to be comparable with non-enum values
    (``Status.ACTIVE == 10``, ``Status.DISABLED < 10``).
    Will be removed with viur-core 4.0.0
    """

    UNSET = 0  # Status is unset
    WAITING_FOR_EMAIL_VERIFICATION = 1  # Waiting for email verification
    WAITING_FOR_ADMIN_VERIFICATION = 2  # Waiting for verification through admin
    DISABLED = 5  # Account disabled
    ACTIVE = 10  # Active

    def __eq__(self, other):
        """Compare by identity against other members, by value against anything else."""
        if isinstance(other, Status):
            return self is other
        return self.value == other

    def __lt__(self, other):
        """Order members by their value; raw values are compared against ``self.value``."""
        if isinstance(other, Status):
            # enum.Enum defines no ordering of its own, so compare the values directly.
            return self.value < other.value
        return self.value < other

    def __hash__(self):
        """Hash like the raw value, keeping ``a == b`` and ``hash(a) == hash(b)`` in sync.

        Defining ``__eq__`` alone would set ``__hash__`` to ``None`` and make the
        members unusable in sets and as dict keys.
        """
        return hash(self.value)

55 

56 

class UserSkel(skeleton.Skeleton):
    """
    Skeleton of a user entry.

    Besides the bones declared here, the configured authentication providers
    may attach additional bones to this class, see :meth:`__new__`.
    """

    # kindName has to be assigned explicitly, as this Skeleton is defined in viur-core (see #604)
    kindName = "user"

    name = EmailBone(
        descr="E-Mail",
        required=True,
        readOnly=True,
        caseSensitive=False,
        searchable=True,
        unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"),
    )

    firstname = StringBone(descr="Firstname", searchable=True)

    lastname = StringBone(descr="Lastname", searchable=True)

    roles = SelectBone(
        descr=i18n.translate("viur.user.bone.roles", defaultText="Roles"),
        values=conf.user.roles,
        required=True,
        multiple=True,
        # fixme: This is generally broken in VIUR! See #776 for details.
        # vfunc=lambda values:
        #   i18n.translate(
        #       "user.bone.roles.invalid",
        #       defaultText="Invalid role setting: 'custom' can only be set alone.")
        #   if "custom" in values and len(values) > 1 else None,
        defaultValue=list(conf.user.roles.keys())[:1],  # first configured role, if any
    )

    access = SelectBone(
        descr=i18n.translate("viur.user.bone.access", defaultText="Access rights"),
        type_suffix="access",
        values=lambda: {
            right: i18n.translate(f"server.modules.user.accessright.{right}", defaultText=right)
            for right in sorted(conf.user.access_rights)
        },
        multiple=True,
        params={
            # Without the "custom" role, "access" is managed by the role system
            "readonlyIf": "'custom' not in roles"
        }
    )

    status = SelectBone(
        descr="Account status",
        values=Status,
        defaultValue=Status.ACTIVE,
        required=True,
    )

    lastlogin = DateBone(descr="Last Login", readOnly=True)

    # This bone stores settings from the vi
    admin_config = JsonBone(descr="Config for the User", visible=False)

    def __new__(cls):
        """
        Create a UserSkel, attaching the bones required by the configured
        authentication methods first.

        Every primary and second factor provider configured on the user module
        gets the chance to patch this class; afterwards the bone map is rebuilt.
        """
        user_module = conf.main_app.vi.user
        provider_groups = (
            (user_module.authenticationProviders, UserPrimaryAuthentication),
            (user_module.secondFactorProviders, UserSecondFactorAuthentication),
        )

        for providers, expected_base in provider_groups:
            for provider in providers:
                assert issubclass(provider, expected_base)
                provider.patch_user_skel(cls)

        cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls)
        return super().__new__(cls)

    @classmethod
    def toDB(cls, skel, *args, **kwargs):
        """
        Store the skeleton, deriving ``skel["access"]`` from the assigned roles first.

        The derivation is skipped when no role is set or when the "custom" role
        is among them; in that case "access" is written as it is.
        """
        assigned_roles = skel["roles"]

        if assigned_roles and "custom" not in assigned_roles:
            granted = set()

            for role in assigned_roles:
                # Default access rights of this role
                granted |= conf.main_app.vi.user.get_role_defaults(role)

                # Evaluate the role-settings of every module
                for mod_name in dir(conf.main_app.vi):
                    if mod_name.startswith("_"):
                        continue

                    module = getattr(conf.main_app.vi, mod_name)
                    if not isinstance(module, Module):
                        continue

                    role_map = getattr(module, "roles", None) or {}
                    rights = role_map.get(role, role_map.get("*", ()))

                    # A single right may be given without a surrounding tuple
                    if not isinstance(rights, (tuple, list)):
                        rights = (rights, )

                    if "*" in rights:
                        # Wildcard: grant everything the module offers
                        granted.update(f"{mod_name}-{right}" for right in module.accessRights)
                        continue

                    for right in rights:
                        if right not in module.accessRights:
                            continue

                        granted.add(f"{mod_name}-{right}")

                        # special case: "edit" and "delete" actions require "view" as well!
                        if right in ("edit", "delete") and "view" in module.accessRights:
                            granted.add(f"{mod_name}-view")

            skel["access"] = list(granted)

        return super().toDB(skel, *args, **kwargs)

182 

183 

class UserAuthentication(Module, abc.ABC):
    """Abstract base of the authentication handlers attached to a user module."""

    @property
    @abc.abstractmethod
    def METHOD_NAME(self) -> str:
        """
        Define a unique method name for this authentication.
        """
        ...

    def __init__(self, moduleName, modulePath, userModule):
        """
        :param moduleName: Passed through to the parent module constructor.
        :param modulePath: Passed through to the parent module constructor.
        :param userModule: The user module this handler belongs to,
            kept as ``self._user_module``.
        """
        super().__init__(moduleName, modulePath)
        self._user_module = userModule

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """Tell whether this handler is usable for the given user; always True here."""
        return True

    @classmethod
    def patch_user_skel(cls, skel_cls: type[skeleton.Skeleton]):
        """
        Allows for an UserAuthentication to patch the UserSkel
        class with additional bones which are required for
        the implemented authentication method.

        The default implementation adds nothing.
        """
        ...

208 

209 

class UserPrimaryAuthentication(UserAuthentication, abc.ABC):
    """Abstract class for all primary authentication methods."""

    # Registration of new users through this method is off unless a subclass enables it
    registrationEnabled = False

    @abc.abstractmethod
    def login(self, *args, **kwargs):
        """Perform the login; has to be implemented by every primary authentication."""
        ...

    def next_or_finish(self, skel: skeleton.SkeletonInstance):
        """
        Hook called whenever a part of the authentication succeeded.

        Custom authentications can override it to insert further steps,
        e.g. forcing a password change after the first login. By default
        the user module continues the authentication flow for the user's key.
        """
        user_key = skel["key"]
        return self._user_module.continueAuthenticationFlow(self, user_key)

225 

226 

class UserPassword(UserPrimaryAuthentication):
    """Primary authentication by e-mail address and password."""

    METHOD_NAME = "X-VIUR-AUTH-User-Password"

    # Verification steps a newly registered account has to pass
    registrationEmailVerificationRequired = True
    registrationAdminVerificationRequired = True

    # Names of the templates and mails used by this authentication
    verifySuccessTemplate = "user_verify_success"
    verifyEmailAddressMail = "user_verify_address"
    verifyFailedTemplate = "user_verify_failed"
    passwordRecoveryTemplate = "user_passwordrecover"
    passwordRecoveryMail = "user_password_recovery"
    passwordRecoverySuccessTemplate = "user_passwordrecover_success"
    passwordRecoveryStep1Template = "user_passwordrecover_step1"
    passwordRecoveryStep2Template = "user_passwordrecover_step2"
    passwordRecoveryStep3Template = "user_passwordrecover_step3"

    # Default rate-limit for the password recovery: 10 tries each 15 minutes
    passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip")

    # Limit (invalid) login-retries to once per 5 seconds
    # NOTE(review): presumably 12 tries per 1 minute, by analogy with the limit above - confirm
    loginRateLimit = RateLimit("user.login", 12, 1, "ip")

248 

@classmethod
def patch_user_skel(cls, skel_cls):
    """
    Equip the given UserSkel class with a ``password`` bone.

    The bone is read-only, invisible and sorted into the
    "Authentication" category.
    """
    password_bone = PasswordBone(
        readOnly=True,
        visible=False,
        params={"category": "Authentication"},
    )
    skel_cls.password = password_bone

261 

class LoginSkel(skeleton.RelSkel):
    """Skeleton rendered as the login form: e-mail address and password."""

    name = EmailBone(descr="E-Mail", required=True, caseSensitive=False)

    password = PasswordBone(required=True, test_threshold=0)

272 

class LostPasswordStep1Skel(skeleton.RelSkel):
    """First step of the password recovery: asks for the account's e-mail address."""

    name = EmailBone(descr="E-Mail", required=True)

278 

class LostPasswordStep2Skel(skeleton.RelSkel):
    """Second step of the password recovery: asks for the recovery key."""

    recovery_key = StringBone(
        descr="Recovery Key",
        required=True,
        params={
            "tooltip": i18n.translate(
                key="viur.modules.user.userpassword.lostpasswordstep2.recoverykey",
                defaultText="Please enter the validation key you've received via e-mail.",
                hint="Shown when the user needs more than 15 minutes to paste the key",
            ),
        },
    )

291 

class LostPasswordStep3Skel(skeleton.RelSkel):
    """Third step of the password recovery: asks for the new password."""

    # The recovery key is sent along again, in case the password is rejected by some reason.
    recovery_key = StringBone(descr="Recovery Key", visible=False, readOnly=True)

    password = PasswordBone(
        descr="New Password",
        required=True,
        params={
            "tooltip": i18n.translate(
                key="viur.modules.user.userpassword.lostpasswordstep3.password",
                defaultText="Please enter a new password for your account.",
            ),
        },
    )

310 

@exposed
@force_ssl
@skey(allow_empty=True)
def login(self, *, name: str | None = None, password: str | None = None, **kwargs):
    """
    Log a user in by e-mail address and password.

    Without both credentials the login form is rendered. Otherwise name,
    password hash and account status are checked; a failed attempt counts
    against the login rate-limit and renders the form again, a successful
    one is handed over to ``next_or_finish``.
    """
    if current.user.get():
        # A user is already logged in, nothing to do.
        return self._user_module.render.loginSucceeded()

    if not (name and password):
        return self._user_module.render.login(self.LoginSkel(), action="login")

    self.loginRateLimit.assertQuotaIsAvailable()

    # Query for the username. The query might find another user,
    # but the name is being checked for equality below.
    name = name.lower().strip()
    empty_skel = self._user_module.baseSkel()
    user_skel = empty_skel.all().filter("name.idx >=", name).getSkel() or empty_skel

    # The password hash is taken from the raw database entity (skeleton access blocks it)
    entity = user_skel.dbEntity
    stored = (entity and entity.get("password")) or {}
    iterations = stored.get("iterations", 1001)  # remember iterations; old password hashes used 1001
    candidate_hash = encode_password(password, stored.get("salt", "-invalid-"), iterations)["pwhash"]

    # Every check below is always evaluated; they are combined without short-circuiting.
    name_matches = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode())
    hash_matches = secrets.compare_digest(stored.get("pwhash", b"-invalid-"), candidate_hash)
    is_active = (user_skel["status"] or 0) >= Status.ACTIVE.value
    is_okay = name_matches & hash_matches & is_active

    if not is_okay:
        self.loginRateLimit.decrementQuota()  # Only failed login attempts will count to the quota
        return self._user_module.render.login(
            self.LoginSkel(),
            action="login",
            loginFailed=True,  # FIXME: Is this still being used?
            accountStatus=user_skel["status"]  # FIXME: Is this still being used?
        )

    # Hashes below the current security standard get renewed with more iterations.
    if iterations < PBKDF2_DEFAULT_ITERATIONS:
        logging.info(f"Update password hash for user {name}.")
        # FIXME: This must be done within a transaction!
        user_skel["password"] = password  # will be hashed on serialize
        user_skel.toDB(update_relations=False)

    return self.next_or_finish(user_skel)

361 

@exposed
def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs):
    """
    This implements a password recovery process which lets users set a new password for their account,
    after validating a recovery key sent by email.

    The process is as following:

    - The user enters his email address
    - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode
    - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name
      and send a link with the code. It runs as a deferredTask so we don't leak the information if a user
      account exists.
    - If the user received his email, he can click on the link and set a new password for his account.

    To prevent automated attacks, the first step is guarded by a captcha and we limited calls to this function
    to 10 actions per 15 minutes. (One complete recovery process consists of two calls).

    :param recovery_key: The recovery key received by the user; ``None`` selects the first step.
    :param skey: The security key of the submitting form.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if *skey* or *recovery_key* could not be validated.
    :raises: :exc:`viur.core.errors.NotFound`, if the account does not exist or is not active.
    """
    self.passwordRecoveryRateLimit.assertQuotaIsAvailable()
    current_request = current.request.get()

    if recovery_key is None:
        # This is the first step, where we ask for the username of the account we'll going to reset the password on
        skel = self.LostPasswordStep1Skel()

        if not current_request.isPostRequest or not skel.fromClient(kwargs):
            return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template)

        # validate security key
        if not securitykey.validate(skey):
            raise errors.PreconditionFailed()

        self.passwordRecoveryRateLimit.decrementQuota()

        recovery_key = securitykey.create(
            duration=datetime.timedelta(minutes=15),
            key_length=conf.security.password_recovery_key_length,
            user_name=skel["name"].lower(),
            session_bound=False,
        )

        # Send the code in background.
        # Clients are not obliged to send a User-Agent header; a missing one must not abort
        # the process after the recovery key has been created, so fall back to an empty string.
        self.sendUserPasswordRecoveryCode(
            skel["name"], recovery_key, current_request.request.headers.get("User-Agent", "")
        )

        # step 2 is only an action-skel, and can be ignored by a direct link in the
        # e-mail previously sent. It depends on the implementation of the specific project.
        return self._user_module.render.edit(
            self.LostPasswordStep2Skel(),
            tpl=self.passwordRecoveryStep2Template,
        )

    # in step 3
    skel = self.LostPasswordStep3Skel()
    skel["recovery_key"] = recovery_key  # resend the recovery key again, in case the fromClient() fails.

    # check for any input; Render input-form again when incomplete.
    if not skel.fromClient(kwargs) or not current_request.isPostRequest:
        return self._user_module.render.edit(
            skel=skel,
            tpl=self.passwordRecoveryStep3Template,
        )

    # validate security key
    if not securitykey.validate(skey):
        raise errors.PreconditionFailed()

    if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)):
        raise errors.PreconditionFailed(
            i18n.translate(
                key="viur.modules.user.passwordrecovery.keyexpired",
                defaultText="The recovery key is expired or invalid. Please start the recovery process again.",
                hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key."
            )
        )

    self.passwordRecoveryRateLimit.decrementQuota()

    # If we made it here, the key was correct, so we'd hopefully have a valid user for this
    user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel()

    if not user_skel:
        raise errors.NotFound(
            i18n.translate(
                key="viur.modules.user.passwordrecovery.usernotfound",
                defaultText="There is no account with this name",
                hint="We cant find an account with that name (Should never happen)"
            )
        )

    if user_skel["status"] != Status.ACTIVE:  # The account is locked or not yet validated. Abort the process.
        raise errors.NotFound(
            i18n.translate(
                key="viur.modules.user.passwordrecovery.accountlocked",
                defaultText="This account is currently locked. You cannot change it's password.",
                hint="Attempted password recovery on a locked account"
            )
        )

    # Update the password, save the user and show the success-template
    user_skel["password"] = skel["password"]
    user_skel.toDB(update_relations=False)

    return self._user_module.render.view(
        None,
        tpl=self.passwordRecoverySuccessTemplate,
    )

470 

@tasks.CallDeferred
def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None:
    """
    Sends the given recovery code to the user given in user_name. This function runs deferred
    so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the
    code by email (assuming we have working email delivery), but this can be overridden to send it
    by SMS or other means.

    Nothing is sent when no user with that name exists.

    :param user_name: The name (e-mail address) as entered by the user.
    :param recovery_key: The recovery key to be sent.
    :param user_agent: The raw User-Agent string of the requesting client; it is parsed
        and handed to the mail template as device, os and browser.
    """
    # The name index is queried with lower-cased names everywhere else in this module,
    # so a name entered in mixed case has to be lower-cased for the lookup as well.
    user_skel = self._user_module.viewSkel().all().filter("name.idx =", user_name.lower()).getSkel()
    if not user_skel:
        return

    parsed_agent = user_agents.parse(user_agent)
    email.sendEMail(
        tpl=self.passwordRecoveryMail,
        skel=user_skel,
        dests=[user_name],
        recovery_key=recovery_key,
        user_agent={
            "device": parsed_agent.get_device(),
            "os": parsed_agent.get_os(),
            "browser": parsed_agent.get_browser()
        }
    )

493 

@exposed
@skey(forward_payload="data", session_bound=False)
def verify(self, data):
    """
    Confirm the e-mail address of a newly registered user.

    *data* is the payload forwarded from the security key; its "user_key"
    names the user to advance. Renders the failed-template when the payload
    or the user is missing, the success-template otherwise.
    """

    def advance_status(key):
        # Runs inside a transaction: load the user, move it to the next status, store it.
        skel = self._user_module.editSkel()
        if not key or not skel.fromDB(key):
            return None

        if self.registrationAdminVerificationRequired:
            skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION
        else:
            skel["status"] = Status.ACTIVE

        skel.toDB(update_relations=False)
        return skel

    verified_skel = None
    if isinstance(data, dict):
        verified_skel = db.RunInTransaction(advance_status, data.get("user_key"))

    if not verified_skel:
        return self._user_module.render.view(None, tpl=self.verifyFailedTemplate)

    return self._user_module.render.view(verified_skel, tpl=self.verifySuccessTemplate)

511 

def canAdd(self) -> bool:
    """Registration is allowed exactly when ``self.registrationEnabled`` is set."""
    registration_allowed = self.registrationEnabled
    return registration_allowed

514 

def addSkel(self):
    """
    Prepare the add-Skel for rendering.

    Takes the add-skeleton of the user module, fixes its status (read-only)
    depending on self.registrationEmailVerificationRequired and
    self.registrationAdminVerificationRequired, and makes the password
    mandatory when the skeleton has a password bone.

    :return: viur.core.skeleton.Skeleton
    """
    skel = self._user_module.addSkel()

    # E-mail verification comes first, admin verification second; otherwise the account is active
    initial_status = (
        Status.WAITING_FOR_EMAIL_VERIFICATION if self.registrationEmailVerificationRequired
        else Status.WAITING_FOR_ADMIN_VERIFICATION if self.registrationAdminVerificationRequired
        else Status.ACTIVE
    )

    skel.status.readOnly = True
    skel["status"] = initial_status

    if "password" in skel:
        # The user will have to set a password
        skel.password.required = True

    return skel

538 

@force_ssl
@exposed
@skey(allow_empty=True)
def add(self, *args, **kwargs):
    """
    Allows guests to register a new account if self.registrationEnabled is set to true

    .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd`

    :returns: The rendered, added object of the entry, eventually with error hints.

    :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions.
    :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified.
    """
    if not self.canAdd():
        raise errors.Unauthorized()

    skel = self.addSkel()

    ready_to_store = (
        kwargs  # data supplied
        and current.request.get().isPostRequest  # using POST-method
        and skel.fromClient(kwargs)  # success on reading into the bones
        and not utils.parse.bool(kwargs.get("bounce"))  # no review before adding requested
    )
    if not ready_to_store:
        # render the skeleton in the version it could as far as it could be read.
        return self._user_module.render.add(skel)

    self._user_module.onAdd(skel)
    skel.toDB()

    if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION:
        # The user will have to verify his email-address. Create a skey and send it to his address
        verification_key = securitykey.create(
            duration=datetime.timedelta(days=7),
            session_bound=False,
            user_key=utils.normalizeKey(skel["key"]),
            name=skel["name"],
        )
        skel.skey = BaseBone(descr="Skey")
        skel["skey"] = verification_key
        email.sendEMail(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel)

    self._user_module.onAdded(skel)  # Call onAdded on our parent user module
    return self._user_module.render.addSuccess(skel)

576 

577 

class GoogleAccount(UserPrimaryAuthentication):
    """Primary authentication through a Google account (OAuth2 ID token)."""

    METHOD_NAME = "X-VIUR-AUTH-Google-Account"

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by the bones required by Google Auth
        """
        skel_cls.uid = StringBone(
            descr="Google UserID",
            required=False,
            readOnly=True,
            unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"),
            params={
                "category": "Authentication",
            }
        )

        skel_cls.sync = BooleanBone(
            descr="Sync user data with OAuth-based services",
            defaultValue=True,
            params={
                "category": "Authentication",
                # Adjacent literals are concatenated as-is, so each part carries its own separating space.
                "tooltip":
                    "If set, user data like firstname and lastname is automatically kept "
                    "synchronous with the information stored at the OAuth service provider "
                    "(e.g. Google Login)."
            }
        )

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def login(self, token: str | None = None, *args, **kwargs):
        """
        Log a user in by a Google ID token.

        Without a token, the Google login page is returned. With a token, the token
        is verified, the matching user is looked up by Google user id and then by
        e-mail address, and a new user is created when none exists and registration
        is permitted.

        :param token: The ID token issued by Google.
        :raises: :exc:`viur.core.errors.PreconditionFailed`, if conf.user.google_client_id is not configured.
        :raises: :exc:`ValueError`, if the token was not issued by Google.
        :raises: :exc:`viur.core.errors.Forbidden`, if the user is unknown and may not register.
        :raises: :exc:`AssertionError`, if the user could not be stored.
        """
        # FIXME: Check if already logged in
        if not conf.user.google_client_id:
            raise errors.PreconditionFailed("Please configure conf.user.google_client_id!")

        if not token:
            request = current.request.get()
            request.response.headers["Content-Type"] = "text/html"
            if request.response.headers.get("cross-origin-opener-policy") == "same-origin":
                # We have to allow popups here
                request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups"

            file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html")
            with open(file_path) as file:
                tpl_string = file.read()

            # FIXME: Use Jinja2 for rendering?
            tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id)
            extendCsp({
                "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="],
                "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="]
            })
            return tpl_string

        user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id)
        if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}:
            raise ValueError("Invalid issuer")

        # Token looks valid :)
        uid = user_info["sub"]
        email_address = user_info["email"]  # not named "email", which is the imported mail module

        base_skel = self._user_module.baseSkel()
        update = False
        if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()):
            # We'll try again - checking if there's already an user with that email
            if not (user_skel := base_skel.all().filter("name.idx =", email_address.lower()).getSkel()):
                # Still no luck - it's a completely new user
                if not self.registrationEnabled:
                    if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains:
                        logging.debug(f"Google user is from allowed {domain} - adding account")
                    else:
                        logging.debug(f"Google user is from {domain} - denying registration")
                        raise errors.Forbidden("Registration for new users is disabled")

                user_skel = base_skel
                user_skel["uid"] = uid
                user_skel["name"] = email_address
                update = True

        # Take user information from Google, if wanted!
        if user_skel["sync"]:
            for target, source in {
                "name": email_address,
                "firstname": user_info.get("given_name"),
                "lastname": user_info.get("family_name"),
            }.items():

                if user_skel[target] != source:
                    user_skel[target] = source
                    update = True

        # The write must not live inside an assert statement: those are removed under
        # "python -O", which would silently skip storing the user.
        if update and not user_skel.toDB():
            raise AssertionError("Storing the user failed")

        return self.next_or_finish(user_skel)

677 

678 

class UserSecondFactorAuthentication(UserAuthentication, abc.ABC):
    """Abstract class for all second factors."""

    # NOTE(review): presumably the number of failed attempts tolerated before a login is refused
    MAX_RETRY = 3

    # Template to enter the TOTP on login
    second_factor_login_template = "user_login_secondfactor"

    @property
    @abc.abstractmethod
    def NAME(self) -> str:
        """Name for this factor for templates."""
        ...

    @property
    @abc.abstractmethod
    def ACTION_NAME(self) -> str:
        """The action name for this factor, used as path-segment."""
        ...

    def __init__(self, moduleName, modulePath, _user_module):
        """Initialise the handler and derive its action, add and start URLs from the module path."""
        super().__init__(moduleName, modulePath, _user_module)
        base_path = self.modulePath
        self.action_url = f"{base_path}/{self.ACTION_NAME}"
        self.add_url = f"{base_path}/add"
        self.start_url = f"{base_path}/start"

702 

703 

704class TimeBasedOTP(UserSecondFactorAuthentication): 

705 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP" 

706 WINDOW_SIZE = 5 

707 ACTION_NAME = "otp" 

708 NAME = "Time based Otp" 

709 second_factor_login_template = "user_login_secondfactor" 

710 

@dataclasses.dataclass
class OtpConfig:
    """
    Description of an OTP token algorithm.

    Instances of this dataclass are the interface through which the
    TimeBasedOTP class passes a token configuration around.
    """
    # Only the secret is mandatory, all other fields have defaults
    secret: str
    timedrift: float = 0.0
    algorithm: t.Literal["sha1", "sha256"] = "sha1"
    interval: int = 60

722 

class OtpSkel(skeleton.RelSkel):
    """
    Skeleton used to ask the user for the OTP token.
    """
    # The token is read as a number between 0 and 999999
    otptoken = NumericBone(descr="Token", required=True, max=999999, min=0)

733 

@classmethod
def patch_user_skel(cls, skel_cls):
    """
    Equip the given UserSkel class with the bones required by time based OTP:
    the OTP serial, the OTP secret and the (read-only) OTP time drift.
    """
    category = "Second Factor Authentication"

    # One-Time Password Verification
    skel_cls.otp_serial = StringBone(
        descr="OTP serial",
        searchable=True,
        params={"category": category},
    )

    skel_cls.otp_secret = CredentialBone(
        descr="OTP secret",
        params={"category": category},
    )

    skel_cls.otp_timedrift = NumericBone(
        descr="OTP time drift",
        readOnly=True,
        defaultValue=0,
        params={"category": category},
    )

763 

def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None:
    """
    Build the token configuration of the given user.

    Secret and time drift are read from the raw database entity of *skel*.

    :returns: An instance of self.OtpConfig, or None when the user has no OTP secret
        and this second factor handler is therefore not applicable.
    """
    entity = skel.dbEntity
    otp_secret = entity.get("otp_secret")

    if not otp_secret:
        return None

    return self.OtpConfig(secret=otp_secret, timedrift=entity.get("otp_timedrift") or 0)

774 

def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
    """
    Tell whether this second factor can be used for the given user,
    which is the case when a token configuration is available.
    """
    otp_config = self.get_config(skel)
    return bool(otp_config)

780 

@exposed
def start(self):
    """
    Configures OTP login for the current session.

    The token configuration of the user who passed the primary authentication is stored,
    together with the user's key, as a dict under "_otp_user" in the session. Afterwards
    the form asking for the token is rendered.

    :raises: :exc:`viur.core.errors.PreconditionFailed`, if no primary authentication happened
        in this session or the user has no configuration for this second factor.
    :raises: :exc:`viur.core.errors.NotFound`, if the authenticated user does not exist anymore.
    """
    # Fetched once and used throughout; not named "session", which is an imported module.
    cur_session = current.session.get()

    if not (user_key := cur_session.get("possible_user_key")):
        raise errors.PreconditionFailed(
            "Second factor can only be triggered after successful primary authentication."
        )

    user_skel = self._user_module.baseSkel()
    if not user_skel.fromDB(user_key):
        raise errors.NotFound("The previously authenticated user is gone.")

    if not (otp_config := self.get_config(user_skel)):
        raise errors.PreconditionFailed("This second factor is not available for the user")

    cur_session["_otp_user"] = {
        "key": str(user_key),
    } | dataclasses.asdict(otp_config)
    cur_session.markChanged()

    return self._user_module.render.edit(
        self.OtpSkel(),
        params={
            "name": i18n.translate(self.NAME),
            "action_name": self.ACTION_NAME,
            "action_url": f"{self.modulePath}/{self.ACTION_NAME}",
        },
        tpl=self.second_factor_login_template
    )

819 

@exposed
@force_ssl
@skey(allow_empty=True)
def otp(self, *args, **kwargs):
    """
    Performs the second factor validation and interaction with the client.

    The token is checked against the configuration stored in the session under
    "_otp_user". A wrong or missing token increases the attempt counter and renders
    the form again; a valid one removes the configuration from the session and
    continues the authentication.
    """
    session = current.session.get()
    otp_user_conf = session.get("_otp_user")
    if not otp_user_conf:
        raise errors.PreconditionFailed("No OTP process started in this session")

    # Refuse when the maximum of second factor verification attempts is exceeded
    attempts = otp_user_conf.get("attempts") or 0
    if attempts > self.MAX_RETRY:
        raise errors.Forbidden("Maximum amount of authentication retries exceeded")

    # Read the OTP token via the skeleton, to obtain a valid value.
    # If the token is valid, verify() returns the current timedrift index for this hardware OTP.
    skel = self.OtpSkel()
    drift_index = self.verify(
        otp=skel["otptoken"],
        secret=otp_user_conf["secret"],
        algorithm=otp_user_conf.get("algorithm") or "sha1",
        interval=otp_user_conf.get("interval") or 60,
        timedrift=otp_user_conf.get("timedrift") or 0.0,
        valid_window=self.WINDOW_SIZE
    ) if skel.fromClient(kwargs) else None

    # Caution: a plain truth test would reject the valid index 0, so compare against None!
    if drift_index is None:
        otp_user_conf["attempts"] = attempts + 1
        session.markChanged()
        skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
        return self._user_module.render.edit(
            skel,
            name=i18n.translate(self.NAME),
            action_name=self.ACTION_NAME,
            action_url=f"{self.modulePath}/{self.ACTION_NAME}",
            tpl=self.second_factor_login_template
        )

    # Remove otp user config from session
    user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName)
    del session["_otp_user"]
    session.markChanged()

    # Check if the OTP device has a time drift
    drift_change = float(drift_index) - otp_user_conf["timedrift"]
    if abs(drift_change) > 2:
        # The time-drift change accumulates to more than 2 minutes (for interval==60):
        # update clock-drift value accordingly
        self.updateTimeDrift(user_key, drift_change)

    # Continue with authentication
    return self._user_module.secondFactorSucceeded(self, user_key)

878 

@staticmethod
def verify(
    otp: str | int,
    secret: str,
    algorithm: str = "sha1",
    interval: int = 60,
    timedrift: float = 0.0,
    for_time: datetime.datetime | None = None,
    valid_window: int = 0,
) -> int | None:
    """
    Check a one-time password against the time-based value expected for `secret`.

    Derived from pyotp's verify. Instead of a boolean, a successful check with
    `valid_window` > 0 yields the counter offset at which the submitted token
    matched, which allows the caller to track the clock drift of a hardware
    token generator.

    :param otp: The token entered by the user; left-padded with zeros to six characters.
    :param secret: The OTP secret as a hex string (it is re-encoded to base32 for pyotp).
    :param algorithm: Name of the HMAC digest, either "sha1" or "sha256".
    :param interval: Token lifetime in seconds. Defaults to 60 here, whereas pyotp defaults to 30.
    :param timedrift: The known drift (previous offset) of the generator; rounded to an integer offset.
    :param for_time: Point in time to verify for; the current time is used when omitted.
    :param valid_window: Number of counter ticks before and after the drift-adjusted one that
        are also accepted.
    :returns: The matching offset when `valid_window` is set, 0 for a match without a window,
        None when the token does not match.
    :raises errors.NotImplemented: If `algorithm` is not one of the supported digests.
    """
    supported_digests = {
        "sha1": hashlib.sha1,
        "sha256": hashlib.sha256,
    }
    digest = supported_digests.get(algorithm)
    if not digest:
        raise errors.NotImplemented(f"{algorithm=} is not implemented")

    check_time = datetime.datetime.now() if for_time is None else for_time

    # The stored drift moves in fractions, but the counter offset has to be an integer.
    base_offset = round(timedrift)
    # The secret is kept as hex; pyotp expects a base32 string.
    b32_secret = base64.b32encode(bytes.fromhex(secret)).decode()
    # Restore leading zeros that get lost when the token was handled as a number.
    candidate = str(otp).zfill(6)

    totp = pyotp.TOTP(b32_secret, digest=digest, interval=interval)

    if not valid_window:
        # Without a window only the drift-adjusted tick is accepted, reported as index 0.
        matched = hmac.compare_digest(candidate, str(totp.at(check_time, base_offset)))
        return 0 if matched else None

    for offset in range(base_offset - valid_window, base_offset + valid_window + 1):
        # Constant-time comparison against the token expected at this offset.
        if hmac.compare_digest(candidate, str(totp.at(check_time, offset))):
            return offset

    return None

936 

def updateTimeDrift(self, user_key: db.Key, idx: float) -> None:
    """
    Update the stored clock-drift value (`otp_timedrift`) of a user.

    The value moves by a tenth of `idx` per call, clamped to the range
    -0.3 .. 0.3, so that a single late submit by a user cannot skew the
    stored drift out of bounds.

    :param user_key: Key of the user entity to update.
    :param idx: How many steps before/behind the accepted token was.
    :raises errors.NotFound: If no entity exists for `user_key`.
    """

    # FIXME: The callback in viur-core must be improved, to accept user_skel

    def transaction(user_key, idx):
        # Fail with a proper HTTP error instead of an AttributeError on None.
        if not (user := db.Get(user_key)):
            raise errors.NotFound()

        # Start from zero when no (valid) drift has been stored yet.
        if not isinstance(user.get("otp_timedrift"), float):
            user["otp_timedrift"] = 0.0

        user["otp_timedrift"] += min(max(0.1 * idx, -0.3), 0.3)
        db.Put(user)

    db.RunInTransaction(transaction, user_key, idx)

957 

958 

class AuthenticatorOTP(UserSecondFactorAuthentication):
    """
    This class handles the second factor for authenticator apps (like Authy and so on).
    """
    METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP"

    second_factor_add_template = "user_secondfactor_add"
    """Template to configure (add) a new TOTP"""

    ACTION_NAME = "authenticator_otp"
    """Action name provided for *otp_template* on login"""

    NAME = "Authenticator App"

    @exposed
    @force_ssl
    @skey(allow_empty=True)
    def add(self, otp=None):
        """
        Configure an authenticator app for the current user.

        The pending secret is read from the current session under
        `_maybe_otp_app_secret`. If there is none yet, a new one is generated
        and stored in the session.

        Without an `otp`, or with an `otp` that does not verify against the
        pending secret, the add-template is rendered together with the
        provisioning URI. With a valid `otp`, the pending secret is written
        to the user entry and the success-template is rendered.

        :param otp: The token shown by the authenticator app, or None to request the form.
        """
        current_session = current.session.get()

        if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")):
            otp_app_secret = AuthenticatorOTP.generate_otp_app_secret()
            current_session["_maybe_otp_app_secret"] = otp_app_secret
            current_session.markChanged()

        # First call (no token yet) and wrong token both end up on the same form.
        if otp is None or not AuthenticatorOTP.verify_otp(otp, otp_app_secret):
            return self._user_module.render.second_factor_add(
                tpl=self.second_factor_add_template,
                action_name=self.ACTION_NAME,
                name=i18n.translate(self.NAME),
                add_url=self.add_url,
                otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret))  # to add errors

        # Now we can set the otp_app_secret to the current user and render the success-template
        AuthenticatorOTP.set_otp_app_secret(otp_app_secret)
        return self._user_module.render.second_factor_add_success(
            action_name=self.ACTION_NAME,
            name=i18n.translate(self.NAME),
        )

    def can_handle(self, skel: skeleton.SkeletonInstance) -> bool:
        """
        We can only handle the second factor if we have stored an otp_app_secret before.
        """
        return bool(skel.dbEntity.get("otp_app_secret", ""))

    @classmethod
    def patch_user_skel(cls, skel_cls):
        """
        Modifies the UserSkel to be equipped by bones required by Authenticator App
        """
        # Authenticator OTP Apps (like Authy)
        skel_cls.otp_app_secret = CredentialBone(
            descr="OTP Secret (App-Key)",
            params={
                "category": "Second Factor Authentication",
            }
        )

    @classmethod
    def set_otp_app_secret(cls, otp_app_secret=None):
        """
        Write a new OTP secret into the entry of the current user.

        :param otp_app_secret: The secret to store.
        :raises errors.PreconditionFailed: If no secret is provided.
        :raises errors.Unauthorized: If there is no current user.
        :raises errors.NotFound: If the entity of the current user does not exist.
        """
        if otp_app_secret is None:
            logging.error("No 'otp_app_secret' is provided")
            raise errors.PreconditionFailed("No 'otp_app_secret' is provided")
        if not (cuser := current.user.get()):
            raise errors.Unauthorized()

        def transaction(user_key):
            if not (user := db.Get(user_key)):
                raise errors.NotFound()
            user["otp_app_secret"] = otp_app_secret
            db.Put(user)

        db.RunInTransaction(transaction, cuser["key"])

    @classmethod
    def generate_otp_app_secret_uri(cls, otp_app_secret) -> str:
        """
        Build the provisioning URI for the current user and the given secret.

        :param otp_app_secret: The secret to encode into the URI.
        :return: An otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example
        :raises errors.Unauthorized: If there is no current user.
        """
        if not (cuser := current.user.get()):
            raise errors.Unauthorized()
        if not (issuer := conf.user.otp_issuer):
            # Fall back to the project id so the URI always carries an issuer.
            logging.warning(
                f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}")
            issuer = conf.instance.project_id

        return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer)

    @classmethod
    def generate_otp_app_secret(cls) -> str:
        """
        Generate a new OTP secret.

        :return: A random base32 secret as produced by pyotp.
        """
        return pyotp.random_base32()

    @classmethod
    def verify_otp(cls, otp: str | int, secret: str) -> bool:
        """
        Verify `otp` against the TOTP derived from `secret` using pyotp's defaults.
        """
        return pyotp.TOTP(secret).verify(otp)

    @exposed
    def start(self):
        """
        Start the second factor step: reset the attempt counter in the session
        and render the form asking for the OTP token.
        """
        otp_user_conf = {"attempts": 0}
        session = current.session.get()
        session["_otp_user"] = otp_user_conf
        session.markChanged()
        return self._user_module.render.edit(
            TimeBasedOTP.OtpSkel(),
            params={
                "name": i18n.translate(self.NAME),
                "action_name": self.ACTION_NAME,
                "action_url": self.action_url,
            },
            tpl=self.second_factor_login_template,
        )

    @exposed
    @force_ssl
    @skey
    def authenticator_otp(self, **kwargs):
        """
        Verify the submitted OTP token against the secret stored for the user.

        On success the authentication flow is continued; on failure the
        attempt counter in the session is increased and the form is rendered
        again with an error.

        :raises errors.PreconditionFailed: If the session carries no pending login or OTP process,
            the user has no authenticator app secret, or the submitted data is invalid.
        :raises errors.Forbidden: If the maximum number of attempts is exceeded.
        :raises errors.NotFound: If the user entity does not exist.
        """
        session = current.session.get()

        # A missing primary login must not surface as a KeyError (HTTP 500).
        possible_user_key = session.get("possible_user_key")
        if possible_user_key is None:
            raise errors.PreconditionFailed("No login process started in this session")
        user_key = db.Key(self._user_module.kindName, possible_user_key)

        if not (otp_user_conf := session.get("_otp_user")):
            raise errors.PreconditionFailed("No OTP process started in this session")

        # Check if maximum second factor verification attempts
        if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY:
            raise errors.Forbidden("Maximum amount of authentication retries exceeded")

        if not (user := db.Get(user_key)):
            raise errors.NotFound()

        # Same condition as can_handle(): without a stored secret there is nothing to verify against.
        if not (otp_app_secret := user.get("otp_app_secret")):
            raise errors.PreconditionFailed("No authenticator app configured for this user")

        skel = TimeBasedOTP.OtpSkel()
        if not skel.fromClient(kwargs):
            raise errors.PreconditionFailed()
        # Restore leading zeros that get lost when the token was handled as a number.
        otp_token = str(skel["otptoken"]).zfill(6)

        if AuthenticatorOTP.verify_otp(otp=otp_token, secret=otp_app_secret):
            return self._user_module.secondFactorSucceeded(self, user_key)

        otp_user_conf["attempts"] = attempts + 1
        session.markChanged()
        skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])]
        return self._user_module.render.edit(
            skel,
            name=i18n.translate(self.NAME),
            action_name=self.ACTION_NAME,
            action_url=self.action_url,
            tpl=self.second_factor_login_template,
        )

1131 

1132 

1133class User(List): 

1134 kindName = "user" 

1135 addTemplate = "user_add" 

1136 addSuccessTemplate = "user_add_success" 

1137 lostPasswordTemplate = "user_lostpassword" 

1138 verifyEmailAddressMail = "user_verify_address" 

1139 passwordRecoveryMail = "user_password_recovery" 

1140 

1141 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter( 

1142 None, ( 

1143 UserPassword, 

1144 conf.user.google_client_id and GoogleAccount, 

1145 ) 

1146 )) 

1147 """ 

1148 Specifies primary authentication providers that are made available 

1149 as sub-modules under `user/auth_<classname>`. They might require 

1150 customization or configuration. 

1151 """ 

1152 

1153 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = ( 

1154 TimeBasedOTP, 

1155 AuthenticatorOTP, 

1156 ) 

1157 """ 

1158 Specifies secondary authentication providers that are made available 

1159 as sub-modules under `user/f2_<classname>`. They might require 

1160 customization or configuration, which is determined during the 

1161 login-process depending on the user that wants to login. 

1162 """ 

1163 

1164 validAuthenticationMethods = tuple(filter( 

1165 None, ( 

1166 (UserPassword, AuthenticatorOTP), 

1167 (UserPassword, TimeBasedOTP), 

1168 (UserPassword, None), 

1169 (GoogleAccount, None) if conf.user.google_client_id else None, 

1170 ) 

1171 )) 

1172 """ 

1173 Specifies the possible combinations of primary- and secondary factor 

1174 login methos. 

1175 

1176 GoogleLogin defaults to no second factor, as the Google Account can be 

1177 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only 

1178 handled when there is a user-dependent configuration available. 

1179 """ 

1180 

1181 msg_missing_second_factor = "Second factor required but not configured for this user." 

1182 

1183 secondFactorTimeWindow = datetime.timedelta(minutes=10) 

1184 

1185 default_order = "name.idx" 

1186 

1187 adminInfo = { 

1188 "icon": "person-fill", 

1189 "actions": [ 

1190 "trigger_kick", 

1191 "trigger_takeover", 

1192 ], 

1193 "customActions": { 

1194 "trigger_kick": { 

1195 "name": i18n.translate( 

1196 key="viur.modules.user.customActions.kick", 

1197 defaultText="Kick user", 

1198 hint="Title of the kick user function" 

1199 ), 

1200 "icon": "trash2-fill", 

1201 "access": ["root"], 

1202 "action": "fetch", 

1203 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}", 

1204 "confirm": i18n.translate( 

1205 key="viur.modules.user.customActions.kick.confirm", 

1206 defaultText="Do you really want to drop all sessions of the selected user from the system?", 

1207 ), 

1208 "success": i18n.translate( 

1209 key="viur.modules.user.customActions.kick.success", 

1210 defaultText="Sessions of the user are being invalidated.", 

1211 ), 

1212 }, 

1213 "trigger_takeover": { 

1214 "name": i18n.translate( 

1215 key="viur.modules.user.customActions.takeover", 

1216 defaultText="Take-over user", 

1217 hint="Title of the take user over function" 

1218 ), 

1219 "icon": "file-person-fill", 

1220 "access": ["root"], 

1221 "action": "fetch", 

1222 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}", 

1223 "confirm": i18n.translate( 

1224 key="viur.modules.user.customActions.takeover.confirm", 

1225 defaultText="Do you really want to replace your current user session by a " 

1226 "user session of the selected user?", 

1227 ), 

1228 "success": i18n.translate( 

1229 key="viur.modules.user.customActions.takeover.success", 

1230 defaultText="You're now know as the selected user!", 

1231 ), 

1232 "then": "reload-vi", 

1233 }, 

1234 }, 

1235 } 

1236 

1237 roles = { 

1238 "admin": "*", 

1239 } 

1240 

1241 def __init__(self, moduleName, modulePath): 

1242 for provider in self.authenticationProviders: 

1243 assert issubclass(provider, UserPrimaryAuthentication) 

1244 name = f"auth_{provider.__name__.lower()}" 

1245 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1246 

1247 for provider in self.secondFactorProviders: 

1248 assert issubclass(provider, UserSecondFactorAuthentication) 

1249 name = f"f2_{provider.__name__.lower()}" 

1250 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1251 

1252 super().__init__(moduleName, modulePath) 

1253 

1254 def get_role_defaults(self, role: str) -> set[str]: 

1255 """ 

1256 Returns a set of default access rights for a given role. 

1257 """ 

1258 if role in ("viewer", "editor", "admin"): 

1259 return {"admin"} 

1260 

1261 return set() 

1262 

1263 def addSkel(self): 

1264 skel = super().addSkel().clone() 

1265 user = current.user.get() 

1266 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])): 

1267 skel.status.readOnly = True 

1268 skel["status"] = Status.UNSET 

1269 skel.status.visible = False 

1270 skel.access.readOnly = True 

1271 skel["access"] = [] 

1272 skel.access.visible = False 

1273 else: 

1274 # An admin tries to add a new user. 

1275 skel.status.readOnly = False 

1276 skel.status.visible = True 

1277 skel.access.readOnly = False 

1278 skel.access.visible = True 

1279 

1280 if "password" in skel: 

1281 # Unlock and require a password 

1282 skel.password.required = True 

1283 skel.password.visible = True 

1284 skel.password.readOnly = False 

1285 

1286 skel.name.readOnly = False # Don't enforce readonly name in user/add 

1287 return skel 

1288 

1289 def editSkel(self, *args, **kwargs): 

1290 skel = super().editSkel().clone() 

1291 

1292 if "password" in skel: 

1293 skel.password.required = False 

1294 skel.password.visible = True 

1295 skel.password.readOnly = False 

1296 

1297 user = current.user.get() 

1298 

1299 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only 

1300 skel.name.readOnly = lockFields 

1301 skel.access.readOnly = lockFields 

1302 skel.status.readOnly = lockFields 

1303 

1304 return skel 

1305 

1306 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication: 

1307 return getattr(self, f"f2_{cls.__name__.lower()}") 

1308 

1309 def getCurrentUser(self): 

1310 session = current.session.get() 

1311 

1312 if session and (user := session.get("user")): 

1313 skel = self.baseSkel() 

1314 skel.setEntity(user) 

1315 return skel 

1316 

1317 return None 

1318 

1319 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key): 

1320 """ 

1321 Continue authentication flow when primary authentication succeeded. 

1322 """ 

1323 skel = self.baseSkel() 

1324 

1325 if not skel.fromDB(user_key): 

1326 raise errors.NotFound("User was not found.") 

1327 

1328 if not provider.can_handle(skel): 

1329 raise errors.Forbidden("User is not allowed to use this primary login method.") 

1330 

1331 session = current.session.get() 

1332 session["possible_user_key"] = user_key.id_or_name 

1333 session["_secondFactorStart"] = utils.utcNow() 

1334 session.markChanged() 

1335 

1336 second_factor_providers = [] 

1337 

1338 for auth_provider, second_factor in self.validAuthenticationMethods: 

1339 if isinstance(provider, auth_provider): 

1340 if second_factor is not None: 

1341 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor) 

1342 if second_factor_provider_instance.can_handle(skel): 

1343 second_factor_providers.append(second_factor_provider_instance) 

1344 else: 

1345 second_factor_providers.append(None) 

1346 

1347 if len(second_factor_providers) > 1 and None in second_factor_providers: 

1348 # We have a second factor. So we can get rid of the None 

1349 second_factor_providers.pop(second_factor_providers.index(None)) 

1350 

1351 if len(second_factor_providers) == 0: 

1352 raise errors.NotAcceptable(self.msg_missing_second_factor) 

1353 elif len(second_factor_providers) == 1: 

1354 if second_factor_providers[0] is None: 

1355 # We allow sign-in without a second factor 

1356 return self.authenticateUser(user_key) 

1357 # We have only one second factor we don't need the choice template 

1358 return second_factor_providers[0].start(user_key) 

1359 

1360 # In case there is more than one second factor, let the user select a method. 

1361 return self.render.second_factor_choice(second_factors=second_factor_providers) 

1362 

1363 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key): 

1364 """ 

1365 Continue authentication flow when secondary authentication succeeded. 

1366 """ 

1367 session = current.session.get() 

1368 if session["possible_user_key"] != user_key.id_or_name: 

1369 raise errors.Forbidden() 

1370 

1371 # Assert that the second factor verification finished in time 

1372 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow: 

1373 raise errors.RequestTimeout() 

1374 

1375 return self.authenticateUser(user_key) 

1376 

1377 def authenticateUser(self, key: db.Key, **kwargs): 

1378 """ 

1379 Performs Log-In for the current session and the given user key. 

1380 

1381 This resets the current session: All fields not explicitly marked as persistent 

1382 by conf.user.session_persistent_fields_on_login are gone afterwards. 

1383 

1384 :param key: The (DB-)Key of the user we shall authenticate 

1385 """ 

1386 skel = self.baseSkel() 

1387 if not skel.fromDB(key): 

1388 raise ValueError(f"Unable to authenticate unknown user {key}") 

1389 

1390 # Verify that this user account is active 

1391 if skel["status"] < Status.ACTIVE.value: 

1392 raise errors.Forbidden("The user is disabled and cannot be authenticated.") 

1393 

1394 # Update session for user 

1395 session = current.session.get() 

1396 # Remember persistent fields... 

1397 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login} 

1398 session.reset() 

1399 # and copy them over to the new session 

1400 session |= take_over 

1401 

1402 # Update session, user and request 

1403 session["user"] = skel.dbEntity 

1404 

1405 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key 

1406 current.user.set(self.getCurrentUser()) 

1407 

1408 self.onLogin(skel) 

1409 

1410 return self.render.loginSucceeded(**kwargs) 

1411 

1412 @exposed 

1413 @skey 

1414 def logout(self, *args, **kwargs): 

1415 """ 

1416 Implements the logout action. It also terminates the current session (all keys not listed 

1417 in viur.session_persistent_fields_on_logout will be lost). 

1418 """ 

1419 if not (user := current.user.get()): 

1420 raise errors.Unauthorized() 

1421 

1422 self.onLogout(user) 

1423 

1424 session = current.session.get() 

1425 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout} 

1426 session.reset() 

1427 session |= take_over 

1428 current.user.set(None) # set user to none in context var 

1429 return self.render.logoutSuccess() 

1430 

1431 @exposed 

1432 def login(self, *args, **kwargs): 

1433 return self.render.loginChoices([ 

1434 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1435 for primary, secondary in self.validAuthenticationMethods 

1436 ]) 

1437 

1438 def onLogin(self, skel: skeleton.SkeletonInstance): 

1439 """ 

1440 Hook to be called on user login. 

1441 """ 

1442 # Update the lastlogin timestamp (if available!) 

1443 if "lastlogin" in skel: 

1444 now = utils.utcNow() 

1445 

1446 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??) 

1447 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)): 

1448 skel["lastlogin"] = now 

1449 skel.toDB(update_relations=False) 

1450 

1451 logging.info(f"""User {skel["name"]} logged in""") 

1452 

1453 def onLogout(self, skel: skeleton.SkeletonInstance): 

1454 """ 

1455 Hook to be called on user logout. 

1456 """ 

1457 logging.info(f"""User {skel["name"]} logged out""") 

1458 

1459 @exposed 

1460 def view(self, key: db.Key | int | str = "self", *args, **kwargs): 

1461 """ 

1462 Allow a special key "self" to reference the current user. 

1463 

1464 By default, any authenticated user can view its own user entry, 

1465 to obtain access rights and any specific user information. 

1466 This behavior is defined in the customized `canView` function, 

1467 which is overwritten by the User-module. 

1468 

1469 The rendered skeleton can be modified or restriced by specifying 

1470 a customized view-skeleton. 

1471 """ 

1472 if key == "self": 

1473 if user := current.user.get(): 

1474 key = user["key"] 

1475 else: 

1476 raise errors.Unauthorized("Cannot view 'self' with unknown user") 

1477 

1478 return super().view(key, *args, **kwargs) 

1479 

1480 def canView(self, skel) -> bool: 

1481 if user := current.user.get(): 

1482 if skel["key"] == user["key"]: 

1483 return True 

1484 

1485 if "root" in user["access"] or "user-view" in user["access"]: 

1486 return True 

1487 

1488 return False 

1489 

1490 @exposed 

1491 @skey(allow_empty=True) 

1492 def edit(self, key: db.Key | int | str = "self", *args, **kwargs): 

1493 """ 

1494 Allow a special key "self" to reference the current user. 

1495 

1496 This modification will only allow to use "self" as a key; 

1497 The specific access right to let the user edit itself must 

1498 still be customized. 

1499 

1500 The rendered and editable skeleton can be modified or restriced 

1501 by specifying a customized edit-skeleton. 

1502 """ 

1503 if key == "self": 

1504 if user := current.user.get(): 

1505 key = user["key"] 

1506 else: 

1507 raise errors.Unauthorized("Cannot edit 'self' with unknown user") 

1508 

1509 return super().edit(key, *args, **kwargs) 

1510 

1511 @exposed 

1512 def getAuthMethods(self, *args, **kwargs): 

1513 """Inform tools like Viur-Admin which authentication to use""" 

1514 # FIXME: This is almost the same code as in index()... 

1515 # FIXME: VIUR4: The entire function should be removed! 

1516 # TODO: Align result with index(), so that primary and secondary login is presented. 

1517 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!") 

1518 

1519 res = [ 

1520 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1521 for primary, secondary in self.validAuthenticationMethods 

1522 ] 

1523 

1524 return json.dumps(res) 

1525 

1526 @exposed 

1527 @skey 

1528 def trigger(self, action: str, key: str): 

1529 current.request.get().response.headers["Content-Type"] = "application/json" 

1530 

1531 # Check for provided access right definition (equivalent to client-side check), fallback to root! 

1532 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", ) 

1533 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)): 

1534 raise errors.Unauthorized() 

1535 

1536 skel = self.baseSkel() 

1537 if not skel.fromDB(key): 

1538 raise errors.NotFound() 

1539 

1540 match action: 

1541 case "takeover": 

1542 self.authenticateUser(skel["key"]) 

1543 

1544 case "kick": 

1545 session.killSessionByUser(skel["key"]) 

1546 

1547 case _: 

1548 raise errors.NotImplemented(f"Action {action!r} not implemented") 

1549 

1550 return json.dumps("OKAY") 

1551 

1552 def onEdited(self, skel): 

1553 super().onEdited(skel) 

1554 # In case the user is set to inactive, kill all sessions 

1555 if "status" in skel and skel["status"] < Status.ACTIVE.value: 

1556 session.killSessionByUser(skel["key"]) 

1557 

1558 def onDeleted(self, skel): 

1559 super().onDeleted(skel) 

1560 # Invalidate all sessions of that user 

1561 session.killSessionByUser(skel["key"]) 

1562 

1563 

@tasks.StartupTask
def createNewUserIfNotExists():
    """
    Create a new admin user with a random password if no user entry exists yet.

    Nothing happens unless the `user` module on `conf.main_app.vi` is a
    `User` instance that offers `UserPassword` as a primary login method.
    The generated credentials are logged and mailed to the admins.
    """
    user_module = getattr(conf.main_app.vi, "user", None)
    eligible = (
        user_module
        and isinstance(user_module, User)
        and "addSkel" in dir(user_module)
        and "validAuthenticationMethods" in dir(user_module)
        # UserPassword must be one of the primary login methods
        and any(
            issubclass(method[0], UserPassword)
            for method in user_module.validAuthenticationMethods
        )
    )
    if not eligible:
        return

    # Only act when there's currently no user in the database
    if db.Query(user_module.addSkel().kindName).getEntry():
        return

    # Ensure we have the full skeleton
    addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)()
    uname = f"""admin@{conf.instance.project_id}.appspot.com"""
    pw = utils.string.random(13)

    addSkel["name"] = uname
    addSkel["status"] = Status.ACTIVE  # Ensure it's enabled right away
    addSkel["access"] = ["root"]
    addSkel["password"] = pw

    try:
        addSkel.toDB()
    except Exception as e:
        logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}")
        logging.exception(e)
        return

    msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}"

    logging.warning(msg)
    email.sendEMailToAdmins("New ViUR password", msg)

1600 

1601 

1602# DEPRECATED ATTRIBUTES HANDLING 

1603 

def __getattr__(attr):
    """
    Module-level attribute hook for deprecated names.

    `userSkel` is still served (as `UserSkel`) but emits a deprecation
    warning and a log entry. Any other unknown name raises the regular
    AttributeError for a missing module attribute.

    :param attr: Name of the attribute that was not found on the module.
    :raises AttributeError: If `attr` is not a known deprecated name.
    """
    if attr == "userSkel":
        msg = "Use of `userSkel` is deprecated; Please use `UserSkel` instead!"
        warnings.warn(msg, DeprecationWarning, stacklevel=2)
        logging.warning(msg)
        return UserSkel

    # Modules have no parent __getattr__ to delegate to; report the missing name directly.
    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")