# imap.py
  1. import email
  2. import email.parser
  3. import imaplib
  4. import logging
  5. import time
  6. from email.policy import default
  7. from contextlib import contextmanager
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
  9. def imap_check(command_tuple):
  10. status, ids = command_tuple
  11. assert status == "OK", ids
  12. @contextmanager
  13. def imap_connect(host, port, username, password):
  14. conn = imaplib.IMAP4_SSL(host=host, port=port)
  15. conn.login(username, password)
  16. imap_check(conn.list())
  17. try:
  18. yield conn
  19. finally:
  20. conn.close()
  21. def parse_message(message):
  22. for response_part in message:
  23. if not isinstance(response_part, tuple):
  24. continue
  25. message_metadata, message_content = response_part
  26. email_parser = email.parser.BytesFeedParser(policy=default)
  27. email_parser.feed(message_content)
  28. return email_parser.close()
  29. def search_message(conn, *filters):
  30. status, message_ids = conn.search(None, *filters)
  31. for message_id in message_ids[0].split():
  32. status, message = conn.fetch(message_id, "(RFC822)")
  33. yield message_id, parse_message(message)
  34. def imap_producer(
  35. process_all=False,
  36. preserve=False,
  37. host=None,
  38. port=993,
  39. username=None,
  40. password=None,
  41. nap_duration=1,
  42. input_folder="INBOX",
  43. ):
  44. logger.debug("starting IMAP worker")
  45. imap_filter = "(ALL)" if process_all else "(UNSEEN)"
  46. def process_batch():
  47. logger.debug("starting to process batch")
  48. # reconnect each time to avoid repeated failures due to a lost connection
  49. with imap_connect(host, port, username, password) as conn:
  50. # select the requested folder
  51. imap_check(conn.select(input_folder, readonly=False))
  52. try:
  53. for message_uid, message in search_message(conn, imap_filter):
  54. logger.info(f"received message {message_uid}")
  55. try:
  56. yield message
  57. except Exception:
  58. logger.exception(f"something went wrong while processing {message_uid}")
  59. raise
  60. if not preserve:
  61. # tag the message for deletion
  62. conn.store(message_uid, "+FLAGS", "\\Deleted")
  63. else:
  64. logger.debug("did not receive any message")
  65. finally:
  66. if not preserve:
  67. # flush deleted messages
  68. conn.expunge()
  69. while True:
  70. try:
  71. yield from process_batch()
  72. except (GeneratorExit, KeyboardInterrupt):
  73. # the generator was closed, due to the consumer
  74. # breaking out of the loop, or an exception occuring
  75. raise
  76. except Exception:
  77. logger.exception("mail fetching went wrong, retrying")
  78. # sleep to avoid using too much resources
  79. # tickets: get notified when a new message arrives
  80. time.sleep(nap_duration)