123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519 |
- ##############################################################################
- #
- # Copyright (c) 2001, 2002 Zope Foundation and Contributors.
- # All Rights Reserved.
- #
- # This software is subject to the provisions of the Zope Public License,
- # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
- # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
- # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
- # FOR A PARTICULAR PURPOSE.
- #
- ##############################################################################
- import socket
- import threading
- import time
- import traceback
- from waitress.buffers import OverflowableBuffer, ReadOnlyFileBasedBuffer
- from waitress.parser import HTTPRequestParser
- from waitress.task import ErrorTask, WSGITask
- from waitress.utilities import InternalServerError
- from . import wasyncore
- class ClientDisconnected(Exception):
- """Raised when attempting to write to a closed socket."""
- class HTTPChannel(wasyncore.dispatcher):
- """
- Setting self.requests = [somerequest] prevents more requests from being
- received until the out buffers have been flushed.
- Setting self.requests = [] allows more requests to be received.
- """
- task_class = WSGITask
- error_task_class = ErrorTask
- parser_class = HTTPRequestParser
- # A request that has not been received yet completely is stored here
- request = None
- last_activity = 0 # Time of last activity
- will_close = False # set to True to close the socket.
- close_when_flushed = False # set to True to close the socket when flushed
- sent_continue = False # used as a latch after sending 100 continue
- total_outbufs_len = 0 # total bytes ready to send
- current_outbuf_count = 0 # total bytes written to current outbuf
- #
- # ASYNCHRONOUS METHODS (including __init__)
- #
- def __init__(self, server, sock, addr, adj, map=None):
- self.server = server
- self.adj = adj
- self.outbufs = [OverflowableBuffer(adj.outbuf_overflow)]
- self.creation_time = self.last_activity = time.time()
- self.sendbuf_len = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
- # requests_lock used to push/pop requests and modify the request that is
- # currently being created
- self.requests_lock = threading.Lock()
- # outbuf_lock used to access any outbuf (expected to use an RLock)
- self.outbuf_lock = threading.Condition()
- wasyncore.dispatcher.__init__(self, sock, map=map)
- # Don't let wasyncore.dispatcher throttle self.addr on us.
- self.addr = addr
- self.requests = []
- def check_client_disconnected(self):
- """
- This method is inserted into the environment of any created task so it
- may occasionally check if the client has disconnected and interrupt
- execution.
- """
- return not self.connected
- def writable(self):
- # if there's data in the out buffer or we've been instructed to close
- # the channel (possibly by our server maintenance logic), run
- # handle_write
- return self.total_outbufs_len or self.will_close or self.close_when_flushed
- def handle_write(self):
- # Precondition: there's data in the out buffer to be sent, or
- # there's a pending will_close request
- if not self.connected:
- # we dont want to close the channel twice
- return
- # try to flush any pending output
- if not self.requests:
- # 1. There are no running tasks, so we don't need to try to lock
- # the outbuf before sending
- # 2. The data in the out buffer should be sent as soon as possible
- # because it's either data left over from task output
- # or a 100 Continue line sent within "received".
- flush = self._flush_some
- elif self.total_outbufs_len >= self.adj.send_bytes:
- # 1. There's a running task, so we need to try to lock
- # the outbuf before sending
- # 2. Only try to send if the data in the out buffer is larger
- # than self.adj_bytes to avoid TCP fragmentation
- flush = self._flush_some_if_lockable
- else:
- # 1. There's not enough data in the out buffer to bother to send
- # right now.
- flush = None
- self._flush_exception(flush)
- if self.close_when_flushed and not self.total_outbufs_len:
- self.close_when_flushed = False
- self.will_close = True
- if self.will_close:
- self.handle_close()
- def _flush_exception(self, flush, do_close=True):
- if flush:
- try:
- return (flush(do_close=do_close), False)
- except OSError:
- if self.adj.log_socket_errors:
- self.logger.exception("Socket error")
- self.will_close = True
- return (False, True)
- except Exception: # pragma: nocover
- self.logger.exception("Unexpected exception when flushing")
- self.will_close = True
- return (False, True)
- def readable(self):
- # We might want to read more requests. We can only do this if:
- # 1. We're not already about to close the connection.
- # 2. We're not waiting to flush remaining data before closing the
- # connection
- # 3. There are not too many tasks already queued
- # 4. There's no data in the output buffer that needs to be sent
- # before we potentially create a new task.
- return not (
- self.will_close
- or self.close_when_flushed
- or len(self.requests) > self.adj.channel_request_lookahead
- or self.total_outbufs_len
- )
- def handle_read(self):
- try:
- data = self.recv(self.adj.recv_bytes)
- except OSError:
- if self.adj.log_socket_errors:
- self.logger.exception("Socket error")
- self.handle_close()
- return
- if data:
- self.last_activity = time.time()
- self.received(data)
- else:
- # Client disconnected.
- self.connected = False
- def send_continue(self):
- """
- Send a 100-Continue header to the client. This is either called from
- receive (if no requests are running and the client expects it) or at
- the end of service (if no more requests are queued and a request has
- been read partially that expects it).
- """
- self.request.expect_continue = False
- outbuf_payload = b"HTTP/1.1 100 Continue\r\n\r\n"
- num_bytes = len(outbuf_payload)
- with self.outbuf_lock:
- self.outbufs[-1].append(outbuf_payload)
- self.current_outbuf_count += num_bytes
- self.total_outbufs_len += num_bytes
- self.sent_continue = True
- self._flush_some()
- self.request.completed = False
- def received(self, data):
- """
- Receives input asynchronously and assigns one or more requests to the
- channel.
- """
- if not data:
- return False
- with self.requests_lock:
- while data:
- if self.request is None:
- self.request = self.parser_class(self.adj)
- n = self.request.received(data)
- # if there are requests queued, we can not send the continue
- # header yet since the responses need to be kept in order
- if (
- self.request.expect_continue
- and self.request.headers_finished
- and not self.requests
- and not self.sent_continue
- ):
- self.send_continue()
- if self.request.completed:
- # The request (with the body) is ready to use.
- self.sent_continue = False
- if not self.request.empty:
- self.requests.append(self.request)
- if len(self.requests) == 1:
- # self.requests was empty before so the main thread
- # is in charge of starting the task. Otherwise,
- # service() will add a new task after each request
- # has been processed
- self.server.add_task(self)
- self.request = None
- if n >= len(data):
- break
- data = data[n:]
- return True
- def _flush_some_if_lockable(self, do_close=True):
- # Since our task may be appending to the outbuf, we try to acquire
- # the lock, but we don't block if we can't.
- if self.outbuf_lock.acquire(False):
- try:
- self._flush_some(do_close=do_close)
- if self.total_outbufs_len < self.adj.outbuf_high_watermark:
- self.outbuf_lock.notify()
- finally:
- self.outbuf_lock.release()
- def _flush_some(self, do_close=True):
- # Send as much data as possible to our client
- sent = 0
- dobreak = False
- while True:
- outbuf = self.outbufs[0]
- # use outbuf.__len__ rather than len(outbuf) FBO of not getting
- # OverflowError on 32-bit Python
- outbuflen = outbuf.__len__()
- while outbuflen > 0:
- chunk = outbuf.get(self.sendbuf_len)
- num_sent = self.send(chunk, do_close=do_close)
- if num_sent:
- outbuf.skip(num_sent, True)
- outbuflen -= num_sent
- sent += num_sent
- self.total_outbufs_len -= num_sent
- else:
- # failed to write anything, break out entirely
- dobreak = True
- break
- else:
- # self.outbufs[-1] must always be a writable outbuf
- if len(self.outbufs) > 1:
- toclose = self.outbufs.pop(0)
- try:
- toclose.close()
- except Exception:
- self.logger.exception("Unexpected error when closing an outbuf")
- else:
- # caught up, done flushing for now
- dobreak = True
- if dobreak:
- break
- if sent:
- self.last_activity = time.time()
- return True
- return False
- def handle_close(self):
- with self.outbuf_lock:
- for outbuf in self.outbufs:
- try:
- outbuf.close()
- except Exception:
- self.logger.exception(
- "Unknown exception while trying to close outbuf"
- )
- self.total_outbufs_len = 0
- self.connected = False
- self.outbuf_lock.notify()
- wasyncore.dispatcher.close(self)
- def add_channel(self, map=None):
- """See wasyncore.dispatcher
- This hook keeps track of opened channels.
- """
- wasyncore.dispatcher.add_channel(self, map)
- self.server.active_channels[self._fileno] = self
- def del_channel(self, map=None):
- """See wasyncore.dispatcher
- This hook keeps track of closed channels.
- """
- fd = self._fileno # next line sets this to None
- wasyncore.dispatcher.del_channel(self, map)
- ac = self.server.active_channels
- if fd in ac:
- del ac[fd]
- #
- # SYNCHRONOUS METHODS
- #
- def write_soon(self, data):
- if not self.connected:
- # if the socket is closed then interrupt the task so that it
- # can cleanup possibly before the app_iter is exhausted
- raise ClientDisconnected
- if data:
- # the async mainloop might be popping data off outbuf; we can
- # block here waiting for it because we're in a task thread
- with self.outbuf_lock:
- self._flush_outbufs_below_high_watermark()
- if not self.connected:
- raise ClientDisconnected
- num_bytes = len(data)
- if data.__class__ is ReadOnlyFileBasedBuffer:
- # they used wsgi.file_wrapper
- self.outbufs.append(data)
- nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
- self.outbufs.append(nextbuf)
- self.current_outbuf_count = 0
- else:
- if self.current_outbuf_count >= self.adj.outbuf_high_watermark:
- # rotate to a new buffer if the current buffer has hit
- # the watermark to avoid it growing unbounded
- nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
- self.outbufs.append(nextbuf)
- self.current_outbuf_count = 0
- self.outbufs[-1].append(data)
- self.current_outbuf_count += num_bytes
- self.total_outbufs_len += num_bytes
- if self.total_outbufs_len >= self.adj.send_bytes:
- (flushed, exception) = self._flush_exception(
- self._flush_some, do_close=False
- )
- if (
- exception
- or not flushed
- or self.total_outbufs_len >= self.adj.send_bytes
- ):
- self.server.pull_trigger()
- return num_bytes
- return 0
- def _flush_outbufs_below_high_watermark(self):
- # check first to avoid locking if possible
- if self.total_outbufs_len > self.adj.outbuf_high_watermark:
- with self.outbuf_lock:
- (_, exception) = self._flush_exception(self._flush_some, do_close=False)
- if exception:
- # An exception happened while flushing, wake up the main
- # thread, then wait for it to decide what to do next
- # (probably close the socket, and then just return)
- self.server.pull_trigger()
- self.outbuf_lock.wait()
- return
- while (
- self.connected
- and self.total_outbufs_len > self.adj.outbuf_high_watermark
- ):
- self.server.pull_trigger()
- self.outbuf_lock.wait()
- def service(self):
- """Execute one request. If there are more, we add another task to the
- server at the end."""
- request = self.requests[0]
- if request.error:
- task = self.error_task_class(self, request)
- else:
- task = self.task_class(self, request)
- try:
- if self.connected:
- task.service()
- else:
- task.close_on_finish = True
- except ClientDisconnected:
- self.logger.info("Client disconnected while serving %s" % task.request.path)
- task.close_on_finish = True
- except Exception:
- self.logger.exception("Exception while serving %s" % task.request.path)
- if not task.wrote_header:
- if self.adj.expose_tracebacks:
- body = traceback.format_exc()
- else:
- body = "The server encountered an unexpected internal server error"
- req_version = request.version
- req_headers = request.headers
- err_request = self.parser_class(self.adj)
- err_request.error = InternalServerError(body)
- # copy some original request attributes to fulfill
- # HTTP 1.1 requirements
- err_request.version = req_version
- try:
- err_request.headers["CONNECTION"] = req_headers["CONNECTION"]
- except KeyError:
- pass
- task = self.error_task_class(self, err_request)
- try:
- task.service() # must not fail
- except ClientDisconnected:
- task.close_on_finish = True
- else:
- task.close_on_finish = True
- if task.close_on_finish:
- with self.requests_lock:
- self.close_when_flushed = True
- for request in self.requests:
- request.close()
- self.requests = []
- else:
- # before processing a new request, ensure there is not too
- # much data in the outbufs waiting to be flushed
- # NB: currently readable() returns False while we are
- # flushing data so we know no new requests will come in
- # that we need to account for, otherwise it'd be better
- # to do this check at the start of the request instead of
- # at the end to account for consecutive service() calls
- if len(self.requests) > 1:
- self._flush_outbufs_below_high_watermark()
- # this is a little hacky but basically it's forcing the
- # next request to create a new outbuf to avoid sharing
- # outbufs across requests which can cause outbufs to
- # not be deallocated regularly when a connection is open
- # for a long time
- if self.current_outbuf_count > 0:
- self.current_outbuf_count = self.adj.outbuf_high_watermark
- request.close()
- # Add new task to process the next request
- with self.requests_lock:
- self.requests.pop(0)
- if self.connected and self.requests:
- self.server.add_task(self)
- elif (
- self.connected
- and self.request is not None
- and self.request.expect_continue
- and self.request.headers_finished
- and not self.sent_continue
- ):
- # A request waits for a signal to continue, but we could
- # not send it until now because requests were being
- # processed and the output needs to be kept in order
- self.send_continue()
- if self.connected:
- self.server.pull_trigger()
- self.last_activity = time.time()
- def cancel(self):
- """Cancels all pending / active requests"""
- self.will_close = True
- self.connected = False
- self.last_activity = time.time()
- self.requests = []
|