1import unittest
2import random
3import subprocess
4import os
5from commontest import abs_test_dir
6from rdiff_backup import Globals, librsync, rpath
7
8
9def MakeRandomFile(path, length=None):
10    """Writes a random file of given length, or random len if unspecified"""
11    if not length:
12        length = random.randrange(5000, 100000)
13    fp = open(path, "wb")
14    fp_random = open('/dev/urandom', 'rb')
15
16    fp.write(fp_random.read(length))
17
18    fp.close()
19    fp_random.close()
20
21
22class LibrsyncTest(unittest.TestCase):
23    """Test various librsync wrapper functions"""
24    basis = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"basis"))
25    new = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"new"))
26    new2 = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"new2"))
27    sig = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"signature"))
28    sig2 = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"signature2"))
29    delta = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"delta"))
30
31    def sig_file_test_helper(self, blocksize, iterations, file_len=None):
32        """Compare SigFile output to rdiff output at given blocksize"""
33        for i in range(iterations):
34            MakeRandomFile(self.basis.path, file_len)
35            self._clean_file(self.sig)
36            rdiff_help_text = subprocess.check_output(["rdiff", "--help"])
37            if b'-R' in rdiff_help_text:
38                assert not os.system(
39                    b"rdiff -b %i -R rollsum -S 8 -H md4 signature %b %b" %
40                    (blocksize, self.basis.path, self.sig.path))
41            elif b'-H' in rdiff_help_text:
42                assert not os.system(
43                    b"rdiff -b %i -H md4 signature %b %b" %
44                    (blocksize, self.basis.path, self.sig.path))
45            else:
46                assert not os.system(
47                    b"rdiff -b %i signature %b %b" %
48                    (blocksize, self.basis.path, self.sig.path))
49            with self.sig.open("rb") as fp:
50                rdiff_sig = fp.read()
51
52            sf = librsync.SigFile(self.basis.open("rb"), blocksize)
53            librsync_sig = sf.read()
54            sf.close()
55
56            assert rdiff_sig == librsync_sig, \
57                (len(rdiff_sig), len(librsync_sig))
58
59    def _clean_file(self, rp):
60        """Make sure the given rpath is properly cleaned"""
61        rp.setdata()
62        if rp.lstat():
63            rp.delete()
64
65    def testSigFile(self):
66        """Make sure SigFile generates same data as rdiff, blocksize 512"""
67        self.sig_file_test_helper(512, 5)
68
69    def testSigFile2(self):
70        """Test SigFile like above, but try various blocksize"""
71        self.sig_file_test_helper(2048, 1, 60000)
72        self.sig_file_test_helper(7168, 1, 6000)
73        self.sig_file_test_helper(204800, 1, 40 * 1024 * 1024)
74
75    def testSigGenerator(self):
76        """Test SigGenerator, make sure it's same as SigFile"""
77        for i in range(5):
78            MakeRandomFile(self.basis.path)
79
80            sf = librsync.SigFile(self.basis.open("rb"))
81            sigfile_string = sf.read()
82            sf.close()
83
84            sig_gen = librsync.SigGenerator()
85            with self.basis.open("rb") as infile:
86                while 1:
87                    buf = infile.read(1000)
88                    if not buf:
89                        break
90                    sig_gen.update(buf)
91                siggen_string = sig_gen.getsig()
92
93            assert sigfile_string == siggen_string, \
94                (len(sigfile_string), len(siggen_string))
95
96    def OldtestDelta(self):
97        """Test delta generation against Rdiff"""
98        MakeRandomFile(self.basis.path)
99        assert not os.system(b"rdiff signature %s %s" %
100                             (self.basis.path, self.sig.path))
101        for i in range(5):
102            MakeRandomFile(self.new.path)
103            assert not os.system(
104                b"rdiff delta %b %b %b" %
105                (self.sig.path, self.new.path, self.delta.path))
106            fp = self.delta.open("rb")
107            rdiff_delta = fp.read()
108            fp.close()
109
110            df = librsync.DeltaFile(self.sig.open("rb"), self.new.open("rb"))
111            librsync_delta = df.read()
112            df.close()
113
114            print(len(rdiff_delta), len(librsync_delta))
115            print(repr(rdiff_delta[:100]))
116            print(repr(librsync_delta[:100]))
117            assert rdiff_delta == librsync_delta
118
119    def testDelta(self):
120        """Test delta generation by making sure rdiff can process output
121
122        There appears to be some indeterminism so we can't just
123        byte-compare the deltas produced by rdiff and DeltaFile.
124
125        """
126        MakeRandomFile(self.basis.path)
127        self._clean_file(self.sig)
128        assert not os.system(b"rdiff signature %s %s" %
129                             (self.basis.path, self.sig.path))
130        for i in range(5):
131            MakeRandomFile(self.new.path)
132            df = librsync.DeltaFile(self.sig.open("rb"), self.new.open("rb"))
133            librsync_delta = df.read()
134            df.close()
135            fp = self.delta.open("wb")
136            fp.write(librsync_delta)
137            fp.close()
138
139            self._clean_file(self.new2)
140            assert not os.system(
141                b"rdiff patch %s %s %s" %
142                (self.basis.path, self.delta.path, self.new2.path))
143            new_fp = self.new.open("rb")
144            new = new_fp.read()
145            new_fp.close()
146
147            new2_fp = self.new2.open("rb")
148            new2 = new2_fp.read()
149            new2_fp.close()
150
151            assert new == new2, (len(new), len(new2))
152
153    def testPatch(self):
154        """Test patching against Rdiff"""
155        MakeRandomFile(self.basis.path)
156        self._clean_file(self.sig)
157        assert not os.system(b"rdiff signature %s %s" %
158                             (self.basis.path, self.sig.path))
159        for i in range(5):
160            MakeRandomFile(self.new.path)
161            self._clean_file(self.delta)
162            assert not os.system(
163                b"rdiff delta %s %s %s" %
164                (self.sig.path, self.new.path, self.delta.path))
165            fp = self.new.open("rb")
166            real_new = fp.read()
167            fp.close()
168
169            pf = librsync.PatchedFile(self.basis.open("rb"),
170                                      self.delta.open("rb"))
171            librsync_new = pf.read()
172            pf.close()
173
174            assert real_new == librsync_new, \
175                (len(real_new), len(librsync_new))
176
177
178if __name__ == "__main__":
179    unittest.main()
180