from __future__ import absolute_import
import io, math, os

from bup import _helpers, compat, helpers
from bup._helpers import cat_bytes
from bup.compat import buffer, py_maj
from bup.helpers import sc_page_size


_fmincore = getattr(helpers, 'fmincore', None)

BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
BLOB_READ_SIZE = 1024*1024
MAX_PER_TREE = 256
progress_callback = None
fanout = 16

GIT_MODE_FILE = 0o100644
GIT_MODE_TREE = 0o40000
GIT_MODE_SYMLINK = 0o120000

# The purpose of this type of buffer is to avoid copying on peek(), get(),
# and eat().  We do copy the buffer contents on put(), but that should
# be ok if we always only put() large amounts of data at a time.
class Buf:
    def __init__(self):
        self.data = b''
        self.start = 0

    def put(self, s):
        if not self.data:
            self.data = s
            self.start = 0
        elif s:
            remaining = len(self.data) - self.start
            self.data = cat_bytes(self.data, self.start, remaining,
                                  s, 0, len(s))
            self.start = 0

    def peek(self, count):
        if count <= 256:
            return self.data[self.start : self.start + count]
        return buffer(self.data, self.start, count)

    def eat(self, count):
        self.start += count

    def get(self, count):
        if count <= 256:
            v = self.data[self.start : self.start + count]
        else:
            v = buffer(self.data, self.start, count)
        self.start += count
        return v

    def used(self):
        return len(self.data) - self.start


def _fadvise_pages_done(fd, first_page, count):
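    """Hand the given page range of fd to _helpers.fadvise_done (as byte
    offsets) so the kernel may drop those pages from the cache; a no-op
    when count is zero."""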
    assert(first_page >= 0)
    assert(count >= 0)
    if count > 0:
        _helpers.fadvise_done(fd,
                              first_page * sc_page_size,
                              count * sc_page_size)


def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
    """Return (start_page, count) pairs in ascending start_page order for
    each contiguous region of nonresident pages indicated by the
    mincore() status_bytes.  Limit the number of pages in each region
    to max_region_len."""
    assert(max_region_len is None or max_region_len > 0)
    start = None
    for i, x in enumerate(status_bytes):
        in_core = x & incore_mask
        if start is None:
            if not in_core:
                start = i
        else:
            count = i - start
            if in_core:
                yield (start, count)
                start = None
            elif max_region_len and count >= max_region_len:
                yield (start, count)
                start = i
    if start is not None:
        yield (start, len(status_bytes) - start)


def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
    """Uncache the pages of fd indicated by first_region and
    remaining_regions that are before offset, where each region is a
    (start_page, count) pair.  The final region must have a start_page
    of None."""
    rstart, rlen = first_region
    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
        _fadvise_pages_done(fd, rstart, rlen)
        rstart, rlen = next(remaining_regions, (None, None))
    return (rstart, rlen)


def readfile_iter(files, progress=None):
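    """Yield the contents of each file in files as a sequence of blocks of
    up to BLOB_READ_SIZE bytes, calling progress(filenum, size-of-previous
    read) before each read.  When the fmincore helper is available and the
    file has a usable fileno, tell the kernel we're done with the pages our
    reads pulled into the cache (pages that were already resident are left
    alone)."""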
    for filenum,f in enumerate(files):
        ofs = 0
        b = b''
        fd = rpr = rstart = rlen = None
        if _fmincore and hasattr(f, 'fileno'):
            try:
                fd = f.fileno()
            except io.UnsupportedOperation:
                pass
            if fd:
                mcore = _fmincore(fd)
                if mcore:
                    max_chunk = max(1, (8 * 1024 * 1024) // sc_page_size)
                    rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
                                                    max_chunk)
                    rstart, rlen = next(rpr, (None, None))
        while 1:
            if progress:
                progress(filenum, len(b))
            b = f.read(BLOB_READ_SIZE)
            ofs += len(b)
            if rpr:
                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
            if not b:
                break
            yield b
        if rpr:
            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)


def _splitbuf(buf, basebits, fanbits):
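    """Yield (bytes, level) pairs for each split point the bupsplit rolling
    checksum finds in buf, consuming the yielded data from buf.  Blobs are
    capped at BLOB_MAX bytes; level is (bits - basebits) // fanbits, where
    bits is reported by _helpers.splitbuf, and controls how many tree
    levels close at this boundary."""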
    while 1:
        b = buf.peek(buf.used())
        (ofs, bits) = _helpers.splitbuf(b)
        if ofs:
            if ofs > BLOB_MAX:
                ofs = BLOB_MAX
                level = 0
            else:
                level = (bits-basebits)//fanbits  # integer division
            buf.eat(ofs)
            yield buffer(b, 0, ofs), level
        else:
            break
    while buf.used() >= BLOB_MAX:
        # limit max blob size
        yield buf.get(BLOB_MAX), 0


def _hashsplit_iter(files, progress):
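    """Yield (blob, level) pairs for the concatenated content of files,
    splitting as if they were a single stream.  Any trailing data that
    doesn't end on a split boundary is yielded as a final level-0 blob."""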
    assert(BLOB_READ_SIZE > BLOB_MAX)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    buf = Buf()
    for inblock in readfile_iter(files, progress):
        buf.put(inblock)
        for buf_and_level in _splitbuf(buf, basebits, fanbits):
            yield buf_and_level
    if buf.used():
        yield buf.get(buf.used()), 0


def _hashsplit_iter_keep_boundaries(files, progress):
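    """Like _hashsplit_iter, but restart the splitter for each file so that
    blobs never span file boundaries.  The progress callback still receives
    the file's index within the original files sequence."""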
    for real_filenum,f in enumerate(files):
        if progress:
            def prog(filenum, nbytes):
                # the inner _hashsplit_iter doesn't know the real file count,
                # so we'll replace it here.
                return progress(real_filenum, nbytes)
        else:
            prog = None
        for buf_and_level in _hashsplit_iter([f], progress=prog):
            yield buf_and_level


def hashsplit_iter(files, keep_boundaries, progress):
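    """Yield (blob, level) pairs for files, split at content-defined
    boundaries.  If keep_boundaries is true, each file is split
    independently so blobs never span files."""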
    if keep_boundaries:
        return _hashsplit_iter_keep_boundaries(files, progress)
    else:
        return _hashsplit_iter(files, progress)


total_split = 0
def split_to_blobs(makeblob, files, keep_boundaries, progress):
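    """Hashsplit files, store each chunk via makeblob(blob), and yield
    (sha, size, level) for every chunk written.  Updates the module-level
    total_split counter and calls progress_callback(size) when that
    callback is set."""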
    global total_split
    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
        sha = makeblob(blob)
        total_split += len(blob)
        if progress_callback:
            progress_callback(len(blob))
        yield (sha, len(blob), level)


def _make_shalist(l):
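    """Turn a list of (mode, sha, size) entries into a git shalist of
    (mode, name, sha) tuples, where each name is the zero-padded hex byte
    offset of that entry, and return (shalist, total_size)."""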
    ofs = 0
    l = list(l)
    total = sum(size for mode,sha,size in l)
    vlen = len(b'%x' % total)
    shalist = []
    for (mode, sha, size) in l:
        shalist.append((mode, b'%0*x' % (vlen,ofs), sha))
        ofs += size
    assert(ofs == total)
    return (shalist, total)


def _squish(maketree, stacks, n):
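    """Collapse stacks[0..n] (and any stack that has reached MAX_PER_TREE
    entries) into trees, pushing each new tree onto the next stack up.
    stacks[i] holds the pending entries for tree level i; a single leftover
    entry is passed up unchanged rather than wrapped in a one-entry tree."""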
    i = 0
    while i < n or len(stacks[i]) >= MAX_PER_TREE:
        while len(stacks) <= i+1:
            stacks.append([])
        if len(stacks[i]) == 1:
            stacks[i+1] += stacks[i]
        elif stacks[i]:
            (shalist, size) = _make_shalist(stacks[i])
            tree = maketree(shalist)
            stacks[i+1].append((GIT_MODE_TREE, tree, size))
        stacks[i] = []
        i += 1


def split_to_shalist(makeblob, maketree, files,
                     keep_boundaries, progress=None):
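    """Hashsplit files into blobs via makeblob, group the chunks into a
    fanout-controlled tree of subtrees via maketree, and return the shalist
    for the top-level tree."""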
    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
    assert(fanout != 0)
    if not fanout:
        shal = []
        for (sha,size,level) in sl:
            shal.append((GIT_MODE_FILE, sha, size))
        return _make_shalist(shal)[0]
    else:
        stacks = [[]]
        for (sha,size,level) in sl:
            stacks[0].append((GIT_MODE_FILE, sha, size))
            _squish(maketree, stacks, level)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        _squish(maketree, stacks, len(stacks)-1)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        return _make_shalist(stacks[-1])[0]


def split_to_blob_or_tree(makeblob, maketree, files,
                          keep_boundaries, progress=None):
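    """Split files and return (mode, sha): the single blob itself when
    everything fits in one chunk, an empty blob when there is no data at
    all, or a tree covering the chunks otherwise."""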
    shalist = list(split_to_shalist(makeblob, maketree,
                                    files, keep_boundaries, progress))
    if len(shalist) == 1:
        return (shalist[0][0], shalist[0][2])
    elif len(shalist) == 0:
        return (GIT_MODE_FILE, makeblob(b''))
    else:
        return (GIT_MODE_TREE, maketree(shalist))


def open_noatime(name):
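    """Open name for reading without updating its atime (where the platform
    allows it) and return a buffered binary file object, closing the raw
    descriptor if the fdopen() wrapping fails."""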
    fd = _helpers.open_noatime(name)
    try:
        return os.fdopen(fd, 'rb', 1024*1024)
    except:
        try:
            os.close(fd)
        except:
            pass
        raise