Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/securitykey.py: 0%

54 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-03 13:41 +0000

1""" 

2 Implementation of one-time CSRF-security-keys. 

3 

4 CSRF-security-keys (Cross-Site Request Forgery) are used mostly to make requests unique and non-reproducible. 

5 Doing the same request again requires to obtain a fresh security key first. 

6 Furthermore, security keys can be used to implement credential-reset mechanisms or similar features, where a 

7 URL is only valid for one call. 

8 

9 .. note:: 

10 There's also a hidden 3rd type of security-key: The session's static security key. 

11 

12 This key is only revealed once during login, as the protected header "Sec-X-ViUR-StaticSessionKey". 

13 

14 This can be used instead of the one-time sessions security key by sending it back as the same protected HTTP 

15 header and setting the skey value to "STATIC_SESSION_KEY". This is only intended for non-web-browser, 

16 programmatic access (admin tools, import tools etc.) where CSRF attacks are not applicable. 

17 

18 Therefore, that header is prefixed with "Sec-" - so it cannot be read or set using JavaScript. 

19""" 

20import typing as t 

21import datetime 

22import hmac 

23from viur.core import conf, utils, current, db, tasks 

24 

# Datastore kind under which the one-time security-key entities are stored.
SECURITYKEY_KINDNAME = "viur-securitykey"
# Fallback validity used by create() for keys that are not session-bound.
SECURITYKEY_DURATION = 24 * 60 * 60  # one day, expressed in seconds
SECURITYKEY_STATIC_HEADER: t.Final[str] = "Sec-X-ViUR-StaticSessionKey"
"""The name of the header in which the static session key is provided at login
and must be specified in requests that require a skey."""
SECURITYKEY_STATIC_SKEY: t.Final[str] = "STATIC_SESSION_KEY"
"""Value that must be used as a marker in the payload (key: skey) to indicate
that the session key from the headers should be used."""

33 

34 

35def create( 

36 duration: None | int | datetime.timedelta = None, 

37 session_bound: bool = True, 

38 key_length: int = 13, 

39 indexed: bool = True, 

40 **custom_data) -> str: 

41 """ 

42 Creates a new one-time CSRF-security-key. 

43 

44 The custom data (given as **custom_data) that can be stored with the key. 

45 Any data provided must be serializable by the datastore. 

46 

47 :param duration: Make this CSRF-token valid for a fixed timeframe. 

48 :param session_bound: Bind this CSRF-token to the current session. 

49 :param indexed: Indexes all values stored with the security-key (default), set False to not index. 

50 :param custom_data: Any other data is stored with the CSRF-token, for later re-use. 

51 

52 :returns: The new one-time key, which is a randomized string. 

53 """ 

54 if any(k.startswith("viur_") for k in custom_data): 

55 raise ValueError("custom_data keys with a 'viur_'-prefix are reserved.") 

56 

57 if not duration: 

58 duration = conf.user.session_life_time if session_bound else SECURITYKEY_DURATION 

59 key = utils.string.random(key_length) 

60 

61 entity = db.Entity(db.Key(SECURITYKEY_KINDNAME, key)) 

62 entity |= custom_data 

63 

64 entity["viur_session"] = current.session.get().cookie_key if session_bound else None 

65 entity["viur_until"] = utils.utcNow() + utils.parse.timedelta(duration) 

66 

67 

68 if not indexed: 

69 entity.exclude_from_indexes = [k for k in entity.keys() if not k.startswith("viur_")] 

70 

71 db.Put(entity) 

72 

73 return key 

74 

75 

def validate(key: str, session_bound: bool = True) -> bool | db.Entity:
    """
    Validates a CSRF-security-key.

    A stored key is deleted as soon as it has been looked up, so every key can be presented only once,
    regardless of whether the subsequent checks succeed.

    :param key: The CSRF-token to be validated.
    :param session_bound: If True, make sure the CSRF-token was created inside the current session.
        If False, only keys that were created without a session binding are accepted.
    :returns: False if the key was not valid for whatever reason. Otherwise the entity holding the custom data
        given during :meth:`create` (internal ``viur_`` fields removed), or True if that entity is empty.
        When the static session key marker is used, the boolean result of the header comparison is returned.
    """
    if session_bound and key == SECURITYKEY_STATIC_SKEY:
        if skey_header_value := current.request.get().request.headers.get(SECURITYKEY_STATIC_HEADER):
            static_key = current.session.get().static_security_key
            try:
                return hmac.compare_digest(static_key, skey_header_value)
            except TypeError:
                # compare_digest() rejects str values containing non-ASCII characters and mismatched types.
                # The header is client-controlled, so treat such input as an invalid key instead of crashing.
                return False

        return False

    if not key or not (entity := db.Get(db.Key(SECURITYKEY_KINDNAME, key))):
        return False

    # First of all, delete the entity, validation is done afterward.
    db.Delete(entity)

    # Key has expired?
    if entity["viur_until"] < utils.utcNow():
        return False

    del entity["viur_until"]

    # Key is session bound?
    if session_bound:
        if entity["viur_session"] != current.session.get().cookie_key:
            return False
    elif entity["viur_session"]:
        # A session-bound key must not be accepted by an unbound validation.
        return False

    del entity["viur_session"]

    # An entity without remaining custom data is falsy, in which case plain True is returned.
    return entity or True

113 

114 

@tasks.PeriodicTask(60 * 4)
def periodic_clear_skeys() -> None:
    """
    Removes expired CSRF-security-keys periodically.

    Only keys whose ``viur_until`` timestamp lies more than 300 seconds in the past are selected;
    the actual deletion is handed over to ``tasks.DeleteEntitiesIter``.
    """
    # The module-level ``tasks`` import (already required by the decorator above) is used here.
    expired_before = utils.utcNow() - datetime.timedelta(seconds=300)
    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_until <", expired_before)
    tasks.DeleteEntitiesIter.startIterOnQuery(query)

123 

124 

@tasks.CallDeferred
def clear_session_skeys(session_key) -> None:
    """
    Removes any CSRF-security-keys bound to a specific session.
    This function is called by the Session-module based on reset-actions.

    :param session_key: Value matched against the ``viur_session`` field of the stored security-keys.
    """
    # The module-level ``tasks`` import (already required by the decorator above) is used here.
    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_session", session_key)
    tasks.DeleteEntitiesIter.startIterOnQuery(query)

133 tasks.DeleteEntitiesIter.startIterOnQuery(query)