windows_events.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. """Selector and proactor event loops for Windows."""
  2. import _winapi
  3. import errno
  4. import math
  5. import socket
  6. import struct
  7. import weakref
  8. from . import events
  9. from . import base_subprocess
  10. from . import futures
  11. from . import proactor_events
  12. from . import selector_events
  13. from . import tasks
  14. from . import windows_utils
  15. from . import _overlapped
  16. from .coroutines import coroutine
  17. from .log import logger
  18. __all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
  19. 'DefaultEventLoopPolicy',
  20. ]
  21. NULL = 0
  22. INFINITE = 0xffffffff
  23. ERROR_CONNECTION_REFUSED = 1225
  24. ERROR_CONNECTION_ABORTED = 1236
  25. # Initial delay in seconds for connect_pipe() before retrying to connect
  26. CONNECT_PIPE_INIT_DELAY = 0.001
  27. # Maximum delay in seconds for connect_pipe() before retrying to connect
  28. CONNECT_PIPE_MAX_DELAY = 0.100
  29. class _OverlappedFuture(futures.Future):
  30. """Subclass of Future which represents an overlapped operation.
  31. Cancelling it will immediately cancel the overlapped operation.
  32. """
  33. def __init__(self, ov, *, loop=None):
  34. super().__init__(loop=loop)
  35. if self._source_traceback:
  36. del self._source_traceback[-1]
  37. self._ov = ov
  38. def _repr_info(self):
  39. info = super()._repr_info()
  40. if self._ov is not None:
  41. state = 'pending' if self._ov.pending else 'completed'
  42. info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
  43. return info
  44. def _cancel_overlapped(self):
  45. if self._ov is None:
  46. return
  47. try:
  48. self._ov.cancel()
  49. except OSError as exc:
  50. context = {
  51. 'message': 'Cancelling an overlapped future failed',
  52. 'exception': exc,
  53. 'future': self,
  54. }
  55. if self._source_traceback:
  56. context['source_traceback'] = self._source_traceback
  57. self._loop.call_exception_handler(context)
  58. self._ov = None
  59. def cancel(self):
  60. self._cancel_overlapped()
  61. return super().cancel()
  62. def set_exception(self, exception):
  63. super().set_exception(exception)
  64. self._cancel_overlapped()
  65. def set_result(self, result):
  66. super().set_result(result)
  67. self._ov = None
  68. class _BaseWaitHandleFuture(futures.Future):
  69. """Subclass of Future which represents a wait handle."""
  70. def __init__(self, ov, handle, wait_handle, *, loop=None):
  71. super().__init__(loop=loop)
  72. if self._source_traceback:
  73. del self._source_traceback[-1]
  74. # Keep a reference to the Overlapped object to keep it alive until the
  75. # wait is unregistered
  76. self._ov = ov
  77. self._handle = handle
  78. self._wait_handle = wait_handle
  79. # Should we call UnregisterWaitEx() if the wait completes
  80. # or is cancelled?
  81. self._registered = True
  82. def _poll(self):
  83. # non-blocking wait: use a timeout of 0 millisecond
  84. return (_winapi.WaitForSingleObject(self._handle, 0) ==
  85. _winapi.WAIT_OBJECT_0)
  86. def _repr_info(self):
  87. info = super()._repr_info()
  88. info.append('handle=%#x' % self._handle)
  89. if self._handle is not None:
  90. state = 'signaled' if self._poll() else 'waiting'
  91. info.append(state)
  92. if self._wait_handle is not None:
  93. info.append('wait_handle=%#x' % self._wait_handle)
  94. return info
  95. def _unregister_wait_cb(self, fut):
  96. # The wait was unregistered: it's not safe to destroy the Overlapped
  97. # object
  98. self._ov = None
  99. def _unregister_wait(self):
  100. if not self._registered:
  101. return
  102. self._registered = False
  103. wait_handle = self._wait_handle
  104. self._wait_handle = None
  105. try:
  106. _overlapped.UnregisterWait(wait_handle)
  107. except OSError as exc:
  108. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  109. context = {
  110. 'message': 'Failed to unregister the wait handle',
  111. 'exception': exc,
  112. 'future': self,
  113. }
  114. if self._source_traceback:
  115. context['source_traceback'] = self._source_traceback
  116. self._loop.call_exception_handler(context)
  117. return
  118. # ERROR_IO_PENDING means that the unregister is pending
  119. self._unregister_wait_cb(None)
  120. def cancel(self):
  121. self._unregister_wait()
  122. return super().cancel()
  123. def set_exception(self, exception):
  124. self._unregister_wait()
  125. super().set_exception(exception)
  126. def set_result(self, result):
  127. self._unregister_wait()
  128. super().set_result(result)
  129. class _WaitCancelFuture(_BaseWaitHandleFuture):
  130. """Subclass of Future which represents a wait for the cancellation of a
  131. _WaitHandleFuture using an event.
  132. """
  133. def __init__(self, ov, event, wait_handle, *, loop=None):
  134. super().__init__(ov, event, wait_handle, loop=loop)
  135. self._done_callback = None
  136. def cancel(self):
  137. raise RuntimeError("_WaitCancelFuture must not be cancelled")
  138. def _schedule_callbacks(self):
  139. super(_WaitCancelFuture, self)._schedule_callbacks()
  140. if self._done_callback is not None:
  141. self._done_callback(self)
  142. class _WaitHandleFuture(_BaseWaitHandleFuture):
  143. def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
  144. super().__init__(ov, handle, wait_handle, loop=loop)
  145. self._proactor = proactor
  146. self._unregister_proactor = True
  147. self._event = _overlapped.CreateEvent(None, True, False, None)
  148. self._event_fut = None
  149. def _unregister_wait_cb(self, fut):
  150. if self._event is not None:
  151. _winapi.CloseHandle(self._event)
  152. self._event = None
  153. self._event_fut = None
  154. # If the wait was cancelled, the wait may never be signalled, so
  155. # it's required to unregister it. Otherwise, IocpProactor.close() will
  156. # wait forever for an event which will never come.
  157. #
  158. # If the IocpProactor already received the event, it's safe to call
  159. # _unregister() because we kept a reference to the Overlapped object
  160. # which is used as an unique key.
  161. self._proactor._unregister(self._ov)
  162. self._proactor = None
  163. super()._unregister_wait_cb(fut)
  164. def _unregister_wait(self):
  165. if not self._registered:
  166. return
  167. self._registered = False
  168. wait_handle = self._wait_handle
  169. self._wait_handle = None
  170. try:
  171. _overlapped.UnregisterWaitEx(wait_handle, self._event)
  172. except OSError as exc:
  173. if exc.winerror != _overlapped.ERROR_IO_PENDING:
  174. context = {
  175. 'message': 'Failed to unregister the wait handle',
  176. 'exception': exc,
  177. 'future': self,
  178. }
  179. if self._source_traceback:
  180. context['source_traceback'] = self._source_traceback
  181. self._loop.call_exception_handler(context)
  182. return
  183. # ERROR_IO_PENDING is not an error, the wait was unregistered
  184. self._event_fut = self._proactor._wait_cancel(self._event,
  185. self._unregister_wait_cb)
  186. class PipeServer(object):
  187. """Class representing a pipe server.
  188. This is much like a bound, listening socket.
  189. """
  190. def __init__(self, address):
  191. self._address = address
  192. self._free_instances = weakref.WeakSet()
  193. # initialize the pipe attribute before calling _server_pipe_handle()
  194. # because this function can raise an exception and the destructor calls
  195. # the close() method
  196. self._pipe = None
  197. self._accept_pipe_future = None
  198. self._pipe = self._server_pipe_handle(True)
  199. def _get_unconnected_pipe(self):
  200. # Create new instance and return previous one. This ensures
  201. # that (until the server is closed) there is always at least
  202. # one pipe handle for address. Therefore if a client attempt
  203. # to connect it will not fail with FileNotFoundError.
  204. tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
  205. return tmp
  206. def _server_pipe_handle(self, first):
  207. # Return a wrapper for a new pipe handle.
  208. if self.closed():
  209. return None
  210. flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
  211. if first:
  212. flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
  213. h = _winapi.CreateNamedPipe(
  214. self._address, flags,
  215. _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
  216. _winapi.PIPE_WAIT,
  217. _winapi.PIPE_UNLIMITED_INSTANCES,
  218. windows_utils.BUFSIZE, windows_utils.BUFSIZE,
  219. _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
  220. pipe = windows_utils.PipeHandle(h)
  221. self._free_instances.add(pipe)
  222. return pipe
  223. def closed(self):
  224. return (self._address is None)
  225. def close(self):
  226. if self._accept_pipe_future is not None:
  227. self._accept_pipe_future.cancel()
  228. self._accept_pipe_future = None
  229. # Close all instances which have not been connected to by a client.
  230. if self._address is not None:
  231. for pipe in self._free_instances:
  232. pipe.close()
  233. self._pipe = None
  234. self._address = None
  235. self._free_instances.clear()
  236. __del__ = close
  237. class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
  238. """Windows version of selector event loop."""
  239. def _socketpair(self):
  240. return windows_utils.socketpair()
  241. class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
  242. """Windows version of proactor event loop using IOCP."""
  243. def __init__(self, proactor=None):
  244. if proactor is None:
  245. proactor = IocpProactor()
  246. super().__init__(proactor)
  247. def _socketpair(self):
  248. return windows_utils.socketpair()
  249. @coroutine
  250. def create_pipe_connection(self, protocol_factory, address):
  251. f = self._proactor.connect_pipe(address)
  252. pipe = yield from f
  253. protocol = protocol_factory()
  254. trans = self._make_duplex_pipe_transport(pipe, protocol,
  255. extra={'addr': address})
  256. return trans, protocol
  257. @coroutine
  258. def start_serving_pipe(self, protocol_factory, address):
  259. server = PipeServer(address)
  260. def loop_accept_pipe(f=None):
  261. pipe = None
  262. try:
  263. if f:
  264. pipe = f.result()
  265. server._free_instances.discard(pipe)
  266. if server.closed():
  267. # A client connected before the server was closed:
  268. # drop the client (close the pipe) and exit
  269. pipe.close()
  270. return
  271. protocol = protocol_factory()
  272. self._make_duplex_pipe_transport(
  273. pipe, protocol, extra={'addr': address})
  274. pipe = server._get_unconnected_pipe()
  275. if pipe is None:
  276. return
  277. f = self._proactor.accept_pipe(pipe)
  278. except OSError as exc:
  279. if pipe and pipe.fileno() != -1:
  280. self.call_exception_handler({
  281. 'message': 'Pipe accept failed',
  282. 'exception': exc,
  283. 'pipe': pipe,
  284. })
  285. pipe.close()
  286. elif self._debug:
  287. logger.warning("Accept pipe failed on pipe %r",
  288. pipe, exc_info=True)
  289. except futures.CancelledError:
  290. if pipe:
  291. pipe.close()
  292. else:
  293. server._accept_pipe_future = f
  294. f.add_done_callback(loop_accept_pipe)
  295. self.call_soon(loop_accept_pipe)
  296. return [server]
  297. @coroutine
  298. def _make_subprocess_transport(self, protocol, args, shell,
  299. stdin, stdout, stderr, bufsize,
  300. extra=None, **kwargs):
  301. waiter = futures.Future(loop=self)
  302. transp = _WindowsSubprocessTransport(self, protocol, args, shell,
  303. stdin, stdout, stderr, bufsize,
  304. waiter=waiter, extra=extra,
  305. **kwargs)
  306. try:
  307. yield from waiter
  308. except Exception as exc:
  309. # Workaround CPython bug #23353: using yield/yield-from in an
  310. # except block of a generator doesn't clear properly sys.exc_info()
  311. err = exc
  312. else:
  313. err = None
  314. if err is not None:
  315. transp.close()
  316. yield from transp._wait()
  317. raise err
  318. return transp
  319. class IocpProactor:
  320. """Proactor implementation using IOCP."""
  321. def __init__(self, concurrency=0xffffffff):
  322. self._loop = None
  323. self._results = []
  324. self._iocp = _overlapped.CreateIoCompletionPort(
  325. _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
  326. self._cache = {}
  327. self._registered = weakref.WeakSet()
  328. self._unregistered = []
  329. self._stopped_serving = weakref.WeakSet()
  330. def __repr__(self):
  331. return ('<%s overlapped#=%s result#=%s>'
  332. % (self.__class__.__name__, len(self._cache),
  333. len(self._results)))
  334. def set_loop(self, loop):
  335. self._loop = loop
  336. def select(self, timeout=None):
  337. if not self._results:
  338. self._poll(timeout)
  339. tmp = self._results
  340. self._results = []
  341. return tmp
  342. def _result(self, value):
  343. fut = futures.Future(loop=self._loop)
  344. fut.set_result(value)
  345. return fut
  346. def recv(self, conn, nbytes, flags=0):
  347. self._register_with_iocp(conn)
  348. ov = _overlapped.Overlapped(NULL)
  349. try:
  350. if isinstance(conn, socket.socket):
  351. ov.WSARecv(conn.fileno(), nbytes, flags)
  352. else:
  353. ov.ReadFile(conn.fileno(), nbytes)
  354. except BrokenPipeError:
  355. return self._result(b'')
  356. def finish_recv(trans, key, ov):
  357. try:
  358. return ov.getresult()
  359. except OSError as exc:
  360. if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
  361. raise ConnectionResetError(*exc.args)
  362. else:
  363. raise
  364. return self._register(ov, conn, finish_recv)
  365. def send(self, conn, buf, flags=0):
  366. self._register_with_iocp(conn)
  367. ov = _overlapped.Overlapped(NULL)
  368. if isinstance(conn, socket.socket):
  369. ov.WSASend(conn.fileno(), buf, flags)
  370. else:
  371. ov.WriteFile(conn.fileno(), buf)
  372. def finish_send(trans, key, ov):
  373. try:
  374. return ov.getresult()
  375. except OSError as exc:
  376. if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
  377. raise ConnectionResetError(*exc.args)
  378. else:
  379. raise
  380. return self._register(ov, conn, finish_send)
  381. def accept(self, listener):
  382. self._register_with_iocp(listener)
  383. conn = self._get_accept_socket(listener.family)
  384. ov = _overlapped.Overlapped(NULL)
  385. ov.AcceptEx(listener.fileno(), conn.fileno())
  386. def finish_accept(trans, key, ov):
  387. ov.getresult()
  388. # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
  389. buf = struct.pack('@P', listener.fileno())
  390. conn.setsockopt(socket.SOL_SOCKET,
  391. _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
  392. conn.settimeout(listener.gettimeout())
  393. return conn, conn.getpeername()
  394. @coroutine
  395. def accept_coro(future, conn):
  396. # Coroutine closing the accept socket if the future is cancelled
  397. try:
  398. yield from future
  399. except futures.CancelledError:
  400. conn.close()
  401. raise
  402. future = self._register(ov, listener, finish_accept)
  403. coro = accept_coro(future, conn)
  404. tasks.async(coro, loop=self._loop)
  405. return future
  406. def connect(self, conn, address):
  407. self._register_with_iocp(conn)
  408. # The socket needs to be locally bound before we call ConnectEx().
  409. try:
  410. _overlapped.BindLocal(conn.fileno(), conn.family)
  411. except OSError as e:
  412. if e.winerror != errno.WSAEINVAL:
  413. raise
  414. # Probably already locally bound; check using getsockname().
  415. if conn.getsockname()[1] == 0:
  416. raise
  417. ov = _overlapped.Overlapped(NULL)
  418. ov.ConnectEx(conn.fileno(), address)
  419. def finish_connect(trans, key, ov):
  420. ov.getresult()
  421. # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
  422. conn.setsockopt(socket.SOL_SOCKET,
  423. _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
  424. return conn
  425. return self._register(ov, conn, finish_connect)
  426. def accept_pipe(self, pipe):
  427. self._register_with_iocp(pipe)
  428. ov = _overlapped.Overlapped(NULL)
  429. connected = ov.ConnectNamedPipe(pipe.fileno())
  430. if connected:
  431. # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
  432. # that the pipe is connected. There is no need to wait for the
  433. # completion of the connection.
  434. return self._result(pipe)
  435. def finish_accept_pipe(trans, key, ov):
  436. ov.getresult()
  437. return pipe
  438. return self._register(ov, pipe, finish_accept_pipe)
  439. @coroutine
  440. def connect_pipe(self, address):
  441. delay = CONNECT_PIPE_INIT_DELAY
  442. while True:
  443. # Unfortunately there is no way to do an overlapped connect to a pipe.
  444. # Call CreateFile() in a loop until it doesn't fail with
  445. # ERROR_PIPE_BUSY
  446. try:
  447. handle = _overlapped.ConnectPipe(address)
  448. break
  449. except OSError as exc:
  450. if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
  451. raise
  452. # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
  453. delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
  454. yield from tasks.sleep(delay, loop=self._loop)
  455. return windows_utils.PipeHandle(handle)
  456. def wait_for_handle(self, handle, timeout=None):
  457. """Wait for a handle.
  458. Return a Future object. The result of the future is True if the wait
  459. completed, or False if the wait did not complete (on timeout).
  460. """
  461. return self._wait_for_handle(handle, timeout, False)
  462. def _wait_cancel(self, event, done_callback):
  463. fut = self._wait_for_handle(event, None, True)
  464. # add_done_callback() cannot be used because the wait may only complete
  465. # in IocpProactor.close(), while the event loop is not running.
  466. fut._done_callback = done_callback
  467. return fut
  468. def _wait_for_handle(self, handle, timeout, _is_cancel):
  469. if timeout is None:
  470. ms = _winapi.INFINITE
  471. else:
  472. # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
  473. # round away from zero to wait *at least* timeout seconds.
  474. ms = math.ceil(timeout * 1e3)
  475. # We only create ov so we can use ov.address as a key for the cache.
  476. ov = _overlapped.Overlapped(NULL)
  477. wait_handle = _overlapped.RegisterWaitWithQueue(
  478. handle, self._iocp, ov.address, ms)
  479. if _is_cancel:
  480. f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
  481. else:
  482. f = _WaitHandleFuture(ov, handle, wait_handle, self,
  483. loop=self._loop)
  484. if f._source_traceback:
  485. del f._source_traceback[-1]
  486. def finish_wait_for_handle(trans, key, ov):
  487. # Note that this second wait means that we should only use
  488. # this with handles types where a successful wait has no
  489. # effect. So events or processes are all right, but locks
  490. # or semaphores are not. Also note if the handle is
  491. # signalled and then quickly reset, then we may return
  492. # False even though we have not timed out.
  493. return f._poll()
  494. self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
  495. return f
  496. def _register_with_iocp(self, obj):
  497. # To get notifications of finished ops on this objects sent to the
  498. # completion port, were must register the handle.
  499. if obj not in self._registered:
  500. self._registered.add(obj)
  501. _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
  502. # XXX We could also use SetFileCompletionNotificationModes()
  503. # to avoid sending notifications to completion port of ops
  504. # that succeed immediately.
  505. def _register(self, ov, obj, callback):
  506. # Return a future which will be set with the result of the
  507. # operation when it completes. The future's value is actually
  508. # the value returned by callback().
  509. f = _OverlappedFuture(ov, loop=self._loop)
  510. if f._source_traceback:
  511. del f._source_traceback[-1]
  512. if not ov.pending:
  513. # The operation has completed, so no need to postpone the
  514. # work. We cannot take this short cut if we need the
  515. # NumberOfBytes, CompletionKey values returned by
  516. # PostQueuedCompletionStatus().
  517. try:
  518. value = callback(None, None, ov)
  519. except OSError as e:
  520. f.set_exception(e)
  521. else:
  522. f.set_result(value)
  523. # Even if GetOverlappedResult() was called, we have to wait for the
  524. # notification of the completion in GetQueuedCompletionStatus().
  525. # Register the overlapped operation to keep a reference to the
  526. # OVERLAPPED object, otherwise the memory is freed and Windows may
  527. # read uninitialized memory.
  528. # Register the overlapped operation for later. Note that
  529. # we only store obj to prevent it from being garbage
  530. # collected too early.
  531. self._cache[ov.address] = (f, ov, obj, callback)
  532. return f
  533. def _unregister(self, ov):
  534. """Unregister an overlapped object.
  535. Call this method when its future has been cancelled. The event can
  536. already be signalled (pending in the proactor event queue). It is also
  537. safe if the event is never signalled (because it was cancelled).
  538. """
  539. self._unregistered.append(ov)
  540. def _get_accept_socket(self, family):
  541. s = socket.socket(family)
  542. s.settimeout(0)
  543. return s
  544. def _poll(self, timeout=None):
  545. if timeout is None:
  546. ms = INFINITE
  547. elif timeout < 0:
  548. raise ValueError("negative timeout")
  549. else:
  550. # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
  551. # round away from zero to wait *at least* timeout seconds.
  552. ms = math.ceil(timeout * 1e3)
  553. if ms >= INFINITE:
  554. raise ValueError("timeout too big")
  555. while True:
  556. status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
  557. if status is None:
  558. break
  559. ms = 0
  560. err, transferred, key, address = status
  561. try:
  562. f, ov, obj, callback = self._cache.pop(address)
  563. except KeyError:
  564. if self._loop.get_debug():
  565. self._loop.call_exception_handler({
  566. 'message': ('GetQueuedCompletionStatus() returned an '
  567. 'unexpected event'),
  568. 'status': ('err=%s transferred=%s key=%#x address=%#x'
  569. % (err, transferred, key, address)),
  570. })
  571. # key is either zero, or it is used to return a pipe
  572. # handle which should be closed to avoid a leak.
  573. if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
  574. _winapi.CloseHandle(key)
  575. continue
  576. if obj in self._stopped_serving:
  577. f.cancel()
  578. # Don't call the callback if _register() already read the result or
  579. # if the overlapped has been cancelled
  580. elif not f.done():
  581. try:
  582. value = callback(transferred, key, ov)
  583. except OSError as e:
  584. f.set_exception(e)
  585. self._results.append(f)
  586. else:
  587. f.set_result(value)
  588. self._results.append(f)
  589. # Remove unregisted futures
  590. for ov in self._unregistered:
  591. self._cache.pop(ov.address, None)
  592. self._unregistered.clear()
  593. def _stop_serving(self, obj):
  594. # obj is a socket or pipe handle. It will be closed in
  595. # BaseProactorEventLoop._stop_serving() which will make any
  596. # pending operations fail quickly.
  597. self._stopped_serving.add(obj)
  598. def close(self):
  599. # Cancel remaining registered operations.
  600. for address, (fut, ov, obj, callback) in list(self._cache.items()):
  601. if fut.cancelled():
  602. # Nothing to do with cancelled futures
  603. pass
  604. elif isinstance(fut, _WaitCancelFuture):
  605. # _WaitCancelFuture must not be cancelled
  606. pass
  607. else:
  608. try:
  609. fut.cancel()
  610. except OSError as exc:
  611. if self._loop is not None:
  612. context = {
  613. 'message': 'Cancelling a future failed',
  614. 'exception': exc,
  615. 'future': fut,
  616. }
  617. if fut._source_traceback:
  618. context['source_traceback'] = fut._source_traceback
  619. self._loop.call_exception_handler(context)
  620. while self._cache:
  621. if not self._poll(1):
  622. logger.debug('taking long time to close proactor')
  623. self._results = []
  624. if self._iocp is not None:
  625. _winapi.CloseHandle(self._iocp)
  626. self._iocp = None
  627. def __del__(self):
  628. self.close()
  629. class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
  630. def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
  631. self._proc = windows_utils.Popen(
  632. args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
  633. bufsize=bufsize, **kwargs)
  634. def callback(f):
  635. returncode = self._proc.poll()
  636. self._process_exited(returncode)
  637. f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
  638. f.add_done_callback(callback)
  639. SelectorEventLoop = _WindowsSelectorEventLoop
  640. class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
  641. _loop_factory = SelectorEventLoop
  642. DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy