import email.utils import logging import os import time from django.conf import settings from django.contrib.sites.models import Site from django.core import mail from django.template.loader import render_to_string from tickets.defaults import defaults from tickets.models import Attachment, Comment, Task log = logging.getLogger(__name__) def staff_check(user): """If tickets_STAFF_ONLY is set to True, limit view access to staff users only. # FIXME: More granular access control needed - see """ if defaults("TICKETS_STAFF_ONLY"): return user.is_staff else: # If unset or False, allow all logged in users return True def user_can_read_task(task, user): return in user.groups.all() or user.is_superuser def tickets_get_backend(task): """Returns a mail backend for some task""" mail_backends = getattr(settings, "TICKETS_MAIL_BACKENDS", None) print(mail_backends) if mail_backends is None: return None #task_backend = mail_backends[task.task_list.slug] task_backend = mail_backends['mail-queue'] if task_backend is None: return None return task_backend def tickets_get_mailer(user, task): """A mailer is a (from_address, backend) pair""" task_backend = tickets_get_backend(task) if task_backend is None: return (None, mail.get_connection) from_address = getattr(task_backend, "from_address") from_address = email.utils.formataddr((user.username, from_address)) return (from_address, task_backend) def tickets_send_mail(user, task, subject, body, recip_list): """Send an email attached to task, triggered by user""" if settings.IS_SEND_EMAIL: references = Comment.objects.filter(task=task).only("email_message_id") references = (ref.email_message_id for ref in references) references = " ".join(filter(bool, references)) from_address, backend = tickets_get_mailer(user, task) message_hash = hash((subject, body, from_address, frozenset(recip_list), references)) message_id = ( # the task_id enables attaching back notification answers "" ).format(, # avoid the -hexstring case (hashes can be negative) message_hash=abs(message_hash), epoch=int(time.time()), ) # the thread message id is used as a common denominator between all # notifications for some task. This message doesn't actually exist, # it's just there to make threading possible thread_message_id = "".format( references = "{} {}".format(references, thread_message_id) with backend() as connection: message = mail.EmailMessage( subject, body, from_address, recip_list, [], # Bcc headers={ **getattr(backend, "headers", {}), "Message-ID": message_id, "References": references, "In-reply-to": thread_message_id, }, connection=connection, ) message.send() def send_notify_mail(new_task): """ Send email to assignee if task is assigned to someone other than submittor. Unassigned tasks should not try to notify. """ if new_task.assigned_to == new_task.created_by: return current_site = Site.objects.get_current() subject = render_to_string("tickets/email/assigned_subject.txt", {"task": new_task}) body = render_to_string( "tickets/email/assigned_body.txt", {"task": new_task, "site": current_site} ) recip_list = [] tickets_send_mail(new_task.created_by, new_task, subject, body, recip_list) def send_notify_change_mail(new_task, user): """ Send email to assignee if task is assigned to someone other than submittor. Unassigned tasks should not try to notify. """ if new_task.assigned_to == new_task.created_by: return current_site = Site.objects.get_current() subject = render_to_string("tickets/email/assigned_subject.txt", {"task": new_task}) body = render_to_string( "tickets/email/changetask_body.txt", {"task": new_task, "site": current_site} ) recip_list = [] if user == new_task.created_by: tickets_send_mail(new_task.created_by, new_task, subject, body, recip_list) else: tickets_send_mail(user, new_task, subject, body, recip_list) tickets_send_mail(user, new_task, subject, body, []) def send_email_to_thread_participants(task, msg_body, user, subject=None): """Notify all previous commentors on a Task about a new comment.""" current_site = Site.objects.get_current() email_subject = subject if not subject: subject = render_to_string("tickets/email/assigned_subject.txt", {"task": task}) email_body = render_to_string( "tickets/email/newcomment_body.txt", {"task": task, "body": msg_body, "site": current_site, "user": user}, ) # Get all thread participants commenters = Comment.objects.filter(task=task) recip_list = set( for ca in commenters if is not None) for related_user in (task.created_by, task.assigned_to): if related_user is not None: recip_list.add( recip_list = list(m for m in recip_list if m) if user == task.created_by: tickets_send_mail(user, task, email_subject, email_body, recip_list) else: tickets_send_mail(user, task, email_subject, email_body, recip_list) tickets_send_mail(user, task, email_subject, email_body, []) def change_task_status(task_id: int, new_status: str) -> bool: """Change task status""" try: task = Task.objects.get(id=task_id) task.status = new_status return True except Task.DoesNotExist:"Task {task_id} not found.") return False def remove_attachment_file(attachment_id: int) -> bool: """Delete an Attachment object and its corresponding file from the filesystem.""" try: attachment = Attachment.objects.get(id=attachment_id) if attachment.file: if os.path.isfile(attachment.file.path): os.remove(attachment.file.path) attachment.delete() return True except Attachment.DoesNotExist:"Attachment {attachment_id} not found.") return False