123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- # Based on https://github.com/petkaantonov/bluebird/blob/master/src/promise.js
- from collections import deque
- from threading import local
- if False:
- from .promise import Promise
- from typing import Any, Callable, Optional, Union # flake8: noqa
class Async(local):
    """Trampoline that queues callbacks and promise settlements for a later tick.

    The class derives from ``threading.local``, so the queues and flags set in
    ``__init__`` are kept separately for each thread that touches an instance.

    Two FIFO queues are maintained:

    * ``normal_queue`` holds callables and promises waiting to be settled.
    * ``late_queue`` holds callables that run only after ``normal_queue`` has
      been emptied.

    When the trampoline is disabled, work is handed directly to the scheduler
    instead of being queued.
    """

    def __init__(self, trampoline_enabled=True):
        # True while a drain has been handed to a scheduler and has not yet
        # reset the flag.
        self.is_tick_used = False
        self.late_queue = deque()  # type: ignore
        self.normal_queue = deque()  # type: ignore
        # Set once a drain has run to completion; never cleared in this class.
        self.have_drained_queues = False
        self.trampoline_enabled = trampoline_enabled

    def enable_trampoline(self):
        """Queue subsequent work instead of passing it straight to the scheduler."""
        self.trampoline_enabled = True

    def disable_trampoline(self):
        """Pass subsequent work straight to the scheduler instead of queueing it."""
        self.trampoline_enabled = False

    def have_items_queued(self):
        """Return True if a tick is pending or a drain has already completed."""
        return self.is_tick_used or self.have_drained_queues

    def _async_invoke_later(self, fn, scheduler):
        """Append ``fn`` to the late queue and make sure a drain is scheduled."""
        self.late_queue.append(fn)
        self.queue_tick(scheduler)

    def _async_invoke(self, fn, scheduler):
        # type: (Callable, Any) -> None
        """Append ``fn`` to the normal queue and make sure a drain is scheduled."""
        self.normal_queue.append(fn)
        self.queue_tick(scheduler)

    def _async_settle_promise(self, promise):
        # type: (Promise) -> None
        """Queue ``promise`` itself; the drain loop calls its ``_settle_promises``."""
        self.normal_queue.append(promise)
        self.queue_tick(promise.scheduler)

    def invoke_later(self, fn, scheduler):
        # type: (Callable, Any) -> None
        """Run ``fn`` after the normal queue, or via ``scheduler.call_later``.

        ``scheduler`` used to be missing from the signature even though both
        branches use it, so every call raised ``NameError``.  It is now an
        explicit argument, mirroring ``invoke``.

        Args:
            fn: Zero-argument callable to run.
            scheduler: Object providing ``call`` (trampoline enabled) or
                ``call_later`` (trampoline disabled).
        """
        if self.trampoline_enabled:
            self._async_invoke_later(fn, scheduler)
        else:
            scheduler.call_later(0.1, fn)

    def invoke(self, fn, scheduler):
        # type: (Callable, Any) -> None
        """Queue ``fn`` on the normal queue, or call ``scheduler.call(fn)`` directly.

        Args:
            fn: Zero-argument callable to run.
            scheduler: Object providing ``call``.
        """
        if self.trampoline_enabled:
            self._async_invoke(fn, scheduler)
        else:
            scheduler.call(fn)

    def settle_promises(self, promise):
        # type: (Promise) -> None
        """Arrange for ``promise._settle_promises`` to run.

        With the trampoline enabled the promise is queued; otherwise the bound
        method is handed to ``promise.scheduler.call``.
        """
        if self.trampoline_enabled:
            self._async_settle_promise(promise)
        else:
            promise.scheduler.call(promise._settle_promises)

    def throw_later(self, reason, scheduler):
        # type: (Exception, Any) -> None
        """Ask ``scheduler`` to call a function that raises ``reason``."""

        def fn():
            # type: () -> None
            raise reason

        scheduler.call(fn)

    # Alias kept so both names resolve to the same implementation.
    fatal_error = throw_later

    @staticmethod
    def _run_entry(entry, promise_type):
        """Settle ``entry`` if it is a ``promise_type`` instance, else call it."""
        if isinstance(entry, promise_type):
            entry._settle_promises()
        else:
            entry()

    def _finish_drain(self):
        # type: () -> None
        """Release the tick, record that a drain completed, then run late callbacks."""
        self.reset()
        self.have_drained_queues = True
        self.drain_queue(self.late_queue)

    def drain_queue(self, queue):
        # type: (deque) -> None
        """Pop and run entries from ``queue`` until it is empty.

        Entries appended while draining are processed in the same pass because
        the emptiness check is repeated on every iteration.
        """
        # Imported here rather than at module level; the module-level import of
        # Promise is guarded by ``if False`` and never executes.
        from .promise import Promise

        while queue:
            self._run_entry(queue.popleft(), Promise)

    def drain_queue_until_resolved(self, promise):
        # type: (Promise) -> None
        """Drain the normal queue, stopping early once ``promise`` is settled.

        On an early stop nothing else is touched: ``is_tick_used`` stays set
        and the late queue is left alone.  Only when the normal queue empties
        is the drain finished off (tick reset, late queue drained).
        """
        from .promise import Promise

        queue = self.normal_queue
        while queue:
            if not promise.is_pending:
                return
            self._run_entry(queue.popleft(), Promise)

        self._finish_drain()

    def wait(self, promise, timeout=None):
        # type: (Promise, Optional[float]) -> None
        """Block until ``promise`` is no longer pending.

        Args:
            promise: The promise to wait on.
            timeout: Passed through unchanged to the scheduler's ``wait``.
        """
        if not promise.is_pending:
            # Already fulfilled or rejected; nothing to wait for.
            return

        target = promise._target()

        if self.trampoline_enabled:
            if self.is_tick_used:
                # Queued work may be what settles the promise, so run it here
                # before falling back to the scheduler.
                self.drain_queue_until_resolved(target)

            if not promise.is_pending:
                # Already fulfilled or rejected; nothing to wait for.
                return

        target.scheduler.wait(target, timeout)

    def drain_queues(self):
        # type: () -> None
        """Run everything in the normal queue, then everything in the late queue."""
        assert self.is_tick_used
        self.drain_queue(self.normal_queue)
        self._finish_drain()

    def queue_tick(self, scheduler):
        # type: (Any) -> None
        """Schedule ``drain_queues`` on ``scheduler`` unless a tick is already pending."""
        if not self.is_tick_used:
            self.is_tick_used = True
            scheduler.call(self.drain_queues)

    def reset(self):
        # type: () -> None
        """Clear the pending-tick flag so the next queued item schedules a drain."""
        self.is_tick_used = False
|