123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- import email.utils
- import logging
- import os
- import time
- from django.conf import settings
- from django.contrib.sites.models import Site
- from django.core import mail
- from django.template.loader import render_to_string
- from tickets.defaults import defaults
- from tickets.models import Attachment, Comment, Task
- log = logging.getLogger(__name__)
def staff_check(user):
    """Decide whether *user* passes the staff-only access gate.

    When the ``TICKETS_STAFF_ONLY`` default is truthy, the result is
    ``user.is_staff``; otherwise the result is always ``True``.

    # FIXME: More granular access control needed - see
    https://github.com/shacker/django-todo/issues/50
    """
    staff_only = defaults("TICKETS_STAFF_ONLY")
    # Unset or False means no staff restriction: every user passes this check
    # (presumably login itself is enforced by the caller -- confirm).
    return user.is_staff if staff_only else True
def user_can_read_task(task, user):
    """Tell whether *user* is allowed to read *task*.

    Access is granted when the group owning the task's list is one of the
    user's groups; failing that, the user's superuser flag decides.
    """
    owning_group = task.task_list.group
    if owning_group in user.groups.all():
        return True
    return user.is_superuser
def tickets_get_backend(task):
    """Return the mail backend to use for *task*, or ``None``.

    Reads the optional ``TICKETS_MAIL_BACKENDS`` setting and looks up its
    ``"mail-queue"`` entry.

    Args:
        task: The task the mail relates to. Currently unused: every task
            shares the ``"mail-queue"`` backend (a per-task-list lookup keyed
            on ``task.task_list.slug`` used to exist here but is disabled).

    Returns:
        The configured backend, or ``None`` when the setting is absent or has
        no usable ``"mail-queue"`` entry.
    """
    mail_backends = getattr(settings, "TICKETS_MAIL_BACKENDS", None)
    if mail_backends is None:
        return None

    # .get() so that a missing key means "no backend" (which callers already
    # handle) instead of a KeyError in the middle of sending a notification.
    return mail_backends.get("mail-queue")
def tickets_get_mailer(user, task):
    """Resolve the mailer for *task*: a ``(from_address, backend)`` pair.

    Without a configured task backend the pair is
    ``(None, mail.get_connection)``. Otherwise the backend's ``from_address``
    is combined with ``user.username`` as the display name and returned
    together with the backend itself.
    """
    backend = tickets_get_backend(task)
    if backend is not None:
        raw_address = backend.from_address
        sender = email.utils.formataddr((user.username, raw_address))
        return (sender, backend)

    # No dedicated backend: no explicit sender, default connection factory.
    return (None, mail.get_connection)
def tickets_send_mail(user, task, subject, body, recip_list):
    """Send an email attached to *task*, triggered by *user*.

    Nothing is sent unless ``settings.IS_SEND_EMAIL`` is truthy.

    The message carries threading headers: a generated ``Message-ID``, a
    ``References`` header listing the message ids stored on the task's
    comments followed by a synthetic per-task thread id, and ``In-reply-to``
    pointing at that thread id.

    Args:
        user: The user on whose behalf the mail is sent; passed to
            ``tickets_get_mailer`` to build the sender address.
        task: The task the mail relates to.
        subject: Subject line of the mail.
        body: Body text of the mail.
        recip_list: Iterable of recipient addresses (hashable items).

    Returns:
        None.
    """
    if not settings.IS_SEND_EMAIL:
        return

    # Message ids of the comments already attached to this task; comments
    # without one are skipped.
    comments = Comment.objects.filter(task=task).only("email_message_id")
    references = " ".join(
        comment.email_message_id for comment in comments if comment.email_message_id
    )

    from_address, backend = tickets_get_mailer(user, task)

    # NOTE(review): hash() of str is salted per interpreter process unless
    # PYTHONHASHSEED is fixed, so this value is only stable within a process.
    message_hash = hash((subject, body, from_address, frozenset(recip_list), references))

    # - the task id enables attaching back notification answers
    # - the message hash / epoch pair enables deduplication
    # - abs() avoids the "-hexstring" case (hashes can be negative)
    message_id = (
        f"<notif-{task.pk}.{abs(message_hash):x}.{int(time.time())}@django-tickets>"
    )

    # The thread message id is used as a common denominator between all
    # notifications for some task. This message doesn't actually exist,
    # it's just there to make threading possible.
    thread_message_id = f"<thread-{task.pk}@django-tickets>"

    # Only join with a space when there are earlier ids; otherwise the header
    # value would start with a stray blank.
    if references:
        references = f"{references} {thread_message_id}"
    else:
        references = thread_message_id

    with backend() as connection:
        message = mail.EmailMessage(
            subject,
            body,
            from_address,
            recip_list,
            [],  # Bcc
            headers={
                # backend-specific headers first so the threading headers win
                **getattr(backend, "headers", {}),
                "Message-ID": message_id,
                "References": references,
                "In-reply-to": thread_message_id,
            },
            connection=connection,
        )
        message.send()
def send_notify_mail(new_task):
    """Email the assignee of a task about the assignment.

    No mail is sent when the task is unassigned or when the assignee is the
    same user as the creator.

    Args:
        new_task: The task to notify about; its ``assigned_to`` and
            ``created_by`` attributes are read.

    Returns:
        None.
    """
    assignee = new_task.assigned_to
    # Unassigned tasks have nobody to notify; self-assigned tasks need no mail.
    if assignee is None or assignee == new_task.created_by:
        return

    current_site = Site.objects.get_current()
    subject = render_to_string("tickets/email/assigned_subject.txt", {"task": new_task})
    body = render_to_string(
        "tickets/email/assigned_body.txt", {"task": new_task, "site": current_site}
    )

    tickets_send_mail(new_task.created_by, new_task, subject, body, [assignee.email])
def send_notify_change_mail(new_task, user):
    """Email the people attached to a task that *user* changed it.

    The assignee is always notified. The creator is notified as well when the
    change was made by someone other than the creator. No mail is sent when
    the task is unassigned or when the assignee is the creator.

    Args:
        new_task: The changed task; its ``assigned_to`` and ``created_by``
            attributes are read.
        user: The user who made the change; used as the sender of the mails.

    Returns:
        None.
    """
    assignee = new_task.assigned_to
    creator = new_task.created_by
    # Unassigned tasks have nobody to notify; self-assigned tasks need no mail.
    if assignee is None or assignee == creator:
        return

    current_site = Site.objects.get_current()
    subject = render_to_string("tickets/email/assigned_subject.txt", {"task": new_task})
    body = render_to_string(
        "tickets/email/changetask_body.txt", {"task": new_task, "site": current_site}
    )

    tickets_send_mail(user, new_task, subject, body, [assignee.email])

    # Somebody other than the creator made the change: tell the creator too
    # (if the task has one).
    if creator is not None and user != creator:
        tickets_send_mail(user, new_task, subject, body, [creator.email])
def send_email_to_thread_participants(task, msg_body, user, subject=None):
    """Notify everyone involved in a task's thread about a new comment.

    Recipients are the authors of the task's comments plus the task's creator
    and assignee; participants without an email address are skipped. One mail
    is sent to the whole list, so nobody receives the notification twice.

    Args:
        task: The task that was commented on.
        msg_body: Text of the new comment, passed to the body template.
        user: The user who wrote the comment; used as the sender.
        subject: Optional subject line. When empty or omitted, the subject is
            rendered from ``tickets/email/assigned_subject.txt``.

    Returns:
        None.
    """
    current_site = Site.objects.get_current()

    # Fall back to the templated subject *before* choosing what to send.
    email_subject = subject or render_to_string(
        "tickets/email/assigned_subject.txt", {"task": task}
    )
    email_body = render_to_string(
        "tickets/email/newcomment_body.txt",
        {"task": task, "body": msg_body, "site": current_site, "user": user},
    )

    # Get all thread participants
    recipients = {
        comment.author.email
        for comment in Comment.objects.filter(task=task)
        if comment.author is not None
    }
    for related_user in (task.created_by, task.assigned_to):
        if related_user is not None:
            recipients.add(related_user.email)

    # Drop empty / missing addresses.
    recip_list = [address for address in recipients if address]
    if not recip_list:
        return

    # The creator is already part of recip_list, so a single send covers them.
    tickets_send_mail(user, task, email_subject, email_body, recip_list)
def change_task_status(task_id: int, new_status: str) -> bool:
    """Set the status of the task with id *task_id* to *new_status* and save it.

    Returns:
        ``True`` once the task has been saved; ``False`` (after an info-level
        log entry) when ``Task.DoesNotExist`` is raised.
    """
    try:
        ticket = Task.objects.get(id=task_id)
        ticket.status = new_status
        ticket.save()
    except Task.DoesNotExist:
        log.info(f"Task {task_id} not found.")
        return False
    return True
def remove_attachment_file(attachment_id: int) -> bool:
    """Remove an Attachment row together with its file on disk.

    The file is only removed when the attachment has one and its path points
    at an existing regular file; the database row is deleted either way.

    Returns:
        ``True`` once the attachment has been deleted; ``False`` (after an
        info-level log entry) when ``Attachment.DoesNotExist`` is raised.
    """
    try:
        record = Attachment.objects.get(id=attachment_id)
        stored_file = record.file
        if stored_file and os.path.isfile(stored_file.path):
            os.remove(stored_file.path)
        record.delete()
    except Attachment.DoesNotExist:
        log.info(f"Attachment {attachment_id} not found.")
        return False
    return True
|