# futures.py (14 KB)
  1. """A Future class similar to the one in PEP 3148."""
  2. __all__ = ['CancelledError', 'TimeoutError',
  3. 'InvalidStateError',
  4. 'Future', 'wrap_future',
  5. ]
  6. import concurrent.futures._base
  7. import logging
  8. import reprlib
  9. import sys
  10. import traceback
  11. from . import events
  12. # States for Future.
  13. _PENDING = 'PENDING'
  14. _CANCELLED = 'CANCELLED'
  15. _FINISHED = 'FINISHED'
  16. _PY34 = sys.version_info >= (3, 4)
  17. Error = concurrent.futures._base.Error
  18. CancelledError = concurrent.futures.CancelledError
  19. TimeoutError = concurrent.futures.TimeoutError
  20. STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
  21. class InvalidStateError(Error):
  22. """The operation is not allowed in this state."""
  23. class _TracebackLogger:
  24. """Helper to log a traceback upon destruction if not cleared.
  25. This solves a nasty problem with Futures and Tasks that have an
  26. exception set: if nobody asks for the exception, the exception is
  27. never logged. This violates the Zen of Python: 'Errors should
  28. never pass silently. Unless explicitly silenced.'
  29. However, we don't want to log the exception as soon as
  30. set_exception() is called: if the calling code is written
  31. properly, it will get the exception and handle it properly. But
  32. we *do* want to log it if result() or exception() was never called
  33. -- otherwise developers waste a lot of time wondering why their
  34. buggy code fails silently.
  35. An earlier attempt added a __del__() method to the Future class
  36. itself, but this backfired because the presence of __del__()
  37. prevents garbage collection from breaking cycles. A way out of
  38. this catch-22 is to avoid having a __del__() method on the Future
  39. class itself, but instead to have a reference to a helper object
  40. with a __del__() method that logs the traceback, where we ensure
  41. that the helper object doesn't participate in cycles, and only the
  42. Future has a reference to it.
  43. The helper object is added when set_exception() is called. When
  44. the Future is collected, and the helper is present, the helper
  45. object is also collected, and its __del__() method will log the
  46. traceback. When the Future's result() or exception() method is
  47. called (and a helper object is present), it removes the helper
  48. object, after calling its clear() method to prevent it from
  49. logging.
  50. One downside is that we do a fair amount of work to extract the
  51. traceback from the exception, even when it is never logged. It
  52. would seem cheaper to just store the exception object, but that
  53. references the traceback, which references stack frames, which may
  54. reference the Future, which references the _TracebackLogger, and
  55. then the _TracebackLogger would be included in a cycle, which is
  56. what we're trying to avoid! As an optimization, we don't
  57. immediately format the exception; we only do the work when
  58. activate() is called, which call is delayed until after all the
  59. Future's callbacks have run. Since usually a Future has at least
  60. one callback (typically set by 'yield from') and usually that
  61. callback extracts the callback, thereby removing the need to
  62. format the exception.
  63. PS. I don't claim credit for this solution. I first heard of it
  64. in a discussion about closing files when they are collected.
  65. """
  66. __slots__ = ('loop', 'source_traceback', 'exc', 'tb')
  67. def __init__(self, future, exc):
  68. self.loop = future._loop
  69. self.source_traceback = future._source_traceback
  70. self.exc = exc
  71. self.tb = None
  72. def activate(self):
  73. exc = self.exc
  74. if exc is not None:
  75. self.exc = None
  76. self.tb = traceback.format_exception(exc.__class__, exc,
  77. exc.__traceback__)
  78. def clear(self):
  79. self.exc = None
  80. self.tb = None
  81. def __del__(self):
  82. if self.tb:
  83. msg = 'Future/Task exception was never retrieved\n'
  84. if self.source_traceback:
  85. src = ''.join(traceback.format_list(self.source_traceback))
  86. msg += 'Future/Task created at (most recent call last):\n'
  87. msg += '%s\n' % src.rstrip()
  88. msg += ''.join(self.tb).rstrip()
  89. self.loop.call_exception_handler({'message': msg})
  90. class Future:
  91. """This class is *almost* compatible with concurrent.futures.Future.
  92. Differences:
  93. - result() and exception() do not take a timeout argument and
  94. raise an exception when the future isn't done yet.
  95. - Callbacks registered with add_done_callback() are always called
  96. via the event loop's call_soon_threadsafe().
  97. - This class is not compatible with the wait() and as_completed()
  98. methods in the concurrent.futures package.
  99. (In Python 3.4 or later we may be able to unify the implementations.)
  100. """
  101. # Class variables serving as defaults for instance variables.
  102. _state = _PENDING
  103. _result = None
  104. _exception = None
  105. _loop = None
  106. _source_traceback = None
  107. _blocking = False # proper use of future (yield vs yield from)
  108. _log_traceback = False # Used for Python 3.4 and later
  109. _tb_logger = None # Used for Python 3.3 only
  110. def __init__(self, *, loop=None):
  111. """Initialize the future.
  112. The optional event_loop argument allows to explicitly set the event
  113. loop object used by the future. If it's not provided, the future uses
  114. the default event loop.
  115. """
  116. if loop is None:
  117. self._loop = events.get_event_loop()
  118. else:
  119. self._loop = loop
  120. self._callbacks = []
  121. if self._loop.get_debug():
  122. self._source_traceback = traceback.extract_stack(sys._getframe(1))
  123. def _format_callbacks(self):
  124. cb = self._callbacks
  125. size = len(cb)
  126. if not size:
  127. cb = ''
  128. def format_cb(callback):
  129. return events._format_callback(callback, ())
  130. if size == 1:
  131. cb = format_cb(cb[0])
  132. elif size == 2:
  133. cb = '{}, {}'.format(format_cb(cb[0]), format_cb(cb[1]))
  134. elif size > 2:
  135. cb = '{}, <{} more>, {}'.format(format_cb(cb[0]),
  136. size-2,
  137. format_cb(cb[-1]))
  138. return 'cb=[%s]' % cb
  139. def _repr_info(self):
  140. info = [self._state.lower()]
  141. if self._state == _FINISHED:
  142. if self._exception is not None:
  143. info.append('exception={!r}'.format(self._exception))
  144. else:
  145. # use reprlib to limit the length of the output, especially
  146. # for very long strings
  147. result = reprlib.repr(self._result)
  148. info.append('result={}'.format(result))
  149. if self._callbacks:
  150. info.append(self._format_callbacks())
  151. if self._source_traceback:
  152. frame = self._source_traceback[-1]
  153. info.append('created at %s:%s' % (frame[0], frame[1]))
  154. return info
  155. def __repr__(self):
  156. info = self._repr_info()
  157. return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
  158. # On Python 3.3 and older, objects with a destructor part of a reference
  159. # cycle are never destroyed. It's not more the case on Python 3.4 thanks
  160. # to the PEP 442.
  161. if _PY34:
  162. def __del__(self):
  163. if not self._log_traceback:
  164. # set_exception() was not called, or result() or exception()
  165. # has consumed the exception
  166. return
  167. exc = self._exception
  168. context = {
  169. 'message': ('%s exception was never retrieved'
  170. % self.__class__.__name__),
  171. 'exception': exc,
  172. 'future': self,
  173. }
  174. if self._source_traceback:
  175. context['source_traceback'] = self._source_traceback
  176. self._loop.call_exception_handler(context)
  177. def cancel(self):
  178. """Cancel the future and schedule callbacks.
  179. If the future is already done or cancelled, return False. Otherwise,
  180. change the future's state to cancelled, schedule the callbacks and
  181. return True.
  182. """
  183. if self._state != _PENDING:
  184. return False
  185. self._state = _CANCELLED
  186. self._schedule_callbacks()
  187. return True
  188. def _schedule_callbacks(self):
  189. """Internal: Ask the event loop to call all callbacks.
  190. The callbacks are scheduled to be called as soon as possible. Also
  191. clears the callback list.
  192. """
  193. callbacks = self._callbacks[:]
  194. if not callbacks:
  195. return
  196. self._callbacks[:] = []
  197. for callback in callbacks:
  198. self._loop.call_soon(callback, self)
  199. def cancelled(self):
  200. """Return True if the future was cancelled."""
  201. return self._state == _CANCELLED
  202. # Don't implement running(); see http://bugs.python.org/issue18699
  203. def done(self):
  204. """Return True if the future is done.
  205. Done means either that a result / exception are available, or that the
  206. future was cancelled.
  207. """
  208. return self._state != _PENDING
  209. def result(self):
  210. """Return the result this future represents.
  211. If the future has been cancelled, raises CancelledError. If the
  212. future's result isn't yet available, raises InvalidStateError. If
  213. the future is done and has an exception set, this exception is raised.
  214. """
  215. if self._state == _CANCELLED:
  216. raise CancelledError
  217. if self._state != _FINISHED:
  218. raise InvalidStateError('Result is not ready.')
  219. self._log_traceback = False
  220. if self._tb_logger is not None:
  221. self._tb_logger.clear()
  222. self._tb_logger = None
  223. if self._exception is not None:
  224. raise self._exception
  225. return self._result
  226. def exception(self):
  227. """Return the exception that was set on this future.
  228. The exception (or None if no exception was set) is returned only if
  229. the future is done. If the future has been cancelled, raises
  230. CancelledError. If the future isn't done yet, raises
  231. InvalidStateError.
  232. """
  233. if self._state == _CANCELLED:
  234. raise CancelledError
  235. if self._state != _FINISHED:
  236. raise InvalidStateError('Exception is not set.')
  237. self._log_traceback = False
  238. if self._tb_logger is not None:
  239. self._tb_logger.clear()
  240. self._tb_logger = None
  241. return self._exception
  242. def add_done_callback(self, fn):
  243. """Add a callback to be run when the future becomes done.
  244. The callback is called with a single argument - the future object. If
  245. the future is already done when this is called, the callback is
  246. scheduled with call_soon.
  247. """
  248. if self._state != _PENDING:
  249. self._loop.call_soon(fn, self)
  250. else:
  251. self._callbacks.append(fn)
  252. # New method not in PEP 3148.
  253. def remove_done_callback(self, fn):
  254. """Remove all instances of a callback from the "call when done" list.
  255. Returns the number of callbacks removed.
  256. """
  257. filtered_callbacks = [f for f in self._callbacks if f != fn]
  258. removed_count = len(self._callbacks) - len(filtered_callbacks)
  259. if removed_count:
  260. self._callbacks[:] = filtered_callbacks
  261. return removed_count
  262. # So-called internal methods (note: no set_running_or_notify_cancel()).
  263. def _set_result_unless_cancelled(self, result):
  264. """Helper setting the result only if the future was not cancelled."""
  265. if self.cancelled():
  266. return
  267. self.set_result(result)
  268. def set_result(self, result):
  269. """Mark the future done and set its result.
  270. If the future is already done when this method is called, raises
  271. InvalidStateError.
  272. """
  273. if self._state != _PENDING:
  274. raise InvalidStateError('{}: {!r}'.format(self._state, self))
  275. self._result = result
  276. self._state = _FINISHED
  277. self._schedule_callbacks()
  278. def set_exception(self, exception):
  279. """Mark the future done and set an exception.
  280. If the future is already done when this method is called, raises
  281. InvalidStateError.
  282. """
  283. if self._state != _PENDING:
  284. raise InvalidStateError('{}: {!r}'.format(self._state, self))
  285. if isinstance(exception, type):
  286. exception = exception()
  287. self._exception = exception
  288. self._state = _FINISHED
  289. self._schedule_callbacks()
  290. if _PY34:
  291. self._log_traceback = True
  292. else:
  293. self._tb_logger = _TracebackLogger(self, exception)
  294. # Arrange for the logger to be activated after all callbacks
  295. # have had a chance to call result() or exception().
  296. self._loop.call_soon(self._tb_logger.activate)
  297. # Truly internal methods.
  298. def _copy_state(self, other):
  299. """Internal helper to copy state from another Future.
  300. The other Future may be a concurrent.futures.Future.
  301. """
  302. assert other.done()
  303. if self.cancelled():
  304. return
  305. assert not self.done()
  306. if other.cancelled():
  307. self.cancel()
  308. else:
  309. exception = other.exception()
  310. if exception is not None:
  311. self.set_exception(exception)
  312. else:
  313. result = other.result()
  314. self.set_result(result)
  315. def __iter__(self):
  316. if not self.done():
  317. self._blocking = True
  318. yield self # This tells Task to wait for completion.
  319. assert self.done(), "yield from wasn't used with future"
  320. return self.result() # May raise too.
  321. def wrap_future(fut, *, loop=None):
  322. """Wrap concurrent.futures.Future object."""
  323. if isinstance(fut, Future):
  324. return fut
  325. assert isinstance(fut, concurrent.futures.Future), \
  326. 'concurrent.futures.Future is expected, got {!r}'.format(fut)
  327. if loop is None:
  328. loop = events.get_event_loop()
  329. new_future = Future(loop=loop)
  330. def _check_cancel_other(f):
  331. if f.cancelled():
  332. fut.cancel()
  333. new_future.add_done_callback(_check_cancel_other)
  334. fut.add_done_callback(
  335. lambda future: loop.call_soon_threadsafe(
  336. new_future._copy_state, future))
  337. return new_future