123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 |
- """Synchronization primitives."""
- __all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
- import collections
- from . import events
- from . import futures
- from .coroutines import coroutine
- class _ContextManager:
- """Context manager.
- This enables the following idiom for acquiring and releasing a
- lock around a block:
- with (yield from lock):
- <block>
- while failing loudly when accidentally using:
- with lock:
- <block>
- """
- def __init__(self, lock):
- self._lock = lock
- def __enter__(self):
- # We have no use for the "as ..." clause in the with
- # statement for locks.
- return None
- def __exit__(self, *args):
- try:
- self._lock.release()
- finally:
- self._lock = None # Crudely prevent reuse.
- class Lock:
- """Primitive lock objects.
- A primitive lock is a synchronization primitive that is not owned
- by a particular coroutine when locked. A primitive lock is in one
- of two states, 'locked' or 'unlocked'.
- It is created in the unlocked state. It has two basic methods,
- acquire() and release(). When the state is unlocked, acquire()
- changes the state to locked and returns immediately. When the
- state is locked, acquire() blocks until a call to release() in
- another coroutine changes it to unlocked, then the acquire() call
- resets it to locked and returns. The release() method should only
- be called in the locked state; it changes the state to unlocked
- and returns immediately. If an attempt is made to release an
- unlocked lock, a RuntimeError will be raised.
- When more than one coroutine is blocked in acquire() waiting for
- the state to turn to unlocked, only one coroutine proceeds when a
- release() call resets the state to unlocked; first coroutine which
- is blocked in acquire() is being processed.
- acquire() is a coroutine and should be called with 'yield from'.
- Locks also support the context management protocol. '(yield from lock)'
- should be used as context manager expression.
- Usage:
- lock = Lock()
- ...
- yield from lock
- try:
- ...
- finally:
- lock.release()
- Context manager usage:
- lock = Lock()
- ...
- with (yield from lock):
- ...
- Lock objects can be tested for locking state:
- if not lock.locked():
- yield from lock
- else:
- # lock is acquired
- ...
- """
- def __init__(self, *, loop=None):
- self._waiters = collections.deque()
- self._locked = False
- if loop is not None:
- self._loop = loop
- else:
- self._loop = events.get_event_loop()
- def __repr__(self):
- res = super().__repr__()
- extra = 'locked' if self._locked else 'unlocked'
- if self._waiters:
- extra = '{},waiters:{}'.format(extra, len(self._waiters))
- return '<{} [{}]>'.format(res[1:-1], extra)
- def locked(self):
- """Return True if lock is acquired."""
- return self._locked
- @coroutine
- def acquire(self):
- """Acquire a lock.
- This method blocks until the lock is unlocked, then sets it to
- locked and returns True.
- """
- if not self._waiters and not self._locked:
- self._locked = True
- return True
- fut = futures.Future(loop=self._loop)
- self._waiters.append(fut)
- try:
- yield from fut
- self._locked = True
- return True
- finally:
- self._waiters.remove(fut)
- def release(self):
- """Release a lock.
- When the lock is locked, reset it to unlocked, and return.
- If any other coroutines are blocked waiting for the lock to become
- unlocked, allow exactly one of them to proceed.
- When invoked on an unlocked lock, a RuntimeError is raised.
- There is no return value.
- """
- if self._locked:
- self._locked = False
- # Wake up the first waiter who isn't cancelled.
- for fut in self._waiters:
- if not fut.done():
- fut.set_result(True)
- break
- else:
- raise RuntimeError('Lock is not acquired.')
- def __enter__(self):
- raise RuntimeError(
- '"yield from" should be used as context manager expression')
- def __exit__(self, *args):
- # This must exist because __enter__ exists, even though that
- # always raises; that's how the with-statement works.
- pass
- def __iter__(self):
- # This is not a coroutine. It is meant to enable the idiom:
- #
- # with (yield from lock):
- # <block>
- #
- # as an alternative to:
- #
- # yield from lock.acquire()
- # try:
- # <block>
- # finally:
- # lock.release()
- yield from self.acquire()
- return _ContextManager(self)
- class Event:
- """Asynchronous equivalent to threading.Event.
- Class implementing event objects. An event manages a flag that can be set
- to true with the set() method and reset to false with the clear() method.
- The wait() method blocks until the flag is true. The flag is initially
- false.
- """
- def __init__(self, *, loop=None):
- self._waiters = collections.deque()
- self._value = False
- if loop is not None:
- self._loop = loop
- else:
- self._loop = events.get_event_loop()
- def __repr__(self):
- res = super().__repr__()
- extra = 'set' if self._value else 'unset'
- if self._waiters:
- extra = '{},waiters:{}'.format(extra, len(self._waiters))
- return '<{} [{}]>'.format(res[1:-1], extra)
- def is_set(self):
- """Return True if and only if the internal flag is true."""
- return self._value
- def set(self):
- """Set the internal flag to true. All coroutines waiting for it to
- become true are awakened. Coroutine that call wait() once the flag is
- true will not block at all.
- """
- if not self._value:
- self._value = True
- for fut in self._waiters:
- if not fut.done():
- fut.set_result(True)
- def clear(self):
- """Reset the internal flag to false. Subsequently, coroutines calling
- wait() will block until set() is called to set the internal flag
- to true again."""
- self._value = False
- @coroutine
- def wait(self):
- """Block until the internal flag is true.
- If the internal flag is true on entry, return True
- immediately. Otherwise, block until another coroutine calls
- set() to set the flag to true, then return True.
- """
- if self._value:
- return True
- fut = futures.Future(loop=self._loop)
- self._waiters.append(fut)
- try:
- yield from fut
- return True
- finally:
- self._waiters.remove(fut)
- class Condition:
- """Asynchronous equivalent to threading.Condition.
- This class implements condition variable objects. A condition variable
- allows one or more coroutines to wait until they are notified by another
- coroutine.
- A new Lock object is created and used as the underlying lock.
- """
- def __init__(self, lock=None, *, loop=None):
- if loop is not None:
- self._loop = loop
- else:
- self._loop = events.get_event_loop()
- if lock is None:
- lock = Lock(loop=self._loop)
- elif lock._loop is not self._loop:
- raise ValueError("loop argument must agree with lock")
- self._lock = lock
- # Export the lock's locked(), acquire() and release() methods.
- self.locked = lock.locked
- self.acquire = lock.acquire
- self.release = lock.release
- self._waiters = collections.deque()
- def __repr__(self):
- res = super().__repr__()
- extra = 'locked' if self.locked() else 'unlocked'
- if self._waiters:
- extra = '{},waiters:{}'.format(extra, len(self._waiters))
- return '<{} [{}]>'.format(res[1:-1], extra)
- @coroutine
- def wait(self):
- """Wait until notified.
- If the calling coroutine has not acquired the lock when this
- method is called, a RuntimeError is raised.
- This method releases the underlying lock, and then blocks
- until it is awakened by a notify() or notify_all() call for
- the same condition variable in another coroutine. Once
- awakened, it re-acquires the lock and returns True.
- """
- if not self.locked():
- raise RuntimeError('cannot wait on un-acquired lock')
- self.release()
- try:
- fut = futures.Future(loop=self._loop)
- self._waiters.append(fut)
- try:
- yield from fut
- return True
- finally:
- self._waiters.remove(fut)
- finally:
- yield from self.acquire()
- @coroutine
- def wait_for(self, predicate):
- """Wait until a predicate becomes true.
- The predicate should be a callable which result will be
- interpreted as a boolean value. The final predicate value is
- the return value.
- """
- result = predicate()
- while not result:
- yield from self.wait()
- result = predicate()
- return result
- def notify(self, n=1):
- """By default, wake up one coroutine waiting on this condition, if any.
- If the calling coroutine has not acquired the lock when this method
- is called, a RuntimeError is raised.
- This method wakes up at most n of the coroutines waiting for the
- condition variable; it is a no-op if no coroutines are waiting.
- Note: an awakened coroutine does not actually return from its
- wait() call until it can reacquire the lock. Since notify() does
- not release the lock, its caller should.
- """
- if not self.locked():
- raise RuntimeError('cannot notify on un-acquired lock')
- idx = 0
- for fut in self._waiters:
- if idx >= n:
- break
- if not fut.done():
- idx += 1
- fut.set_result(False)
- def notify_all(self):
- """Wake up all threads waiting on this condition. This method acts
- like notify(), but wakes up all waiting threads instead of one. If the
- calling thread has not acquired the lock when this method is called,
- a RuntimeError is raised.
- """
- self.notify(len(self._waiters))
- def __enter__(self):
- raise RuntimeError(
- '"yield from" should be used as context manager expression')
- def __exit__(self, *args):
- pass
- def __iter__(self):
- # See comment in Lock.__iter__().
- yield from self.acquire()
- return _ContextManager(self)
- class Semaphore:
- """A Semaphore implementation.
- A semaphore manages an internal counter which is decremented by each
- acquire() call and incremented by each release() call. The counter
- can never go below zero; when acquire() finds that it is zero, it blocks,
- waiting until some other thread calls release().
- Semaphores also support the context management protocol.
- The optional argument gives the initial value for the internal
- counter; it defaults to 1. If the value given is less than 0,
- ValueError is raised.
- """
- def __init__(self, value=1, *, loop=None):
- if value < 0:
- raise ValueError("Semaphore initial value must be >= 0")
- self._value = value
- self._waiters = collections.deque()
- if loop is not None:
- self._loop = loop
- else:
- self._loop = events.get_event_loop()
- def __repr__(self):
- res = super().__repr__()
- extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
- self._value)
- if self._waiters:
- extra = '{},waiters:{}'.format(extra, len(self._waiters))
- return '<{} [{}]>'.format(res[1:-1], extra)
- def locked(self):
- """Returns True if semaphore can not be acquired immediately."""
- return self._value == 0
- @coroutine
- def acquire(self):
- """Acquire a semaphore.
- If the internal counter is larger than zero on entry,
- decrement it by one and return True immediately. If it is
- zero on entry, block, waiting until some other coroutine has
- called release() to make it larger than 0, and then return
- True.
- """
- if not self._waiters and self._value > 0:
- self._value -= 1
- return True
- fut = futures.Future(loop=self._loop)
- self._waiters.append(fut)
- try:
- yield from fut
- self._value -= 1
- return True
- finally:
- self._waiters.remove(fut)
- def release(self):
- """Release a semaphore, incrementing the internal counter by one.
- When it was zero on entry and another coroutine is waiting for it to
- become larger than zero again, wake up that coroutine.
- """
- self._value += 1
- for waiter in self._waiters:
- if not waiter.done():
- waiter.set_result(True)
- break
- def __enter__(self):
- raise RuntimeError(
- '"yield from" should be used as context manager expression')
- def __exit__(self, *args):
- pass
- def __iter__(self):
- # See comment in Lock.__iter__().
- yield from self.acquire()
- return _ContextManager(self)
- class BoundedSemaphore(Semaphore):
- """A bounded semaphore implementation.
- This raises ValueError in release() if it would increase the value
- above the initial value.
- """
- def __init__(self, value=1, *, loop=None):
- self._bound_value = value
- super().__init__(value, loop=loop)
- def release(self):
- if self._value >= self._bound_value:
- raise ValueError('BoundedSemaphore released too many times')
- super().release()
|