123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- """Queues"""
- __all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
- 'QueueFull', 'QueueEmpty']
- import collections
- import heapq
- from . import events
- from . import futures
- from . import locks
- from .tasks import coroutine
class QueueEmpty(Exception):
    """Exception raised when Queue.get_nowait() is called on a Queue object
    which is empty.
    """
class QueueFull(Exception):
    """Exception raised when the Queue.put_nowait() method is called on a Queue
    object which is full.
    """
class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "yield from put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Create a queue holding at most *maxsize* items (<= 0: unbounded).

        When *loop* is None the loop returned by events.get_event_loop() is
        used for the futures this queue creates.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item.
        self._getters = collections.deque()
        # Pairs of (item, Future) for put() callers waiting for a free slot.
        self._putters = collections.deque()
        self._init(maxsize)

    # _init(), _get() and _put() are the storage hooks; subclasses override
    # them to change the container and the retrieval order.

    def _init(self, maxsize):
        self._queue = collections.deque()

    def _get(self):
        return self._queue.popleft()

    def _put(self, item):
        self._queue.append(item)

    def __repr__(self):
        return '<{} at {:#x} {}>'.format(
            type(self).__name__, id(self), self._format())

    def __str__(self):
        return '<{} {}>'.format(type(self).__name__, self._format())

    def _format(self):
        """Return the state summary shared by __repr__() and __str__()."""
        result = 'maxsize={!r}'.format(self._maxsize)
        # getattr() guards against _queue not having been set yet.
        if getattr(self, '_queue', None):
            result += ' _queue={!r}'.format(list(self._queue))
        if self._getters:
            result += ' _getters[{}]'.format(len(self._getters))
        if self._putters:
            result += ' _putters[{}]'.format(len(self._putters))
        return result

    def _consume_done_getters(self):
        # Delete waiters at the head of the get() queue whose future is
        # already done (e.g. they timed out).
        while self._getters and self._getters[0].done():
            self._getters.popleft()

    def _consume_done_putters(self):
        # Delete waiters at the head of the put() queue whose future is
        # already done (e.g. they timed out).
        while self._putters and self._putters[0][1].done():
            self._putters.popleft()

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    @coroutine
    def put(self, item):
        """Put an item into the queue.

        Put an item into the queue. If the queue is full, wait until a free
        slot is available before adding item.

        This method is a coroutine.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()

            # Use _put and _get instead of passing item straight to getter, in
            # case a subclass has logic that must run (e.g. JoinableQueue).
            self._put(item)

            # The head getter is not done: done getters were just removed
            # from the head of the deque.
            getter.set_result(self._get())

        elif self.full():
            # No free slot: park (item, waiter); get()/get_nowait() enqueue
            # the item and resolve the waiter once a slot frees up.
            waiter = futures.Future(loop=self._loop)

            self._putters.append((item, waiter))
            yield from waiter

        else:
            self._put(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        self._consume_done_getters()
        if self._getters:
            assert not self._queue, (
                'queue non-empty, why are getters waiting?')

            getter = self._getters.popleft()

            # Use _put and _get instead of passing item straight to getter, in
            # case a subclass has logic that must run (e.g. JoinableQueue).
            self._put(item)

            # The head getter is not done: done getters were just removed
            # from the head of the deque.
            getter.set_result(self._get())

        elif self.full():
            raise QueueFull
        else:
            self._put(item)

    @coroutine
    def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.

        This method is a coroutine.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)

            # The waiting putter's item is enqueued right away, but its
            # future is resolved through call_soon() instead of immediately,
            # to ensure that getters and putters alternate perfectly. See
            # ChannelTest.test_wait.
            self._loop.call_soon(putter._set_result_unless_cancelled, None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            # Nothing available: park a future that put()/put_nowait()
            # resolves with the next item.
            waiter = futures.Future(loop=self._loop)

            self._getters.append(waiter)
            return (yield from waiter)

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        self._consume_done_putters()
        if self._putters:
            assert self.full(), 'queue not full, why are putters waiting?'
            item, putter = self._putters.popleft()
            self._put(item)

            # Unlike get(), the putter's future is resolved right here rather
            # than through call_soon(). The head putter is not done: done
            # putters were just removed from the head of the deque.
            putter.set_result(None)

            return self._get()

        elif self.qsize():
            return self._get()
        else:
            raise QueueEmpty
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list, maintained as a heap by _put() and _get().
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        # heappush is bound as a default argument at definition time.
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        # Pops the smallest entry currently stored.
        return heappop(self._queue)
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        # A plain list used as a stack.
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        # Pop from the same end _put() appends to.
        return self._queue.pop()
class JoinableQueue(Queue):
    """A subclass of Queue with task_done() and join() methods."""

    def __init__(self, maxsize=0, *, loop=None):
        super().__init__(maxsize=maxsize, loop=loop)
        # Incremented by _put(), decremented by task_done().
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; join() waits on it.
        self._finished = locks.Event(loop=self._loop)
        self._finished.set()

    def _format(self):
        result = super()._format()
        if self._unfinished_tasks:
            result += ' tasks={}'.format(self._unfinished_tasks)
        return result

    def _put(self, item):
        super()._put(item)
        # Every stored item is an unfinished task until task_done() is called.
        self._unfinished_tasks += 1
        self._finished.clear()

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    @coroutine
    def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            yield from self._finished.wait()
|