1import unittest
2import random
3import os
4from commontest import abs_test_dir, old_test_dir, abs_output_dir
5from rdiff_backup import Globals, Rdiff, rpath
6
7
def MakeRandomFile(path, length=None):
    """Write a file of random bytes to path, replacing any existing file.

    Args:
        path: Destination file name (str or bytes).
        length: Exact number of bytes to write.  When None (the default) a
            random length with 5000 <= length < 30000 is drawn.

    The content comes from the ``random`` module, so seeding it makes the
    generated file reproducible.
    """
    if length is None:
        length = random.randrange(5000, 30000)
    payload = bytes(random.randrange(256) for _ in range(length))
    # Binary mode on purpose: in text mode the values 128..255 would be run
    # through the locale encoding (multi-byte under UTF-8, an error under
    # ASCII) and newlines could be translated, so the file would not hold
    # the bytes that were generated.
    with open(path, "wb") as fp:
        fp.write(payload)
15
16
class RdiffTest(unittest.TestCase):
    """Test rdiff: signatures, deltas, patching and copying.

    The tests share the class-level RPath fixtures below.  Each test creates
    the files it needs and deletes them again once its checks have passed.
    Checks use unittest assertion methods rather than bare ``assert`` so they
    still run under ``python -O`` and report a message on failure.
    """
    lc = Globals.local_connection

    # Scratch paths reused by every test.
    basis = rpath.RPath(lc, os.path.join(abs_test_dir, b"basis"))
    new = rpath.RPath(lc, os.path.join(abs_test_dir, b"new"))
    output = rpath.RPath(lc, abs_output_dir)
    delta = rpath.RPath(lc, os.path.join(abs_test_dir, b"delta"))
    signature = rpath.RPath(lc, os.path.join(abs_test_dir, b"signature"))

    def _remove_existing(self, rplist):
        """Delete those paths in rplist that lstat() reports as present."""
        for rp in rplist:
            if rp.lstat():
                rp.delete()

    def _delete_all(self, rplist):
        """Delete every path in rplist unconditionally (end-of-test cleanup)."""
        for rp in rplist:
            rp.delete()

    def _make_basis_and_new(self):
        """Fill basis and new with fresh random data and refresh their stat."""
        for rp in (self.basis, self.new):
            MakeRandomFile(rp.path)
            # The file was written behind the RPath's back; re-read its data.
            rp.setdata()
        self.assertTrue(self.basis.lstat(), "basis file was not created")
        self.assertTrue(self.new.lstat(), "new file was not created")

    def _write_signature_and_delta(self):
        """Write the signature of basis, then the delta from basis to new."""
        self.signature.write_from_fileobj(Rdiff.get_signature(self.basis))
        self.assertTrue(self.signature.lstat(), "signature was not written")
        self.delta.write_from_fileobj(
            Rdiff.get_delta_sigrp(self.signature, self.new))
        self.assertTrue(self.delta.lstat(), "delta was not written")

    def testRdiffSig(self):
        """Test making rdiff signatures"""
        file_types_dir = os.path.join(old_test_dir, b"various_file_types")
        sig = rpath.RPath(
            self.lc, os.path.join(file_types_dir, b"regular_file.sig"))
        regular_file = rpath.RPath(
            self.lc, os.path.join(file_types_dir, b"regular_file"))

        # try/finally so neither file object leaks when the check fails.
        sigfp = sig.open("rb")
        try:
            rfsig = Rdiff.get_signature(regular_file, 2048)
            try:
                self.assertTrue(
                    rpath.cmpfileobj(sigfp, rfsig),
                    "computed signature differs from the stored one")
            finally:
                rfsig.close()
        finally:
            sigfp.close()

    def testRdiffDeltaPatch(self):
        """Test making deltas and patching files"""
        rplist = [
            self.basis, self.new, self.delta, self.signature, self.output
        ]
        self._remove_existing(rplist)

        # Two rounds, each with freshly generated random files.
        for _ in range(2):
            self._make_basis_and_new()
            self._write_signature_and_delta()
            Rdiff.patch_local(self.basis, self.delta, self.output)
            self.assertTrue(rpath.cmp(self.new, self.output),
                            "patched output differs from new")
            self._delete_all(rplist)

    def testRdiffDeltaPatchGzip(self):
        """Same as above but try gzipping patches"""
        rplist = [
            self.basis, self.new, self.delta, self.signature, self.output
        ]
        self._remove_existing(rplist)

        self._make_basis_and_new()
        self._write_signature_and_delta()

        # Compress the delta with the external gzip tool, then move the
        # result back to the original delta path.
        self.assertEqual(os.system(b"gzip %s" % self.delta.path), 0,
                         "gzip of the delta failed")
        os.rename(self.delta.path + b".gz", self.delta.path)
        self.delta.setdata()

        Rdiff.patch_local(self.basis,
                          self.delta,
                          self.output,
                          delta_compressed=1)
        self.assertTrue(rpath.cmp(self.new, self.output),
                        "patched output differs from new")
        self._delete_all(rplist)

    def testWriteDelta(self):
        """Test write delta feature of rdiff"""
        self._remove_existing([self.delta])
        rplist = [self.basis, self.new, self.delta, self.output]
        self._make_basis_and_new()

        Rdiff.write_delta(self.basis, self.new, self.delta)
        self.assertTrue(self.delta.lstat(), "delta was not written")
        Rdiff.patch_local(self.basis, self.delta, self.output)
        self.assertTrue(rpath.cmp(self.new, self.output),
                        "patched output differs from new")
        self._delete_all(rplist)

    def testWriteDeltaGzip(self):
        """Same as above but delta is written gzipped"""
        rplist = [self.basis, self.new, self.delta, self.output]
        self._make_basis_and_new()
        delta_gz = rpath.RPath(self.delta.conn, self.delta.path + b".gz")
        # Also clear a stale uncompressed delta: gunzip will not silently
        # overwrite an existing target file.
        self._remove_existing([self.delta, delta_gz])

        Rdiff.write_delta(self.basis, self.new, delta_gz, 1)
        self.assertTrue(delta_gz.lstat(), "gzipped delta was not written")
        self.assertEqual(os.system(b"gunzip %s" % delta_gz.path), 0,
                         "gunzip of the delta failed")
        # gunzip changed the files on disk; refresh both RPaths.
        delta_gz.setdata()
        self.delta.setdata()
        Rdiff.patch_local(self.basis, self.delta, self.output)
        self.assertTrue(rpath.cmp(self.new, self.output),
                        "patched output differs from new")
        self._delete_all(rplist)

    def testRdiffRename(self):
        """Rdiff replacing original file with patch outfile"""
        rplist = [self.basis, self.new, self.delta, self.signature]
        self._remove_existing(rplist)

        self._make_basis_and_new()
        self._write_signature_and_delta()
        # No output path given: the result is then compared against basis.
        Rdiff.patch_local(self.basis, self.delta)
        self.assertTrue(rpath.cmp(self.basis, self.new),
                        "patched basis differs from new")
        self._delete_all(rplist)

    def testCopy(self):
        """Using rdiff to copy two files"""
        rplist = [self.basis, self.new]
        self._remove_existing(rplist)

        self._make_basis_and_new()
        Rdiff.copy_local(self.basis, self.new)
        self.assertTrue(rpath.cmp(self.basis, self.new),
                        "copy differs from its source")
        self._delete_all(rplist)
165
if __name__ == "__main__":
    # Run as a script: let unittest discover and run the tests in this module.
    unittest.main()
168