diff --git a/Changelog.rst b/Changelog.rst index 98710eb8..7a0a3694 100644 --- a/Changelog.rst +++ b/Changelog.rst @@ -1,6 +1,13 @@ Changelog ========= +Under Development +++++++++++++++++++++++++++ + +New features +--------------- +* Addde support for TurboSHAKE128 and TurboSHAKE256. + 3.19.1 (28 December 2023) ++++++++++++++++++++++++++ diff --git a/Doc/src/features.rst b/Doc/src/features.rst index 6cd5a9c4..4653dd44 100644 --- a/Doc/src/features.rst +++ b/Doc/src/features.rst @@ -49,7 +49,7 @@ A list of useful resources in that area can be found on `Matthew Green's blog`_. - SHA-2 hashes (224, 256, 384, 512, 512/224, 512/256) - SHA-3 hashes (224, 256, 384, 512) and XOFs (SHAKE128, SHAKE256) - Functions derived from SHA-3 (cSHAKE128, cSHAKE256, TupleHash128, TupleHash256) - - KangarooTwelve (XOF) + - KangarooTwelve, TurboSHAKE128, TurboSHAKE256 (XOF) - Keccak (original submission to SHA-3) - BLAKE2b and BLAKE2s - RIPE-MD160 (legacy) diff --git a/Doc/src/hash/hash.rst b/Doc/src/hash/hash.rst index 63e9da70..71a992f2 100644 --- a/Doc/src/hash/hash.rst +++ b/Doc/src/hash/hash.rst @@ -138,6 +138,7 @@ Instead, it has a ``read(N)`` method to extract the next ``N`` bytes of the outp - :doc:`cshake256` - :doc:`k12` +- :doc:`turboshake` Message Authentication Code (MAC) algorithms -------------------------------------------- diff --git a/Doc/src/hash/turboshake.rst b/Doc/src/hash/turboshake.rst new file mode 100644 index 00000000..ceee9fd6 --- /dev/null +++ b/Doc/src/hash/turboshake.rst @@ -0,0 +1,60 @@ +TurboSHAKE128 and TurboSHAKE256 +=============================== + +TurboSHAKE is a family of *eXtendable-Output Functions* (XOFs) based on the Keccak permutation, +which is also the basis for SHA-3. + +A XOF is a generalization of a cryptographic hash. +The output digest of a XOF can take any length, as required by the caller, +unlike SHA-256 (for instance) that always produces exactly 32 bytes. +The output bits of a XOF do **not** depend on the output length, +which means that the output length +does not even need to be known (or declared) when the XOF is created. + +Therefore, a XOF object has a ``read(N: int)`` method (much like a file object) +instead of a ``digest()`` method. ``read()`` can be called any number of times, +and it will return different bytes each time. + +.. figure:: xof.png + :align: center + :figwidth: 50% + + Generic state diagram for a XOF object + +The TurboSHAKE family is not standardized. However, an RFC_ is being written. +It comprises of two members: + +.. csv-table:: + :header: Name, (2nd) Pre-image strength, Collision strength + + TurboSHAKE128, "128 bits (output >= 16 bytes)", "256 bits (output >= 32 bytes)" + TurboSHAKE256, "256 bits (output >= 32 bytes)", "512 bits (output >= 64 bytes)" + +In addition to hashing, TurboSHAKE allows for domain separation +via a *domain separation byte* (that is, the ``domain`` parameter to :func:`Crypto.Hash.TurboSHAKE128.new` +and to :func:`Crypto.Hash.TurboSHAKE256.new`). + +.. hint:: + + For instance, if you are using TurboSHAKE in two applications, + by picking different domain separation bytes you can ensure + that they will never end up using the same digest in practice. + The important factor is that the strings are different; + the actual value of the domain separation byte is irrelevant. + +In the following example, we extract 26 bytes (208 bits) from the TurboSHAKE128 XOF:: + + >>> from Crypto.Hash import TurboSHAKE128 + >>> + >>> xof = TurboSHAKE128.new() + >>> xof.update(b'Some data') + >>> print(xof.read(26).hex()) + d9dfade4ff8be344749908073916d3abd185ef88f5401024f029 + +.. _RFC: https://datatracker.ietf.org/doc/draft-irtf-cfrg-kangarootwelve/ + +.. automodule:: Crypto.Hash.TurboSHAKE128 + :members: + +.. automodule:: Crypto.Hash.TurboSHAKE256 + :members: diff --git a/README.rst b/README.rst index 0a8fa62b..863c1698 100644 --- a/README.rst +++ b/README.rst @@ -58,7 +58,7 @@ with respect to the last official version of PyCrypto (2.6.1): - KMAC128 and KMAC256 - TupleHash128 and TupleHash256 -* KangarooTwelve XOF (derived from Keccak) +* KangarooTwelve, TurboSHAKE128, and TurboSHAKE256 XOFs * Truncated hash algorithms SHA-512/224 and SHA-512/256 (FIPS 180-4) * BLAKE2b and BLAKE2s hash algorithms * Salsa20 and ChaCha20/XChaCha20 stream ciphers diff --git a/lib/Crypto/Hash/KangarooTwelve.py b/lib/Crypto/Hash/KangarooTwelve.py index f5358d44..5cf68579 100644 --- a/lib/Crypto/Hash/KangarooTwelve.py +++ b/lib/Crypto/Hash/KangarooTwelve.py @@ -28,16 +28,10 @@ # POSSIBILITY OF SUCH DAMAGE. # =================================================================== -from Crypto.Util._raw_api import (VoidPointer, SmartPointer, - create_string_buffer, - get_raw_buffer, c_size_t, - c_uint8_ptr, c_ubyte) - from Crypto.Util.number import long_to_bytes from Crypto.Util.py3compat import bchr -from .keccak import _raw_keccak_lib - +from . import TurboSHAKE128 def _length_encode(x): if x == 0: @@ -70,7 +64,8 @@ def __init__(self, data, custom): self._padding = None # Final padding is only decided in read() # Internal hash that consumes FinalNode - self._hash1 = self._create_keccak() + # The real domain separation byte will be known before squeezing + self._hash1 = TurboSHAKE128.new(domain=1) self._length1 = 0 # Internal hash that produces CV_i (reset each time) @@ -83,42 +78,6 @@ def __init__(self, data, custom): if data: self.update(data) - def _create_keccak(self): - state = VoidPointer() - result = _raw_keccak_lib.keccak_init(state.address_of(), - c_size_t(32), # 32 bytes of capacity (256 bits) - c_ubyte(12)) # Reduced number of rounds - if result: - raise ValueError("Error %d while instantiating KangarooTwelve" - % result) - return SmartPointer(state.get(), _raw_keccak_lib.keccak_destroy) - - def _update(self, data, hash_obj): - result = _raw_keccak_lib.keccak_absorb(hash_obj.get(), - c_uint8_ptr(data), - c_size_t(len(data))) - if result: - raise ValueError("Error %d while updating KangarooTwelve state" - % result) - - def _squeeze(self, hash_obj, length, padding): - bfr = create_string_buffer(length) - result = _raw_keccak_lib.keccak_squeeze(hash_obj.get(), - bfr, - c_size_t(length), - c_ubyte(padding)) - if result: - raise ValueError("Error %d while extracting from KangarooTwelve" - % result) - - return get_raw_buffer(bfr) - - def _reset(self, hash_obj): - result = _raw_keccak_lib.keccak_reset(hash_obj.get()) - if result: - raise ValueError("Error %d while resetting KangarooTwelve state" - % result) - def update(self, data): """Hash the next piece of data. @@ -127,7 +86,7 @@ def update(self, data): Args: data (byte string/byte array/memoryview): The next chunk of the - message to hash. + message to hash. """ if self._state == SQUEEZING: @@ -138,7 +97,7 @@ def update(self, data): if next_length + len(self._custom) <= 8192: self._length1 = next_length - self._update(data, self._hash1) + self._hash1.update(data) return self # Switch to tree hashing @@ -148,7 +107,7 @@ def update(self, data): data_mem = memoryview(data) assert(self._length1 < 8192) dtc = min(len(data), 8192 - self._length1) - self._update(data_mem[:dtc], self._hash1) + self._hash1.update(data_mem[:dtc]) self._length1 += dtc if self._length1 < 8192: @@ -158,10 +117,10 @@ def update(self, data): assert(self._length1 == 8192) divider = b'\x03' + b'\x00' * 7 - self._update(divider, self._hash1) + self._hash1.update(divider) self._length1 += 8 - self._hash2 = self._create_keccak() + self._hash2 = TurboSHAKE128.new(domain=0x0B) self._length2 = 0 self._ctr = 1 @@ -178,15 +137,15 @@ def update(self, data): while index < len_data: new_index = min(index + 8192 - self._length2, len_data) - self._update(data_mem[index:new_index], self._hash2) + self._hash2.update(data_mem[index:new_index]) self._length2 += new_index - index index = new_index if self._length2 == 8192: - cv_i = self._squeeze(self._hash2, 32, 0x0B) - self._update(cv_i, self._hash1) + cv_i = self._hash2.read(32) + self._hash1.update(cv_i) self._length1 += 32 - self._reset(self._hash2) + self._hash2._reset() self._length2 = 0 self._ctr += 1 @@ -210,7 +169,7 @@ def read(self, length): custom_was_consumed = False if self._state == SHORT_MSG: - self._update(self._custom, self._hash1) + self._hash1.update(self._custom) self._padding = 0x07 self._state = SQUEEZING @@ -225,20 +184,21 @@ def read(self, length): # Is there still some leftover data in hash2? if self._length2 > 0: - cv_i = self._squeeze(self._hash2, 32, 0x0B) - self._update(cv_i, self._hash1) + cv_i = self._hash2.read(32) + self._hash1.update(cv_i) self._length1 += 32 - self._reset(self._hash2) + self._hash2._reset() self._length2 = 0 self._ctr += 1 trailer = _length_encode(self._ctr - 1) + b'\xFF\xFF' - self._update(trailer, self._hash1) + self._hash1.update(trailer) self._padding = 0x06 self._state = SQUEEZING - return self._squeeze(self._hash1, length, self._padding) + self._hash1._domain = self._padding + return self._hash1.read(length) def new(self, data=None, custom=b''): return type(self)(data, custom) diff --git a/lib/Crypto/Hash/TupleHash128.py b/lib/Crypto/Hash/TupleHash128.py index 663a643d..a3fa96a1 100644 --- a/lib/Crypto/Hash/TupleHash128.py +++ b/lib/Crypto/Hash/TupleHash128.py @@ -133,7 +133,4 @@ def new(**kwargs): custom = kwargs.pop("custom", b'') - if kwargs: - raise TypeError("Unknown parameters: " + str(kwargs)) - return TupleHash(custom, cSHAKE128, digest_bytes) diff --git a/lib/Crypto/Hash/TupleHash256.py b/lib/Crypto/Hash/TupleHash256.py index 9b4fba08..40a824a9 100644 --- a/lib/Crypto/Hash/TupleHash256.py +++ b/lib/Crypto/Hash/TupleHash256.py @@ -67,7 +67,4 @@ def new(**kwargs): custom = kwargs.pop("custom", b'') - if kwargs: - raise TypeError("Unknown parameters: " + str(kwargs)) - return TupleHash(custom, cSHAKE256, digest_bytes) diff --git a/lib/Crypto/Hash/TurboSHAKE128.py b/lib/Crypto/Hash/TurboSHAKE128.py new file mode 100644 index 00000000..ab3e4e49 --- /dev/null +++ b/lib/Crypto/Hash/TurboSHAKE128.py @@ -0,0 +1,112 @@ +from Crypto.Util._raw_api import (VoidPointer, SmartPointer, + create_string_buffer, + get_raw_buffer, c_size_t, + c_uint8_ptr, c_ubyte) + +from Crypto.Util.number import long_to_bytes +from Crypto.Util.py3compat import bchr + +from .keccak import _raw_keccak_lib + + +class TurboSHAKE(object): + """A TurboSHAKE hash object. + Do not instantiate directly. + Use the :func:`new` function. + """ + + def __init__(self, capacity, domain_separation, data): + + state = VoidPointer() + result = _raw_keccak_lib.keccak_init(state.address_of(), + c_size_t(capacity), + c_ubyte(12)) # Reduced number of rounds + if result: + raise ValueError("Error %d while instantiating TurboSHAKE" + % result) + self._state = SmartPointer(state.get(), _raw_keccak_lib.keccak_destroy) + + self._is_squeezing = False + self._capacity = capacity + self._domain = domain_separation + + if data: + self.update(data) + + + def update(self, data): + """Continue hashing of a message by consuming the next chunk of data. + + Args: + data (byte string/byte array/memoryview): The next chunk of the message being hashed. + """ + + if self._is_squeezing: + raise TypeError("You cannot call 'update' after the first 'read'") + + result = _raw_keccak_lib.keccak_absorb(self._state.get(), + c_uint8_ptr(data), + c_size_t(len(data))) + if result: + raise ValueError("Error %d while updating TurboSHAKE state" + % result) + return self + + def read(self, length): + """ + Compute the next piece of XOF output. + + .. note:: + You cannot use :meth:`update` anymore after the first call to + :meth:`read`. + + Args: + length (integer): the amount of bytes this method must return + + :return: the next piece of XOF output (of the given length) + :rtype: byte string + """ + + self._is_squeezing = True + bfr = create_string_buffer(length) + result = _raw_keccak_lib.keccak_squeeze(self._state.get(), + bfr, + c_size_t(length), + c_ubyte(self._domain)) + if result: + raise ValueError("Error %d while extracting from TurboSHAKE" + % result) + + return get_raw_buffer(bfr) + + def new(self, data=None): + return type(self)(self._capacity, self._domain, data) + + def _reset(self): + result = _raw_keccak_lib.keccak_reset(self._state.get()) + if result: + raise ValueError("Error %d while resetting TurboSHAKE state" + % result) + self._is_squeezing = False + + +def new(**kwargs): + """Create a new TurboSHAKE128 object. + + Args: + domain (integer): + Optional - A domain separation byte, between 0x01 and 0x7F. + The default value is 0x1F. + data (bytes/bytearray/memoryview): + Optional - The very first chunk of the message to hash. + It is equivalent to an early call to :meth:`update`. + + :Return: A :class:`TurboSHAKE` object + """ + + domain_separation = kwargs.get('domain', 0x1F) + if not (0x01 <= domain_separation <= 0x7F): + raise ValueError("Incorrect domain separation value (%d)" % + domain_separation) + data = kwargs.get('data') + return TurboSHAKE(32, domain_separation, data=data) diff --git a/lib/Crypto/Hash/TurboSHAKE128.pyi b/lib/Crypto/Hash/TurboSHAKE128.pyi new file mode 100644 index 00000000..d74c9c08 --- /dev/null +++ b/lib/Crypto/Hash/TurboSHAKE128.pyi @@ -0,0 +1,17 @@ +from typing import Union, Optional +from typing_extensions import TypedDict, Unpack, NotRequired + +Buffer = Union[bytes, bytearray, memoryview] + +class TurboSHAKE(object): + + def __init__(self, capacity: int, domain_separation: int, data: Union[Buffer, None]) -> None: ... + def update(self, data: Buffer) -> TurboSHAKE : ... + def read(self, length: int) -> bytes: ... + def new(self, data: Optional[Buffer]=None) -> TurboSHAKE: ... + +class Args(TypedDict): + domain: NotRequired[int] + data: NotRequired[Buffer] + +def new(**kwargs: Unpack[Args]) -> TurboSHAKE: ... diff --git a/lib/Crypto/Hash/TurboSHAKE256.py b/lib/Crypto/Hash/TurboSHAKE256.py new file mode 100644 index 00000000..ce27a485 --- /dev/null +++ b/lib/Crypto/Hash/TurboSHAKE256.py @@ -0,0 +1,22 @@ +from .TurboSHAKE128 import TurboSHAKE + +def new(**kwargs): + """Create a new TurboSHAKE256 object. + + Args: + domain (integer): + Optional - A domain separation byte, between 0x01 and 0x7F. + The default value is 0x1F. + data (bytes/bytearray/memoryview): + Optional - The very first chunk of the message to hash. + It is equivalent to an early call to :meth:`update`. + + :Return: A :class:`TurboSHAKE` object + """ + + domain_separation = kwargs.get('domain', 0x1F) + if not (0x01 <= domain_separation <= 0x7F): + raise ValueError("Incorrect domain separation value (%d)" % + domain_separation) + data = kwargs.get('data') + return TurboSHAKE(64, domain_separation, data=data) diff --git a/lib/Crypto/Hash/TurboSHAKE256.pyi b/lib/Crypto/Hash/TurboSHAKE256.pyi new file mode 100644 index 00000000..561e946c --- /dev/null +++ b/lib/Crypto/Hash/TurboSHAKE256.pyi @@ -0,0 +1,12 @@ +from typing import Union +from typing_extensions import TypedDict, Unpack, NotRequired + +from .TurboSHAKE128 import TurboSHAKE + +Buffer = Union[bytes, bytearray, memoryview] + +class Args(TypedDict): + domain: NotRequired[int] + data: NotRequired[Buffer] + +def new(**kwargs: Unpack[Args]) -> TurboSHAKE: ... diff --git a/lib/Crypto/Hash/__init__.py b/lib/Crypto/Hash/__init__.py index 4bda0841..16c808d9 100644 --- a/lib/Crypto/Hash/__init__.py +++ b/lib/Crypto/Hash/__init__.py @@ -21,4 +21,5 @@ __all__ = ['HMAC', 'MD2', 'MD4', 'MD5', 'RIPEMD160', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'CMAC', 'Poly1305', 'cSHAKE128', 'cSHAKE256', 'KMAC128', 'KMAC256', - 'TupleHash128', 'TupleHash256', 'KangarooTwelve'] + 'TupleHash128', 'TupleHash256', 'KangarooTwelve', + 'TurboSHAKE128', 'TurboSHAKE256'] diff --git a/lib/Crypto/SelfTest/Hash/__init__.py b/lib/Crypto/SelfTest/Hash/__init__.py index 008a8103..98fcab20 100644 --- a/lib/Crypto/SelfTest/Hash/__init__.py +++ b/lib/Crypto/SelfTest/Hash/__init__.py @@ -51,6 +51,7 @@ def get_tests(config={}): from Crypto.SelfTest.Hash import test_KMAC; tests += test_KMAC.get_tests(config=config) from Crypto.SelfTest.Hash import test_TupleHash; tests += test_TupleHash.get_tests(config=config) from Crypto.SelfTest.Hash import test_KangarooTwelve; tests += test_KangarooTwelve.get_tests(config=config) + from Crypto.SelfTest.Hash import test_TurboSHAKE; tests += test_TurboSHAKE.get_tests(config=config) return tests if __name__ == '__main__': diff --git a/lib/Crypto/SelfTest/Hash/test_TurboSHAKE.py b/lib/Crypto/SelfTest/Hash/test_TurboSHAKE.py new file mode 100644 index 00000000..b37aee1a --- /dev/null +++ b/lib/Crypto/SelfTest/Hash/test_TurboSHAKE.py @@ -0,0 +1,468 @@ +"""Self-test suite for Crypto.Hash.TurboSHAKE128 and TurboSHAKE256""" + +import unittest +from binascii import unhexlify + +from Crypto.SelfTest.st_common import list_test_cases + +from Crypto.Hash import TurboSHAKE128, TurboSHAKE256 +from Crypto.Util.py3compat import bchr + + +class TurboSHAKETest(unittest.TestCase): + + def test_new_positive(self): + + xof1 = self.TurboSHAKE.new() + xof1.update(b'90') + + xof2 = self.TurboSHAKE.new(domain=0x1F) + xof2.update(b'90') + + xof3 = self.TurboSHAKE.new(data=b'90') + + out1 = xof1.read(128) + out2 = xof2.read(128) + out3 = xof3.read(128) + + self.assertEqual(out1, out2) + self.assertEqual(out1, out3) + + def test_new_domain(self): + xof1 = self.TurboSHAKE.new(domain=0x1D) + xof2 = self.TurboSHAKE.new(domain=0x20) + self.assertNotEqual(xof1.read(128), xof2.read(128)) + + def test_update(self): + pieces = [bchr(10) * 200, bchr(20) * 300] + + xof1 = self.TurboSHAKE.new() + xof1.update(pieces[0]).update(pieces[1]) + digest1 = xof1.read(10) + + xof2 = self.TurboSHAKE.new() + xof2.update(pieces[0] + pieces[1]) + digest2 = xof2.read(10) + + self.assertEqual(digest1, digest2) + + def test_update_negative(self): + xof1 = self.TurboSHAKE.new() + self.assertRaises(TypeError, xof1.update, u"string") + + def test_read(self): + xof1 = self.TurboSHAKE.new() + digest = xof1.read(90) + + # read returns a byte string of the right length + self.assertTrue(isinstance(digest, bytes)) + self.assertEqual(len(digest), 90) + + def test_update_after_read(self): + xof1 = self.TurboSHAKE.new() + xof1.update(b"rrrr") + xof1.read(90) + self.assertRaises(TypeError, xof1.update, b"ttt") + + def test_new(self): + xof1 = self.TurboSHAKE.new(domain=0x07) + xof1.update(b'90') + digest1 = xof1.read(100) + + xof2 = xof1.new() + xof2.update(b'90') + digest2 = xof2.read(100) + + self.assertEqual(digest1, digest2) + + self.assertRaises(TypeError, xof1.new, domain=0x07) + + +class TurboSHAKE128Test(TurboSHAKETest): + TurboSHAKE = TurboSHAKE128 + + +class TurboSHAKE256Test(TurboSHAKETest): + TurboSHAKE = TurboSHAKE256 + + +def txt2bin(txt): + clean = txt.replace(" ", "").replace("\n", "").replace("\r", "") + return unhexlify(clean) + + +def ptn(n): + res = bytearray(n) + pattern = b"".join([bchr(x) for x in range(0, 0xFB)]) + for base in range(0, n - 0xFB, 0xFB): + res[base:base + 0xFB] = pattern + remain = n % 0xFB + if remain: + base = (n // 0xFB) * 0xFB + res[base:] = pattern[:remain] + assert len(res) == n + return res + + +def chunked(source, size): + for i in range(0, len(source), size): + yield source[i:i+size] + + +class TurboSHAKE128TV(unittest.TestCase): + + def test_zero_1(self): + tv = """1E 41 5F 1C 59 83 AF F2 16 92 17 27 7D 17 BB 53 + 8C D9 45 A3 97 DD EC 54 1F 1C E4 1A F2 C1 B7 4C""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new().read(32) + self.assertEqual(res, btv) + + def test_zero_2(self): + tv = """1E 41 5F 1C 59 83 AF F2 16 92 17 27 7D 17 BB 53 + 8C D9 45 A3 97 DD EC 54 1F 1C E4 1A F2 C1 B7 4C + 3E 8C CA E2 A4 DA E5 6C 84 A0 4C 23 85 C0 3C 15 + E8 19 3B DF 58 73 73 63 32 16 91 C0 54 62 C8 DF""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new().read(64) + self.assertEqual(res, btv) + + def test_zero_3(self): + tv = """A3 B9 B0 38 59 00 CE 76 1F 22 AE D5 48 E7 54 DA + 10 A5 24 2D 62 E8 C6 58 E3 F3 A9 23 A7 55 56 07""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new().read(10032)[-32:] + self.assertEqual(res, btv) + + def test_ptn_1(self): + tv = """55 CE DD 6F 60 AF 7B B2 9A 40 42 AE 83 2E F3 F5 + 8D B7 29 9F 89 3E BB 92 47 24 7D 85 69 58 DA A9""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=ptn(1)).read(32) + self.assertEqual(res, btv) + + def test_ptn_17(self): + tv = """9C 97 D0 36 A3 BA C8 19 DB 70 ED E0 CA 55 4E C6 + E4 C2 A1 A4 FF BF D9 EC 26 9C A6 A1 11 16 12 33""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=ptn(17)).read(32) + self.assertEqual(res, btv) + + def test_ptn_17_2(self): + tv = """96 C7 7C 27 9E 01 26 F7 FC 07 C9 B0 7F 5C DA E1 + E0 BE 60 BD BE 10 62 00 40 E7 5D 72 23 A6 24 D2""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=ptn(17**2)).read(32) + self.assertEqual(res, btv) + + def test_ptn_17_3(self): + tv = """D4 97 6E B5 6B CF 11 85 20 58 2B 70 9F 73 E1 D6 + 85 3E 00 1F DA F8 0E 1B 13 E0 D0 59 9D 5F B3 72""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=ptn(17**3)).read(32) + self.assertEqual(res, btv) + + def test_ptn_17_4(self): + tv = """DA 67 C7 03 9E 98 BF 53 0C F7 A3 78 30 C6 66 4E + 14 CB AB 7F 54 0F 58 40 3B 1B 82 95 13 18 EE 5C""" + + btv = txt2bin(tv) + data = ptn(17**4) + + # All at once + res = TurboSHAKE128.new(data=data).read(32) + self.assertEqual(res, btv) + + # Byte by byte + xof = TurboSHAKE128.new() + for x in data: + xof.update(bchr(x)) + res = xof.read(32) + self.assertEqual(res, btv) + + # Chunks of various prime sizes + for chunk_size in (13, 17, 19, 23, 31): + xof = TurboSHAKE128.new() + for x in chunked(data, chunk_size): + xof.update(x) + res = xof.read(32) + self.assertEqual(res, btv) + + def test_ptn_17_5(self): + tv = """B9 7A 90 6F BF 83 EF 7C 81 25 17 AB F3 B2 D0 AE + A0 C4 F6 03 18 CE 11 CF 10 39 25 12 7F 59 EE CD""" + + btv = txt2bin(tv) + data = ptn(17**5) + + # All at once + res = TurboSHAKE128.new(data=data).read(32) + self.assertEqual(res, btv) + + # Chunks + xof = TurboSHAKE128.new() + for chunk in chunked(data, 8192): + xof.update(chunk) + res = xof.read(32) + self.assertEqual(res, btv) + + def test_ptn_17_6(self): + tv = """35 CD 49 4A DE DE D2 F2 52 39 AF 09 A7 B8 EF 0C + 4D 1C A4 FE 2D 1A C3 70 FA 63 21 6F E7 B4 C2 B1""" + + btv = txt2bin(tv) + data = ptn(17**6) + + res = TurboSHAKE128.new(data=data).read(32) + self.assertEqual(res, btv) + + def test_ffffff_d01(self): + tv = """BF 32 3F 94 04 94 E8 8E E1 C5 40 FE 66 0B E8 A0 + C9 3F 43 D1 5E C0 06 99 84 62 FA 99 4E ED 5D AB""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=b"\xff\xff\xff", domain=0x01).read(32) + self.assertEqual(res, btv) + + def test_ff_d06(self): + tv = """8E C9 C6 64 65 ED 0D 4A 6C 35 D1 35 06 71 8D 68 + 7A 25 CB 05 C7 4C CA 1E 42 50 1A BD 83 87 4A 67""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=b'\xFF', domain=0x06).read(32) + self.assertEqual(res, btv) + + def test_ffffff_d07(self): + tv = """B6 58 57 60 01 CA D9 B1 E5 F3 99 A9 F7 77 23 BB + A0 54 58 04 2D 68 20 6F 72 52 68 2D BA 36 63 ED""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=b'\xFF' * 3, domain=0x07).read(32) + self.assertEqual(res, btv) + + def test_ffffffffffff_d0b(self): + tv = """8D EE AA 1A EC 47 CC EE 56 9F 65 9C 21 DF A8 E1 + 12 DB 3C EE 37 B1 81 78 B2 AC D8 05 B7 99 CC 37""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=b'\xFF' * 7, domain=0x0B).read(32) + self.assertEqual(res, btv) + + def test_ff_d30(self): + tv = """55 31 22 E2 13 5E 36 3C 32 92 BE D2 C6 42 1F A2 + 32 BA B0 3D AA 07 C7 D6 63 66 03 28 65 06 32 5B""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=b'\xFF', domain=0x30).read(32) + self.assertEqual(res, btv) + + def test_ffffff_d7f(self): + tv = """16 27 4C C6 56 D4 4C EF D4 22 39 5D 0F 90 53 BD + A6 D2 8E 12 2A BA 15 C7 65 E5 AD 0E 6E AF 26 F9""" + + btv = txt2bin(tv) + res = TurboSHAKE128.new(data=b'\xFF' * 3, domain=0x7F).read(32) + self.assertEqual(res, btv) + + +class TurboSHAKE256TV(unittest.TestCase): + + def test_zero_1(self): + tv = """36 7A 32 9D AF EA 87 1C 78 02 EC 67 F9 05 AE 13 + C5 76 95 DC 2C 66 63 C6 10 35 F5 9A 18 F8 E7 DB + 11 ED C0 E1 2E 91 EA 60 EB 6B 32 DF 06 DD 7F 00 + 2F BA FA BB 6E 13 EC 1C C2 0D 99 55 47 60 0D B0""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new().read(64) + self.assertEqual(res, btv) + + def test_zero_2(self): + tv = """AB EF A1 16 30 C6 61 26 92 49 74 26 85 EC 08 2F + 20 72 65 DC CF 2F 43 53 4E 9C 61 BA 0C 9D 1D 75""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new().read(10032)[-32:] + self.assertEqual(res, btv) + + def test_ptn_1(self): + tv = """3E 17 12 F9 28 F8 EA F1 05 46 32 B2 AA 0A 24 6E + D8 B0 C3 78 72 8F 60 BC 97 04 10 15 5C 28 82 0E + 90 CC 90 D8 A3 00 6A A2 37 2C 5C 5E A1 76 B0 68 + 2B F2 2B AE 74 67 AC 94 F7 4D 43 D3 9B 04 82 E2""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=ptn(1)).read(64) + self.assertEqual(res, btv) + + def test_ptn_17(self): + tv = """B3 BA B0 30 0E 6A 19 1F BE 61 37 93 98 35 92 35 + 78 79 4E A5 48 43 F5 01 10 90 FA 2F 37 80 A9 E5 + CB 22 C5 9D 78 B4 0A 0F BF F9 E6 72 C0 FB E0 97 + 0B D2 C8 45 09 1C 60 44 D6 87 05 4D A5 D8 E9 C7""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=ptn(17)).read(64) + self.assertEqual(res, btv) + + def test_ptn_17_2(self): + tv = """66 B8 10 DB 8E 90 78 04 24 C0 84 73 72 FD C9 57 + 10 88 2F DE 31 C6 DF 75 BE B9 D4 CD 93 05 CF CA + E3 5E 7B 83 E8 B7 E6 EB 4B 78 60 58 80 11 63 16 + FE 2C 07 8A 09 B9 4A D7 B8 21 3C 0A 73 8B 65 C0""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=ptn(17**2)).read(64) + self.assertEqual(res, btv) + + def test_ptn_17_3(self): + tv = """C7 4E BC 91 9A 5B 3B 0D D1 22 81 85 BA 02 D2 9E + F4 42 D6 9D 3D 42 76 A9 3E FE 0B F9 A1 6A 7D C0 + CD 4E AB AD AB 8C D7 A5 ED D9 66 95 F5 D3 60 AB + E0 9E 2C 65 11 A3 EC 39 7D A3 B7 6B 9E 16 74 FB""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=ptn(17**3)).read(64) + self.assertEqual(res, btv) + + def test_ptn_17_4(self): + tv = """02 CC 3A 88 97 E6 F4 F6 CC B6 FD 46 63 1B 1F 52 + 07 B6 6C 6D E9 C7 B5 5B 2D 1A 23 13 4A 17 0A FD + AC 23 4E AB A9 A7 7C FF 88 C1 F0 20 B7 37 24 61 + 8C 56 87 B3 62 C4 30 B2 48 CD 38 64 7F 84 8A 1D""" + + btv = txt2bin(tv) + data = ptn(17**4) + + # All at once + res = TurboSHAKE256.new(data=data).read(64) + self.assertEqual(res, btv) + + # Byte by byte + xof = TurboSHAKE256.new() + for x in data: + xof.update(bchr(x)) + res = xof.read(64) + self.assertEqual(res, btv) + + # Chunks of various prime sizes + for chunk_size in (13, 17, 19, 23, 31): + xof = TurboSHAKE256.new() + for x in chunked(data, chunk_size): + xof.update(x) + res = xof.read(64) + self.assertEqual(res, btv) + + def test_ptn_17_5(self): + tv = """AD D5 3B 06 54 3E 58 4B 58 23 F6 26 99 6A EE 50 + FE 45 ED 15 F2 02 43 A7 16 54 85 AC B4 AA 76 B4 + FF DA 75 CE DF 6D 8C DC 95 C3 32 BD 56 F4 B9 86 + B5 8B B1 7D 17 78 BF C1 B1 A9 75 45 CD F4 EC 9F""" + + btv = txt2bin(tv) + data = ptn(17**5) + + # All at once + res = TurboSHAKE256.new(data=data).read(64) + self.assertEqual(res, btv) + + # Chunks + xof = TurboSHAKE256.new() + for chunk in chunked(data, 8192): + xof.update(chunk) + res = xof.read(64) + self.assertEqual(res, btv) + + def test_ptn_17_6(self): + tv = """9E 11 BC 59 C2 4E 73 99 3C 14 84 EC 66 35 8E F7 + 1D B7 4A EF D8 4E 12 3F 78 00 BA 9C 48 53 E0 2C + FE 70 1D 9E 6B B7 65 A3 04 F0 DC 34 A4 EE 3B A8 + 2C 41 0F 0D A7 0E 86 BF BD 90 EA 87 7C 2D 61 04""" + + btv = txt2bin(tv) + data = ptn(17**6) + + res = TurboSHAKE256.new(data=data).read(64) + self.assertEqual(res, btv) + + def test_ffffff_d01(self): + tv = """D2 1C 6F BB F5 87 FA 22 82 F2 9A EA 62 01 75 FB + 02 57 41 3A F7 8A 0B 1B 2A 87 41 9C E0 31 D9 33 + AE 7A 4D 38 33 27 A8 A1 76 41 A3 4F 8A 1D 10 03 + AD 7D A6 B7 2D BA 84 BB 62 FE F2 8F 62 F1 24 24""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=b"\xff\xff\xff", domain=0x01).read(64) + self.assertEqual(res, btv) + + def test_ff_d06(self): + tv = """73 8D 7B 4E 37 D1 8B 7F 22 AD 1B 53 13 E3 57 E3 + DD 7D 07 05 6A 26 A3 03 C4 33 FA 35 33 45 52 80 + F4 F5 A7 D4 F7 00 EF B4 37 FE 6D 28 14 05 E0 7B + E3 2A 0A 97 2E 22 E6 3A DC 1B 09 0D AE FE 00 4B""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=b'\xFF', domain=0x06).read(64) + self.assertEqual(res, btv) + + def test_ffffff_d07(self): + tv = """18 B3 B5 B7 06 1C 2E 67 C1 75 3A 00 E6 AD 7E D7 + BA 1C 90 6C F9 3E FB 70 92 EA F2 7F BE EB B7 55 + AE 6E 29 24 93 C1 10 E4 8D 26 00 28 49 2B 8E 09 + B5 50 06 12 B8 F2 57 89 85 DE D5 35 7D 00 EC 67""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=b'\xFF' * 3, domain=0x07).read(64) + self.assertEqual(res, btv) + + def test_ffffffffffff_d0b(self): + tv = """BB 36 76 49 51 EC 97 E9 D8 5F 7E E9 A6 7A 77 18 + FC 00 5C F4 25 56 BE 79 CE 12 C0 BD E5 0E 57 36 + D6 63 2B 0D 0D FB 20 2D 1B BB 8F FE 3D D7 4C B0 + 08 34 FA 75 6C B0 34 71 BA B1 3A 1E 2C 16 B3 C0""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=b'\xFF' * 7, domain=0x0B).read(64) + self.assertEqual(res, btv) + + def test_ff_d30(self): + tv = """F3 FE 12 87 3D 34 BC BB 2E 60 87 79 D6 B7 0E 7F + 86 BE C7 E9 0B F1 13 CB D4 FD D0 C4 E2 F4 62 5E + 14 8D D7 EE 1A 52 77 6C F7 7F 24 05 14 D9 CC FC + 3B 5D DA B8 EE 25 5E 39 EE 38 90 72 96 2C 11 1A""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=b'\xFF', domain=0x30).read(64) + self.assertEqual(res, btv) + + def test_ffffff_d7f(self): + tv = """AB E5 69 C1 F7 7E C3 40 F0 27 05 E7 D3 7C 9A B7 + E1 55 51 6E 4A 6A 15 00 21 D7 0B 6F AC 0B B4 0C + 06 9F 9A 98 28 A0 D5 75 CD 99 F9 BA E4 35 AB 1A + CF 7E D9 11 0B A9 7C E0 38 8D 07 4B AC 76 87 76""" + + btv = txt2bin(tv) + res = TurboSHAKE256.new(data=b'\xFF' * 3, domain=0x7F).read(64) + self.assertEqual(res, btv) + + +def get_tests(config={}): + tests = [] + tests += list_test_cases(TurboSHAKE128Test) + tests += list_test_cases(TurboSHAKE256Test) + tests += list_test_cases(TurboSHAKE128TV) + tests += list_test_cases(TurboSHAKE256TV) + return tests + + +if __name__ == '__main__': + def suite(): + return unittest.TestSuite(get_tests()) + unittest.main(defaultTest='suite') diff --git a/lib/Crypto/__init__.py b/lib/Crypto/__init__.py index ff0d38a8..128a7e11 100644 --- a/lib/Crypto/__init__.py +++ b/lib/Crypto/__init__.py @@ -1,6 +1,6 @@ __all__ = ['Cipher', 'Hash', 'Protocol', 'PublicKey', 'Util', 'Signature', 'IO', 'Math'] -version_info = (3, 19, '1') +version_info = (3, 20, '0b0') __version__ = ".".join([str(x) for x in version_info])