1import unittest 2import random 3import subprocess 4import os 5from commontest import abs_test_dir 6from rdiff_backup import Globals, librsync, rpath 7 8 9def MakeRandomFile(path, length=None): 10 """Writes a random file of given length, or random len if unspecified""" 11 if not length: 12 length = random.randrange(5000, 100000) 13 fp = open(path, "wb") 14 fp_random = open('/dev/urandom', 'rb') 15 16 fp.write(fp_random.read(length)) 17 18 fp.close() 19 fp_random.close() 20 21 22class LibrsyncTest(unittest.TestCase): 23 """Test various librsync wrapper functions""" 24 basis = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"basis")) 25 new = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"new")) 26 new2 = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"new2")) 27 sig = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"signature")) 28 sig2 = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"signature2")) 29 delta = rpath.RPath(Globals.local_connection, os.path.join(abs_test_dir, b"delta")) 30 31 def sig_file_test_helper(self, blocksize, iterations, file_len=None): 32 """Compare SigFile output to rdiff output at given blocksize""" 33 for i in range(iterations): 34 MakeRandomFile(self.basis.path, file_len) 35 self._clean_file(self.sig) 36 rdiff_help_text = subprocess.check_output(["rdiff", "--help"]) 37 if b'-R' in rdiff_help_text: 38 assert not os.system( 39 b"rdiff -b %i -R rollsum -S 8 -H md4 signature %b %b" % 40 (blocksize, self.basis.path, self.sig.path)) 41 elif b'-H' in rdiff_help_text: 42 assert not os.system( 43 b"rdiff -b %i -H md4 signature %b %b" % 44 (blocksize, self.basis.path, self.sig.path)) 45 else: 46 assert not os.system( 47 b"rdiff -b %i signature %b %b" % 48 (blocksize, self.basis.path, self.sig.path)) 49 with self.sig.open("rb") as fp: 50 rdiff_sig = fp.read() 51 52 sf = librsync.SigFile(self.basis.open("rb"), blocksize) 53 librsync_sig = sf.read() 54 sf.close() 55 56 assert rdiff_sig == librsync_sig, \ 57 (len(rdiff_sig), len(librsync_sig)) 58 59 def _clean_file(self, rp): 60 """Make sure the given rpath is properly cleaned""" 61 rp.setdata() 62 if rp.lstat(): 63 rp.delete() 64 65 def testSigFile(self): 66 """Make sure SigFile generates same data as rdiff, blocksize 512""" 67 self.sig_file_test_helper(512, 5) 68 69 def testSigFile2(self): 70 """Test SigFile like above, but try various blocksize""" 71 self.sig_file_test_helper(2048, 1, 60000) 72 self.sig_file_test_helper(7168, 1, 6000) 73 self.sig_file_test_helper(204800, 1, 40 * 1024 * 1024) 74 75 def testSigGenerator(self): 76 """Test SigGenerator, make sure it's same as SigFile""" 77 for i in range(5): 78 MakeRandomFile(self.basis.path) 79 80 sf = librsync.SigFile(self.basis.open("rb")) 81 sigfile_string = sf.read() 82 sf.close() 83 84 sig_gen = librsync.SigGenerator() 85 with self.basis.open("rb") as infile: 86 while 1: 87 buf = infile.read(1000) 88 if not buf: 89 break 90 sig_gen.update(buf) 91 siggen_string = sig_gen.getsig() 92 93 assert sigfile_string == siggen_string, \ 94 (len(sigfile_string), len(siggen_string)) 95 96 def OldtestDelta(self): 97 """Test delta generation against Rdiff""" 98 MakeRandomFile(self.basis.path) 99 assert not os.system(b"rdiff signature %s %s" % 100 (self.basis.path, self.sig.path)) 101 for i in range(5): 102 MakeRandomFile(self.new.path) 103 assert not os.system( 104 b"rdiff delta %b %b %b" % 105 (self.sig.path, self.new.path, self.delta.path)) 106 fp = self.delta.open("rb") 107 rdiff_delta = fp.read() 108 fp.close() 109 110 df = librsync.DeltaFile(self.sig.open("rb"), self.new.open("rb")) 111 librsync_delta = df.read() 112 df.close() 113 114 print(len(rdiff_delta), len(librsync_delta)) 115 print(repr(rdiff_delta[:100])) 116 print(repr(librsync_delta[:100])) 117 assert rdiff_delta == librsync_delta 118 119 def testDelta(self): 120 """Test delta generation by making sure rdiff can process output 121 122 There appears to be some indeterminism so we can't just 123 byte-compare the deltas produced by rdiff and DeltaFile. 124 125 """ 126 MakeRandomFile(self.basis.path) 127 self._clean_file(self.sig) 128 assert not os.system(b"rdiff signature %s %s" % 129 (self.basis.path, self.sig.path)) 130 for i in range(5): 131 MakeRandomFile(self.new.path) 132 df = librsync.DeltaFile(self.sig.open("rb"), self.new.open("rb")) 133 librsync_delta = df.read() 134 df.close() 135 fp = self.delta.open("wb") 136 fp.write(librsync_delta) 137 fp.close() 138 139 self._clean_file(self.new2) 140 assert not os.system( 141 b"rdiff patch %s %s %s" % 142 (self.basis.path, self.delta.path, self.new2.path)) 143 new_fp = self.new.open("rb") 144 new = new_fp.read() 145 new_fp.close() 146 147 new2_fp = self.new2.open("rb") 148 new2 = new2_fp.read() 149 new2_fp.close() 150 151 assert new == new2, (len(new), len(new2)) 152 153 def testPatch(self): 154 """Test patching against Rdiff""" 155 MakeRandomFile(self.basis.path) 156 self._clean_file(self.sig) 157 assert not os.system(b"rdiff signature %s %s" % 158 (self.basis.path, self.sig.path)) 159 for i in range(5): 160 MakeRandomFile(self.new.path) 161 self._clean_file(self.delta) 162 assert not os.system( 163 b"rdiff delta %s %s %s" % 164 (self.sig.path, self.new.path, self.delta.path)) 165 fp = self.new.open("rb") 166 real_new = fp.read() 167 fp.close() 168 169 pf = librsync.PatchedFile(self.basis.open("rb"), 170 self.delta.open("rb")) 171 librsync_new = pf.read() 172 pf.close() 173 174 assert real_new == librsync_new, \ 175 (len(real_new), len(librsync_new)) 176 177 178if __name__ == "__main__": 179 unittest.main() 180