# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import short
from . import (
    bookmarks,
    error,
    mdiff,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    setdiscovery,
)
from .interfaces import repository


def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and it was requested to perform a "stream clone",
    # we obtain the raw files data from the remote then fall back to an
    # incremental pull. This is somewhat hacky and is not nearly robust enough
    # for long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction(b'clone'):
            _fetchrawstorefiles(repo, remote)
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(
        repo.root,
        # Empty maps to nevermatcher. So always
        # set includes if missing.
        pullop.includepats or {b'path:.'},
        pullop.excludepats,
    )

    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
    )

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres[b'added']:
        phases.registernew(
            repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
        )

    # And adjust the phase of all changesets accordingly.
    for phasenumber, phase in phases.phasenames.items():
        if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
            continue

        phases.advanceboundary(
            repo,
            tr,
            phasenumber,
            csetres[b'nodesbyphase'][phase],
        )

    # Write bookmark updates.
    bookmarks.updatefromremote(
        repo.ui,
        repo,
        csetres[b'bookmarks'],
        remote.url(),
        pullop.gettransaction,
        explicit=pullop.explicitbookmarks,
    )

    manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])

    # We don't properly support shallow changesets and manifests yet, so we
    # apply depth limiting locally.
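    # For example, a depth of 1 keeps only the pulled heads themselves and
    # a depth of 2 also keeps their parents, which is why
    # ``pullop.depth - 1`` is passed to the revset below.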
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node

        for rev in repo.revs(
            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
        ):
            relevantcsetnodes.add(clnode(rev))

        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres[b'added']
        manifestlinkrevs = manres[b'linkrevs']

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(
        repo,
        tr,
        remote,
        pathfilter,
        fnodes,
        csetsforfiles,
        manifestlinkrevs,
        shallow=bool(pullop.depth),
    )


def _checkuserawstorefiledata(pullop):
    """Check whether we should use rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # Command to obtain raw store data isn't available.
    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
        return False

    # Only honor if the user requested a stream clone operation.
    if not pullop.streamclonerequested:
        return False

    # Only works on empty repos.
    if len(repo):
        return False

    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    if requirementsmod.REVLOGV1_REQUIREMENT not in repo.requirements:
        return False

    return True


def _fetchrawstorefiles(repo, remote):
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'rawstorefiledata',
            {
                b'files': [b'changelog', b'manifestlog'],
            },
        ).result()

        # First object is a summary of the files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(
            _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
        )
        with progress:
            progress.update(0)

            # Next come pairs of (file metadata, data).
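            # The stream alternates between a metadata map for each file,
            # e.g. {b'location': b'store', b'path': ..., b'size': ...},
            # and one or more indefinite-length bytestring chunks carrying
            # that file's raw content (the final chunk has ``islast`` set).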
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(
                            _(b'remote file data missing key: %s') % k
                        )

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(
                        _(b'invalid location for raw file data: %s')
                        % filemeta[b'location']
                    )

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(
                                _(
                                    b'received invalid number of bytes for file '
                                    b'data; expected %d, got extra'
                                )
                                % filemeta[b'size']
                            )

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(
                                _(
                                    b'did not receive indefinite length bytestring '
                                    b'for file data'
                                )
                            )

                if bytesremaining:
                    raise error.Abort(
                        _(
                            b'received invalid number of bytes for '
                            b'file data; expected %d, got %d'
                        )
                        % (
                            filemeta[b'size'],
                            filemeta[b'size'] - bytesremaining,
                        )
                    )


def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
    )

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        has_node = repo.unfiltered().changelog.index.has_node

        common |= {head for head in remoteheads if has_node(head)}

        if remoteheads.issubset(common):
            fetch = []

    common.discard(repo.nullid)

    return common, fetch, remoteheads


def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'changesetdata',
            {
                b'revisions': [
                    {
                        b'type': b'changesetdagrange',
                        b'roots': sorted(common),
                        b'heads': sorted(remoteheads),
                    }
                ],
                b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
            },
        ).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
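        # ``objs`` is a lazy stream of objects: a leading header map
        # (carrying e.g. b'totalitems'), then one map per changeset plus
        # any payloads announced via b'fieldsfollowing'.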
        return _processchangesetdata(repo, tr, objs)


def _processchangesetdata(repo, tr, objs):
    repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(
        _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
    )

    manifestnodes = {}
    added = []

    def linkrev(node):
        repo.ui.debug(b'add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def ondupchangeset(cl, rev):
        added.append(cl.node(rev))

    def onchangeset(cl, rev):
        progress.increment()

        revision = cl.changelogrevision(rev)
        added.append(cl.node(rev))

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[rev] = revision.manifest

        repo.register_changeset(rev, revision)

    nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
    remotebookmarks = {}

    # addgroup() expects an 8-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might be metadata-only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                repo.nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

    cl.addgroup(
        iterrevisions(),
        linkrev,
        weakref.proxy(tr),
        alwayscache=True,
        addrevisioncb=onchangeset,
        duplicaterevisioncb=ondupchangeset,
    )

    progress.complete()

    return {
        b'added': added,
        b'nodesbyphase': nodesbyphase,
        b'bookmarks': remotebookmarks,
        b'manifestnodes': manifestnodes,
    }


def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects an 8-tuple describing revisions. This normalizes
    # the wire data to that format.
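    # Each yielded entry has the shape:
    #
    #   (node, p1node, p2node, linknode, deltabasenode, delta, flags,
    #    sidedata)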
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # This value is passed to the lookup function given to
                # addgroup(). We already have a map of manifest node to
                # changelog revision number, so we pass in the manifest
                # node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
    )

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands one at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = fetchnodes[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(
                b'manifestdata',
                {
                    b'tree': b'',
                    b'nodes': batch,
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                },
            ).result()

            # Chomp off header object.
            next(objs)

            def onchangeset(cl, rev):
                added.append(cl.node(rev))

            rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr),
                addrevisioncb=onchangeset,
                duplicaterevisioncb=onchangeset,
            )

    progress.complete()

    return {
        b'added': added,
        b'linkrevs': linkrevs,
    }


def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _(b'scanning manifests'), total=len(manifestnodes)
    )

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the
            # storage delta into consideration. What we really want is
            # something that takes the delta between the manifest's parents.
            # And ideally we would ignore file nodes that are known locally.
            # For now, ignore both these limitations. This will result in
            # incremental fetches requesting data we already have. So this
            # is far from ideal.
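            # Only the *first* manifest seen to reference a given file node
            # is recorded; that is sufficient to derive a linkrev for the
            # file revision later on.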
            md = m.readfast()

            for path, fnode in md.items():
                if matcher(path):
                    fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes


def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions."""

    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = sorted(fnodes.items())

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = fnodeslist[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append(
                    (
                        path,
                        e.callcommand(
                            b'filedata',
                            {
                                b'path': path,
                                b'nodes': sorted(nodes),
                                b'fields': {b'parents', b'revision'},
                                b'haveparents': True,
                            },
                        ),
                    )
                )

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in pycompat.iteritems(nodes)
                }

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr),
                )


def _fetchfilesfromcsets(
    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
    """Fetch file data from explicit changeset revisions."""

    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)

            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()
            remaining -= 1

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # a) this repo has shallow file storage or b) shallow data fetching is
    # enabled. Force the remote to not delta against possibly unknown
    # revisions when these conditions hold.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = csets[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [
                    {
                        b'type': b'changesetexplicit',
                        b'nodes': batch,
                    }
                ],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for j in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in pycompat.iteritems(fnodes[path])
                }

                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)

                store.addgroup(
                    iterrevisions(objs, header[b'totalitems'], progress),
                    getlinkrev,
                    weakref.proxy(tr),
                    maybemissingparents=shallow,
                )
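

# Overall pull flow for wire protocol version 2, for reference:
#
#   1. Optionally stream raw changelog/manifestlog files ("stream clone").
#   2. changesetdata: fetch changelog revisions, phases, and bookmarks.
#   3. manifestdata: fetch root manifest revisions, with linkrevs derived
#      from the changesets added in step 2.
#   4. filesdata: fetch the file revisions referenced by the new manifests,
#      optionally narrowed by a path filter.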