from io import BytesIO

from ..chunker import Chunker, buzhash, buzhash_update
from ..constants import *  # NOQA
from . import BaseTestCase

# Note: these tests are part of the self test, do not use or import py.test functionality here.
#       See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT


class ChunkerTestCase(BaseTestCase):
    """Tests for the Chunker and the buzhash / buzhash_update functions."""

    def test_chunkify(self):
        """Check chunk boundaries produced by Chunker.chunkify for known inputs.

        The five positional Chunker arguments are passed through verbatim;
        presumably they are (seed, min_exp, max_exp, mask_bits, window_size)
        -- confirm against the chunker module.
        """
        def chunks(data, *chunker_args):
            # Materialize every yielded chunk as bytes so that the results
            # can be compared against plain bytes literals.
            return [bytes(c) for c in Chunker(*chunker_args).chunkify(BytesIO(data))]

        # Input that is 1.5 times 2**CHUNK_MAX_EXP (plus one trailing byte)
        # must come back as exactly two chunks that re-join to the input.
        data = b'0' * int(1.5 * (1 << CHUNK_MAX_EXP)) + b'Y'
        parts = chunks(data, 0, 1, CHUNK_MAX_EXP, 2, 2)
        self.assert_equal(len(parts), 2)
        self.assert_equal(b''.join(parts), data)

        # Empty input yields no chunks at all.
        self.assert_equal(chunks(b'', 0, 1, CHUNK_MAX_EXP, 2, 2), [])

        # Same input, varying the first two Chunker arguments.
        data = b'foobarboobaz' * 3
        self.assert_equal(chunks(data, 0, 1, CHUNK_MAX_EXP, 2, 2),
                          [b'fooba', b'rboobaz', b'fooba', b'rboobaz', b'fooba', b'rboobaz'])
        self.assert_equal(chunks(data, 1, 1, CHUNK_MAX_EXP, 2, 2),
                          [b'fo', b'obarb', b'oob', b'azf', b'oobarb', b'oob', b'azf', b'oobarb', b'oobaz'])
        self.assert_equal(chunks(data, 2, 1, CHUNK_MAX_EXP, 2, 2),
                          [b'foob', b'ar', b'boobazfoob', b'ar', b'boobazfoob', b'ar', b'boobaz'])
        self.assert_equal(chunks(data, 0, 2, CHUNK_MAX_EXP, 2, 3),
                          [b'foobarboobaz' * 3])
        self.assert_equal(chunks(data, 1, 2, CHUNK_MAX_EXP, 2, 3),
                          [b'foobar', b'boobazfo', b'obar', b'boobazfo', b'obar', b'boobaz'])
        self.assert_equal(chunks(data, 2, 2, CHUNK_MAX_EXP, 2, 3),
                          [b'foob', b'arboobaz', b'foob', b'arboobaz', b'foob', b'arboobaz'])
        self.assert_equal(chunks(data, 0, 3, CHUNK_MAX_EXP, 2, 3),
                          [b'foobarboobaz' * 3])
        self.assert_equal(chunks(data, 1, 3, CHUNK_MAX_EXP, 2, 3),
                          [b'foobarbo', b'obazfoobar', b'boobazfo', b'obarboobaz'])
        self.assert_equal(chunks(data, 2, 3, CHUNK_MAX_EXP, 2, 3),
                          [b'foobarboobaz', b'foobarboobaz', b'foobarboobaz'])

    def test_buzhash(self):
        """Check buzhash against known values and against buzhash_update."""
        self.assert_equal(buzhash(b'abcdefghijklmnop', 0), 3795437769)
        self.assert_equal(buzhash(b'abcdefghijklmnop', 1), 3795400502)
        # Updating the hash of b'Xabcdefghijklmno' by removing 'X' and adding 'p'
        # must give the same value as hashing b'abcdefghijklmnop' directly.
        self.assert_equal(buzhash(b'abcdefghijklmnop', 1),
                          buzhash_update(buzhash(b'Xabcdefghijklmno', 1), ord('X'), ord('p'), 16, 1))
        # Test with more than 31 bytes to make sure our barrel_shift macro works correctly
        self.assert_equal(buzhash(b'abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz', 0), 566521248)

    def test_small_reads(self):
        """Check that chunkify reassembles data from a file that returns one byte per read."""
        class SmallReadFile:
            # 21 bytes: every read() first drops one byte, so exactly 20
            # single-byte reads succeed before read() starts returning b''.
            input = b'a' * (20 + 1)

            def read(self, nbytes):
                # nbytes is deliberately ignored: at most one byte is returned.
                self.input = self.input[:-1]
                return self.input[:1]

        reconstructed = b''.join(Chunker(0, *CHUNKER_PARAMS).chunkify(SmallReadFile()))
        # Use assert_equal (not a bare assert) so the check is consistent with
        # the other tests and is not stripped when running under "python -O".
        self.assert_equal(reconstructed, b'a' * 20)