server.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. ##############################################################################
  2. #
  3. # Copyright (c) 2001, 2002 Zope Foundation and Contributors.
  4. # All Rights Reserved.
  5. #
  6. # This software is subject to the provisions of the Zope Public License,
  7. # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
  8. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
  9. # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  10. # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
  11. # FOR A PARTICULAR PURPOSE.
  12. #
  13. ##############################################################################
  14. import os
  15. import os.path
  16. import socket
  17. import time
  18. from waitress import trigger
  19. from waitress.adjustments import Adjustments
  20. from waitress.channel import HTTPChannel
  21. from waitress.compat import IPPROTO_IPV6, IPV6_V6ONLY
  22. from waitress.task import ThreadedTaskDispatcher
  23. from waitress.utilities import cleanup_unix_socket
  24. from . import wasyncore
  25. from .proxy_headers import proxy_headers_middleware
  26. def create_server(
  27. application,
  28. map=None,
  29. _start=True, # test shim
  30. _sock=None, # test shim
  31. _dispatcher=None, # test shim
  32. **kw # adjustments
  33. ):
  34. """
  35. if __name__ == '__main__':
  36. server = create_server(app)
  37. server.run()
  38. """
  39. if application is None:
  40. raise ValueError(
  41. 'The "app" passed to ``create_server`` was ``None``. You forgot '
  42. "to return a WSGI app within your application."
  43. )
  44. adj = Adjustments(**kw)
  45. if map is None: # pragma: nocover
  46. map = {}
  47. dispatcher = _dispatcher
  48. if dispatcher is None:
  49. dispatcher = ThreadedTaskDispatcher()
  50. dispatcher.set_thread_count(adj.threads)
  51. if adj.unix_socket and hasattr(socket, "AF_UNIX"):
  52. sockinfo = (socket.AF_UNIX, socket.SOCK_STREAM, None, None)
  53. return UnixWSGIServer(
  54. application,
  55. map,
  56. _start,
  57. _sock,
  58. dispatcher=dispatcher,
  59. adj=adj,
  60. sockinfo=sockinfo,
  61. )
  62. effective_listen = []
  63. last_serv = None
  64. if not adj.sockets:
  65. for sockinfo in adj.listen:
  66. # When TcpWSGIServer is called, it registers itself in the map. This
  67. # side-effect is all we need it for, so we don't store a reference to
  68. # or return it to the user.
  69. last_serv = TcpWSGIServer(
  70. application,
  71. map,
  72. _start,
  73. _sock,
  74. dispatcher=dispatcher,
  75. adj=adj,
  76. sockinfo=sockinfo,
  77. )
  78. effective_listen.append(
  79. (last_serv.effective_host, last_serv.effective_port)
  80. )
  81. for sock in adj.sockets:
  82. sockinfo = (sock.family, sock.type, sock.proto, sock.getsockname())
  83. if sock.family == socket.AF_INET or sock.family == socket.AF_INET6:
  84. last_serv = TcpWSGIServer(
  85. application,
  86. map,
  87. _start,
  88. sock,
  89. dispatcher=dispatcher,
  90. adj=adj,
  91. bind_socket=False,
  92. sockinfo=sockinfo,
  93. )
  94. effective_listen.append(
  95. (last_serv.effective_host, last_serv.effective_port)
  96. )
  97. elif hasattr(socket, "AF_UNIX") and sock.family == socket.AF_UNIX:
  98. last_serv = UnixWSGIServer(
  99. application,
  100. map,
  101. _start,
  102. sock,
  103. dispatcher=dispatcher,
  104. adj=adj,
  105. bind_socket=False,
  106. sockinfo=sockinfo,
  107. )
  108. effective_listen.append(
  109. (last_serv.effective_host, last_serv.effective_port)
  110. )
  111. # We are running a single server, so we can just return the last server,
  112. # saves us from having to create one more object
  113. if len(effective_listen) == 1:
  114. # In this case we have no need to use a MultiSocketServer
  115. return last_serv
  116. log_info = last_serv.log_info
  117. # Return a class that has a utility function to print out the sockets it's
  118. # listening on, and has a .run() function. All of the TcpWSGIServers
  119. # registered themselves in the map above.
  120. return MultiSocketServer(map, adj, effective_listen, dispatcher, log_info)
  121. # This class is only ever used if we have multiple listen sockets. It allows
  122. # the serve() API to call .run() which starts the wasyncore loop, and catches
  123. # SystemExit/KeyboardInterrupt so that it can atempt to cleanly shut down.
  124. class MultiSocketServer:
  125. asyncore = wasyncore # test shim
  126. def __init__(
  127. self,
  128. map=None,
  129. adj=None,
  130. effective_listen=None,
  131. dispatcher=None,
  132. log_info=None,
  133. ):
  134. self.adj = adj
  135. self.map = map
  136. self.effective_listen = effective_listen
  137. self.task_dispatcher = dispatcher
  138. self.log_info = log_info
  139. def print_listen(self, format_str): # pragma: nocover
  140. for l in self.effective_listen:
  141. l = list(l)
  142. if ":" in l[0]:
  143. l[0] = f"[{l[0]}]"
  144. self.log_info(format_str.format(*l))
  145. def run(self):
  146. try:
  147. self.asyncore.loop(
  148. timeout=self.adj.asyncore_loop_timeout,
  149. map=self.map,
  150. use_poll=self.adj.asyncore_use_poll,
  151. )
  152. except (SystemExit, KeyboardInterrupt):
  153. self.close()
  154. def close(self):
  155. self.task_dispatcher.shutdown()
  156. wasyncore.close_all(self.map)
  157. class BaseWSGIServer(wasyncore.dispatcher):
  158. channel_class = HTTPChannel
  159. next_channel_cleanup = 0
  160. socketmod = socket # test shim
  161. asyncore = wasyncore # test shim
  162. in_connection_overflow = False
  163. def __init__(
  164. self,
  165. application,
  166. map=None,
  167. _start=True, # test shim
  168. _sock=None, # test shim
  169. dispatcher=None, # dispatcher
  170. adj=None, # adjustments
  171. sockinfo=None, # opaque object
  172. bind_socket=True,
  173. **kw
  174. ):
  175. if adj is None:
  176. adj = Adjustments(**kw)
  177. if adj.trusted_proxy or adj.clear_untrusted_proxy_headers:
  178. # wrap the application to deal with proxy headers
  179. # we wrap it here because webtest subclasses the TcpWSGIServer
  180. # directly and thus doesn't run any code that's in create_server
  181. application = proxy_headers_middleware(
  182. application,
  183. trusted_proxy=adj.trusted_proxy,
  184. trusted_proxy_count=adj.trusted_proxy_count,
  185. trusted_proxy_headers=adj.trusted_proxy_headers,
  186. clear_untrusted=adj.clear_untrusted_proxy_headers,
  187. log_untrusted=adj.log_untrusted_proxy_headers,
  188. logger=self.logger,
  189. )
  190. if map is None:
  191. # use a nonglobal socket map by default to hopefully prevent
  192. # conflicts with apps and libs that use the wasyncore global socket
  193. # map ala https://github.com/Pylons/waitress/issues/63
  194. map = {}
  195. if sockinfo is None:
  196. sockinfo = adj.listen[0]
  197. self.sockinfo = sockinfo
  198. self.family = sockinfo[0]
  199. self.socktype = sockinfo[1]
  200. self.application = application
  201. self.adj = adj
  202. self.trigger = trigger.trigger(map)
  203. if dispatcher is None:
  204. dispatcher = ThreadedTaskDispatcher()
  205. dispatcher.set_thread_count(self.adj.threads)
  206. self.task_dispatcher = dispatcher
  207. self.asyncore.dispatcher.__init__(self, _sock, map=map)
  208. if _sock is None:
  209. self.create_socket(self.family, self.socktype)
  210. if self.family == socket.AF_INET6: # pragma: nocover
  211. self.socket.setsockopt(IPPROTO_IPV6, IPV6_V6ONLY, 1)
  212. self.set_reuse_addr()
  213. if bind_socket:
  214. self.bind_server_socket()
  215. self.effective_host, self.effective_port = self.getsockname()
  216. self.server_name = adj.server_name
  217. self.active_channels = {}
  218. if _start:
  219. self.accept_connections()
  220. def bind_server_socket(self):
  221. raise NotImplementedError # pragma: no cover
  222. def getsockname(self):
  223. raise NotImplementedError # pragma: no cover
  224. def accept_connections(self):
  225. self.accepting = True
  226. self.socket.listen(self.adj.backlog) # Get around asyncore NT limit
  227. def add_task(self, task):
  228. self.task_dispatcher.add_task(task)
  229. def readable(self):
  230. now = time.time()
  231. if now >= self.next_channel_cleanup:
  232. self.next_channel_cleanup = now + self.adj.cleanup_interval
  233. self.maintenance(now)
  234. if self.accepting:
  235. if (
  236. not self.in_connection_overflow
  237. and len(self._map) >= self.adj.connection_limit
  238. ):
  239. self.in_connection_overflow = True
  240. self.logger.warning(
  241. "total open connections reached the connection limit, "
  242. "no longer accepting new connections"
  243. )
  244. elif (
  245. self.in_connection_overflow
  246. and len(self._map) < self.adj.connection_limit
  247. ):
  248. self.in_connection_overflow = False
  249. self.logger.info(
  250. "total open connections dropped below the connection limit, "
  251. "listening again"
  252. )
  253. return not self.in_connection_overflow
  254. return False
  255. def writable(self):
  256. return False
  257. def handle_read(self):
  258. pass
  259. def handle_connect(self):
  260. pass
  261. def handle_accept(self):
  262. try:
  263. v = self.accept()
  264. if v is None:
  265. return
  266. conn, addr = v
  267. except OSError:
  268. # Linux: On rare occasions we get a bogus socket back from
  269. # accept. socketmodule.c:makesockaddr complains that the
  270. # address family is unknown. We don't want the whole server
  271. # to shut down because of this.
  272. if self.adj.log_socket_errors:
  273. self.logger.warning("server accept() threw an exception", exc_info=True)
  274. return
  275. self.set_socket_options(conn)
  276. addr = self.fix_addr(addr)
  277. self.channel_class(self, conn, addr, self.adj, map=self._map)
  278. def run(self):
  279. try:
  280. self.asyncore.loop(
  281. timeout=self.adj.asyncore_loop_timeout,
  282. map=self._map,
  283. use_poll=self.adj.asyncore_use_poll,
  284. )
  285. except (SystemExit, KeyboardInterrupt):
  286. self.task_dispatcher.shutdown()
  287. def pull_trigger(self):
  288. self.trigger.pull_trigger()
  289. def set_socket_options(self, conn):
  290. pass
  291. def fix_addr(self, addr):
  292. return addr
  293. def maintenance(self, now):
  294. """
  295. Closes channels that have not had any activity in a while.
  296. The timeout is configured through adj.channel_timeout (seconds).
  297. """
  298. cutoff = now - self.adj.channel_timeout
  299. for channel in self.active_channels.values():
  300. if (not channel.requests) and channel.last_activity < cutoff:
  301. channel.will_close = True
  302. def print_listen(self, format_str): # pragma: no cover
  303. self.log_info(format_str.format(self.effective_host, self.effective_port))
  304. def close(self):
  305. self.trigger.close()
  306. return wasyncore.dispatcher.close(self)
  307. class TcpWSGIServer(BaseWSGIServer):
  308. def bind_server_socket(self):
  309. (_, _, _, sockaddr) = self.sockinfo
  310. self.bind(sockaddr)
  311. def getsockname(self):
  312. # Return the IP address, port as numeric
  313. return self.socketmod.getnameinfo(
  314. self.socket.getsockname(),
  315. self.socketmod.NI_NUMERICHOST | self.socketmod.NI_NUMERICSERV,
  316. )
  317. def set_socket_options(self, conn):
  318. for (level, optname, value) in self.adj.socket_options:
  319. conn.setsockopt(level, optname, value)
  320. if hasattr(socket, "AF_UNIX"):
  321. class UnixWSGIServer(BaseWSGIServer):
  322. def __init__(
  323. self,
  324. application,
  325. map=None,
  326. _start=True, # test shim
  327. _sock=None, # test shim
  328. dispatcher=None, # dispatcher
  329. adj=None, # adjustments
  330. sockinfo=None, # opaque object
  331. **kw
  332. ):
  333. if sockinfo is None:
  334. sockinfo = (socket.AF_UNIX, socket.SOCK_STREAM, None, None)
  335. super().__init__(
  336. application,
  337. map=map,
  338. _start=_start,
  339. _sock=_sock,
  340. dispatcher=dispatcher,
  341. adj=adj,
  342. sockinfo=sockinfo,
  343. **kw,
  344. )
  345. def bind_server_socket(self):
  346. cleanup_unix_socket(self.adj.unix_socket)
  347. self.bind(self.adj.unix_socket)
  348. if os.path.exists(self.adj.unix_socket):
  349. os.chmod(self.adj.unix_socket, self.adj.unix_socket_perms)
  350. def getsockname(self):
  351. return ("unix", self.socket.getsockname())
  352. def fix_addr(self, addr):
  353. return ("localhost", None)
  354. # Compatibility alias.
  355. WSGIServer = TcpWSGIServer