1# Code dedicated to the discovery of obsolescence marker "over the wire" 2# 3# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org> 4# 5# This software may be used and distributed according to the terms of the 6# GNU General Public License version 2 or any later version. 7 8# Status: Experiment in progress // open question 9# 10# The final discovery algorithm and protocol will go into core when we'll be 11# happy with it. 12# 13# Some of the code in this module is for compatiblity with older version 14# of evolve and will be eventually dropped. 15 16from __future__ import absolute_import 17 18import hashlib 19import heapq 20import os 21import sqlite3 22import struct 23import weakref 24 25from mercurial.i18n import _ 26from mercurial import ( 27 encoding, 28 error, 29 exchange, 30 extensions, 31 localrepo, 32 node as nodemod, 33 obsolete, 34 scmutil, 35 store, 36 util, 37) 38 39from mercurial.utils.stringutil import forcebytestr 40 41from . import ( 42 compat, 43 exthelper, 44 obscache, 45 utility, 46 stablerange, 47 stablerangecache, 48) 49 50from mercurial import wireprotov1server 51from mercurial.wireprotov1peer import wirepeer 52from mercurial.wireprototypes import encodelist, decodelist 53 54_pack = struct.pack 55_unpack = struct.unpack 56_calcsize = struct.calcsize 57 58eh = exthelper.exthelper() 59obsexcmsg = utility.obsexcmsg 60 61# Config 62eh.configitem(b'experimental', b'evolution.obsdiscovery', True) 63eh.configitem(b'experimental', b'obshashrange', True) 64eh.configitem(b'experimental', b'obshashrange.warm-cache', b'auto') 65eh.configitem(b'experimental', b'obshashrange.max-revs', None) 66 67################################## 68### Code performing discovery ### 69################################## 70 71def findmissingrange(ui, local, remote, probeset, 72 initialsamplesize=100, 73 fullsamplesize=200): 74 missing = set() 75 starttime = util.timer() 76 77 heads = local.revs(b'heads(%ld)', probeset) 78 local.stablerange.warmup(local) 79 80 rangelength = local.stablerange.rangelength 81 subranges = local.stablerange.subranges 82 # size of slice ? 83 heappop = heapq.heappop 84 heappush = heapq.heappush 85 heapify = heapq.heapify 86 87 tested = set() 88 89 sample = [] 90 samplesize = initialsamplesize 91 92 def addentry(entry): 93 if entry in tested: 94 return False 95 sample.append(entry) 96 tested.add(entry) 97 return True 98 99 for h in heads: 100 entry = (h, 0) 101 addentry(entry) 102 103 local.obsstore.rangeobshashcache.update(local) 104 querycount = 0 105 progress = ui.makeprogress(_(b"comparing obsmarker with other"), _(b"queries")) 106 overflow = [] 107 while sample or overflow: 108 if overflow: 109 sample.extend(overflow) 110 overflow = [] 111 112 if samplesize < len(sample): 113 # too much sample already 114 overflow = sample[samplesize:] 115 sample = sample[:samplesize] 116 elif len(sample) < samplesize: 117 ui.debug(b"query %i; add more sample (target %i, current %i)\n" 118 % (querycount, samplesize, len(sample))) 119 # we need more sample ! 120 needed = samplesize - len(sample) 121 sliceme = [] 122 heapify(sliceme) 123 for entry in sample: 124 if 1 < rangelength(local, entry): 125 heappush(sliceme, (-rangelength(local, entry), entry)) 126 127 while sliceme and 0 < needed: 128 _key, target = heappop(sliceme) 129 for new in subranges(local, target): 130 # XXX we could record hierarchy to optimise drop 131 if addentry(new): 132 if 1 < len(new): 133 heappush(sliceme, (-rangelength(local, new), new)) 134 needed -= 1 135 if needed <= 0: 136 break 137 138 # no longer the first interation 139 samplesize = fullsamplesize 140 141 nbsample = len(sample) 142 maxsize = max([rangelength(local, r) for r in sample]) 143 ui.debug(b"query %i; sample size is %i, largest range %i\n" 144 % (querycount, nbsample, maxsize)) 145 nbreplies = 0 146 replies = list(_queryrange(ui, local, remote, sample)) 147 sample = [] 148 n = local.changelog.node 149 for entry, remotehash in replies: 150 nbreplies += 1 151 if remotehash == _obshashrange(local, entry): 152 continue 153 elif 1 == rangelength(local, entry): 154 missing.add(n(entry[0])) 155 else: 156 for new in subranges(local, entry): 157 addentry(new) 158 assert nbsample == nbreplies 159 querycount += 1 160 progress.update(querycount) 161 progress.complete() 162 local.obsstore.rangeobshashcache.save(local) 163 duration = util.timer() - starttime 164 logmsg = (b'obsdiscovery, %d/%d mismatch' 165 b' - %d obshashrange queries in %.4f seconds\n') 166 logmsg %= (len(missing), len(probeset), querycount, duration) 167 ui.log(b'evoext-obsdiscovery', logmsg) 168 ui.debug(logmsg) 169 return sorted(missing) 170 171def _queryrange(ui, repo, remote, allentries): 172 # question are asked with node 173 n = repo.changelog.node 174 noderanges = [(n(entry[0]), entry[1]) for entry in allentries] 175 replies = remote.evoext_obshashrange_v1(noderanges) 176 result = [] 177 for idx, entry in enumerate(allentries): 178 result.append((entry, replies[idx])) 179 return result 180 181############################## 182### Range Hash computation ### 183############################## 184 185@eh.command( 186 b'debugobshashrange', 187 [ 188 (b'', b'rev', [], b'display obshash for all (rev, 0) range in REVS'), 189 (b'', b'subranges', False, b'display all subranges'), 190 ], 191 _(b'')) 192def debugobshashrange(ui, repo, **opts): 193 """display the ::REVS set topologically sorted in a stable way 194 """ 195 s = nodemod.short 196 revs = scmutil.revrange(repo, opts['rev']) 197 # prewarm depth cache 198 if revs: 199 repo.stablerange.warmup(repo, max(revs)) 200 cl = repo.changelog 201 rangelength = repo.stablerange.rangelength 202 depthrev = repo.stablerange.depthrev 203 if opts['subranges']: 204 ranges = stablerange.subrangesclosure(repo, repo.stablerange, revs) 205 else: 206 ranges = [(r, 0) for r in revs] 207 headers = (b'rev', b'node', b'index', b'size', b'depth', b'obshash') 208 linetemplate = b'%12d %12s %12d %12d %12d %12s\n' 209 headertemplate = linetemplate.replace(b'd', b's') 210 ui.status(headertemplate % headers) 211 repo.obsstore.rangeobshashcache.update(repo) 212 for r in ranges: 213 d = (r[0], 214 s(cl.node(r[0])), 215 r[1], 216 rangelength(repo, r), 217 depthrev(repo, r[0]), 218 s(_obshashrange(repo, r))) 219 ui.status(linetemplate % d) 220 repo.obsstore.rangeobshashcache.save(repo) 221 222def _obshashrange(repo, rangeid): 223 """return the obsolete hash associated to a range""" 224 cache = repo.obsstore.rangeobshashcache 225 cl = repo.changelog 226 obshash = cache.get(rangeid) 227 if obshash is not None: 228 return obshash 229 pieces = [] 230 nullid = nodemod.nullid 231 if repo.stablerange.rangelength(repo, rangeid) == 1: 232 rangenode = cl.node(rangeid[0]) 233 tmarkers = repo.obsstore.relevantmarkers([rangenode]) 234 pieces = [] 235 for m in tmarkers: 236 mbin = obsolete._fm1encodeonemarker(m) 237 pieces.append(mbin) 238 pieces.sort() 239 else: 240 for subrange in repo.stablerange.subranges(repo, rangeid): 241 obshash = _obshashrange(repo, subrange) 242 if obshash != nullid: 243 pieces.append(obshash) 244 245 sha = hashlib.sha1() 246 # note: if there is only one subrange with actual data, we'll just 247 # reuse the same hash. 248 if not pieces: 249 obshash = nodemod.nullid 250 elif len(pieces) != 1 or obshash is None: 251 sha = hashlib.sha1() 252 for p in pieces: 253 sha.update(p) 254 obshash = sha.digest() 255 cache[rangeid] = obshash 256 return obshash 257 258### sqlite caching 259 260_sqliteschema = [ 261 r"""CREATE TABLE obshashrange(rev INTEGER NOT NULL, 262 idx INTEGER NOT NULL, 263 obshash BLOB NOT NULL, 264 PRIMARY KEY(rev, idx));""", 265 r"CREATE INDEX range_index ON obshashrange(rev, idx);", 266 r"""CREATE TABLE meta(schemaversion INTEGER NOT NULL, 267 tiprev INTEGER NOT NULL, 268 tipnode BLOB NOT NULL, 269 nbobsmarker INTEGER NOT NULL, 270 obssize BLOB NOT NULL, 271 obskey BLOB NOT NULL 272 );""", 273] 274_queryexist = r"SELECT name FROM sqlite_master WHERE type='table' AND name='meta';" 275_clearmeta = r"""DELETE FROM meta;""" 276_newmeta = r"""INSERT INTO meta (schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey) 277 VALUES (?,?,?,?,?,?);""" 278_updateobshash = r"INSERT INTO obshashrange(rev, idx, obshash) VALUES (?,?,?);" 279_querymeta = r"SELECT schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey FROM meta;" 280_queryobshash = r"SELECT obshash FROM obshashrange WHERE (rev = ? AND idx = ?);" 281_query_max_stored = r"SELECT MAX(rev) FROM obshashrange" 282 283_reset = r"DELETE FROM obshashrange;" 284_delete = r"DELETE FROM obshashrange WHERE (rev = ? AND idx = ?);" 285 286def _affectedby(repo, markers): 287 """return all nodes whose relevant set is affected by this changeset 288 289 This is a reversed version of obsstore.relevantmarkers 290 """ 291 affected_nodes = set() 292 known_markers = set(markers) 293 nodes_to_proceed = set() 294 markers_to_proceed = set(known_markers) 295 296 successors = repo.obsstore.successors 297 predecessors = repo.obsstore.predecessors 298 299 while nodes_to_proceed or markers_to_proceed: 300 while markers_to_proceed: 301 marker = markers_to_proceed.pop() 302 # check successors and parent 303 if marker[1]: 304 relevant = (marker[1], ) 305 else: # prune case 306 relevant = ((marker[0], ), marker[5]) 307 for relnodes in relevant: 308 if relnodes is None: 309 continue 310 for node in relnodes: 311 if node not in affected_nodes: 312 nodes_to_proceed.add(node) 313 affected_nodes.add(node) 314 # markers_to_proceed is now empty: 315 if nodes_to_proceed: 316 node = nodes_to_proceed.pop() 317 markers = set() 318 markers.update(successors.get(node, ())) 319 markers.update(predecessors.get(node, ())) 320 markers -= known_markers 321 markers_to_proceed.update(markers) 322 known_markers.update(markers) 323 324 return affected_nodes 325 326# if there is that many new obsmarkers, reset without analysing them 327RESET_ABOVE = 10000 328 329class _obshashcache(obscache.dualsourcecache): 330 331 _schemaversion = 3 332 333 _cachename = b'evo-ext-obshashrange' # used for error message 334 _filename = b'evoext_obshashrange_v2.sqlite' 335 336 def __init__(self, repo): 337 super(_obshashcache, self).__init__() 338 self._vfs = repo.vfs 339 self._path = repo.cachevfs.join(self._filename) 340 self._new = set() 341 self._valid = True 342 self._repo = weakref.ref(repo.unfiltered()) 343 # cache status 344 self._ondiskcachekey = None 345 self._data = {} 346 self._createmode = store._calcmode(self._vfs) 347 348 def clear(self, reset=False): 349 super(_obshashcache, self).clear(reset=reset) 350 self._data.clear() 351 self._new.clear() 352 if reset: 353 self._valid = False 354 if r'_con' in vars(self): 355 del self._con 356 357 def get(self, rangeid): 358 # revision should be covered by the tiprev 359 # 360 # XXX there are issue with cache warming, we hack around it for now 361 if not getattr(self, '_updating', False): 362 if self._cachekey[0] < rangeid[0]: 363 msg = (b'using unwarmed obshashrangecache (%s %s)' 364 % (rangeid[0], self._cachekey[0])) 365 raise error.ProgrammingError(msg) 366 367 value = self._data.get(rangeid) 368 if value is None and self._con is not None: 369 nrange = (rangeid[0], rangeid[1]) 370 try: 371 obshash = self._con.execute(_queryobshash, nrange).fetchone() 372 if obshash is not None: 373 value = obshash[0] 374 self._data[rangeid] = value 375 except (sqlite3.DatabaseError, sqlite3.OperationalError): 376 # something is wrong with the sqlite db 377 # Since this is a cache, we ignore it. 378 if r'_con' in vars(self): 379 del self._con 380 self._new.clear() 381 return value 382 383 def __setitem__(self, rangeid, obshash): 384 self._new.add(rangeid) 385 self._data[rangeid] = obshash 386 387 def _updatefrom(self, repo, revs, obsmarkers): 388 """override this method to update your cache data incrementally 389 390 revs: list of new revision in the changelog 391 obsmarker: list of new obsmarkers in the obsstore 392 """ 393 # XXX for now, we'll not actually update the cache, but we'll be 394 # smarter at invalidating it. 395 # 396 # 1) new revisions does not get their entry updated (not update) 397 # 2) if we detect markers affecting non-new revision we reset the cache 398 399 self._updating = True 400 401 con = self._con 402 if con is not None: 403 reset = False 404 affected = [] 405 if RESET_ABOVE < len(obsmarkers): 406 # lots of new obsmarkers, probably smarter to reset the cache 407 repo.ui.log(b'evoext-cache', b'obshashcache reset - ' 408 b'many new markers (%d)\n' 409 % len(obsmarkers)) 410 reset = True 411 elif obsmarkers: 412 max_stored = con.execute(_query_max_stored).fetchall()[0][0] 413 affected_nodes = _affectedby(repo, obsmarkers) 414 415 getrev = compat.getgetrev(repo.changelog) 416 affected = [getrev(n) for n in affected_nodes] 417 affected = [r for r in affected 418 if r is not None and r <= max_stored] 419 420 if RESET_ABOVE < len(affected): 421 repo.ui.log(b'evoext-cache', b'obshashcache reset - ' 422 b'new markers affect many changeset (%d)\n' 423 % len(affected)) 424 reset = True 425 426 if affected or reset: 427 if not reset: 428 repo.ui.log(b'evoext-cache', b'obshashcache clean - ' 429 b'new markers affect %d changeset and cached ranges\n' 430 % len(affected)) 431 if con is not None: 432 # always reset for now, the code detecting affect is buggy 433 # so we need to reset more broadly than we would like. 434 try: 435 if repo.stablerange._con is None: 436 repo.ui.log(b'evoext-cache', b'obshashcache reset - ' 437 b'underlying stablerange cache unavailable\n') 438 reset = True 439 if reset: 440 con.execute(_reset) 441 self._data.clear() 442 else: 443 ranges = repo.stablerange.contains(repo, affected) 444 con.executemany(_delete, ranges) 445 for r in ranges: 446 self._data.pop(r, None) 447 except (sqlite3.DatabaseError, sqlite3.OperationalError) as exc: 448 repo.ui.log(b'evoext-cache', 449 b'error while updating obshashrange cache: %s\n' 450 % forcebytestr(exc)) 451 del self._updating 452 return 453 454 # rewarm key revisions 455 # 456 # (The current invalidation is too wide, but rewarming every 457 # single revision is quite costly) 458 newrevs = [] 459 stop = self._cachekey[0] # tiprev 460 for h in repo.filtered(b'immutable').changelog.headrevs(): 461 if h <= stop and h in affected: 462 newrevs.append(h) 463 newrevs.extend(revs) 464 revs = newrevs 465 466 repo.depthcache.update(repo) 467 total = len(revs) 468 469 # warm the cache for the new revs 470 progress = repo.ui.makeprogress(b'updating obshashrange cache', _(b'changesets'), total) 471 progress.update(0) 472 for idx, r in enumerate(revs): 473 _obshashrange(repo, (r, 0)) 474 revstr = b'' if r is None else (b'rev %d' % r) 475 progress.update(idx, item=revstr) 476 progress.complete() 477 478 del self._updating 479 480 @property 481 def _fullcachekey(self): 482 return (self._schemaversion, ) + self._cachekey 483 484 def load(self, repo): 485 if self._con is None: 486 self._cachekey = self.emptykey 487 self._ondiskcachekey = self.emptykey 488 assert self._cachekey is not None 489 490 def _db(self): 491 try: 492 util.makedirs(self._vfs.dirname(self._path), self._createmode) 493 except OSError: 494 return None 495 if self._createmode is not None: 496 pre_existed = os.access(self._path, os.R_OK) 497 con = sqlite3.connect(encoding.strfromlocal(self._path), timeout=30, 498 isolation_level=r"IMMEDIATE") 499 con.text_factory = bytes 500 if self._createmode is not None and not pre_existed: 501 try: 502 os.chmod(self._path, self._createmode & 0o666) 503 except OSError: 504 pass 505 return con 506 507 @util.propertycache 508 def _con(self): 509 if not self._valid: 510 return None 511 repo = self._repo() 512 if repo is None: 513 return None 514 con = self._db() 515 if con is None: 516 return None 517 cur = con.execute(_queryexist) 518 if cur.fetchone() is None: 519 self._valid = False 520 return None 521 meta = con.execute(_querymeta).fetchone() 522 if meta is None or meta[0] != self._schemaversion: 523 self._valid = False 524 return None 525 self._cachekey = self._ondiskcachekey = meta[1:] 526 return con 527 528 def save(self, repo): 529 if self._cachekey is None: 530 return 531 if self._cachekey == self._ondiskcachekey and not self._new: 532 return 533 repo = repo.unfiltered() 534 try: 535 with repo.lock(): 536 if r'stablerange' in vars(repo): 537 repo.stablerange.save(repo) 538 self._save(repo) 539 except error.LockError: 540 # Exceptionnally we are noisy about it since performance impact 541 # is large We should address that before using this more 542 # widely. 543 msg = _(b'obshashrange cache: skipping save unable to lock repo\n') 544 repo.ui.warn(msg) 545 546 def _save(self, repo): 547 if not self._new: 548 return 549 try: 550 return self._trysave(repo) 551 except (IOError, OSError, sqlite3.DatabaseError, sqlite3.OperationalError, sqlite3.IntegrityError) as exc: 552 # Catch error that may arise under stress 553 # 554 # operational error catch read-only and locked database 555 # IntegrityError catch Unique constraint error that may arise 556 if r'_con' in vars(self): 557 del self._con 558 self._new.clear() 559 repo.ui.log(b'evoext-cache', 560 b'error while saving new data: %s\n' 561 % forcebytestr(exc)) 562 repo.ui.debug(b'evoext-cache: error while saving new data: %s\n' 563 % forcebytestr(exc)) 564 565 def _trysave(self, repo): 566 if self._con is None: 567 util.unlinkpath(self._path, ignoremissing=True) 568 if r'_con' in vars(self): 569 del self._con 570 571 con = self._db() 572 if con is None: 573 repo.ui.log(b'evoext-cache', b'unable to write obshashrange cache' 574 b' - cannot create database\n') 575 return 576 with con: 577 for req in _sqliteschema: 578 con.execute(req) 579 580 meta = [self._schemaversion] + list(self.emptykey) 581 con.execute(_newmeta, meta) 582 self._ondiskcachekey = self.emptykey 583 else: 584 con = self._con 585 with con: 586 meta = con.execute(_querymeta).fetchone() 587 if meta[1:] != self._ondiskcachekey: 588 # drifting is currently an issue because this means another 589 # process might have already added the cache line we are about 590 # to add. This will confuse sqlite 591 msg = _(b'obshashrange cache: skipping write, ' 592 b'database drifted under my feet\n') 593 repo.ui.warn(msg) 594 self._new.clear() 595 self._valid = False 596 if r'_con' in vars(self): 597 del self._con 598 self._valid = False 599 return 600 data = ((rangeid[0], rangeid[1], self.get(rangeid)) for rangeid in self._new) 601 con.executemany(_updateobshash, data) 602 cachekey = self._fullcachekey 603 con.execute(_clearmeta) # remove the older entry 604 con.execute(_newmeta, cachekey) 605 self._new.clear() 606 self._valid = True 607 self._ondiskcachekey = self._cachekey 608 609@eh.wrapfunction(obsolete.obsstore, '_addmarkers') 610def _addmarkers(orig, obsstore, *args, **kwargs): 611 obsstore.rangeobshashcache.clear() 612 return orig(obsstore, *args, **kwargs) 613 614obsstorefilecache = localrepo.localrepository.obsstore 615 616 617# obsstore is a filecache so we have do to some spacial dancing 618@eh.wrapfunction(obsstorefilecache, 'func') 619def obsstorewithcache(orig, repo): 620 obsstore = orig(repo) 621 obsstore.rangeobshashcache = _obshashcache(repo.unfiltered()) 622 return obsstore 623 624@eh.reposetup 625def setupcache(ui, repo): 626 627 class obshashrepo(repo.__class__): 628 @localrepo.unfilteredmethod 629 def destroyed(self): 630 if r'obsstore' in vars(self): 631 self.obsstore.rangeobshashcache.clear() 632 toplevel = not util.safehasattr(self, '_destroying') 633 if toplevel: 634 self._destroying = True 635 try: 636 super(obshashrepo, self).destroyed() 637 finally: 638 if toplevel: 639 del self._destroying 640 641 @localrepo.unfilteredmethod 642 def updatecaches(self, tr=None, **kwargs): 643 if utility.shouldwarmcache(self, tr): 644 self.obsstore.rangeobshashcache.update(self) 645 self.obsstore.rangeobshashcache.save(self) 646 super(obshashrepo, self).updatecaches(tr, **kwargs) 647 648 repo.__class__ = obshashrepo 649 650### wire protocol commands 651 652def _obshashrange_v0(repo, ranges): 653 """return a list of hash from a list of range 654 655 The range have the id encoded as a node 656 657 return 'wdirid' for unknown range""" 658 getrev = compat.getgetrev(repo.changelog) 659 ranges = [(getrev(n), idx) for n, idx in ranges] 660 if ranges: 661 maxrev = max(r for r, i in ranges) 662 if maxrev is not None: 663 repo.stablerange.warmup(repo, upto=maxrev) 664 result = [] 665 repo.obsstore.rangeobshashcache.update(repo) 666 for r in ranges: 667 if r[0] is None: 668 result.append(nodemod.wdirid) 669 else: 670 result.append(_obshashrange(repo, r)) 671 repo.obsstore.rangeobshashcache.save(repo) 672 return result 673 674@eh.addattr(localrepo.localpeer, 'evoext_obshashrange_v1') 675def local_obshashrange_v0(peer, ranges): 676 return _obshashrange_v0(peer._repo, ranges) 677 678 679_indexformat = b'>I' 680_indexsize = _calcsize(_indexformat) 681def _encrange(node_rangeid): 682 """encode a (node) range""" 683 headnode, index = node_rangeid 684 return headnode + _pack(_indexformat, index) 685 686def _decrange(data): 687 """encode a (node) range""" 688 assert _indexsize < len(data), len(data) 689 headnode = data[:-_indexsize] 690 index = _unpack(_indexformat, data[-_indexsize:])[0] 691 return (headnode, index) 692 693@eh.addattr(wirepeer, 'evoext_obshashrange_v1') 694def peer_obshashrange_v0(self, ranges): 695 binranges = [_encrange(r) for r in ranges] 696 encranges = encodelist(binranges) 697 d = self._call(b"evoext_obshashrange_v1", ranges=encranges) 698 try: 699 return decodelist(d) 700 except ValueError: 701 self._abort(error.ResponseError(_(b"unexpected response:"), d)) 702 703@wireprotov1server.wireprotocommand(b'evoext_obshashrange_v1', b'ranges', b'pull') 704def srv_obshashrange_v1(repo, proto, ranges): 705 ranges = decodelist(ranges) 706 ranges = [_decrange(r) for r in ranges] 707 hashes = _obshashrange_v0(repo, ranges) 708 return encodelist(hashes) 709 710def _useobshashrange(repo): 711 base = repo.ui.configbool(b'experimental', b'obshashrange') 712 if base: 713 maxrevs = repo.ui.configint(b'experimental', b'obshashrange.max-revs') 714 if maxrevs is not None and maxrevs < len(repo.unfiltered()): 715 base = False 716 return base 717 718def _canobshashrange(local, remote): 719 return (_useobshashrange(local) 720 and remote.capable(b'_evoext_obshashrange_v1')) 721 722def _obshashrange_capabilities(orig, repo, proto): 723 """wrapper to advertise new capability""" 724 caps = orig(repo, proto) 725 enabled = _useobshashrange(repo) 726 if obsolete.isenabled(repo, obsolete.exchangeopt) and enabled: 727 caps.append(b'_evoext_obshashrange_v1') 728 return caps 729 730@eh.extsetup 731def obshashrange_extsetup(ui): 732 extensions.wrapfunction(wireprotov1server, '_capabilities', 733 _obshashrange_capabilities) 734 735########################################## 736### trigger discovery during exchange ### 737########################################## 738 739def _dopushmarkers(pushop): 740 return (# we have any markers to push 741 pushop.repo.obsstore 742 # exchange of obsmarkers is enabled locally 743 and obsolete.isenabled(pushop.repo, obsolete.exchangeopt) 744 # remote server accept markers 745 and b'obsolete' in pushop.remote.listkeys(b'namespaces')) 746 747def _pushobshashrange(pushop, commonrevs): 748 repo = pushop.repo.unfiltered() 749 remote = pushop.remote 750 missing = findmissingrange(pushop.ui, repo, remote, commonrevs) 751 missing += pushop.outgoing.missing 752 return missing 753 754# available discovery method, first valid is used 755# tuple (canuse, perform discovery)) 756obsdiscoveries = [ 757 (_canobshashrange, _pushobshashrange), 758] 759 760obsdiscovery_skip_message = b"""\ 761(skipping discovery of obsolescence markers, will exchange everything) 762(controled by 'experimental.evolution.obsdiscovery' configuration) 763""" 764 765def usediscovery(repo): 766 return repo.ui.configbool(b'experimental', b'evolution.obsdiscovery') 767 768@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers') 769def _pushdiscoveryobsmarkers(orig, pushop): 770 if _dopushmarkers(pushop): 771 repo = pushop.repo 772 remote = pushop.remote 773 obsexcmsg(repo.ui, b"computing relevant nodes\n") 774 revs = list(repo.revs(b'::%ln', pushop.futureheads)) 775 unfi = repo.unfiltered() 776 777 if not usediscovery(repo): 778 # discovery disabled by user 779 repo.ui.status(obsdiscovery_skip_message) 780 return orig(pushop) 781 782 # look for an obs-discovery protocol we can use 783 discovery = None 784 for candidate in obsdiscoveries: 785 if candidate[0](repo, remote): 786 discovery = candidate[1] 787 break 788 789 if discovery is None: 790 # no discovery available, rely on core to push all relevants 791 # obs markers. 792 return orig(pushop) 793 794 obsexcmsg(repo.ui, b"looking for common markers in %i nodes\n" 795 % len(revs)) 796 commonrevs = list(unfi.revs(b'::%ln', pushop.outgoing.commonheads)) 797 # find the nodes where the relevant obsmarkers mismatches 798 nodes = discovery(pushop, commonrevs) 799 800 if nodes: 801 obsexcmsg(repo.ui, b"computing markers relevant to %i nodes\n" 802 % len(nodes)) 803 pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes) 804 else: 805 obsexcmsg(repo.ui, b"markers already in sync\n") 806 pushop.outobsmarkers = [] 807 808@eh.extsetup 809def _installobsmarkersdiscovery(ui): 810 olddisco = exchange.pushdiscoverymapping[b'obsmarker'] 811 812 def newdisco(pushop): 813 _pushdiscoveryobsmarkers(olddisco, pushop) 814 exchange.pushdiscoverymapping[b'obsmarker'] = newdisco 815 816def buildpullobsmarkersboundaries(pullop, bundle2=True): 817 """small function returning the argument for pull markers call 818 may to contains 'heads' and 'common'. skip the key for None. 819 820 It is a separed function to play around with strategy for that.""" 821 repo = pullop.repo 822 remote = pullop.remote 823 unfi = repo.unfiltered() 824 # Also exclude filtered revisions. Working on unfiltered repository can 825 # give a bit more precise view of the repository. However it makes the 826 # overall operation more complicated. 827 filteredrevs = repo.changelog.filteredrevs 828 # XXX probably not very efficient 829 revs = unfi.revs(b'::(%ln - null) - %ld', pullop.common, filteredrevs) 830 boundaries = {b'heads': pullop.pulledsubset} 831 if not revs: # nothing common 832 boundaries[b'common'] = [nodemod.nullid] 833 return boundaries 834 835 if not usediscovery(repo): 836 # discovery disabled by users. 837 repo.ui.status(obsdiscovery_skip_message) 838 boundaries[b'common'] = [nodemod.nullid] 839 return boundaries 840 841 if bundle2 and _canobshashrange(repo, remote): 842 obsexcmsg(repo.ui, b"looking for common markers in %i nodes\n" 843 % len(revs)) 844 missing = findmissingrange(repo.ui, repo, pullop.remote, revs) 845 boundaries[b'missing'] = missing 846 # using getattr since `limitedarguments` is missing 847 # hg <= 5.0 (69921d02daaf) 848 if getattr(pullop.remote, 'limitedarguments', False): 849 # prepare for a possible fallback to common 850 common = repo.set("heads(only(%ld, %ln))", revs, missing) 851 boundaries[b'common'] = [c.node() for c in common] 852 else: 853 boundaries[b'common'] = [nodemod.nullid] 854 return boundaries 855 856# merge later for outer layer wrapping 857eh.merge(stablerangecache.eh) 858