# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import short
from . import (
    bookmarks,
    error,
    mdiff,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    setdiscovery,
)
from .interfaces import repository


def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and it was requested to perform a "stream clone",
    # we obtain the raw files data from the remote then fall back to an
    # incremental pull. This is somewhat hacky and is not nearly robust enough
    # for long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction(b'clone'):
            _fetchrawstorefiles(repo, remote)
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(
        repo.root,
        # An empty include set maps to nevermatcher, so always set
        # includes when they are missing.
        pullop.includepats or {b'path:.'},
        pullop.excludepats,
    )

    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None
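    # pathfilter now looks like, e.g.,
    # {b'include': [b'path:foo'], b'exclude': [b'path:bar']} or is None.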

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
    )
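    # "fetch" is truthy when the remote has revisions we lack; "common" and
    # "remoteheads" bound the changesetdata request issued below.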

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres[b'added']:
        phases.registernew(
            repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
        )

    # And adjust the phase of all changesets accordingly.
    for phasenumber, phase in phases.phasenames.items():
        if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
            continue

        phases.advanceboundary(
            repo,
            tr,
            phasenumber,
            csetres[b'nodesbyphase'][phase],
        )

    # Write bookmark updates.
    bookmarks.updatefromremote(
        repo.ui,
        repo,
        csetres[b'bookmarks'],
        remote.url(),
        pullop.gettransaction,
        explicit=pullop.explicitbookmarks,
    )

    manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])

    # We don't properly support shallow changesets and manifests yet. So we
    # apply depth limiting locally.
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node

        for rev in repo.revs(
            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
        ):
            relevantcsetnodes.add(clnode(rev))

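        # Only changesets within "depth" ancestors of the pulled heads are
        # considered relevant.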
        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres[b'added']
        manifestlinkrevs = manres[b'linkrevs']

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(
        repo,
        tr,
        remote,
        pathfilter,
        fnodes,
        csetsforfiles,
        manifestlinkrevs,
        shallow=bool(pullop.depth),
    )


def _checkuserawstorefiledata(pullop):
    """Check whether we should use the rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # The command to obtain raw store data isn't available.
    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
        return False

    # Only honor if the user requested a stream clone operation.
    if not pullop.streamclonerequested:
        return False

    # Only works on empty repos.
    if len(repo):
        return False

    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    if requirementsmod.REVLOGV1_REQUIREMENT not in repo.requirements:
        return False

    return True


def _fetchrawstorefiles(repo, remote):
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'rawstorefiledata',
            {
                b'files': [b'changelog', b'manifestlog'],
            },
        ).result()

        # The first object is a summary of the file data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(
            _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
        )
        with progress:
            progress.update(0)

            # Next, each file arrives as a metadata object followed by its
            # raw data chunks.
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(
                            _(b'remote file data missing key: %s') % k
                        )

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(
                        _(b'invalid location for raw file data: %s')
                        % filemeta[b'location']
                    )

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(
                                _(
                                    b'received invalid number of bytes for file '
                                    b'data; expected %d, got extra'
                                )
                                % filemeta[b'size']
                            )

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

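                        # Mercurial's cborutil decodes indefinite-length
                        # bytestrings into chunk objects carrying an islast
                        # flag; anything else means the remote did not stream
                        # the file data as an indefinite-length bytestring.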
                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(
                                _(
                                    b'did not receive indefinite length bytestring '
                                    b'for file data'
                                )
                            )

                if bytesremaining:
                    raise error.Abort(
                        _(
                            b'received invalid number of bytes for '
                            b'file data; expected %d, got %d'
                        )
                        % (
                            filemeta[b'size'],
                            filemeta[b'size'] - bytesremaining,
                        )
                    )


def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
    )

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        has_node = repo.unfiltered().changelog.index.has_node

        common |= {head for head in remoteheads if has_node(head)}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(repo.nullid)

    return common, fetch, remoteheads


def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'changesetdata',
            {
                b'revisions': [
                    {
                        b'type': b'changesetdagrange',
                        b'roots': sorted(common),
                        b'heads': sorted(remoteheads),
                    }
                ],
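                # The requested fields determine which keys appear on each
                # changeset object emitted below and consumed by
                # _processchangesetdata().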
                b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
            },
        ).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)


def _processchangesetdata(repo, tr, objs):
    repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(
        _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
    )

    manifestnodes = {}
    added = []

    def linkrev(node):
        repo.ui.debug(b'add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def ondupchangeset(cl, rev):
        added.append(cl.node(rev))

    def onchangeset(cl, rev):
        progress.increment()

        revision = cl.changelogrevision(rev)
        added.append(cl.node(rev))

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[rev] = revision.manifest

        repo.register_changeset(rev, revision)

    nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
    remotebookmarks = {}

    # addgroup() expects an 8-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
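    # Each emitted tuple is (node, p1, p2, linknode, deltabase, delta,
    # flags, sidedata), matching what addgroup() unpacks.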
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might be metadata-only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # The server always sends full revisions, so the delta base
                # is nullid.
                repo.nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

    cl.addgroup(
        iterrevisions(),
        linkrev,
        weakref.proxy(tr),
        alwayscache=True,
        addrevisioncb=onchangeset,
        duplicaterevisioncb=ondupchangeset,
    )

    progress.complete()

    return {
        b'added': added,
        b'nodesbyphase': nodesbyphase,
        b'bookmarks': remotebookmarks,
        b'manifestnodes': manifestnodes,
    }


def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects an 8-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # This value is handed to the lookup function given to
                # addgroup(). We already have a map of manifest node to
                # changelog revision number, so we pass the manifest node
                # here and use linkrevs.__getitem__ as the resolution
                # function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
    )

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands one at a time to the remote. This is not the most
    # efficient approach because we incur a round trip at the end of each
    # batch. However, the existing frame-based reactor keeps consuming server
    # data in the background, and that results in response data buffering
    # in memory, which can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = fetchnodes[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(
                b'manifestdata',
                {
                    b'tree': b'',
                    b'nodes': batch,
                    b'fields': {b'parents', b'revision'},
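                    # The full repository history is present locally, so the
                    # server is free to delta against parent revisions.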
                    b'haveparents': True,
                },
            ).result()

            # Chomp off header object.
            next(objs)

            def onchangeset(cl, rev):
                added.append(cl.node(rev))

            rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr),
                addrevisioncb=onchangeset,
                duplicaterevisioncb=onchangeset,
            )

    progress.complete()

    return {
        b'added': added,
        b'linkrevs': linkrevs,
    }


def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
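    # Example result: {b'dir/file.txt': {filenode: first manifest node seen}}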
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _(b'scanning manifests'), total=len(manifestnodes)
    )

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                if matcher(path):
                    fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes


def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions."""

    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = sorted(fnodes.items())

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = fnodeslist[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append(
                    (
                        path,
                        e.callcommand(
                            b'filedata',
                            {
                                b'path': path,
                                b'nodes': sorted(nodes),
                                b'fields': {b'parents', b'revision'},
                                b'haveparents': True,
                            },
                        ),
                    )
                )

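                # Resolve each file node to the changelog revision of the
                # manifest that first referenced it.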
                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in pycompat.iteritems(nodes)
                }

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr),
                )


def _fetchfilesfromcsets(
    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
    """Fetch file data from explicit changeset revisions."""

    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)

            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()
            remaining -= 1

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # a) this repo has shallow file storage or b) shallow data fetching is
    # enabled. Force the remote to not delta against possibly unknown
    # revisions when these conditions hold.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = csets[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [
                    {
                        b'type': b'changesetexplicit',
                        b'nodes': batch,
                    }
                ],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for i in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in pycompat.iteritems(fnodes[path])
                }

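                # addgroup() passes each revision's linknode to this lookup:
                # either the file node itself (covered by the linkrevs map
                # built above) or a changeset node supplied by the remote,
                # which the local changelog resolves.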
                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)

                store.addgroup(
                    iterrevisions(objs, header[b'totalitems'], progress),
                    getlinkrev,
                    weakref.proxy(tr),
                    maybemissingparents=shallow,
                )
