timeout.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # This code is originally sourced from the aio-libs project "async_timeout",
  2. # under the Apache 2.0 license. You may see the original project at
  3. # https://github.com/aio-libs/async-timeout
  4. # It is vendored here to reduce chain-dependencies on this library, and
  5. # modified slightly to remove some features we don't use.
  6. import asyncio
  7. from types import TracebackType
  8. from typing import Any, Optional, Type
  9. class timeout:
  10. """timeout context manager.
  11. Useful in cases when you want to apply timeout logic around block
  12. of code or in cases when asyncio.wait_for is not suitable. For example:
  13. >>> with timeout(0.001):
  14. ... async with aiohttp.get('https://github.com') as r:
  15. ... await r.text()
  16. timeout - value in seconds or None to disable timeout logic
  17. loop - asyncio compatible event loop
  18. """
  19. def __init__(
  20. self,
  21. timeout: Optional[float],
  22. *,
  23. loop: Optional[asyncio.AbstractEventLoop] = None,
  24. ) -> None:
  25. self._timeout = timeout
  26. if loop is None:
  27. loop = asyncio.get_event_loop()
  28. self._loop = loop
  29. self._task = None # type: Optional[asyncio.Task[Any]]
  30. self._cancelled = False
  31. self._cancel_handler = None # type: Optional[asyncio.Handle]
  32. self._cancel_at = None # type: Optional[float]
  33. def __enter__(self) -> "timeout":
  34. return self._do_enter()
  35. def __exit__(
  36. self,
  37. exc_type: Type[BaseException],
  38. exc_val: BaseException,
  39. exc_tb: TracebackType,
  40. ) -> Optional[bool]:
  41. self._do_exit(exc_type)
  42. return None
  43. async def __aenter__(self) -> "timeout":
  44. return self._do_enter()
  45. async def __aexit__(
  46. self,
  47. exc_type: Type[BaseException],
  48. exc_val: BaseException,
  49. exc_tb: TracebackType,
  50. ) -> None:
  51. self._do_exit(exc_type)
  52. @property
  53. def expired(self) -> bool:
  54. return self._cancelled
  55. @property
  56. def remaining(self) -> Optional[float]:
  57. if self._cancel_at is not None:
  58. return max(self._cancel_at - self._loop.time(), 0.0)
  59. else:
  60. return None
  61. def _do_enter(self) -> "timeout":
  62. # Support Tornado 5- without timeout
  63. # Details: https://github.com/python/asyncio/issues/392
  64. if self._timeout is None:
  65. return self
  66. self._task = asyncio.current_task(self._loop)
  67. if self._task is None:
  68. raise RuntimeError(
  69. "Timeout context manager should be used " "inside a task"
  70. )
  71. if self._timeout <= 0:
  72. self._loop.call_soon(self._cancel_task)
  73. return self
  74. self._cancel_at = self._loop.time() + self._timeout
  75. self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
  76. return self
  77. def _do_exit(self, exc_type: Type[BaseException]) -> None:
  78. if exc_type is asyncio.CancelledError and self._cancelled:
  79. self._cancel_handler = None
  80. self._task = None
  81. raise asyncio.TimeoutError
  82. if self._timeout is not None and self._cancel_handler is not None:
  83. self._cancel_handler.cancel()
  84. self._cancel_handler = None
  85. self._task = None
  86. return None
  87. def _cancel_task(self) -> None:
  88. if self._task is not None:
  89. self._task.cancel()
  90. self._cancelled = True