base_events.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179
  1. """Base implementation of event loop.
  2. The event loop can be broken up into a multiplexer (the part
  3. responsible for notifying us of I/O events) and the event loop proper,
  4. which wraps a multiplexer with functionality for scheduling callbacks,
  5. immediately or at a given time in the future.
  6. Whenever a public API takes a callback, subsequent positional
  7. arguments will be passed to the callback if/when it is called. This
  8. avoids the proliferation of trivial lambdas implementing closures.
  9. Keyword arguments for the callback are not supported; this is a
  10. conscious design decision, leaving the door open for keyword arguments
  11. to modify the meaning of the API call itself.
  12. """
  13. import collections
  14. import concurrent.futures
  15. import heapq
  16. import inspect
  17. import logging
  18. import os
  19. import socket
  20. import subprocess
  21. import threading
  22. import time
  23. import traceback
  24. import sys
  25. import warnings
  26. from . import coroutines
  27. from . import events
  28. from . import futures
  29. from . import tasks
  30. from .coroutines import coroutine
  31. from .log import logger
  32. __all__ = ['BaseEventLoop']
  33. # Argument for default thread pool executor creation.
  34. _MAX_WORKERS = 5
  35. # Minimum number of _scheduled timer handles before cleanup of
  36. # cancelled handles is performed.
  37. _MIN_SCHEDULED_TIMER_HANDLES = 100
  38. # Minimum fraction of _scheduled timer handles that are cancelled
  39. # before cleanup of cancelled handles is performed.
  40. _MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
  def _format_handle(handle):
      """Return a short description of *handle*.

      If the handle's callback is a bound method of a Task, the repr of that
      task is returned; otherwise the result of str(handle) is used.
      """
      callback = handle._callback
      bound_to_task = (inspect.ismethod(callback)
                       and isinstance(callback.__self__, tasks.Task))
      if bound_to_task:
          return repr(callback.__self__)
      return str(handle)
  def _format_pipe(fd):
      """Return a printable form of a subprocess stdio argument.

      subprocess.PIPE and subprocess.STDOUT get symbolic names; any other
      value is shown through repr().
      """
      if fd == subprocess.PIPE:
          return '<pipe>'
      if fd == subprocess.STDOUT:
          return '<stdout>'
      return repr(fd)
  class _StopError(BaseException):
      """Signal used to stop the event loop.

      Derives from BaseException, not Exception, so handlers written as
      ``except Exception`` do not catch it.
      """
  def _check_resolved_address(sock, address):
      """Raise ValueError if *address* does not hold a numeric IP host.

      Ensures that the address is already resolved, to avoid the trap of
      hanging the entire event loop when the address requires a DNS lookup.
      Only AF_INET and AF_INET6 sockets are checked; for any other family
      the function returns without doing anything.

      getaddrinfo() is slow (around 10 us per call): this function should
      only be called in debug mode.

      sock    -- object exposing ``family`` (and ``type``/``proto`` when the
                 getaddrinfo() fallback is used).
      address -- ``(host, port)`` for AF_INET; for AF_INET6 only the first
                 two items are read.
      """
      family = sock.family
      if family == socket.AF_INET:
          host, port = address
      elif family == socket.AF_INET6:
          host, port = address[:2]
      else:
          return

      # On Windows, socket.inet_pton() is only available since Python 3.4
      if hasattr(socket, 'inet_pton'):
          # getaddrinfo() is slow and has known issue: prefer inet_pton()
          # if available
          numeric_host = host
          if family == socket.AF_INET6 and isinstance(host, str):
              # inet_pton() rejects the "%zone" suffix of a scoped IPv6
              # literal (e.g. 'fe80::1%eth0') even though such an address
              # is already numeric, so validate the part before the '%'.
              numeric_host = host.partition('%')[0]
          try:
              socket.inet_pton(family, numeric_host)
          except OSError as exc:
              raise ValueError("address must be resolved (IP address), "
                               "got host %r: %s"
                               % (host, exc))
      else:
          # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address
          # is already resolved.
          type_mask = 0
          if hasattr(socket, 'SOCK_NONBLOCK'):
              type_mask |= socket.SOCK_NONBLOCK
          if hasattr(socket, 'SOCK_CLOEXEC'):
              type_mask |= socket.SOCK_CLOEXEC
          try:
              socket.getaddrinfo(host, port,
                                 family=family,
                                 type=(sock.type & ~type_mask),
                                 proto=sock.proto,
                                 flags=socket.AI_NUMERICHOST)
          except socket.gaierror as err:
              raise ValueError("address must be resolved (IP address), "
                               "got host %r: %s"
                               % (host, err))
  def _raise_stop_error(*args):
      """Raise _StopError; any positional arguments are ignored."""
      raise _StopError()
  def _run_until_complete_cb(fut):
      """Done callback: stop the loop unless *fut* holds a non-Exception error.

      Issue #22429: when the stored exception is a BaseException that is not
      an Exception, run_forever() already finished, so there is no need to
      stop it.
      """
      exc = fut._exception
      loop_already_finished = (isinstance(exc, BaseException)
                               and not isinstance(exc, Exception))
      if not loop_already_finished:
          _raise_stop_error()
  class Server(events.AbstractServer):
      """Set of listening sockets served by an event loop.

      Tracks a count of attached users (``_attach``/``_detach``) and a list
      of futures that are resolved once the server is closed and that count
      is zero.
      """

      def __init__(self, loop, sockets):
          self._loop = loop
          self.sockets = sockets
          # Incremented by _attach(), decremented by _detach().
          self._active_count = 0
          # Futures created by wait_closed(); replaced by None in _wakeup().
          self._waiters = []

      def __repr__(self):
          cls_name = self.__class__.__name__
          return '<%s sockets=%r>' % (cls_name, self.sockets)

      def _attach(self):
          """Record one more active user; the server must not be closed."""
          assert self.sockets is not None
          self._active_count += 1

      def _detach(self):
          """Drop one active user, waking waiters if closed and now idle."""
          assert self._active_count > 0
          self._active_count -= 1
          if self.sockets is None and self._active_count == 0:
              self._wakeup()

      def close(self):
          """Stop serving on every socket; a second call does nothing."""
          listening = self.sockets
          if listening is None:
              return
          self.sockets = None
          for sock in listening:
              self._loop._stop_serving(sock)
          if self._active_count == 0:
              self._wakeup()

      def _wakeup(self):
          """Resolve every pending wait_closed() future."""
          pending, self._waiters = self._waiters, None
          for waiter in pending:
              if waiter.done():
                  continue
              waiter.set_result(waiter)

      @coroutine
      def wait_closed(self):
          """Wait for _wakeup(); return at once if sockets or waiters is None."""
          if self.sockets is None or self._waiters is None:
              return
          waiter = futures.Future(loop=self._loop)
          self._waiters.append(waiter)
          yield from waiter
  146. class BaseEventLoop(events.AbstractEventLoop):
  def __init__(self):
      # Callback bookkeeping: handles ready to run, scheduled timer handles
      # and a counter of cancelled timers.
      self._ready = collections.deque()
      self._scheduled = []
      self._timer_cancelled_count = 0

      self._closed = False
      self._default_executor = None
      self._internal_fds = 0
      self._exception_handler = None
      self._current_handle = None

      # Identifier of the thread running the event loop, or None if the
      # event loop is not running
      self._thread_id = None

      self._clock_resolution = time.get_clock_info('monotonic').resolution

      # Debug mode follows PYTHONASYNCIODEBUG unless the interpreter was
      # told to ignore environment variables.
      if sys.flags.ignore_environment:
          self._debug = False
      else:
          self._debug = bool(os.environ.get('PYTHONASYNCIODEBUG'))

      # In debug mode, if the execution of a callback or a step of a task
      # exceed this duration in seconds, the slow callback/task is logged.
      self.slow_callback_duration = 0.1
  def __repr__(self):
      cls_name = self.__class__.__name__
      running = self.is_running()
      closed = self.is_closed()
      debug = self.get_debug()
      return ('<%s running=%s closed=%s debug=%s>'
              % (cls_name, running, closed, debug))
  def create_task(self, coro):
      """Wrap the coroutine object *coro* in a Task bound to this loop.

      Raises RuntimeError (via _check_closed) if the loop is closed.
      Return the task object.
      """
      self._check_closed()
      task = tasks.Task(coro, loop=self)
      source_tb = task._source_traceback
      if source_tb:
          # Drop the last recorded frame of the creation traceback.
          del source_tb[-1]
      return task
  def _make_socket_transport(self, sock, protocol, waiter=None, *,
                             extra=None, server=None):
      """Create socket transport (not implemented in this base class)."""
      raise NotImplementedError()
  def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                          *, server_side=False, server_hostname=None,
                          extra=None, server=None):
      """Create SSL transport (not implemented in this base class)."""
      raise NotImplementedError()
  def _make_datagram_transport(self, sock, protocol,
                               address=None, waiter=None, extra=None):
      """Create datagram transport (not implemented in this base class)."""
      raise NotImplementedError()
  def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                extra=None):
      """Create read pipe transport (not implemented in this base class)."""
      raise NotImplementedError()
  def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                 extra=None):
      """Create write pipe transport (not implemented in this base class)."""
      raise NotImplementedError()
  @coroutine
  def _make_subprocess_transport(self, protocol, args, shell,
                                 stdin, stdout, stderr, bufsize,
                                 extra=None, **kwargs):
      """Create subprocess transport (not implemented in this base class)."""
      raise NotImplementedError()
  def _write_to_self(self):
      """Wake up the event loop by writing a byte to the self-pipe.

      This may be called from a different thread.  Implementing the
      self-pipe is the responsibility of the subclass; this base version
      only raises NotImplementedError.
      """
      raise NotImplementedError()
  def _process_events(self, event_list):
      """Process selector events (not implemented in this base class)."""
      raise NotImplementedError()
  def _check_closed(self):
      """Raise RuntimeError if the loop has been closed."""
      if not self._closed:
          return
      raise RuntimeError('Event loop is closed')
  def run_forever(self):
      """Run until stop() is called.

      Raises RuntimeError if the loop is closed or is already running.
      """
      self._check_closed()
      if self.is_running():
          raise RuntimeError('Event loop is running.')
      self._thread_id = threading.get_ident()
      try:
          try:
              while True:
                  self._run_once()
          except _StopError:
              # Normal termination requested through _StopError.
              pass
      finally:
          # Mark the loop as not running, whatever ended the iteration.
          self._thread_id = None
  def run_until_complete(self, future):
      """Run until the Future is done.

      If the argument is a coroutine, it is wrapped in a Task.

      WARNING: It would be disastrous to call run_until_complete()
      with the same coroutine twice -- it would wrap it in two
      different Tasks and that can't be good.

      Return the Future's result, or raise its exception.  Raises
      RuntimeError if the loop is closed, or if the loop stopped before
      the future completed.
      """
      self._check_closed()

      new_task = not isinstance(future, futures.Future)
      # "async" is a reserved word since Python 3.7, so the attribute
      # access ``tasks.async`` no longer parses.  Look the wrapper up by
      # name, preferring ensure_future() when the tasks module has it.
      wrap = getattr(tasks, 'ensure_future', None)
      if wrap is None:
          wrap = getattr(tasks, 'async')
      future = wrap(future, loop=self)
      if new_task:
          # An exception is raised if the future didn't complete, so there
          # is no need to log the "destroy pending task" message
          future._log_destroy_pending = False

      future.add_done_callback(_run_until_complete_cb)
      try:
          self.run_forever()
      except:
          if new_task and future.done() and not future.cancelled():
              # The coroutine raised a BaseException. Consume the exception
              # to not log a warning, the caller doesn't have access to the
              # local task.
              future.exception()
          raise
      finally:
          # Detach the callback on every exit path so that a caller-owned
          # future does not keep a stale stop-the-loop callback.
          future.remove_done_callback(_run_until_complete_cb)
      if not future.done():
          raise RuntimeError('Event loop stopped before Future completed.')

      return future.result()
  def stop(self):
      """Stop running the event loop.

      The stop request is queued with call_soon(), so every callback
      scheduled before stop() is called will run.  Callbacks scheduled
      after stop() is called will not run; they will run if run_forever
      is called again later.
      """
      stopper = _raise_stop_error
      self.call_soon(stopper)
  def close(self):
      """Close the event loop.

      Clears the ready and scheduled queues and shuts down the default
      executor without waiting for it to finish.  Closing an already
      closed loop does nothing.

      Raises RuntimeError if the loop is running.
      """
      if self.is_running():
          raise RuntimeError("Cannot close a running event loop")
      if self._closed:
          return
      if self._debug:
          logger.debug("Close %r", self)
      self._closed = True
      for queue in (self._ready, self._scheduled):
          queue.clear()
      executor, self._default_executor = self._default_executor, None
      if executor is not None:
          executor.shutdown(wait=False)
  282. executor = self._default_executor
  283. if executor is not None:
  284. self._default_executor = None
  285. executor.shutdown(wait=False)
  286. def is_closed(self):
  287. """Returns True if the event loop was closed."""
  288. return self._closed
  # Objects that have a destructor and take part in a reference cycle are
  # never destroyed on Python 3.3 and older.  PEP 442 lifted that limitation
  # in Python 3.4, so the destructor is only defined from that version on.
  if sys.version_info >= (3, 4):
      def __del__(self):
          if self.is_closed():
              return
          warnings.warn("unclosed event loop %r" % self, ResourceWarning)
          if not self.is_running():
              self.close()
  def is_running(self):
      """Return True while a thread id is recorded for the running loop."""
      return not (self._thread_id is None)
  def time(self):
      """Return the current time according to the event loop's clock.

      The value is a float number of seconds taken from time.monotonic().
      The epoch, precision, accuracy and drift are unspecified and may
      differ per event loop.
      """
      now = time.monotonic()
      return now
  def call_later(self, delay, callback, *args):
      """Arrange for a callback to be called at a given time.

      Return a Handle: an opaque object with a cancel() method that
      can be used to cancel the call.

      The delay can be an int or float, expressed in seconds.  It is
      always relative to the current time.

      Each callback will be called exactly once.  If two callbacks
      are scheduled for exactly the same time, it is undefined which
      will be called first.

      Any positional arguments after the callback will be passed to
      the callback when it is called.
      """
      when = self.time() + delay
      timer = self.call_at(when, callback, *args)
      source_tb = timer._source_traceback
      if source_tb:
          # Drop the last recorded frame of the creation traceback.
          del source_tb[-1]
      return timer
  def call_at(self, when, callback, *args):
      """Like call_later(), but uses an absolute time.

      Absolute time corresponds to the event loop's time() method.

      Raises TypeError when *callback* is a coroutine object or coroutine
      function, and RuntimeError (via _check_closed) on a closed loop.
      """
      is_coro = (coroutines.iscoroutine(callback)
                 or coroutines.iscoroutinefunction(callback))
      if is_coro:
          raise TypeError("coroutines cannot be used with call_at()")
      self._check_closed()
      if self._debug:
          self._check_thread()
      handle = events.TimerHandle(when, callback, args, self)
      source_tb = handle._source_traceback
      if source_tb:
          # Drop the last recorded frame of the creation traceback.
          del source_tb[-1]
      heapq.heappush(self._scheduled, handle)
      handle._scheduled = True
      return handle
  def call_soon(self, callback, *args):
      """Arrange for a callback to be called as soon as possible.

      This operates as a FIFO queue: callbacks are called in the
      order in which they are registered.  Each callback will be
      called exactly once.

      Any positional arguments after the callback will be passed to
      the callback when it is called.
      """
      if self._debug:
          self._check_thread()
      handle = self._call_soon(callback, args)
      source_tb = handle._source_traceback
      if source_tb:
          # Drop the last recorded frame of the creation traceback.
          del source_tb[-1]
      return handle
  def _call_soon(self, callback, args):
      """Create a Handle for *callback* and append it to the ready queue.

      Raises TypeError when *callback* is a coroutine object or coroutine
      function, and RuntimeError (via _check_closed) on a closed loop.
      """
      is_coro = (coroutines.iscoroutine(callback)
                 or coroutines.iscoroutinefunction(callback))
      if is_coro:
          raise TypeError("coroutines cannot be used with call_soon()")
      self._check_closed()
      handle = events.Handle(callback, args, self)
      source_tb = handle._source_traceback
      if source_tb:
          # Drop the last recorded frame of the creation traceback.
          del source_tb[-1]
      self._ready.append(handle)
      return handle
  def _check_thread(self):
      """Check that the current thread is the thread running the event loop.

      Non-thread-safe methods of this class make this assumption and will
      likely behave incorrectly when the assumption is violated.

      Should only be called when (self._debug == True).  The caller is
      responsible for checking this condition for performance reasons.

      Does nothing while the loop is not running; raises RuntimeError when
      called from another thread than the one recorded for the loop.
      """
      owner = self._thread_id
      if owner is None:
          return
      if threading.get_ident() == owner:
          return
      raise RuntimeError(
          "Non-thread-safe operation invoked on an event loop other "
          "than the current one")
  def call_soon_threadsafe(self, callback, *args):
      """Like call_soon(), but thread-safe.

      After queueing the handle, _write_to_self() is called to wake up the
      event loop.
      """
      handle = self._call_soon(callback, args)
      source_tb = handle._source_traceback
      if source_tb:
          # Drop the last recorded frame of the creation traceback.
          del source_tb[-1]
      self._write_to_self()
      return handle
  def run_in_executor(self, executor, callback, *args):
      """Submit callback(*args) to an executor and return a wrapped future.

      When *executor* is None the loop's default executor is used; it is
      created as a ThreadPoolExecutor with _MAX_WORKERS workers on first
      use.  *callback* may also be a (non-timer) Handle, in which case its
      stored callback and arguments are submitted, or an already completed
      future with result None is returned if the handle was cancelled.

      Raises TypeError for coroutine objects and coroutine functions.
      """
      is_coro = (coroutines.iscoroutine(callback)
                 or coroutines.iscoroutinefunction(callback))
      if is_coro:
          raise TypeError("coroutines cannot be used with run_in_executor()")
      self._check_closed()
      if isinstance(callback, events.Handle):
          assert not args
          assert not isinstance(callback, events.TimerHandle)
          if callback._cancelled:
              finished = futures.Future(loop=self)
              finished.set_result(None)
              return finished
          callback, args = callback._callback, callback._args
      if executor is None:
          executor = self._default_executor
          if executor is None:
              executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
              self._default_executor = executor
      submitted = executor.submit(callback, *args)
      return futures.wrap_future(submitted, loop=self)
  def set_default_executor(self, executor):
      """Store *executor* as the executor used when none is given."""
      setattr(self, '_default_executor', executor)
  def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
      """Call socket.getaddrinfo(), logging the request and its duration.

      The result is logged at INFO level when the call took at least
      slow_callback_duration seconds, at DEBUG level otherwise.
      Return the list produced by socket.getaddrinfo().
      """
      parts = ["%s:%r" % (host, port)]
      optional = (('family=%r', family), ('type=%r', type),
                  ('proto=%r', proto), ('flags=%r', flags))
      for template, value in optional:
          # Only non-zero / non-empty settings are mentioned in the log.
          if value:
              parts.append(template % value)
      msg = ', '.join(parts)
      logger.debug('Get address info %s', msg)

      started = self.time()
      addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
      elapsed = self.time() - started

      msg = ('Getting address info %s took %.3f ms: %r'
             % (msg, elapsed * 1e3, addrinfo))
      log = logger.info if elapsed >= self.slow_callback_duration else logger.debug
      log(msg)
      return addrinfo
  def getaddrinfo(self, host, port, *,
                  family=0, type=0, proto=0, flags=0):
      """Run an address lookup in the default executor.

      In debug mode the logging wrapper _getaddrinfo_debug() is used,
      otherwise socket.getaddrinfo() is submitted directly.  Return
      whatever run_in_executor() returns.
      """
      resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
      return self.run_in_executor(None, resolver,
                                  host, port, family, type, proto, flags)
  def getnameinfo(self, sockaddr, flags=0):
      """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
      resolver = socket.getnameinfo
      return self.run_in_executor(None, resolver, sockaddr, flags)
  @coroutine
  def create_connection(self, protocol_factory, host=None, port=None, *,
                        ssl=None, family=0, proto=0, flags=0, sock=None,
                        local_addr=None, server_hostname=None):
      """Connect to a TCP server.

      Create a streaming transport connection to a given Internet host and
      port: socket family AF_INET or socket.AF_INET6 depending on host (or
      family if specified), socket type SOCK_STREAM.  protocol_factory must
      be a callable returning a protocol instance.

      Either host/port or an existing *sock* must be given, not both.

      This method is a coroutine which will try to establish the connection
      in the background.  When successful, the coroutine returns a
      (transport, protocol) pair.
      """
      if server_hostname is not None and not ssl:
          raise ValueError('server_hostname is only meaningful with ssl')

      if server_hostname is None and ssl:
          # Use host as default for server_hostname.  It is an error
          # if host is empty or not set, e.g. when an
          # already-connected socket was passed or when only a port
          # is given.  To avoid this error, you can pass
          # server_hostname='' -- this will bypass the hostname
          # check.  (This also means that if host is a numeric
          # IP/IPv6 address, we will attempt to verify that exact
          # address; this will probably fail, but it is possible to
          # create a certificate for a specific IP address, so we
          # don't judge it here.)
          if not host:
              raise ValueError('You must set server_hostname '
                               'when using ssl without a host')
          server_hostname = host

      if host is not None or port is not None:
          if sock is not None:
              raise ValueError(
                  'host/port and sock can not be specified at the same time')

          # Resolve the remote (and optionally the local) address; both
          # lookups are waited on together.
          remote_lookup = self.getaddrinfo(
              host, port, family=family,
              type=socket.SOCK_STREAM, proto=proto, flags=flags)
          lookups = [remote_lookup]
          local_lookup = None
          if local_addr is not None:
              local_lookup = self.getaddrinfo(
                  *local_addr, family=family,
                  type=socket.SOCK_STREAM, proto=proto, flags=flags)
              lookups.append(local_lookup)

          yield from tasks.wait(lookups, loop=self)

          infos = remote_lookup.result()
          if not infos:
              raise OSError('getaddrinfo() returned empty list')
          if local_lookup is not None:
              laddr_infos = local_lookup.result()
              if not laddr_infos:
                  raise OSError('getaddrinfo() returned empty list')

          # Try each resolved remote address in turn until one connects.
          exceptions = []
          for af, socktype, protonum, _cname, address in infos:
              try:
                  sock = socket.socket(family=af, type=socktype,
                                       proto=protonum)
                  sock.setblocking(False)
                  if local_lookup is not None:
                      for _, _, _, _, laddr in laddr_infos:
                          try:
                              sock.bind(laddr)
                              break
                          except OSError as exc:
                              bind_error = OSError(
                                  exc.errno, 'error while '
                                  'attempting to bind on address '
                                  '{!r}: {}'.format(
                                      laddr, exc.strerror.lower()))
                              exceptions.append(bind_error)
                      else:
                          # No local address could be bound on this socket.
                          sock.close()
                          sock = None
                          continue
                  if self._debug:
                      logger.debug("connect %r to %r", sock, address)
                  yield from self.sock_connect(sock, address)
              except OSError as exc:
                  if sock is not None:
                      sock.close()
                  exceptions.append(exc)
              except:
                  if sock is not None:
                      sock.close()
                  raise
              else:
                  break
          else:
              # Every candidate address failed.
              first = exceptions[0]
              if len(exceptions) == 1:
                  raise first
              # If they all have the same str(), raise one.
              model = str(first)
              if all(str(exc) == model for exc in exceptions):
                  raise first
              # Raise a combined exception so the user can see all
              # the various error messages.
              raise OSError('Multiple exceptions: {}'.format(
                  ', '.join(str(exc) for exc in exceptions)))

      elif sock is None:
          raise ValueError(
              'host and port was not specified and no sock specified')

      sock.setblocking(False)

      transport, protocol = yield from self._create_connection_transport(
          sock, protocol_factory, ssl, server_hostname)
      if self._debug:
          # Get the socket from the transport because SSL transport closes
          # the old socket and creates a new SSL socket
          sock = transport.get_extra_info('socket')
          logger.debug("%r connected to %s:%r: (%r, %r)",
                       sock, host, port, transport, protocol)
      return transport, protocol
  @coroutine
  def _create_connection_transport(self, sock, protocol_factory, ssl,
                                   server_hostname):
      """Build the transport for a client socket and wait until it is ready.

      An SSL transport is created when *ssl* is true (a bool selects a
      context of None, any other value is passed on as the SSL context),
      a plain socket transport otherwise.  If waiting fails, the transport
      is closed and the exception re-raised.

      Return a (transport, protocol) pair.
      """
      protocol = protocol_factory()
      waiter = futures.Future(loop=self)
      if not ssl:
          transport = self._make_socket_transport(sock, protocol, waiter)
      else:
          if isinstance(ssl, bool):
              sslcontext = None
          else:
              sslcontext = ssl
          transport = self._make_ssl_transport(
              sock, protocol, sslcontext, waiter,
              server_side=False, server_hostname=server_hostname)

      try:
          yield from waiter
      except:
          transport.close()
          raise

      return transport, protocol
  @coroutine
  def create_datagram_endpoint(self, protocol_factory,
                               local_addr=None, remote_addr=None, *,
                               family=0, proto=0, flags=0):
      """Create datagram connection.

      local_addr and remote_addr, when given, must be 2-tuples; they are
      resolved with getaddrinfo() for SOCK_DGRAM.  Without any address,
      an explicit *family* is required.

      Return a (transport, protocol) pair.
      """
      if not (local_addr or remote_addr):
          if family == 0:
              raise ValueError('unexpected address family')
          addr_pairs_info = (((family, proto), (None, None)),)
      else:
          # join address by (family, protocol); slot 0 holds the local
          # address and slot 1 the remote address
          addr_infos = collections.OrderedDict()
          for idx, addr in enumerate((local_addr, remote_addr)):
              if addr is None:
                  continue
              assert isinstance(addr, tuple) and len(addr) == 2, (
                  '2-tuple is expected')

              infos = yield from self.getaddrinfo(
                  *addr, family=family, type=socket.SOCK_DGRAM,
                  proto=proto, flags=flags)
              if not infos:
                  raise OSError('getaddrinfo() returned empty list')

              for fam, _, pro, _, address in infos:
                  slots = addr_infos.setdefault((fam, pro), [None, None])
                  slots[idx] = address

          # each addr has to have info for each (family, proto) pair
          addr_pairs_info = [
              (key, addr_pair) for key, addr_pair in addr_infos.items()
              if ((not local_addr or addr_pair[0] is not None) and
                  (not remote_addr or addr_pair[1] is not None))]

          if not addr_pairs_info:
              raise ValueError('can not get address information')

      exceptions = []

      # Try each (family, proto) candidate until a socket can be set up.
      for ((af, protonum),
           (local_address, remote_address)) in addr_pairs_info:
          sock = None
          r_addr = None
          try:
              sock = socket.socket(
                  family=af, type=socket.SOCK_DGRAM, proto=protonum)
              sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
              sock.setblocking(False)

              if local_addr:
                  sock.bind(local_address)
              if remote_addr:
                  yield from self.sock_connect(sock, remote_address)
                  r_addr = remote_address
          except OSError as exc:
              if sock is not None:
                  sock.close()
              exceptions.append(exc)
          except:
              if sock is not None:
                  sock.close()
              raise
          else:
              break
      else:
          raise exceptions[0]

      protocol = protocol_factory()
      waiter = futures.Future(loop=self)
      transport = self._make_datagram_transport(sock, protocol, r_addr,
                                                waiter)
      if self._debug:
          if local_addr:
              logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                          "created: (%r, %r)",
                          local_addr, remote_addr, transport, protocol)
          else:
              logger.debug("Datagram endpoint remote_addr=%r created: "
                           "(%r, %r)",
                           remote_addr, transport, protocol)

      try:
          yield from waiter
      except:
          transport.close()
          raise

      return transport, protocol
  @coroutine
  def create_server(self, protocol_factory, host=None, port=None,
                    *,
                    family=socket.AF_UNSPEC,
                    flags=socket.AI_PASSIVE,
                    sock=None,
                    backlog=100,
                    ssl=None,
                    reuse_address=None):
      """Create a TCP server bound to host and port.

      Either host/port or an existing *sock* must be given, not both.
      *ssl* must be an SSLContext or None (a bool raises TypeError).

      Return a Server object which can be used to stop the service.

      This method is a coroutine.
      """
      if isinstance(ssl, bool):
          raise TypeError('ssl argument must be an SSLContext or None')
      if host is not None or port is not None:
          if sock is not None:
              raise ValueError(
                  'host/port and sock can not be specified at the same time')

          AF_INET6 = getattr(socket, 'AF_INET6', 0)
          if reuse_address is None:
              reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
          sockets = []
          if host == '':
              host = None

          infos = yield from self.getaddrinfo(
              host, port, family=family,
              type=socket.SOCK_STREAM, proto=0, flags=flags)
          if not infos:
              raise OSError('getaddrinfo() returned empty list')

          completed = False
          try:
              for af, socktype, proto, _canonname, sa in infos:
                  try:
                      sock = socket.socket(af, socktype, proto)
                  except socket.error:
                      # Assume it's a bad family/type/protocol combination.
                      if self._debug:
                          logger.warning('create_server() failed to create '
                                         'socket.socket(%r, %r, %r)',
                                         af, socktype, proto, exc_info=True)
                      continue
                  sockets.append(sock)
                  if reuse_address:
                      sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                      True)
                  # Disable IPv4/IPv6 dual stack support (enabled by
                  # default on Linux) which makes a single socket
                  # listen on both address families.
                  if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                      sock.setsockopt(socket.IPPROTO_IPV6,
                                      socket.IPV6_V6ONLY,
                                      True)
                  try:
                      sock.bind(sa)
                  except OSError as err:
                      raise OSError(err.errno, 'error while attempting '
                                    'to bind on address %r: %s'
                                    % (sa, err.strerror.lower()))
              completed = True
          finally:
              # On any failure, close every socket created so far.
              if not completed:
                  for sock in sockets:
                      sock.close()
      else:
          if sock is None:
              raise ValueError('Neither host/port nor sock were specified')
          sockets = [sock]

      server = Server(self, sockets)
      for sock in sockets:
          sock.listen(backlog)
          sock.setblocking(False)
          self._start_serving(protocol_factory, sock, ssl, server)
      if self._debug:
          logger.info("%r is serving", server)
      return server
  @coroutine
  def connect_read_pipe(self, protocol_factory, pipe):
      """Create a read pipe transport for *pipe* and wait until it is ready.

      If waiting fails, the transport is closed and the exception re-raised.
      Return a (transport, protocol) pair.
      """
      protocol = protocol_factory()
      ready = futures.Future(loop=self)
      transport = self._make_read_pipe_transport(pipe, protocol, ready)

      try:
          yield from ready
      except:
          transport.close()
          raise

      if self._debug:
          logger.debug('Read pipe %r connected: (%r, %r)',
                       pipe.fileno(), transport, protocol)
      return transport, protocol
  @coroutine
  def connect_write_pipe(self, protocol_factory, pipe):
      """Create a write pipe transport for *pipe* and wait until it is ready.

      If waiting fails, the transport is closed and the exception re-raised.
      Return a (transport, protocol) pair.
      """
      protocol = protocol_factory()
      ready = futures.Future(loop=self)
      transport = self._make_write_pipe_transport(pipe, protocol, ready)

      try:
          yield from ready
      except:
          transport.close()
          raise

      if self._debug:
          logger.debug('Write pipe %r connected: (%r, %r)',
                       pipe.fileno(), transport, protocol)
      return transport, protocol
  def _log_subprocess(self, msg, stdin, stdout, stderr):
      """Log *msg* at debug level, followed by the configured streams.

      Each of stdin/stdout/stderr that is not None is appended as
      ``name=<formatted pipe>``.  When stderr is ``subprocess.STDOUT``
      and stdout is set, both are reported together as
      ``stdout=stderr=...``.
      """
      parts = [msg]
      if stdin is not None:
          parts.append('stdin=%s' % _format_pipe(stdin))

      if stdout is not None and stderr == subprocess.STDOUT:
          # stderr is redirected into stdout: describe them as one stream.
          parts.append('stdout=stderr=%s' % _format_pipe(stdout))
      else:
          for label, stream in (('stdout', stdout), ('stderr', stderr)):
              if stream is not None:
                  parts.append('%s=%s' % (label, _format_pipe(stream)))

      logger.debug(' '.join(parts))
  @coroutine
  def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                       stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                       universal_newlines=False, shell=True, bufsize=0,
                       **kwargs):
      """Start *cmd* as a shell subprocess and attach a new protocol.

      protocol_factory is called with no arguments to build the
      protocol.  stdin, stdout, stderr, bufsize and any extra keyword
      arguments are forwarded to ``_make_subprocess_transport()``.

      Returns a ``(transport, protocol)`` pair.

      Raises ValueError if cmd is not a bytes or str object, if
      universal_newlines is true, if shell is false, or if bufsize is
      not 0.
      """
      if not isinstance(cmd, (bytes, str)):
          raise ValueError("cmd must be a string")
      if universal_newlines:
          raise ValueError("universal_newlines must be False")
      if not shell:
          raise ValueError("shell must be True")
      if bufsize != 0:
          raise ValueError("bufsize must be 0")
      protocol = protocol_factory()
      # Stays None unless debug mode was already enabled before the
      # subprocess was started.  Debug mode may be switched on while
      # this coroutine is suspended below; without this default the
      # final log call would then hit an unbound local.
      debug_log = None
      if self._debug:
          # Log the command together with the stream configuration.
          debug_log = 'run shell command %r' % cmd
          self._log_subprocess(debug_log, stdin, stdout, stderr)
      transport = yield from self._make_subprocess_transport(
          protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
      if self._debug and debug_log is not None:
          logger.info('%s: %r', debug_log, transport)
      return transport, protocol
  @coroutine
  def subprocess_exec(self, protocol_factory, program, *args,
                      stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE, universal_newlines=False,
                      shell=False, bufsize=0, **kwargs):
      """Start *program* with *args* as a subprocess and attach a protocol.

      protocol_factory is called with no arguments to build the
      protocol.  stdin, stdout, stderr, bufsize and any extra keyword
      arguments are forwarded to ``_make_subprocess_transport()``.

      Returns a ``(transport, protocol)`` pair.

      Raises ValueError if universal_newlines is true, if shell is
      true, or if bufsize is not 0.  Raises TypeError if program or any
      of args is not a bytes or str object.
      """
      if universal_newlines:
          raise ValueError("universal_newlines must be False")
      if shell:
          raise ValueError("shell must be False")
      if bufsize != 0:
          raise ValueError("bufsize must be 0")
      popen_args = (program,) + args
      for arg in popen_args:
          if not isinstance(arg, (str, bytes)):
              raise TypeError("program arguments must be "
                              "a bytes or text string, not %s"
                              % type(arg).__name__)
      protocol = protocol_factory()
      # Stays None unless debug mode was already enabled before the
      # subprocess was started.  Debug mode may be switched on while
      # this coroutine is suspended below; without this default the
      # final log call would then hit an unbound local.
      debug_log = None
      if self._debug:
          # don't log parameters: they may contain sensitive information
          # (password) and may be too long
          debug_log = 'execute program %r' % program
          self._log_subprocess(debug_log, stdin, stdout, stderr)
      transport = yield from self._make_subprocess_transport(
          protocol, popen_args, False, stdin, stdout, stderr,
          bufsize, **kwargs)
      if self._debug and debug_log is not None:
          logger.info('%s: %r', debug_log, transport)
      return transport, protocol
  def set_exception_handler(self, handler):
      """Install *handler* as this loop's exception handler.

      Passing None clears the custom handler, so that
      ``call_exception_handler()`` falls back to
      ``default_exception_handler()``.

      Otherwise handler must be callable; it is invoked as
      ``handler(loop, context)`` where 'loop' is this event loop and
      'context' is a dict (see `call_exception_handler()` for the
      keys it may contain).

      Raises TypeError if handler is neither None nor callable.
      """
      if handler is None or callable(handler):
          self._exception_handler = handler
          return
      raise TypeError('A callable object or None is expected, '
                      'got {!r}'.format(handler))
  def default_exception_handler(self, context):
      """Log the error described by *context* through the module logger.

      This is used when no custom exception handler is set, and a
      custom handler may call it to defer to the default behavior.

      The context parameter has the same meaning as in
      `call_exception_handler()`.  Note that a 'handle_traceback'
      entry may be added to the passed dict when the currently running
      handle recorded where it was created.
      """
      message = context.get('message') or 'Unhandled exception in event loop'

      exception = context.get('exception')
      if exception is None:
          exc_info = False
      else:
          exc_info = (type(exception), exception, exception.__traceback__)

      if 'source_traceback' not in context:
          running = self._current_handle
          if running is not None and running._source_traceback:
              context['handle_traceback'] = running._source_traceback

      # Traceback entries are rendered as a formatted stack under a
      # heading; every other entry is rendered with repr().
      tb_headings = {
          'source_traceback': 'Object created at (most recent call last):\n',
          'handle_traceback': 'Handle created at (most recent call last):\n',
      }

      report = [message]
      for key in sorted(context):
          if key in {'message', 'exception'}:
              # Already covered by the first line and by exc_info.
              continue
          item = context[key]
          if key in tb_headings:
              stack = ''.join(traceback.format_list(item))
              rendered = tb_headings[key] + stack.rstrip()
          else:
              rendered = repr(item)
          report.append('{}: {}'.format(key, rendered))

      logger.error('\n'.join(report), exc_info=exc_info)
  def call_exception_handler(self, context):
      """Dispatch *context* to the loop's current exception handler.

      The context argument is a dict containing the following keys:

      - 'message': Error message;
      - 'exception' (optional): Exception object;
      - 'future' (optional): Future instance;
      - 'handle' (optional): Handle instance;
      - 'protocol' (optional): Protocol instance;
      - 'transport' (optional): Transport instance;
      - 'socket' (optional): Socket instance.

      New keys may be introduced in the future.

      Note: do not overload this method in an event loop subclass.
      For custom exception handling, use the
      `set_exception_handler()` method.
      """
      custom = self._exception_handler

      if custom is None:
          try:
              self.default_exception_handler(context)
          except Exception:
              # Safety net for unexpected errors in the default
              # implementation, and for subclasses that overload
              # "default_exception_handler".
              logger.error('Exception in default exception handler',
                           exc_info=True)
          return

      try:
          custom(self, context)
      except Exception as exc:
          # The user-supplied handler itself failed; report that
          # failure through the default handler instead.
          fallback_context = {
              'message': 'Unhandled error in exception handler',
              'exception': exc,
              'context': context,
          }
          try:
              self.default_exception_handler(fallback_context)
          except Exception:
              # Guard 'default_exception_handler' in case it is
              # overloaded.
              logger.error('Exception in default exception handler '
                           'while handling an unexpected error '
                           'in custom exception handler',
                           exc_info=True)
  911. def _add_callback(self, handle):
  912. """Add a Handle to _scheduled (TimerHandle) or _ready."""
  913. assert isinstance(handle, events.Handle), 'A Handle is required here'
  914. if handle._cancelled:
  915. return
  916. assert not isinstance(handle, events.TimerHandle)
  917. self._ready.append(handle)
  918. def _add_callback_signalsafe(self, handle):
  919. """Like _add_callback() but called from a signal handler."""
  920. self._add_callback(handle)
  921. self._write_to_self()
  922. def _timer_handle_cancelled(self, handle):
  923. """Notification that a TimerHandle has been cancelled."""
  924. if handle._scheduled:
  925. self._timer_cancelled_count += 1
  def _run_once(self):
      """Run one full iteration of the event loop.

      One iteration discards cancelled timers, polls the selector,
      passes the resulting events to ``_process_events()``, moves the
      timers that have come due onto the ready queue and finally runs
      the callbacks that were in the ready queue at that point.
      """
      pending = len(self._scheduled)
      purge_all = (pending > _MIN_SCHEDULED_TIMER_HANDLES and
                   self._timer_cancelled_count / pending >
                   _MIN_CANCELLED_TIMER_HANDLES_FRACTION)
      if purge_all:
          # Too many cancelled timers are sitting in the heap: rebuild
          # it from the ones that are still live.
          kept = []
          for entry in self._scheduled:
              if not entry._cancelled:
                  kept.append(entry)
              else:
                  entry._scheduled = False

          heapq.heapify(kept)
          self._scheduled = kept
          self._timer_cancelled_count = 0
      else:
          # Only strip cancelled timers from the head of the heap.
          while self._scheduled and self._scheduled[0]._cancelled:
              self._timer_cancelled_count -= 1
              entry = heapq.heappop(self._scheduled)
              entry._scheduled = False

      # Work out how long the selector may wait: not at all when
      # callbacks are ready, until the earliest timer when there is
      # one, and with no timeout (None) otherwise.
      if self._ready:
          timeout = 0
      elif self._scheduled:
          timeout = max(0, self._scheduled[0]._when - self.time())
      else:
          timeout = None

      if not self._debug or timeout == 0:
          event_list = self._selector.select(timeout)
      else:
          # Debug mode: time the poll and log how it went.
          started = self.time()
          event_list = self._selector.select(timeout)
          elapsed = self.time() - started
          level = logging.INFO if elapsed >= 1.0 else logging.DEBUG
          nevent = len(event_list)
          if timeout is None:
              logger.log(level, 'poll took %.3f ms: %s events',
                         elapsed * 1e3, nevent)
          elif nevent:
              logger.log(level,
                         'poll %.3f ms took %.3f ms: %s events',
                         timeout * 1e3, elapsed * 1e3, nevent)
          elif elapsed >= 1.0:
              logger.log(level,
                         'poll %.3f ms took %.3f ms: timeout',
                         timeout * 1e3, elapsed * 1e3)
      self._process_events(event_list)

      # Move the timers that are due onto the ready queue.
      end_time = self.time() + self._clock_resolution
      while self._scheduled:
          if self._scheduled[0]._when >= end_time:
              break
          entry = heapq.heappop(self._scheduled)
          entry._scheduled = False
          self._ready.append(entry)

      # This is the only place where callbacks are actually *called*.
      # All other places just add them to ready.
      # Note: only the callbacks already queued are run; callbacks
      # scheduled by callbacks run this time around will be run the
      # next time (after another I/O poll).
      # The count is taken once up front: an idiom that is thread-safe
      # without using locks.
      for _ in range(len(self._ready)):
          entry = self._ready.popleft()
          if entry._cancelled:
              continue
          if not self._debug:
              entry._run()
              continue
          try:
              self._current_handle = entry
              began = self.time()
              entry._run()
              took = self.time() - began
              if took >= self.slow_callback_duration:
                  logger.warning('Executing %s took %.3f seconds',
                                 _format_handle(entry), took)
          finally:
              self._current_handle = None
      entry = None  # Needed to break cycles when an exception occurs.
  def get_debug(self):
      """Return the loop's current debug flag."""
      return self._debug
  def set_debug(self, enabled):
      """Store *enabled* as the loop's debug flag."""
      self._debug = enabled