123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- # -*- coding: utf-8 -*-
- """
- sleekxmpp.xmlstream.scheduler
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- This module provides a task scheduler that works better
- with SleekXMPP's threading usage than the stock version.
- Part of SleekXMPP: The Sleek XMPP Library
- :copyright: (c) 2011 Nathanael C. Fritz
- :license: MIT, see LICENSE for more details
- """
- import time
- import threading
- import logging
- import itertools
- from sleekxmpp.util import Queue, QueueEmpty
- #: The time in seconds to wait for events from the event queue, and also the
- #: time between checks for the process stop signal.
- WAIT_TIMEOUT = 1.0
- log = logging.getLogger(__name__)
class Task(object):

    """
    A unit of work that the scheduler executes once its delay has elapsed.

    :param string name: The name of the task.
    :param int seconds: The number of seconds to wait before executing.
    :param callback: The function to execute.
    :param tuple args: The arguments to pass to the callback.
    :param dict kwargs: The keyword arguments to pass to the callback.
    :param bool repeat: Indicates if the task should repeat.
                        Defaults to ``False``.
    :param qpointer: A pointer to an event queue for queuing callback
                     execution instead of executing immediately.
    """

    def __init__(self, name, seconds, callback, args=None,
                 kwargs=None, repeat=False, qpointer=None):
        #: The name of the task.
        self.name = name

        #: The function to execute once enough time has passed.
        self.callback = callback

        #: The arguments to pass to :attr:`callback`; an empty tuple
        #: when none (or an empty value) were supplied.
        self.args = args if args else tuple()

        #: The keyword arguments to pass to :attr:`callback`; an empty
        #: dict when none (or an empty value) were supplied.
        self.kwargs = kwargs if kwargs else {}

        #: Indicates if the task should repeat after executing,
        #: using the same :attr:`seconds` delay.
        self.repeat = repeat

        #: The main event queue, which allows for callbacks to
        #: be queued for execution instead of executing immediately.
        self.qpointer = qpointer

        #: The number of seconds to wait before executing.
        self.seconds = seconds

        #: The time when the task should execute next.
        self.next = time.time() + self.seconds

    def run(self):
        """Fire the task and re-arm its timer.

        Without an event queue the callback is invoked directly; with
        one, a ``'schedule'`` event describing the call is put on the
        queue instead.

        :returns: The value of :attr:`repeat`.
        """
        if self.qpointer is None:
            self.callback(*self.args, **self.kwargs)
        else:
            event = ('schedule', self.callback,
                     self.args, self.kwargs, self.name)
            self.qpointer.put(event)
        self.reset()
        return self.repeat

    def reset(self):
        """Move :attr:`next` to :attr:`seconds` from the current time."""
        self.next = time.time() + self.seconds
class Scheduler(object):

    """
    A threaded scheduler that allows for updates mid-execution unlike the
    scheduler in the standard library.

    Based on: http://docs.python.org/library/sched.html#module-sched

    :param parentstop: An :class:`~threading.Event` to signal stopping
                       the scheduler. When omitted, the scheduler
                       creates a private event of its own.
    """

    def __init__(self, parentstop=None):
        #: A queue for storing tasks
        self.addq = Queue()

        #: A list of tasks in order of execution time.
        self.schedule = []

        #: If running in threaded mode, this will be the thread processing
        #: the schedule.
        self.thread = None

        #: A flag indicating that the scheduler is running.
        self.run = False

        #: An :class:`~threading.Event` instance for signalling to stop
        #: the scheduler. ``_process`` calls ``is_set()`` on it
        #: unconditionally, so it must never be ``None``.
        if parentstop is None:
            parentstop = threading.Event()
        self.stop = parentstop

        #: Lock for accessing the task queue.
        self.schedule_lock = threading.RLock()

        #: The time in seconds to wait for events from the event queue,
        #: and also the time between checks for the process stop signal.
        self.wait_timeout = WAIT_TIMEOUT

    def process(self, threaded=True, daemon=False):
        """Begin accepting and processing scheduled tasks.

        :param bool threaded: Indicates if the scheduler should execute
                              in its own thread. Defaults to ``True``.
        :param bool daemon: Value assigned to the ``daemon`` attribute of
                            the scheduler thread. Only used when
                            ``threaded`` is true. Defaults to ``False``.
        """
        if threaded:
            self.thread = threading.Thread(name='scheduler_process',
                                           target=self._process)
            self.thread.daemon = daemon
            self.thread.start()
        else:
            self._process()

    def _process(self):
        """Process scheduled tasks.

        Loops until :attr:`run` is cleared or :attr:`stop` is set. Each
        pass waits for either a new task on :attr:`addq` or for the
        earliest scheduled task to become due.
        """
        self.run = True
        try:
            while self.run and not self.stop.is_set():
                if self.schedule:
                    wait = self.schedule[0].next - time.time()
                else:
                    wait = self.wait_timeout
                newtask = None
                try:
                    if wait <= 0.0:
                        # Something is already due: only pick up a new
                        # task if one is immediately available.
                        newtask = self.addq.get(False)
                    else:
                        while (self.run and
                               not self.stop.is_set() and
                               newtask is None and
                               wait > 0):
                            try:
                                newtask = self.addq.get(
                                    True, min(wait, self.wait_timeout))
                            except QueueEmpty:
                                # Nothing to add, nothing to do. Check
                                # run flags and continue waiting.
                                wait -= self.wait_timeout
                except QueueEmpty:
                    # Time to run some tasks, and no new tasks to add.
                    self._run_due_tasks()
                else:
                    # Add new task (if the wait produced one).
                    if newtask is not None:
                        with self.schedule_lock:
                            self.schedule.append(newtask)
                            self.schedule.sort(key=lambda task: task.next)
        except (KeyboardInterrupt, SystemExit):
            self.run = False
        log.debug("Quitting Scheduler thread")

    def _run_due_tasks(self):
        """Run every task at the head of the schedule that is due now.

        The due tasks are copied into a list *before* any of them runs.
        Iterating :attr:`schedule` lazily while removing finished tasks
        from it makes the iterator skip the task following each removal.

        :returns: ``True`` if at least one repeating task ran (the
                  schedule is re-sorted in that case), else ``False``.
        """
        with self.schedule_lock:
            due = list(itertools.takewhile(
                lambda task: time.time() >= task.next, self.schedule))
            kept_repeating = False
            try:
                for task in due:
                    if task.run():
                        # Only need to resort tasks if a repeated task
                        # has been kept in the list.
                        kept_repeating = True
                    else:
                        try:
                            self.schedule.remove(task)
                        except ValueError:
                            # Already gone from the schedule.
                            pass
            finally:
                # Re-sort even when a callback raised, so the schedule
                # stays ordered by execution time.
                if kept_repeating:
                    self.schedule.sort(key=lambda task: task.next)
        return kept_repeating

    def add(self, name, seconds, callback, args=None,
            kwargs=None, repeat=False, qpointer=None):
        """Schedule a new task.

        :param string name: The name of the task.
        :param int seconds: The number of seconds to wait before executing.
        :param callback: The function to execute.
        :param tuple args: The arguments to pass to the callback.
        :param dict kwargs: The keyword arguments to pass to the callback.
        :param bool repeat: Indicates if the task should repeat.
                            Defaults to ``False``.
        :param qpointer: A pointer to an event queue for queuing callback
                         execution instead of executing immediately.
        :raises ValueError: If a task with the same name is already in
                            the schedule.
        """
        with self.schedule_lock:
            # Only tasks already in the schedule list are checked here.
            for task in self.schedule:
                if task.name == name:
                    raise ValueError("Key %s already exists" % name)
            self.addq.put(Task(name, seconds, callback, args,
                               kwargs, repeat, qpointer))

    def remove(self, name):
        """Remove a scheduled task ahead of schedule, and without
        executing it.

        Does nothing if no task in the schedule has the given name.

        :param string name: The name of the task to remove.
        """
        with self.schedule_lock:
            the_task = None
            for task in self.schedule:
                if task.name == name:
                    the_task = task
            if the_task is not None:
                self.schedule.remove(the_task)

    def quit(self):
        """Shutdown the scheduler."""
        self.run = False
|