# unix_events.py (34 KB)
  1. """Selector event loop for Unix with signal handling."""
  2. import errno
  3. import os
  4. import signal
  5. import socket
  6. import stat
  7. import subprocess
  8. import sys
  9. import threading
  10. import warnings
  11. from . import base_events
  12. from . import base_subprocess
  13. from . import constants
  14. from . import coroutines
  15. from . import events
  16. from . import futures
  17. from . import selector_events
  18. from . import selectors
  19. from . import transports
  20. from .coroutines import coroutine
  21. from .log import logger
  22. __all__ = ['SelectorEventLoop',
  23. 'AbstractChildWatcher', 'SafeChildWatcher',
  24. 'FastChildWatcher', 'DefaultEventLoopPolicy',
  25. ]
  26. if sys.platform == 'win32': # pragma: no cover
  27. raise ImportError('Signals are not really supported on Windows')
  28. def _sighandler_noop(signum, frame):
  29. """Dummy signal handler."""
  30. pass
  31. class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
  32. """Unix event loop.
  33. Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
  34. """
  35. def __init__(self, selector=None):
  36. super().__init__(selector)
  37. self._signal_handlers = {}
  38. def _socketpair(self):
  39. return socket.socketpair()
  40. def close(self):
  41. super().close()
  42. for sig in list(self._signal_handlers):
  43. self.remove_signal_handler(sig)
  44. def _process_self_data(self, data):
  45. for signum in data:
  46. if not signum:
  47. # ignore null bytes written by _write_to_self()
  48. continue
  49. self._handle_signal(signum)
  50. def add_signal_handler(self, sig, callback, *args):
  51. """Add a handler for a signal. UNIX only.
  52. Raise ValueError if the signal number is invalid or uncatchable.
  53. Raise RuntimeError if there is a problem setting up the handler.
  54. """
  55. if (coroutines.iscoroutine(callback)
  56. or coroutines.iscoroutinefunction(callback)):
  57. raise TypeError("coroutines cannot be used "
  58. "with add_signal_handler()")
  59. self._check_signal(sig)
  60. self._check_closed()
  61. try:
  62. # set_wakeup_fd() raises ValueError if this is not the
  63. # main thread. By calling it early we ensure that an
  64. # event loop running in another thread cannot add a signal
  65. # handler.
  66. signal.set_wakeup_fd(self._csock.fileno())
  67. except (ValueError, OSError) as exc:
  68. raise RuntimeError(str(exc))
  69. handle = events.Handle(callback, args, self)
  70. self._signal_handlers[sig] = handle
  71. try:
  72. # Register a dummy signal handler to ask Python to write the signal
  73. # number in the wakup file descriptor. _process_self_data() will
  74. # read signal numbers from this file descriptor to handle signals.
  75. signal.signal(sig, _sighandler_noop)
  76. # Set SA_RESTART to limit EINTR occurrences.
  77. signal.siginterrupt(sig, False)
  78. except OSError as exc:
  79. del self._signal_handlers[sig]
  80. if not self._signal_handlers:
  81. try:
  82. signal.set_wakeup_fd(-1)
  83. except (ValueError, OSError) as nexc:
  84. logger.info('set_wakeup_fd(-1) failed: %s', nexc)
  85. if exc.errno == errno.EINVAL:
  86. raise RuntimeError('sig {} cannot be caught'.format(sig))
  87. else:
  88. raise
  89. def _handle_signal(self, sig):
  90. """Internal helper that is the actual signal handler."""
  91. handle = self._signal_handlers.get(sig)
  92. if handle is None:
  93. return # Assume it's some race condition.
  94. if handle._cancelled:
  95. self.remove_signal_handler(sig) # Remove it properly.
  96. else:
  97. self._add_callback_signalsafe(handle)
  98. def remove_signal_handler(self, sig):
  99. """Remove a handler for a signal. UNIX only.
  100. Return True if a signal handler was removed, False if not.
  101. """
  102. self._check_signal(sig)
  103. try:
  104. del self._signal_handlers[sig]
  105. except KeyError:
  106. return False
  107. if sig == signal.SIGINT:
  108. handler = signal.default_int_handler
  109. else:
  110. handler = signal.SIG_DFL
  111. try:
  112. signal.signal(sig, handler)
  113. except OSError as exc:
  114. if exc.errno == errno.EINVAL:
  115. raise RuntimeError('sig {} cannot be caught'.format(sig))
  116. else:
  117. raise
  118. if not self._signal_handlers:
  119. try:
  120. signal.set_wakeup_fd(-1)
  121. except (ValueError, OSError) as exc:
  122. logger.info('set_wakeup_fd(-1) failed: %s', exc)
  123. return True
  124. def _check_signal(self, sig):
  125. """Internal helper to validate a signal.
  126. Raise ValueError if the signal number is invalid or uncatchable.
  127. Raise RuntimeError if there is a problem setting up the handler.
  128. """
  129. if not isinstance(sig, int):
  130. raise TypeError('sig must be an int, not {!r}'.format(sig))
  131. if not (1 <= sig < signal.NSIG):
  132. raise ValueError(
  133. 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
  134. def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
  135. extra=None):
  136. return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
  137. def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
  138. extra=None):
  139. return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
  140. @coroutine
  141. def _make_subprocess_transport(self, protocol, args, shell,
  142. stdin, stdout, stderr, bufsize,
  143. extra=None, **kwargs):
  144. with events.get_child_watcher() as watcher:
  145. waiter = futures.Future(loop=self)
  146. transp = _UnixSubprocessTransport(self, protocol, args, shell,
  147. stdin, stdout, stderr, bufsize,
  148. waiter=waiter, extra=extra,
  149. **kwargs)
  150. watcher.add_child_handler(transp.get_pid(),
  151. self._child_watcher_callback, transp)
  152. try:
  153. yield from waiter
  154. except Exception as exc:
  155. # Workaround CPython bug #23353: using yield/yield-from in an
  156. # except block of a generator doesn't clear properly
  157. # sys.exc_info()
  158. err = exc
  159. else:
  160. err = None
  161. if err is not None:
  162. transp.close()
  163. yield from transp._wait()
  164. raise err
  165. return transp
  166. def _child_watcher_callback(self, pid, returncode, transp):
  167. self.call_soon_threadsafe(transp._process_exited, returncode)
  168. @coroutine
  169. def create_unix_connection(self, protocol_factory, path, *,
  170. ssl=None, sock=None,
  171. server_hostname=None):
  172. assert server_hostname is None or isinstance(server_hostname, str)
  173. if ssl:
  174. if server_hostname is None:
  175. raise ValueError(
  176. 'you have to pass server_hostname when using ssl')
  177. else:
  178. if server_hostname is not None:
  179. raise ValueError('server_hostname is only meaningful with ssl')
  180. if path is not None:
  181. if sock is not None:
  182. raise ValueError(
  183. 'path and sock can not be specified at the same time')
  184. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
  185. try:
  186. sock.setblocking(False)
  187. yield from self.sock_connect(sock, path)
  188. except:
  189. sock.close()
  190. raise
  191. else:
  192. if sock is None:
  193. raise ValueError('no path and sock were specified')
  194. sock.setblocking(False)
  195. transport, protocol = yield from self._create_connection_transport(
  196. sock, protocol_factory, ssl, server_hostname)
  197. return transport, protocol
  198. @coroutine
  199. def create_unix_server(self, protocol_factory, path=None, *,
  200. sock=None, backlog=100, ssl=None):
  201. if isinstance(ssl, bool):
  202. raise TypeError('ssl argument must be an SSLContext or None')
  203. if path is not None:
  204. if sock is not None:
  205. raise ValueError(
  206. 'path and sock can not be specified at the same time')
  207. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  208. try:
  209. sock.bind(path)
  210. except OSError as exc:
  211. sock.close()
  212. if exc.errno == errno.EADDRINUSE:
  213. # Let's improve the error message by adding
  214. # with what exact address it occurs.
  215. msg = 'Address {!r} is already in use'.format(path)
  216. raise OSError(errno.EADDRINUSE, msg) from None
  217. else:
  218. raise
  219. except:
  220. sock.close()
  221. raise
  222. else:
  223. if sock is None:
  224. raise ValueError(
  225. 'path was not specified, and no sock specified')
  226. if sock.family != socket.AF_UNIX:
  227. raise ValueError(
  228. 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
  229. server = base_events.Server(self, [sock])
  230. sock.listen(backlog)
  231. sock.setblocking(False)
  232. self._start_serving(protocol_factory, sock, ssl, server)
  233. return server
  234. if hasattr(os, 'set_blocking'):
  235. def _set_nonblocking(fd):
  236. os.set_blocking(fd, False)
  237. else:
  238. import fcntl
  239. def _set_nonblocking(fd):
  240. flags = fcntl.fcntl(fd, fcntl.F_GETFL)
  241. flags = flags | os.O_NONBLOCK
  242. fcntl.fcntl(fd, fcntl.F_SETFL, flags)
  243. class _UnixReadPipeTransport(transports.ReadTransport):
  244. max_size = 256 * 1024 # max bytes we read in one event loop iteration
  245. def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
  246. super().__init__(extra)
  247. self._extra['pipe'] = pipe
  248. self._loop = loop
  249. self._pipe = pipe
  250. self._fileno = pipe.fileno()
  251. mode = os.fstat(self._fileno).st_mode
  252. if not (stat.S_ISFIFO(mode) or
  253. stat.S_ISSOCK(mode) or
  254. stat.S_ISCHR(mode)):
  255. raise ValueError("Pipe transport is for pipes/sockets only.")
  256. _set_nonblocking(self._fileno)
  257. self._protocol = protocol
  258. self._closing = False
  259. self._loop.call_soon(self._protocol.connection_made, self)
  260. # only start reading when connection_made() has been called
  261. self._loop.call_soon(self._loop.add_reader,
  262. self._fileno, self._read_ready)
  263. if waiter is not None:
  264. # only wake up the waiter when connection_made() has been called
  265. self._loop.call_soon(waiter._set_result_unless_cancelled, None)
  266. def __repr__(self):
  267. info = [self.__class__.__name__]
  268. if self._pipe is None:
  269. info.append('closed')
  270. elif self._closing:
  271. info.append('closing')
  272. info.append('fd=%s' % self._fileno)
  273. if self._pipe is not None:
  274. polling = selector_events._test_selector_event(
  275. self._loop._selector,
  276. self._fileno, selectors.EVENT_READ)
  277. if polling:
  278. info.append('polling')
  279. else:
  280. info.append('idle')
  281. else:
  282. info.append('closed')
  283. return '<%s>' % ' '.join(info)
  284. def _read_ready(self):
  285. try:
  286. data = os.read(self._fileno, self.max_size)
  287. except (BlockingIOError, InterruptedError):
  288. pass
  289. except OSError as exc:
  290. self._fatal_error(exc, 'Fatal read error on pipe transport')
  291. else:
  292. if data:
  293. self._protocol.data_received(data)
  294. else:
  295. if self._loop.get_debug():
  296. logger.info("%r was closed by peer", self)
  297. self._closing = True
  298. self._loop.remove_reader(self._fileno)
  299. self._loop.call_soon(self._protocol.eof_received)
  300. self._loop.call_soon(self._call_connection_lost, None)
  301. def pause_reading(self):
  302. self._loop.remove_reader(self._fileno)
  303. def resume_reading(self):
  304. self._loop.add_reader(self._fileno, self._read_ready)
  305. def close(self):
  306. if not self._closing:
  307. self._close(None)
  308. # On Python 3.3 and older, objects with a destructor part of a reference
  309. # cycle are never destroyed. It's not more the case on Python 3.4 thanks
  310. # to the PEP 442.
  311. if sys.version_info >= (3, 4):
  312. def __del__(self):
  313. if self._pipe is not None:
  314. warnings.warn("unclosed transport %r" % self, ResourceWarning)
  315. self._pipe.close()
  316. def _fatal_error(self, exc, message='Fatal error on pipe transport'):
  317. # should be called by exception handler only
  318. if (isinstance(exc, OSError) and exc.errno == errno.EIO):
  319. if self._loop.get_debug():
  320. logger.debug("%r: %s", self, message, exc_info=True)
  321. else:
  322. self._loop.call_exception_handler({
  323. 'message': message,
  324. 'exception': exc,
  325. 'transport': self,
  326. 'protocol': self._protocol,
  327. })
  328. self._close(exc)
  329. def _close(self, exc):
  330. self._closing = True
  331. self._loop.remove_reader(self._fileno)
  332. self._loop.call_soon(self._call_connection_lost, exc)
  333. def _call_connection_lost(self, exc):
  334. try:
  335. self._protocol.connection_lost(exc)
  336. finally:
  337. self._pipe.close()
  338. self._pipe = None
  339. self._protocol = None
  340. self._loop = None
  341. class _UnixWritePipeTransport(transports._FlowControlMixin,
  342. transports.WriteTransport):
  343. def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
  344. super().__init__(extra, loop)
  345. self._extra['pipe'] = pipe
  346. self._pipe = pipe
  347. self._fileno = pipe.fileno()
  348. mode = os.fstat(self._fileno).st_mode
  349. is_socket = stat.S_ISSOCK(mode)
  350. if not (is_socket or
  351. stat.S_ISFIFO(mode) or
  352. stat.S_ISCHR(mode)):
  353. raise ValueError("Pipe transport is only for "
  354. "pipes, sockets and character devices")
  355. _set_nonblocking(self._fileno)
  356. self._protocol = protocol
  357. self._buffer = []
  358. self._conn_lost = 0
  359. self._closing = False # Set when close() or write_eof() called.
  360. self._loop.call_soon(self._protocol.connection_made, self)
  361. # On AIX, the reader trick (to be notified when the read end of the
  362. # socket is closed) only works for sockets. On other platforms it
  363. # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
  364. if is_socket or not sys.platform.startswith("aix"):
  365. # only start reading when connection_made() has been called
  366. self._loop.call_soon(self._loop.add_reader,
  367. self._fileno, self._read_ready)
  368. if waiter is not None:
  369. # only wake up the waiter when connection_made() has been called
  370. self._loop.call_soon(waiter._set_result_unless_cancelled, None)
  371. def __repr__(self):
  372. info = [self.__class__.__name__]
  373. if self._pipe is None:
  374. info.append('closed')
  375. elif self._closing:
  376. info.append('closing')
  377. info.append('fd=%s' % self._fileno)
  378. if self._pipe is not None:
  379. polling = selector_events._test_selector_event(
  380. self._loop._selector,
  381. self._fileno, selectors.EVENT_WRITE)
  382. if polling:
  383. info.append('polling')
  384. else:
  385. info.append('idle')
  386. bufsize = self.get_write_buffer_size()
  387. info.append('bufsize=%s' % bufsize)
  388. else:
  389. info.append('closed')
  390. return '<%s>' % ' '.join(info)
  391. def get_write_buffer_size(self):
  392. return sum(len(data) for data in self._buffer)
  393. def _read_ready(self):
  394. # Pipe was closed by peer.
  395. if self._loop.get_debug():
  396. logger.info("%r was closed by peer", self)
  397. if self._buffer:
  398. self._close(BrokenPipeError())
  399. else:
  400. self._close()
  401. def write(self, data):
  402. assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
  403. if isinstance(data, bytearray):
  404. data = memoryview(data)
  405. if not data:
  406. return
  407. if self._conn_lost or self._closing:
  408. if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
  409. logger.warning('pipe closed by peer or '
  410. 'os.write(pipe, data) raised exception.')
  411. self._conn_lost += 1
  412. return
  413. if not self._buffer:
  414. # Attempt to send it right away first.
  415. try:
  416. n = os.write(self._fileno, data)
  417. except (BlockingIOError, InterruptedError):
  418. n = 0
  419. except Exception as exc:
  420. self._conn_lost += 1
  421. self._fatal_error(exc, 'Fatal write error on pipe transport')
  422. return
  423. if n == len(data):
  424. return
  425. elif n > 0:
  426. data = data[n:]
  427. self._loop.add_writer(self._fileno, self._write_ready)
  428. self._buffer.append(data)
  429. self._maybe_pause_protocol()
  430. def _write_ready(self):
  431. data = b''.join(self._buffer)
  432. assert data, 'Data should not be empty'
  433. self._buffer.clear()
  434. try:
  435. n = os.write(self._fileno, data)
  436. except (BlockingIOError, InterruptedError):
  437. self._buffer.append(data)
  438. except Exception as exc:
  439. self._conn_lost += 1
  440. # Remove writer here, _fatal_error() doesn't it
  441. # because _buffer is empty.
  442. self._loop.remove_writer(self._fileno)
  443. self._fatal_error(exc, 'Fatal write error on pipe transport')
  444. else:
  445. if n == len(data):
  446. self._loop.remove_writer(self._fileno)
  447. self._maybe_resume_protocol() # May append to buffer.
  448. if not self._buffer and self._closing:
  449. self._loop.remove_reader(self._fileno)
  450. self._call_connection_lost(None)
  451. return
  452. elif n > 0:
  453. data = data[n:]
  454. self._buffer.append(data) # Try again later.
  455. def can_write_eof(self):
  456. return True
  457. def write_eof(self):
  458. if self._closing:
  459. return
  460. assert self._pipe
  461. self._closing = True
  462. if not self._buffer:
  463. self._loop.remove_reader(self._fileno)
  464. self._loop.call_soon(self._call_connection_lost, None)
  465. def close(self):
  466. if self._pipe is not None and not self._closing:
  467. # write_eof is all what we needed to close the write pipe
  468. self.write_eof()
  469. # On Python 3.3 and older, objects with a destructor part of a reference
  470. # cycle are never destroyed. It's not more the case on Python 3.4 thanks
  471. # to the PEP 442.
  472. if sys.version_info >= (3, 4):
  473. def __del__(self):
  474. if self._pipe is not None:
  475. warnings.warn("unclosed transport %r" % self, ResourceWarning)
  476. self._pipe.close()
  477. def abort(self):
  478. self._close(None)
  479. def _fatal_error(self, exc, message='Fatal error on pipe transport'):
  480. # should be called by exception handler only
  481. if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
  482. if self._loop.get_debug():
  483. logger.debug("%r: %s", self, message, exc_info=True)
  484. else:
  485. self._loop.call_exception_handler({
  486. 'message': message,
  487. 'exception': exc,
  488. 'transport': self,
  489. 'protocol': self._protocol,
  490. })
  491. self._close(exc)
  492. def _close(self, exc=None):
  493. self._closing = True
  494. if self._buffer:
  495. self._loop.remove_writer(self._fileno)
  496. self._buffer.clear()
  497. self._loop.remove_reader(self._fileno)
  498. self._loop.call_soon(self._call_connection_lost, exc)
  499. def _call_connection_lost(self, exc):
  500. try:
  501. self._protocol.connection_lost(exc)
  502. finally:
  503. self._pipe.close()
  504. self._pipe = None
  505. self._protocol = None
  506. self._loop = None
  507. if hasattr(os, 'set_inheritable'):
  508. # Python 3.4 and newer
  509. _set_inheritable = os.set_inheritable
  510. else:
  511. import fcntl
  512. def _set_inheritable(fd, inheritable):
  513. cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
  514. old = fcntl.fcntl(fd, fcntl.F_GETFD)
  515. if not inheritable:
  516. fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
  517. else:
  518. fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
  519. class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
  520. def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
  521. stdin_w = None
  522. if stdin == subprocess.PIPE:
  523. # Use a socket pair for stdin, since not all platforms
  524. # support selecting read events on the write end of a
  525. # socket (which we use in order to detect closing of the
  526. # other end). Notably this is needed on AIX, and works
  527. # just fine on other platforms.
  528. stdin, stdin_w = self._loop._socketpair()
  529. # Mark the write end of the stdin pipe as non-inheritable,
  530. # needed by close_fds=False on Python 3.3 and older
  531. # (Python 3.4 implements the PEP 446, socketpair returns
  532. # non-inheritable sockets)
  533. _set_inheritable(stdin_w.fileno(), False)
  534. self._proc = subprocess.Popen(
  535. args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
  536. universal_newlines=False, bufsize=bufsize, **kwargs)
  537. if stdin_w is not None:
  538. stdin.close()
  539. self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
  540. class AbstractChildWatcher:
  541. """Abstract base class for monitoring child processes.
  542. Objects derived from this class monitor a collection of subprocesses and
  543. report their termination or interruption by a signal.
  544. New callbacks are registered with .add_child_handler(). Starting a new
  545. process must be done within a 'with' block to allow the watcher to suspend
  546. its activity until the new process if fully registered (this is needed to
  547. prevent a race condition in some implementations).
  548. Example:
  549. with watcher:
  550. proc = subprocess.Popen("sleep 1")
  551. watcher.add_child_handler(proc.pid, callback)
  552. Notes:
  553. Implementations of this class must be thread-safe.
  554. Since child watcher objects may catch the SIGCHLD signal and call
  555. waitpid(-1), there should be only one active object per process.
  556. """
  557. def add_child_handler(self, pid, callback, *args):
  558. """Register a new child handler.
  559. Arrange for callback(pid, returncode, *args) to be called when
  560. process 'pid' terminates. Specifying another callback for the same
  561. process replaces the previous handler.
  562. Note: callback() must be thread-safe.
  563. """
  564. raise NotImplementedError()
  565. def remove_child_handler(self, pid):
  566. """Removes the handler for process 'pid'.
  567. The function returns True if the handler was successfully removed,
  568. False if there was nothing to remove."""
  569. raise NotImplementedError()
  570. def attach_loop(self, loop):
  571. """Attach the watcher to an event loop.
  572. If the watcher was previously attached to an event loop, then it is
  573. first detached before attaching to the new loop.
  574. Note: loop may be None.
  575. """
  576. raise NotImplementedError()
  577. def close(self):
  578. """Close the watcher.
  579. This must be called to make sure that any underlying resource is freed.
  580. """
  581. raise NotImplementedError()
  582. def __enter__(self):
  583. """Enter the watcher's context and allow starting new processes
  584. This function must return self"""
  585. raise NotImplementedError()
  586. def __exit__(self, a, b, c):
  587. """Exit the watcher's context"""
  588. raise NotImplementedError()
  589. class BaseChildWatcher(AbstractChildWatcher):
  590. def __init__(self):
  591. self._loop = None
  592. def close(self):
  593. self.attach_loop(None)
  594. def _do_waitpid(self, expected_pid):
  595. raise NotImplementedError()
  596. def _do_waitpid_all(self):
  597. raise NotImplementedError()
  598. def attach_loop(self, loop):
  599. assert loop is None or isinstance(loop, events.AbstractEventLoop)
  600. if self._loop is not None:
  601. self._loop.remove_signal_handler(signal.SIGCHLD)
  602. self._loop = loop
  603. if loop is not None:
  604. loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
  605. # Prevent a race condition in case a child terminated
  606. # during the switch.
  607. self._do_waitpid_all()
  608. def _sig_chld(self):
  609. try:
  610. self._do_waitpid_all()
  611. except Exception as exc:
  612. # self._loop should always be available here
  613. # as '_sig_chld' is added as a signal handler
  614. # in 'attach_loop'
  615. self._loop.call_exception_handler({
  616. 'message': 'Unknown exception in SIGCHLD handler',
  617. 'exception': exc,
  618. })
  619. def _compute_returncode(self, status):
  620. if os.WIFSIGNALED(status):
  621. # The child process died because of a signal.
  622. return -os.WTERMSIG(status)
  623. elif os.WIFEXITED(status):
  624. # The child process exited (e.g sys.exit()).
  625. return os.WEXITSTATUS(status)
  626. else:
  627. # The child exited, but we don't understand its status.
  628. # This shouldn't happen, but if it does, let's just
  629. # return that status; perhaps that helps debug it.
  630. return status
  631. class SafeChildWatcher(BaseChildWatcher):
  632. """'Safe' child watcher implementation.
  633. This implementation avoids disrupting other code spawning processes by
  634. polling explicitly each process in the SIGCHLD handler instead of calling
  635. os.waitpid(-1).
  636. This is a safe solution but it has a significant overhead when handling a
  637. big number of children (O(n) each time SIGCHLD is raised)
  638. """
  639. def __init__(self):
  640. super().__init__()
  641. self._callbacks = {}
  642. def close(self):
  643. self._callbacks.clear()
  644. super().close()
  645. def __enter__(self):
  646. return self
  647. def __exit__(self, a, b, c):
  648. pass
  649. def add_child_handler(self, pid, callback, *args):
  650. self._callbacks[pid] = (callback, args)
  651. # Prevent a race condition in case the child is already terminated.
  652. self._do_waitpid(pid)
  653. def remove_child_handler(self, pid):
  654. try:
  655. del self._callbacks[pid]
  656. return True
  657. except KeyError:
  658. return False
  659. def _do_waitpid_all(self):
  660. for pid in list(self._callbacks):
  661. self._do_waitpid(pid)
  662. def _do_waitpid(self, expected_pid):
  663. assert expected_pid > 0
  664. try:
  665. pid, status = os.waitpid(expected_pid, os.WNOHANG)
  666. except ChildProcessError:
  667. # The child process is already reaped
  668. # (may happen if waitpid() is called elsewhere).
  669. pid = expected_pid
  670. returncode = 255
  671. logger.warning(
  672. "Unknown child process pid %d, will report returncode 255",
  673. pid)
  674. else:
  675. if pid == 0:
  676. # The child process is still alive.
  677. return
  678. returncode = self._compute_returncode(status)
  679. if self._loop.get_debug():
  680. logger.debug('process %s exited with returncode %s',
  681. expected_pid, returncode)
  682. try:
  683. callback, args = self._callbacks.pop(pid)
  684. except KeyError: # pragma: no cover
  685. # May happen if .remove_child_handler() is called
  686. # after os.waitpid() returns.
  687. if self._loop.get_debug():
  688. logger.warning("Child watcher got an unexpected pid: %r",
  689. pid, exc_info=True)
  690. else:
  691. callback(pid, returncode, *args)
  692. class FastChildWatcher(BaseChildWatcher):
  693. """'Fast' child watcher implementation.
  694. This implementation reaps every terminated processes by calling
  695. os.waitpid(-1) directly, possibly breaking other code spawning processes
  696. and waiting for their termination.
  697. There is no noticeable overhead when handling a big number of children
  698. (O(1) each time a child terminates).
  699. """
  700. def __init__(self):
  701. super().__init__()
  702. self._callbacks = {}
  703. self._lock = threading.Lock()
  704. self._zombies = {}
  705. self._forks = 0
  706. def close(self):
  707. self._callbacks.clear()
  708. self._zombies.clear()
  709. super().close()
  710. def __enter__(self):
  711. with self._lock:
  712. self._forks += 1
  713. return self
  714. def __exit__(self, a, b, c):
  715. with self._lock:
  716. self._forks -= 1
  717. if self._forks or not self._zombies:
  718. return
  719. collateral_victims = str(self._zombies)
  720. self._zombies.clear()
  721. logger.warning(
  722. "Caught subprocesses termination from unknown pids: %s",
  723. collateral_victims)
  724. def add_child_handler(self, pid, callback, *args):
  725. assert self._forks, "Must use the context manager"
  726. with self._lock:
  727. try:
  728. returncode = self._zombies.pop(pid)
  729. except KeyError:
  730. # The child is running.
  731. self._callbacks[pid] = callback, args
  732. return
  733. # The child is dead already. We can fire the callback.
  734. callback(pid, returncode, *args)
  735. def remove_child_handler(self, pid):
  736. try:
  737. del self._callbacks[pid]
  738. return True
  739. except KeyError:
  740. return False
  741. def _do_waitpid_all(self):
  742. # Because of signal coalescing, we must keep calling waitpid() as
  743. # long as we're able to reap a child.
  744. while True:
  745. try:
  746. pid, status = os.waitpid(-1, os.WNOHANG)
  747. except ChildProcessError:
  748. # No more child processes exist.
  749. return
  750. else:
  751. if pid == 0:
  752. # A child process is still alive.
  753. return
  754. returncode = self._compute_returncode(status)
  755. with self._lock:
  756. try:
  757. callback, args = self._callbacks.pop(pid)
  758. except KeyError:
  759. # unknown child
  760. if self._forks:
  761. # It may not be registered yet.
  762. self._zombies[pid] = returncode
  763. if self._loop.get_debug():
  764. logger.debug('unknown process %s exited '
  765. 'with returncode %s',
  766. pid, returncode)
  767. continue
  768. callback = None
  769. else:
  770. if self._loop.get_debug():
  771. logger.debug('process %s exited with returncode %s',
  772. pid, returncode)
  773. if callback is None:
  774. logger.warning(
  775. "Caught subprocess termination from unknown pid: "
  776. "%d -> %d", pid, returncode)
  777. else:
  778. callback(pid, returncode, *args)
  779. class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  780. """UNIX event loop policy with a watcher for child processes."""
  781. _loop_factory = _UnixSelectorEventLoop
  782. def __init__(self):
  783. super().__init__()
  784. self._watcher = None
  785. def _init_watcher(self):
  786. with events._lock:
  787. if self._watcher is None: # pragma: no branch
  788. self._watcher = SafeChildWatcher()
  789. if isinstance(threading.current_thread(),
  790. threading._MainThread):
  791. self._watcher.attach_loop(self._local._loop)
  792. def set_event_loop(self, loop):
  793. """Set the event loop.
  794. As a side effect, if a child watcher was set before, then calling
  795. .set_event_loop() from the main thread will call .attach_loop(loop) on
  796. the child watcher.
  797. """
  798. super().set_event_loop(loop)
  799. if self._watcher is not None and \
  800. isinstance(threading.current_thread(), threading._MainThread):
  801. self._watcher.attach_loop(loop)
  802. def get_child_watcher(self):
  803. """Get the watcher for child processes.
  804. If not yet set, a SafeChildWatcher object is automatically created.
  805. """
  806. if self._watcher is None:
  807. self._init_watcher()
  808. return self._watcher
  809. def set_child_watcher(self, watcher):
  810. """Set the watcher for child processes."""
  811. assert watcher is None or isinstance(watcher, AbstractChildWatcher)
  812. if self._watcher is not None:
  813. self._watcher.close()
  814. self._watcher = watcher
  815. SelectorEventLoop = _UnixSelectorEventLoop
  816. DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy