Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/securitykey.py: 0%
54 statements
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
1"""
2 Implementation of one-time CSRF-security-keys.
4 CSRF-security-keys (Cross-Site Request Forgery) are used mostly to make requests unique and non-reproducible.
5 Doing the same request again requires obtaining a fresh security key first.
6 Furthermore, security keys can be used to implement credential-reset mechanisms or similar features, where a
7 URL is only valid for one call.
9 .. note::
10 There's also a hidden 3rd type of security-key: The session's static security key.
12 This key is only revealed once during login, as the protected header "Sec-X-ViUR-StaticSessionKey".
14 This can be used instead of the one-time sessions security key by sending it back as the same protected HTTP
15 header and setting the skey value to "STATIC_SESSION_KEY". This is only intended for non-web-browser,
16 programmatic access (admin tools, import tools etc.) where CSRF attacks are not applicable.
18 Therefore, that header is prefixed with "Sec-" - so it cannot be read or set using JavaScript.
19"""
20import typing as t
21import datetime
22import hmac
23from viur.core import conf, utils, current, db, tasks
# Datastore kind under which the security-key entities of this module are
# stored and queried.
SECURITYKEY_KINDNAME = "viur-securitykey"

# Fallback lifetime used by create() for keys that are not bound to a session
# and for which no explicit duration was given.
SECURITYKEY_DURATION = 24 * 60 * 60  # one day

SECURITYKEY_STATIC_HEADER: t.Final[str] = "Sec-X-ViUR-StaticSessionKey"
"""The name of the header in which the static session key is provided at login
and must be specified in requests that require a skey."""

SECURITYKEY_STATIC_SKEY: t.Final[str] = "STATIC_SESSION_KEY"
"""Value that must be used as a marker in the payload (key: skey) to indicate
that the session key from the headers should be used."""
35def create(
36 duration: None | int | datetime.timedelta = None,
37 session_bound: bool = True,
38 key_length: int = 13,
39 indexed: bool = True,
40 **custom_data) -> str:
41 """
42 Creates a new one-time CSRF-security-key.
44 The custom data (given as **custom_data) that can be stored with the key.
45 Any data provided must be serializable by the datastore.
47 :param duration: Make this CSRF-token valid for a fixed timeframe.
48 :param session_bound: Bind this CSRF-token to the current session.
49 :param indexed: Indexes all values stored with the security-key (default), set False to not index.
50 :param custom_data: Any other data is stored with the CSRF-token, for later re-use.
52 :returns: The new one-time key, which is a randomized string.
53 """
54 if any(k.startswith("viur_") for k in custom_data):
55 raise ValueError("custom_data keys with a 'viur_'-prefix are reserved.")
57 if not duration:
58 duration = conf.user.session_life_time if session_bound else SECURITYKEY_DURATION
59 key = utils.string.random(key_length)
61 entity = db.Entity(db.Key(SECURITYKEY_KINDNAME, key))
62 entity |= custom_data
64 entity["viur_session"] = current.session.get().cookie_key if session_bound else None
65 entity["viur_until"] = utils.utcNow() + utils.parse.timedelta(duration)
68 if not indexed:
69 entity.exclude_from_indexes = [k for k in entity.keys() if not k.startswith("viur_")]
71 db.Put(entity)
73 return key
def validate(key: str, session_bound: bool = True) -> bool | db.Entity:
    """
    Validate (and thereby consume) a CSRF-security-key.

    When ``session_bound`` is set and ``key`` equals the
    ``SECURITYKEY_STATIC_SKEY`` marker, the static session key from the
    ``SECURITYKEY_STATIC_HEADER`` request header is compared against the
    current session's static security key instead of looking up a one-time key.

    :param key: The CSRF-token to be validated.
    :param session_bound: If True, the key must have been created within the
        current session; if False, the key must not be bound to any session.
    :returns: False if the key is missing, unknown, expired or fails the
        session check. For the static session key, the boolean result of the
        comparison. Otherwise the remaining entity holding the custom data
        given to :meth:`create`, or True if nothing remains.
    """
    if session_bound and key == SECURITYKEY_STATIC_SKEY:
        header_value = current.request.get().request.headers.get(SECURITYKEY_STATIC_HEADER)
        if not header_value:
            return False

        return hmac.compare_digest(current.session.get().static_security_key, header_value)

    if not key:
        return False

    entity = db.Get(db.Key(SECURITYKEY_KINDNAME, key))
    if not entity:
        return False

    # Remove the stored key before any further check, so it is consumed by
    # this call regardless of the validation result.
    db.Delete(entity)

    # Reject keys whose lifetime is over.
    if entity["viur_until"] < utils.utcNow():
        return False

    del entity["viur_until"]

    # A session-bound check needs a matching session; an unbound check
    # must not accept a key that was bound to a session.
    bound_session = entity["viur_session"]
    if session_bound:
        if bound_session != current.session.get().cookie_key:
            return False
    elif bound_session:
        return False

    del entity["viur_session"]

    # With the bookkeeping fields stripped, only custom data is left.
    return entity if entity else True
@tasks.PeriodicTask(60 * 4)
def periodic_clear_skeys():
    """
    Removes expired CSRF-security-keys periodically.

    Selects all security-key entities whose ``viur_until`` lies more than
    300 seconds in the past and hands the query over to
    ``tasks.DeleteEntitiesIter`` for deletion.
    """
    # The docstring has to be the first statement of the function body to be
    # picked up as ``__doc__``; the function-scope import therefore follows it.
    from viur.core import tasks

    # NOTE(review): the 300 seconds look like a grace period after expiry - confirm.
    expired_before = utils.utcNow() - datetime.timedelta(seconds=300)
    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_until <", expired_before)
    tasks.DeleteEntitiesIter.startIterOnQuery(query)
@tasks.CallDeferred
def clear_session_skeys(session_key):
    """
    Removes any CSRF-security-keys bound to a specific session.
    This function is called by the Session-module based on reset-actions.

    :param session_key: Value matched against the ``viur_session`` property
        of the stored security-keys; all matching entities are deleted.
    """
    # The docstring has to be the first statement of the function body to be
    # picked up as ``__doc__``; the function-scope import therefore follows it.
    from viur.core import tasks

    query = db.Query(SECURITYKEY_KINDNAME).filter("viur_session", session_key)
    tasks.DeleteEntitiesIter.startIterOnQuery(query)