Coverage for /home/runner/work/viur-core/viur-core/viur/src/viur/core/email.py: 0%
317 statements
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-16 22:16 +0000
1import base64
2import json
3import logging
4import os
5import typing as t
6from abc import ABC, abstractmethod
7from urllib import request
9import requests
11from viur.core import db, utils
12from viur.core.config import conf
13from viur.core.tasks import CallDeferred, DeleteEntitiesIter, PeriodicTask
14from viur.core.bones.text import HtmlSerializer
15from google.appengine.api.mail import SendMail as GAE_SendMail, Attachment as GAE_Attachment
17if t.TYPE_CHECKING:
18 from viur.core.skeleton import SkeletonInstance
20mailjet_dependencies = True
21try:
22 import mailjet_rest
23except ModuleNotFoundError:
24 mailjet_dependencies = False
26"""
    This module implements an email delivery system for ViUR. Emails are queued so that we don't overwhelm
    the email service. The actual delivery is performed by a transport class: unless configured otherwise,
    conf.email.transport_class defaults to EmailTransportAppengine (the limited, but free, Google AppEngine
    Mail API). To deliver via a 3rd party service instead, point conf.email.transport_class to another transport.
    A sample implementation for Send in Blue (https://sendinblue.com/) is provided: set conf.email.transport_class
    to EmailTransportSendInBlue and add your API-Key to conf.email.sendinblue_api_key. If the mailjet_rest package
    is installed, EmailTransportMailjet is available as well (it requires conf.email.mailjet_api_key and
    conf.email.mailjet_api_secret). To send via another service, you'll have to implement a different transport
    class (and point conf.email.transport_class to that class). This module needs a custom queue
    (viur-emails) with a larger backoff value (so that we don't try to deliver the same email multiple times within a
    short timeframe). A suggested configuration would be
36 - name: viur-emails
37 rate: 1/s
38 retry_parameters:
39 min_backoff_seconds: 3600
40 max_backoff_seconds: 3600
42"""
# Datastore kind under which the queued emails of this module are stored.
EMAIL_KINDNAME = "viur-emails"


# Attachment that carries its payload inline as bytes.
class AttachmentInline(t.TypedDict):
    filename: str
    content: bytes
    mimetype: str


# Attachment referenced through "file_key"; EmailTransport.fetch_attachment
# loads its content via conf.main_app.vi.file.read(key=...).
class AttachmentViurFile(t.TypedDict):
    filename: str
    file_key: db.Key | str


# Attachment referenced through "gcsfile"; EmailTransport.fetch_attachment
# loads its content via conf.main_app.vi.file.read(path=...).
class AttachmentGscFile(t.TypedDict):
    filename: str
    gcsfile: db.Key | str


# Any of the attachment shapes accepted by this module.
Attachment: t.TypeAlias = AttachmentInline | AttachmentViurFile | AttachmentGscFile
@PeriodicTask(interval=60 * 24)
def cleanOldEmailsFromLog(*args, **kwargs):
    """
    Periodic task that removes old, successfully sent emails from the queue.

    Selects all queue entries which are flagged as sent and whose creation date
    is older than ``conf.email.log_retention``, then hands the query over to
    DeleteEntitiesIter for deletion.
    """
    expired_query = db.Query(EMAIL_KINDNAME)
    expired_query = expired_query.filter("isSend =", True)
    oldest_kept = utils.utcNow() - conf.email.log_retention
    expired_query = expired_query.filter("creationDate <", oldest_kept)
    DeleteEntitiesIter.startIterOnQuery(expired_query)
class EmailTransport(ABC):
    """
    Abstract base class of all email transports.

    A transport implements :meth:`deliverEmail` and may override the validation
    and callback hooks. All members are static; the class itself (not an
    instance) is stored in ``conf.email.transport_class``.
    """
    # NOTE(review): not referenced anywhere in this module; sendEmailDeferred
    # compares the error count against its own literal limit.
    maxRetries = 3

    @staticmethod
    @abstractmethod
    def deliverEmail(*, sender: str, dests: list[str], cc: list[str], bcc: list[str], subject: str, body: str,
                     headers: dict[str, str], attachments: list[Attachment],
                     customData: dict | None, **kwargs):
        """
        Deliver one email. Has to be implemented by every transport.

        Every address may be given either as "mm@example.com" or as
        "Max Mustermann <mm@example.com>". On success this method returns
        normally; on any delivery error it *must* raise an exception.

        :param sender: The sender to be used on the outgoing email
        :param dests: List of recipients
        :param cc: List of carbon copy-recipients
        :param bcc: List of blind carbon copy-recipients
        :param subject: The subject of this email
        :param body: The contents of this email (may be text/plain or text/html)
        :param headers: Custom headers to send along with this email
        :param attachments: List of attachments to include in this email
        :param customData: Not supplied by sendEmailDeferred in this module;
            presumably transport specific - TODO confirm
        """
        raise NotImplementedError()

    @staticmethod
    def validateQueueEntity(entity: db.Entity):
        """
        Hook to pre-validate the queue entity before it is deferred into the queue.

        Implementations must raise an exception if the email cannot be sent
        (e.g. because it contains an invalid attachment). The default accepts
        everything.

        :param entity: The entity to validate
        """
        return None

    @staticmethod
    def transportSuccessfulCallback(entity: db.Entity):
        """
        Hook which projects may override to run additional tasks after an email
        has been sent successfully. The default does nothing.

        :param entity: The entity which has been sent
        """
        return None

    @staticmethod
    def splitAddress(address: str) -> dict[str, str]:
        """
        Split a name/address pair into a dict.

        "Max Mustermann <mm@example.com>" becomes
        {"name": "Max Mustermann", "email": "mm@example.com"}; a string without
        a "<...>" part is returned unchanged as {"email": address}.

        :param address: Name/address pair
        :return: The split dict
        """
        open_idx = address.rfind("<")
        close_idx = address.rfind(">")
        # No "<" at all, or the last "<" is not followed by a ">": bare address.
        if open_idx < 0 or open_idx >= close_idx:
            return {"email": address}
        mail = address[open_idx + 1:close_idx]
        display_name = address.replace(f"<{mail}>", "", 1).strip()
        return {"name": display_name, "email": mail}

    @staticmethod
    def validate_attachment(attachment: Attachment) -> None:
        """
        Validate an attachment before the email is queued.

        :param attachment: The attachment to check
        :raises TypeError: If the attachment is not a dict
        :raises ValueError: If the filename is missing, if none of content,
            file_key or gcsfile is present, or if content is not bytes
        """
        if not isinstance(attachment, dict):
            raise TypeError(f"Attachment must be a dict, not {type(attachment)}")
        if "filename" not in attachment:
            raise ValueError(f"Attachment {attachment} must have a filename")
        if all(prop not in attachment for prop in ("content", "file_key", "gcsfile")):
            raise ValueError(f"Attachment {attachment} must have content, file_key or gcsfile")
        if "content" in attachment:
            if not isinstance(attachment["content"], bytes):
                raise ValueError(f"Attachment content must be bytes, not {type(attachment['content'])}")

    @staticmethod
    def fetch_attachment(attachment: Attachment) -> AttachmentInline:
        """
        Load the content of an attachment (if necessary) inside the deferred task.

        Fetching at delivery time allows sending emails with large attachments
        and prevents the queue entry from exceeding the maximum datastore
        Entity size.

        :param attachment: The attachment as stored on the queue entry
        :return: A copy of the attachment; for file_key / gcsfile attachments
            the copy additionally carries "content" and "mimetype"
        :raises ValueError: If a file reference and inline content are both set
        """
        # Work on a copy so the loaded content stays apart from the db.Entity,
        # which is written back later together with the transport response.
        resolved = attachment.copy()
        file_key = resolved.get("file_key")
        if file_key:
            if resolved.get("content"):
                raise ValueError(f'Got {file_key=} but also content in attachment {attachment.get("filename")=}')
            blob, content_type = conf.main_app.vi.file.read(key=file_key)
        else:
            gcsfile = resolved.get("gcsfile")
            if not gcsfile:
                # Inline attachment, nothing to load.
                return resolved
            if resolved.get("content"):
                raise ValueError(f'Got {gcsfile=} but also content in attachment {attachment.get("filename")=}')
            blob, content_type = conf.main_app.vi.file.read(path=gcsfile)
        resolved["content"] = blob.getvalue()
        resolved["mimetype"] = content_type
        return resolved
@CallDeferred
def sendEmailDeferred(key: db.Key):
    """
    Callback from the task queue which delivers one queued email.

    Entries already flagged as sent are skipped. If the transport raises, the
    error counter of the entry is increased and the exception is re-raised;
    otherwise the entry is marked as sent and the transport's success callback
    is invoked (errors in that callback are only logged).

    :param key: Database key of the queue entry that should be sent
    :return: True if the email had already been sent, otherwise None
    :raises ChildProcessError: If the entry's error counter is greater than 3
    """
    logging.debug(f"Sending deferred e-mail {key!r}")
    queued_email = db.Get(key)
    assert queued_email, "Email queue object went missing!"

    if queued_email["isSend"]:
        return True
    if queued_email["errorCount"] > 3:
        raise ChildProcessError("Error-Count exceeded")

    # First, ensure we're able to send email at all
    transport_class = conf.email.transport_class
    assert issubclass(transport_class, EmailTransport), "No or invalid email transportclass specified!"

    delivery_fields = ("dests", "sender", "cc", "bcc", "subject", "body", "headers", "attachments")
    try:
        # The field lookups stay inside the try block on purpose: a broken
        # queue entry counts as a failed attempt as well.
        delivery_kwargs = {field: queued_email[field] for field in delivery_fields}
        result_data = transport_class.deliverEmail(**delivery_kwargs)
    except Exception:
        # Increase the errorCount and bail out
        queued_email["errorCount"] += 1
        db.Put(queued_email)
        raise

    # The transport did not raise, so the email counts as successfully sent
    queued_email["isSend"] = True
    queued_email["sendDate"] = utils.utcNow()
    queued_email["transportFuncResult"] = result_data
    queued_email.exclude_from_indexes.add("transportFuncResult")
    db.Put(queued_email)

    try:
        transport_class.transportSuccessfulCallback(queued_email)
    except Exception as e:
        logging.exception(e)
219def normalize_to_list(value: None | t.Any | list[t.Any] | t.Callable[[], list]) -> list[t.Any]:
220 """
221 Convert the given value to a list.
223 If the value parameter is callable, it will be called first to get the actual value.
224 """
225 if callable(value):
226 value = value()
227 if value is None:
228 return []
229 if isinstance(value, list):
230 return value
231 return [value]
def sendEMail(
    *,
    tpl: str = None,
    stringTemplate: str = None,
    skel: t.Union[None, dict, "SkeletonInstance", list["SkeletonInstance"]] = None,
    sender: str = None,
    dests: str | list[str] = None,
    cc: str | list[str] = None,
    bcc: str | list[str] = None,
    headers: dict[str, str] = None,
    attachments: list[Attachment] = None,
    context: db.DATASTORE_BASE_TYPES | list[db.DATASTORE_BASE_TYPES] | db.Entity = None,
    **kwargs,
) -> bool:
    """
    General purpose function for sending e-mail.
    This function allows for sending e-mails, also with generated content using the Jinja2 template engine.
    The email is rendered, stored as a queue entry and delivered later by sendEmailDeferred through the
    transport class configured in *conf.email.transport_class*.

    :param tpl: The name of a template from the deploy/emails directory.
    :param stringTemplate: This string is interpreted as the template contents. Alternative to load from template file.
    :param skel: The data made available to the template. In case of a Skeleton or SkelList, its parsed the usual way;\
        Dictionaries are passed unchanged.
    :param sender: The address sending this mail.
    :param dests: A list of addresses to send this mail to. A bare string will be treated as a list with 1 address.
    :param cc: Carbon-copy recipients. A bare string will be treated as a list with 1 address.
    :param bcc: Blind carbon-copy recipients. A bare string will be treated as a list with 1 address.
    :param headers: Specify headers for this email.
    :param attachments:
        List of files to be sent within the mail as attachments. Each attachment must be a dictionary with these keys:
            - filename (string): Name of the file that's attached. Always required
            - content (bytes): Content of the attachment as bytes.
            - mimetype (string): Mimetype of the file. Suggested parameter for other implementations (not used by SIB)
            - gcsfile (string): Path to a GCS-File to include instead of content.
            - file_key (string): Key of a FileSkeleton to include instead of content.
        The passed list and its dictionaries are not modified.

    :param context: Arbitrary data that can be stored along the queue entry to be evaluated in
        transportSuccessfulCallback (useful for tracking delivery / opening events etc).
    :param kwargs: Passed through to conf.emailRenderer.

    :return: True if the email has been queued; False if sending is disabled through
        conf.email.recipient_override or suppressed on the local development server.
    :raises ValueError: If not exactly one of tpl / stringTemplate is given, or if an attachment is invalid.

    .. warning::
        As emails will be queued (and not send directly) you cannot exceed 1MB in total
        (for all text and attachments combined)!
    """
    # First, ensure we're able to send email at all
    transport_class = conf.email.transport_class
    assert issubclass(transport_class, EmailTransport), "No or invalid email transportclass specified!"

    # Ensure that all recipient parameters (dest, cc, bcc) are a list
    dests = normalize_to_list(dests)
    cc = normalize_to_list(cc)
    bcc = normalize_to_list(bcc)

    assert dests or cc or bcc, "No destination address given"
    assert all(isinstance(x, str) and x for x in dests), "Found non-string or empty destination address"
    assert all(isinstance(x, str) and x for x in cc), "Found non-string or empty cc address"
    assert all(isinstance(x, str) and x for x in bcc), "Found non-string or empty bcc address"

    if not (bool(stringTemplate) ^ bool(tpl)):
        raise ValueError("You have to set the params 'tpl' xor a 'stringTemplate'.")

    if attachments := normalize_to_list(attachments):
        # Validate each attachment and copy it into a db.Entity so that every property can be excluded
        # from being indexed. A new list is built on purpose: normalize_to_list hands back the caller's
        # own list object, which must neither be rewritten nor be left half-converted if validation fails.
        prepared_attachments = []
        for attachment in attachments:
            transport_class.validate_attachment(attachment)

            entity = db.Entity()
            for k, v in attachment.items():
                entity[k] = v
                entity.exclude_from_indexes.add(k)

            # The default mimetype is set on the copy only, never on the caller's dict
            if "mimetype" not in entity:
                entity["mimetype"] = "application/octet-stream"
                entity.exclude_from_indexes.add("mimetype")

            prepared_attachments.append(entity)
        attachments = prepared_attachments

    # If conf.email.recipient_override is set we'll redirect any email to these address(es)
    if conf.email.recipient_override:
        logging.warning(f"Overriding destination {dests!r} with {conf.email.recipient_override!r}")
        old_dests = dests
        new_dests = normalize_to_list(conf.email.recipient_override)
        dests = []
        for new_dest in new_dests:
            if new_dest.startswith("@"):
                # A bare "@domain" override keeps the original recipient visible in the local part
                for old_dest in old_dests:
                    dests.append(old_dest.replace(".", "_dot_").replace("@", "_at_") + new_dest)
            else:
                dests.append(new_dest)
        # Two distinct lists, so the stored cc and bcc values never share one object
        cc, bcc = [], []

    elif conf.email.recipient_override is False:
        logging.warning("Sending emails disabled by config[viur.email.recipientOverride]")
        return False

    if conf.email.sender_override:
        sender = conf.email.sender_override
    elif sender is None:
        sender = f'viur@{conf.instance.project_id}.appspotmail.com'

    subject, body = conf.emailRenderer(dests, tpl, stringTemplate, skel, **kwargs)

    # Push that email to the outgoing queue
    queued_email = db.Entity(db.Key(EMAIL_KINDNAME))

    queued_email["isSend"] = False
    queued_email["errorCount"] = 0
    queued_email["creationDate"] = utils.utcNow()
    queued_email["sender"] = sender
    queued_email["dests"] = dests
    queued_email["cc"] = cc
    queued_email["bcc"] = bcc
    queued_email["subject"] = subject
    queued_email["body"] = body
    queued_email["headers"] = headers
    queued_email["attachments"] = attachments
    queued_email["context"] = context
    queued_email.exclude_from_indexes = {"body", "attachments", "context"}

    transport_class.validateQueueEntity(queued_email)  # Will raise an exception if the entity is not valid

    if conf.instance.is_dev_server:
        if not conf.email.send_from_local_development_server or transport_class is EmailTransportAppengine:
            logging.info("Not sending email from local development server")
            logging.info(f"""Subject: {queued_email["subject"]}""")
            logging.info(f"""Body: {queued_email["body"]}""")
            logging.info(f"""Recipients: {queued_email["dests"]}""")
            return False

    db.Put(queued_email)
    sendEmailDeferred(queued_email.key, _queue="viur-emails")
    return True
def sendEMailToAdmins(subject: str, body: str, *args, **kwargs) -> bool:
    """
    Send an e-mail to the administrators of the current app.

    Recipients are taken from conf.email.admin_recipients if that is set;
    otherwise the names of all users with "root" access are used (provided
    the app has a user module). The project id is prefixed to the subject.

    :param subject: Defines the subject of the message.
    :param body: Defines the message body.
    :return: The result of sendEMail, or False if there are no recipients.
    """
    delivered = False
    try:
        users = []
        if conf.email.admin_recipients is not None:
            users = normalize_to_list(conf.email.admin_recipients)
        elif "user" in dir(conf.main_app.vi):
            root_users = conf.main_app.vi.user.viewSkel().all().filter("access =", "root").fetch()
            users.extend(user_skel["name"] for user_skel in root_users)

        # Prefix the instance's project_id to subject
        subject = f"{conf.instance.project_id}: {subject}"

        if not users:
            logging.warning("There are no recipients for admin e-mails available.")
        else:
            # The first line of the string template is the subject, the rest the body
            ret = sendEMail(*args, dests=users, stringTemplate=os.linesep.join((subject, body)), **kwargs)
            delivered = True
            return ret

    finally:
        # Also reached when an exception is propagating out of the block above
        if not delivered:
            logging.critical("Cannot send mail to Admins.")
            logging.debug(f"{subject = }, {body = }")

    return False
class EmailTransportSendInBlue(EmailTransport):
    """
    Transport delivering emails through the Send in Blue HTTP API.

    Requires conf.email.sendinblue_api_key to be set.
    """
    maxRetries = 3
    # List of allowed file extensions that can be send from Send in Blue
    allowedExtensions = {"gif", "png", "bmp", "cgm", "jpg", "jpeg", "tif",
                         "tiff", "rtf", "txt", "css", "shtml", "html", "htm",
                         "csv", "zip", "pdf", "xml", "doc", "docx", "ics",
                         "xls", "xlsx", "ppt", "tar", "ez"}

    @staticmethod
    def deliverEmail(*, sender: str, dests: list[str], cc: list[str], bcc: list[str], subject: str, body: str,
                     headers: dict[str, str], attachments: list[Attachment], **kwargs):
        """
        Internal function for delivering Emails using Send in Blue. This function requires the
        conf.email.sendinblue_api_key to be set.
        This function is supposed to return on success and throw an exception otherwise.
        If no exception is thrown, the email is considered send and will not be retried.

        :param sender: The sender to be used on the outgoing email
        :param dests: List of recipients
        :param cc: List of carbon copy-recipients
        :param bcc: List of blind carbon copy-recipients
        :param subject: The subject of this email
        :param body: The contents of this email, sent as "htmlContent"
        :param headers: Custom headers; a "Reply-To" entry is sent as the "replyTo" field instead.
            The passed dict is not modified.
        :param attachments: List of attachments to include in this email
        :return: The decoded response body of the API call
        """
        split_address = EmailTransportSendInBlue.splitAddress
        dataDict = {
            "sender": split_address(sender),
            "to": [split_address(dest) for dest in dests],
            "htmlContent": body,
            "subject": subject,
        }
        # bcc and cc are only added to the payload if there are any
        if bcc:
            dataDict["bcc"] = [split_address(dest) for dest in bcc]
        if cc:
            dataDict["cc"] = [split_address(dest) for dest in cc]
        if headers:
            # Work on a copy: `headers` is the dict stored on the queue entity, which sendEmailDeferred
            # writes back to the datastore afterwards. Removing "Reply-To" from the original would
            # drop it from the stored entry and from every retry.
            custom_headers = dict(headers)
            if "Reply-To" in custom_headers:
                dataDict["replyTo"] = split_address(custom_headers.pop("Reply-To"))
            if custom_headers:
                dataDict["headers"] = custom_headers
        if attachments:
            dataDict["attachment"] = []
            for attachment in attachments:
                attachment = EmailTransportSendInBlue.fetch_attachment(attachment)
                dataDict["attachment"].append({
                    "name": attachment["filename"],
                    "content": base64.b64encode(attachment["content"]).decode("ASCII")
                })
        payload = json.dumps(dataDict).encode("UTF-8")
        http_headers = {
            "api-key": conf.email.sendinblue_api_key,
            "Content-Type": "application/json; charset=utf-8"
        }
        reqObj = request.Request(url="https://api.sendinblue.com/v3/smtp/email",
                                 data=payload, headers=http_headers, method="POST")
        try:
            response = request.urlopen(reqObj)
        except request.HTTPError as e:
            logging.error("Sending email failed!")
            logging.error(dataDict)
            logging.error(e.read())
            raise
        # Using the response as a context manager makes sure the connection is closed again
        with response:
            assert str(response.code)[0] == "2", "Received a non 2XX Status Code!"
            return response.read().decode("UTF-8")

    @staticmethod
    def validateQueueEntity(entity: db.Entity):
        """
        For Send in Blue, we'll validate the attachments (if any) against the list of supported file extensions

        :param entity: The queue entity to validate
        :raises ValueError: If an attachment has a file extension which is not in allowedExtensions
        """
        for attachment in entity.get("attachments") or []:
            ext = attachment["filename"].split(".")[-1].lower()
            if ext not in EmailTransportSendInBlue.allowedExtensions:
                raise ValueError(f"The file-extension {ext} cannot be send using Send in Blue")

    @PeriodicTask(interval=60 * 60)
    @staticmethod
    def check_sib_quota() -> None:
        """Periodically checks the remaining SendInBlue email quota.

        This task does not have to be enabled.
        It automatically checks if the apiKey is configured.

        The thresholds are read from conf.email.sendinblue_thresholds
        (the defaults are defined outside of this module).
        An email will be sent for the lowest threshold that has been undercut.
        """
        if conf.email.sendinblue_api_key is None:
            return  # no SIB key, we cannot check

        req = requests.get(
            "https://api.sendinblue.com/v3/account",
            headers={"api-key": conf.email.sendinblue_api_key},
        )
        if not req.ok:
            logging.error("Failed to fetch SIB account information")
            return
        data = req.json()
        logging.debug(f"SIB account data: {data}")
        # Use the credits of the first "payAsYouGo" plan; -1 if the account has no such plan
        for plan in data["plan"]:
            if plan["type"] == "payAsYouGo":
                credits = plan["credits"]
                break
        else:
            credits = -1
        logging.info(f"SIB E-Mail credits: {credits}")

        # Keep track of the last credits and the limit for which a email has
        # already been sent. This way, emails for the same limit will not be
        # sent more than once and the remaining e-mail credits will not be wasted.
        key = db.Key("viur-email-conf", "sib-credits")
        if not (entity := db.Get(key)):
            logging.debug(f"{entity = }")
            entity = db.Entity(key)
            logging.debug(f"{entity = }")
        logging.debug(f"{entity = }")
        entity.setdefault("latest_warning_for", None)
        entity["credits"] = credits
        entity["email"] = data["email"]

        # idx numbers the thresholds from the highest (1) downwards; the loop walks them from the
        # lowest threshold upwards, so the first hit is the lowest threshold that has been undercut.
        thresholds = sorted(conf.email.sendinblue_thresholds, reverse=True)
        for idx, limit in list(enumerate(thresholds, 1))[::-1]:
            if credits < limit:
                if entity["latest_warning_for"] == limit:
                    logging.info(f"Already send an email for {limit = }.")
                    break

                sendEMailToAdmins(
                    f"SendInBlue email budget {credits} ({idx}. warning)",
                    f"The SendInBlue email budget reached {credits} credits "
                    f"for {data['email']}. Please increase soon.",
                )
                entity["latest_warning_for"] = limit
                break
        else:
            # Credits are above all limits
            entity["latest_warning_for"] = None

        db.Put(entity)
if mailjet_dependencies:
    class EmailTransportMailjet(EmailTransport):
        """
        Transport delivering emails through Mailjet, using the mailjet_rest client.

        Only defined if the mailjet_rest package could be imported.
        """

        @staticmethod
        def deliverEmail(*, sender: str, dests: list[str], cc: list[str], bcc: list[str], subject: str, body: str,
                         headers: dict[str, str], attachments: list[Attachment], **kwargs):
            """
            Deliver one email through the Mailjet send API (version "v3.1").

            :return: The decoded response content of the API call
            :raises RuntimeError: If the Mailjet API key or API secret is not configured
            """
            if not conf.email.mailjet_api_key or not conf.email.mailjet_api_secret:
                raise RuntimeError("Mailjet config missing, check 'mailjet_api_key' and 'mailjet_api_secret'")

            split_address = EmailTransportMailjet.splitAddress
            email = {
                "from": split_address(sender),
                "htmlpart": body,
                "subject": subject,
                "to": [split_address(dest) for dest in dests],
            }

            # Optional parts are only added if they are present
            if bcc:
                email["bcc"] = [split_address(address) for address in bcc]

            if cc:
                email["cc"] = [split_address(address) for address in cc]

            if headers:
                email["headers"] = headers

            if attachments:
                # Load file based attachments first, then encode every payload as base64
                fetched = (EmailTransportMailjet.fetch_attachment(item) for item in attachments)
                email["attachments"] = [
                    {
                        "filename": item["filename"],
                        "base64content": base64.b64encode(item["content"]).decode("ASCII"),
                        "contenttype": item["mimetype"]
                    }
                    for item in fetched
                ]

            mj_client = mailjet_rest.Client(
                auth=(conf.email.mailjet_api_key, conf.email.mailjet_api_secret),
                version="v3.1"
            )

            result = mj_client.send.create(data={"messages": [email]})
            assert 200 <= result.status_code < 300, f"Received {result.status_code=} {result.reason=}"
            return result.content.decode("UTF-8")
class EmailTransportAppengine(EmailTransport):
    """
    Email transport on top of the Google AppEngine Mail API.
    """

    @staticmethod
    def deliverEmail(
        *,
        sender: str,
        dests: list[str],
        cc: list[str],
        bcc: list[str],
        subject: str,
        body: str,
        headers: dict[str, str],
        attachments: list[Attachment],
        **kwargs,
    ):
        """
        Deliver one email through the AppEngine Mail API.

        Only the plain address part of each recipient is handed over. The
        given body is sent as the HTML part, and its sanitized form as the
        text body. `headers` is accepted but not used by this transport.
        """

        def plain_address(address: str) -> str:
            # Strip an optional display name, keep only the e-mail address
            return EmailTransportAppengine.splitAddress(address)["email"]

        # The google.appengine mail api doesn't accept None or empty values,
        # so optional parameters are only added if they hold something.
        params = {
            "to": [plain_address(dest) for dest in dests],
            "sender": sender,
            "subject": subject,
            "body": HtmlSerializer().sanitize(body),
            "html": body,
        }

        if cc:
            params["cc"] = [plain_address(address) for address in cc]

        if bcc:
            params["bcc"] = [plain_address(address) for address in bcc]

        if attachments:
            fetched = (EmailTransportAppengine.fetch_attachment(item) for item in attachments)
            params["attachments"] = [
                GAE_Attachment(item["filename"], item["content"]) for item in fetched
            ]

        GAE_SendMail(**params)
# Set (limited, but free) Google AppEngine Mail API as default.
# This only applies if no transport class has been configured at the time this module is imported.
if conf.email.transport_class is None:
    conf.email.transport_class = EmailTransportAppengine