1# Code dedicated to the discovery of obsolescence marker "over the wire"
2#
3# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
4#
5# This software may be used and distributed according to the terms of the
6# GNU General Public License version 2 or any later version.
7
8# Status: Experiment in progress // open question
9#
10#   The final discovery algorithm and protocol will go into core when we'll be
11#   happy with it.
12#
#   Some of the code in this module is for compatibility with older versions
#   of evolve and will be eventually dropped.
15
16from __future__ import absolute_import
17
18import hashlib
19import heapq
20import os
21import sqlite3
22import struct
23import weakref
24
25from mercurial.i18n import _
26from mercurial import (
27    encoding,
28    error,
29    exchange,
30    extensions,
31    localrepo,
32    node as nodemod,
33    obsolete,
34    scmutil,
35    store,
36    util,
37)
38
39from mercurial.utils.stringutil import forcebytestr
40
41from . import (
42    compat,
43    exthelper,
44    obscache,
45    utility,
46    stablerange,
47    stablerangecache,
48)
49
50from mercurial import wireprotov1server
51from mercurial.wireprotov1peer import wirepeer
52from mercurial.wireprototypes import encodelist, decodelist
53
54_pack = struct.pack
55_unpack = struct.unpack
56_calcsize = struct.calcsize
57
58eh = exthelper.exthelper()
59obsexcmsg = utility.obsexcmsg
60
61# Config
62eh.configitem(b'experimental', b'evolution.obsdiscovery', True)
63eh.configitem(b'experimental', b'obshashrange', True)
64eh.configitem(b'experimental', b'obshashrange.warm-cache', b'auto')
65eh.configitem(b'experimental', b'obshashrange.max-revs', None)
66
67##################################
68###  Code performing discovery ###
69##################################
70
def findmissingrange(ui, local, remote, probeset,
                     initialsamplesize=100,
                     fullsamplesize=200):
    """find revisions in ``probeset`` whose obsmarkers differ from the remote

    Repeatedly asks the remote for the obshash of stable ranges, starting
    from the heads of ``probeset``.  Ranges whose hashes match are dropped;
    mismatching ranges are sliced into their subranges and re-queried until
    individual revisions are reached.

    Returns a sorted list of nodes whose relevant obsmarkers mismatch.
    """
    missing = set()
    starttime = util.timer()

    heads = local.revs(b'heads(%ld)', probeset)
    local.stablerange.warmup(local)

    rangelength = local.stablerange.rangelength
    subranges = local.stablerange.subranges
    # size of slice ?
    heappop = heapq.heappop
    heappush = heapq.heappush
    heapify = heapq.heapify

    tested = set()

    sample = []
    samplesize = initialsamplesize

    def addentry(entry):
        # queue a range for the next query unless it was already processed
        if entry in tested:
            return False
        sample.append(entry)
        tested.add(entry)
        return True

    for h in heads:
        entry = (h, 0)
        addentry(entry)

    local.obsstore.rangeobshashcache.update(local)
    querycount = 0
    progress = ui.makeprogress(_(b"comparing obsmarker with other"), _(b"queries"))
    overflow = []
    while sample or overflow:
        if overflow:
            sample.extend(overflow)
            overflow = []

        if samplesize < len(sample):
            # too much sample already
            overflow = sample[samplesize:]
            sample = sample[:samplesize]
        elif len(sample) < samplesize:
            ui.debug(b"query %i; add more sample (target %i, current %i)\n"
                     % (querycount, samplesize, len(sample)))
            # we need more sample !
            needed = samplesize - len(sample)
            sliceme = []
            heapify(sliceme)
            for entry in sample:
                if 1 < rangelength(local, entry):
                    heappush(sliceme, (-rangelength(local, entry), entry))

            while sliceme and 0 < needed:
                _key, target = heappop(sliceme)
                for new in subranges(local, target):
                    # XXX we could record hierarchy to optimise drop
                    if addentry(new):
                        # Only ranges spanning more than one revision can be
                        # sliced further.  (The previous code checked
                        # ``len(new)``, which is always 2 for a (rev, index)
                        # pair and therefore pushed unsliceable ranges too.)
                        if 1 < rangelength(local, new):
                            heappush(sliceme, (-rangelength(local, new), new))
                        needed -= 1
                        if needed <= 0:
                            break

        # no longer the first iteration
        samplesize = fullsamplesize

        nbsample = len(sample)
        maxsize = max([rangelength(local, r) for r in sample])
        ui.debug(b"query %i; sample size is %i, largest range %i\n"
                 % (querycount, nbsample, maxsize))
        nbreplies = 0
        replies = list(_queryrange(ui, local, remote, sample))
        sample = []
        n = local.changelog.node
        for entry, remotehash in replies:
            nbreplies += 1
            if remotehash == _obshashrange(local, entry):
                # both sides agree on this range: nothing to exchange
                continue
            elif 1 == rangelength(local, entry):
                # mismatch on a single revision: it is definitely affected
                missing.add(n(entry[0]))
            else:
                # mismatch on a wider range: dig into its subranges
                for new in subranges(local, entry):
                    addentry(new)
        assert nbsample == nbreplies
        querycount += 1
        progress.update(querycount)
    progress.complete()
    local.obsstore.rangeobshashcache.save(local)
    duration = util.timer() - starttime
    logmsg = (b'obsdiscovery, %d/%d mismatch'
              b' - %d obshashrange queries in %.4f seconds\n')
    logmsg %= (len(missing), len(probeset), querycount, duration)
    ui.log(b'evoext-obsdiscovery', logmsg)
    ui.debug(logmsg)
    return sorted(missing)
170
171def _queryrange(ui, repo, remote, allentries):
172    #  question are asked with node
173    n = repo.changelog.node
174    noderanges = [(n(entry[0]), entry[1]) for entry in allentries]
175    replies = remote.evoext_obshashrange_v1(noderanges)
176    result = []
177    for idx, entry in enumerate(allentries):
178        result.append((entry, replies[idx]))
179    return result
180
181##############################
182### Range Hash computation ###
183##############################
184
@eh.command(
    b'debugobshashrange',
    [
        (b'', b'rev', [], b'display obshash for all (rev, 0) range in REVS'),
        (b'', b'subranges', False, b'display all subranges'),
    ],
    _(b''))
def debugobshashrange(ui, repo, **opts):
    """display the ::REVS set topologically sorted in a stable way
    """
    s = nodemod.short
    revs = scmutil.revrange(repo, opts['rev'])
    # prewarm depth cache
    if revs:
        repo.stablerange.warmup(repo, max(revs))
    cl = repo.changelog
    rangelength = repo.stablerange.rangelength
    depthrev = repo.stablerange.depthrev
    if opts['subranges']:
        # every subrange reachable from the requested top-level ranges
        ranges = stablerange.subrangesclosure(repo, repo.stablerange, revs)
    else:
        ranges = [(r, 0) for r in revs]
    headers = (b'rev', b'node', b'index', b'size', b'depth', b'obshash')
    linetemplate = b'%12d %12s %12d %12d %12d %12s\n'
    # same column layout as the data lines, but with every field as a string
    headertemplate = linetemplate.replace(b'd', b's')
    ui.status(headertemplate % headers)
    repo.obsstore.rangeobshashcache.update(repo)
    for r in ranges:
        d = (r[0],
             s(cl.node(r[0])),
             r[1],
             rangelength(repo, r),
             depthrev(repo, r[0]),
             s(_obshashrange(repo, r)))
        ui.status(linetemplate % d)
    # persist any obshash computed while displaying
    repo.obsstore.rangeobshashcache.save(repo)
221
def _obshashrange(repo, rangeid):
    """return the obsolete hash associated to a range

    For a single-revision range the hash is the sha1 of the sorted fm1
    encodings of the markers relevant to that revision.  For a wider range
    it is the sha1 of the concatenated hashes of its non-empty subranges.
    Results are memoized in ``repo.obsstore.rangeobshashcache``.
    """
    cache = repo.obsstore.rangeobshashcache
    cl = repo.changelog
    obshash = cache.get(rangeid)
    if obshash is not None:
        return obshash
    pieces = []
    nullid = nodemod.nullid
    if repo.stablerange.rangelength(repo, rangeid) == 1:
        # leaf range: gather the markers relevant to this single revision
        rangenode = cl.node(rangeid[0])
        tmarkers = repo.obsstore.relevantmarkers([rangenode])
        for m in tmarkers:
            mbin = obsolete._fm1encodeonemarker(m)
            pieces.append(mbin)
        pieces.sort()
    else:
        # recursively combine the hashes of the subranges, skipping the
        # ones that carry no obsolescence data (hash == nullid)
        for subrange in repo.stablerange.subranges(repo, rangeid):
            obshash = _obshashrange(repo, subrange)
            if obshash != nullid:
                pieces.append(obshash)

    # note: if there is only one subrange with actual data, we'll just
    # reuse the same hash.
    if not pieces:
        obshash = nodemod.nullid
    elif len(pieces) != 1 or obshash is None:
        sha = hashlib.sha1()
        for p in pieces:
            sha.update(p)
        obshash = sha.digest()
    # NOTE(review): in the multi-subrange branch ``obshash`` holds the hash
    # of the *last* subrange, not necessarily ``pieces[0]``; if the last
    # subrange was empty (nullid) while an earlier one had data, nullid is
    # cached here instead of the single piece.  Both peers run the same
    # code so comparison stays consistent, but confirm intent before
    # changing it — this function defines a hash compared over the wire.
    cache[rangeid] = obshash
    return obshash
257
258### sqlite caching
259
# DDL creating the cache schema: one row per (rev, idx) stable range, plus a
# single-row ``meta`` table holding the schema version and validity key
_sqliteschema = [
    r"""CREATE TABLE obshashrange(rev     INTEGER NOT NULL,
                                 idx     INTEGER NOT NULL,
                                 obshash BLOB    NOT NULL,
                                 PRIMARY KEY(rev, idx));""",
    r"CREATE INDEX range_index ON obshashrange(rev, idx);",
    r"""CREATE TABLE meta(schemaversion INTEGER NOT NULL,
                         tiprev        INTEGER NOT NULL,
                         tipnode       BLOB    NOT NULL,
                         nbobsmarker   INTEGER NOT NULL,
                         obssize       BLOB    NOT NULL,
                         obskey        BLOB    NOT NULL
                        );""",
]
# detect whether the database has been initialized at all
_queryexist = r"SELECT name FROM sqlite_master WHERE type='table' AND name='meta';"
# metadata maintenance: the meta table always holds exactly one row
_clearmeta = r"""DELETE FROM meta;"""
_newmeta = r"""INSERT INTO meta (schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey)
            VALUES (?,?,?,?,?,?);"""
_updateobshash = r"INSERT INTO obshashrange(rev, idx, obshash) VALUES (?,?,?);"
_querymeta = r"SELECT schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey FROM meta;"
_queryobshash = r"SELECT obshash FROM obshashrange WHERE (rev = ? AND idx = ?);"
# highest revision with at least one cached range
_query_max_stored = r"SELECT MAX(rev) FROM obshashrange"

# full and per-range invalidation
_reset = r"DELETE FROM obshashrange;"
_delete = r"DELETE FROM obshashrange WHERE (rev = ? AND idx = ?);"
285
286def _affectedby(repo, markers):
287    """return all nodes whose relevant set is affected by this changeset
288
289    This is a reversed version of obsstore.relevantmarkers
290    """
291    affected_nodes = set()
292    known_markers = set(markers)
293    nodes_to_proceed = set()
294    markers_to_proceed = set(known_markers)
295
296    successors = repo.obsstore.successors
297    predecessors = repo.obsstore.predecessors
298
299    while nodes_to_proceed or markers_to_proceed:
300        while markers_to_proceed:
301            marker = markers_to_proceed.pop()
302            # check successors and parent
303            if marker[1]:
304                relevant = (marker[1], )
305            else: # prune case
306                relevant = ((marker[0], ), marker[5])
307            for relnodes in relevant:
308                if relnodes is None:
309                    continue
310                for node in relnodes:
311                    if node not in affected_nodes:
312                        nodes_to_proceed.add(node)
313                    affected_nodes.add(node)
314        # markers_to_proceed is now empty:
315        if nodes_to_proceed:
316            node = nodes_to_proceed.pop()
317            markers = set()
318            markers.update(successors.get(node, ()))
319            markers.update(predecessors.get(node, ()))
320            markers -= known_markers
321            markers_to_proceed.update(markers)
322            known_markers.update(markers)
323
324    return affected_nodes
325
# if there is that many new obsmarkers, reset the whole cache without
# analysing them individually (analysis cost would exceed the rebuild cost)
RESET_ABOVE = 10000
328
class _obshashcache(obscache.dualsourcecache):
    """on-disk (sqlite) cache of obshash values, keyed by stable range

    The cache key tracks both the changelog tip and the obsstore content
    (inherited from ``obscache.dualsourcecache``) so the cache can be
    incrementally updated, or invalidated when too much changed.  Values
    live in-memory in ``_data``; entries computed since the last save are
    tracked in ``_new`` until ``save()`` writes them out.
    """

    _schemaversion = 3

    _cachename = b'evo-ext-obshashrange' # used for error message
    _filename = b'evoext_obshashrange_v2.sqlite'

    def __init__(self, repo):
        super(_obshashcache, self).__init__()
        self._vfs = repo.vfs
        self._path = repo.cachevfs.join(self._filename)
        # ranges computed since the last save, pending a sqlite write
        self._new = set()
        self._valid = True
        # weak reference to avoid a cycle with the repository object
        self._repo = weakref.ref(repo.unfiltered())
        # cache status
        self._ondiskcachekey = None
        self._data = {}
        # permission bits to apply to the database file on creation
        self._createmode = store._calcmode(self._vfs)

    def clear(self, reset=False):
        """forget all in-memory data; with ``reset`` also mark it invalid"""
        super(_obshashcache, self).clear(reset=reset)
        self._data.clear()
        self._new.clear()
        if reset:
            self._valid = False
        # drop the memoized sqlite connection so it will be reopened
        if r'_con' in vars(self):
            del self._con

    def get(self, rangeid):
        """return the cached obshash for ``rangeid``, or None when unknown"""
        # revision should be covered by the tiprev
        #
        # XXX there are issue with cache warming, we hack around it for now
        if not getattr(self, '_updating', False):
            if self._cachekey[0] < rangeid[0]:
                msg = (b'using unwarmed obshashrangecache (%s %s)'
                       % (rangeid[0], self._cachekey[0]))
                raise error.ProgrammingError(msg)

        value = self._data.get(rangeid)
        if value is None and self._con is not None:
            # not in memory: fall back to the on-disk sqlite data
            nrange = (rangeid[0], rangeid[1])
            try:
                obshash = self._con.execute(_queryobshash, nrange).fetchone()
                if obshash is not None:
                    value = obshash[0]
                self._data[rangeid] = value
            except (sqlite3.DatabaseError, sqlite3.OperationalError):
                # something is wrong with the sqlite db
                # Since this is a cache, we ignore it.
                if r'_con' in vars(self):
                    del self._con
                self._new.clear()
        return value

    def __setitem__(self, rangeid, obshash):
        # record the value and schedule it for the next save
        self._new.add(rangeid)
        self._data[rangeid] = obshash

    def _updatefrom(self, repo, revs, obsmarkers):
        """override this method to update your cache data incrementally

        revs:      list of new revision in the changelog
        obsmarker: list of new obsmarkers in the obsstore
        """
        # XXX for now, we'll not actually update the cache, but we'll be
        # smarter at invalidating it.
        #
        # 1) new revisions does not get their entry updated (not update)
        # 2) if we detect markers affecting non-new revision we reset the cache

        self._updating = True

        con = self._con
        if con is not None:
            reset = False
            affected = []
            if RESET_ABOVE < len(obsmarkers):
                # lots of new obsmarkers, probably smarter to reset the cache
                repo.ui.log(b'evoext-cache', b'obshashcache reset - '
                            b'many new markers (%d)\n'
                            % len(obsmarkers))
                reset = True
            elif obsmarkers:
                # find the cached revisions whose relevant set is touched
                # by the new markers
                max_stored = con.execute(_query_max_stored).fetchall()[0][0]
                affected_nodes = _affectedby(repo, obsmarkers)

                getrev = compat.getgetrev(repo.changelog)
                affected = [getrev(n) for n in affected_nodes]
                # only revisions actually present in the cache matter
                affected = [r for r in affected
                            if r is not None and r <= max_stored]

            if RESET_ABOVE < len(affected):
                repo.ui.log(b'evoext-cache', b'obshashcache reset - '
                            b'new markers affect many changeset (%d)\n'
                            % len(affected))
                reset = True

            if affected or reset:
                if not reset:
                    repo.ui.log(b'evoext-cache', b'obshashcache clean - '
                                b'new markers affect %d changeset and cached ranges\n'
                                % len(affected))
                if con is not None:
                    # always reset for now, the code detecting affect is buggy
                    # so we need to reset more broadly than we would like.
                    try:
                        if repo.stablerange._con is None:
                            repo.ui.log(b'evoext-cache', b'obshashcache reset - '
                                        b'underlying stablerange cache unavailable\n')
                            reset = True
                        if reset:
                            con.execute(_reset)
                            self._data.clear()
                        else:
                            # surgically drop the ranges covering the
                            # affected revisions
                            ranges = repo.stablerange.contains(repo, affected)
                            con.executemany(_delete, ranges)
                            for r in ranges:
                                self._data.pop(r, None)
                    except (sqlite3.DatabaseError, sqlite3.OperationalError) as exc:
                        repo.ui.log(b'evoext-cache',
                                    b'error while updating obshashrange cache: %s\n'
                                    % forcebytestr(exc))
                        del self._updating
                        return

                # rewarm key revisions
                #
                # (The current invalidation is too wide, but rewarming every
                # single revision is quite costly)
                newrevs = []
                stop = self._cachekey[0] # tiprev
                for h in repo.filtered(b'immutable').changelog.headrevs():
                    if h <= stop and h in affected:
                        newrevs.append(h)
                newrevs.extend(revs)
                revs = newrevs

        repo.depthcache.update(repo)
        total = len(revs)

        # warm the cache for the new revs
        progress = repo.ui.makeprogress(b'updating obshashrange cache', _(b'changesets'), total)
        progress.update(0)
        for idx, r in enumerate(revs):
            _obshashrange(repo, (r, 0))
            revstr = b'' if r is None else (b'rev %d' % r)
            progress.update(idx, item=revstr)
        progress.complete()

        del self._updating

    @property
    def _fullcachekey(self):
        # cache key as stored in the ``meta`` table, schema version included
        return (self._schemaversion, ) + self._cachekey

    def load(self, repo):
        """load the cache key from disk (connecting lazily through ``_con``)"""
        if self._con is None:
            self._cachekey = self.emptykey
            self._ondiskcachekey = self.emptykey
        assert self._cachekey is not None

    def _db(self):
        """open and return a sqlite connection, or None on failure"""
        try:
            util.makedirs(self._vfs.dirname(self._path), self._createmode)
        except OSError:
            return None
        if self._createmode is not None:
            # remember whether the file already existed, to decide whether
            # the chmod below is needed
            pre_existed = os.access(self._path, os.R_OK)
        con = sqlite3.connect(encoding.strfromlocal(self._path), timeout=30,
                              isolation_level=r"IMMEDIATE")
        con.text_factory = bytes
        if self._createmode is not None and not pre_existed:
            try:
                os.chmod(self._path, self._createmode & 0o666)
            except OSError:
                pass
        return con

    @util.propertycache
    def _con(self):
        """memoized sqlite connection, or None when the cache is unusable"""
        if not self._valid:
            return None
        repo = self._repo()
        if repo is None:
            return None
        con = self._db()
        if con is None:
            return None
        cur = con.execute(_queryexist)
        if cur.fetchone() is None:
            self._valid = False
            return None
        meta = con.execute(_querymeta).fetchone()
        if meta is None or meta[0] != self._schemaversion:
            # missing or incompatible metadata: treat the cache as invalid
            self._valid = False
            return None
        self._cachekey = self._ondiskcachekey = meta[1:]
        return con

    def save(self, repo):
        """public entry point: persist pending data under the repo lock"""
        if self._cachekey is None:
            return
        if self._cachekey == self._ondiskcachekey and not self._new:
            return
        repo = repo.unfiltered()
        try:
            with repo.lock():
                # save the stablerange cache first, obshash data depends on it
                if r'stablerange' in vars(repo):
                    repo.stablerange.save(repo)
                self._save(repo)
        except error.LockError:
            # Exceptionnally we are noisy about it since performance impact
            # is large We should address that before using this more
            # widely.
            msg = _(b'obshashrange cache: skipping save unable to lock repo\n')
            repo.ui.warn(msg)

    def _save(self, repo):
        """write pending entries to disk, swallowing storage-level errors"""
        if not self._new:
            return
        try:
            return self._trysave(repo)
        except (IOError, OSError, sqlite3.DatabaseError, sqlite3.OperationalError, sqlite3.IntegrityError) as exc:
            # Catch error that may arise under stress
            #
            # operational error catch read-only and locked database
            # IntegrityError catch Unique constraint error that may arise
            if r'_con' in vars(self):
                del self._con
            self._new.clear()
            repo.ui.log(b'evoext-cache',
                        b'error while saving new data: %s\n'
                        % forcebytestr(exc))
            repo.ui.debug(b'evoext-cache: error while saving new data: %s\n'
                          % forcebytestr(exc))

    def _trysave(self, repo):
        """actually write pending entries and metadata to the database"""
        if self._con is None:
            # no usable database: recreate it from scratch
            util.unlinkpath(self._path, ignoremissing=True)
            if r'_con' in vars(self):
                del self._con

            con = self._db()
            if con is None:
                repo.ui.log(b'evoext-cache', b'unable to write obshashrange cache'
                            b' - cannot create database\n')
                return
            with con:
                for req in _sqliteschema:
                    con.execute(req)

                meta = [self._schemaversion] + list(self.emptykey)
                con.execute(_newmeta, meta)
                self._ondiskcachekey = self.emptykey
        else:
            con = self._con
        with con:
            meta = con.execute(_querymeta).fetchone()
            if meta[1:] != self._ondiskcachekey:
                # drifting is currently an issue because this means another
                # process might have already added the cache line we are about
                # to add. This will confuse sqlite
                msg = _(b'obshashrange cache: skipping write, '
                        b'database drifted under my feet\n')
                repo.ui.warn(msg)
                self._new.clear()
                self._valid = False
                if r'_con' in vars(self):
                    del self._con
                self._valid = False
                return
            data = ((rangeid[0], rangeid[1], self.get(rangeid)) for rangeid in self._new)
            con.executemany(_updateobshash, data)
            cachekey = self._fullcachekey
            con.execute(_clearmeta) # remove the older entry
            con.execute(_newmeta, cachekey)
            self._new.clear()
            self._valid = True
            self._ondiskcachekey = self._cachekey
608
# any new obsmarker invalidates the whole obshashrange cache
@eh.wrapfunction(obsolete.obsstore, '_addmarkers')
def _addmarkers(orig, obsstore, *args, **kwargs):
    """wrapper clearing the obshashrange cache whenever markers are added"""
    obsstore.rangeobshashcache.clear()
    return orig(obsstore, *args, **kwargs)
613
# reference to the original ``obsstore`` filecache descriptor
obsstorefilecache = localrepo.localrepository.obsstore


# obsstore is a filecache so we have to do some special dancing
@eh.wrapfunction(obsstorefilecache, 'func')
def obsstorewithcache(orig, repo):
    """attach a rangeobshashcache to every obsstore built by the filecache"""
    obsstore = orig(repo)
    obsstore.rangeobshashcache = _obshashcache(repo.unfiltered())
    return obsstore
623
@eh.reposetup
def setupcache(ui, repo):
    """reposetup hook wiring obshashrange cache maintenance into the repo"""

    class obshashrepo(repo.__class__):
        @localrepo.unfilteredmethod
        def destroyed(self):
            # history was rewritten/stripped: the cache content is stale
            if r'obsstore' in vars(self):
                self.obsstore.rangeobshashcache.clear()
            toplevel = not util.safehasattr(self, '_destroying')
            if toplevel:
                self._destroying = True
            try:
                super(obshashrepo, self).destroyed()
            finally:
                if toplevel:
                    del self._destroying

        @localrepo.unfilteredmethod
        def updatecaches(self, tr=None, **kwargs):
            # warm and persist the obshashrange cache with the other caches
            if utility.shouldwarmcache(self, tr):
                self.obsstore.rangeobshashcache.update(self)
                self.obsstore.rangeobshashcache.save(self)
            super(obshashrepo, self).updatecaches(tr, **kwargs)

    repo.__class__ = obshashrepo
649
650### wire protocol commands
651
def _obshashrange_v0(repo, ranges):
    """return a list of hash from a list of range

    The range have the id encoded as a node

    return 'wdirid' for unknown range"""
    getrev = compat.getgetrev(repo.changelog)
    ranges = [(getrev(n), idx) for n, idx in ranges]
    # Warm the stable-range cache up to the highest *known* revision.
    # Unknown nodes yield a None revision, and ``max()`` over a mix of
    # ints and None raises TypeError on Python 3, so filter them out
    # before taking the maximum.
    knownrevs = [r for r, idx in ranges if r is not None]
    if knownrevs:
        repo.stablerange.warmup(repo, upto=max(knownrevs))
    result = []
    repo.obsstore.rangeobshashcache.update(repo)
    for r in ranges:
        if r[0] is None:
            # unknown head node: answer wdirid so the client can tell
            result.append(nodemod.wdirid)
        else:
            result.append(_obshashrange(repo, r))
    repo.obsstore.rangeobshashcache.save(repo)
    return result
673
@eh.addattr(localrepo.localpeer, 'evoext_obshashrange_v1')
def local_obshashrange_v0(peer, ranges):
    """obshashrange_v1 implementation for local peers (direct call)"""
    return _obshashrange_v0(peer._repo, ranges)
677
678
679_indexformat = b'>I'
680_indexsize = _calcsize(_indexformat)
681def _encrange(node_rangeid):
682    """encode a (node) range"""
683    headnode, index = node_rangeid
684    return headnode + _pack(_indexformat, index)
685
686def _decrange(data):
687    """encode a (node) range"""
688    assert _indexsize < len(data), len(data)
689    headnode = data[:-_indexsize]
690    index = _unpack(_indexformat, data[-_indexsize:])[0]
691    return (headnode, index)
692
@eh.addattr(wirepeer, 'evoext_obshashrange_v1')
def peer_obshashrange_v0(self, ranges):
    """obshashrange_v1 implementation for remote wire peers

    Encodes each (node, index) range, issues the wire command and decodes
    the returned list of hashes.
    """
    binranges = [_encrange(r) for r in ranges]
    encranges = encodelist(binranges)
    d = self._call(b"evoext_obshashrange_v1", ranges=encranges)
    try:
        return decodelist(d)
    except ValueError:
        # the server answered something we cannot parse
        self._abort(error.ResponseError(_(b"unexpected response:"), d))
702
@wireprotov1server.wireprotocommand(b'evoext_obshashrange_v1', b'ranges', b'pull')
def srv_obshashrange_v1(repo, proto, ranges):
    """server side of the obshashrange_v1 wire command

    Decodes the requested ranges, computes their obshash and returns the
    encoded list of hashes.  Requires 'pull' permission.
    """
    ranges = decodelist(ranges)
    ranges = [_decrange(r) for r in ranges]
    hashes = _obshashrange_v0(repo, ranges)
    return encodelist(hashes)
709
710def _useobshashrange(repo):
711    base = repo.ui.configbool(b'experimental', b'obshashrange')
712    if base:
713        maxrevs = repo.ui.configint(b'experimental', b'obshashrange.max-revs')
714        if maxrevs is not None and maxrevs < len(repo.unfiltered()):
715            base = False
716    return base
717
def _canobshashrange(local, remote):
    """return True when obshashrange discovery can be used with ``remote``

    Requires the feature to be locally enabled and the remote to advertise
    the matching capability.
    """
    if not _useobshashrange(local):
        return False
    return remote.capable(b'_evoext_obshashrange_v1')
721
def _obshashrange_capabilities(orig, repo, proto):
    """wrapper to advertise new capability"""
    caps = orig(repo, proto)
    enabled = _useobshashrange(repo)
    # only advertise when both the feature and obsmarker exchange are on
    if enabled and obsolete.isenabled(repo, obsolete.exchangeopt):
        caps.append(b'_evoext_obshashrange_v1')
    return caps
729
@eh.extsetup
def obshashrange_extsetup(ui):
    """extension setup: hook capability advertisement into the wire server"""
    extensions.wrapfunction(wireprotov1server, '_capabilities',
                            _obshashrange_capabilities)
734
735##########################################
736###  trigger discovery during exchange ###
737##########################################
738
def _dopushmarkers(pushop):
    """return a truthy value when this push should exchange obsmarkers"""
    return (# we have any markers to push
            pushop.repo.obsstore
            # exchange of obsmarkers is enabled locally
            and obsolete.isenabled(pushop.repo, obsolete.exchangeopt)
            # remote server accept markers
            and b'obsolete' in pushop.remote.listkeys(b'namespaces'))
746
def _pushobshashrange(pushop, commonrevs):
    """push discovery using the obshashrange protocol

    Returns the nodes whose relevant markers differ from the remote's,
    plus all outgoing nodes (whose markers must be sent anyway).
    """
    unfi = pushop.repo.unfiltered()
    mismatching = findmissingrange(pushop.ui, unfi, pushop.remote, commonrevs)
    return mismatching + pushop.outgoing.missing
753
# available discovery methods; the first whose predicate matches is used.
# Each entry is a (canuse(repo, remote), discover(pushop, commonrevs)) pair.
obsdiscoveries = [
    (_canobshashrange, _pushobshashrange),
]

obsdiscovery_skip_message = b"""\
(skipping discovery of obsolescence markers, will exchange everything)
(controled by 'experimental.evolution.obsdiscovery' configuration)
"""

def usediscovery(repo):
    """return True if obsmarker discovery is enabled for this repository"""
    return repo.ui.configbool(b'experimental', b'evolution.obsdiscovery')
767
@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
def _pushdiscoveryobsmarkers(orig, pushop):
    """wrapped version of core's obsmarker push discovery

    When a discovery protocol is available, compute the minimal set of
    markers to push; otherwise fall back to ``orig`` (core pushes all
    markers relevant to the outgoing changesets).
    """
    if _dopushmarkers(pushop):
        repo = pushop.repo
        remote = pushop.remote
        obsexcmsg(repo.ui, b"computing relevant nodes\n")
        revs = list(repo.revs(b'::%ln', pushop.futureheads))
        unfi = repo.unfiltered()

        if not usediscovery(repo):
            # discovery disabled by user
            repo.ui.status(obsdiscovery_skip_message)
            return orig(pushop)

        # look for an obs-discovery protocol we can use
        discovery = None
        for candidate in obsdiscoveries:
            if candidate[0](repo, remote):
                discovery = candidate[1]
                break

        if discovery is None:
            # no discovery available, rely on core to push all relevants
            # obs markers.
            return orig(pushop)

        obsexcmsg(repo.ui, b"looking for common markers in %i nodes\n"
                           % len(revs))
        commonrevs = list(unfi.revs(b'::%ln', pushop.outgoing.commonheads))
        # find the nodes where the relevant obsmarkers mismatches
        nodes = discovery(pushop, commonrevs)

        if nodes:
            obsexcmsg(repo.ui, b"computing markers relevant to %i nodes\n"
                               % len(nodes))
            pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
        else:
            obsexcmsg(repo.ui, b"markers already in sync\n")
            pushop.outobsmarkers = []
807
@eh.extsetup
def _installobsmarkersdiscovery(ui):
    """replace core's obsmarker push discovery step with the wrapped one"""
    olddisco = exchange.pushdiscoverymapping[b'obsmarker']

    def newdisco(pushop):
        # forward to the wrapped discovery, keeping ``olddisco`` as fallback
        _pushdiscoveryobsmarkers(olddisco, pushop)
    exchange.pushdiscoverymapping[b'obsmarker'] = newdisco
815
def buildpullobsmarkersboundaries(pullop, bundle2=True):
    """small function returning the argument for pull markers call

    The result may contain the keys 'heads', 'common' and 'missing'; keys
    whose value would be None are skipped.

    It is a separate function to play around with strategy for that."""
    repo = pullop.repo
    remote = pullop.remote
    unfi = repo.unfiltered()
    # Also exclude filtered revisions. Working on unfiltered repository can
    # give a bit more precise view of the repository. However it makes the
    # overall operation more complicated.
    filteredrevs = repo.changelog.filteredrevs
    # XXX probably not very efficient
    revs = unfi.revs(b'::(%ln - null) - %ld', pullop.common, filteredrevs)
    boundaries = {b'heads': pullop.pulledsubset}
    if not revs: # nothing common
        boundaries[b'common'] = [nodemod.nullid]
        return boundaries

    if not usediscovery(repo):
        # discovery disabled by users.
        repo.ui.status(obsdiscovery_skip_message)
        boundaries[b'common'] = [nodemod.nullid]
        return boundaries

    if bundle2 and _canobshashrange(repo, remote):
        obsexcmsg(repo.ui, b"looking for common markers in %i nodes\n"
                  % len(revs))
        missing = findmissingrange(repo.ui, repo, pullop.remote, revs)
        boundaries[b'missing'] = missing
        # using getattr since `limitedarguments` is missing
        # hg <= 5.0 (69921d02daaf)
        if getattr(pullop.remote, 'limitedarguments', False):
            # prepare for a possible fallback to common
            common = repo.set("heads(only(%ld, %ln))", revs, missing)
            boundaries[b'common'] = [c.node() for c in common]
    else:
        # no range discovery possible: request everything above nullid
        boundaries[b'common'] = [nodemod.nullid]
    return boundaries
855
# merge the stablerangecache helper last, so its wrappers end up in the
# outer layer wrapping
eh.merge(stablerangecache.eh)
858