Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/session.py: 24%

110 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-07 19:28 +0000

1import datetime 

2import logging 

3import time 

4from viur.core.tasks import DeleteEntitiesIter 

5from viur.core.config import conf # this import has to stay alone due partial import 

6from viur.core import db, utils, tasks, current 

7import typing as t 

8 

9""" 

10 Provides the session implementation for the Google AppEngine™ based on the datastore. 

11 To access the current session, call current.session.get() 

12 

13 Example: 

14 

15 .. code-block:: python 

16 

17 from viur.core import current 

18 sessionData = current.session.get() 

19 sessionData["your_key"] = "your_data" 

20 data = sessionData["your_key"] 

21 

22 A get-method is provided for convenience. 

23 It returns None instead of raising an Exception if the key is not found. 

24""" 

25 

# Unique marker object used as the default of Session.pop(), so that "no default given"
# can be told apart from any real default value, including None.
_SENTINEL: t.Final[object] = object()

27 

28 

class Session(db.Entity):
    """
    Store Sessions inside the datastore.
    The behaviour of this module can be customized in the following ways:

    - :prop:same_site can be set to None, "none", "lax" or "strict" to influence the same-site tag on the cookies
      we set
    - :prop:use_session_cookie is set to True by default, causing the cookie to be treated as a session cookie
      (it will be deleted on browser close). If set to False, it will be emitted with the life-time in
      conf.user.session_life_time.
    - The config variable conf.user.session_life_time: Determines, how long (in seconds) a session is valid.
      Even if :prop:use_session_cookie is set to True, the session is voided server-side after no request has been
      made within the configured lifetime.
    - The config variables conf.user.session_persistent_fields_on_login and
      conf.user.session_persistent_fields_on_logout lists fields, that may survive a login/logout action.
      For security reasons, we completely destroy a session on login/logout (it will be deleted, a new empty
      database object will be created and a new cookie with a different key is sent to the browser). This causes
      all data currently stored to be lost. Only keys listed in these variables will be copied into the new
      session.

    Every mutating mapping operation overridden below sets :attr:`changed`, which is the only thing
    :meth:`save` looks at to decide whether a datastore write is necessary.
    """
    kindName = "viur-session"
    same_site = "lax"  # Either None (don't issue same_site header), "none", "lax" or "strict"
    use_session_cookie = True  # If True, issue the cookie without a lifeTime (will disappear on browser close)
    cookie_name = f"""viur_cookie_{conf.instance.project_id}"""
    GUEST_USER = "__guest__"

    def __init__(self) -> None:
        """
        Creates an empty, unsaved session without any key assigned.
        """
        super().__init__()
        self.changed = False  # True as soon as save() has something to write
        self.cookie_key = None  # Cookie value; also used as name of the datastore key
        self.static_security_key = None
        self.loaded = False  # True once cookie_key/static_security_key are assigned (see save())

    def load(self) -> t.Optional[bool]:
        """
        Initializes the Session.

        If the client supplied a valid Cookie, the session is read from the datastore, otherwise a new,
        empty session will be initialized.

        :return: False if the stored session is older than conf.user.session_life_time (it is reset
            in that case); None in every other case.
        """

        if cookie_key := current.request.get().request.cookies.get(self.cookie_name):
            cookie_key = str(cookie_key)
            if data := db.Get(db.Key(self.kindName, cookie_key)):  # Loaded successfully
                if data["lastseen"] < time.time() - conf.user.session_life_time:
                    # This session is too old.
                    # self.cookie_key is not assigned yet at this point, so the clear() inside reset()
                    # skips the datastore delete; presumably the stale entity is left to the periodic
                    # cleanup task.
                    self.reset()
                    return False

                self.loaded = True
                self.cookie_key = cookie_key

                # Use the plain mapping operations of the base class here, so that neither the
                # datastore entry is deleted nor the session is marked as changed while loading.
                super().clear()
                super().update(data["data"])

                # "staticSecurityKey" is read as a fallback when "static_security_key" is missing or empty
                self.static_security_key = data.get("static_security_key") or data.get("staticSecurityKey")
                if data["lastseen"] < time.time() - 5 * 60:  # Refresh every 5 Minutes
                    self.changed = True

            # NOTE(review): the original listing carries no indentation; this branch is taken to belong
            # to the datastore lookup (cookie supplied, but no matching entity) - confirm.
            else:
                self.reset()

    def save(self) -> None:
        """
        Writes the session into the database.

        Does nothing, in case the session hasn't been changed in the current request,
        or if the request is neither an SSL connection nor served by the development server.
        After the entity has been written, a Set-Cookie header carrying the session key is
        appended to the current response.
        """

        if not self.changed:
            return
        current_request = current.request.get()
        # We will not issue sessions over http anymore
        if not (current_request.isSSLConnection or conf.instance.is_dev_server):
            return

        # Get the current user's key
        try:
            # Check for our custom user-api
            user_key = conf.main_app.vi.user.getCurrentUser()["key"]
        except Exception:
            # Deliberately broad: any failure to obtain a user key makes this a guest session
            user_key = Session.GUEST_USER  # this is a guest

        if not self.loaded:
            # Session was neither loaded nor reset in this request: assign fresh keys now
            self.cookie_key = utils.string.random(42)
            self.static_security_key = utils.string.random(13)

        dbSession = db.Entity(db.Key(self.kindName, self.cookie_key))

        dbSession["data"] = db.fixUnindexableProperties(self)
        dbSession["static_security_key"] = self.static_security_key
        dbSession["lastseen"] = time.time()
        dbSession["user"] = str(user_key)  # allow filtering for users
        dbSession.exclude_from_indexes = {"data"}

        db.Put(dbSession)

        # Provide Set-Cookie header entry with configured properties
        flags = (
            "Path=/",
            "HttpOnly",
            f"SameSite={self.same_site}" if self.same_site and not conf.instance.is_dev_server else None,
            "Secure" if not conf.instance.is_dev_server else None,
            f"Max-Age={conf.user.session_life_time}" if not self.use_session_cookie else None,
        )

        current_request.response.headerlist.append(
            ("Set-Cookie", f"{self.cookie_name}={self.cookie_key};{';'.join([f for f in flags if f])}")
        )

    def __setitem__(self, key: str, item: t.Any):
        """
        Stores a new value under the given key.

        If that key exists before, its value is
        overwritten.
        """
        super().__setitem__(key, item)
        self.changed = True

    def markChanged(self) -> None:
        """
        Explicitly mark the current session as changed.
        This will force save() to write into the datastore,
        even if it believes that this session hasn't changed.
        """
        self.changed = True

    def reset(self) -> None:
        """
        Invalidates the current session and starts a new one.

        This function is especially useful at login, where
        we might need to create an SSL-capable session.

        :warning: Everything is flushed.
        """

        # clear() drops the old entity/cookie (if any) and unsets cookie_key and loaded,
        # so the new keys have to be assigned afterwards.
        self.clear()
        self.cookie_key = utils.string.random(42)
        self.static_security_key = utils.string.random(13)
        self.loaded = True  # keys are assigned; save() must not generate new ones
        self.changed = True

    def __delitem__(self, key: str) -> None:
        """
        Removes a *key* from the session.
        This key must exist.
        """
        super().__delitem__(key)
        self.changed = True

    def __ior__(self, other: dict) -> t.Self:
        """
        Merges the contents of a dict into the session.
        """
        super().__ior__(other)
        self.changed = True
        return self

    def update(self, other: dict) -> None:
        """
        Merges the contents of a dict into the session.
        """
        # Delegates to __ior__, which also marks the session as changed
        self |= other

    def pop(self, key: str, default=_SENTINEL) -> t.Any:
        """
        Delete a specified key from the session.

        If key is in the session, remove it and return its value, else return default.
        If default is not given and key is not in the session, a KeyError is raised.
        """
        if key in self or default is _SENTINEL:
            # With a missing key and no default, this call raises the KeyError
            value = super().pop(key)
            self.changed = True

            return value

        return default

    def clear(self) -> None:
        """
        Removes all data from the session.

        If the session already has a cookie key, its datastore entity and its session-bound
        security keys are deleted and the cookie is removed from the response. Afterwards
        the session has no key assigned anymore.
        This does not mark the session as changed.
        """
        if self.cookie_key:
            db.Delete(db.Key(self.kindName, self.cookie_key))
            from viur.core import securitykey
            securitykey.clear_session_skeys(self.cookie_key)
            current.request.get().response.delete_cookie(self.cookie_name)
        self.loaded = False
        self.cookie_key = None
        super().clear()

    def popitem(self) -> t.Tuple[t.Any, t.Any]:
        """
        Removes and returns one (key, value) pair from the session.

        The session is only marked as changed when a pair was actually removed; if the
        underlying popitem() raises (e.g. on an empty session), the flag stays untouched,
        so that no needless datastore write is triggered.
        """
        item = super().popitem()
        self.changed = True
        return item

    def setdefault(self, key, default=None) -> t.Any:
        """
        Returns the value of *key*; if the key is missing, *default* is stored first.

        The session is only marked as changed when the key was missing.
        """
        if key not in self:
            self.changed = True
        return super().setdefault(key, default)

228 

229 

230 

@tasks.CallDeferred
def killSessionByUser(user: t.Optional[t.Union[str, "db.Key", None]] = None):
    """
    Invalidates all active sessions for the given *user*.

    This means that this user is instantly logged out.
    If no user is given, it tries to invalidate **all** active sessions.

    Use "__guest__" to kill all sessions not associated with a user.

    :param user: UserID, "__guest__" or None.
    """
    logging.info(f"Invalidating all sessions for {user=}")

    query = db.Query(Session.kindName)
    if user is not None:
        # Only narrow the query when a user was given. Filtering unconditionally would search
        # for the literal string "None" and therefore never match the documented "all sessions" case.
        query = query.filter("user =", str(user))

    for obj in query.iter():
        db.Delete(obj.key)

248 

249 

@tasks.PeriodicTask(interval=datetime.timedelta(hours=4))
def start_clear_sessions():
    """
    Starts the deletion of all sessions that have expired.

    A session is selected when its "lastseen" timestamp is older than
    conf.user.session_life_time plus an additional 300 seconds.
    """
    max_age = conf.user.session_life_time + 300
    oldest_allowed = time.time() - max_age
    expired_sessions = db.Query(Session.kindName).filter("lastseen <", oldest_allowed)
    DeleteEntitiesIter.startIterOnQuery(expired_sessions)