channel.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. ##############################################################################
  2. #
  3. # Copyright (c) 2001, 2002 Zope Foundation and Contributors.
  4. # All Rights Reserved.
  5. #
  6. # This software is subject to the provisions of the Zope Public License,
  7. # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
  8. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
  9. # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  10. # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
  11. # FOR A PARTICULAR PURPOSE.
  12. #
  13. ##############################################################################
  14. import socket
  15. import threading
  16. import time
  17. import traceback
  18. from waitress.buffers import OverflowableBuffer, ReadOnlyFileBasedBuffer
  19. from waitress.parser import HTTPRequestParser
  20. from waitress.task import ErrorTask, WSGITask
  21. from waitress.utilities import InternalServerError
  22. from . import wasyncore
class ClientDisconnected(Exception):
    """Raised when attempting to write to a closed socket.

    ``HTTPChannel.write_soon`` raises it when the channel is no longer
    connected; ``HTTPChannel.service`` catches it and marks the running task
    as close-on-finish.
    """
class HTTPChannel(wasyncore.dispatcher):
    """
    Setting self.requests = [somerequest] prevents more requests from being
    received until the out buffers have been flushed.

    Setting self.requests = [] allows more requests to be received.
    """

    # Factories used when a task or a request parser has to be created.
    task_class = WSGITask
    error_task_class = ErrorTask
    parser_class = HTTPRequestParser

    # A request that has not been received yet completely is stored here
    request = None
    last_activity = 0  # Time of last activity
    will_close = False  # set to True to close the socket.
    close_when_flushed = False  # set to True to close the socket when flushed
    sent_continue = False  # used as a latch after sending 100 continue
    total_outbufs_len = 0  # total bytes ready to send
    current_outbuf_count = 0  # total bytes written to current outbuf
  42. #
  43. # ASYNCHRONOUS METHODS (including __init__)
  44. #
  45. def __init__(self, server, sock, addr, adj, map=None):
  46. self.server = server
  47. self.adj = adj
  48. self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
  49. self.creation_time = self.last_activity = time.time()
  50. self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
  51. # requests_lock used to push/pop requests and modify the request that is
  52. # currently being created
  53. self.requests_lock = threading.Lock()
  54. # outbuf_lock used to access any outbuf (expected to use an RLock)
  55. self.outbuf_lock = threading.Condition()
  56. wasyncore.dispatcher.__init__(self, sock, map=map)
  57. # Don't let wasyncore.dispatcher throttle self.addr on us.
  58. self.addr = addr
  59. self.requests = []
  60. def check_client_disconnected(self):
  61. """
  62. This method is inserted into the environment of any created task so it
  63. may occasionally check if the client has disconnected and interrupt
  64. execution.
  65. """
  66. return not self.connected
  67. def writable(self):
  68. # if there's data in the out buffer or we've been instructed to close
  69. # the channel (possibly by our server maintenance logic), run
  70. # handle_write
  71. return self.total_outbufs_len or self.will_close or self.close_when_flushed
  72. def handle_write(self):
  73. # Precondition: there's data in the out buffer to be sent, or
  74. # there's a pending will_close request
  75. if not self.connected:
  76. # we dont want to close the channel twice
  77. return
  78. # try to flush any pending output
  79. if not self.requests:
  80. # 1. There are no running tasks, so we don't need to try to lock
  81. # the outbuf before sending
  82. # 2. The data in the out buffer should be sent as soon as possible
  83. # because it's either data left over from task output
  84. # or a 100 Continue line sent within "received".
  85. flush = self._flush_some
  86. elif self.total_outbufs_len >= self.adj.send_bytes:
  87. # 1. There's a running task, so we need to try to lock
  88. # the outbuf before sending
  89. # 2. Only try to send if the data in the out buffer is larger
  90. # than self.adj_bytes to avoid TCP fragmentation
  91. flush = self._flush_some_if_lockable
  92. else:
  93. # 1. There's not enough data in the out buffer to bother to send
  94. # right now.
  95. flush = None
  96. self._flush_exception(flush)
  97. if self.close_when_flushed and not self.total_outbufs_len:
  98. self.close_when_flushed = False
  99. self.will_close = True
  100. if self.will_close:
  101. self.handle_close()
  102. def _flush_exception(self, flush, do_close=True):
  103. if flush:
  104. try:
  105. return (flush(do_close=do_close), False)
  106. except OSError:
  107. if self.adj.log_socket_errors:
  108. self.logger.exception("Socket error")
  109. self.will_close = True
  110. return (False, True)
  111. except Exception: # pragma: nocover
  112. self.logger.exception("Unexpected exception when flushing")
  113. self.will_close = True
  114. return (False, True)
  115. def readable(self):
  116. # We might want to read more requests. We can only do this if:
  117. # 1. We're not already about to close the connection.
  118. # 2. We're not waiting to flush remaining data before closing the
  119. # connection
  120. # 3. There are not too many tasks already queued
  121. # 4. There's no data in the output buffer that needs to be sent
  122. # before we potentially create a new task.
  123. return not (
  124. self.will_close
  125. or self.close_when_flushed
  126. or len(self.requests) > self.adj.channel_request_lookahead
  127. or self.total_outbufs_len
  128. )
  129. def handle_read(self):
  130. try:
  131. data = self.recv(self.adj.recv_bytes)
  132. except OSError:
  133. if self.adj.log_socket_errors:
  134. self.logger.exception("Socket error")
  135. self.handle_close()
  136. return
  137. if data:
  138. self.last_activity = time.time()
  139. self.received(data)
  140. else:
  141. # Client disconnected.
  142. self.connected = False
  143. def send_continue(self):
  144. """
  145. Send a 100-Continue header to the client. This is either called from
  146. receive (if no requests are running and the client expects it) or at
  147. the end of service (if no more requests are queued and a request has
  148. been read partially that expects it).
  149. """
  150. self.request.expect_continue = False
  151. outbuf_payload = b"HTTP/1.1 100 Continue\r\n\r\n"
  152. num_bytes = len(outbuf_payload)
  153. with self.outbuf_lock:
  154. self.outbufs[-1].append(outbuf_payload)
  155. self.current_outbuf_count += num_bytes
  156. self.total_outbufs_len += num_bytes
  157. self.sent_continue = True
  158. self._flush_some()
  159. self.request.completed = False
  160. def received(self, data):
  161. """
  162. Receives input asynchronously and assigns one or more requests to the
  163. channel.
  164. """
  165. if not data:
  166. return False
  167. with self.requests_lock:
  168. while data:
  169. if self.request is None:
  170. self.request = self.parser_class(self.adj)
  171. n = self.request.received(data)
  172. # if there are requests queued, we can not send the continue
  173. # header yet since the responses need to be kept in order
  174. if (
  175. self.request.expect_continue
  176. and self.request.headers_finished
  177. and not self.requests
  178. and not self.sent_continue
  179. ):
  180. self.send_continue()
  181. if self.request.completed:
  182. # The request (with the body) is ready to use.
  183. self.sent_continue = False
  184. if not self.request.empty:
  185. self.requests.append(self.request)
  186. if len(self.requests) == 1:
  187. # self.requests was empty before so the main thread
  188. # is in charge of starting the task. Otherwise,
  189. # service() will add a new task after each request
  190. # has been processed
  191. self.server.add_task(self)
  192. self.request = None
  193. if n >= len(data):
  194. break
  195. data = data[n:]
  196. return True
  197. def _flush_some_if_lockable(self, do_close=True):
  198. # Since our task may be appending to the outbuf, we try to acquire
  199. # the lock, but we don't block if we can't.
  200. if self.outbuf_lock.acquire(False):
  201. try:
  202. self._flush_some(do_close=do_close)
  203. if self.total_outbufs_len < self.adj.outbuf_high_watermark:
  204. self.outbuf_lock.notify()
  205. finally:
  206. self.outbuf_lock.release()
  207. def _flush_some(self, do_close=True):
  208. # Send as much data as possible to our client
  209. sent = 0
  210. dobreak = False
  211. while True:
  212. outbuf = self.outbufs[0]
  213. # use outbuf.__len__ rather than len(outbuf) FBO of not getting
  214. # OverflowError on 32-bit Python
  215. outbuflen = outbuf.__len__()
  216. while outbuflen > 0:
  217. chunk = outbuf.get(self.sendbuf_len)
  218. num_sent = self.send(chunk, do_close=do_close)
  219. if num_sent:
  220. outbuf.skip(num_sent, True)
  221. outbuflen -= num_sent
  222. sent += num_sent
  223. self.total_outbufs_len -= num_sent
  224. else:
  225. # failed to write anything, break out entirely
  226. dobreak = True
  227. break
  228. else:
  229. # self.outbufs[-1] must always be a writable outbuf
  230. if len(self.outbufs) > 1:
  231. toclose = self.outbufs.pop(0)
  232. try:
  233. toclose.close()
  234. except Exception:
  235. self.logger.exception("Unexpected error when closing an outbuf")
  236. else:
  237. # caught up, done flushing for now
  238. dobreak = True
  239. if dobreak:
  240. break
  241. if sent:
  242. self.last_activity = time.time()
  243. return True
  244. return False
  245. def handle_close(self):
  246. with self.outbuf_lock:
  247. for outbuf in self.outbufs:
  248. try:
  249. outbuf.close()
  250. except Exception:
  251. self.logger.exception(
  252. "Unknown exception while trying to close outbuf"
  253. )
  254. self.total_outbufs_len = 0
  255. self.connected = False
  256. self.outbuf_lock.notify()
  257. wasyncore.dispatcher.close(self)
  258. def add_channel(self, map=None):
  259. """See wasyncore.dispatcher
  260. This hook keeps track of opened channels.
  261. """
  262. wasyncore.dispatcher.add_channel(self, map)
  263. self.server.active_channels[self._fileno] = self
  264. def del_channel(self, map=None):
  265. """See wasyncore.dispatcher
  266. This hook keeps track of closed channels.
  267. """
  268. fd = self._fileno # next line sets this to None
  269. wasyncore.dispatcher.del_channel(self, map)
  270. ac = self.server.active_channels
  271. if fd in ac:
  272. del ac[fd]
  273. #
  274. # SYNCHRONOUS METHODS
  275. #
  276. def write_soon(self, data):
  277. if not self.connected:
  278. # if the socket is closed then interrupt the task so that it
  279. # can cleanup possibly before the app_iter is exhausted
  280. raise ClientDisconnected
  281. if data:
  282. # the async mainloop might be popping data off outbuf; we can
  283. # block here waiting for it because we're in a task thread
  284. with self.outbuf_lock:
  285. self._flush_outbufs_below_high_watermark()
  286. if not self.connected:
  287. raise ClientDisconnected
  288. num_bytes = len(data)
  289. if data.__class__ is ReadOnlyFileBasedBuffer:
  290. # they used wsgi.file_wrapper
  291. self.outbufs.append(data)
  292. nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
  293. self.outbufs.append(nextbuf)
  294. self.current_outbuf_count = 0
  295. else:
  296. if self.current_outbuf_count >= self.adj.outbuf_high_watermark:
  297. # rotate to a new buffer if the current buffer has hit
  298. # the watermark to avoid it growing unbounded
  299. nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
  300. self.outbufs.append(nextbuf)
  301. self.current_outbuf_count = 0
  302. self.outbufs[-1].append(data)
  303. self.current_outbuf_count += num_bytes
  304. self.total_outbufs_len += num_bytes
  305. if self.total_outbufs_len >= self.adj.send_bytes:
  306. (flushed, exception) = self._flush_exception(
  307. self._flush_some, do_close=False
  308. )
  309. if (
  310. exception
  311. or not flushed
  312. or self.total_outbufs_len >= self.adj.send_bytes
  313. ):
  314. self.server.pull_trigger()
  315. return num_bytes
  316. return 0
  317. def _flush_outbufs_below_high_watermark(self):
  318. # check first to avoid locking if possible
  319. if self.total_outbufs_len > self.adj.outbuf_high_watermark:
  320. with self.outbuf_lock:
  321. (_, exception) = self._flush_exception(self._flush_some, do_close=False)
  322. if exception:
  323. # An exception happened while flushing, wake up the main
  324. # thread, then wait for it to decide what to do next
  325. # (probably close the socket, and then just return)
  326. self.server.pull_trigger()
  327. self.outbuf_lock.wait()
  328. return
  329. while (
  330. self.connected
  331. and self.total_outbufs_len > self.adj.outbuf_high_watermark
  332. ):
  333. self.server.pull_trigger()
  334. self.outbuf_lock.wait()
  335. def service(self):
  336. """Execute one request. If there are more, we add another task to the
  337. server at the end."""
  338. request = self.requests[0]
  339. if request.error:
  340. task = self.error_task_class(self, request)
  341. else:
  342. task = self.task_class(self, request)
  343. try:
  344. if self.connected:
  345. task.service()
  346. else:
  347. task.close_on_finish = True
  348. except ClientDisconnected:
  349. self.logger.info("Client disconnected while serving %s" % task.request.path)
  350. task.close_on_finish = True
  351. except Exception:
  352. self.logger.exception("Exception while serving %s" % task.request.path)
  353. if not task.wrote_header:
  354. if self.adj.expose_tracebacks:
  355. body = traceback.format_exc()
  356. else:
  357. body = "The server encountered an unexpected internal server error"
  358. req_version = request.version
  359. req_headers = request.headers
  360. err_request = self.parser_class(self.adj)
  361. err_request.error = InternalServerError(body)
  362. # copy some original request attributes to fulfill
  363. # HTTP 1.1 requirements
  364. err_request.version = req_version
  365. try:
  366. err_request.headers["CONNECTION"] = req_headers["CONNECTION"]
  367. except KeyError:
  368. pass
  369. task = self.error_task_class(self, err_request)
  370. try:
  371. task.service() # must not fail
  372. except ClientDisconnected:
  373. task.close_on_finish = True
  374. else:
  375. task.close_on_finish = True
  376. if task.close_on_finish:
  377. with self.requests_lock:
  378. self.close_when_flushed = True
  379. for request in self.requests:
  380. request.close()
  381. self.requests = []
  382. else:
  383. # before processing a new request, ensure there is not too
  384. # much data in the outbufs waiting to be flushed
  385. # NB: currently readable() returns False while we are
  386. # flushing data so we know no new requests will come in
  387. # that we need to account for, otherwise it'd be better
  388. # to do this check at the start of the request instead of
  389. # at the end to account for consecutive service() calls
  390. if len(self.requests) > 1:
  391. self._flush_outbufs_below_high_watermark()
  392. # this is a little hacky but basically it's forcing the
  393. # next request to create a new outbuf to avoid sharing
  394. # outbufs across requests which can cause outbufs to
  395. # not be deallocated regularly when a connection is open
  396. # for a long time
  397. if self.current_outbuf_count > 0:
  398. self.current_outbuf_count = self.adj.outbuf_high_watermark
  399. request.close()
  400. # Add new task to process the next request
  401. with self.requests_lock:
  402. self.requests.pop(0)
  403. if self.connected and self.requests:
  404. self.server.add_task(self)
  405. elif (
  406. self.connected
  407. and self.request is not None
  408. and self.request.expect_continue
  409. and self.request.headers_finished
  410. and not self.sent_continue
  411. ):
  412. # A request waits for a signal to continue, but we could
  413. # not send it until now because requests were being
  414. # processed and the output needs to be kept in order
  415. self.send_continue()
  416. if self.connected:
  417. self.server.pull_trigger()
  418. self.last_activity = time.time()
  419. def cancel(self):
  420. """Cancels all pending / active requests"""
  421. self.will_close = True
  422. self.connected = False
  423. self.last_activity = time.time()
  424. self.requests = []