events.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. """Event loop and event loop policy."""
# Public names of this module, i.e. what ``from <module> import *`` exports.
# Every entry is a class or function defined further down in this file.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           ]
  9. import functools
  10. import inspect
  11. import reprlib
  12. import socket
  13. import subprocess
  14. import sys
  15. import threading
  16. import traceback
  17. _PY34 = sys.version_info >= (3, 4)
  18. def _get_function_source(func):
  19. if _PY34:
  20. func = inspect.unwrap(func)
  21. elif hasattr(func, '__wrapped__'):
  22. func = func.__wrapped__
  23. if inspect.isfunction(func):
  24. code = func.__code__
  25. return (code.co_filename, code.co_firstlineno)
  26. if isinstance(func, functools.partial):
  27. return _get_function_source(func.func)
  28. if _PY34 and isinstance(func, functools.partialmethod):
  29. return _get_function_source(func.func)
  30. return None
  31. def _format_args(args):
  32. """Format function arguments.
  33. Special case for a single parameter: ('hello',) is formatted as ('hello').
  34. """
  35. # use reprlib to limit the length of the output
  36. args_repr = reprlib.repr(args)
  37. if len(args) == 1 and args_repr.endswith(',)'):
  38. args_repr = args_repr[:-2] + ')'
  39. return args_repr
  40. def _format_callback(func, args, suffix=''):
  41. if isinstance(func, functools.partial):
  42. if args is not None:
  43. suffix = _format_args(args) + suffix
  44. return _format_callback(func.func, func.args, suffix)
  45. func_repr = getattr(func, '__qualname__', None)
  46. if not func_repr:
  47. func_repr = repr(func)
  48. if args is not None:
  49. func_repr += _format_args(args)
  50. if suffix:
  51. func_repr += suffix
  52. source = _get_function_source(func)
  53. if source:
  54. func_repr += ' at %s:%s' % source
  55. return func_repr
  56. class Handle:
  57. """Object returned by callback registration methods."""
  58. __slots__ = ('_callback', '_args', '_cancelled', '_loop',
  59. '_source_traceback', '_repr', '__weakref__')
  60. def __init__(self, callback, args, loop):
  61. assert not isinstance(callback, Handle), 'A Handle is not a callback'
  62. self._loop = loop
  63. self._callback = callback
  64. self._args = args
  65. self._cancelled = False
  66. self._repr = None
  67. if self._loop.get_debug():
  68. self._source_traceback = traceback.extract_stack(sys._getframe(1))
  69. else:
  70. self._source_traceback = None
  71. def _repr_info(self):
  72. info = [self.__class__.__name__]
  73. if self._cancelled:
  74. info.append('cancelled')
  75. if self._callback is not None:
  76. info.append(_format_callback(self._callback, self._args))
  77. if self._source_traceback:
  78. frame = self._source_traceback[-1]
  79. info.append('created at %s:%s' % (frame[0], frame[1]))
  80. return info
  81. def __repr__(self):
  82. if self._repr is not None:
  83. return self._repr
  84. info = self._repr_info()
  85. return '<%s>' % ' '.join(info)
  86. def cancel(self):
  87. if not self._cancelled:
  88. self._cancelled = True
  89. if self._loop.get_debug():
  90. # Keep a representation in debug mode to keep callback and
  91. # parameters. For example, to log the warning
  92. # "Executing <Handle...> took 2.5 second"
  93. self._repr = repr(self)
  94. self._callback = None
  95. self._args = None
  96. def _run(self):
  97. try:
  98. self._callback(*self._args)
  99. except Exception as exc:
  100. cb = _format_callback(self._callback, self._args)
  101. msg = 'Exception in callback {}'.format(cb)
  102. context = {
  103. 'message': msg,
  104. 'exception': exc,
  105. 'handle': self,
  106. }
  107. if self._source_traceback:
  108. context['source_traceback'] = self._source_traceback
  109. self._loop.call_exception_handler(context)
  110. self = None # Needed to break cycles when an exception occurs.
  111. class TimerHandle(Handle):
  112. """Object returned by timed callback registration methods."""
  113. __slots__ = ['_scheduled', '_when']
  114. def __init__(self, when, callback, args, loop):
  115. assert when is not None
  116. super().__init__(callback, args, loop)
  117. if self._source_traceback:
  118. del self._source_traceback[-1]
  119. self._when = when
  120. self._scheduled = False
  121. def _repr_info(self):
  122. info = super()._repr_info()
  123. pos = 2 if self._cancelled else 1
  124. info.insert(pos, 'when=%s' % self._when)
  125. return info
  126. def __hash__(self):
  127. return hash(self._when)
  128. def __lt__(self, other):
  129. return self._when < other._when
  130. def __le__(self, other):
  131. if self._when < other._when:
  132. return True
  133. return self.__eq__(other)
  134. def __gt__(self, other):
  135. return self._when > other._when
  136. def __ge__(self, other):
  137. if self._when > other._when:
  138. return True
  139. return self.__eq__(other)
  140. def __eq__(self, other):
  141. if isinstance(other, TimerHandle):
  142. return (self._when == other._when and
  143. self._callback == other._callback and
  144. self._args == other._args and
  145. self._cancelled == other._cancelled)
  146. return NotImplemented
  147. def __ne__(self, other):
  148. equal = self.__eq__(other)
  149. return NotImplemented if equal is NotImplemented else not equal
  150. def cancel(self):
  151. if not self._cancelled:
  152. self._loop._timer_handle_cancelled(self)
  153. super().cancel()
  154. class AbstractServer:
  155. """Abstract server returned by create_server()."""
  156. def close(self):
  157. """Stop serving. This leaves existing connections open."""
  158. return NotImplemented
  159. def wait_closed(self):
  160. """Coroutine to wait until service is closed."""
  161. return NotImplemented
  162. class AbstractEventLoop:
  163. """Abstract event loop."""
  164. # Running and stopping the event loop.
  165. def run_forever(self):
  166. """Run the event loop until stop() is called."""
  167. raise NotImplementedError
  168. def run_until_complete(self, future):
  169. """Run the event loop until a Future is done.
  170. Return the Future's result, or raise its exception.
  171. """
  172. raise NotImplementedError
  173. def stop(self):
  174. """Stop the event loop as soon as reasonable.
  175. Exactly how soon that is may depend on the implementation, but
  176. no more I/O callbacks should be scheduled.
  177. """
  178. raise NotImplementedError
  179. def is_running(self):
  180. """Return whether the event loop is currently running."""
  181. raise NotImplementedError
  182. def is_closed(self):
  183. """Returns True if the event loop was closed."""
  184. raise NotImplementedError
  185. def close(self):
  186. """Close the loop.
  187. The loop should not be running.
  188. This is idempotent and irreversible.
  189. No other methods should be called after this one.
  190. """
  191. raise NotImplementedError
  192. # Methods scheduling callbacks. All these return Handles.
  193. def _timer_handle_cancelled(self, handle):
  194. """Notification that a TimerHandle has been cancelled."""
  195. raise NotImplementedError
  196. def call_soon(self, callback, *args):
  197. return self.call_later(0, callback, *args)
  198. def call_later(self, delay, callback, *args):
  199. raise NotImplementedError
  200. def call_at(self, when, callback, *args):
  201. raise NotImplementedError
  202. def time(self):
  203. raise NotImplementedError
  204. # Method scheduling a coroutine object: create a task.
  205. def create_task(self, coro):
  206. raise NotImplementedError
  207. # Methods for interacting with threads.
  208. def call_soon_threadsafe(self, callback, *args):
  209. raise NotImplementedError
  210. def run_in_executor(self, executor, callback, *args):
  211. raise NotImplementedError
  212. def set_default_executor(self, executor):
  213. raise NotImplementedError
  214. # Network I/O methods returning Futures.
  215. def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
  216. raise NotImplementedError
  217. def getnameinfo(self, sockaddr, flags=0):
  218. raise NotImplementedError
  219. def create_connection(self, protocol_factory, host=None, port=None, *,
  220. ssl=None, family=0, proto=0, flags=0, sock=None,
  221. local_addr=None, server_hostname=None):
  222. raise NotImplementedError
  223. def create_server(self, protocol_factory, host=None, port=None, *,
  224. family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
  225. sock=None, backlog=100, ssl=None, reuse_address=None):
  226. """A coroutine which creates a TCP server bound to host and port.
  227. The return value is a Server object which can be used to stop
  228. the service.
  229. If host is an empty string or None all interfaces are assumed
  230. and a list of multiple sockets will be returned (most likely
  231. one for IPv4 and another one for IPv6).
  232. family can be set to either AF_INET or AF_INET6 to force the
  233. socket to use IPv4 or IPv6. If not set it will be determined
  234. from host (defaults to AF_UNSPEC).
  235. flags is a bitmask for getaddrinfo().
  236. sock can optionally be specified in order to use a preexisting
  237. socket object.
  238. backlog is the maximum number of queued connections passed to
  239. listen() (defaults to 100).
  240. ssl can be set to an SSLContext to enable SSL over the
  241. accepted connections.
  242. reuse_address tells the kernel to reuse a local socket in
  243. TIME_WAIT state, without waiting for its natural timeout to
  244. expire. If not specified will automatically be set to True on
  245. UNIX.
  246. """
  247. raise NotImplementedError
  248. def create_unix_connection(self, protocol_factory, path, *,
  249. ssl=None, sock=None,
  250. server_hostname=None):
  251. raise NotImplementedError
  252. def create_unix_server(self, protocol_factory, path, *,
  253. sock=None, backlog=100, ssl=None):
  254. """A coroutine which creates a UNIX Domain Socket server.
  255. The return value is a Server object, which can be used to stop
  256. the service.
  257. path is a str, representing a file systsem path to bind the
  258. server socket to.
  259. sock can optionally be specified in order to use a preexisting
  260. socket object.
  261. backlog is the maximum number of queued connections passed to
  262. listen() (defaults to 100).
  263. ssl can be set to an SSLContext to enable SSL over the
  264. accepted connections.
  265. """
  266. raise NotImplementedError
  267. def create_datagram_endpoint(self, protocol_factory,
  268. local_addr=None, remote_addr=None, *,
  269. family=0, proto=0, flags=0):
  270. raise NotImplementedError
  271. # Pipes and subprocesses.
  272. def connect_read_pipe(self, protocol_factory, pipe):
  273. """Register read pipe in event loop. Set the pipe to non-blocking mode.
  274. protocol_factory should instantiate object with Protocol interface.
  275. pipe is a file-like object.
  276. Return pair (transport, protocol), where transport supports the
  277. ReadTransport interface."""
  278. # The reason to accept file-like object instead of just file descriptor
  279. # is: we need to own pipe and close it at transport finishing
  280. # Can got complicated errors if pass f.fileno(),
  281. # close fd in pipe transport then close f and vise versa.
  282. raise NotImplementedError
  283. def connect_write_pipe(self, protocol_factory, pipe):
  284. """Register write pipe in event loop.
  285. protocol_factory should instantiate object with BaseProtocol interface.
  286. Pipe is file-like object already switched to nonblocking.
  287. Return pair (transport, protocol), where transport support
  288. WriteTransport interface."""
  289. # The reason to accept file-like object instead of just file descriptor
  290. # is: we need to own pipe and close it at transport finishing
  291. # Can got complicated errors if pass f.fileno(),
  292. # close fd in pipe transport then close f and vise versa.
  293. raise NotImplementedError
  294. def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
  295. stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  296. **kwargs):
  297. raise NotImplementedError
  298. def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
  299. stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  300. **kwargs):
  301. raise NotImplementedError
  302. # Ready-based callback registration methods.
  303. # The add_*() methods return None.
  304. # The remove_*() methods return True if something was removed,
  305. # False if there was nothing to delete.
  306. def add_reader(self, fd, callback, *args):
  307. raise NotImplementedError
  308. def remove_reader(self, fd):
  309. raise NotImplementedError
  310. def add_writer(self, fd, callback, *args):
  311. raise NotImplementedError
  312. def remove_writer(self, fd):
  313. raise NotImplementedError
  314. # Completion based I/O methods returning Futures.
  315. def sock_recv(self, sock, nbytes):
  316. raise NotImplementedError
  317. def sock_sendall(self, sock, data):
  318. raise NotImplementedError
  319. def sock_connect(self, sock, address):
  320. raise NotImplementedError
  321. def sock_accept(self, sock):
  322. raise NotImplementedError
  323. # Signal handling.
  324. def add_signal_handler(self, sig, callback, *args):
  325. raise NotImplementedError
  326. def remove_signal_handler(self, sig):
  327. raise NotImplementedError
  328. # Error handlers.
  329. def set_exception_handler(self, handler):
  330. raise NotImplementedError
  331. def default_exception_handler(self, context):
  332. raise NotImplementedError
  333. def call_exception_handler(self, context):
  334. raise NotImplementedError
  335. # Debug flag management.
  336. def get_debug(self):
  337. raise NotImplementedError
  338. def set_debug(self, enabled):
  339. raise NotImplementedError
  340. class AbstractEventLoopPolicy:
  341. """Abstract policy for accessing the event loop."""
  342. def get_event_loop(self):
  343. """Get the event loop for the current context.
  344. Returns an event loop object implementing the BaseEventLoop interface,
  345. or raises an exception in case no event loop has been set for the
  346. current context and the current policy does not specify to create one.
  347. It should never return None."""
  348. raise NotImplementedError
  349. def set_event_loop(self, loop):
  350. """Set the event loop for the current context to loop."""
  351. raise NotImplementedError
  352. def new_event_loop(self):
  353. """Create and return a new event loop object according to this
  354. policy's rules. If there's need to set this loop as the event loop for
  355. the current context, set_event_loop must be called explicitly."""
  356. raise NotImplementedError
  357. # Child processes handling (Unix only).
  358. def get_child_watcher(self):
  359. "Get the watcher for child processes."
  360. raise NotImplementedError
  361. def set_child_watcher(self, watcher):
  362. """Set the watcher for child processes."""
  363. raise NotImplementedError
  364. class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
  365. """Default policy implementation for accessing the event loop.
  366. In this policy, each thread has its own event loop. However, we
  367. only automatically create an event loop by default for the main
  368. thread; other threads by default have no event loop.
  369. Other policies may have different rules (e.g. a single global
  370. event loop, or automatically creating an event loop per thread, or
  371. using some other notion of context to which an event loop is
  372. associated).
  373. """
  374. _loop_factory = None
  375. class _Local(threading.local):
  376. _loop = None
  377. _set_called = False
  378. def __init__(self):
  379. self._local = self._Local()
  380. def get_event_loop(self):
  381. """Get the event loop.
  382. This may be None or an instance of EventLoop.
  383. """
  384. if (self._local._loop is None and
  385. not self._local._set_called and
  386. isinstance(threading.current_thread(), threading._MainThread)):
  387. self.set_event_loop(self.new_event_loop())
  388. if self._local._loop is None:
  389. raise RuntimeError('There is no current event loop in thread %r.'
  390. % threading.current_thread().name)
  391. return self._local._loop
  392. def set_event_loop(self, loop):
  393. """Set the event loop."""
  394. self._local._set_called = True
  395. assert loop is None or isinstance(loop, AbstractEventLoop)
  396. self._local._loop = loop
  397. def new_event_loop(self):
  398. """Create a new event loop.
  399. You must call set_event_loop() to make this the current event
  400. loop.
  401. """
  402. return self._loop_factory()
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().  None means that no policy is
# currently installed.
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy
# (held by _init_event_loop_policy() while it installs the default).
_lock = threading.Lock()
  410. def _init_event_loop_policy():
  411. global _event_loop_policy
  412. with _lock:
  413. if _event_loop_policy is None: # pragma: no branch
  414. from . import DefaultEventLoopPolicy
  415. _event_loop_policy = DefaultEventLoopPolicy()
  416. def get_event_loop_policy():
  417. """Get the current event loop policy."""
  418. if _event_loop_policy is None:
  419. _init_event_loop_policy()
  420. return _event_loop_policy
  421. def set_event_loop_policy(policy):
  422. """Set the current event loop policy.
  423. If policy is None, the default policy is restored."""
  424. global _event_loop_policy
  425. assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
  426. _event_loop_policy = policy
  427. def get_event_loop():
  428. """Equivalent to calling get_event_loop_policy().get_event_loop()."""
  429. return get_event_loop_policy().get_event_loop()
  430. def set_event_loop(loop):
  431. """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
  432. get_event_loop_policy().set_event_loop(loop)
  433. def new_event_loop():
  434. """Equivalent to calling get_event_loop_policy().new_event_loop()."""
  435. return get_event_loop_policy().new_event_loop()
  436. def get_child_watcher():
  437. """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
  438. return get_event_loop_policy().get_child_watcher()
  439. def set_child_watcher(watcher):
  440. """Equivalent to calling
  441. get_event_loop_policy().set_child_watcher(watcher)."""
  442. return get_event_loop_policy().set_child_watcher(watcher)