# subprocess.py (7.1 KB)

  # Public API of this module.
  __all__ = ['create_subprocess_exec', 'create_subprocess_shell']

  import collections
  import subprocess

  from . import events
  from . import futures
  from . import protocols
  from . import streams
  from . import tasks
  from .coroutines import coroutine
  from .log import logger


  # Module-level aliases of the standard-library ``subprocess`` constants.
  PIPE = subprocess.PIPE
  STDOUT = subprocess.STDOUT
  DEVNULL = subprocess.DEVNULL
  class SubprocessStreamProtocol(streams.FlowControlMixin,
                                 protocols.SubprocessProtocol):
      """Like StreamReaderProtocol, but for a subprocess.

      After connection_made() the child's pipes are exposed as stream
      objects: ``stdin`` is a StreamWriter, ``stdout`` and ``stderr`` are
      StreamReader instances.  An attribute stays None when the transport
      has no pipe for the corresponding file descriptor.
      """

      def __init__(self, limit, loop):
          """Create an unconnected protocol.

          limit -- buffer limit given to every StreamReader created in
                   connection_made().
          loop  -- event loop forwarded to the base-class initialiser.
          """
          super().__init__(loop=loop)
          self._limit = limit
          self.stdin = self.stdout = self.stderr = None
          self._transport = None

      def __repr__(self):
          """Return '<ClassName ...>' listing only the streams that exist."""
          info = [self.__class__.__name__]
          if self.stdin is not None:
              info.append('stdin=%r' % self.stdin)
          if self.stdout is not None:
              info.append('stdout=%r' % self.stdout)
          if self.stderr is not None:
              info.append('stderr=%r' % self.stderr)
          return '<%s>' % ' '.join(info)

      def connection_made(self, transport):
          """Remember *transport* and wrap each available pipe in a stream.

          File descriptors 1 and 2 get a StreamReader, file descriptor 0 gets
          a StreamWriter.  A pipe whose transport is None is left unwrapped.
          """
          self._transport = transport

          # self._loop is not assigned in this class; presumably the
          # base-class initialiser sets it -- confirm against FlowControlMixin.
          stdout_transport = transport.get_pipe_transport(1)
          if stdout_transport is not None:
              self.stdout = streams.StreamReader(limit=self._limit,
                                                 loop=self._loop)
              self.stdout.set_transport(stdout_transport)

          stderr_transport = transport.get_pipe_transport(2)
          if stderr_transport is not None:
              self.stderr = streams.StreamReader(limit=self._limit,
                                                 loop=self._loop)
              self.stderr.set_transport(stderr_transport)

          stdin_transport = transport.get_pipe_transport(0)
          if stdin_transport is not None:
              self.stdin = streams.StreamWriter(stdin_transport,
                                                protocol=self,
                                                reader=None,
                                                loop=self._loop)

      def pipe_data_received(self, fd, data):
          """Feed *data* to the reader for *fd* (1 = stdout, 2 = stderr).

          Data for any other descriptor, or for a stream that was never
          created, is dropped.
          """
          if fd == 1:
              reader = self.stdout
          elif fd == 2:
              reader = self.stderr
          else:
              reader = None
          if reader is not None:
              reader.feed_data(data)

      def pipe_connection_lost(self, fd, exc):
          """Propagate the closing of pipe *fd* to the matching stream.

          fd 0: close the stdin writer (if any) and call connection_lost(exc).
          fd 1 or 2: signal EOF on the reader when *exc* is None, otherwise
          store *exc* on the reader.  Other descriptors are ignored.
          """
          if fd == 0:
              pipe = self.stdin
              if pipe is not None:
                  pipe.close()
              # connection_lost() is inherited; presumably it releases
              # writers blocked in drain() -- confirm against the base class.
              self.connection_lost(exc)
              return
          if fd == 1:
              reader = self.stdout
          elif fd == 2:
              reader = self.stderr
          else:
              reader = None
          # Identity test: comparing to None with != would invoke __ne__.
          if reader is not None:
              if exc is None:
                  reader.feed_eof()
              else:
                  reader.set_exception(exc)

      def process_exited(self):
          """Close the process transport and drop the reference to it."""
          self._transport.close()
          self._transport = None
  class Process:
      """Handle on a running child process.

      Wraps a subprocess transport and its stream protocol.  The ``stdin``,
      ``stdout`` and ``stderr`` attributes are copied from the protocol when
      the object is created, and ``pid`` is read from the transport.
      """

      def __init__(self, transport, protocol, loop):
          """Bind the process to its *transport*, *protocol* and event *loop*."""
          self._transport = transport
          self._protocol = protocol
          self._loop = loop
          self.stdin = protocol.stdin
          self.stdout = protocol.stdout
          self.stderr = protocol.stderr
          self.pid = transport.get_pid()

      def __repr__(self):
          """Return '<ClassName pid>'."""
          return '<%s %s>' % (self.__class__.__name__, self.pid)

      @property
      def returncode(self):
          """Return code as currently reported by the transport."""
          return self._transport.get_returncode()

      @coroutine
      def wait(self):
          """Wait until the process exits and return its return code.

          This method is a coroutine.
          """
          return (yield from self._transport._wait())

      def send_signal(self, signal):
          """Forward *signal* to the transport's send_signal()."""
          self._transport.send_signal(signal)

      def terminate(self):
          """Ask the transport to terminate the process."""
          self._transport.terminate()

      def kill(self):
          """Ask the transport to kill the process."""
          self._transport.kill()

      @coroutine
      def _feed_stdin(self, input):
          """Write *input* to stdin, wait for it to drain, then close stdin.

          BrokenPipeError and ConnectionResetError raised while draining are
          swallowed (only logged in debug mode); stdin is closed either way.
          """
          debug = self._loop.get_debug()
          self.stdin.write(input)
          if debug:
              logger.debug('%r communicate: feed stdin (%s bytes)',
                           self, len(input))
          try:
              yield from self.stdin.drain()
          except (BrokenPipeError, ConnectionResetError) as exc:
              # communicate() ignores BrokenPipeError and ConnectionResetError
              if debug:
                  logger.debug('%r communicate: stdin got %r', self, exc)

          if debug:
              logger.debug('%r communicate: close stdin', self)
          self.stdin.close()

      @coroutine
      def _noop(self):
          """Placeholder coroutine for a stream that is not in use."""
          return None

      @coroutine
      def _read_stream(self, fd):
          """Read stdout (fd 1) or stderr (fd 2) and close its pipe transport.

          Returns whatever ``stream.read()`` produced.
          """
          # Look the pipe transport up before reading so it can be closed
          # once the read has finished.
          transport = self._transport.get_pipe_transport(fd)
          if fd == 2:
              stream = self.stderr
          else:
              assert fd == 1
              stream = self.stdout
          if self._loop.get_debug():
              name = 'stdout' if fd == 1 else 'stderr'
              logger.debug('%r communicate: read %s', self, name)
          output = yield from stream.read()
          if self._loop.get_debug():
              name = 'stdout' if fd == 1 else 'stderr'
              logger.debug('%r communicate: close %s', self, name)
          transport.close()
          return output

      @coroutine
      def communicate(self, input=None):
          """Send *input* to stdin, collect the output and wait for exit.

          input -- data to write to the child's stdin, or None to leave
                   stdin untouched.  An empty value still closes stdin.
                   Ignored when the process has no stdin stream.

          Returns a ``(stdout, stderr)`` tuple; an element is None when the
          corresponding stream does not exist.

          This method is a coroutine.
          """
          # Test against None rather than truthiness: empty input must still
          # go through _feed_stdin() so that stdin gets closed and the child
          # sees EOF.  Without a stdin stream there is nothing to feed or close.
          if input is not None and self.stdin is not None:
              stdin = self._feed_stdin(input)
          else:
              stdin = self._noop()
          if self.stdout is not None:
              stdout = self._read_stream(1)
          else:
              stdout = self._noop()
          if self.stderr is not None:
              stderr = self._read_stream(2)
          else:
              stderr = self._noop()
          # Run the three operations concurrently; the stdin result is unused.
          stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
                                                          loop=self._loop)
          yield from self.wait()
          return (stdout, stderr)
  @coroutine
  def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                              loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
      """Start *cmd* through ``loop.subprocess_shell()`` and return a Process.

      cmd                  -- command passed to loop.subprocess_shell().
      stdin/stdout/stderr  -- forwarded unchanged to loop.subprocess_shell().
      loop                 -- event loop to use; events.get_event_loop() when
                              None.
      limit                -- limit given to the SubprocessStreamProtocol.
      **kwds               -- extra keyword arguments forwarded to
                              loop.subprocess_shell().

      This function is a coroutine.
      """
      event_loop = events.get_event_loop() if loop is None else loop

      def make_protocol():
          return SubprocessStreamProtocol(limit=limit, loop=event_loop)

      created = yield from event_loop.subprocess_shell(
          make_protocol, cmd,
          stdin=stdin, stdout=stdout, stderr=stderr, **kwds)
      process_transport, stream_protocol = created
      return Process(process_transport, stream_protocol, event_loop)
  @coroutine
  def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                             stderr=None, loop=None,
                             limit=streams._DEFAULT_LIMIT, **kwds):
      """Start *program* through ``loop.subprocess_exec()`` and return a Process.

      program, *args       -- executable and its arguments, passed to
                              loop.subprocess_exec().
      stdin/stdout/stderr  -- forwarded unchanged to loop.subprocess_exec().
      loop                 -- event loop to use; events.get_event_loop() when
                              None.
      limit                -- limit given to the SubprocessStreamProtocol.
      **kwds               -- extra keyword arguments forwarded to
                              loop.subprocess_exec().

      This function is a coroutine.
      """
      event_loop = events.get_event_loop() if loop is None else loop

      def make_protocol():
          return SubprocessStreamProtocol(limit=limit, loop=event_loop)

      created = yield from event_loop.subprocess_exec(
          make_protocol, program, *args,
          stdin=stdin, stdout=stdout, stderr=stderr, **kwds)
      process_transport, stream_protocol = created
      return Process(process_transport, stream_protocol, event_loop)