1
2from __future__ import print_function
3
4from __future__ import absolute_import
5from binascii import hexlify, unhexlify
6import errno, os, re, struct, sys, time, zlib
7import socket
8
9from bup import git, ssh, vfs
10from bup.compat import environ, range, reraise
11from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
12                         debug2, linereader, lines_until_sentinel,
13                         mkdirp, progress, qprogress, DemuxConn, atoi)
14from bup.io import path_msg
15from bup.vint import read_bvec, read_vuint, write_bvec
16
17
18bwlimit = None
19
20
21class ClientError(Exception):
22    pass
23
24
25def _raw_write_bwlimit(f, buf, bwcount, bwtime):
26    if not bwlimit:
27        f.write(buf)
28        return (len(buf), time.time())
29    else:
30        # We want to write in reasonably large blocks, but not so large that
31        # they're likely to overflow a router's queue.  So our bwlimit timing
32        # has to be pretty granular.  Also, if it takes too long from one
33        # transmit to the next, we can't just make up for lost time to bring
34        # the average back up to bwlimit - that will risk overflowing the
35        # outbound queue, which defeats the purpose.  So if we fall behind
36        # by more than one block delay, we shouldn't ever try to catch up.
37        for i in range(0,len(buf),4096):
38            now = time.time()
39            next = max(now, bwtime + 1.0*bwcount/bwlimit)
40            time.sleep(next-now)
41            sub = buf[i:i+4096]
42            f.write(sub)
43            bwcount = len(sub)  # might be less than 4096
44            bwtime = next
45        return (bwcount, bwtime)
46
47
_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
    """Split a remote repository spec into (protocol, host, port, path).

    Accepts proto://[host[:port]][/path] URLs for the ssh, bup, and file
    protocols, plus the legacy rsync-style forms "host:path" (ssh) and
    a bare path (file).  Raises ClientError for an unknown protocol.
    """
    m = _url_rx.match(remote)
    if not m:
        # Not a URL: fall back to the rsync-style [host:]path notation.
        pieces = remote.split(b':', 1)
        if len(pieces) == 1 or pieces[0] in (b'', b'-'):
            return b'file', None, None, pieces[-1]
        return b'ssh', pieces[0], None, pieces[1]
    proto = m.group(1)
    if proto not in (b'ssh', b'bup', b'file'):
        raise ClientError('unexpected protocol: %s' % proto.decode('ascii'))
    # Groups: 1=protocol, 3=host, 4=port, 5=path (2 is the bracket marker).
    return m.group(1, 3, 4, 5)
68
69
class Client:
    """Client connection to a bup repository server.

    Speaks the bup server wire protocol over an ssh tunnel, a bup://
    TCP socket, or (reverse mode) file descriptors inherited from the
    server, and keeps a local cache of the server's pack index files
    under index-cache/ in the local repository.
    """

    def __init__(self, remote, create=False):
        """Connect to the repository named by remote (see parse_remote).

        If create is true, ask the server to initialize the repository
        directory rather than just selecting it.  Raises ClientError on
        connection or handshake failure.
        """
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None',
        # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
        # but crashes instead when doing b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
        if is_reverse:
            # Reverse mode: the server spawned us, with fds 3/4 as the pipe.
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                # Default bup server port is 1982 when none is given.
                self.sock.connect((self.host, atoi(self.port) or 1982))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            # Bug fix: the replacement must be bytes (b' '), not str --
            # re.sub raises TypeError when mixing str with a bytes pattern.
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __del__(self):
        try:
            self.close()
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass  # server side already went away; nothing left to do
            else:
                raise

    def close(self):
        """Shut down the connection and release every associated resource.

        Raises ClientError if the server tunnel exited with a nonzero
        status.
        """
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            # Bug fix: wait() was called twice in a row; one call is
            # sufficient (the second only returned the cached status).
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        """Verify the server is alive and the last command succeeded.

        Returns whatever conn.check_ok() returns; raises ClientError if
        the server process died or the connection check fails.
        """
        if self.p:
            rv = self.p.poll()
            if rv != None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))

    def check_busy(self):
        """Raise ClientError if a streaming command is already in progress."""
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        """Raise ClientError unless a streaming command is in progress."""
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        # Mark the connection idle again.
        self._busy = None

    def _get_available_commands(self):
        """Ask the server for its command list via 'help'; return a frozenset."""
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if not line == b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':  # blank line terminates the command list
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        """Raise ClientError unless the server offers command name (bytes)."""
        if name not in self._available_commands:
            # Bug fix: name is bytes, so it must be decoded (not encoded)
            # for display; bytes has no .encode method.
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
        """Synchronize the local index cache with the server's pack indexes.

        Removes cached .idx files the server no longer has, fetches the
        ones the server asks us to load, and regenerates midx files.
        """
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
        """Download the pack index file called name into the local cache.

        Raises ClientError if a file by that name is already cached.
        """
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        # Server sends a 4-byte big-endian length followed by the file body.
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        # Object-existence checks go against the locally cached indexes.
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
        """Fetch any pack indexes the server suggests we should have.

        May be called while receive-objects-v2 is active; in that case
        the transfer is suspended and resumed around the sync.  Returns
        the last index name received (or None).
        """
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            # Resume the interrupted streaming command.
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        """Return a PackWriter_Remote that streams objects to the server."""
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
        """Return the binary oid the server has for refname, or None."""
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return unhexlify(r)
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        """Ask the server to move refname from oldval to newval (binary oids).

        oldval may be falsy for a ref that doesn't exist yet.
        """
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

    def join(self, id):
        """Generate the joined content of the object named by id.

        Yields the raw data chunks streamed back by the server; raises
        KeyError if the server reports the object missing.
        """
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break  # zero-length chunk terminates the stream
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

    def cat_batch(self, refs):
        """Yield (oidx, type, size, chunkyreader) for each ref, in order.

        Missing refs yield (None, None, None, None).  Each reader must
        be fully consumed before advancing to the next ref.
        """
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')  # blank line terminates the request list
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            # The caller must have drained cr; anything left is an error.
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        """Yield (name, binary oid) for each matching ref on the server."""
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')  # blank line terminates the pattern list
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        """
        self._require_command(b'rev-list')
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        if not format:
            # Plain mode: each line is a bare 40-char commit oidx.
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            # Formatted mode: 'commit OIDX' header, then parse() consumes
            # the formatted body directly from the connection.
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def resolve(self, path, parent=None, want_meta=True, follow=False):
        """Resolve path via the server's vfs; see vfs.resolve for semantics.

        Raises the vfs.IOError returned by the server on failure.
        """
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        # Flags are a bitmask: 1=want_meta, 2=follow, 4=have parent.
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result
480
481
class PackWriter_Remote(git.PackWriter):
    """PackWriter that streams pack data to a bup server connection.

    Instead of writing a local packfile, each object is framed and sent
    over the connection supplied by Client.new_packwriter(), and the
    server's suggested indexes are synced back through suggest_packs.
    """

    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        """Wrap conn for remote pack writing.

        suggest_packs, onopen, onclose, and ensure_busy are callbacks
        provided by Client to manage its busy state and index cache;
        the remaining parameters are passed through to git.PackWriter.
        """
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn  # the server connection doubles as the "pack file"
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False  # True while a receive-objects-v2 is active
        self._bwcount = 0       # bandwidth-limit state (see _raw_write_bwlimit)
        self._bwtime = time.time()

    def _open(self):
        # Start the server-side receive (via Client's onopen callback),
        # exactly once per pack.
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        """Finish the current pack and sync suggested indexes.

        Returns the last index name received from the server, or None
        if no pack was open.
        """
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            # A zero-length record terminates the object stream.
            self.file.write(b'\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received

    def close(self):
        """End the pack and detach from the connection; return the last idx."""
        id = self._end()
        self.file = None
        return id

    def abort(self):
        """Aborting is unsupported for remote packs; always raises ClientError."""
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        """Frame and send one object; return (sha, crc).

        Wire format per object: a 4-byte big-endian length covering
        sha+crc+data, then the 20-byte sha, a 4-byte crc32, and the
        compressed data itself.
        """
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        # Mask to keep crc32 unsigned across Python versions.
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            # Throttled write; carries bandwidth-limit state across calls.
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        # The server may interject index suggestions mid-stream; sync
        # them and refresh the object cache before continuing.
        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc
552