1from io import BytesIO
2
3from ..chunker import Chunker, buzhash, buzhash_update
4from ..constants import *  # NOQA
5from . import BaseTestCase
6
7# Note: these tests are part of the self test, do not use or import py.test functionality here.
8#       See borg.selftest for details. If you add/remove test methods, update SELFTEST_COUNT
9
10
11class ChunkerTestCase(BaseTestCase):
12
13    def test_chunkify(self):
14        data = b'0' * int(1.5 * (1 << CHUNK_MAX_EXP)) + b'Y'
15        parts = [bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(data))]
16        self.assert_equal(len(parts), 2)
17        self.assert_equal(b''.join(parts), data)
18        self.assert_equal([bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b''))], [])
19        self.assert_equal([bytes(c) for c in Chunker(0, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'fooba', b'rboobaz', b'fooba', b'rboobaz', b'fooba', b'rboobaz'])
20        self.assert_equal([bytes(c) for c in Chunker(1, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'fo', b'obarb', b'oob', b'azf', b'oobarb', b'oob', b'azf', b'oobarb', b'oobaz'])
21        self.assert_equal([bytes(c) for c in Chunker(2, 1, CHUNK_MAX_EXP, 2, 2).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foob', b'ar', b'boobazfoob', b'ar', b'boobazfoob', b'ar', b'boobaz'])
22        self.assert_equal([bytes(c) for c in Chunker(0, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz' * 3])
23        self.assert_equal([bytes(c) for c in Chunker(1, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobar', b'boobazfo', b'obar', b'boobazfo', b'obar', b'boobaz'])
24        self.assert_equal([bytes(c) for c in Chunker(2, 2, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foob', b'arboobaz', b'foob', b'arboobaz', b'foob', b'arboobaz'])
25        self.assert_equal([bytes(c) for c in Chunker(0, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz' * 3])
26        self.assert_equal([bytes(c) for c in Chunker(1, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarbo', b'obazfoobar', b'boobazfo', b'obarboobaz'])
27        self.assert_equal([bytes(c) for c in Chunker(2, 3, CHUNK_MAX_EXP, 2, 3).chunkify(BytesIO(b'foobarboobaz' * 3))], [b'foobarboobaz', b'foobarboobaz', b'foobarboobaz'])
28
29    def test_buzhash(self):
30        self.assert_equal(buzhash(b'abcdefghijklmnop', 0), 3795437769)
31        self.assert_equal(buzhash(b'abcdefghijklmnop', 1), 3795400502)
32        self.assert_equal(buzhash(b'abcdefghijklmnop', 1), buzhash_update(buzhash(b'Xabcdefghijklmno', 1), ord('X'), ord('p'), 16, 1))
33        # Test with more than 31 bytes to make sure our barrel_shift macro works correctly
34        self.assert_equal(buzhash(b'abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz', 0), 566521248)
35
36    def test_small_reads(self):
37        class SmallReadFile:
38            input = b'a' * (20 + 1)
39
40            def read(self, nbytes):
41                self.input = self.input[:-1]
42                return self.input[:1]
43
44        reconstructed = b''.join(Chunker(0, *CHUNKER_PARAMS).chunkify(SmallReadFile()))
45        assert reconstructed == b'a' * 20
46