123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- import datetime
- import os
- import textwrap
- from django.utils import timezone
- from django.conf import settings
- from django.contrib.auth.models import Group
- from django.db import DEFAULT_DB_ALIAS, models
- from django.db.transaction import Atomic, get_connection
- from django.urls import reverse
- def get_attachment_upload_dir(instance, filename):
- """Determine upload dir for task attachment files.
- """
- return "/".join(["tasks", "attachments", str(instance.task.id), filename])
- class LockedAtomicTransaction(Atomic):
- """
- modified from https://stackoverflow.com/a/41831049
- this is needed for safely merging
- Does a atomic transaction, but also locks the entire table for any transactions, for the duration of this
- transaction. Although this is the only way to avoid concurrency issues in certain situations, it should be used with
- caution, since it has impacts on performance, for obvious reasons...
- """
- def __init__(self, *models, using=None, savepoint=None):
- if using is None:
- using = DEFAULT_DB_ALIAS
- super().__init__(using, savepoint)
- self.models = models
- def __enter__(self):
- super(LockedAtomicTransaction, self).__enter__()
- # Make sure not to lock, when sqlite is used, or you'll run into problems while running tests!!!
- if settings.DATABASES[self.using]["ENGINE"] != "django.db.backends.sqlite3":
- cursor = None
- try:
- cursor = get_connection(self.using).cursor()
- for model in self.models:
- cursor.execute(
- "LOCK TABLE {table_name}".format(table_name=model._meta.db_table)
- )
- finally:
- if cursor and not cursor.closed:
- cursor.close()
- class TaskList(models.Model):
- name = models.CharField(max_length=48, verbose_name='Name')
- slug = models.SlugField(default="", verbose_name='Slug')
- group = models.ForeignKey(Group, on_delete=models.CASCADE, verbose_name='Group')
- def __str__(self):
- return self.name
- class Meta:
- ordering = ["name"]
- verbose_name_plural = "Ticket Lists"
- unique_together = ("group", "slug")
- class TicketType(models.Model):
- name = models.CharField(max_length=32, unique=True, editable=True, verbose_name='Name')
- life_cycle = models.CharField(max_length=192, unique=True, editable=True, verbose_name='Life cycle')
- def __str__(self):
- return self.name
- class Task(models.Model):
- task_list = models.ForeignKey(TaskList, blank=True, on_delete=models.CASCADE, verbose_name="Task list")
- status = models.PositiveSmallIntegerField(blank=True, null=True, verbose_name="Status")
- created_date = models.DateField(verbose_name="Created Date")
- status_changed_date = models.DateTimeField(blank=True, verbose_name="Status changed date")
- created_by = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- blank=True,
- null=True,
- on_delete=models.SET_NULL,
- verbose_name="Created by"
- )
- priority = models.PositiveSmallIntegerField(default=0, verbose_name="Priority")
- type = models.ForeignKey(
- TicketType,
- on_delete=models.SET_NULL,
- default=0,
- null=True,
- related_name="tickets_tickettype",
- verbose_name="Type")
- title = models.CharField(max_length=64, verbose_name="Title")
- note = models.TextField(blank=True, null=True, verbose_name="Note")
- due_date = models.DateField(blank=True, null=True, verbose_name="Due date")
- assigned_to = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- blank=True,
- null=True,
- related_name="tickets_assigned_to",
- on_delete=models.SET_NULL,
- verbose_name="Assigned to"
- )
- def get_states_list(self):
- data = self.type.life_cycle
- data = data.split(",")
- for i, item in enumerate(data):
- data[i] = item.split("-")
- for inner_index, inner_item in enumerate(data[i]):
- data[i][inner_index] = int(inner_item)
- return data
- def now_state_list(self):
- states_list = self.get_states_list()
- if self.status == None:
- return states_list[0]
- else:
- for i in range(len(states_list)):
- if states_list[i][0] == self.status:
- return states_list[i]
- def next_state(self):
- states_list = self.get_states_list()
- try:
- return self.now_state_list(self, states_list)[1]
- except IndexError:
- return 1
-
- def first_state(self):
- return self.get_states_list()[0][0]
- # Has due date for an instance of this object passed?
- def overdue_status(self):
- "Returns whether the Tasks's due date has passed or not."
- if self.due_date and datetime.date.today() > self.due_date:
- return True
- def __str__(self):
- return self.title
- def get_absolute_url(self):
- return reverse("tickets:task_detail", kwargs={"task_id": self.id})
- def save(self, **kwargs):
- if not self.id:
- self.created_date = timezone.now()
- self.status_changed_date = timezone.now()
- super(Task, self).save()
-
- # def merge_into(self, merge_target):
- # if merge_target.pk == self.pk:
- # raise ValueError("can't merge a task with self")
- # # lock the comments to avoid concurrent additions of comments after the
- # # update request. these comments would be irremediably lost because of
- # # the cascade clause
- # with LockedAtomicTransaction(Comment):
- # Comment.objects.filter(task=self).update(task=merge_target)
- # self.delete()
- class Meta:
- verbose_name = "Task"
- ordering = ["priority", "created_date"]
- class Comment(models.Model):
- author = models.ForeignKey(
- settings.AUTH_USER_MODEL, on_delete=models.CASCADE, blank=True, null=True,
- related_name="tickets_comments",
- verbose_name='Author'
- )
- task = models.ForeignKey(Task, on_delete=models.CASCADE, verbose_name='Task')
- date = models.DateTimeField(default=timezone.now, verbose_name='Date')
- email_from = models.CharField(max_length=320, blank=True, null=True, verbose_name='Eamil from')
- email_message_id = models.CharField(max_length=255, blank=True, null=True, verbose_name='Eamil message id')
- body = models.TextField(blank=True, verbose_name='Body')
- class Meta:
- # an email should only appear once per task
- verbose_name='Comment'
- unique_together = ("task", "email_message_id")
- @property
- def author_text(self):
- if self.author is not None:
- return str(self.author)
- assert self.email_message_id is not None
- return str(self.email_from)
- @property
- def snippet(self):
- body_snippet = textwrap.shorten(self.body, width=35, placeholder="...")
- # Define here rather than in __str__ so we can use it in the admin list_display
- return "{author} - {snippet}...".format(author=self.author_text, snippet=body_snippet)
- def __str__(self):
- return self.snippet
- class Attachment(models.Model):
- """
- Defines a generic file attachment for use in M2M relation with Task.
- """
- task = models.ForeignKey(Task, on_delete=models.CASCADE, verbose_name='Task')
- added_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='Added by')
- timestamp = models.DateTimeField(default=datetime.datetime.now, verbose_name='Timestamp')
- file = models.FileField(upload_to=get_attachment_upload_dir, max_length=255, verbose_name='File')
- def filename(self):
- return os.path.basename(self.file.name)
- def extension(self):
- name, extension = os.path.splitext(self.file.name)
- return extension
- def __str__(self):
- return f"{self.task.id} - {self.file.name}"
|