from __future__ import print_function
from __future__ import absolute_import
from binascii import hexlify, unhexlify
import errno, os, re, struct, sys, time, zlib
import socket

from bup import git, ssh, vfs
from bup.compat import environ, range, reraise
from bup.helpers import (Conn, atomically_replaced_file, chunkyreader, debug1,
                         debug2, linereader, lines_until_sentinel,
                         mkdirp, progress, qprogress, DemuxConn, atoi)
from bup.io import path_msg
from bup.vint import read_bvec, read_vuint, write_bvec


# Upload bandwidth limit in bytes/sec; None (the default) disables throttling.
bwlimit = None


class ClientError(Exception):
    """Raised for any failure while talking to a bup server."""
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, throttled to the module-level bwlimit if it is set.

    bwcount and bwtime carry the size and timestamp of the previous write
    so that throttling state persists across calls; the updated pair is
    returned as (bwcount, bwtime) and should be passed to the next call.
    """
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue. So our bwlimit timing
        # has to be pretty granular. Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose. So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in range(0, len(buf), 4096):
            now = time.time()
            # Earliest instant the previous chunk's byte budget permits a new
            # write; clamped to 'now' so we never try to catch up (see above).
            # (Renamed from 'next', which shadowed the builtin.)
            deadline = max(now, bwtime + 1.0 * bwcount / bwlimit)
            time.sleep(deadline - now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = deadline
        return (bwcount, bwtime)


_protocol_rs = br'([a-z]+)://'
_host_rs = br'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
_port_rs = br'(?::(\d+))?'
_path_rs = br'(/.*)?'
_url_rx = re.compile(br'%s(?:%s%s)?%s' % (_protocol_rs, _host_rs, _port_rs, _path_rs),
                     re.I)

def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, path) 4-tuple.

    Accepts URL forms like b'bup://host:port/path' as well as the
    rsync-style b'host:path' shorthand (which implies ssh) and bare
    paths (which imply a local 'file' repository).  Raises ClientError
    for any protocol other than ssh, bup, or file.
    """
    url_match = _url_rx.match(remote)
    if url_match:
        if not url_match.group(1) in (b'ssh', b'bup', b'file'):
            raise ClientError('unexpected protocol: %s'
                              % url_match.group(1).decode('ascii'))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(b':', 1)
        if len(rs) == 1 or rs[0] in (b'', b'-'):
            return b'file', None, None, rs[-1]
        else:
            return b'ssh', rs[0], None, rs[1]


class Client:
    """Connection to a (possibly remote) bup repository server.

    Depending on the remote spec, the server is reached over ssh, as a
    local subprocess ('file'), via a TCP 'bup' daemon, or through the
    pre-opened fds 3/4 when BUP_SERVER_REVERSE is set.  Pack indexes
    for the remote repository are mirrored in a local index-cache.
    """
    def __init__(self, remote, create=False):
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = environ.get(b'BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = b'%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        # The b'None' here matches python2's behavior of b'%s' % None == 'None',
        # python3 will (as of version 3.7.5) do the same for str ('%s' % None),
        # but crashes instead when doing b'%s' % None.
        cachehost = b'None' if self.host is None else self.host
        cachedir = b'None' if self.dir is None else self.dir
        self.cachedir = git.repo(b'index-cache/%s'
                                 % re.sub(br'[^@\w]',
                                          b'_',
                                          b'%s:%s' % (cachehost, cachedir)))
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in (b'ssh', b'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, b'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    reraise(ClientError('connect: %s' % e))
            elif self.protocol == b'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, atoi(self.port) or 1982))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        self._available_commands = self._get_available_commands()
        self._require_command(b'init-dir')
        self._require_command(b'set-dir')
        if self.dir:
            # The protocol is line-oriented, so the path must not contain
            # newlines.  Bug fix: the replacement must be bytes (b' ') to
            # match the bytes pattern; a str replacement raises TypeError
            # whenever the pattern actually matches.
            self.dir = re.sub(br'[\r\n]', b' ', self.dir)
            if create:
                self.conn.write(b'init-dir %s\n' % self.dir)
            else:
                self.conn.write(b'set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __del__(self):
        try:
            self.close()
        except IOError as e:
            # The server side may already have gone away; that's fine.
            if e.errno == errno.EPIPE:
                pass
            else:
                raise

    def close(self):
        """Politely shut down the connection and reap any subprocess."""
        if self.conn and not self._busy:
            self.conn.write(b'quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            self.p.wait()
            rv = self.p.wait()  # Gather the cached exit status.
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        """Verify the server is alive and the last command succeeded.

        Returns the connection's pending error (if any); raises
        ClientError if the server process died or the check fails.
        """
        if self.p:
            rv = self.p.poll()
            if rv != None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            reraise(ClientError(e))

    def check_busy(self):
        """Raise ClientError if a streaming command is already in progress."""
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        """Raise ClientError unless a streaming command is in progress."""
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

    def _get_available_commands(self):
        """Ask the server for its command list via 'help'; return a frozenset."""
        self.check_busy()
        self._busy = b'help'
        conn = self.conn
        conn.write(b'help\n')
        result = set()
        line = self.conn.readline()
        if not line == b'Commands:\n':
            raise ClientError('unexpected help header ' + repr(line))
        while True:
            line = self.conn.readline()
            if line == b'\n':
                break
            if not line.startswith(b'    '):
                raise ClientError('unexpected help line ' + repr(line))
            cmd = line.strip()
            if not cmd:
                raise ClientError('unexpected help line ' + repr(line))
            result.add(cmd)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        return frozenset(result)

    def _require_command(self, name):
        """Raise ClientError if the server doesn't support command name (bytes)."""
        if name not in self._available_commands:
            # Bug fix: name is bytes, which has no .encode(); decode it for
            # the str error message instead of crashing with AttributeError.
            raise ClientError('server does not appear to provide %s command'
                              % name.decode('ascii'))

    def sync_indexes(self):
        """Mirror the server's pack indexes into the local index-cache.

        Deletes cached .idx files the server no longer mentions, fetches
        the ones it asks us to load, and regenerates midx files.
        """
        self._require_command(b'list-indexes')
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1(path_msg(f) + '\n')
            if f.endswith(b'.idx'):
                extra.add(f)
        needed = set()
        conn.write(b'list-indexes\n')
        for line in linereader(conn):
            if not line:
                break
            assert(line.find(b'/') < 0)
            parts = line.split(b' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == b'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

    def sync_index(self, name):
        """Download a single pack index from the server into the cache."""
        self._require_command(b'send-index')
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = ("won't request existing .idx, try `bup bloom --check %s`"
                   % path_msg(fn))
            raise ClientError(msg)
        self.conn.write(b'send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'wb') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
        self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

    def _suggest_packs(self):
        """Handle server idx suggestions; returns the last idx received.

        If a receive-objects-v2 transfer is in progress, it is suspended
        (0xffffffff sentinel) and resumed afterwards.
        """
        ob = self._busy
        if ob:
            assert(ob == b'receive-objects-v2')
            self.conn.write(b'\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%r\n' % line)
            if line.startswith(b'index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx).decode('ascii'))
                suggested.append(idx)
            else:
                assert(line.endswith(b'.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line).decode('ascii'))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write(b'%s\n' % ob)
        return idx

    def new_packwriter(self, compression_level=1,
                       max_pack_size=None, max_pack_objects=None):
        """Return a PackWriter_Remote that streams objects to the server."""
        self._require_command(b'receive-objects-v2')
        self.check_busy()
        def _set_busy():
            self._busy = b'receive-objects-v2'
            self.conn.write(b'receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level=compression_level,
                                 max_pack_size=max_pack_size,
                                 max_pack_objects=max_pack_objects)

    def read_ref(self, refname):
        """Return the binary oid for refname, or None if it doesn't exist."""
        self._require_command(b'read-ref')
        self.check_busy()
        self.conn.write(b'read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return unhexlify(r)
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        """Atomically update refname from oldval to newval (binary oids)."""
        self._require_command(b'update-ref')
        self.check_busy()
        self.conn.write(b'update-ref %s\n%s\n%s\n'
                        % (refname, hexlify(newval),
                           hexlify(oldval) if oldval else b''))
        self.check_ok()

    def join(self, id):
        """Generate the joined content of the object named by id."""
        self._require_command(b'join')
        self.check_busy()
        self._busy = b'join'
        # Send 'cat' so we'll work fine with older versions
        self.conn.write(b'cat %s\n' % re.sub(br'[\n\r]', b'_', id))
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        # FIXME: ok to assume the only NotOk is a KeyError? (it is true atm)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))

    def cat_batch(self, refs):
        """For each ref, yield (oidx, type, size, content-reader) or Nones.

        The caller must fully consume each content reader before
        advancing to the next item.
        """
        self._require_command(b'cat-batch')
        self.check_busy()
        self._busy = b'cat-batch'
        conn = self.conn
        conn.write(b'cat-batch\n')
        # FIXME: do we want (only) binary protocol?
        for ref in refs:
            assert ref
            assert b'\n' not in ref
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        for ref in refs:
            info = conn.readline()
            if info == b'missing\n':
                yield None, None, None, None
                continue
            if not (info and info.endswith(b'\n')):
                raise ClientError('Hit EOF while looking for object info: %r'
                                  % info)
            oidx, oid_t, size = info.split(b' ')
            size = int(size)
            cr = chunkyreader(conn, size)
            yield oidx, oid_t, size, cr
            detritus = next(cr, None)
            if detritus:
                raise ClientError('unexpected leftover data ' + repr(detritus))
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def refs(self, patterns=None, limit_to_heads=False, limit_to_tags=False):
        """Yield (name, binary-oid) for refs matching patterns (all if empty)."""
        patterns = patterns or tuple()
        self._require_command(b'refs')
        self.check_busy()
        self._busy = b'refs'
        conn = self.conn
        conn.write(b'refs %d %d\n' % (1 if limit_to_heads else 0,
                                      1 if limit_to_tags else 0))
        for pattern in patterns:
            assert b'\n' not in pattern
            conn.write(pattern)
            conn.write(b'\n')
        conn.write(b'\n')
        for line in lines_until_sentinel(conn, b'\n', ClientError):
            line = line[:-1]
            oidx, name = line.split(b' ')
            if len(oidx) != 40:
                raise ClientError('Invalid object fingerprint in %r' % line)
            if not name:
                raise ClientError('Invalid reference name in %r' % line)
            yield name, unhexlify(oidx)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def rev_list(self, refs, parse=None, format=None):
        """See git.rev_list for the general semantics, but note that with the
        current interface, the parse function must be able to handle
        (consume) any blank lines produced by the format because the
        first one received that it doesn't consume will be interpreted
        as a terminator for the entire rev-list result.

        """
        self._require_command(b'rev-list')
        if format:
            assert b'\n' not in format
            assert parse
        for ref in refs:
            assert ref
            assert b'\n' not in ref
        self.check_busy()
        self._busy = b'rev-list'
        conn = self.conn
        conn.write(b'rev-list\n')
        conn.write(b'\n')
        if format:
            conn.write(format)
        conn.write(b'\n')
        for ref in refs:
            conn.write(ref)
            conn.write(b'\n')
        conn.write(b'\n')
        if not format:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                line = line.strip()
                assert len(line) == 40
                yield line
        else:
            for line in lines_until_sentinel(conn, b'\n', ClientError):
                if not line.startswith(b'commit '):
                    raise ClientError('unexpected line ' + repr(line))
                cmt_oidx = line[7:].strip()
                assert len(cmt_oidx) == 40
                yield cmt_oidx, parse(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()

    def resolve(self, path, parent=None, want_meta=True, follow=False):
        """Resolve path on the server (see vfs.resolve); raises vfs.IOError
        on failure."""
        self._require_command(b'resolve')
        self.check_busy()
        self._busy = b'resolve'
        conn = self.conn
        conn.write(b'resolve %d\n' % ((1 if want_meta else 0)
                                      | (2 if follow else 0)
                                      | (4 if parent else 0)))
        if parent:
            vfs.write_resolution(conn, parent)
        write_bvec(conn, path)
        success = ord(conn.read(1))
        assert success in (0, 1)
        if success:
            result = vfs.read_resolution(conn)
        else:
            result = vfs.read_ioerror(conn)
        # FIXME: confusing
        not_ok = self.check_ok()
        if not_ok:
            raise not_ok
        self._not_busy()
        if isinstance(result, vfs.IOError):
            raise result
        return result


class PackWriter_Remote(git.PackWriter):
    """PackWriter that streams objects over an established server Conn.

    Uses the receive-objects-v2 protocol; each object is framed as
    length + sha + crc32 + deflated data, throttled via bwlimit.
    """
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1,
                 max_pack_size=None,
                 max_pack_objects=None):
        git.PackWriter.__init__(self,
                                objcache_maker=objcache_maker,
                                compression_level=compression_level,
                                max_pack_size=max_pack_size,
                                max_pack_objects=max_pack_objects)
        self.file = conn
        self.filename = b'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self, run_midx=True):
        assert(run_midx)  # We don't support this via remote yet
        if self._packopen and self.file:
            self.file.write(b'\0\0\0\0')  # Zero-length frame ends the stream.
            self._packopen = False
            self.onclose()  # Unbusy
            self.objcache = None
            return self.suggest_packs()  # Returns last idx received

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = b''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = b''.join((struct.pack('!I', len(data) + 20 + 4),
                           sha,
                           struct.pack('!I', crc),
                           data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            reraise(ClientError(e))
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc