123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- # Copyright 2013-2019 Donald Stufft and individual contributors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from typing import NoReturn, TypeVar
- from nacl import exceptions as exc
- from nacl._sodium import ffi, lib
- from nacl.exceptions import ensure
- crypto_generichash_BYTES: int = lib.crypto_generichash_blake2b_bytes()
- crypto_generichash_BYTES_MIN: int = lib.crypto_generichash_blake2b_bytes_min()
- crypto_generichash_BYTES_MAX: int = lib.crypto_generichash_blake2b_bytes_max()
- crypto_generichash_KEYBYTES: int = lib.crypto_generichash_blake2b_keybytes()
- crypto_generichash_KEYBYTES_MIN: int = (
- lib.crypto_generichash_blake2b_keybytes_min()
- )
- crypto_generichash_KEYBYTES_MAX: int = (
- lib.crypto_generichash_blake2b_keybytes_max()
- )
- crypto_generichash_SALTBYTES: int = lib.crypto_generichash_blake2b_saltbytes()
- crypto_generichash_PERSONALBYTES: int = (
- lib.crypto_generichash_blake2b_personalbytes()
- )
- crypto_generichash_STATEBYTES: int = lib.crypto_generichash_statebytes()
- _OVERLONG = "{0} length greater than {1} bytes"
- _TOOBIG = "{0} greater than {1}"
- def _checkparams(
- digest_size: int, key: bytes, salt: bytes, person: bytes
- ) -> None:
- """Check hash parameters"""
- ensure(
- isinstance(key, bytes),
- "Key must be a bytes sequence",
- raising=exc.TypeError,
- )
- ensure(
- isinstance(salt, bytes),
- "Salt must be a bytes sequence",
- raising=exc.TypeError,
- )
- ensure(
- isinstance(person, bytes),
- "Person must be a bytes sequence",
- raising=exc.TypeError,
- )
- ensure(
- isinstance(digest_size, int),
- "Digest size must be an integer number",
- raising=exc.TypeError,
- )
- ensure(
- digest_size <= crypto_generichash_BYTES_MAX,
- _TOOBIG.format("Digest_size", crypto_generichash_BYTES_MAX),
- raising=exc.ValueError,
- )
- ensure(
- len(key) <= crypto_generichash_KEYBYTES_MAX,
- _OVERLONG.format("Key", crypto_generichash_KEYBYTES_MAX),
- raising=exc.ValueError,
- )
- ensure(
- len(salt) <= crypto_generichash_SALTBYTES,
- _OVERLONG.format("Salt", crypto_generichash_SALTBYTES),
- raising=exc.ValueError,
- )
- ensure(
- len(person) <= crypto_generichash_PERSONALBYTES,
- _OVERLONG.format("Person", crypto_generichash_PERSONALBYTES),
- raising=exc.ValueError,
- )
- def generichash_blake2b_salt_personal(
- data: bytes,
- digest_size: int = crypto_generichash_BYTES,
- key: bytes = b"",
- salt: bytes = b"",
- person: bytes = b"",
- ) -> bytes:
- """One shot hash interface
- :param data: the input data to the hash function
- :type data: bytes
- :param digest_size: must be at most
- :py:data:`.crypto_generichash_BYTES_MAX`;
- the default digest size is
- :py:data:`.crypto_generichash_BYTES`
- :type digest_size: int
- :param key: must be at most
- :py:data:`.crypto_generichash_KEYBYTES_MAX` long
- :type key: bytes
- :param salt: must be at most
- :py:data:`.crypto_generichash_SALTBYTES` long;
- will be zero-padded if needed
- :type salt: bytes
- :param person: must be at most
- :py:data:`.crypto_generichash_PERSONALBYTES` long:
- will be zero-padded if needed
- :type person: bytes
- :return: digest_size long digest
- :rtype: bytes
- """
- _checkparams(digest_size, key, salt, person)
- ensure(
- isinstance(data, bytes),
- "Input data must be a bytes sequence",
- raising=exc.TypeError,
- )
- digest = ffi.new("unsigned char[]", digest_size)
- # both _salt and _personal must be zero-padded to the correct length
- _salt = ffi.new("unsigned char []", crypto_generichash_SALTBYTES)
- _person = ffi.new("unsigned char []", crypto_generichash_PERSONALBYTES)
- ffi.memmove(_salt, salt, len(salt))
- ffi.memmove(_person, person, len(person))
- rc = lib.crypto_generichash_blake2b_salt_personal(
- digest, digest_size, data, len(data), key, len(key), _salt, _person
- )
- ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
- return ffi.buffer(digest, digest_size)[:]
- _Blake2State = TypeVar("_Blake2State", bound="Blake2State")
- class Blake2State:
- """
- Python-level wrapper for the crypto_generichash_blake2b state buffer
- """
- __slots__ = ["_statebuf", "digest_size"]
- def __init__(self, digest_size: int):
- self._statebuf = ffi.new(
- "unsigned char[]", crypto_generichash_STATEBYTES
- )
- self.digest_size = digest_size
- def __reduce__(self) -> NoReturn:
- """
- Raise the same exception as hashlib's blake implementation
- on copy.copy()
- """
- raise TypeError(
- "can't pickle {} objects".format(self.__class__.__name__)
- )
- def copy(self: _Blake2State) -> _Blake2State:
- _st = self.__class__(self.digest_size)
- ffi.memmove(
- _st._statebuf, self._statebuf, crypto_generichash_STATEBYTES
- )
- return _st
- def generichash_blake2b_init(
- key: bytes = b"",
- salt: bytes = b"",
- person: bytes = b"",
- digest_size: int = crypto_generichash_BYTES,
- ) -> Blake2State:
- """
- Create a new initialized blake2b hash state
- :param key: must be at most
- :py:data:`.crypto_generichash_KEYBYTES_MAX` long
- :type key: bytes
- :param salt: must be at most
- :py:data:`.crypto_generichash_SALTBYTES` long;
- will be zero-padded if needed
- :type salt: bytes
- :param person: must be at most
- :py:data:`.crypto_generichash_PERSONALBYTES` long:
- will be zero-padded if needed
- :type person: bytes
- :param digest_size: must be at most
- :py:data:`.crypto_generichash_BYTES_MAX`;
- the default digest size is
- :py:data:`.crypto_generichash_BYTES`
- :type digest_size: int
- :return: a initialized :py:class:`.Blake2State`
- :rtype: object
- """
- _checkparams(digest_size, key, salt, person)
- state = Blake2State(digest_size)
- # both _salt and _personal must be zero-padded to the correct length
- _salt = ffi.new("unsigned char []", crypto_generichash_SALTBYTES)
- _person = ffi.new("unsigned char []", crypto_generichash_PERSONALBYTES)
- ffi.memmove(_salt, salt, len(salt))
- ffi.memmove(_person, person, len(person))
- rc = lib.crypto_generichash_blake2b_init_salt_personal(
- state._statebuf, key, len(key), digest_size, _salt, _person
- )
- ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
- return state
- def generichash_blake2b_update(state: Blake2State, data: bytes) -> None:
- """Update the blake2b hash state
- :param state: a initialized Blake2bState object as returned from
- :py:func:`.crypto_generichash_blake2b_init`
- :type state: :py:class:`.Blake2State`
- :param data:
- :type data: bytes
- """
- ensure(
- isinstance(state, Blake2State),
- "State must be a Blake2State object",
- raising=exc.TypeError,
- )
- ensure(
- isinstance(data, bytes),
- "Input data must be a bytes sequence",
- raising=exc.TypeError,
- )
- rc = lib.crypto_generichash_blake2b_update(
- state._statebuf, data, len(data)
- )
- ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
- def generichash_blake2b_final(state: Blake2State) -> bytes:
- """Finalize the blake2b hash state and return the digest.
- :param state: a initialized Blake2bState object as returned from
- :py:func:`.crypto_generichash_blake2b_init`
- :type state: :py:class:`.Blake2State`
- :return: the blake2 digest of the passed-in data stream
- :rtype: bytes
- """
- ensure(
- isinstance(state, Blake2State),
- "State must be a Blake2State object",
- raising=exc.TypeError,
- )
- _digest = ffi.new("unsigned char[]", crypto_generichash_BYTES_MAX)
- rc = lib.crypto_generichash_blake2b_final(
- state._statebuf, _digest, state.digest_size
- )
- ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
- return ffi.buffer(_digest, state.digest_size)[:]
|