scheduler.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. # -*- coding: utf-8 -*-
  2. """
  3. sleekxmpp.xmlstream.scheduler
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. This module provides a task scheduler that works better
  6. with SleekXMPP's threading usage than the stock version.
  7. Part of SleekXMPP: The Sleek XMPP Library
  8. :copyright: (c) 2011 Nathanael C. Fritz
  9. :license: MIT, see LICENSE for more details
  10. """
  11. import time
  12. import threading
  13. import logging
  14. import itertools
  15. from sleekxmpp.util import Queue, QueueEmpty
  16. #: The time in seconds to wait for events from the event queue, and also the
  17. #: time between checks for the process stop signal.
  18. WAIT_TIMEOUT = 1.0
  19. log = logging.getLogger(__name__)
  20. class Task(object):
  21. """
  22. A scheduled task that will be executed by the scheduler
  23. after a given time interval has passed.
  24. :param string name: The name of the task.
  25. :param int seconds: The number of seconds to wait before executing.
  26. :param callback: The function to execute.
  27. :param tuple args: The arguments to pass to the callback.
  28. :param dict kwargs: The keyword arguments to pass to the callback.
  29. :param bool repeat: Indicates if the task should repeat.
  30. Defaults to ``False``.
  31. :param pointer: A pointer to an event queue for queuing callback
  32. execution instead of executing immediately.
  33. """
  34. def __init__(self, name, seconds, callback, args=None,
  35. kwargs=None, repeat=False, qpointer=None):
  36. #: The name of the task.
  37. self.name = name
  38. #: The number of seconds to wait before executing.
  39. self.seconds = seconds
  40. #: The function to execute once enough time has passed.
  41. self.callback = callback
  42. #: The arguments to pass to :attr:`callback`.
  43. self.args = args or tuple()
  44. #: The keyword arguments to pass to :attr:`callback`.
  45. self.kwargs = kwargs or {}
  46. #: Indicates if the task should repeat after executing,
  47. #: using the same :attr:`seconds` delay.
  48. self.repeat = repeat
  49. #: The time when the task should execute next.
  50. self.next = time.time() + self.seconds
  51. #: The main event queue, which allows for callbacks to
  52. #: be queued for execution instead of executing immediately.
  53. self.qpointer = qpointer
  54. def run(self):
  55. """Execute the task's callback.
  56. If an event queue was supplied, place the callback in the queue;
  57. otherwise, execute the callback immediately.
  58. """
  59. if self.qpointer is not None:
  60. self.qpointer.put(('schedule', self.callback,
  61. self.args, self.kwargs, self.name))
  62. else:
  63. self.callback(*self.args, **self.kwargs)
  64. self.reset()
  65. return self.repeat
  66. def reset(self):
  67. """Reset the task's timer so that it will repeat."""
  68. self.next = time.time() + self.seconds
  69. class Scheduler(object):
  70. """
  71. A threaded scheduler that allows for updates mid-execution unlike the
  72. scheduler in the standard library.
  73. Based on: http://docs.python.org/library/sched.html#module-sched
  74. :param parentstop: An :class:`~threading.Event` to signal stopping
  75. the scheduler.
  76. """
  77. def __init__(self, parentstop=None):
  78. #: A queue for storing tasks
  79. self.addq = Queue()
  80. #: A list of tasks in order of execution time.
  81. self.schedule = []
  82. #: If running in threaded mode, this will be the thread processing
  83. #: the schedule.
  84. self.thread = None
  85. #: A flag indicating that the scheduler is running.
  86. self.run = False
  87. #: An :class:`~threading.Event` instance for signalling to stop
  88. #: the scheduler.
  89. self.stop = parentstop
  90. #: Lock for accessing the task queue.
  91. self.schedule_lock = threading.RLock()
  92. #: The time in seconds to wait for events from the event queue,
  93. #: and also the time between checks for the process stop signal.
  94. self.wait_timeout = WAIT_TIMEOUT
  95. def process(self, threaded=True, daemon=False):
  96. """Begin accepting and processing scheduled tasks.
  97. :param bool threaded: Indicates if the scheduler should execute
  98. in its own thread. Defaults to ``True``.
  99. """
  100. if threaded:
  101. self.thread = threading.Thread(name='scheduler_process',
  102. target=self._process)
  103. self.thread.daemon = daemon
  104. self.thread.start()
  105. else:
  106. self._process()
  107. def _process(self):
  108. """Process scheduled tasks."""
  109. self.run = True
  110. try:
  111. while self.run and not self.stop.is_set():
  112. updated = False
  113. if self.schedule:
  114. wait = self.schedule[0].next - time.time()
  115. else:
  116. wait = self.wait_timeout
  117. try:
  118. if wait <= 0.0:
  119. newtask = self.addq.get(False)
  120. else:
  121. newtask = None
  122. while self.run and \
  123. not self.stop.is_set() and \
  124. newtask is None and \
  125. wait > 0:
  126. try:
  127. newtask = self.addq.get(True, min(wait, self.wait_timeout))
  128. except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
  129. wait -= self.wait_timeout
  130. except QueueEmpty: # Time to run some tasks, and no new tasks to add.
  131. self.schedule_lock.acquire()
  132. # select only those tasks which are to be executed now
  133. relevant = itertools.takewhile(
  134. lambda task: time.time() >= task.next, self.schedule)
  135. # run the tasks and keep the return value in a tuple
  136. status = map(lambda task: (task, task.run()), relevant)
  137. # remove non-repeating tasks
  138. for task, doRepeat in status:
  139. if not doRepeat:
  140. try:
  141. self.schedule.remove(task)
  142. except ValueError:
  143. pass
  144. else:
  145. # only need to resort tasks if a repeated task has
  146. # been kept in the list.
  147. updated = True
  148. else: # Add new task
  149. self.schedule_lock.acquire()
  150. if newtask is not None:
  151. self.schedule.append(newtask)
  152. updated = True
  153. finally:
  154. if updated:
  155. self.schedule.sort(key=lambda task: task.next)
  156. self.schedule_lock.release()
  157. except KeyboardInterrupt:
  158. self.run = False
  159. except SystemExit:
  160. self.run = False
  161. log.debug("Quitting Scheduler thread")
  162. def add(self, name, seconds, callback, args=None,
  163. kwargs=None, repeat=False, qpointer=None):
  164. """Schedule a new task.
  165. :param string name: The name of the task.
  166. :param int seconds: The number of seconds to wait before executing.
  167. :param callback: The function to execute.
  168. :param tuple args: The arguments to pass to the callback.
  169. :param dict kwargs: The keyword arguments to pass to the callback.
  170. :param bool repeat: Indicates if the task should repeat.
  171. Defaults to ``False``.
  172. :param pointer: A pointer to an event queue for queuing callback
  173. execution instead of executing immediately.
  174. """
  175. try:
  176. self.schedule_lock.acquire()
  177. for task in self.schedule:
  178. if task.name == name:
  179. raise ValueError("Key %s already exists" % name)
  180. self.addq.put(Task(name, seconds, callback, args,
  181. kwargs, repeat, qpointer))
  182. except:
  183. raise
  184. finally:
  185. self.schedule_lock.release()
  186. def remove(self, name):
  187. """Remove a scheduled task ahead of schedule, and without
  188. executing it.
  189. :param string name: The name of the task to remove.
  190. """
  191. try:
  192. self.schedule_lock.acquire()
  193. the_task = None
  194. for task in self.schedule:
  195. if task.name == name:
  196. the_task = task
  197. if the_task is not None:
  198. self.schedule.remove(the_task)
  199. except:
  200. raise
  201. finally:
  202. self.schedule_lock.release()
  203. def quit(self):
  204. """Shutdown the scheduler."""
  205. self.run = False