12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371 |
- import base64
- import email.utils
- import re
- import typing
- import typing as t
- import warnings
- from datetime import date
- from datetime import datetime
- from datetime import time
- from datetime import timedelta
- from datetime import timezone
- from enum import Enum
- from hashlib import sha1
- from time import mktime
- from time import struct_time
- from urllib.parse import unquote_to_bytes as _unquote
- from urllib.request import parse_http_list as _parse_list_header
- from ._internal import _cookie_parse_impl
- from ._internal import _cookie_quote
- from ._internal import _make_cookie_domain
- from ._internal import _to_bytes
- from ._internal import _to_str
- from ._internal import _wsgi_decoding_dance
- from werkzeug._internal import _dt_as_utc
- if t.TYPE_CHECKING:
- import typing_extensions as te
- from _typeshed.wsgi import WSGIEnvironment
# for explanation of "media-range", etc. see Sections 5.3.{1,2} of RFC 7231
# Matches one entry of an ``Accept``-style header. Group 1 captures the
# media-range including any non-"q" parameters, group 2 captures the
# optional ``q=`` weight as text (it may be an empty string).
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
      (\d*(?:\.\d+)?)       # qvalue capturing-parentheses
      [^,]*                 # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)
# Characters that may appear in an unquoted token. ``quote_header_value``
# returns a value unchanged when all of its characters are in this set
# (plus any caller-supplied extra characters).
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# Matches one entity tag in a comma separated list. Groups are: the
# optional weak prefix (``W/`` or ``w/``), the content of a quoted tag, or
# the raw text of an unquoted tag. The match also consumes the following
# comma separator, or stops at the end of the input.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# Matches a single ``; key=value`` piece of an options header. Named
# groups: ``key``, ``count`` (continuation index), ``encoding`` and
# ``language`` (extended ``*=`` notation) and ``value``.
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s*  # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
    |
        [^\s;,=*]+  # token
    )
    (?:\*(?P<count>\d+))?  # *1, optional continuation index
    \s*
    (?:  # optionally followed by =value
        (?:  # equals sign, possibly with encoding
            \*\s*=\s*  # * indicates extended notation
            (?:  # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s*  # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
        |
            [^;,]+  # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)
# Matches the start of an options header that has been prefixed with a
# comma (``parse_options_header`` adds that prefix). Group 1 is the main
# value, group 2 is everything after it starting at the first ";" or ",".
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")
# Lower-case names of entity headers.
# NOTE(review): presumably consulted by header classification helpers
# defined elsewhere in this module; no use is visible in this section.
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Lower-case names of hop-by-hop headers.
# NOTE(review): presumably consulted by header classification helpers
# defined elsewhere in this module; no use is visible in this section.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
# Mapping of HTTP status code to its reason phrase.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # RFC 6585 registers 511 as "Network Authentication Required"; the
    # previous phrase ("... Failed") did not match the registered name.
    511: "Network Authentication Required",
}
class COEP(Enum):
    """Values of the Cross Origin Embedder Policy."""

    # Each member's value is the literal policy token.
    UNSAFE_NONE = "unsafe-none"
    REQUIRE_CORP = "require-corp"
class COOP(Enum):
    """Values of the Cross Origin Opener Policy."""

    # Each member's value is the literal policy token.
    UNSAFE_NONE = "unsafe-none"
    SAME_ORIGIN_ALLOW_POPUPS = "same-origin-allow-popups"
    SAME_ORIGIN = "same-origin"
def quote_header_value(
    value: t.Union[str, int], extra_chars: str = "", allow_token: bool = True
) -> str:
    """Return ``value`` as text, wrapped in double quotes when required.

    .. versionadded:: 0.5

    :param value: the value to quote. Bytes are decoded as latin1, any
        other object is converted with :class:`str`.
    :param extra_chars: additional characters that are allowed in an
        unquoted token.
    :param allow_token: if enabled, a value made up only of token
        characters is returned without quotes.
    :return: the token itself, or the value in double quotes with
        backslashes and double quotes escaped.
    """
    text = value.decode("latin1") if isinstance(value, bytes) else str(value)

    # A pure token needs no quoting at all.
    if allow_token and set(text) <= (_token_chars | set(extra_chars)):
        return text

    # Escape backslashes first so the escapes added for quotes survive.
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return '"' + escaped + '"'
def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    A value is only treated as quoted when it is at least two characters
    long and both starts and ends with a double quote. A lone ``"`` is
    returned unchanged instead of being collapsed to an empty string.

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: The value represents a filename or path.
    :return: the unquoted value, or ``value`` unchanged if it is not quoted.
    """
    if len(value) >= 2 and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')

    return value
def dump_options_header(
    header: t.Optional[str], options: t.Mapping[str, t.Optional[t.Union[str, int]]]
) -> str:
    """Build an options header string; the reverse of
    :func:`parse_options_header`.

    :param header: the main header value, left out when ``None``.
    :param options: a mapping of options to append. An option whose value
        is ``None`` is written as a bare key, any other value is written
        as ``key=value`` with the value passed through
        :func:`quote_header_value`.
    :return: all segments joined with ``"; "``.
    """
    segments = [] if header is None else [header]
    segments.extend(
        key if value is None else f"{key}={quote_header_value(value)}"
        for key, value in options.items()
    )
    return "; ".join(segments)
def dump_header(
    iterable: t.Union[t.Dict[str, t.Union[str, int]], t.Iterable[str]],
    allow_token: bool = True,
) -> str:
    """Serialize a dict or an iterable of values into a header string.

    This is the reversal of :func:`parse_list_header`,
    :func:`parse_set_header` and :func:`parse_dict_header`. Strings that
    include an equals sign are quoted unless they are passed as a dict of
    key, value pairs.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote. A dict
        entry whose value is ``None`` is written as the bare key.
    :param allow_token: if set to `False` tokens as values are disallowed.
        See :func:`quote_header_value` for more details.
    :return: the items joined with ``", "``.
    """
    if isinstance(iterable, dict):
        items = [
            key
            if value is None
            else f"{key}={quote_header_value(value, allow_token=allow_token)}"
            for key, value in iterable.items()
        ]
    else:
        items = [
            quote_header_value(entry, allow_token=allow_token) for entry in iterable
        ]

    return ", ".join(items)
def dump_csp_header(header: "ds.ContentSecurityPolicy") -> str:
    """Serialize a Content Security Policy mapping into a header value.

    Every item of ``header`` becomes one ``"<key> <value>"`` policy and
    the policies are joined with ``"; "``, for example
    "default-src 'self'; script-src 'self'".

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param header: an object providing ``items()`` that yields
        ``(directive, value)`` pairs.
    """
    policies = []
    for directive, sources in header.items():
        policies.append(f"{directive} {sources}")
    return "; ".join(policies)
- def parse_list_header(value: str) -> t.List[str]:
- """Parse lists as described by RFC 2068 Section 2.
- In particular, parse comma-separated lists where the elements of
- the list may include quoted-strings. A quoted-string could
- contain a comma. A non-quoted string could have quotes in the
- middle. Quotes are removed automatically after parsing.
- It basically works like :func:`parse_set_header` just that items
- may appear multiple times and case sensitivity is preserved.
- The return value is a standard :class:`list`:
- >>> parse_list_header('token, "quoted value"')
- ['token', 'quoted value']
- To create a header from the :class:`list` again, use the
- :func:`dump_header` function.
- :param value: a string with a list header.
- :return: :class:`list`
- """
- result = []
- for item in _parse_list_header(value):
- if item[:1] == item[-1:] == '"':
- item = unquote_header_value(item[1:-1])
- result.append(item)
- return result
def parse_dict_header(value: str, cls: t.Type[dict] = dict) -> t.Dict[str, str]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict (or any other mapping object created from
    the type with a dict like interface provided by the `cls` argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    .. versionchanged:: 0.9
       Added support for `cls` argument.

    :param value: a string with a dict header. Bytes are decoded as latin1.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    parsed = cls()

    if isinstance(value, bytes):
        value = value.decode("latin1")

    for item in _parse_list_header(value):
        name, separator, item_value = item.partition("=")

        if not separator:
            # No "=" at all: the whole item is a key without a value.
            parsed[item] = None
            continue

        if item_value[:1] == item_value[-1:] == '"':
            item_value = unquote_header_value(item_value[1:-1])

        parsed[name] = item_value

    return parsed
def parse_options_header(
    value: t.Optional[str], multiple: "te.Literal[None]" = None
) -> t.Tuple[str, t.Dict[str, str]]:
    """Parse a ``Content-Type``-like header into a tuple with the
    value and any options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This should not be used for ``Cache-Control``-like headers, which use
    a different format. For those, use :func:`parse_dict_header`.

    :param value: The header value to parse.
    :param multiple: Deprecated. Passing anything other than ``None``
        emits a :exc:`DeprecationWarning`; a true value makes the function
        keep parsing further values after the first one.
    :return: ``(value, options)``, or ``("", {})`` when nothing could be
        parsed.

    .. versionchanged:: 2.1
        The ``multiple`` parameter is deprecated and will be removed in
        Werkzeug 2.2.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5
    """
    if multiple is not None:
        import warnings

        warnings.warn(
            "The 'multiple' parameter of 'parse_options_header' is"
            " deprecated and will be removed in Werkzeug 2.2.",
            DeprecationWarning,
            stacklevel=2,
        )

    if not value:
        return "", {}

    parsed: t.List[t.Any] = []
    # The start pattern expects a leading comma, and newlines are treated
    # as value separators, so normalize both up front.
    value = "," + value.replace("\n", ",")

    while value:
        head = _option_header_start_mime_type.match(value)

        if head is None:
            break

        main_value, remainder = head.groups()
        parsed.append(main_value)  # mimetype
        options: t.Dict[str, str] = {}
        encoding: t.Optional[str]
        continued_encoding: t.Optional[str] = None

        # Parse options
        while remainder:
            piece = _option_header_piece_re.match(remainder)

            if piece is None:
                break

            option, count, encoding, language, option_value = piece.groups()

            # Continuations don't have to supply the encoding after the
            # first line. If we're in a continuation, track the current
            # encoding to use for subsequent lines. Reset it when the
            # continuation ends.
            if count:
                if not encoding:
                    encoding = continued_encoding

                continued_encoding = encoding
            else:
                continued_encoding = None

            option = unquote_header_value(option)

            if option_value is not None:
                option_value = unquote_header_value(option_value, option == "filename")

                if encoding is not None:
                    option_value = _unquote(option_value).decode(encoding)

            if not count:
                options[option] = option_value  # type: ignore[assignment]
            elif option_value is not None:
                # Continuations append to the existing value. For
                # simplicity, this ignores the possibility of
                # out-of-order indices, which shouldn't happen anyway.
                options[option] = options.get(option, "") + option_value

            remainder = remainder[piece.end() :]

        parsed.append(options)

        if not multiple:
            return tuple(parsed)  # type: ignore[return-value]

        value = remainder

    return tuple(parsed) if parsed else ("", {})  # type: ignore[return-value]
_TAnyAccept = t.TypeVar("_TAnyAccept", bound="ds.Accept")


@typing.overload
def parse_accept_header(value: t.Optional[str]) -> "ds.Accept":
    ...


@typing.overload
def parse_accept_header(
    value: t.Optional[str], cls: t.Type[_TAnyAccept]
) -> _TAnyAccept:
    ...


def parse_accept_header(
    value: t.Optional[str], cls: t.Optional[t.Type[_TAnyAccept]] = None
) -> _TAnyAccept:
    """Parse an HTTP Accept-* header into ``(value, quality)`` pairs.

    This does not implement a complete valid algorithm but one that
    supports at least value and quality extraction. The pairs are passed
    to ``cls`` in header order; a missing or empty quality counts as ``1``
    and every other quality is clamped to the range 0 to 1.

    The second parameter can be a subclass of :class:`Accept` that is
    created with the parsed values and returned.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
        :class:`Accept` or a subclass thereof)
    :return: an instance of `cls`. For an empty or missing header it is
        created from ``None``.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyAccept], ds.Accept)

    if not value:
        return cls(None)

    pairs = []

    for match in _accept_re.finditer(value):
        item, raw_quality = match.groups()
        quality: float = max(min(float(raw_quality), 1), 0) if raw_quality else 1
        pairs.append((item, quality))

    return cls(pairs)
_TAnyCC = t.TypeVar("_TAnyCC", bound="ds._CacheControl")
_t_cc_update = t.Optional[t.Callable[[_TAnyCC], None]]


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: None = None
) -> "ds.RequestCacheControl":
    ...


@typing.overload
def parse_cache_control_header(
    value: t.Optional[str], on_update: _t_cc_update, cls: t.Type[_TAnyCC]
) -> _TAnyCC:
    ...


def parse_cache_control_header(
    value: t.Optional[str],
    on_update: _t_cc_update = None,
    cls: t.Optional[t.Type[_TAnyCC]] = None,
) -> _TAnyCC:
    """Parse a cache control header into a `cls` object.

    The RFC differs between response and request cache control, this
    function does not. It's your responsibility to not use the wrong
    control statements.

    .. versionadded:: 0.5
       The `cls` was added. If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object, created from an empty tuple when the header
             is empty or missing.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCC], ds.RequestCacheControl)

    directives = parse_dict_header(value) if value else ()
    return cls(directives, on_update)
_TAnyCSP = t.TypeVar("_TAnyCSP", bound="ds.ContentSecurityPolicy")
_t_csp_update = t.Optional[t.Callable[[_TAnyCSP], None]]


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: None = None
) -> "ds.ContentSecurityPolicy":
    ...


@typing.overload
def parse_csp_header(
    value: t.Optional[str], on_update: _t_csp_update, cls: t.Type[_TAnyCSP]
) -> _TAnyCSP:
    ...


def parse_csp_header(
    value: t.Optional[str],
    on_update: _t_csp_update = None,
    cls: t.Optional[t.Type[_TAnyCSP]] = None,
) -> _TAnyCSP:
    """Parse a Content Security Policy header into a `cls` object.

    The header is split on ``";"``. Each policy is split at its first
    space into a directive and its value; policies without a space are
    ignored.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object. By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is used.
    :return: a `cls` object, created from an empty tuple when ``value``
             is ``None``.
    """
    if cls is None:
        cls = t.cast(t.Type[_TAnyCSP], ds.ContentSecurityPolicy)

    if value is None:
        return cls((), on_update)

    items = []

    for policy in value.split(";"):
        directive, separator, sources = policy.strip().partition(" ")

        # Ignore badly formatted policies (no space)
        if separator:
            items.append((directive.strip(), sources.strip()))

    return cls(items, on_update)
def parse_set_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.HeaderSet"], None]] = None,
) -> "ds.HeaderSet":
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    # An empty or missing header produces a set created from ``None``.
    items = parse_list_header(value) if value else None
    return ds.HeaderSet(items, on_update)
def parse_authorization_header(
    value: t.Optional[str],
) -> t.Optional["ds.Authorization"]:
    """Parse an HTTP basic/digest authorization header transmitted by the web
    browser. The return value is either `None` if the header was invalid or
    not given, otherwise an :class:`~werkzeug.datastructures.Authorization`
    object.

    :param value: the authorization header to parse.
    :return: a :class:`~werkzeug.datastructures.Authorization` object or `None`.
    """
    if not value:
        return None

    value = _wsgi_decoding_dance(value)
    pieces = value.split(None, 1)

    # Both a scheme and its credentials are required.
    if len(pieces) != 2:
        return None

    auth_type = pieces[0].lower()
    auth_info = pieces[1]

    if auth_type == "basic":
        try:
            username, password = base64.b64decode(auth_info).split(b":", 1)
        except Exception:
            return None

        try:
            credentials = {
                "username": _to_str(username, "utf-8"),
                "password": _to_str(password, "utf-8"),
            }
            return ds.Authorization("basic", credentials)
        except UnicodeDecodeError:
            return None

    if auth_type == "digest":
        auth_map = parse_dict_header(auth_info)
        required = ("username", "realm", "nonce", "uri", "response")

        if any(key not in auth_map for key in required):
            return None

        # When "qop" is given, "nc" and "cnonce" must both be non-empty.
        if "qop" in auth_map and not (auth_map.get("nc") and auth_map.get("cnonce")):
            return None

        return ds.Authorization("digest", auth_map)

    return None
def parse_www_authenticate_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.WWWAuthenticate"], None]] = None,
) -> "ds.WWWAuthenticate":
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.WWWAuthenticate`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return ds.WWWAuthenticate(on_update=on_update)

    try:
        scheme, params = value.split(None, 1)
        scheme = scheme.lower()
    except (ValueError, AttributeError):
        # Only a scheme was given, without any parameters.
        return ds.WWWAuthenticate(value.strip().lower(), on_update=on_update)
    else:
        return ds.WWWAuthenticate(scheme, parse_dict_header(params), on_update)
def parse_if_range_header(value: t.Optional[str]) -> "ds.IfRange":
    """Parses an if-range header which can be an etag or a date. Returns
    a :class:`~werkzeug.datastructures.IfRange` object.

    The value is tried as a date first; if that fails it is treated as an
    etag.

    .. versionchanged:: 2.0
        If the value represents a datetime, it is timezone-aware.

    .. versionadded:: 0.7
    """
    if not value:
        return ds.IfRange()

    parsed = parse_date(value)

    if parsed is None:
        # drop weakness information
        return ds.IfRange(unquote_etag(value)[0])

    return ds.IfRange(date=parsed)
def parse_range_header(
    value: t.Optional[str], make_inclusive: bool = True
) -> t.Optional["ds.Range"]:
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object. If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    Ranges must be given in ascending order without overlap, and a suffix
    (``-N``) or open ended (``N-``) range must be the last one, otherwise
    the header is treated as malformed.

    .. versionadded:: 0.7

    :param value: the range header to parse.
    :param make_inclusive: accepted for backwards compatibility; it is
        not used by the parser.
    :return: a :class:`~werkzeug.datastructures.Range` object or `None`.
    """
    if not value or "=" not in value:
        return None

    ranges = []
    # End of the previous range; -1 marks that an open ended or suffix
    # range was seen, after which no further range is allowed.
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()

        if "-" not in item:
            return None

        if item.startswith("-"):
            # Suffix range such as "-500": stored as a negative start.
            if last_end < 0:
                return None

            try:
                begin = int(item)
            except ValueError:
                return None

            end = None
            last_end = -1
        else:
            begin_str, end_str = item.split("-", 1)
            begin_str = begin_str.strip()
            end_str = end_str.strip()

            if not begin_str.isdigit():
                return None

            # str.isdigit() accepts characters (for example "\u00b2") that
            # int() rejects, so the conversion can still fail for input
            # taken from a request.
            try:
                begin = int(begin_str)
            except ValueError:
                return None

            if begin < last_end or last_end < 0:
                return None

            if end_str:
                if not end_str.isdigit():
                    return None

                try:
                    # The header is inclusive, the stored stop is exclusive.
                    end = int(end_str) + 1
                except ValueError:
                    return None

                if begin >= end:
                    return None
            else:
                end = None

            last_end = end if end is not None else -1

        ranges.append((begin, end))

    return ds.Range(units, ranges)
def parse_content_range_header(
    value: t.Optional[str],
    on_update: t.Optional[t.Callable[["ds.ContentRange"], None]] = None,
) -> t.Optional["ds.ContentRange"]:
    """Parses a content range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    The expected format is ``<units> <start>-<stop>/<length>`` where the
    range may be ``*`` (unknown range) and the length may be ``*``
    (unknown length). The stop value of the header is inclusive and is
    stored as an exclusive stop.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.ContentRange` object or
             `None`.
    """
    if value is None:
        return None

    try:
        units, rangedef = value.strip().split(None, 1)
    except ValueError:
        return None

    if "/" not in rangedef:
        return None

    rng, length_str = rangedef.split("/", 1)

    if length_str == "*":
        length = None
    elif length_str.isdigit():
        # str.isdigit() accepts characters (for example "\u00b2") that
        # int() rejects, so the conversion can still fail for input taken
        # from a header.
        try:
            length = int(length_str)
        except ValueError:
            return None
    else:
        return None

    if rng == "*":
        return ds.ContentRange(units, None, None, length, on_update=on_update)
    elif "-" not in rng:
        return None

    start_str, stop_str = rng.split("-", 1)

    try:
        start = int(start_str)
        stop = int(stop_str) + 1
    except ValueError:
        return None

    if is_byte_range_valid(start, stop, length):
        return ds.ContentRange(units, start, stop, length, on_update=on_update)

    return None
def quote_etag(etag: str, weak: bool = False) -> str:
    """Wrap an etag in double quotes, optionally marking it as weak.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak" (adds a ``W/`` prefix).
    :raises ValueError: if the etag contains a double quote.
    """
    if '"' in etag:
        raise ValueError("invalid etag")

    prefix = "W/" if weak else ""
    return f'{prefix}"{etag}"'
def unquote_etag(
    etag: t.Optional[str],
) -> t.Union[t.Tuple[str, bool], t.Tuple[None, None]]:
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple, or ``(None, None)`` for an empty
        or missing value.
    """
    if not etag:
        return None, None

    tag = etag.strip()
    # A "W/" prefix in either case marks the tag as weak.
    weak = tag[:2] in ("W/", "w/")

    if weak:
        tag = tag[2:]

    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]

    return tag, weak
def parse_etags(value: t.Optional[str]) -> "ds.ETags":
    """Parse an etag header.

    Tags are read one after another from a comma separated list and are
    collected as strong or weak tags. A raw ``*`` entry makes the whole
    result a star tag.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ds.ETags()

    strong = []
    weak = []
    end = len(value)
    pos = 0

    while pos < end:
        match = _etag_re.match(value, pos)

        # The pattern can produce an empty match that consumes nothing
        # ("$" also matches right before a trailing newline). Without this
        # check ``pos`` would never advance and the loop would never end.
        if match is None or match.end() == pos:
            break

        is_weak, quoted, raw = match.groups()

        if raw == "*":
            return ds.ETags(star_tag=True)
        elif quoted:
            raw = quoted

        if is_weak:
            weak.append(raw)
        else:
            strong.append(raw)

        pos = match.end()

    return ds.ETags(strong, weak)
def generate_etag(data: bytes) -> str:
    """Return the hexadecimal SHA-1 digest of ``data`` for use as an etag.

    .. versionchanged:: 2.0
        Use SHA-1. MD5 may not be available in some environments.
    """
    digest = sha1(data)
    return digest.hexdigest()
def parse_date(value: t.Optional[str]) -> t.Optional[datetime]:
    """Parse an :rfc:`2822` date into a timezone-aware
    :class:`datetime.datetime` object, or ``None`` if parsing fails.

    This is a wrapper for :func:`email.utils.parsedate_to_datetime`. It
    returns ``None`` if parsing fails instead of raising an exception,
    and always returns a timezone-aware datetime object. If the string
    doesn't have timezone information, it is assumed to be UTC.

    :param value: A string with a supported date format.

    .. versionchanged:: 2.0
        Return a timezone-aware datetime object. Use
        ``email.utils.parsedate_to_datetime``.
    """
    if value is None:
        return None

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None

    # Naive results are marked as UTC, aware ones are returned as is.
    return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
def http_date(
    timestamp: t.Optional[t.Union[datetime, date, int, float, struct_time]] = None
) -> str:
    """Format a datetime object or timestamp into an :rfc:`2822` date
    string.

    This is a wrapper for :func:`email.utils.format_datetime`. It
    assumes naive datetime objects are in UTC instead of raising an
    exception.

    :param timestamp: The datetime or timestamp to format. Defaults to
        the current time.

    .. versionchanged:: 2.0
        Use ``email.utils.format_datetime``. Accept ``date`` objects.
    """
    # datetime is a subclass of date, so it has to be checked first.
    if isinstance(timestamp, datetime):
        # Ensure datetime is timezone-aware.
        return email.utils.format_datetime(_dt_as_utc(timestamp), usegmt=True)

    if isinstance(timestamp, date):
        # Assume plain date is midnight UTC.
        midnight = datetime.combine(timestamp, time(), tzinfo=timezone.utc)
        return email.utils.format_datetime(midnight, usegmt=True)

    if isinstance(timestamp, struct_time):
        timestamp = mktime(timestamp)

    return email.utils.formatdate(timestamp, usegmt=True)
def parse_age(value: t.Optional[str] = None) -> t.Optional[timedelta]:
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`. That includes negative
    counts and counts too large for a :class:`datetime.timedelta`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None

    try:
        seconds = int(value)
    except ValueError:
        return None

    if seconds >= 0:
        try:
            return timedelta(seconds=seconds)
        except OverflowError:
            pass

    return None
def dump_age(age: t.Optional[t.Union[timedelta, int]] = None) -> t.Optional[str]:
    """Render a duration as a base-10 integer string of whole seconds.

    :param age: an integer number of seconds, a
        :class:`datetime.timedelta` object, or ``None`` (the default)
        when the age is unknown.
    :return: the number of seconds as a string, or ``None`` when ``age``
        is ``None``.
    :raises ValueError: if the resulting number of seconds is negative.
    """
    if age is None:
        return None

    # Both branches truncate toward zero through ``int()``.
    seconds = int(age.total_seconds() if isinstance(age, timedelta) else age)

    if seconds < 0:
        raise ValueError("age cannot be negative")

    return str(seconds)
def is_resource_modified(
    environ: "WSGIEnvironment",
    etag: t.Optional[str] = None,
    data: t.Optional[bytes] = None,
    last_modified: t.Optional[t.Union[datetime, str]] = None,
    ignore_if_range: bool = True,
) -> bool:
    """Evaluate the conditional request headers in a WSGI environ.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag of the response, used for comparison.
    :param data: alternatively, the response data from which an etag is
        generated with :func:`generate_etag`. Mutually exclusive with
        ``etag``.
    :param last_modified: an optional date of the last modification,
        either a ``datetime`` or a date string.
    :param ignore_if_range: If `False`, the `If-Range` header will be
        taken into account.
    :return: `True` if the resource was modified, otherwise `False`.
    :raises TypeError: if both ``etag`` and ``data`` are given.

    .. versionchanged:: 2.0
        SHA-1 is used to generate an etag value for the data. MD5 may
        not be available in some environments.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    if data is not None:
        if etag is not None:
            raise TypeError("both data and etag given")

        etag = generate_etag(data)

    if isinstance(last_modified, str):
        last_modified = parse_date(last_modified)

    # HTTP doesn't use microsecond, remove it to avoid false positive
    # comparisons. Mark naive datetimes as UTC.
    if last_modified is not None:
        last_modified = _dt_as_utc(last_modified.replace(microsecond=0))

    if_range = None

    if not ignore_if_range and "HTTP_RANGE" in environ:
        # https://tools.ietf.org/html/rfc7233#section-3.2
        # A server MUST ignore an If-Range header field received in a request
        # that does not contain a Range header field.
        if_range = parse_if_range_header(environ.get("HTTP_IF_RANGE"))

    # A date carried by If-Range takes the place of If-Modified-Since.
    modified_since: t.Optional[datetime]

    if if_range is None or if_range.date is None:
        modified_since = parse_date(environ.get("HTTP_IF_MODIFIED_SINCE"))
    else:
        modified_since = if_range.date

    unmodified = bool(
        modified_since and last_modified and last_modified <= modified_since
    )

    if etag:
        etag, _ = unquote_etag(etag)
        etag = t.cast(str, etag)

        if if_range is not None and if_range.etag is not None:
            unmodified = parse_etags(if_range.etag).contains(etag)
        else:
            if_none_match = parse_etags(environ.get("HTTP_IF_NONE_MATCH"))

            if if_none_match:
                # https://tools.ietf.org/html/rfc7232#section-3.2
                # "A recipient MUST use the weak comparison function when comparing
                # entity-tags for If-None-Match"
                unmodified = if_none_match.contains_weak(etag)

            # https://tools.ietf.org/html/rfc7232#section-3.1
            # "Origin server MUST use the strong comparison function when
            # comparing entity-tags for If-Match"
            if_match = parse_etags(environ.get("HTTP_IF_MATCH"))

            if if_match:
                unmodified = not if_match.is_strong(etag)

    return not unmodified
def remove_entity_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]],
    allowed: t.Iterable[str] = ("expires", "content-location"),
) -> None:
    """Strip entity headers from a list or :class:`Headers` object,
    modifying it in place.

    `Expires` and `Content-Location` are kept by default, because
    :rfc:`2616` section 10.3.5 specifies some entity headers that
    should be sent.

    .. versionchanged:: 0.5
        added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: names of entity headers that should be kept anyway.
        Compared case-insensitively.
    """
    exempt = {name.lower() for name in allowed}
    retained = []

    for key, value in headers:
        if not is_entity_header(key) or key.lower() in exempt:
            retained.append((key, value))

    # Slice assignment keeps the caller's object and swaps its contents.
    headers[:] = retained
def remove_hop_by_hop_headers(
    headers: t.Union["ds.Headers", t.List[t.Tuple[str, str]]]
) -> None:
    """Strip every HTTP/1.1 "Hop-by-Hop" header from a list or
    :class:`Headers` object, modifying it in place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    end_to_end = []

    for key, value in headers:
        if not is_hop_by_hop_header(key):
            end_to_end.append((key, value))

    # Slice assignment keeps the caller's object and swaps its contents.
    headers[:] = end_to_end
def is_entity_header(header: str) -> bool:
    """Tell whether a header name is an entity header.

    The comparison is case-insensitive.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    normalized = header.lower()
    return normalized in _entity_headers
def is_hop_by_hop_header(header: str) -> bool:
    """Tell whether a header name is an HTTP/1.1 "Hop-by-Hop" header.

    The comparison is case-insensitive.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    normalized = header.lower()
    return normalized in _hop_by_hop_headers
def parse_cookie(
    header: t.Union["WSGIEnvironment", str, bytes, None],
    charset: str = "utf-8",
    errors: str = "replace",
    cls: t.Optional[t.Type["ds.MultiDict"]] = None,
) -> "ds.MultiDict[str, str]":
    """Parse a cookie header, given as a string or taken from a WSGI
    environ.

    A key may appear more than once; its values are stored in the order
    they were sent. With the default :class:`MultiDict` the first value
    comes first, and :meth:`MultiDict.getlist` returns all of them.
    Pairs whose key decodes to an empty string are skipped.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key. ``None`` is treated as an empty
        header.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
        Returns a :class:`TypeConversionDict` instead of a regular dict.
        The ``cls`` parameter was added.
    """
    if header is None:
        header = ""
    elif isinstance(header, dict):
        header = header.get("HTTP_COOKIE", "")

    # PEP 3333 sends headers through the environ as latin1 decoded
    # strings. Encode strings back to bytes for parsing.
    if isinstance(header, str):
        header = header.encode("latin1", "replace")

    if cls is None:
        cls = ds.MultiDict

    def _decoded_pairs() -> t.Iterator[t.Tuple[str, str]]:
        for raw_key, raw_value in _cookie_parse_impl(header):  # type: ignore
            name = _to_str(raw_key, charset, errors, allow_none_charset=True)

            if name:
                # The value is only decoded once the key is known to be usable.
                yield name, _to_str(
                    raw_value, charset, errors, allow_none_charset=True
                )

    return cls(_decoded_pairs())
def dump_cookie(
    key: str,
    value: t.Union[bytes, str] = "",
    max_age: t.Optional[t.Union[timedelta, int]] = None,
    expires: t.Optional[t.Union[str, datetime, int, float]] = None,
    path: t.Optional[str] = "/",
    domain: t.Optional[str] = None,
    secure: bool = False,
    httponly: bool = False,
    charset: str = "utf-8",
    sync_expires: bool = True,
    max_size: int = 4093,
    samesite: t.Optional[str] = None,
) -> str:
    """Build the value of a ``Set-Cookie`` header, without the
    ``Set-Cookie`` prefix.

    The result is usually plain ASCII because most values are escaped,
    but this is not guaranteed. It is tunneled through latin1 as
    required by :pep:`3333`.

    The result is not ASCII safe if the key contains unicode
    characters. This is technically against the specification but
    happens in the wild. It's strongly recommended to not use
    non-ASCII values for the keys.

    :param key: the name of the cookie.
    :param value: the value of the cookie.
    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session. Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
                    A string is emitted as given.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie. This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for string values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.
    :raises ValueError: if ``samesite`` is not ``'Strict'``, ``'Lax'``
        or ``'None'`` (compared after title-casing).

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    key = _to_bytes(key, charset)
    value = _to_bytes(value, charset)

    if path is not None:
        from .urls import iri_to_uri

        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    if isinstance(max_age, timedelta):
        max_age = int(max_age.total_seconds())

    if expires is None:
        # Derive an expiry date from max_age unless the caller opted out.
        if max_age is not None and sync_expires:
            expires = http_date(datetime.now(tz=timezone.utc).timestamp() + max_age)
    elif not isinstance(expires, str):
        expires = http_date(expires)

    if samesite is not None:
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    parts = [key + b"=" + _cookie_quote(value)]

    # Each entry is (attribute name, attribute value, quoting mode). A
    # mode of ``None`` marks a valueless flag, ``True`` quotes the value
    # and ``False`` emits it as is.
    #
    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted. Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    attributes = (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    )

    for name, attr, quote in attributes:
        if quote is None:
            if attr:
                parts.append(name)
        elif attr is not None:
            if not isinstance(attr, (bytes, bytearray)):
                attr = _to_bytes(str(attr), charset)

            if quote:
                attr = _cookie_quote(attr)

            parts.append(name + b"=" + attr)

    # The return value will be an incorrectly encoded latin1 header for
    # consistency with the headers object.
    rv = b"; ".join(parts).decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        # Kept inline so that stacklevel=2 still points at the caller.
        warnings.warn(
            f"The {key.decode(charset)!r} cookie is too large: the value was"
            f" {value_size} bytes but the"
            f" header required {cookie_size - value_size} extra bytes. The final size"
            f" was {cookie_size} bytes but the limit is {max_size} bytes. Browsers may"
            f" silently ignore cookies larger than this.",
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(
    start: t.Optional[int], stop: t.Optional[int], length: t.Optional[int]
) -> bool:
    """Check whether a byte content range is valid for the given length.

    ``start`` and ``stop`` must either both be set or both be ``None``.
    Without a range, only the length is checked (unknown or
    non-negative). With a range, ``start`` must be non-negative and
    below ``stop``, and also below ``length`` when that is known.

    .. versionadded:: 0.7
    """
    if start is None and stop is None:
        return length is None or length >= 0

    if start is None or stop is None:
        # Only one end of the range was given.
        return False

    if length is None:
        return 0 <= start < stop

    return start < stop and 0 <= start < length
- # circular dependencies
- from . import datastructures as ds
|