123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- from collections import namedtuple
- from .utilities import BadRequest, logger, undquote
- PROXY_HEADERS = frozenset(
- {
- "X_FORWARDED_FOR",
- "X_FORWARDED_HOST",
- "X_FORWARDED_PROTO",
- "X_FORWARDED_PORT",
- "X_FORWARDED_BY",
- "FORWARDED",
- }
- )
- Forwarded = namedtuple("Forwarded", ["by", "for_", "host", "proto"])
- class MalformedProxyHeader(Exception):
- def __init__(self, header, reason, value):
- self.header = header
- self.reason = reason
- self.value = value
- super().__init__(header, reason, value)
- def proxy_headers_middleware(
- app,
- trusted_proxy=None,
- trusted_proxy_count=1,
- trusted_proxy_headers=None,
- clear_untrusted=True,
- log_untrusted=False,
- logger=logger,
- ):
- def translate_proxy_headers(environ, start_response):
- untrusted_headers = PROXY_HEADERS
- remote_peer = environ["REMOTE_ADDR"]
- if trusted_proxy == "*" or remote_peer == trusted_proxy:
- try:
- untrusted_headers = parse_proxy_headers(
- environ,
- trusted_proxy_count=trusted_proxy_count,
- trusted_proxy_headers=trusted_proxy_headers,
- logger=logger,
- )
- except MalformedProxyHeader as ex:
- logger.warning(
- 'Malformed proxy header "%s" from "%s": %s value: %s',
- ex.header,
- remote_peer,
- ex.reason,
- ex.value,
- )
- error = BadRequest(f'Header "{ex.header}" malformed.')
- return error.wsgi_response(environ, start_response)
- # Clear out the untrusted proxy headers
- if clear_untrusted:
- clear_untrusted_headers(
- environ, untrusted_headers, log_warning=log_untrusted, logger=logger
- )
- return app(environ, start_response)
- return translate_proxy_headers
- def parse_proxy_headers(
- environ, trusted_proxy_count, trusted_proxy_headers, logger=logger
- ):
- if trusted_proxy_headers is None:
- trusted_proxy_headers = set()
- forwarded_for = []
- forwarded_host = forwarded_proto = forwarded_port = forwarded = ""
- client_addr = None
- untrusted_headers = set(PROXY_HEADERS)
- def raise_for_multiple_values():
- raise ValueError("Unspecified behavior for multiple values found in header")
- if "x-forwarded-for" in trusted_proxy_headers and "HTTP_X_FORWARDED_FOR" in environ:
- try:
- forwarded_for = []
- for forward_hop in environ["HTTP_X_FORWARDED_FOR"].split(","):
- forward_hop = forward_hop.strip()
- forward_hop = undquote(forward_hop)
- # Make sure that all IPv6 addresses are surrounded by brackets,
- # this is assuming that the IPv6 representation here does not
- # include a port number.
- if "." not in forward_hop and (
- ":" in forward_hop and forward_hop[-1] != "]"
- ):
- forwarded_for.append(f"[{forward_hop}]")
- else:
- forwarded_for.append(forward_hop)
- forwarded_for = forwarded_for[-trusted_proxy_count:]
- client_addr = forwarded_for[0]
- untrusted_headers.remove("X_FORWARDED_FOR")
- except Exception as ex:
- raise MalformedProxyHeader(
- "X-Forwarded-For", str(ex), environ["HTTP_X_FORWARDED_FOR"]
- )
- if (
- "x-forwarded-host" in trusted_proxy_headers
- and "HTTP_X_FORWARDED_HOST" in environ
- ):
- try:
- forwarded_host_multiple = []
- for forward_host in environ["HTTP_X_FORWARDED_HOST"].split(","):
- forward_host = forward_host.strip()
- forward_host = undquote(forward_host)
- forwarded_host_multiple.append(forward_host)
- forwarded_host_multiple = forwarded_host_multiple[-trusted_proxy_count:]
- forwarded_host = forwarded_host_multiple[0]
- untrusted_headers.remove("X_FORWARDED_HOST")
- except Exception as ex:
- raise MalformedProxyHeader(
- "X-Forwarded-Host", str(ex), environ["HTTP_X_FORWARDED_HOST"]
- )
- if "x-forwarded-proto" in trusted_proxy_headers:
- try:
- forwarded_proto = undquote(environ.get("HTTP_X_FORWARDED_PROTO", ""))
- if "," in forwarded_proto:
- raise_for_multiple_values()
- untrusted_headers.remove("X_FORWARDED_PROTO")
- except Exception as ex:
- raise MalformedProxyHeader(
- "X-Forwarded-Proto", str(ex), environ["HTTP_X_FORWARDED_PROTO"]
- )
- if "x-forwarded-port" in trusted_proxy_headers:
- try:
- forwarded_port = undquote(environ.get("HTTP_X_FORWARDED_PORT", ""))
- if "," in forwarded_port:
- raise_for_multiple_values()
- untrusted_headers.remove("X_FORWARDED_PORT")
- except Exception as ex:
- raise MalformedProxyHeader(
- "X-Forwarded-Port", str(ex), environ["HTTP_X_FORWARDED_PORT"]
- )
- if "x-forwarded-by" in trusted_proxy_headers:
- # Waitress itself does not use X-Forwarded-By, but we can not
- # remove it so it can get set in the environ
- untrusted_headers.remove("X_FORWARDED_BY")
- if "forwarded" in trusted_proxy_headers:
- forwarded = environ.get("HTTP_FORWARDED", None)
- untrusted_headers = PROXY_HEADERS - {"FORWARDED"}
- # If the Forwarded header exists, it gets priority
- if forwarded:
- proxies = []
- try:
- for forwarded_element in forwarded.split(","):
- # Remove whitespace that may have been introduced when
- # appending a new entry
- forwarded_element = forwarded_element.strip()
- forwarded_for = forwarded_host = forwarded_proto = ""
- forwarded_port = forwarded_by = ""
- for pair in forwarded_element.split(";"):
- pair = pair.lower()
- if not pair:
- continue
- token, equals, value = pair.partition("=")
- if equals != "=":
- raise ValueError('Invalid forwarded-pair missing "="')
- if token.strip() != token:
- raise ValueError("Token may not be surrounded by whitespace")
- if value.strip() != value:
- raise ValueError("Value may not be surrounded by whitespace")
- if token == "by":
- forwarded_by = undquote(value)
- elif token == "for":
- forwarded_for = undquote(value)
- elif token == "host":
- forwarded_host = undquote(value)
- elif token == "proto":
- forwarded_proto = undquote(value)
- else:
- logger.warning("Unknown Forwarded token: %s" % token)
- proxies.append(
- Forwarded(
- forwarded_by, forwarded_for, forwarded_host, forwarded_proto
- )
- )
- except Exception as ex:
- raise MalformedProxyHeader("Forwarded", str(ex), environ["HTTP_FORWARDED"])
- proxies = proxies[-trusted_proxy_count:]
- # Iterate backwards and fill in some values, the oldest entry that
- # contains the information we expect is the one we use. We expect
- # that intermediate proxies may re-write the host header or proto,
- # but the oldest entry is the one that contains the information the
- # client expects when generating URL's
- #
- # Forwarded: for="[2001:db8::1]";host="example.com:8443";proto="https"
- # Forwarded: for=192.0.2.1;host="example.internal:8080"
- #
- # (After HTTPS header folding) should mean that we use as values:
- #
- # Host: example.com
- # Protocol: https
- # Port: 8443
- for proxy in proxies[::-1]:
- client_addr = proxy.for_ or client_addr
- forwarded_host = proxy.host or forwarded_host
- forwarded_proto = proxy.proto or forwarded_proto
- if forwarded_proto:
- forwarded_proto = forwarded_proto.lower()
- if forwarded_proto not in {"http", "https"}:
- raise MalformedProxyHeader(
- "Forwarded Proto=" if forwarded else "X-Forwarded-Proto",
- "unsupported proto value",
- forwarded_proto,
- )
- # Set the URL scheme to the proxy provided proto
- environ["wsgi.url_scheme"] = forwarded_proto
- if not forwarded_port:
- if forwarded_proto == "http":
- forwarded_port = "80"
- if forwarded_proto == "https":
- forwarded_port = "443"
- if forwarded_host:
- if ":" in forwarded_host and forwarded_host[-1] != "]":
- host, port = forwarded_host.rsplit(":", 1)
- host, port = host.strip(), str(port)
- # We trust the port in the Forwarded Host/X-Forwarded-Host over
- # X-Forwarded-Port, or whatever we got from Forwarded
- # Proto/X-Forwarded-Proto.
- if forwarded_port != port:
- forwarded_port = port
- # We trust the proxy server's forwarded Host
- environ["SERVER_NAME"] = host
- environ["HTTP_HOST"] = forwarded_host
- else:
- # We trust the proxy server's forwarded Host
- environ["SERVER_NAME"] = forwarded_host
- environ["HTTP_HOST"] = forwarded_host
- if forwarded_port:
- if forwarded_port not in {"443", "80"}:
- environ["HTTP_HOST"] = "{}:{}".format(
- forwarded_host, forwarded_port
- )
- elif forwarded_port == "80" and environ["wsgi.url_scheme"] != "http":
- environ["HTTP_HOST"] = "{}:{}".format(
- forwarded_host, forwarded_port
- )
- elif forwarded_port == "443" and environ["wsgi.url_scheme"] != "https":
- environ["HTTP_HOST"] = "{}:{}".format(
- forwarded_host, forwarded_port
- )
- if forwarded_port:
- environ["SERVER_PORT"] = str(forwarded_port)
- if client_addr:
- if ":" in client_addr and client_addr[-1] != "]":
- addr, port = client_addr.rsplit(":", 1)
- environ["REMOTE_ADDR"] = strip_brackets(addr.strip())
- environ["REMOTE_PORT"] = port.strip()
- else:
- environ["REMOTE_ADDR"] = strip_brackets(client_addr.strip())
- environ["REMOTE_HOST"] = environ["REMOTE_ADDR"]
- return untrusted_headers
- def strip_brackets(addr):
- if addr[0] == "[" and addr[-1] == "]":
- return addr[1:-1]
- return addr
- def clear_untrusted_headers(
- environ, untrusted_headers, log_warning=False, logger=logger
- ):
- untrusted_headers_removed = [
- header
- for header in untrusted_headers
- if environ.pop("HTTP_" + header, False) is not False
- ]
- if log_warning and untrusted_headers_removed:
- untrusted_headers_removed = [
- "-".join(x.capitalize() for x in header.split("_"))
- for header in untrusted_headers_removed
- ]
- logger.warning(
- "Removed untrusted headers (%s). Waitress recommends these be "
- "removed upstream.",
- ", ".join(untrusted_headers_removed),
- )
|