trigger.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. ##############################################################################
  2. #
  3. # Copyright (c) 2001-2005 Zope Foundation and Contributors.
  4. # All Rights Reserved.
  5. #
  6. # This software is subject to the provisions of the Zope Public License,
  7. # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
  8. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
  9. # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  10. # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
  11. # FOR A PARTICULAR PURPOSE
  12. #
  13. ##############################################################################
  14. import errno
  15. import os
  16. import socket
  17. import threading
  18. from . import wasyncore
  19. # Wake up a call to select() running in the main thread.
  20. #
  21. # This is useful in a context where you are using Medusa's I/O
  22. # subsystem to deliver data, but the data is generated by another
  23. # thread. Normally, if Medusa is in the middle of a call to
  24. # select(), new output data generated by another thread will have
  25. # to sit until the call to select() either times out or returns.
  26. # If the trigger is 'pulled' by another thread, it should immediately
  27. # generate a READ event on the trigger object, which will force the
  28. # select() invocation to return.
  29. #
  30. # A common use for this facility: letting Medusa manage I/O for a
  31. # large number of connections; but routing each request through a
  32. # thread chosen from a fixed-size thread pool. When a thread is
  33. # acquired, a transaction is performed, but output data is
  34. # accumulated into buffers that will be emptied more efficiently
  35. # by Medusa. [picture a server that can process database queries
  36. # rapidly, but doesn't want to tie up threads waiting to send data
  37. # to low-bandwidth connections]
  38. #
  39. # The other major feature provided by this class is the ability to
  40. # move work back into the main thread: if you call pull_trigger()
  41. # with a thunk argument, when select() wakes up and receives the
  42. # event it will call your thunk from within that thread. The main
  43. # purpose of this is to remove the need to wrap thread locks around
  44. # Medusa's data structures, which normally do not need them. [To see
  45. # why this is true, imagine this scenario: A thread tries to push some
  46. # new data onto a channel's outgoing data queue at the same time that
  47. # the main thread is trying to remove some]
  48. class _triggerbase:
  49. """OS-independent base class for OS-dependent trigger class."""
  50. kind = None # subclass must set to "pipe" or "loopback"; used by repr
  51. def __init__(self):
  52. self._closed = False
  53. # `lock` protects the `thunks` list from being traversed and
  54. # appended to simultaneously.
  55. self.lock = threading.Lock()
  56. # List of no-argument callbacks to invoke when the trigger is
  57. # pulled. These run in the thread running the wasyncore mainloop,
  58. # regardless of which thread pulls the trigger.
  59. self.thunks = []
  60. def readable(self):
  61. return True
  62. def writable(self):
  63. return False
  64. def handle_connect(self):
  65. pass
  66. def handle_close(self):
  67. self.close()
  68. # Override the wasyncore close() method, because it doesn't know about
  69. # (so can't close) all the gimmicks we have open. Subclass must
  70. # supply a _close() method to do platform-specific closing work. _close()
  71. # will be called iff we're not already closed.
  72. def close(self):
  73. if not self._closed:
  74. self._closed = True
  75. self.del_channel()
  76. self._close() # subclass does OS-specific stuff
  77. def pull_trigger(self, thunk=None):
  78. if thunk:
  79. with self.lock:
  80. self.thunks.append(thunk)
  81. self._physical_pull()
  82. def handle_read(self):
  83. try:
  84. self.recv(8192)
  85. except OSError:
  86. return
  87. with self.lock:
  88. for thunk in self.thunks:
  89. try:
  90. thunk()
  91. except:
  92. nil, t, v, tbinfo = wasyncore.compact_traceback()
  93. self.log_info(f"exception in trigger thunk: ({t}:{v} {tbinfo})")
  94. self.thunks = []
  95. if os.name == "posix":
  96. class trigger(_triggerbase, wasyncore.file_dispatcher):
  97. kind = "pipe"
  98. def __init__(self, map):
  99. _triggerbase.__init__(self)
  100. r, self.trigger = self._fds = os.pipe()
  101. wasyncore.file_dispatcher.__init__(self, r, map=map)
  102. def _close(self):
  103. for fd in self._fds:
  104. os.close(fd)
  105. self._fds = []
  106. wasyncore.file_dispatcher.close(self)
  107. def _physical_pull(self):
  108. os.write(self.trigger, b"x")
  109. else: # pragma: no cover
  110. # Windows version; uses just sockets, because a pipe isn't select'able
  111. # on Windows.
  112. class trigger(_triggerbase, wasyncore.dispatcher):
  113. kind = "loopback"
  114. def __init__(self, map):
  115. _triggerbase.__init__(self)
  116. # Get a pair of connected sockets. The trigger is the 'w'
  117. # end of the pair, which is connected to 'r'. 'r' is put
  118. # in the wasyncore socket map. "pulling the trigger" then
  119. # means writing something on w, which will wake up r.
  120. w = socket.socket()
  121. # Disable buffering -- pulling the trigger sends 1 byte,
  122. # and we want that sent immediately, to wake up wasyncore's
  123. # select() ASAP.
  124. w.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  125. count = 0
  126. while True:
  127. count += 1
  128. # Bind to a local port; for efficiency, let the OS pick
  129. # a free port for us.
  130. # Unfortunately, stress tests showed that we may not
  131. # be able to connect to that port ("Address already in
  132. # use") despite that the OS picked it. This appears
  133. # to be a race bug in the Windows socket implementation.
  134. # So we loop until a connect() succeeds (almost always
  135. # on the first try). See the long thread at
  136. # http://mail.zope.org/pipermail/zope/2005-July/160433.html
  137. # for hideous details.
  138. a = socket.socket()
  139. a.bind(("127.0.0.1", 0))
  140. connect_address = a.getsockname() # assigned (host, port) pair
  141. a.listen(1)
  142. try:
  143. w.connect(connect_address)
  144. break # success
  145. except OSError as detail:
  146. if getattr(detail, "winerror", None) != errno.WSAEADDRINUSE:
  147. # "Address already in use" is the only error
  148. # I've seen on two WinXP Pro SP2 boxes, under
  149. # Pythons 2.3.5 and 2.4.1.
  150. raise
  151. # (10048, 'Address already in use')
  152. # assert count <= 2 # never triggered in Tim's tests
  153. if count >= 10: # I've never seen it go above 2
  154. a.close()
  155. w.close()
  156. raise RuntimeError("Cannot bind trigger!")
  157. # Close `a` and try again. Note: I originally put a short
  158. # sleep() here, but it didn't appear to help or hurt.
  159. a.close()
  160. r, addr = a.accept() # r becomes wasyncore's (self.)socket
  161. a.close()
  162. self.trigger = w
  163. wasyncore.dispatcher.__init__(self, r, map=map)
  164. def _close(self):
  165. # self.socket is r, and self.trigger is w, from __init__
  166. self.socket.close()
  167. self.trigger.close()
  168. def _physical_pull(self):
  169. self.trigger.send(b"x")