from __future__ import absolute_import
import io, math, os

from bup import _helpers, compat, helpers
from bup._helpers import cat_bytes
from bup.compat import buffer, py_maj
from bup.helpers import sc_page_size


_fmincore = getattr(helpers, 'fmincore', None)

BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
BLOB_READ_SIZE = 1024*1024
MAX_PER_TREE = 256
progress_callback = None
fanout = 16

GIT_MODE_FILE = 0o100644
GIT_MODE_TREE = 0o40000
GIT_MODE_SYMLINK = 0o120000

# The purpose of this type of buffer is to avoid copying on peek(), get(),
# and eat().  We do copy the buffer contents on put(), but that should
# be ok if we always only put() large amounts of data at a time.
class Buf:
    def __init__(self):
        self.data = b''
        self.start = 0

    def put(self, s):
        if not self.data:
            self.data = s
            self.start = 0
        elif s:
            remaining = len(self.data) - self.start
            self.data = cat_bytes(self.data, self.start, remaining,
                                  s, 0, len(s))
            self.start = 0

    def peek(self, count):
        if count <= 256:
            return self.data[self.start : self.start + count]
        return buffer(self.data, self.start, count)

    def eat(self, count):
        self.start += count

    def get(self, count):
        if count <= 256:
            v = self.data[self.start : self.start + count]
        else:
            v = buffer(self.data, self.start, count)
        self.start += count
        return v

    def used(self):
        return len(self.data) - self.start


def _fadvise_pages_done(fd, first_page, count):
    assert(first_page >= 0)
    assert(count >= 0)
    if count > 0:
        _helpers.fadvise_done(fd,
                              first_page * sc_page_size,
                              count * sc_page_size)


def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
    """Return (start_page, count) pairs in ascending start_page order for
    each contiguous region of nonresident pages indicated by the
    mincore() status_bytes.  Limit the number of pages in each region
    to max_region_len."""
    assert(max_region_len is None or max_region_len > 0)
    start = None
    for i, x in enumerate(status_bytes):
        in_core = x & incore_mask
        if start is None:
            if not in_core:
                start = i
        else:
            count = i - start
            if in_core:
                yield (start, count)
                start = None
            elif max_region_len and count >= max_region_len:
                yield (start, count)
                start = i
    if start is not None:
        yield (start, len(status_bytes) - start)


def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
    """Uncache the pages of fd indicated by first_region and
    remaining_regions that are before offset, where each region is a
    (start_page, count) pair.  The final region must have a start_page
    of None."""
    rstart, rlen = first_region
    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
        _fadvise_pages_done(fd, rstart, rlen)
        rstart, rlen = next(remaining_regions, (None, None))
    return (rstart, rlen)
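

# Illustration only (not part of the original module): a minimal sketch of
# how _nonresident_page_regions coalesces runs of nonresident pages.  It
# assumes Python 3, where iterating bytes yields ints, and uses 0x01 as a
# stand-in for helpers.MINCORE_INCORE; the status bytes are hypothetical
# mincore() output.
def _demo_nonresident_page_regions():
    status = b'\x01\x00\x00\x01\x00'   # pages 1-2 and 4 are not resident
    assert list(_nonresident_page_regions(status, 0x01)) == [(1, 2), (4, 1)]
    # With max_region_len=1, the two-page run is emitted one page at a time.
    assert (list(_nonresident_page_regions(status, 0x01, max_region_len=1))
            == [(1, 1), (2, 1), (4, 1)])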


def readfile_iter(files, progress=None):
    for filenum,f in enumerate(files):
        ofs = 0
        b = b''
        fd = rpr = rstart = rlen = None
        if _fmincore and hasattr(f, 'fileno'):
            try:
                fd = f.fileno()
            except io.UnsupportedOperation:
                pass
            if fd is not None:
                mcore = _fmincore(fd)
                if mcore:
                    # evict at most 8MB worth of pages per fadvise call
                    max_chunk = max(1, (8 * 1024 * 1024) // sc_page_size)
                    rpr = _nonresident_page_regions(mcore,
                                                    helpers.MINCORE_INCORE,
                                                    max_chunk)
                    rstart, rlen = next(rpr, (None, None))
        while 1:
            if progress:
                progress(filenum, len(b))
            b = f.read(BLOB_READ_SIZE)
            ofs += len(b)
            if rpr:
                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
            if not b:
                break
            yield b
        if rpr:
            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)


def _splitbuf(buf, basebits, fanbits):
    while 1:
        b = buf.peek(buf.used())
        (ofs, bits) = _helpers.splitbuf(b)
        if ofs:
            if ofs > BLOB_MAX:
                ofs = BLOB_MAX
                level = 0
            else:
                level = (bits-basebits)//fanbits  # integer division
            buf.eat(ofs)
            yield buffer(b, 0, ofs), level
        else:
            break
    while buf.used() >= BLOB_MAX:
        # limit max blob size
        yield buf.get(BLOB_MAX), 0


def _hashsplit_iter(files, progress):
    assert(BLOB_READ_SIZE > BLOB_MAX)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    buf = Buf()
    for inblock in readfile_iter(files, progress):
        buf.put(inblock)
        for buf_and_level in _splitbuf(buf, basebits, fanbits):
            yield buf_and_level
    if buf.used():
        yield buf.get(buf.used()), 0


def _hashsplit_iter_keep_boundaries(files, progress):
    for real_filenum,f in enumerate(files):
        if progress:
            def prog(filenum, nbytes):
                # the inner _hashsplit_iter doesn't know the real file count,
                # so we'll replace it here.
                return progress(real_filenum, nbytes)
        else:
            prog = None
        for buf_and_level in _hashsplit_iter([f], progress=prog):
            yield buf_and_level
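

# Illustration only (not part of the original module): a minimal sketch of
# driving the split pipeline over an in-memory stream.  io.BytesIO raises
# io.UnsupportedOperation from fileno(), so readfile_iter skips the
# fmincore/fadvise page-eviction path entirely.
def _demo_hashsplit_bytes(data):
    total = 0
    for blob, level in _hashsplit_iter([io.BytesIO(data)], progress=None):
        total += len(blob)  # blob may be bytes or a buffer/memoryview slice
    assert total == len(data)
    return total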


def hashsplit_iter(files, keep_boundaries, progress):
    if keep_boundaries:
        return _hashsplit_iter_keep_boundaries(files, progress)
    else:
        return _hashsplit_iter(files, progress)


total_split = 0
def split_to_blobs(makeblob, files, keep_boundaries, progress):
    global total_split
    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
        sha = makeblob(blob)
        total_split += len(blob)
        if progress_callback:
            progress_callback(len(blob))
        yield (sha, len(blob), level)


def _make_shalist(l):
    ofs = 0
    l = list(l)
    total = sum(size for mode,sha,size in l)
    # Use fixed-width hex offsets as the tree entry names so that git's
    # name-ordered entries stay in offset order.
    vlen = len(b'%x' % total)
    shalist = []
    for (mode, sha, size) in l:
        shalist.append((mode, b'%0*x' % (vlen,ofs), sha))
        ofs += size
    assert(ofs == total)
    return (shalist, total)


def _squish(maketree, stacks, n):
    i = 0
    while i < n or len(stacks[i]) >= MAX_PER_TREE:
        while len(stacks) <= i+1:
            stacks.append([])
        if len(stacks[i]) == 1:
            stacks[i+1] += stacks[i]
        elif stacks[i]:
            (shalist, size) = _make_shalist(stacks[i])
            tree = maketree(shalist)
            stacks[i+1].append((GIT_MODE_TREE, tree, size))
        stacks[i] = []
        i += 1


def split_to_shalist(makeblob, maketree, files,
                     keep_boundaries, progress=None):
    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
    assert(fanout != 0)
    if not fanout:
        shal = []
        for (sha,size,level) in sl:
            shal.append((GIT_MODE_FILE, sha, size))
        return _make_shalist(shal)[0]
    else:
        stacks = [[]]
        for (sha,size,level) in sl:
            stacks[0].append((GIT_MODE_FILE, sha, size))
            _squish(maketree, stacks, level)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        _squish(maketree, stacks, len(stacks)-1)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        return _make_shalist(stacks[-1])[0]


def split_to_blob_or_tree(makeblob, maketree, files,
                          keep_boundaries, progress=None):
    shalist = list(split_to_shalist(makeblob, maketree,
                                    files, keep_boundaries, progress))
    if len(shalist) == 1:
        return (shalist[0][0], shalist[0][2])
    elif len(shalist) == 0:
        return (GIT_MODE_FILE, makeblob(b''))
    else:
        return (GIT_MODE_TREE, maketree(shalist))


def open_noatime(name):
    fd = _helpers.open_noatime(name)
    try:
        return os.fdopen(fd, 'rb', 1024*1024)
    except:
        try:
            os.close(fd)
        except:
            pass
        raise
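

# Illustration only (not part of the original module): exercising
# split_to_blob_or_tree with stand-in makeblob/maketree callbacks.  In bup
# proper these are supplied by the pack writer; the hashlib stand-ins below
# are hypothetical and just produce stable 20-byte ids.
if __name__ == '__main__':
    import hashlib
    from binascii import hexlify

    def _fake_makeblob(blob):
        return hashlib.sha1(bytes(blob)).digest()

    def _fake_maketree(shalist):
        t = hashlib.sha1()
        for mode, name, sha in shalist:
            t.update(b'%o %s\0' % (mode, name) + sha)
        return t.digest()

    mode, oid = split_to_blob_or_tree(
        _fake_makeblob, _fake_maketree,
        [io.BytesIO(os.urandom(4 * BLOB_MAX))],
        keep_boundaries=False)
    print('mode=%o id=%s' % (mode, hexlify(oid).decode('ascii')))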