Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/modules/user.py: 0%

705 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-03 13:41 +0000

1import abc 

2import datetime 

3import enum 

4import functools 

5import hashlib 

6import hmac 

7import json 

8import logging 

9import secrets 

10import warnings 

11import user_agents 

12 

13import pyotp 

14import base64 

15import dataclasses 

16import typing as t 

17from google.auth.transport import requests 

18from google.oauth2 import id_token 

19 

20from viur.core import ( 

21 conf, current, db, email, errors, i18n, 

22 securitykey, session, skeleton, tasks, utils, Module 

23) 

24from viur.core.decorators import * 

25from viur.core.bones import * 

26from viur.core.bones.password import PBKDF2_DEFAULT_ITERATIONS, encode_password 

27from viur.core.prototypes.list import List 

28from viur.core.ratelimit import RateLimit 

29from viur.core.securityheaders import extendCsp 

30 

31 

32@functools.total_ordering 

33class Status(enum.Enum): 

34 """Status enum for a user 

35 

36 Has backwards compatibility to be comparable with non-enum values. 

37 Will be removed with viur-core 4.0.0 

38 """ 

39 

40 UNSET = 0 # Status is unset 

41 WAITING_FOR_EMAIL_VERIFICATION = 1 # Waiting for email verification 

42 WAITING_FOR_ADMIN_VERIFICATION = 2 # Waiting for verification through admin 

43 DISABLED = 5 # Account disabled 

44 ACTIVE = 10 # Active 

45 

46 def __eq__(self, other): 

47 if isinstance(other, Status): 

48 return super().__eq__(other) 

49 return self.value == other 

50 

51 def __lt__(self, other): 

52 if isinstance(other, Status): 

53 return super().__lt__(other) 

54 return self.value < other 

55 

56 

57class UserSkel(skeleton.Skeleton): 

58 kindName = "user" # this assignment is required, as this Skeleton is defined in viur-core (see #604) 

59 

60 name = EmailBone( 

61 descr="E-Mail", 

62 required=True, 

63 readOnly=True, 

64 caseSensitive=False, 

65 searchable=True, 

66 unique=UniqueValue(UniqueLockMethod.SameValue, True, "Username already taken"), 

67 ) 

68 

69 firstname = StringBone( 

70 descr="Firstname", 

71 searchable=True, 

72 ) 

73 

74 lastname = StringBone( 

75 descr="Lastname", 

76 searchable=True, 

77 ) 

78 

79 roles = SelectBone( 

80 descr=i18n.translate("viur.user.bone.roles", defaultText="Roles"), 

81 values=conf.user.roles, 

82 required=True, 

83 multiple=True, 

84 # fixme: This is generally broken in VIUR! See #776 for details. 

85 # vfunc=lambda values: 

86 # i18n.translate( 

87 # "user.bone.roles.invalid", 

88 # defaultText="Invalid role setting: 'custom' can only be set alone.") 

89 # if "custom" in values and len(values) > 1 else None, 

90 defaultValue=list(conf.user.roles.keys())[:1], 

91 ) 

92 

93 access = SelectBone( 

94 descr=i18n.translate("viur.user.bone.access", defaultText="Access rights"), 

95 values=lambda: { 

96 right: i18n.translate(f"server.modules.user.accessright.{right}", defaultText=right) 

97 for right in sorted(conf.user.access_rights) 

98 }, 

99 multiple=True, 

100 params={ 

101 "readonlyIf": "'custom' not in roles" # if "custom" is not in roles, "access" is managed by the role system 

102 } 

103 ) 

104 

105 status = SelectBone( 

106 descr="Account status", 

107 values=Status, 

108 defaultValue=Status.ACTIVE, 

109 required=True, 

110 ) 

111 

112 lastlogin = DateBone( 

113 descr="Last Login", 

114 readOnly=True, 

115 ) 

116 

117 admin_config = JsonBone( # This bone stores settings from the vi 

118 descr="Config for the User", 

119 visible=False 

120 ) 

121 

122 def __new__(cls): 

123 """ 

124 Constructor for the UserSkel-class, with the capability 

125 to dynamically add bones required for the configured 

126 authentication methods. 

127 """ 

128 for provider in conf.main_app.vi.user.authenticationProviders: 

129 assert issubclass(provider, UserPrimaryAuthentication) 

130 provider.patch_user_skel(cls) 

131 

132 for provider in conf.main_app.vi.user.secondFactorProviders: 

133 assert issubclass(provider, UserSecondFactorAuthentication) 

134 provider.patch_user_skel(cls) 

135 

136 cls.__boneMap__ = skeleton.MetaBaseSkel.generate_bonemap(cls) 

137 return super().__new__(cls) 

138 

139 @classmethod 

140 def toDB(cls, skel, *args, **kwargs): 

141 # Roles 

142 if skel["roles"] and "custom" not in skel["roles"]: 

143 # Collect access rights through rules 

144 access = set() 

145 

146 for role in skel["roles"]: 

147 # Get default access for this role 

148 access |= conf.main_app.vi.user.get_role_defaults(role) 

149 

150 # Go through all modules and evaluate available role-settings 

151 for name in dir(conf.main_app.vi): 

152 if name.startswith("_"): 

153 continue 

154 

155 module = getattr(conf.main_app.vi, name) 

156 if not isinstance(module, Module): 

157 continue 

158 

159 roles = getattr(module, "roles", None) or {} 

160 rights = roles.get(role, roles.get("*", ())) 

161 

162 # Convert role into tuple if it's not 

163 if not isinstance(rights, (tuple, list)): 

164 rights = (rights, ) 

165 

166 if "*" in rights: 

167 for right in module.accessRights: 

168 access.add(f"{name}-{right}") 

169 else: 

170 for right in rights: 

171 if right in module.accessRights: 

172 access.add(f"{name}-{right}") 

173 

174 # special case: "edit" and "delete" actions require "view" as well! 

175 if right in ("edit", "delete") and "view" in module.accessRights: 

176 access.add(f"{name}-view") 

177 

178 skel["access"] = list(access) 

179 

180 return super().toDB(skel, *args, **kwargs) 

181 

182 

183class UserAuthentication(Module, abc.ABC): 

184 @property 

185 @abc.abstractstaticmethod 

186 def METHOD_NAME() -> str: 

187 """ 

188 Define a unique method name for this authentication. 

189 """ 

190 ... 

191 

192 def __init__(self, moduleName, modulePath, userModule): 

193 super().__init__(moduleName, modulePath) 

194 self._user_module = userModule 

195 

196 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

197 return True 

198 

199 @classmethod 

200 def patch_user_skel(cls, skel_cls: skeleton.Skeleton): 

201 """ 

202 Allows for an UserAuthentication to patch the UserSkel 

203 class with additional bones which are required for 

204 the implemented authentication method. 

205 """ 

206 ... 

207 

208 

209class UserPrimaryAuthentication(UserAuthentication, abc.ABC): 

210 """Abstract class for all primary authentication methods.""" 

211 registrationEnabled = False 

212 

213 @abc.abstractmethod 

214 def login(self, *args, **kwargs): 

215 ... 

216 

217 def next_or_finish(self, skel: skeleton.SkeletonInstance): 

218 """ 

219 Hook that is called whenever a part of the authentication was successful. 

220 It allows to perform further steps in custom authentications, 

221 e.g. change a password after first login. 

222 """ 

223 return self._user_module.continueAuthenticationFlow(self, skel["key"]) 

224 

225 

226class UserPassword(UserPrimaryAuthentication): 

227 METHOD_NAME = "X-VIUR-AUTH-User-Password" 

228 

229 registrationEmailVerificationRequired = True 

230 registrationAdminVerificationRequired = True 

231 

232 verifySuccessTemplate = "user_verify_success" 

233 verifyEmailAddressMail = "user_verify_address" 

234 verifyFailedTemplate = "user_verify_failed" 

235 passwordRecoveryTemplate = "user_passwordrecover" 

236 passwordRecoveryMail = "user_password_recovery" 

237 passwordRecoverySuccessTemplate = "user_passwordrecover_success" 

238 passwordRecoveryStep1Template = "user_passwordrecover_step1" 

239 passwordRecoveryStep2Template = "user_passwordrecover_step2" 

240 passwordRecoveryStep3Template = "user_passwordrecover_step3" 

241 

242 # The default rate-limit for password recovery (10 tries each 15 minutes) 

243 passwordRecoveryRateLimit = RateLimit("user.passwordrecovery", 10, 15, "ip") 

244 

245 # Limit (invalid) login-retries to once per 5 seconds 

246 loginRateLimit = RateLimit("user.login", 12, 1, "ip") 

247 

248 @classmethod 

249 def patch_user_skel(cls, skel_cls): 

250 """ 

251 Modifies the UserSkel to be equipped by a PasswordBone. 

252 """ 

253 skel_cls.password = PasswordBone( 

254 readOnly=True, 

255 visible=False, 

256 params={ 

257 "category": "Authentication", 

258 } 

259 ) 

260 

261 class LoginSkel(skeleton.RelSkel): 

262 name = EmailBone( 

263 descr="E-Mail", 

264 required=True, 

265 caseSensitive=False, 

266 ) 

267 password = PasswordBone( 

268 required=True, 

269 test_threshold=0, 

270 ) 

271 

272 class LostPasswordStep1Skel(skeleton.RelSkel): 

273 name = EmailBone( 

274 descr="E-Mail", 

275 required=True, 

276 ) 

277 

278 class LostPasswordStep2Skel(skeleton.RelSkel): 

279 recovery_key = StringBone( 

280 descr="Recovery Key", 

281 required=True, 

282 params={ 

283 "tooltip": i18n.translate( 

284 key="viur.modules.user.userpassword.lostpasswordstep2.recoverykey", 

285 defaultText="Please enter the validation key you've received via e-mail.", 

286 hint="Shown when the user needs more than 15 minutes to paste the key", 

287 ), 

288 } 

289 ) 

290 

291 class LostPasswordStep3Skel(skeleton.RelSkel): 

292 # send the recovery key again, in case the password is rejected by some reason. 

293 recovery_key = StringBone( 

294 descr="Recovery Key", 

295 visible=False, 

296 readOnly=True, 

297 ) 

298 

299 password = PasswordBone( 

300 descr="New Password", 

301 required=True, 

302 params={ 

303 "tooltip": i18n.translate( 

304 key="viur.modules.user.userpassword.lostpasswordstep3.password", 

305 defaultText="Please enter a new password for your account.", 

306 ), 

307 } 

308 ) 

309 

310 @exposed 

311 @force_ssl 

312 @skey(allow_empty=True) 

313 def login(self, *, name: str | None = None, password: str | None = None, **kwargs): 

314 if current.user.get(): # User is already logged in, nothing to do. 

315 return self._user_module.render.loginSucceeded() 

316 

317 if not name or not password: 

318 return self._user_module.render.login(self.LoginSkel(), action="login") 

319 

320 self.loginRateLimit.assertQuotaIsAvailable() 

321 

322 # query for the username. The query might find another user, but the name is being checked for equality below 

323 name = name.lower().strip() 

324 user_skel = self._user_module.baseSkel() 

325 user_skel = user_skel.all().filter("name.idx >=", name).getSkel() or user_skel 

326 

327 # extract password hash from raw database entity (skeleton access blocks it) 

328 password_data = (user_skel.dbEntity and user_skel.dbEntity.get("password")) or {} 

329 iterations = password_data.get("iterations", 1001) # remember iterations; old password hashes used 1001 

330 password_hash = encode_password(password, password_data.get("salt", "-invalid-"), iterations)["pwhash"] 

331 

332 # now check if the username matches 

333 is_okay = secrets.compare_digest((user_skel["name"] or "").lower().strip().encode(), name.encode()) 

334 

335 # next, check if the password hash matches 

336 is_okay &= secrets.compare_digest(password_data.get("pwhash", b"-invalid-"), password_hash) 

337 

338 # next, check if the user account is active 

339 is_okay &= (user_skel["status"] or 0) >= Status.ACTIVE.value 

340 

341 if not is_okay: 

342 self.loginRateLimit.decrementQuota() # Only failed login attempts will count to the quota 

343 skel = self.LoginSkel() 

344 return self._user_module.render.login( 

345 skel, 

346 action="login", 

347 loginFailed=True, # FIXME: Is this still being used? 

348 accountStatus=user_skel["status"] # FIXME: Is this still being used? 

349 ) 

350 

351 # check if iterations are below current security standards, and update if necessary. 

352 if iterations < PBKDF2_DEFAULT_ITERATIONS: 

353 logging.info(f"Update password hash for user {name}.") 

354 # re-hash the password with more iterations 

355 # FIXME: This must be done within a transaction! 

356 user_skel["password"] = password # will be hashed on serialize 

357 user_skel.toDB(update_relations=False) 

358 

359 return self.next_or_finish(user_skel) 

360 

361 @exposed 

362 def pwrecover(self, recovery_key: str | None = None, skey: str | None = None, *args, **kwargs): 

363 """ 

364 This implements a password recovery process which lets users set a new password for their account, 

365 after validating a recovery key sent by email. 

366 

367 The process is as following: 

368 

369 - The user enters his email adress 

370 - We'll generate a random code and store it as a security-key and call sendUserPasswordRecoveryCode 

371 - sendUserPasswordRecoveryCode will run in the background, check if we have a user with that name 

372 and send a link with the code . It runs as a deferredTask so we don't leak the information if a user 

373 account exists. 

374 - If the user received his email, he can click on the link and set a new password for his account. 

375 

376 To prevent automated attacks, the fist step is guarded by a captcha and we limited calls to this function 

377 to 10 actions per 15 minutes. (One complete recovery process consists of two calls). 

378 """ 

379 self.passwordRecoveryRateLimit.assertQuotaIsAvailable() 

380 current_request = current.request.get() 

381 

382 if recovery_key is None: 

383 # This is the first step, where we ask for the username of the account we'll going to reset the password on 

384 skel = self.LostPasswordStep1Skel() 

385 

386 if not current_request.isPostRequest or not skel.fromClient(kwargs): 

387 return self._user_module.render.edit(skel, tpl=self.passwordRecoveryStep1Template) 

388 

389 # validate security key 

390 if not securitykey.validate(skey): 

391 raise errors.PreconditionFailed() 

392 

393 self.passwordRecoveryRateLimit.decrementQuota() 

394 

395 recovery_key = securitykey.create( 

396 duration=datetime.timedelta(minutes=15), 

397 key_length=conf.security.password_recovery_key_length, 

398 user_name=skel["name"].lower(), 

399 session_bound=False, 

400 ) 

401 

402 # Send the code in background 

403 self.sendUserPasswordRecoveryCode( 

404 skel["name"], recovery_key, current_request.request.headers["User-Agent"] 

405 ) 

406 

407 # step 2 is only an action-skel, and can be ignored by a direct link in the 

408 # e-mail previously sent. It depends on the implementation of the specific project. 

409 return self._user_module.render.edit( 

410 self.LostPasswordStep2Skel(), 

411 tpl=self.passwordRecoveryStep2Template, 

412 ) 

413 

414 # in step 3 

415 skel = self.LostPasswordStep3Skel() 

416 skel["recovery_key"] = recovery_key # resend the recovery key again, in case the fromClient() fails. 

417 

418 # check for any input; Render input-form again when incomplete. 

419 if not skel.fromClient(kwargs) or not current_request.isPostRequest: 

420 return self._user_module.render.edit( 

421 skel=skel, 

422 tpl=self.passwordRecoveryStep3Template, 

423 ) 

424 

425 # validate security key 

426 if not securitykey.validate(skey): 

427 raise errors.PreconditionFailed() 

428 

429 if not (recovery_request := securitykey.validate(recovery_key, session_bound=False)): 

430 raise errors.PreconditionFailed( 

431 i18n.translate( 

432 key="viur.modules.user.passwordrecovery.keyexpired", 

433 defaultText="The recovery key is expired or invalid. Please start the recovery process again.", 

434 hint="Shown when the user needs more than 15 minutes to paste the key, or entered an invalid key." 

435 ) 

436 ) 

437 

438 self.passwordRecoveryRateLimit.decrementQuota() 

439 

440 # If we made it here, the key was correct, so we'd hopefully have a valid user for this 

441 user_skel = self._user_module.viewSkel().all().filter("name.idx =", recovery_request["user_name"]).getSkel() 

442 

443 if not user_skel: 

444 raise errors.NotFound( 

445 i18n.translate( 

446 key="viur.modules.user.passwordrecovery.usernotfound", 

447 defaultText="There is no account with this name", 

448 hint="We cant find an account with that name (Should never happen)" 

449 ) 

450 ) 

451 

452 if user_skel["status"] != Status.ACTIVE: # The account is locked or not yet validated. Abort the process. 

453 raise errors.NotFound( 

454 i18n.translate( 

455 key="viur.modules.user.passwordrecovery.accountlocked", 

456 defaultText="This account is currently locked. You cannot change it's password.", 

457 hint="Attempted password recovery on a locked account" 

458 ) 

459 ) 

460 

461 # Update the password, save the user, reset his session and show the success-template 

462 user_skel["password"] = skel["password"] 

463 user_skel.toDB(update_relations=False) 

464 

465 return self._user_module.render.view( 

466 None, 

467 tpl=self.passwordRecoverySuccessTemplate, 

468 ) 

469 

470 @tasks.CallDeferred 

471 def sendUserPasswordRecoveryCode(self, user_name: str, recovery_key: str, user_agent: str) -> None: 

472 """ 

473 Sends the given recovery code to the user given in userName. This function runs deferred 

474 so there's no timing sidechannel that leaks if this user exists. Per default, we'll send the 

475 code by email (assuming we have working email delivery), but this can be overridden to send it 

476 by SMS or other means. We'll also update the changedate for this user, so no more than one code 

477 can be send to any given user in four hours. 

478 """ 

479 if user_skel := self._user_module.viewSkel().all().filter("name.idx =", user_name).getSkel(): 

480 user_agent = user_agents.parse(user_agent) 

481 email.sendEMail( 

482 tpl=self.passwordRecoveryMail, 

483 skel=user_skel, 

484 dests=[user_name], 

485 recovery_key=recovery_key, 

486 user_agent={ 

487 "device": user_agent.get_device(), 

488 "os": user_agent.get_os(), 

489 "browser": user_agent.get_browser() 

490 } 

491 ) 

492 

493 @exposed 

494 @skey(forward_payload="data", session_bound=False) 

495 def verify(self, data): 

496 def transact(key): 

497 skel = self._user_module.editSkel() 

498 if not key or not skel.fromDB(key): 

499 return None 

500 skel["status"] = Status.WAITING_FOR_ADMIN_VERIFICATION \ 

501 if self.registrationAdminVerificationRequired else Status.ACTIVE 

502 

503 skel.toDB(update_relations=False) 

504 return skel 

505 

506 if not isinstance(data, dict) or not (skel := db.RunInTransaction(transact, data.get("user_key"))): 

507 return self._user_module.render.view(None, tpl=self.verifyFailedTemplate) 

508 

509 return self._user_module.render.view(skel, tpl=self.verifySuccessTemplate) 

510 

511 def canAdd(self) -> bool: 

512 return self.registrationEnabled 

513 

514 def addSkel(self): 

515 """ 

516 Prepare the add-Skel for rendering. 

517 Currently only calls self._user_module.addSkel() and sets skel["status"] depending on 

518 self.registrationEmailVerificationRequired and self.registrationAdminVerificationRequired 

519 :return: viur.core.skeleton.Skeleton 

520 """ 

521 skel = self._user_module.addSkel() 

522 

523 if self.registrationEmailVerificationRequired: 

524 defaultStatusValue = Status.WAITING_FOR_EMAIL_VERIFICATION 

525 elif self.registrationAdminVerificationRequired: 

526 defaultStatusValue = Status.WAITING_FOR_ADMIN_VERIFICATION 

527 else: # No further verification required 

528 defaultStatusValue = Status.ACTIVE 

529 

530 skel.status.readOnly = True 

531 skel["status"] = defaultStatusValue 

532 

533 if "password" in skel: 

534 skel.password.required = True # The user will have to set a password 

535 

536 return skel 

537 

538 @force_ssl 

539 @exposed 

540 @skey(allow_empty=True) 

541 def add(self, *args, **kwargs): 

542 """ 

543 Allows guests to register a new account if self.registrationEnabled is set to true 

544 

545 .. seealso:: :func:`addSkel`, :func:`onAdded`, :func:`canAdd`, :func:`onAdd` 

546 

547 :returns: The rendered, added object of the entry, eventually with error hints. 

548 

549 :raises: :exc:`viur.core.errors.Unauthorized`, if the current user does not have the required permissions. 

550 :raises: :exc:`viur.core.errors.PreconditionFailed`, if the *skey* could not be verified. 

551 """ 

552 if not self.canAdd(): 

553 raise errors.Unauthorized() 

554 skel = self.addSkel() 

555 if ( 

556 not kwargs # no data supplied 

557 or not current.request.get().isPostRequest # bail out if not using POST-method 

558 or not skel.fromClient(kwargs) # failure on reading into the bones 

559 or utils.parse.bool(kwargs.get("bounce")) # review before adding 

560 ): 

561 # render the skeleton in the version it could as far as it could be read. 

562 return self._user_module.render.add(skel) 

563 self._user_module.onAdd(skel) 

564 skel.toDB() 

565 if self.registrationEmailVerificationRequired and skel["status"] == Status.WAITING_FOR_EMAIL_VERIFICATION: 

566 # The user will have to verify his email-address. Create a skey and send it to his address 

567 skey = securitykey.create(duration=datetime.timedelta(days=7), session_bound=False, 

568 user_key=utils.normalizeKey(skel["key"]), 

569 name=skel["name"]) 

570 skel.skey = BaseBone(descr="Skey") 

571 skel["skey"] = skey 

572 email.sendEMail(dests=[skel["name"]], tpl=self._user_module.verifyEmailAddressMail, skel=skel) 

573 self._user_module.onAdded(skel) # Call onAdded on our parent user module 

574 return self._user_module.render.addSuccess(skel) 

575 

576 

577class GoogleAccount(UserPrimaryAuthentication): 

578 METHOD_NAME = "X-VIUR-AUTH-Google-Account" 

579 

580 @classmethod 

581 def patch_user_skel(cls, skel_cls): 

582 """ 

583 Modifies the UserSkel to be equipped by a bones required by Google Auth 

584 """ 

585 skel_cls.uid = StringBone( 

586 descr="Google UserID", 

587 required=False, 

588 readOnly=True, 

589 unique=UniqueValue(UniqueLockMethod.SameValue, False, "UID already in use"), 

590 params={ 

591 "category": "Authentication", 

592 } 

593 ) 

594 

595 skel_cls.sync = BooleanBone( 

596 descr="Sync user data with OAuth-based services", 

597 defaultValue=True, 

598 params={ 

599 "category": "Authentication", 

600 "tooltip": 

601 "If set, user data like firstname and lastname is automatically kept" 

602 "synchronous with the information stored at the OAuth service provider" 

603 "(e.g. Google Login)." 

604 } 

605 ) 

606 

607 @exposed 

608 @force_ssl 

609 @skey(allow_empty=True) 

610 def login(self, token: str | None = None, *args, **kwargs): 

611 # FIXME: Check if already logged in 

612 if not conf.user.google_client_id: 

613 raise errors.PreconditionFailed("Please configure conf.user.google_client_id!") 

614 

615 if not token: 

616 request = current.request.get() 

617 request.response.headers["Content-Type"] = "text/html" 

618 if request.response.headers.get("cross-origin-opener-policy") == "same-origin": 

619 # We have to allow popups here 

620 request.response.headers["cross-origin-opener-policy"] = "same-origin-allow-popups" 

621 

622 file_path = conf.instance.core_base_path.joinpath("viur/core/template/vi_user_google_login.html") 

623 with open(file_path) as file: 

624 tpl_string = file.read() 

625 

626 # FIXME: Use Jinja2 for rendering? 

627 tpl_string = tpl_string.replace("{{ clientID }}", conf.user.google_client_id) 

628 extendCsp({ 

629 "script-src": ["sha256-JpzaUIxV/gVOQhKoDLerccwqDDIVsdn1JclA6kRNkLw="], 

630 "style-src": ["sha256-FQpGSicYMVC5jxKGS5sIEzrRjSJmkxKPaetUc7eamqc="] 

631 }) 

632 return tpl_string 

633 

634 user_info = id_token.verify_oauth2_token(token, requests.Request(), conf.user.google_client_id) 

635 if user_info["iss"] not in {"accounts.google.com", "https://accounts.google.com"}: 

636 raise ValueError("Invalid issuer") 

637 

638 # Token looks valid :) 

639 uid = user_info["sub"] 

640 email = user_info["email"] 

641 

642 base_skel = self._user_module.baseSkel() 

643 update = False 

644 if not (user_skel := base_skel.all().filter("uid =", uid).getSkel()): 

645 # We'll try again - checking if there's already an user with that email 

646 if not (user_skel := base_skel.all().filter("name.idx =", email.lower()).getSkel()): 

647 # Still no luck - it's a completely new user 

648 if not self.registrationEnabled: 

649 if (domain := user_info.get("hd")) and domain in conf.user.google_gsuite_domains: 

650 logging.debug(f"Google user is from allowed {domain} - adding account") 

651 else: 

652 logging.debug(f"Google user is from {domain} - denying registration") 

653 raise errors.Forbidden("Registration for new users is disabled") 

654 

655 user_skel = base_skel 

656 user_skel["uid"] = uid 

657 user_skel["name"] = email 

658 update = True 

659 

660 # Take user information from Google, if wanted! 

661 if user_skel["sync"]: 

662 for target, source in { 

663 "name": email, 

664 "firstname": user_info.get("given_name"), 

665 "lastname": user_info.get("family_name"), 

666 }.items(): 

667 

668 if user_skel[target] != source: 

669 user_skel[target] = source 

670 update = True 

671 

672 if update: 

673 assert user_skel.toDB() 

674 

675 return self.next_or_finish(user_skel) 

676 

677 

678class UserSecondFactorAuthentication(UserAuthentication, abc.ABC): 

679 """Abstract class for all second factors.""" 

680 MAX_RETRY = 3 

681 second_factor_login_template = "user_login_secondfactor" 

682 """Template to enter the TOPT on login""" 

683 

684 @property 

685 @abc.abstractmethod 

686 def NAME(self) -> str: 

687 """Name for this factor for templates.""" 

688 ... 

689 

690 @property 

691 @abc.abstractmethod 

692 def ACTION_NAME(self) -> str: 

693 """The action name for this factor, used as path-segment.""" 

694 ... 

695 

696 def __init__(self, moduleName, modulePath, _user_module): 

697 super().__init__(moduleName, modulePath, _user_module) 

698 self.action_url = f"{self.modulePath}/{self.ACTION_NAME}" 

699 self.add_url = f"{self.modulePath}/add" 

700 self.start_url = f"{self.modulePath}/start" 

701 

702 

703class TimeBasedOTP(UserSecondFactorAuthentication): 

704 METHOD_NAME = "X-VIUR-2FACTOR-TimeBasedOTP" 

705 WINDOW_SIZE = 5 

706 ACTION_NAME = "otp" 

707 NAME = "Time based Otp" 

708 second_factor_login_template = "user_login_secondfactor" 

709 

710 @dataclasses.dataclass 

711 class OtpConfig: 

712 """ 

713 This dataclass is used to provide an interface for a OTP token 

714 algorithm description that is passed within the TimeBasedOTP 

715 class for configuration. 

716 """ 

717 secret: str 

718 timedrift: float = 0.0 

719 algorithm: t.Literal["sha1", "sha256"] = "sha1" 

720 interval: int = 60 

721 

722 class OtpSkel(skeleton.RelSkel): 

723 """ 

724 This is the Skeleton used to ask for the OTP token. 

725 """ 

726 otptoken = NumericBone( 

727 descr="Token", 

728 required=True, 

729 max=999999, 

730 min=0, 

731 ) 

732 

733 @classmethod 

734 def patch_user_skel(cls, skel_cls): 

735 """ 

736 Modifies the UserSkel to be equipped by a bones required by Timebased OTP 

737 """ 

738 # One-Time Password Verification 

739 skel_cls.otp_serial = StringBone( 

740 descr="OTP serial", 

741 searchable=True, 

742 params={ 

743 "category": "Second Factor Authentication", 

744 } 

745 ) 

746 

747 skel_cls.otp_secret = CredentialBone( 

748 descr="OTP secret", 

749 params={ 

750 "category": "Second Factor Authentication", 

751 } 

752 ) 

753 

754 skel_cls.otp_timedrift = NumericBone( 

755 descr="OTP time drift", 

756 readOnly=True, 

757 defaultValue=0, 

758 params={ 

759 "category": "Second Factor Authentication", 

760 } 

761 ) 

762 

763 def get_config(self, skel: skeleton.SkeletonInstance) -> OtpConfig | None: 

764 """ 

765 Returns an instance of self.OtpConfig with a provided token configuration, 

766 or None when there is no appropriate configuration of this second factor handler available. 

767 """ 

768 

769 if otp_secret := skel.dbEntity.get("otp_secret"): 

770 return self.OtpConfig(secret=otp_secret, timedrift=skel.dbEntity.get("otp_timedrift") or 0) 

771 

772 return None 

773 

774 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

775 """ 

776 Specified whether the second factor authentication can be handled by the given user or not. 

777 """ 

778 return bool(self.get_config(skel)) 

779 

780 @exposed 

781 def start(self): 

782 """ 

783 Configures OTP login for the current session. 

784 

785 A special otp_user_conf has to be specified as a dict, which is stored into the session. 

786 """ 

787 session = current.session.get() 

788 

789 if not (user_key := session.get("possible_user_key")): 

790 raise errors.PreconditionFailed( 

791 "Second factor can only be triggered after successful primary authentication." 

792 ) 

793 

794 user_skel = self._user_module.baseSkel() 

795 if not user_skel.fromDB(user_key): 

796 raise errors.NotFound("The previously authenticated user is gone.") 

797 

798 if not (otp_user_conf := self.get_config(user_skel)): 

799 raise errors.PreconditionFailed("This second factor is not available for the user") 

800 

801 otp_user_conf = { 

802 "key": str(user_key), 

803 } | dataclasses.asdict(otp_user_conf) 

804 

805 session = current.session.get() 

806 session["_otp_user"] = otp_user_conf 

807 session.markChanged() 

808 

809 return self._user_module.render.edit( 

810 self.OtpSkel(), 

811 params={ 

812 "name": i18n.translate(self.NAME), 

813 "action_name": self.ACTION_NAME, 

814 "action_url": f"{self.modulePath}/{self.ACTION_NAME}", 

815 }, 

816 tpl=self.second_factor_login_template 

817 ) 

818 

819 @exposed 

820 @force_ssl 

821 @skey(allow_empty=True) 

822 def otp(self, *args, **kwargs): 

823 """ 

824 Performs the second factor validation and interaction with the client. 

825 """ 

826 session = current.session.get() 

827 if not (otp_user_conf := session.get("_otp_user")): 

828 raise errors.PreconditionFailed("No OTP process started in this session") 

829 

830 # Check if maximum second factor verification attempts 

831 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

832 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

833 

834 # Read the OTP token via the skeleton, to obtain a valid value 

835 skel = self.OtpSkel() 

836 if skel.fromClient(kwargs): 

837 # Verify the otptoken. If valid, this returns the current timedrift index for this hardware OTP. 

838 res = self.verify( 

839 otp=skel["otptoken"], 

840 secret=otp_user_conf["secret"], 

841 algorithm=otp_user_conf.get("algorithm") or "sha1", 

842 interval=otp_user_conf.get("interval") or 60, 

843 timedrift=otp_user_conf.get("timedrift") or 0.0, 

844 valid_window=self.WINDOW_SIZE 

845 ) 

846 else: 

847 res = None 

848 

849 # Check if Token is invalid. Caution: 'if not verifyIndex' gets false positive for verifyIndex === 0! 

850 if res is None: 

851 otp_user_conf["attempts"] = attempts + 1 

852 session.markChanged() 

853 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

854 return self._user_module.render.edit( 

855 skel, 

856 name=i18n.translate(self.NAME), 

857 action_name=self.ACTION_NAME, 

858 action_url=f"{self.modulePath}/{self.ACTION_NAME}", 

859 tpl=self.second_factor_login_template 

860 ) 

861 

862 # Remove otp user config from session 

863 user_key = db.keyHelper(otp_user_conf["key"], self._user_module._resolveSkelCls().kindName) 

864 del session["_otp_user"] 

865 session.markChanged() 

866 

867 # Check if the OTP device has a time drift 

868 

869 timedriftchange = float(res) - otp_user_conf["timedrift"] 

870 if abs(timedriftchange) > 2: 

871 # The time-drift change accumulates to more than 2 minutes (for interval==60): 

872 # update clock-drift value accordingly 

873 self.updateTimeDrift(user_key, timedriftchange) 

874 

875 # Continue with authentication 

876 return self._user_module.secondFactorSucceeded(self, user_key) 

877 

878 @staticmethod 

879 def verify( 

880 otp: str | int, 

881 secret: str, 

882 algorithm: str = "sha1", 

883 interval: int = 60, 

884 timedrift: float = 0.0, 

885 for_time: datetime.datetime | None = None, 

886 valid_window: int = 0, 

887 ) -> int | None: 

888 """ 

889 Verifies the OTP passed in against the current time OTP. 

890 

891 This is a fork of pyotp.verify. Rather than true/false, if valid_window > 0, it returns the index for which 

892 the OTP value obtained by pyotp.at(for_time=time.time(), counter_offset=index) equals the current value shown 

893 on the hardware token generator. This can be used to store the time drift of a given token generator. 

894 

895 :param otp: the OTP token to check against 

896 :param secret: The OTP secret 

897 :param algorithm: digest function to use in the HMAC (expected to be sha1 or sha256) 

898 :param interval: the time interval in seconds for OTP. This defaults to 60 (old OTP c200 Generators). In 

899 pyotp, default is 30! 

900 :param timedrift: The known timedrift (old index) of the hardware OTP generator 

901 :param for_time: Time to check OTP at (defaults to now) 

902 :param valid_window: extends the validity to this many counter ticks before and after the current one 

903 :returns: The index where verification succeeded, None otherwise 

904 """ 

905 # get the hashing digest 

906 digest = { 

907 "sha1": hashlib.sha1, 

908 "sha256": hashlib.sha256, 

909 }.get(algorithm) 

910 

911 if not digest: 

912 raise errors.NotImplemented(f"{algorithm=} is not implemented") 

913 

914 if for_time is None: 

915 for_time = datetime.datetime.now() 

916 

917 # Timedrift is updated only in fractions in order to prevent problems, but we need an integer index 

918 timedrift = round(timedrift) 

919 secret = bytes.decode(base64.b32encode(bytes.fromhex(secret))) # decode secret 

920 otp = str(otp).zfill(6) # fill with zeros in front 

921 

922 # logging.debug(f"TimeBasedOTP:verify: {digest=}, {interval=}, {valid_window=}") 

923 totp = pyotp.TOTP(secret, digest=digest, interval=interval) 

924 

925 if valid_window: 

926 for offset in range(timedrift - valid_window, timedrift + valid_window + 1): 

927 token = str(totp.at(for_time, offset)) 

928 # logging.debug(f"TimeBasedOTP:verify: {offset=}, {otp=}, {token=}") 

929 if hmac.compare_digest(otp, token): 

930 return offset 

931 

932 return None 

933 

934 return 0 if hmac.compare_digest(otp, str(totp.at(for_time, timedrift))) else None 

935 

936 def updateTimeDrift(self, user_key: db.Key, idx: float) -> None: 

937 """ 

938 Updates the clock-drift value. 

939 The value is only changed in 1/10 steps, so that a late submit by an user doesn't skew 

940 it out of bounds. Maximum change per call is 0.3 minutes. 

941 :param user_key: For which user should the update occour 

942 :param idx: How many steps before/behind was that token 

943 :return: 

944 """ 

945 

946 # FIXME: The callback in viur-core must be improved, to accept user_skel 

947 

948 def transaction(user_key, idx): 

949 user = db.Get(user_key) 

950 if not isinstance(user.get("otp_timedrift"), float): 

951 user["otp_timedrift"] = 0.0 

952 user["otp_timedrift"] += min(max(0.1 * idx, -0.3), 0.3) 

953 db.Put(user) 

954 

955 db.RunInTransaction(transaction, user_key, idx) 

956 

957 

958class AuthenticatorOTP(UserSecondFactorAuthentication): 

959 """ 

960 This class handles the second factor for apps like authy and so on 

961 """ 

962 METHOD_NAME = "X-VIUR-2FACTOR-AuthenticatorOTP" 

963 

964 second_factor_add_template = "user_secondfactor_add" 

965 """Template to configure (add) a new TOPT""" 

966 

967 ACTION_NAME = "authenticator_otp" 

968 """Action name provided for *otp_template* on login""" 

969 

970 NAME = "Authenticator App" 

971 

972 @exposed 

973 @force_ssl 

974 @skey(allow_empty=True) 

975 def add(self, otp=None): 

976 """ 

977 We try to read the otp_app_secret form the current session. When this fails we generate a new one and store 

978 it in the session. 

979 

980 If an otp and a skey are provided we are validate the skey and the otp. If both is successfully we store 

981 the otp_app_secret from the session in the user entry. 

982 """ 

983 current_session = current.session.get() 

984 

985 if not (otp_app_secret := current_session.get("_maybe_otp_app_secret")): 

986 otp_app_secret = AuthenticatorOTP.generate_otp_app_secret() 

987 current_session["_maybe_otp_app_secret"] = otp_app_secret 

988 current_session.markChanged() 

989 

990 if otp is None: 

991 return self._user_module.render.second_factor_add( 

992 tpl=self.second_factor_add_template, 

993 action_name=self.ACTION_NAME, 

994 name=i18n.translate(self.NAME), 

995 add_url=self.add_url, 

996 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) 

997 else: 

998 if not AuthenticatorOTP.verify_otp(otp, otp_app_secret): 

999 return self._user_module.render.second_factor_add( 

1000 tpl=self.second_factor_add_template, 

1001 action_name=self.ACTION_NAME, 

1002 name=i18n.translate(self.NAME), 

1003 add_url=self.add_url, 

1004 otp_uri=AuthenticatorOTP.generate_otp_app_secret_uri(otp_app_secret)) # to add errors 

1005 

1006 # Now we can set the otp_app_secret to the current User and render der Success-template 

1007 AuthenticatorOTP.set_otp_app_secret(otp_app_secret) 

1008 return self._user_module.render.second_factor_add_success( 

1009 action_name=self.ACTION_NAME, 

1010 name=i18n.translate(self.NAME), 

1011 ) 

1012 

1013 def can_handle(self, skel: skeleton.SkeletonInstance) -> bool: 

1014 """ 

1015 We can only handle the second factor if we have stored an otp_app_secret before. 

1016 """ 

1017 return bool(skel.dbEntity.get("otp_app_secret", "")) 

1018 

1019 @classmethod 

1020 def patch_user_skel(cls, skel_cls): 

1021 """ 

1022 Modifies the UserSkel to be equipped by bones required by Authenticator App 

1023 """ 

1024 # Authenticator OTP Apps (like Authy) 

1025 skel_cls.otp_app_secret = CredentialBone( 

1026 descr="OTP Secret (App-Key)", 

1027 params={ 

1028 "category": "Second Factor Authentication", 

1029 } 

1030 ) 

1031 

1032 @classmethod 

1033 def set_otp_app_secret(cls, otp_app_secret=None): 

1034 """ 

1035 Write a new OTP Token in the current user entry. 

1036 """ 

1037 if otp_app_secret is None: 

1038 logging.error("No 'otp_app_secret' is provided") 

1039 raise errors.PreconditionFailed("No 'otp_app_secret' is provided") 

1040 if not (cuser := current.user.get()): 

1041 raise errors.Unauthorized() 

1042 

1043 def transaction(user_key): 

1044 if not (user := db.Get(user_key)): 

1045 raise errors.NotFound() 

1046 user["otp_app_secret"] = otp_app_secret 

1047 db.Put(user) 

1048 

1049 db.RunInTransaction(transaction, cuser["key"]) 

1050 

1051 @classmethod 

1052 def generate_otp_app_secret_uri(cls, otp_app_secret) -> str: 

1053 """ 

1054 :return an otp uri like otpauth://totp/Example:alice@google.com?secret=ABCDEFGH1234&issuer=Example 

1055 """ 

1056 if not (cuser := current.user.get()): 

1057 raise errors.Unauthorized() 

1058 if not (issuer := conf.user.otp_issuer): 

1059 logging.warning( 

1060 f"conf.user.otp_issuer is None we replace the issuer by {conf.instance.project_id=}") 

1061 issuer = conf.instance.project_id 

1062 

1063 return pyotp.TOTP(otp_app_secret).provisioning_uri(name=cuser["name"], issuer_name=issuer) 

1064 

1065 @classmethod 

1066 def generate_otp_app_secret(cls) -> str: 

1067 """ 

1068 Generate a new OTP Secret 

1069 :return an otp 

1070 """ 

1071 return pyotp.random_base32() 

1072 

1073 @classmethod 

1074 def verify_otp(cls, otp: str | int, secret: str) -> bool: 

1075 return pyotp.TOTP(secret).verify(otp) 

1076 

1077 @exposed 

1078 def start(self): 

1079 otp_user_conf = {"attempts": 0} 

1080 session = current.session.get() 

1081 session["_otp_user"] = otp_user_conf 

1082 session.markChanged() 

1083 return self._user_module.render.edit( 

1084 TimeBasedOTP.OtpSkel(), 

1085 params={ 

1086 "name": i18n.translate(self.NAME), 

1087 "action_name": self.ACTION_NAME, 

1088 "action_url": self.action_url, 

1089 }, 

1090 tpl=self.second_factor_login_template, 

1091 ) 

1092 

1093 @exposed 

1094 @force_ssl 

1095 @skey 

1096 def authenticator_otp(self, **kwargs): 

1097 """ 

1098 We verify the otp here with the secret we stored before. 

1099 """ 

1100 session = current.session.get() 

1101 user_key = db.Key(self._user_module.kindName, session["possible_user_key"]) 

1102 

1103 if not (otp_user_conf := session.get("_otp_user")): 

1104 raise errors.PreconditionFailed("No OTP process started in this session") 

1105 

1106 # Check if maximum second factor verification attempts 

1107 if (attempts := otp_user_conf.get("attempts") or 0) > self.MAX_RETRY: 

1108 raise errors.Forbidden("Maximum amount of authentication retries exceeded") 

1109 

1110 if not (user := db.Get(user_key)): 

1111 raise errors.NotFound() 

1112 

1113 skel = TimeBasedOTP.OtpSkel() 

1114 if not skel.fromClient(kwargs): 

1115 raise errors.PreconditionFailed() 

1116 otp_token = str(skel["otptoken"]).zfill(6) 

1117 

1118 if AuthenticatorOTP.verify_otp(otp=otp_token, secret=user["otp_app_secret"]): 

1119 return self._user_module.secondFactorSucceeded(self, user_key) 

1120 otp_user_conf["attempts"] = attempts + 1 

1121 session.markChanged() 

1122 skel.errors = [ReadFromClientError(ReadFromClientErrorSeverity.Invalid, "Wrong OTP Token", ["otptoken"])] 

1123 return self._user_module.render.edit( 

1124 skel, 

1125 name=i18n.translate(self.NAME), 

1126 action_name=self.ACTION_NAME, 

1127 action_url=self.action_url, 

1128 tpl=self.second_factor_login_template, 

1129 ) 

1130 

1131 

1132class User(List): 

1133 kindName = "user" 

1134 addTemplate = "user_add" 

1135 addSuccessTemplate = "user_add_success" 

1136 lostPasswordTemplate = "user_lostpassword" 

1137 verifyEmailAddressMail = "user_verify_address" 

1138 passwordRecoveryMail = "user_password_recovery" 

1139 

1140 authenticationProviders: t.Iterable[UserPrimaryAuthentication] = tuple(filter( 

1141 None, ( 

1142 UserPassword, 

1143 conf.user.google_client_id and GoogleAccount, 

1144 ) 

1145 )) 

1146 """ 

1147 Specifies primary authentication providers that are made available 

1148 as sub-modules under `user/auth_<classname>`. They might require 

1149 customization or configuration. 

1150 """ 

1151 

1152 secondFactorProviders: t.Iterable[UserSecondFactorAuthentication] = ( 

1153 TimeBasedOTP, 

1154 AuthenticatorOTP, 

1155 ) 

1156 """ 

1157 Specifies secondary authentication providers that are made available 

1158 as sub-modules under `user/f2_<classname>`. They might require 

1159 customization or configuration, which is determined during the 

1160 login-process depending on the user that wants to login. 

1161 """ 

1162 

1163 validAuthenticationMethods = tuple(filter( 

1164 None, ( 

1165 (UserPassword, AuthenticatorOTP), 

1166 (UserPassword, TimeBasedOTP), 

1167 (UserPassword, None), 

1168 (GoogleAccount, None) if conf.user.google_client_id else None, 

1169 ) 

1170 )) 

1171 """ 

1172 Specifies the possible combinations of primary- and secondary factor 

1173 login methos. 

1174 

1175 GoogleLogin defaults to no second factor, as the Google Account can be 

1176 secured by a secondary factor. AuthenticatorOTP and TimeBasedOTP are only 

1177 handled when there is a user-dependent configuration available. 

1178 """ 

1179 

1180 msg_missing_second_factor = "Second factor required but not configured for this user." 

1181 

1182 secondFactorTimeWindow = datetime.timedelta(minutes=10) 

1183 

1184 default_order = "name.idx" 

1185 

1186 adminInfo = { 

1187 "icon": "person-fill", 

1188 "actions": [ 

1189 "trigger_kick", 

1190 "trigger_takeover", 

1191 ], 

1192 "customActions": { 

1193 "trigger_kick": { 

1194 "name": i18n.translate( 

1195 key="viur.modules.user.customActions.kick", 

1196 defaultText="Kick user", 

1197 hint="Title of the kick user function" 

1198 ), 

1199 "icon": "trash2-fill", 

1200 "access": ["root"], 

1201 "action": "fetch", 

1202 "url": "/vi/{{module}}/trigger/kick/{{key}}?skey={{skey}}", 

1203 "confirm": i18n.translate( 

1204 key="viur.modules.user.customActions.kick.confirm", 

1205 defaultText="Do you really want to drop all sessions of the selected user from the system?", 

1206 ), 

1207 "success": i18n.translate( 

1208 key="viur.modules.user.customActions.kick.success", 

1209 defaultText="Sessions of the user are being invalidated.", 

1210 ), 

1211 }, 

1212 "trigger_takeover": { 

1213 "name": i18n.translate( 

1214 key="viur.modules.user.customActions.takeover", 

1215 defaultText="Take-over user", 

1216 hint="Title of the take user over function" 

1217 ), 

1218 "icon": "file-person-fill", 

1219 "access": ["root"], 

1220 "action": "fetch", 

1221 "url": "/vi/{{module}}/trigger/takeover/{{key}}?skey={{skey}}", 

1222 "confirm": i18n.translate( 

1223 key="viur.modules.user.customActions.takeover.confirm", 

1224 defaultText="Do you really want to replace your current user session by a " 

1225 "user session of the selected user?", 

1226 ), 

1227 "success": i18n.translate( 

1228 key="viur.modules.user.customActions.takeover.success", 

1229 defaultText="You're now know as the selected user!", 

1230 ), 

1231 "then": "reload-vi", 

1232 }, 

1233 }, 

1234 } 

1235 

1236 roles = { 

1237 "admin": "*", 

1238 } 

1239 

1240 def __init__(self, moduleName, modulePath): 

1241 for provider in self.authenticationProviders: 

1242 assert issubclass(provider, UserPrimaryAuthentication) 

1243 name = f"auth_{provider.__name__.lower()}" 

1244 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1245 

1246 for provider in self.secondFactorProviders: 

1247 assert issubclass(provider, UserSecondFactorAuthentication) 

1248 name = f"f2_{provider.__name__.lower()}" 

1249 setattr(self, name, provider(name, f"{modulePath}/{name}", self)) 

1250 

1251 super().__init__(moduleName, modulePath) 

1252 

1253 def get_role_defaults(self, role: str) -> set[str]: 

1254 """ 

1255 Returns a set of default access rights for a given role. 

1256 """ 

1257 if role in ("viewer", "editor", "admin"): 

1258 return {"admin"} 

1259 

1260 return set() 

1261 

1262 def addSkel(self): 

1263 skel = super().addSkel().clone() 

1264 user = current.user.get() 

1265 if not (user and user["access"] and (f"{self.moduleName}-add" in user["access"] or "root" in user["access"])): 

1266 skel.status.readOnly = True 

1267 skel["status"] = Status.UNSET 

1268 skel.status.visible = False 

1269 skel.access.readOnly = True 

1270 skel["access"] = [] 

1271 skel.access.visible = False 

1272 else: 

1273 # An admin tries to add a new user. 

1274 skel.status.readOnly = False 

1275 skel.status.visible = True 

1276 skel.access.readOnly = False 

1277 skel.access.visible = True 

1278 

1279 if "password" in skel: 

1280 # Unlock and require a password 

1281 skel.password.required = True 

1282 skel.password.visible = True 

1283 skel.password.readOnly = False 

1284 

1285 skel.name.readOnly = False # Don't enforce readonly name in user/add 

1286 return skel 

1287 

1288 def editSkel(self, *args, **kwargs): 

1289 skel = super().editSkel().clone() 

1290 

1291 if "password" in skel: 

1292 skel.password.required = False 

1293 skel.password.visible = True 

1294 skel.password.readOnly = False 

1295 

1296 user = current.user.get() 

1297 

1298 lockFields = not (user and "root" in user["access"]) # If we aren't root, make certain fields read-only 

1299 skel.name.readOnly = lockFields 

1300 skel.access.readOnly = lockFields 

1301 skel.status.readOnly = lockFields 

1302 

1303 return skel 

1304 

1305 def secondFactorProviderByClass(self, cls) -> UserSecondFactorAuthentication: 

1306 return getattr(self, f"f2_{cls.__name__.lower()}") 

1307 

1308 def getCurrentUser(self): 

1309 session = current.session.get() 

1310 

1311 if session and (user := session.get("user")): 

1312 skel = self.baseSkel() 

1313 skel.setEntity(user) 

1314 return skel 

1315 

1316 return None 

1317 

1318 def continueAuthenticationFlow(self, provider: UserPrimaryAuthentication, user_key: db.Key): 

1319 """ 

1320 Continue authentication flow when primary authentication succeeded. 

1321 """ 

1322 skel = self.baseSkel() 

1323 

1324 if not skel.fromDB(user_key): 

1325 raise errors.NotFound("User was not found.") 

1326 

1327 if not provider.can_handle(skel): 

1328 raise errors.Forbidden("User is not allowed to use this primary login method.") 

1329 

1330 session = current.session.get() 

1331 session["possible_user_key"] = user_key.id_or_name 

1332 session["_secondFactorStart"] = utils.utcNow() 

1333 session.markChanged() 

1334 

1335 second_factor_providers = [] 

1336 

1337 for auth_provider, second_factor in self.validAuthenticationMethods: 

1338 if isinstance(provider, auth_provider): 

1339 if second_factor is not None: 

1340 second_factor_provider_instance = self.secondFactorProviderByClass(second_factor) 

1341 if second_factor_provider_instance.can_handle(skel): 

1342 second_factor_providers.append(second_factor_provider_instance) 

1343 else: 

1344 second_factor_providers.append(None) 

1345 

1346 if len(second_factor_providers) > 1 and None in second_factor_providers: 

1347 # We have a second factor. So we can get rid of the None 

1348 second_factor_providers.pop(second_factor_providers.index(None)) 

1349 

1350 if len(second_factor_providers) == 0: 

1351 raise errors.NotAcceptable(self.msg_missing_second_factor) 

1352 elif len(second_factor_providers) == 1: 

1353 if second_factor_providers[0] is None: 

1354 # We allow sign-in without a second factor 

1355 return self.authenticateUser(user_key) 

1356 # We have only one second factor we don't need the choice template 

1357 return second_factor_providers[0].start(user_key) 

1358 

1359 # In case there is more than one second factor, let the user select a method. 

1360 return self.render.second_factor_choice(second_factors=second_factor_providers) 

1361 

1362 def secondFactorSucceeded(self, provider: UserSecondFactorAuthentication, user_key: db.Key): 

1363 """ 

1364 Continue authentication flow when secondary authentication succeeded. 

1365 """ 

1366 session = current.session.get() 

1367 if session["possible_user_key"] != user_key.id_or_name: 

1368 raise errors.Forbidden() 

1369 

1370 # Assert that the second factor verification finished in time 

1371 if utils.utcNow() - session["_secondFactorStart"] > self.secondFactorTimeWindow: 

1372 raise errors.RequestTimeout() 

1373 

1374 return self.authenticateUser(user_key) 

1375 

1376 def authenticateUser(self, key: db.Key, **kwargs): 

1377 """ 

1378 Performs Log-In for the current session and the given user key. 

1379 

1380 This resets the current session: All fields not explicitly marked as persistent 

1381 by conf.user.session_persistent_fields_on_login are gone afterwards. 

1382 

1383 :param key: The (DB-)Key of the user we shall authenticate 

1384 """ 

1385 skel = self.baseSkel() 

1386 if not skel.fromDB(key): 

1387 raise ValueError(f"Unable to authenticate unknown user {key}") 

1388 

1389 # Verify that this user account is active 

1390 if skel["status"] < Status.ACTIVE.value: 

1391 raise errors.Forbidden("The user is disabled and cannot be authenticated.") 

1392 

1393 # Update session for user 

1394 session = current.session.get() 

1395 # Remember persistent fields... 

1396 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_login} 

1397 session.reset() 

1398 # and copy them over to the new session 

1399 session |= take_over 

1400 

1401 # Update session, user and request 

1402 session["user"] = skel.dbEntity 

1403 

1404 current.request.get().response.headers[securitykey.SECURITYKEY_STATIC_HEADER] = session.static_security_key 

1405 current.user.set(self.getCurrentUser()) 

1406 

1407 self.onLogin(skel) 

1408 

1409 return self.render.loginSucceeded(**kwargs) 

1410 

1411 @exposed 

1412 @skey 

1413 def logout(self, *args, **kwargs): 

1414 """ 

1415 Implements the logout action. It also terminates the current session (all keys not listed 

1416 in viur.session_persistent_fields_on_logout will be lost). 

1417 """ 

1418 if not (user := current.user.get()): 

1419 raise errors.Unauthorized() 

1420 

1421 self.onLogout(user) 

1422 

1423 session = current.session.get() 

1424 take_over = {k: v for k, v in session.items() if k in conf.user.session_persistent_fields_on_logout} 

1425 session.reset() 

1426 session |= take_over 

1427 current.user.set(None) # set user to none in context var 

1428 return self.render.logoutSuccess() 

1429 

1430 @exposed 

1431 def login(self, *args, **kwargs): 

1432 return self.render.loginChoices([ 

1433 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1434 for primary, secondary in self.validAuthenticationMethods 

1435 ]) 

1436 

1437 def onLogin(self, skel: skeleton.SkeletonInstance): 

1438 """ 

1439 Hook to be called on user login. 

1440 """ 

1441 # Update the lastlogin timestamp (if available!) 

1442 if "lastlogin" in skel: 

1443 now = utils.utcNow() 

1444 

1445 # Conserve DB-Writes: Update the user max once in 30 Minutes (why??) 

1446 if not skel["lastlogin"] or ((now - skel["lastlogin"]) > datetime.timedelta(minutes=30)): 

1447 skel["lastlogin"] = now 

1448 skel.toDB(update_relations=False) 

1449 

1450 logging.info(f"""User {skel["name"]} logged in""") 

1451 

1452 def onLogout(self, skel: skeleton.SkeletonInstance): 

1453 """ 

1454 Hook to be called on user logout. 

1455 """ 

1456 logging.info(f"""User {skel["name"]} logged out""") 

1457 

1458 @exposed 

1459 def view(self, key: db.Key | int | str = "self", *args, **kwargs): 

1460 """ 

1461 Allow a special key "self" to reference the current user. 

1462 

1463 By default, any authenticated user can view its own user entry, 

1464 to obtain access rights and any specific user information. 

1465 This behavior is defined in the customized `canView` function, 

1466 which is overwritten by the User-module. 

1467 

1468 The rendered skeleton can be modified or restriced by specifying 

1469 a customized view-skeleton. 

1470 """ 

1471 if key == "self": 

1472 if user := current.user.get(): 

1473 key = user["key"] 

1474 else: 

1475 raise errors.Unauthorized("Cannot view 'self' with unknown user") 

1476 

1477 return super().view(key, *args, **kwargs) 

1478 

1479 def canView(self, skel) -> bool: 

1480 if user := current.user.get(): 

1481 if skel["key"] == user["key"]: 

1482 return True 

1483 

1484 if "root" in user["access"] or "user-view" in user["access"]: 

1485 return True 

1486 

1487 return False 

1488 

1489 @exposed 

1490 @skey(allow_empty=True) 

1491 def edit(self, key: db.Key | int | str = "self", *args, **kwargs): 

1492 """ 

1493 Allow a special key "self" to reference the current user. 

1494 

1495 This modification will only allow to use "self" as a key; 

1496 The specific access right to let the user edit itself must 

1497 still be customized. 

1498 

1499 The rendered and editable skeleton can be modified or restriced 

1500 by specifying a customized edit-skeleton. 

1501 """ 

1502 if key == "self": 

1503 if user := current.user.get(): 

1504 key = user["key"] 

1505 else: 

1506 raise errors.Unauthorized("Cannot edit 'self' with unknown user") 

1507 

1508 return super().edit(key, *args, **kwargs) 

1509 

1510 @exposed 

1511 def getAuthMethods(self, *args, **kwargs): 

1512 """Inform tools like Viur-Admin which authentication to use""" 

1513 # FIXME: This is almost the same code as in index()... 

1514 # FIXME: VIUR4: The entire function should be removed! 

1515 # TODO: Align result with index(), so that primary and secondary login is presented. 

1516 # logging.warning("DEPRECATED!!! Use of 'User.getAuthMethods' is deprecated! Use 'User.login'-method instead!") 

1517 

1518 res = [ 

1519 (primary.METHOD_NAME, secondary.METHOD_NAME if secondary else None) 

1520 for primary, secondary in self.validAuthenticationMethods 

1521 ] 

1522 

1523 return json.dumps(res) 

1524 

1525 @exposed 

1526 @skey 

1527 def trigger(self, action: str, key: str): 

1528 current.request.get().response.headers["Content-Type"] = "application/json" 

1529 

1530 # Check for provided access right definition (equivalent to client-side check), fallback to root! 

1531 access = self.adminInfo.get("customActions", {}).get(f"trigger_{action}", {}).get("access") or ("root", ) 

1532 if not ((cuser := current.user.get()) and any(role in cuser["access"] for role in access)): 

1533 raise errors.Unauthorized() 

1534 

1535 skel = self.baseSkel() 

1536 if not skel.fromDB(key): 

1537 raise errors.NotFound() 

1538 

1539 match action: 

1540 case "takeover": 

1541 self.authenticateUser(skel["key"]) 

1542 

1543 case "kick": 

1544 session.killSessionByUser(skel["key"]) 

1545 

1546 case _: 

1547 raise errors.NotImplemented(f"Action {action!r} not implemented") 

1548 

1549 return json.dumps("OKAY") 

1550 

1551 def onEdited(self, skel): 

1552 super().onEdited(skel) 

1553 # In case the user is set to inactive, kill all sessions 

1554 if "status" in skel and skel["status"] < Status.ACTIVE.value: 

1555 session.killSessionByUser(skel["key"]) 

1556 

1557 def onDeleted(self, skel): 

1558 super().onDeleted(skel) 

1559 # Invalidate all sessions of that user 

1560 session.killSessionByUser(skel["key"]) 

1561 

1562 

1563@tasks.StartupTask 

1564def createNewUserIfNotExists(): 

1565 """ 

1566 Create a new Admin user, if the userDB is empty 

1567 """ 

1568 if ( 

1569 (user_module := getattr(conf.main_app.vi, "user", None)) 

1570 and isinstance(user_module, User) 

1571 and "addSkel" in dir(user_module) 

1572 and "validAuthenticationMethods" in dir(user_module) 

1573 # UserPassword must be one of the primary login methods 

1574 and any( 

1575 issubclass(provider[0], UserPassword) 

1576 for provider in user_module.validAuthenticationMethods 

1577 ) 

1578 ): 

1579 if not db.Query(user_module.addSkel().kindName).getEntry(): # There's currently no user in the database 

1580 addSkel = skeleton.skeletonByKind(user_module.addSkel().kindName)() # Ensure we have the full skeleton 

1581 uname = f"""admin@{conf.instance.project_id}.appspot.com""" 

1582 pw = utils.string.random(13) 

1583 addSkel["name"] = uname 

1584 addSkel["status"] = Status.ACTIVE # Ensure it's enabled right away 

1585 addSkel["access"] = ["root"] 

1586 addSkel["password"] = pw 

1587 

1588 try: 

1589 addSkel.toDB() 

1590 except Exception as e: 

1591 logging.critical(f"Something went wrong when trying to add admin user {uname!r} with Password {pw!r}") 

1592 logging.exception(e) 

1593 return 

1594 

1595 msg = f"ViUR created a new admin-user for you!\nUsername: {uname}\nPassword: {pw}" 

1596 

1597 logging.warning(msg) 

1598 email.sendEMailToAdmins("New ViUR password", msg) 

1599 

1600 

1601# DEPRECATED ATTRIBUTES HANDLING 

1602 

1603def __getattr__(attr): 

1604 match attr: 

1605 case "userSkel": 

1606 msg = f"Use of `userSkel` is deprecated; Please use `UserSkel` instead!" 

1607 warnings.warn(msg, DeprecationWarning, stacklevel=2) 

1608 logging.warning(msg) 

1609 return UserSkel 

1610 

1611 return super(__import__(__name__).__class__).__getattr__(attr)