async_.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # Based on https://github.com/petkaantonov/bluebird/blob/master/src/promise.js
  2. from collections import deque
  3. from threading import local
  4. if False:
  5. from .promise import Promise
  6. from typing import Any, Callable, Optional, Union # flake8: noqa
  7. class Async(local):
  8. def __init__(self, trampoline_enabled=True):
  9. self.is_tick_used = False
  10. self.late_queue = deque() # type: ignore
  11. self.normal_queue = deque() # type: ignore
  12. self.have_drained_queues = False
  13. self.trampoline_enabled = trampoline_enabled
  14. def enable_trampoline(self):
  15. self.trampoline_enabled = True
  16. def disable_trampoline(self):
  17. self.trampoline_enabled = False
  18. def have_items_queued(self):
  19. return self.is_tick_used or self.have_drained_queues
  20. def _async_invoke_later(self, fn, scheduler):
  21. self.late_queue.append(fn)
  22. self.queue_tick(scheduler)
  23. def _async_invoke(self, fn, scheduler):
  24. # type: (Callable, Any) -> None
  25. self.normal_queue.append(fn)
  26. self.queue_tick(scheduler)
  27. def _async_settle_promise(self, promise):
  28. # type: (Promise) -> None
  29. self.normal_queue.append(promise)
  30. self.queue_tick(promise.scheduler)
  31. def invoke_later(self, fn):
  32. if self.trampoline_enabled:
  33. self._async_invoke_later(fn, scheduler)
  34. else:
  35. scheduler.call_later(0.1, fn)
  36. def invoke(self, fn, scheduler):
  37. # type: (Callable, Any) -> None
  38. if self.trampoline_enabled:
  39. self._async_invoke(fn, scheduler)
  40. else:
  41. scheduler.call(fn)
  42. def settle_promises(self, promise):
  43. # type: (Promise) -> None
  44. if self.trampoline_enabled:
  45. self._async_settle_promise(promise)
  46. else:
  47. promise.scheduler.call(promise._settle_promises)
  48. def throw_later(self, reason, scheduler):
  49. # type: (Exception, Any) -> None
  50. def fn():
  51. # type: () -> None
  52. raise reason
  53. scheduler.call(fn)
  54. fatal_error = throw_later
  55. def drain_queue(self, queue):
  56. # type: (deque) -> None
  57. from .promise import Promise
  58. while queue:
  59. fn = queue.popleft()
  60. if isinstance(fn, Promise):
  61. fn._settle_promises()
  62. continue
  63. fn()
  64. def drain_queue_until_resolved(self, promise):
  65. # type: (Promise) -> None
  66. from .promise import Promise
  67. queue = self.normal_queue
  68. while queue:
  69. if not promise.is_pending:
  70. return
  71. fn = queue.popleft()
  72. if isinstance(fn, Promise):
  73. fn._settle_promises()
  74. continue
  75. fn()
  76. self.reset()
  77. self.have_drained_queues = True
  78. self.drain_queue(self.late_queue)
  79. def wait(self, promise, timeout=None):
  80. # type: (Promise, Optional[float]) -> None
  81. if not promise.is_pending:
  82. # We return if the promise is already
  83. # fulfilled or rejected
  84. return
  85. target = promise._target()
  86. if self.trampoline_enabled:
  87. if self.is_tick_used:
  88. self.drain_queue_until_resolved(target)
  89. if not promise.is_pending:
  90. # We return if the promise is already
  91. # fulfilled or rejected
  92. return
  93. target.scheduler.wait(target, timeout)
  94. def drain_queues(self):
  95. # type: () -> None
  96. assert self.is_tick_used
  97. self.drain_queue(self.normal_queue)
  98. self.reset()
  99. self.have_drained_queues = True
  100. self.drain_queue(self.late_queue)
  101. def queue_tick(self, scheduler):
  102. # type: (Any) -> None
  103. if not self.is_tick_used:
  104. self.is_tick_used = True
  105. scheduler.call(self.drain_queues)
  106. def reset(self):
  107. # type: () -> None
  108. self.is_tick_used = False