123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- # Copyright 2013-2018 Donald Stufft and individual contributors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from typing import ByteString, Optional, Tuple, cast
- from nacl import exceptions as exc
- from nacl._sodium import ffi, lib
- from nacl.exceptions import ensure
# --- Sizes, queried once from the linked library at import time ---------

# Length in bytes of a secretstream key.
crypto_secretstream_xchacha20poly1305_KEYBYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_keybytes()
)
# Length in bytes of the stream header exchanged between init_push and
# init_pull.
crypto_secretstream_xchacha20poly1305_HEADERBYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_headerbytes()
)
# Per-message overhead: a ciphertext is this many bytes longer than its
# plaintext.
crypto_secretstream_xchacha20poly1305_ABYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_abytes()
)
# Size in bytes of the opaque native state buffer.
crypto_secretstream_xchacha20poly1305_STATEBYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_statebytes()
)
# Largest plaintext accepted for a single message.
crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX: int = (
    lib.crypto_secretstream_xchacha20poly1305_messagebytes_max()
)

# --- Message tags -------------------------------------------------------

# Default tag used for ordinary messages.
crypto_secretstream_xchacha20poly1305_TAG_MESSAGE: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_message()
)
# NOTE(review): meaning of the "push" tag is defined by libsodium and is
# not visible in this module -- see the libsodium documentation.
crypto_secretstream_xchacha20poly1305_TAG_PUSH: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_push()
)
# Tag that requests a re-key as part of a message.
crypto_secretstream_xchacha20poly1305_TAG_REKEY: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_rekey()
)
# Tag typically attached to the last message of a stream.
crypto_secretstream_xchacha20poly1305_TAG_FINAL: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_final()
)
def crypto_secretstream_xchacha20poly1305_keygen() -> bytes:
    """
    Produce a new key that can be passed to
    :func:`.crypto_secretstream_xchacha20poly1305_init_push`.

    :return: a key of
             :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` bytes
    :rtype: bytes
    """
    key_len = crypto_secretstream_xchacha20poly1305_KEYBYTES
    raw_key = ffi.new("unsigned char[]", key_len)

    # The library writes the generated key into the buffer in place.
    lib.crypto_secretstream_xchacha20poly1305_keygen(raw_key)

    # Slice the cffi buffer to copy its contents into a bytes object.
    key_view = ffi.buffer(raw_key)
    return key_view[:]
class crypto_secretstream_xchacha20poly1305_state:
    """
    Holder for the buffers used by one crypto_secretstream_xchacha20poly1305
    stream: the native state plus two lazily allocated scratch buffers.
    """

    __slots__ = ["statebuf", "rawbuf", "tagbuf"]

    def __init__(self) -> None:
        """Create a state with a fresh native buffer and no scratch space."""
        # Scratch buffers start unset; rawbuf is allocated on demand by
        # push/pull and tagbuf by init_pull.
        self.tagbuf: Optional[ByteString] = None
        self.rawbuf: Optional[ByteString] = None

        # Opaque native state, sized as reported by the library.
        state_len = crypto_secretstream_xchacha20poly1305_STATEBYTES
        self.statebuf: ByteString = ffi.new("unsigned char[]", state_len)
def crypto_secretstream_xchacha20poly1305_init_push(
    state: crypto_secretstream_xchacha20poly1305_state, key: bytes
) -> bytes:
    """
    Set up ``state`` for encrypting a stream and return the stream header.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param key: must be
                :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long
    :type key: bytes
    :return: header, which is
             :data:`.crypto_secretstream_xchacha20poly1305_HEADERBYTES`
             bytes long
    :rtype: bytes
    :raises nacl.exceptions.TypeError: if ``state`` or ``key`` has the
                                       wrong type
    :raises nacl.exceptions.ValueError: if ``key`` has the wrong length
    :raises nacl.exceptions.RuntimeError: if the library call fails
    """
    # Validate arguments in a fixed order: state type, key type, key size.
    state_ok = isinstance(state, crypto_secretstream_xchacha20poly1305_state)
    ensure(
        state_ok,
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )

    key_is_bytes = isinstance(key, bytes)
    ensure(key_is_bytes, "Key must be a bytes sequence", raising=exc.TypeError)

    key_len_ok = len(key) == crypto_secretstream_xchacha20poly1305_KEYBYTES
    ensure(key_len_ok, "Invalid key length", raising=exc.ValueError)

    # The library fills this buffer with the header for the new stream.
    header_len = crypto_secretstream_xchacha20poly1305_HEADERBYTES
    header_out = ffi.new("unsigned char []", header_len)

    status = lib.crypto_secretstream_xchacha20poly1305_init_push(
        state.statebuf, header_out, key
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)

    header_view = ffi.buffer(header_out)
    return header_view[:]
def crypto_secretstream_xchacha20poly1305_push(
    state: crypto_secretstream_xchacha20poly1305_state,
    m: bytes,
    ad: Optional[bytes] = None,
    tag: int = crypto_secretstream_xchacha20poly1305_TAG_MESSAGE,
) -> bytes:
    """
    Encrypt one message and append it to the secret stream.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param m: the message to encrypt, the maximum length of an individual
              message is
              :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX`.
    :type m: bytes
    :param ad: additional data to include in the authentication tag
    :type ad: bytes or None
    :param tag: the message tag, usually
                :data:`.crypto_secretstream_xchacha20poly1305_TAG_MESSAGE` or
                :data:`.crypto_secretstream_xchacha20poly1305_TAG_FINAL`.
    :type tag: int
    :return: ciphertext, ``len(m)`` plus
             :data:`.crypto_secretstream_xchacha20poly1305_ABYTES` bytes long
    :rtype: bytes
    :raises nacl.exceptions.TypeError: if ``state``, ``m`` or ``ad`` has the
                                       wrong type
    :raises nacl.exceptions.ValueError: if ``m`` is too long
    :raises nacl.exceptions.RuntimeError: if the library call fails
    """
    state_ok = isinstance(state, crypto_secretstream_xchacha20poly1305_state)
    ensure(
        state_ok,
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )
    ensure(isinstance(m, bytes), "Message is not bytes", raising=exc.TypeError)

    msg_len = len(m)
    ensure(
        msg_len <= crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX,
        "Message is too long",
        raising=exc.ValueError,
    )
    ensure(
        ad is None or isinstance(ad, bytes),
        "Additional data must be bytes or None",
        raising=exc.TypeError,
    )

    # Each ciphertext carries a fixed overhead on top of the plaintext.
    out_len = msg_len + crypto_secretstream_xchacha20poly1305_ABYTES

    # Reuse the state's scratch buffer when it is already big enough;
    # otherwise replace it with one of exactly the required size.
    scratch = state.rawbuf
    if scratch is None or len(scratch) < out_len:
        scratch = ffi.new("unsigned char[]", out_len)
        state.rawbuf = scratch

    # Absent additional data is handed to the library as NULL / length 0.
    if ad is None:
        ad_ptr, ad_len = ffi.NULL, 0
    else:
        ad_ptr, ad_len = ad, len(ad)

    status = lib.crypto_secretstream_xchacha20poly1305_push(
        state.statebuf,
        scratch,
        ffi.NULL,  # no length out-parameter requested; out_len is known
        m,
        msg_len,
        ad_ptr,
        ad_len,
        tag,
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)

    # The scratch buffer may be longer than this ciphertext, so copy out
    # only the first out_len bytes.
    return ffi.buffer(scratch, out_len)[:]
def crypto_secretstream_xchacha20poly1305_init_pull(
    state: crypto_secretstream_xchacha20poly1305_state,
    header: bytes,
    key: bytes,
) -> None:
    """
    Set up ``state`` for decrypting a stream from its header and key.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param header: must be
                   :data:`.crypto_secretstream_xchacha20poly1305_HEADERBYTES`
                   long
    :type header: bytes
    :param key: must be
                :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long
    :type key: bytes
    :raises nacl.exceptions.TypeError: if ``state``, ``header`` or ``key``
                                       has the wrong type
    :raises nacl.exceptions.ValueError: if ``header`` or ``key`` has the
                                        wrong length
    :raises nacl.exceptions.RuntimeError: if the library call fails
    """
    state_ok = isinstance(state, crypto_secretstream_xchacha20poly1305_state)
    ensure(
        state_ok,
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )

    # Header: type first, then exact length.
    ensure(
        isinstance(header, bytes),
        "Header must be a bytes sequence",
        raising=exc.TypeError,
    )
    expected_header_len = crypto_secretstream_xchacha20poly1305_HEADERBYTES
    ensure(
        len(header) == expected_header_len,
        "Invalid header length",
        raising=exc.ValueError,
    )

    # Key: type first, then exact length.
    ensure(
        isinstance(key, bytes),
        "Key must be a bytes sequence",
        raising=exc.TypeError,
    )
    expected_key_len = crypto_secretstream_xchacha20poly1305_KEYBYTES
    ensure(
        len(key) == expected_key_len,
        "Invalid key length",
        raising=exc.ValueError,
    )

    # Allocate the one-byte tag cell once; its presence is what pull()
    # later checks to confirm the state was initialised for decryption.
    if state.tagbuf is None:
        state.tagbuf = ffi.new("unsigned char *")

    status = lib.crypto_secretstream_xchacha20poly1305_init_pull(
        state.statebuf, header, key
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)
def crypto_secretstream_xchacha20poly1305_pull(
    state: crypto_secretstream_xchacha20poly1305_state,
    c: bytes,
    ad: Optional[bytes] = None,
) -> Tuple[bytes, int]:
    """
    Decrypt the next message of the secret stream.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param c: the ciphertext to decrypt, the maximum length of an individual
              ciphertext is
              :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX` +
              :data:`.crypto_secretstream_xchacha20poly1305_ABYTES`.
    :type c: bytes
    :param ad: additional data to include in the authentication tag
    :type ad: bytes or None
    :return: (message, tag)
    :rtype: (bytes, int)
    :raises nacl.exceptions.TypeError: if ``state``, ``c`` or ``ad`` has the
                                       wrong type
    :raises nacl.exceptions.ValueError: if ``state`` was not initialized with
        :func:`.crypto_secretstream_xchacha20poly1305_init_pull`, or if
        ``c`` is too short or too long
    :raises nacl.exceptions.RuntimeError: if the library call reports failure
    """
    overhead = crypto_secretstream_xchacha20poly1305_ABYTES
    longest_ciphertext = (
        crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX + overhead
    )

    state_ok = isinstance(state, crypto_secretstream_xchacha20poly1305_state)
    ensure(
        state_ok,
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )
    # tagbuf is only allocated by init_pull, so None means the state was
    # never prepared for decryption.
    ensure(
        state.tagbuf is not None,
        (
            "State must be initialized using "
            "crypto_secretstream_xchacha20poly1305_init_pull"
        ),
        raising=exc.ValueError,
    )
    ensure(
        isinstance(c, bytes),
        "Ciphertext is not bytes",
        raising=exc.TypeError,
    )

    cipher_len = len(c)
    ensure(
        cipher_len >= overhead,
        "Ciphertext is too short",
        raising=exc.ValueError,
    )
    ensure(
        cipher_len <= longest_ciphertext,
        "Ciphertext is too long",
        raising=exc.ValueError,
    )
    ensure(
        ad is None or isinstance(ad, bytes),
        "Additional data must be bytes or None",
        raising=exc.TypeError,
    )

    # Plaintext is the ciphertext minus the fixed per-message overhead.
    plain_len = cipher_len - overhead

    # Reuse the state's scratch buffer when it is already big enough;
    # otherwise replace it with one of exactly the required size.
    scratch = state.rawbuf
    if scratch is None or len(scratch) < plain_len:
        scratch = ffi.new("unsigned char[]", plain_len)
        state.rawbuf = scratch

    # Absent additional data is handed to the library as NULL / length 0.
    if ad is None:
        ad_ptr, ad_len = ffi.NULL, 0
    else:
        ad_ptr, ad_len = ad, len(ad)

    status = lib.crypto_secretstream_xchacha20poly1305_pull(
        state.statebuf,
        scratch,
        ffi.NULL,  # no length out-parameter requested; plain_len is known
        state.tagbuf,
        c,
        cipher_len,
        ad_ptr,
        ad_len,
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)

    # The scratch buffer may be longer than this message, so copy out only
    # the first plain_len bytes.
    message = ffi.buffer(scratch, plain_len)[:]
    # Cast safety: we `ensure` above that `state.tagbuf is not None`.
    tag_value = int(cast(bytes, state.tagbuf)[0])
    return (message, tag_value)
def crypto_secretstream_xchacha20poly1305_rekey(
    state: crypto_secretstream_xchacha20poly1305_state,
) -> None:
    """
    Force the stream to switch to a new encryption key right now.

    Re-keying normally happens on its own when needed, or is requested by
    sending a message with an explicit ``tag`` of
    :data:`.crypto_secretstream_xchacha20poly1305_TAG_REKEY` to ensure
    forward secrecy. Call this function instead when the re-keying is
    controlled without adding the tag.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :raises nacl.exceptions.TypeError: if ``state`` has the wrong type
    """
    state_ok = isinstance(state, crypto_secretstream_xchacha20poly1305_state)
    ensure(
        state_ok,
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )

    # The native call updates the state buffer in place; its return value
    # is not inspected.
    native_state = state.statebuf
    lib.crypto_secretstream_xchacha20poly1305_rekey(native_state)
|