# ===================================================================
#
# Copyright (c) 2014, Legrandin <helderijs@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ===================================================================

import os
import re
import unittest
import warnings
from binascii import unhexlify, hexlify

from Crypto.Util.py3compat import tobytes
from Crypto.Util.strxor import strxor_c
from Crypto.SelfTest.st_common import list_test_cases

from Crypto.Hash import BLAKE2b, BLAKE2s


class Blake2Test(unittest.TestCase):
    """API tests shared by the BLAKE2b and BLAKE2s variants.

    This class is not meant to be run directly. Concrete subclasses must
    define the class attributes ``BLAKE2`` (hash module), ``max_bits``,
    ``max_bytes``, ``digest_bits_oid`` and ``oid_variant``.
    """

    def test_new_positive(self):
        """Every valid digest size is accepted by both ``new`` entry points."""

        h = self.BLAKE2.new(digest_bits=self.max_bits)
        # Exercise the module-level factory and the per-object one alike
        for new_func in self.BLAKE2.new, h.new:

            for dbits in range(8, self.max_bits + 1, 8):
                hobj = new_func(digest_bits=dbits)
                self.assertEqual(hobj.digest_size, dbits // 8)

            for dbytes in range(1, self.max_bytes + 1):
                hobj = new_func(digest_bytes=dbytes)
                self.assertEqual(hobj.digest_size, dbytes)

            # Passing data to new() is equivalent to a later update()
            digest1 = new_func(data=b"\x90", digest_bytes=self.max_bytes).digest()
            digest2 = new_func(digest_bytes=self.max_bytes).update(b"\x90").digest()
            self.assertEqual(digest1, digest2)

            new_func(data=b"A", key=b"5", digest_bytes=self.max_bytes)

        hobj = h.new()
        self.assertEqual(hobj.digest_size, self.max_bytes)

    def test_new_negative(self):
        """Invalid sizes and text (non-byte) inputs are rejected by ``new``."""

        h = self.BLAKE2.new(digest_bits=self.max_bits)
        for new_func in self.BLAKE2.new, h.new:
            # digest_bytes and digest_bits are mutually exclusive
            self.assertRaises(TypeError, new_func,
                              digest_bytes=self.max_bytes,
                              digest_bits=self.max_bits)
            self.assertRaises(ValueError, new_func, digest_bytes=0)
            self.assertRaises(ValueError, new_func,
                              digest_bytes=self.max_bytes + 1)
            self.assertRaises(ValueError, new_func, digest_bits=7)
            self.assertRaises(ValueError, new_func, digest_bits=15)
            self.assertRaises(ValueError, new_func,
                              digest_bits=self.max_bits + 1)
            self.assertRaises(TypeError, new_func,
                              digest_bytes=self.max_bytes,
                              key=u"string")
            self.assertRaises(TypeError, new_func,
                              digest_bytes=self.max_bytes,
                              data=u"string")

    def test_default_digest_size(self):
        """Without an explicit size, the digest has the maximum length."""
        digest = self.BLAKE2.new(data=b'abc').digest()
        self.assertEqual(len(digest), self.max_bytes)

    def test_update(self):
        """Hashing in pieces gives the same digest as hashing at once."""
        pieces = [b"\x0A" * 200, b"\x14" * 300]
        h = self.BLAKE2.new(digest_bytes=self.max_bytes)
        h.update(pieces[0]).update(pieces[1])
        digest = h.digest()
        h = self.BLAKE2.new(digest_bytes=self.max_bytes)
        h.update(pieces[0] + pieces[1])
        self.assertEqual(h.digest(), digest)

    def test_update_negative(self):
        """``update`` rejects text strings."""
        h = self.BLAKE2.new(digest_bytes=self.max_bytes)
        self.assertRaises(TypeError, h.update, u"string")

    def test_digest(self):
        """``digest`` is repeatable and returns a byte string."""
        h = self.BLAKE2.new(digest_bytes=self.max_bytes)
        digest = h.digest()

        # digest does not change the state
        self.assertEqual(h.digest(), digest)
        # digest returns a byte string
        self.assertIsInstance(digest, bytes)

    def test_update_after_digest(self):
        """``update`` after ``digest`` needs ``update_after_digest=True``."""
        msg = b"rrrrttt"

        # Normally, update() cannot be done after digest()
        h = self.BLAKE2.new(digest_bits=256, data=msg[:4])
        dig1 = h.digest()
        self.assertRaises(TypeError, h.update, msg[4:])
        dig2 = self.BLAKE2.new(digest_bits=256, data=msg).digest()

        # With the proper flag, it is allowed
        h = self.BLAKE2.new(digest_bits=256, data=msg[:4], update_after_digest=True)
        self.assertEqual(h.digest(), dig1)
        # ... and the subsequent digest applies to the entire message
        # up to that point
        h.update(msg[4:])
        self.assertEqual(h.digest(), dig2)

    def test_hex_digest(self):
        """``hexdigest`` matches ``digest``, is repeatable, returns text."""
        mac = self.BLAKE2.new(digest_bits=self.max_bits)
        digest = mac.digest()
        hexdigest = mac.hexdigest()

        # hexdigest is equivalent to digest
        self.assertEqual(hexlify(digest), tobytes(hexdigest))
        # hexdigest does not change the state
        self.assertEqual(mac.hexdigest(), hexdigest)
        # hexdigest returns a string
        self.assertIsInstance(hexdigest, str)

    def test_verify(self):
        """``verify`` accepts the right MAC and rejects a wrong one."""
        h = self.BLAKE2.new(digest_bytes=self.max_bytes, key=b"4")
        mac = h.digest()
        h.verify(mac)
        wrong_mac = strxor_c(mac, 255)
        self.assertRaises(ValueError, h.verify, wrong_mac)

    def test_hexverify(self):
        """``hexverify`` accepts the right hex MAC and rejects a wrong one."""
        h = self.BLAKE2.new(digest_bytes=self.max_bytes, key=b"4")
        mac = h.hexdigest()
        h.hexverify(mac)
        self.assertRaises(ValueError, h.hexverify, "4556")

    def test_oid(self):
        """The ``oid`` attribute exists only for unkeyed, standard sizes."""

        prefix = "1.3.6.1.4.1.1722.12.2." + self.oid_variant + "."

        for digest_bits in self.digest_bits_oid:
            h = self.BLAKE2.new(digest_bits=digest_bits)
            self.assertEqual(h.oid, prefix + str(digest_bits // 8))

            # A keyed hash (MAC) has no OID, whatever its size
            h = self.BLAKE2.new(digest_bits=digest_bits, key=b"secret")
            self.assertRaises(AttributeError, lambda: h.oid)

        for digest_bits in (8, self.max_bits):
            if digest_bits in self.digest_bits_oid:
                continue
            # Build a fresh unkeyed hash of this size: re-using the keyed
            # object left over from the loop above would test nothing new.
            h = self.BLAKE2.new(digest_bits=digest_bits)
            self.assertRaises(AttributeError, lambda: h.oid)

    def test_bytearray(self):
        """bytearray inputs are accepted and consumed at call time."""

        key = b'0' * 16
        data = b"\x00\x01\x02"

        # Data and key can be a bytearray (during initialization)
        key_ba = bytearray(key)
        data_ba = bytearray(data)

        h1 = self.BLAKE2.new(data=data, key=key)
        h2 = self.BLAKE2.new(data=data_ba, key=key_ba)
        # Modifying the buffers afterwards must not affect the digest
        key_ba[:1] = b'\xFF'
        data_ba[:1] = b'\xFF'

        self.assertEqual(h1.digest(), h2.digest())

        # Data can be a bytearray (during operation)
        data_ba = bytearray(data)

        h1 = self.BLAKE2.new()
        h2 = self.BLAKE2.new()
        h1.update(data)
        h2.update(data_ba)
        data_ba[:1] = b'\xFF'

        self.assertEqual(h1.digest(), h2.digest())

    def test_memoryview(self):
        """memoryview inputs (read-only or writable) are accepted."""

        key = b'0' * 16
        data = b"\x00\x01\x02"

        def get_mv_ro(data):
            return memoryview(data)

        def get_mv_rw(data):
            return memoryview(bytearray(data))

        for get_mv in (get_mv_ro, get_mv_rw):

            # Data and key can be a memoryview (during initialization)
            key_mv = get_mv(key)
            data_mv = get_mv(data)

            h1 = self.BLAKE2.new(data=data, key=key)
            h2 = self.BLAKE2.new(data=data_mv, key=key_mv)
            # Only writable views can be modified after the fact
            if not data_mv.readonly:
                data_mv[:1] = b'\xFF'
                key_mv[:1] = b'\xFF'

            self.assertEqual(h1.digest(), h2.digest())

            # Data can be a memoryview (during operation)
            data_mv = get_mv(data)

            h1 = self.BLAKE2.new()
            h2 = self.BLAKE2.new()
            h1.update(data)
            h2.update(data_mv)
            if not data_mv.readonly:
                data_mv[:1] = b'\xFF'

            self.assertEqual(h1.digest(), h2.digest())


class Blake2bTest(Blake2Test):
    """Run the shared API tests against BLAKE2b."""
    #: Module
    BLAKE2 = BLAKE2b
    #: Max output size (in bits)
    max_bits = 512
    #: Max output size (in bytes)
    max_bytes = 64
    #: Bit size of the digests for which an ASN OID exists
    digest_bits_oid = (160, 256, 384, 512)
    # http://tools.ietf.org/html/draft-saarinen-blake2-02
    oid_variant = "1"


class Blake2sTest(Blake2Test):
    """Run the shared API tests against BLAKE2s."""
    #: Module
    BLAKE2 = BLAKE2s
    #: Max output size (in bits)
    max_bits = 256
    #: Max output size (in bytes)
    max_bytes = 32
    #: Bit size of the digests for which an ASN OID exists
    digest_bits_oid = (128, 160, 224, 256)
    # http://tools.ietf.org/html/draft-saarinen-blake2-02
    oid_variant = "2"


class Blake2OfficialTestVector(unittest.TestCase):
    """Keyed-hash test driven by ``<name>-test.txt`` vector files.

    Subclasses must define ``BLAKE2``, ``name`` and ``max_bytes``.
    """

    def _load_tests(self, test_vector_file):
        """Parse a vector file into ``(input_data, key, result)`` tuples.

        The file is a repeating sequence of ``in:``, ``key:`` and ``hash:``
        lines, each followed by a tab and a (possibly empty) hex string.
        Blank lines and lines starting with ``#`` are ignored.

        Raises ValueError if a line is not the field expected next.
        """
        expected = "in"
        test_vectors = []
        with open(test_vector_file, "rt") as test_vector_fd:
            # Count from 1 so that errors report the line as editors show it
            for line_number, line in enumerate(test_vector_fd, 1):

                if line.strip() == "" or line.startswith("#"):
                    continue

                res = re.match("%s:\t([0-9A-Fa-f]*)" % expected, line)
                if not res:
                    raise ValueError("Incorrect test vector format (line %d)"
                                     % line_number)

                if res.group(1):
                    bin_value = unhexlify(tobytes(res.group(1)))
                else:
                    bin_value = b""

                # Small state machine: in -> key -> hash -> in -> ...
                if expected == "in":
                    input_data = bin_value
                    expected = "key"
                elif expected == "key":
                    key = bin_value
                    expected = "hash"
                else:
                    result = bin_value
                    expected = "in"
                    test_vectors.append((input_data, key, result))
        return test_vectors

    def setUp(self):
        """Load the vectors; leave the list empty if they are not installed."""

        dir_comps = ("Hash", self.name)
        file_name = self.name.lower() + "-test.txt"
        self.description = "%s tests" % self.name

        try:
            import pycryptodome_test_vectors  # type: ignore
        except ImportError:
            warnings.warn("Warning: skipping extended tests for %s" % self.name,
                          UserWarning)
            self.test_vectors = []
            return

        init_dir = os.path.dirname(pycryptodome_test_vectors.__file__)
        full_file_name = os.path.join(os.path.join(init_dir, *dir_comps), file_name)
        self.test_vectors = self._load_tests(full_file_name)

    def runTest(self):
        """Check the keyed digest of every loaded vector."""
        for (input_data, key, result) in self.test_vectors:
            mac = self.BLAKE2.new(key=key, digest_bytes=self.max_bytes)
            mac.update(input_data)
            self.assertEqual(mac.digest(), result)


class Blake2bOfficialTestVector(Blake2OfficialTestVector):
    """Official test vectors for BLAKE2b."""
    #: Module
    BLAKE2 = BLAKE2b
    #: Hash name
    name = "BLAKE2b"
    #: Max digest size
    max_bytes = 64


class Blake2sOfficialTestVector(Blake2OfficialTestVector):
    """Official test vectors for BLAKE2s."""
    #: Module
    BLAKE2 = BLAKE2s
    #: Hash name
    name = "BLAKE2s"
    #: Max digest size
    max_bytes = 32


class Blake2TestVector1(unittest.TestCase):
    """Unkeyed, iterated-hash test driven by ``tv1.txt``.

    Subclasses must define ``BLAKE2`` and ``name``.
    """

    def _load_tests(self, test_vector_file):
        """Parse ``digest: <hex>`` lines into a list of byte strings.

        Blank lines and lines starting with ``#`` are ignored.

        Raises ValueError if any other line does not match the format.
        """
        test_vectors = []
        with open(test_vector_file, "rt") as test_vector_fd:
            # Count from 1 so that errors report the line as editors show it
            for line_number, line in enumerate(test_vector_fd, 1):
                if line.strip() == "" or line.startswith("#"):
                    continue
                res = re.match("digest: ([0-9A-Fa-f]*)", line)
                if not res:
                    raise ValueError("Incorrect test vector format (line %d)"
                                     % line_number)

                test_vectors.append(unhexlify(tobytes(res.group(1))))
        return test_vectors

    def setUp(self):
        """Load the vectors; leave the list empty if they are not installed."""
        dir_comps = ("Hash", self.name)
        file_name = "tv1.txt"
        self.description = "%s tests" % self.name

        try:
            import pycryptodome_test_vectors  # type: ignore
        except ImportError:
            warnings.warn("Warning: skipping extended tests for %s" % self.name,
                          UserWarning)
            self.test_vectors = []
            return

        init_dir = os.path.dirname(pycryptodome_test_vectors.__file__)
        full_file_name = os.path.join(os.path.join(init_dir, *dir_comps), file_name)
        self.test_vectors = self._load_tests(full_file_name)

    def runTest(self):
        """Iterate the hash 100 times and compare the last digest.

        The digest size of each run is the length of the expected value.
        At every round the new digest is prepended to the data that is
        hashed in the following round.
        """

        for tv in self.test_vectors:
            digest_bytes = len(tv)
            next_data = b""
            for _ in range(100):
                h = self.BLAKE2.new(digest_bytes=digest_bytes)
                h.update(next_data)
                next_data = h.digest() + next_data
            self.assertEqual(h.digest(), tv)


class Blake2bTestVector1(Blake2TestVector1):
    """Iterated-hash vectors (``tv1.txt``) for BLAKE2b."""
    #: Module
    BLAKE2 = BLAKE2b
    #: Hash name
    name = "BLAKE2b"


class Blake2sTestVector1(Blake2TestVector1):
    """Iterated-hash vectors (``tv1.txt``) for BLAKE2s."""
    #: Module
    BLAKE2 = BLAKE2s
    #: Hash name
    name = "BLAKE2s"


class Blake2TestVector2(unittest.TestCase):
    """Keyed, iterated-hash test driven by ``tv2.txt``.

    Subclasses must define ``BLAKE2``, ``name`` and ``max_bytes``.
    """

    def _load_tests(self, test_vector_file):
        """Parse ``digest(<key size>): <hex>`` lines.

        Returns a list of ``(key_size, result)`` tuples. Blank lines and
        lines starting with ``#`` are ignored.

        Raises ValueError if any other line does not match the format.
        """
        test_vectors = []
        with open(test_vector_file, "rt") as test_vector_fd:
            # Count from 1 so that errors report the line as editors show it
            for line_number, line in enumerate(test_vector_fd, 1):
                if line.strip() == "" or line.startswith("#"):
                    continue
                res = re.match(r"digest\(([0-9]+)\): ([0-9A-Fa-f]*)", line)
                if not res:
                    raise ValueError("Incorrect test vector format (line %d)"
                                     % line_number)
                key_size = int(res.group(1))
                result = unhexlify(tobytes(res.group(2)))
                test_vectors.append((key_size, result))
        return test_vectors

    def setUp(self):
        """Load the vectors; leave the list empty if they are not installed."""
        dir_comps = ("Hash", self.name)
        file_name = "tv2.txt"
        self.description = "%s tests" % self.name

        try:
            import pycryptodome_test_vectors  # type: ignore
        except ImportError:
            warnings.warn("Warning: skipping extended tests for %s" % self.name,
                          UserWarning)
            self.test_vectors = []
            return

        init_dir = os.path.dirname(pycryptodome_test_vectors.__file__)
        full_file_name = os.path.join(os.path.join(init_dir, *dir_comps), file_name)
        self.test_vectors = self._load_tests(full_file_name)

    def runTest(self):
        """Iterate the keyed hash 100 times and compare the last digest.

        The key is ``key_size`` repetitions of the byte ``A`` and the digest
        always has the maximum size. At every round the new digest is
        prepended to the data that is hashed in the following round.
        """

        for key_size, result in self.test_vectors:
            next_data = b""
            for _ in range(100):
                h = self.BLAKE2.new(digest_bytes=self.max_bytes,
                                    key=b"A" * key_size)
                h.update(next_data)
                next_data = h.digest() + next_data
            self.assertEqual(h.digest(), result)


# The two classes below must derive from Blake2TestVector2 (not from
# Blake2TestVector1): otherwise tv2.txt is never loaded and the keyed
# variant of the iterated test is silently replaced by a second tv1 run.
class Blake2bTestVector2(Blake2TestVector2):
    """Keyed iterated-hash vectors (``tv2.txt``) for BLAKE2b."""
    #: Module
    BLAKE2 = BLAKE2b
    #: Hash name
    name = "BLAKE2b"
    #: Max digest size in bytes
    max_bytes = 64


class Blake2sTestVector2(Blake2TestVector2):
    """Keyed iterated-hash vectors (``tv2.txt``) for BLAKE2s."""
    #: Module
    BLAKE2 = BLAKE2s
    #: Hash name
    name = "BLAKE2s"
    #: Max digest size in bytes
    max_bytes = 32


def get_tests(config={}):
    """Return the list of all BLAKE2b and BLAKE2s test cases.

    ``config`` is accepted but not used (and never modified) here.
    """
    tests = []

    tests += list_test_cases(Blake2bTest)
    tests.append(Blake2bOfficialTestVector())
    tests.append(Blake2bTestVector1())
    tests.append(Blake2bTestVector2())

    tests += list_test_cases(Blake2sTest)
    tests.append(Blake2sOfficialTestVector())
    tests.append(Blake2sTestVector1())
    tests.append(Blake2sTestVector2())

    return tests


if __name__ == '__main__':
    # unittest.main() resolves the name 'suite' in this module
    def suite():
        return unittest.TestSuite(get_tests())
    unittest.main(defaultTest='suite')