import datetime
import os
import sys
import textwrap

import django
from django.conf import settings
from django.contrib.auth.models import Group
from django.db import DEFAULT_DB_ALIAS, models
from django.db.transaction import Atomic, get_connection
from django.urls import reverse
from django.utils import timezone


def get_attachment_upload_dir(instance, filename):
    """Return the upload path for a task attachment file.

    The path has the form ``tasks/attachments/<task id>/<filename>``.
    """
    return "/".join(["tasks", "attachments", str(instance.task.id), filename])


class LockedAtomicTransaction(Atomic):
    """Atomic transaction that also locks whole tables while it is open.

    Modified from https://stackoverflow.com/a/41831049; this is needed for
    safely merging tasks.

    Runs an atomic transaction and additionally issues ``LOCK TABLE`` for the
    table of every model passed in, for the duration of the transaction.
    Although this is the only way to avoid concurrency issues in certain
    situations, it should be used with caution because of its impact on
    performance.
    """

    def __init__(self, *models, using=None, savepoint=None, durable=False):
        """Store the models to lock and configure the underlying ``Atomic``.

        Args:
            *models: Model classes whose tables are locked on entry.
            using: Database alias; defaults to ``DEFAULT_DB_ALIAS``.
            savepoint: Passed through to ``Atomic`` unchanged.
            durable: Passed through to ``Atomic`` on Django versions that
                accept it.
        """
        if using is None:
            using = DEFAULT_DB_ALIAS
        # Atomic.__init__ gained a required ``durable`` argument in Django 3.2.
        if django.VERSION >= (3, 2):
            super().__init__(using, savepoint, durable)
        else:
            super().__init__(using, savepoint)
        self.models = models

    def __enter__(self):
        """Open the atomic block, then lock the configured tables."""
        super().__enter__()
        try:
            self._lock_tables()
        except BaseException:
            # ``with`` does not call __exit__ when __enter__ raises, so the
            # atomic block opened above has to be unwound here.
            super().__exit__(*sys.exc_info())
            raise

    def _lock_tables(self):
        """Issue ``LOCK TABLE`` for each model's table (skipped on sqlite)."""
        # Make sure not to lock when sqlite is used, or you'll run into
        # problems while running tests!
        if settings.DATABASES[self.using]["ENGINE"] == "django.db.backends.sqlite3":
            return
        connection = get_connection(self.using)
        # The cursor context manager closes the cursor on every backend.
        with connection.cursor() as cursor:
            for model in self.models:
                cursor.execute(
                    "LOCK TABLE {table_name}".format(
                        table_name=connection.ops.quote_name(model._meta.db_table)
                    )
                )


class TaskList(models.Model):
    """A named list of tasks that belongs to an auth ``Group``."""

    name = models.CharField(max_length=48, verbose_name='Name')
    slug = models.SlugField(default="", verbose_name='Slug')
    group = models.ForeignKey(Group, on_delete=models.CASCADE, verbose_name='Group')

    def __str__(self):
        return self.name

    class Meta:
        ordering = ["name"]
        verbose_name_plural = "Ticket Lists"
        # A slug only has to be unique within its group.
        unique_together = ("group", "slug")


class TicketType(models.Model):
    """A kind of ticket together with its life cycle definition.

    ``life_cycle`` is the string parsed by ``Task.get_states_list``.
    """

    name = models.CharField(max_length=32, unique=True, editable=True, verbose_name='Name')
    life_cycle = models.CharField(
        max_length=192, unique=True, editable=True, verbose_name='Life cycle'
    )

    def __str__(self):
        return self.name


class Task(models.Model):
    """A ticket in a ``TaskList`` whose ``status`` moves through its type's life cycle."""

    task_list = models.ForeignKey(
        TaskList, blank=True, on_delete=models.CASCADE, verbose_name="Task list"
    )
    status = models.PositiveSmallIntegerField(blank=True, null=True, verbose_name="Status")
    created_date = models.DateField(verbose_name="Created Date")
    status_changed_date = models.DateTimeField(blank=True, verbose_name="Status changed date")
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        verbose_name="Created by",
    )
    priority = models.PositiveSmallIntegerField(default=0, verbose_name="Priority")
    type = models.ForeignKey(
        TicketType,
        on_delete=models.SET_NULL,
        default=0,
        null=True,
        related_name="tickets_tickettype",
        verbose_name="Type",
    )
    title = models.CharField(max_length=64, verbose_name="Title")
    note = models.TextField(blank=True, null=True, verbose_name="Note")
    due_date = models.DateField(blank=True, null=True, verbose_name="Due date")
    assigned_to = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        blank=True,
        null=True,
        related_name="tickets_assigned_to",
        on_delete=models.SET_NULL,
        verbose_name="Assigned to",
    )

    def get_states_list(self):
        """Parse the type's life cycle into a list of lists of ints.

        ``life_cycle`` is split on ``,`` and each piece on ``-``; for example
        ``"1-2,2-3,3"`` becomes ``[[1, 2], [2, 3], [3]]``.

        Raises:
            AttributeError: if the task has no ``type``.
            ValueError: if a piece of the life cycle is not an integer.
        """
        return [
            [int(state) for state in entry.split("-")]
            for entry in self.type.life_cycle.split(",")
        ]

    def now_state_list(self):
        """Return the life cycle entry for the current status.

        With no status set, the first entry is returned. Otherwise the first
        entry whose leading value equals ``status`` is returned, or ``None``
        when no entry matches.
        """
        states_list = self.get_states_list()
        if self.status is None:
            return states_list[0]
        for entry in states_list:
            if entry[0] == self.status:
                return entry
        return None

    def next_state(self):
        """Return the second value of the current life cycle entry.

        Falls back to ``1`` when the current entry has no second value or
        when the current status does not appear in the life cycle.
        """
        current = self.now_state_list()
        if current is None or len(current) < 2:
            return 1
        return current[1]

    def first_state(self):
        """Return the leading value of the first life cycle entry."""
        return self.get_states_list()[0][0]

    # Has due date for an instance of this object passed?
    def overdue_status(self):
        """Return whether the task's due date has passed."""
        return bool(self.due_date and datetime.date.today() > self.due_date)

    def __str__(self):
        return self.title

    def get_absolute_url(self):
        return reverse("tickets:task_detail", kwargs={"task_id": self.id})

    def save(self, *args, **kwargs):
        """Stamp the dates on first save, then save as usual.

        All positional and keyword arguments (``using``, ``update_fields``,
        ...) are forwarded to ``Model.save``.
        """
        if not self.id:
            self.created_date = timezone.now()
            # NOTE(review): the flattened source does not show whether this
            # assignment was inside the ``if``; confirm that the status
            # change date is meant to be stamped here on creation only.
            self.status_changed_date = timezone.now()
        super().save(*args, **kwargs)

    # def merge_into(self, merge_target):
    #     if merge_target.pk == self.pk:
    #         raise ValueError("can't merge a task with self")
    #
    #     # lock the comments to avoid concurrent additions of comments after the
    #     # update request. these comments would be irremediably lost because of
    #     # the cascade clause
    #     with LockedAtomicTransaction(Comment):
    #         Comment.objects.filter(task=self).update(task=merge_target)
    #         self.delete()

    class Meta:
        verbose_name = "Task"
        ordering = ["priority", "created_date"]


class Comment(models.Model):
    """A comment on a task, written by a user or received by email."""

    author = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        blank=True,
        null=True,
        related_name="tickets_comments",
        verbose_name='Author',
    )
    task = models.ForeignKey(Task, on_delete=models.CASCADE, verbose_name='Task')
    date = models.DateTimeField(default=timezone.now, verbose_name='Date')
    # NOTE(review): "Eamil" in the two labels below looks like a typo for
    # "Email"; left untouched because changing a verbose_name alters the
    # migration state.
    email_from = models.CharField(
        max_length=320, blank=True, null=True, verbose_name='Eamil from'
    )
    email_message_id = models.CharField(
        max_length=255, blank=True, null=True, verbose_name='Eamil message id'
    )
    body = models.TextField(blank=True, verbose_name='Body')

    class Meta:
        verbose_name = 'Comment'
        # an email should only appear once per task
        unique_together = ("task", "email_message_id")

    @property
    def author_text(self):
        """Return the author as text, or the sender address when there is no author."""
        if self.author is not None:
            return str(self.author)
        # Without an author the comment is expected to carry an email message id.
        assert self.email_message_id is not None
        return str(self.email_from)

    @property
    def snippet(self):
        """Return ``"<author> - <shortened body>..."`` for compact display."""
        body_snippet = textwrap.shorten(self.body, width=35, placeholder="...")
        # Define here rather than in __str__ so we can use it in the admin list_display
        return "{author} - {snippet}...".format(author=self.author_text, snippet=body_snippet)

    def __str__(self):
        return self.snippet


class Attachment(models.Model):
    """A file attached to a single ``Task`` (many attachments per task)."""

    task = models.ForeignKey(Task, on_delete=models.CASCADE, verbose_name='Task')
    added_by = models.ForeignKey(
        settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='Added by'
    )
    # NOTE(review): this default yields naive datetimes while Comment.date uses
    # timezone.now; consider switching (needs a migration for the new default).
    timestamp = models.DateTimeField(default=datetime.datetime.now, verbose_name='Timestamp')
    file = models.FileField(
        upload_to=get_attachment_upload_dir, max_length=255, verbose_name='File'
    )

    def filename(self):
        """Return the file's base name without its directory."""
        return os.path.basename(self.file.name)

    def extension(self):
        """Return the file's extension including the leading dot, or ``""``."""
        return os.path.splitext(self.file.name)[1]

    def __str__(self):
        return f"{self.task.id} - {self.file.name}"