1# Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net> 2# Copyright 2005-2007 Olivia Mackall <olivia@selenic.com> 3# 4# This software may be used and distributed according to the terms of the 5# GNU General Public License version 2 or any later version. 6 7from __future__ import absolute_import 8 9import collections 10import contextlib 11 12from .i18n import _ 13from .node import hex 14from . import ( 15 discovery, 16 encoding, 17 error, 18 match as matchmod, 19 narrowspec, 20 pycompat, 21 streamclone, 22 templatefilters, 23 util, 24 wireprotoframing, 25 wireprototypes, 26) 27from .interfaces import util as interfaceutil 28from .utils import ( 29 cborutil, 30 hashutil, 31 stringutil, 32) 33 34FRAMINGTYPE = b'application/mercurial-exp-framing-0006' 35 36HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2 37 38COMMANDS = wireprototypes.commanddict() 39 40# Value inserted into cache key computation function. Change the value to 41# force new cache keys for every command request. This should be done when 42# there is a change to how caching works, etc. 43GLOBAL_CACHE_VERSION = 1 44 45 46def handlehttpv2request(rctx, req, res, checkperm, urlparts): 47 from .hgweb import common as hgwebcommon 48 49 # URL space looks like: <permissions>/<command>, where <permission> can 50 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively. 51 52 # Root URL does nothing meaningful... yet. 53 if not urlparts: 54 res.status = b'200 OK' 55 res.headers[b'Content-Type'] = b'text/plain' 56 res.setbodybytes(_(b'HTTP version 2 API handler')) 57 return 58 59 if len(urlparts) == 1: 60 res.status = b'404 Not Found' 61 res.headers[b'Content-Type'] = b'text/plain' 62 res.setbodybytes( 63 _(b'do not know how to process %s\n') % req.dispatchpath 64 ) 65 return 66 67 permission, command = urlparts[0:2] 68 69 if permission not in (b'ro', b'rw'): 70 res.status = b'404 Not Found' 71 res.headers[b'Content-Type'] = b'text/plain' 72 res.setbodybytes(_(b'unknown permission: %s') % permission) 73 return 74 75 if req.method != b'POST': 76 res.status = b'405 Method Not Allowed' 77 res.headers[b'Allow'] = b'POST' 78 res.setbodybytes(_(b'commands require POST requests')) 79 return 80 81 # At some point we'll want to use our own API instead of recycling the 82 # behavior of version 1 of the wire protocol... 83 # TODO return reasonable responses - not responses that overload the 84 # HTTP status line message for error reporting. 85 try: 86 checkperm(rctx, req, b'pull' if permission == b'ro' else b'push') 87 except hgwebcommon.ErrorResponse as e: 88 res.status = hgwebcommon.statusmessage( 89 e.code, stringutil.forcebytestr(e) 90 ) 91 for k, v in e.headers: 92 res.headers[k] = v 93 res.setbodybytes(b'permission denied') 94 return 95 96 # We have a special endpoint to reflect the request back at the client. 97 if command == b'debugreflect': 98 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res) 99 return 100 101 # Extra commands that we handle that aren't really wire protocol 102 # commands. Think extra hard before making this hackery available to 103 # extension. 104 extracommands = {b'multirequest'} 105 106 if command not in COMMANDS and command not in extracommands: 107 res.status = b'404 Not Found' 108 res.headers[b'Content-Type'] = b'text/plain' 109 res.setbodybytes(_(b'unknown wire protocol command: %s\n') % command) 110 return 111 112 repo = rctx.repo 113 ui = repo.ui 114 115 proto = httpv2protocolhandler(req, ui) 116 117 if ( 118 not COMMANDS.commandavailable(command, proto) 119 and command not in extracommands 120 ): 121 res.status = b'404 Not Found' 122 res.headers[b'Content-Type'] = b'text/plain' 123 res.setbodybytes(_(b'invalid wire protocol command: %s') % command) 124 return 125 126 # TODO consider cases where proxies may add additional Accept headers. 127 if req.headers.get(b'Accept') != FRAMINGTYPE: 128 res.status = b'406 Not Acceptable' 129 res.headers[b'Content-Type'] = b'text/plain' 130 res.setbodybytes( 131 _(b'client MUST specify Accept header with value: %s\n') 132 % FRAMINGTYPE 133 ) 134 return 135 136 if req.headers.get(b'Content-Type') != FRAMINGTYPE: 137 res.status = b'415 Unsupported Media Type' 138 # TODO we should send a response with appropriate media type, 139 # since client does Accept it. 140 res.headers[b'Content-Type'] = b'text/plain' 141 res.setbodybytes( 142 _(b'client MUST send Content-Type header with value: %s\n') 143 % FRAMINGTYPE 144 ) 145 return 146 147 _processhttpv2request(ui, repo, req, res, permission, command, proto) 148 149 150def _processhttpv2reflectrequest(ui, repo, req, res): 151 """Reads unified frame protocol request and dumps out state to client. 152 153 This special endpoint can be used to help debug the wire protocol. 154 155 Instead of routing the request through the normal dispatch mechanism, 156 we instead read all frames, decode them, and feed them into our state 157 tracker. We then dump the log of all that activity back out to the 158 client. 159 """ 160 # Reflection APIs have a history of being abused, accidentally disclosing 161 # sensitive data, etc. So we have a config knob. 162 if not ui.configbool(b'experimental', b'web.api.debugreflect'): 163 res.status = b'404 Not Found' 164 res.headers[b'Content-Type'] = b'text/plain' 165 res.setbodybytes(_(b'debugreflect service not available')) 166 return 167 168 # We assume we have a unified framing protocol request body. 169 170 reactor = wireprotoframing.serverreactor(ui) 171 states = [] 172 173 while True: 174 frame = wireprotoframing.readframe(req.bodyfh) 175 176 if not frame: 177 states.append(b'received: <no frame>') 178 break 179 180 states.append( 181 b'received: %d %d %d %s' 182 % (frame.typeid, frame.flags, frame.requestid, frame.payload) 183 ) 184 185 action, meta = reactor.onframerecv(frame) 186 states.append(templatefilters.json((action, meta))) 187 188 action, meta = reactor.oninputeof() 189 meta[b'action'] = action 190 states.append(templatefilters.json(meta)) 191 192 res.status = b'200 OK' 193 res.headers[b'Content-Type'] = b'text/plain' 194 res.setbodybytes(b'\n'.join(states)) 195 196 197def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto): 198 """Post-validation handler for HTTPv2 requests. 199 200 Called when the HTTP request contains unified frame-based protocol 201 frames for evaluation. 202 """ 203 # TODO Some HTTP clients are full duplex and can receive data before 204 # the entire request is transmitted. Figure out a way to indicate support 205 # for that so we can opt into full duplex mode. 206 reactor = wireprotoframing.serverreactor(ui, deferoutput=True) 207 seencommand = False 208 209 outstream = None 210 211 while True: 212 frame = wireprotoframing.readframe(req.bodyfh) 213 if not frame: 214 break 215 216 action, meta = reactor.onframerecv(frame) 217 218 if action == b'wantframe': 219 # Need more data before we can do anything. 220 continue 221 elif action == b'runcommand': 222 # Defer creating output stream because we need to wait for 223 # protocol settings frames so proper encoding can be applied. 224 if not outstream: 225 outstream = reactor.makeoutputstream() 226 227 sentoutput = _httpv2runcommand( 228 ui, 229 repo, 230 req, 231 res, 232 authedperm, 233 reqcommand, 234 reactor, 235 outstream, 236 meta, 237 issubsequent=seencommand, 238 ) 239 240 if sentoutput: 241 return 242 243 seencommand = True 244 245 elif action == b'error': 246 # TODO define proper error mechanism. 247 res.status = b'200 OK' 248 res.headers[b'Content-Type'] = b'text/plain' 249 res.setbodybytes(meta[b'message'] + b'\n') 250 return 251 else: 252 raise error.ProgrammingError( 253 b'unhandled action from frame processor: %s' % action 254 ) 255 256 action, meta = reactor.oninputeof() 257 if action == b'sendframes': 258 # We assume we haven't started sending the response yet. If we're 259 # wrong, the response type will raise an exception. 260 res.status = b'200 OK' 261 res.headers[b'Content-Type'] = FRAMINGTYPE 262 res.setbodygen(meta[b'framegen']) 263 elif action == b'noop': 264 pass 265 else: 266 raise error.ProgrammingError( 267 b'unhandled action from frame processor: %s' % action 268 ) 269 270 271def _httpv2runcommand( 272 ui, 273 repo, 274 req, 275 res, 276 authedperm, 277 reqcommand, 278 reactor, 279 outstream, 280 command, 281 issubsequent, 282): 283 """Dispatch a wire protocol command made from HTTPv2 requests. 284 285 The authenticated permission (``authedperm``) along with the original 286 command from the URL (``reqcommand``) are passed in. 287 """ 288 # We already validated that the session has permissions to perform the 289 # actions in ``authedperm``. In the unified frame protocol, the canonical 290 # command to run is expressed in a frame. However, the URL also requested 291 # to run a specific command. We need to be careful that the command we 292 # run doesn't have permissions requirements greater than what was granted 293 # by ``authedperm``. 294 # 295 # Our rule for this is we only allow one command per HTTP request and 296 # that command must match the command in the URL. However, we make 297 # an exception for the ``multirequest`` URL. This URL is allowed to 298 # execute multiple commands. We double check permissions of each command 299 # as it is invoked to ensure there is no privilege escalation. 300 # TODO consider allowing multiple commands to regular command URLs 301 # iff each command is the same. 302 303 proto = httpv2protocolhandler(req, ui, args=command[b'args']) 304 305 if reqcommand == b'multirequest': 306 if not COMMANDS.commandavailable(command[b'command'], proto): 307 # TODO proper error mechanism 308 res.status = b'200 OK' 309 res.headers[b'Content-Type'] = b'text/plain' 310 res.setbodybytes( 311 _(b'wire protocol command not available: %s') 312 % command[b'command'] 313 ) 314 return True 315 316 # TODO don't use assert here, since it may be elided by -O. 317 assert authedperm in (b'ro', b'rw') 318 wirecommand = COMMANDS[command[b'command']] 319 assert wirecommand.permission in (b'push', b'pull') 320 321 if authedperm == b'ro' and wirecommand.permission != b'pull': 322 # TODO proper error mechanism 323 res.status = b'403 Forbidden' 324 res.headers[b'Content-Type'] = b'text/plain' 325 res.setbodybytes( 326 _(b'insufficient permissions to execute command: %s') 327 % command[b'command'] 328 ) 329 return True 330 331 # TODO should we also call checkperm() here? Maybe not if we're going 332 # to overhaul that API. The granted scope from the URL check should 333 # be good enough. 334 335 else: 336 # Don't allow multiple commands outside of ``multirequest`` URL. 337 if issubsequent: 338 # TODO proper error mechanism 339 res.status = b'200 OK' 340 res.headers[b'Content-Type'] = b'text/plain' 341 res.setbodybytes( 342 _(b'multiple commands cannot be issued to this URL') 343 ) 344 return True 345 346 if reqcommand != command[b'command']: 347 # TODO define proper error mechanism 348 res.status = b'200 OK' 349 res.headers[b'Content-Type'] = b'text/plain' 350 res.setbodybytes(_(b'command in frame must match command in URL')) 351 return True 352 353 res.status = b'200 OK' 354 res.headers[b'Content-Type'] = FRAMINGTYPE 355 356 try: 357 objs = dispatch(repo, proto, command[b'command'], command[b'redirect']) 358 359 action, meta = reactor.oncommandresponsereadyobjects( 360 outstream, command[b'requestid'], objs 361 ) 362 363 except error.WireprotoCommandError as e: 364 action, meta = reactor.oncommanderror( 365 outstream, command[b'requestid'], e.message, e.messageargs 366 ) 367 368 except Exception as e: 369 action, meta = reactor.onservererror( 370 outstream, 371 command[b'requestid'], 372 _(b'exception when invoking command: %s') 373 % stringutil.forcebytestr(e), 374 ) 375 376 if action == b'sendframes': 377 res.setbodygen(meta[b'framegen']) 378 return True 379 elif action == b'noop': 380 return False 381 else: 382 raise error.ProgrammingError( 383 b'unhandled event from reactor: %s' % action 384 ) 385 386 387def getdispatchrepo(repo, proto, command): 388 viewconfig = repo.ui.config(b'server', b'view') 389 return repo.filtered(viewconfig) 390 391 392def dispatch(repo, proto, command, redirect): 393 """Run a wire protocol command. 394 395 Returns an iterable of objects that will be sent to the client. 396 """ 397 repo = getdispatchrepo(repo, proto, command) 398 399 entry = COMMANDS[command] 400 func = entry.func 401 spec = entry.args 402 403 args = proto.getargs(spec) 404 405 # There is some duplicate boilerplate code here for calling the command and 406 # emitting objects. It is either that or a lot of indented code that looks 407 # like a pyramid (since there are a lot of code paths that result in not 408 # using the cacher). 409 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args)) 410 411 # Request is not cacheable. Don't bother instantiating a cacher. 412 if not entry.cachekeyfn: 413 for o in callcommand(): 414 yield o 415 return 416 417 if redirect: 418 redirecttargets = redirect[b'targets'] 419 redirecthashes = redirect[b'hashes'] 420 else: 421 redirecttargets = [] 422 redirecthashes = [] 423 424 cacher = makeresponsecacher( 425 repo, 426 proto, 427 command, 428 args, 429 cborutil.streamencode, 430 redirecttargets=redirecttargets, 431 redirecthashes=redirecthashes, 432 ) 433 434 # But we have no cacher. Do default handling. 435 if not cacher: 436 for o in callcommand(): 437 yield o 438 return 439 440 with cacher: 441 cachekey = entry.cachekeyfn( 442 repo, proto, cacher, **pycompat.strkwargs(args) 443 ) 444 445 # No cache key or the cacher doesn't like it. Do default handling. 446 if cachekey is None or not cacher.setcachekey(cachekey): 447 for o in callcommand(): 448 yield o 449 return 450 451 # Serve it from the cache, if possible. 452 cached = cacher.lookup() 453 454 if cached: 455 for o in cached[b'objs']: 456 yield o 457 return 458 459 # Else call the command and feed its output into the cacher, allowing 460 # the cacher to buffer/mutate objects as it desires. 461 for o in callcommand(): 462 for o in cacher.onobject(o): 463 yield o 464 465 for o in cacher.onfinished(): 466 yield o 467 468 469@interfaceutil.implementer(wireprototypes.baseprotocolhandler) 470class httpv2protocolhandler(object): 471 def __init__(self, req, ui, args=None): 472 self._req = req 473 self._ui = ui 474 self._args = args 475 476 @property 477 def name(self): 478 return HTTP_WIREPROTO_V2 479 480 def getargs(self, args): 481 # First look for args that were passed but aren't registered on this 482 # command. 483 extra = set(self._args) - set(args) 484 if extra: 485 raise error.WireprotoCommandError( 486 b'unsupported argument to command: %s' 487 % b', '.join(sorted(extra)) 488 ) 489 490 # And look for required arguments that are missing. 491 missing = {a for a in args if args[a][b'required']} - set(self._args) 492 493 if missing: 494 raise error.WireprotoCommandError( 495 b'missing required arguments: %s' % b', '.join(sorted(missing)) 496 ) 497 498 # Now derive the arguments to pass to the command, taking into 499 # account the arguments specified by the client. 500 data = {} 501 for k, meta in sorted(args.items()): 502 # This argument wasn't passed by the client. 503 if k not in self._args: 504 data[k] = meta[b'default']() 505 continue 506 507 v = self._args[k] 508 509 # Sets may be expressed as lists. Silently normalize. 510 if meta[b'type'] == b'set' and isinstance(v, list): 511 v = set(v) 512 513 # TODO consider more/stronger type validation. 514 515 data[k] = v 516 517 return data 518 519 def getprotocaps(self): 520 # Protocol capabilities are currently not implemented for HTTP V2. 521 return set() 522 523 def getpayload(self): 524 raise NotImplementedError 525 526 @contextlib.contextmanager 527 def mayberedirectstdio(self): 528 raise NotImplementedError 529 530 def client(self): 531 raise NotImplementedError 532 533 def addcapabilities(self, repo, caps): 534 return caps 535 536 def checkperm(self, perm): 537 raise NotImplementedError 538 539 540def httpv2apidescriptor(req, repo): 541 proto = httpv2protocolhandler(req, repo.ui) 542 543 return _capabilitiesv2(repo, proto) 544 545 546def _capabilitiesv2(repo, proto): 547 """Obtain the set of capabilities for version 2 transports. 548 549 These capabilities are distinct from the capabilities for version 1 550 transports. 551 """ 552 caps = { 553 b'commands': {}, 554 b'framingmediatypes': [FRAMINGTYPE], 555 b'pathfilterprefixes': set(narrowspec.VALID_PREFIXES), 556 } 557 558 for command, entry in COMMANDS.items(): 559 args = {} 560 561 for arg, meta in entry.args.items(): 562 args[arg] = { 563 # TODO should this be a normalized type using CBOR's 564 # terminology? 565 b'type': meta[b'type'], 566 b'required': meta[b'required'], 567 } 568 569 if not meta[b'required']: 570 args[arg][b'default'] = meta[b'default']() 571 572 if meta[b'validvalues']: 573 args[arg][b'validvalues'] = meta[b'validvalues'] 574 575 # TODO this type of check should be defined in a per-command callback. 576 if ( 577 command == b'rawstorefiledata' 578 and not streamclone.allowservergeneration(repo) 579 ): 580 continue 581 582 caps[b'commands'][command] = { 583 b'args': args, 584 b'permissions': [entry.permission], 585 } 586 587 if entry.extracapabilitiesfn: 588 extracaps = entry.extracapabilitiesfn(repo, proto) 589 caps[b'commands'][command].update(extracaps) 590 591 caps[b'rawrepoformats'] = sorted(repo.requirements & repo.supportedformats) 592 593 targets = getadvertisedredirecttargets(repo, proto) 594 if targets: 595 caps[b'redirect'] = { 596 b'targets': [], 597 b'hashes': [b'sha256', b'sha1'], 598 } 599 600 for target in targets: 601 entry = { 602 b'name': target[b'name'], 603 b'protocol': target[b'protocol'], 604 b'uris': target[b'uris'], 605 } 606 607 for key in (b'snirequired', b'tlsversions'): 608 if key in target: 609 entry[key] = target[key] 610 611 caps[b'redirect'][b'targets'].append(entry) 612 613 return proto.addcapabilities(repo, caps) 614 615 616def getadvertisedredirecttargets(repo, proto): 617 """Obtain a list of content redirect targets. 618 619 Returns a list containing potential redirect targets that will be 620 advertised in capabilities data. Each dict MUST have the following 621 keys: 622 623 name 624 The name of this redirect target. This is the identifier clients use 625 to refer to a target. It is transferred as part of every command 626 request. 627 628 protocol 629 Network protocol used by this target. Typically this is the string 630 in front of the ``://`` in a URL. e.g. ``https``. 631 632 uris 633 List of representative URIs for this target. Clients can use the 634 URIs to test parsing for compatibility or for ordering preference 635 for which target to use. 636 637 The following optional keys are recognized: 638 639 snirequired 640 Bool indicating if Server Name Indication (SNI) is required to 641 connect to this target. 642 643 tlsversions 644 List of bytes indicating which TLS versions are supported by this 645 target. 646 647 By default, clients reflect the target order advertised by servers 648 and servers will use the first client-advertised target when picking 649 a redirect target. So targets should be advertised in the order the 650 server prefers they be used. 651 """ 652 return [] 653 654 655def wireprotocommand( 656 name, 657 args=None, 658 permission=b'push', 659 cachekeyfn=None, 660 extracapabilitiesfn=None, 661): 662 """Decorator to declare a wire protocol command. 663 664 ``name`` is the name of the wire protocol command being provided. 665 666 ``args`` is a dict defining arguments accepted by the command. Keys are 667 the argument name. Values are dicts with the following keys: 668 669 ``type`` 670 The argument data type. Must be one of the following string 671 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``, 672 or ``bool``. 673 674 ``default`` 675 A callable returning the default value for this argument. If not 676 specified, ``None`` will be the default value. 677 678 ``example`` 679 An example value for this argument. 680 681 ``validvalues`` 682 Set of recognized values for this argument. 683 684 ``permission`` defines the permission type needed to run this command. 685 Can be ``push`` or ``pull``. These roughly map to read-write and read-only, 686 respectively. Default is to assume command requires ``push`` permissions 687 because otherwise commands not declaring their permissions could modify 688 a repository that is supposed to be read-only. 689 690 ``cachekeyfn`` defines an optional callable that can derive the 691 cache key for this request. 692 693 ``extracapabilitiesfn`` defines an optional callable that defines extra 694 command capabilities/parameters that are advertised next to the command 695 in the capabilities data structure describing the server. The callable 696 receives as arguments the repository and protocol objects. It returns 697 a dict of extra fields to add to the command descriptor. 698 699 Wire protocol commands are generators of objects to be serialized and 700 sent to the client. 701 702 If a command raises an uncaught exception, this will be translated into 703 a command error. 704 705 All commands can opt in to being cacheable by defining a function 706 (``cachekeyfn``) that is called to derive a cache key. This function 707 receives the same arguments as the command itself plus a ``cacher`` 708 argument containing the active cacher for the request and returns a bytes 709 containing the key in a cache the response to this command may be cached 710 under. 711 """ 712 transports = { 713 k for k, v in wireprototypes.TRANSPORTS.items() if v[b'version'] == 2 714 } 715 716 if permission not in (b'push', b'pull'): 717 raise error.ProgrammingError( 718 b'invalid wire protocol permission; ' 719 b'got %s; expected "push" or "pull"' % permission 720 ) 721 722 if args is None: 723 args = {} 724 725 if not isinstance(args, dict): 726 raise error.ProgrammingError( 727 b'arguments for version 2 commands must be declared as dicts' 728 ) 729 730 for arg, meta in args.items(): 731 if arg == b'*': 732 raise error.ProgrammingError( 733 b'* argument name not allowed on version 2 commands' 734 ) 735 736 if not isinstance(meta, dict): 737 raise error.ProgrammingError( 738 b'arguments for version 2 commands ' 739 b'must declare metadata as a dict' 740 ) 741 742 if b'type' not in meta: 743 raise error.ProgrammingError( 744 b'%s argument for command %s does not ' 745 b'declare type field' % (arg, name) 746 ) 747 748 if meta[b'type'] not in ( 749 b'bytes', 750 b'int', 751 b'list', 752 b'dict', 753 b'set', 754 b'bool', 755 ): 756 raise error.ProgrammingError( 757 b'%s argument for command %s has ' 758 b'illegal type: %s' % (arg, name, meta[b'type']) 759 ) 760 761 if b'example' not in meta: 762 raise error.ProgrammingError( 763 b'%s argument for command %s does not ' 764 b'declare example field' % (arg, name) 765 ) 766 767 meta[b'required'] = b'default' not in meta 768 769 meta.setdefault(b'default', lambda: None) 770 meta.setdefault(b'validvalues', None) 771 772 def register(func): 773 if name in COMMANDS: 774 raise error.ProgrammingError( 775 b'%s command already registered for version 2' % name 776 ) 777 778 COMMANDS[name] = wireprototypes.commandentry( 779 func, 780 args=args, 781 transports=transports, 782 permission=permission, 783 cachekeyfn=cachekeyfn, 784 extracapabilitiesfn=extracapabilitiesfn, 785 ) 786 787 return func 788 789 return register 790 791 792def makecommandcachekeyfn(command, localversion=None, allargs=False): 793 """Construct a cache key derivation function with common features. 794 795 By default, the cache key is a hash of: 796 797 * The command name. 798 * A global cache version number. 799 * A local cache version number (passed via ``localversion``). 800 * All the arguments passed to the command. 801 * The media type used. 802 * Wire protocol version string. 803 * The repository path. 804 """ 805 if not allargs: 806 raise error.ProgrammingError( 807 b'only allargs=True is currently supported' 808 ) 809 810 if localversion is None: 811 raise error.ProgrammingError(b'must set localversion argument value') 812 813 def cachekeyfn(repo, proto, cacher, **args): 814 spec = COMMANDS[command] 815 816 # Commands that mutate the repo can not be cached. 817 if spec.permission == b'push': 818 return None 819 820 # TODO config option to disable caching. 821 822 # Our key derivation strategy is to construct a data structure 823 # holding everything that could influence cacheability and to hash 824 # the CBOR representation of that. Using CBOR seems like it might 825 # be overkill. However, simpler hashing mechanisms are prone to 826 # duplicate input issues. e.g. if you just concatenate two values, 827 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides 828 # "padding" between values and prevents these problems. 829 830 # Seed the hash with various data. 831 state = { 832 # To invalidate all cache keys. 833 b'globalversion': GLOBAL_CACHE_VERSION, 834 # More granular cache key invalidation. 835 b'localversion': localversion, 836 # Cache keys are segmented by command. 837 b'command': command, 838 # Throw in the media type and API version strings so changes 839 # to exchange semantics invalid cache. 840 b'mediatype': FRAMINGTYPE, 841 b'version': HTTP_WIREPROTO_V2, 842 # So same requests for different repos don't share cache keys. 843 b'repo': repo.root, 844 } 845 846 # The arguments passed to us will have already been normalized. 847 # Default values will be set, etc. This is important because it 848 # means that it doesn't matter if clients send an explicit argument 849 # or rely on the default value: it will all normalize to the same 850 # set of arguments on the server and therefore the same cache key. 851 # 852 # Arguments by their very nature must support being encoded to CBOR. 853 # And the CBOR encoder is deterministic. So we hash the arguments 854 # by feeding the CBOR of their representation into the hasher. 855 if allargs: 856 state[b'args'] = pycompat.byteskwargs(args) 857 858 cacher.adjustcachekeystate(state) 859 860 hasher = hashutil.sha1() 861 for chunk in cborutil.streamencode(state): 862 hasher.update(chunk) 863 864 return pycompat.sysbytes(hasher.hexdigest()) 865 866 return cachekeyfn 867 868 869def makeresponsecacher( 870 repo, proto, command, args, objencoderfn, redirecttargets, redirecthashes 871): 872 """Construct a cacher for a cacheable command. 873 874 Returns an ``iwireprotocolcommandcacher`` instance. 875 876 Extensions can monkeypatch this function to provide custom caching 877 backends. 878 """ 879 return None 880 881 882def resolvenodes(repo, revisions): 883 """Resolve nodes from a revisions specifier data structure.""" 884 cl = repo.changelog 885 clhasnode = cl.hasnode 886 887 seen = set() 888 nodes = [] 889 890 if not isinstance(revisions, list): 891 raise error.WireprotoCommandError( 892 b'revisions must be defined as an array' 893 ) 894 895 for spec in revisions: 896 if b'type' not in spec: 897 raise error.WireprotoCommandError( 898 b'type key not present in revision specifier' 899 ) 900 901 typ = spec[b'type'] 902 903 if typ == b'changesetexplicit': 904 if b'nodes' not in spec: 905 raise error.WireprotoCommandError( 906 b'nodes key not present in changesetexplicit revision ' 907 b'specifier' 908 ) 909 910 for node in spec[b'nodes']: 911 if node not in seen: 912 nodes.append(node) 913 seen.add(node) 914 915 elif typ == b'changesetexplicitdepth': 916 for key in (b'nodes', b'depth'): 917 if key not in spec: 918 raise error.WireprotoCommandError( 919 b'%s key not present in changesetexplicitdepth revision ' 920 b'specifier', 921 (key,), 922 ) 923 924 for rev in repo.revs( 925 b'ancestors(%ln, %s)', spec[b'nodes'], spec[b'depth'] - 1 926 ): 927 node = cl.node(rev) 928 929 if node not in seen: 930 nodes.append(node) 931 seen.add(node) 932 933 elif typ == b'changesetdagrange': 934 for key in (b'roots', b'heads'): 935 if key not in spec: 936 raise error.WireprotoCommandError( 937 b'%s key not present in changesetdagrange revision ' 938 b'specifier', 939 (key,), 940 ) 941 942 if not spec[b'heads']: 943 raise error.WireprotoCommandError( 944 b'heads key in changesetdagrange cannot be empty' 945 ) 946 947 if spec[b'roots']: 948 common = [n for n in spec[b'roots'] if clhasnode(n)] 949 else: 950 common = [repo.nullid] 951 952 for n in discovery.outgoing(repo, common, spec[b'heads']).missing: 953 if n not in seen: 954 nodes.append(n) 955 seen.add(n) 956 957 else: 958 raise error.WireprotoCommandError( 959 b'unknown revision specifier type: %s', (typ,) 960 ) 961 962 return nodes 963 964 965@wireprotocommand(b'branchmap', permission=b'pull') 966def branchmapv2(repo, proto): 967 yield { 968 encoding.fromlocal(k): v 969 for k, v in pycompat.iteritems(repo.branchmap()) 970 } 971 972 973@wireprotocommand(b'capabilities', permission=b'pull') 974def capabilitiesv2(repo, proto): 975 yield _capabilitiesv2(repo, proto) 976 977 978@wireprotocommand( 979 b'changesetdata', 980 args={ 981 b'revisions': { 982 b'type': b'list', 983 b'example': [ 984 { 985 b'type': b'changesetexplicit', 986 b'nodes': [b'abcdef...'], 987 } 988 ], 989 }, 990 b'fields': { 991 b'type': b'set', 992 b'default': set, 993 b'example': {b'parents', b'revision'}, 994 b'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'}, 995 }, 996 }, 997 permission=b'pull', 998) 999def changesetdata(repo, proto, revisions, fields): 1000 # TODO look for unknown fields and abort when they can't be serviced. 1001 # This could probably be validated by dispatcher using validvalues. 1002 1003 cl = repo.changelog 1004 outgoing = resolvenodes(repo, revisions) 1005 publishing = repo.publishing() 1006 1007 if outgoing: 1008 repo.hook(b'preoutgoing', throw=True, source=b'serve') 1009 1010 yield { 1011 b'totalitems': len(outgoing), 1012 } 1013 1014 # The phases of nodes already transferred to the client may have changed 1015 # since the client last requested data. We send phase-only records 1016 # for these revisions, if requested. 1017 # TODO actually do this. We'll probably want to emit phase heads 1018 # in the ancestry set of the outgoing revisions. This will ensure 1019 # that phase updates within that set are seen. 1020 if b'phase' in fields: 1021 pass 1022 1023 nodebookmarks = {} 1024 for mark, node in repo._bookmarks.items(): 1025 nodebookmarks.setdefault(node, set()).add(mark) 1026 1027 # It is already topologically sorted by revision number. 1028 for node in outgoing: 1029 d = { 1030 b'node': node, 1031 } 1032 1033 if b'parents' in fields: 1034 d[b'parents'] = cl.parents(node) 1035 1036 if b'phase' in fields: 1037 if publishing: 1038 d[b'phase'] = b'public' 1039 else: 1040 ctx = repo[node] 1041 d[b'phase'] = ctx.phasestr() 1042 1043 if b'bookmarks' in fields and node in nodebookmarks: 1044 d[b'bookmarks'] = sorted(nodebookmarks[node]) 1045 del nodebookmarks[node] 1046 1047 followingmeta = [] 1048 followingdata = [] 1049 1050 if b'revision' in fields: 1051 revisiondata = cl.revision(node) 1052 followingmeta.append((b'revision', len(revisiondata))) 1053 followingdata.append(revisiondata) 1054 1055 # TODO make it possible for extensions to wrap a function or register 1056 # a handler to service custom fields. 1057 1058 if followingmeta: 1059 d[b'fieldsfollowing'] = followingmeta 1060 1061 yield d 1062 1063 for extra in followingdata: 1064 yield extra 1065 1066 # If requested, send bookmarks from nodes that didn't have revision 1067 # data sent so receiver is aware of any bookmark updates. 1068 if b'bookmarks' in fields: 1069 for node, marks in sorted(pycompat.iteritems(nodebookmarks)): 1070 yield { 1071 b'node': node, 1072 b'bookmarks': sorted(marks), 1073 } 1074 1075 1076class FileAccessError(Exception): 1077 """Represents an error accessing a specific file.""" 1078 1079 def __init__(self, path, msg, args): 1080 self.path = path 1081 self.msg = msg 1082 self.args = args 1083 1084 1085def getfilestore(repo, proto, path): 1086 """Obtain a file storage object for use with wire protocol. 1087 1088 Exists as a standalone function so extensions can monkeypatch to add 1089 access control. 1090 """ 1091 # This seems to work even if the file doesn't exist. So catch 1092 # "empty" files and return an error. 1093 fl = repo.file(path) 1094 1095 if not len(fl): 1096 raise FileAccessError(path, b'unknown file: %s', (path,)) 1097 1098 return fl 1099 1100 1101def emitfilerevisions(repo, path, revisions, linknodes, fields): 1102 for revision in revisions: 1103 d = { 1104 b'node': revision.node, 1105 } 1106 1107 if b'parents' in fields: 1108 d[b'parents'] = [revision.p1node, revision.p2node] 1109 1110 if b'linknode' in fields: 1111 d[b'linknode'] = linknodes[revision.node] 1112 1113 followingmeta = [] 1114 followingdata = [] 1115 1116 if b'revision' in fields: 1117 if revision.revision is not None: 1118 followingmeta.append((b'revision', len(revision.revision))) 1119 followingdata.append(revision.revision) 1120 else: 1121 d[b'deltabasenode'] = revision.basenode 1122 followingmeta.append((b'delta', len(revision.delta))) 1123 followingdata.append(revision.delta) 1124 1125 if followingmeta: 1126 d[b'fieldsfollowing'] = followingmeta 1127 1128 yield d 1129 1130 for extra in followingdata: 1131 yield extra 1132 1133 1134def makefilematcher(repo, pathfilter): 1135 """Construct a matcher from a path filter dict.""" 1136 1137 # Validate values. 1138 if pathfilter: 1139 for key in (b'include', b'exclude'): 1140 for pattern in pathfilter.get(key, []): 1141 if not pattern.startswith((b'path:', b'rootfilesin:')): 1142 raise error.WireprotoCommandError( 1143 b'%s pattern must begin with `path:` or `rootfilesin:`; ' 1144 b'got %s', 1145 (key, pattern), 1146 ) 1147 1148 if pathfilter: 1149 matcher = matchmod.match( 1150 repo.root, 1151 b'', 1152 include=pathfilter.get(b'include', []), 1153 exclude=pathfilter.get(b'exclude', []), 1154 ) 1155 else: 1156 matcher = matchmod.match(repo.root, b'') 1157 1158 # Requested patterns could include files not in the local store. So 1159 # filter those out. 1160 return repo.narrowmatch(matcher) 1161 1162 1163@wireprotocommand( 1164 b'filedata', 1165 args={ 1166 b'haveparents': { 1167 b'type': b'bool', 1168 b'default': lambda: False, 1169 b'example': True, 1170 }, 1171 b'nodes': { 1172 b'type': b'list', 1173 b'example': [b'0123456...'], 1174 }, 1175 b'fields': { 1176 b'type': b'set', 1177 b'default': set, 1178 b'example': {b'parents', b'revision'}, 1179 b'validvalues': {b'parents', b'revision', b'linknode'}, 1180 }, 1181 b'path': { 1182 b'type': b'bytes', 1183 b'example': b'foo.txt', 1184 }, 1185 }, 1186 permission=b'pull', 1187 # TODO censoring a file revision won't invalidate the cache. 1188 # Figure out a way to take censoring into account when deriving 1189 # the cache key. 1190 cachekeyfn=makecommandcachekeyfn(b'filedata', 1, allargs=True), 1191) 1192def filedata(repo, proto, haveparents, nodes, fields, path): 1193 # TODO this API allows access to file revisions that are attached to 1194 # secret changesets. filesdata does not have this problem. Maybe this 1195 # API should be deleted? 1196 1197 try: 1198 # Extensions may wish to access the protocol handler. 1199 store = getfilestore(repo, proto, path) 1200 except FileAccessError as e: 1201 raise error.WireprotoCommandError(e.msg, e.args) 1202 1203 clnode = repo.changelog.node 1204 linknodes = {} 1205 1206 # Validate requested nodes. 1207 for node in nodes: 1208 try: 1209 store.rev(node) 1210 except error.LookupError: 1211 raise error.WireprotoCommandError( 1212 b'unknown file node: %s', (hex(node),) 1213 ) 1214 1215 # TODO by creating the filectx against a specific file revision 1216 # instead of changeset, linkrev() is always used. This is wrong for 1217 # cases where linkrev() may refer to a hidden changeset. But since this 1218 # API doesn't know anything about changesets, we're not sure how to 1219 # disambiguate the linknode. Perhaps we should delete this API? 1220 fctx = repo.filectx(path, fileid=node) 1221 linknodes[node] = clnode(fctx.introrev()) 1222 1223 revisions = store.emitrevisions( 1224 nodes, 1225 revisiondata=b'revision' in fields, 1226 assumehaveparentrevisions=haveparents, 1227 ) 1228 1229 yield { 1230 b'totalitems': len(nodes), 1231 } 1232 1233 for o in emitfilerevisions(repo, path, revisions, linknodes, fields): 1234 yield o 1235 1236 1237def filesdatacapabilities(repo, proto): 1238 batchsize = repo.ui.configint( 1239 b'experimental', b'server.filesdata.recommended-batch-size' 1240 ) 1241 return { 1242 b'recommendedbatchsize': batchsize, 1243 } 1244 1245 1246@wireprotocommand( 1247 b'filesdata', 1248 args={ 1249 b'haveparents': { 1250 b'type': b'bool', 1251 b'default': lambda: False, 1252 b'example': True, 1253 }, 1254 b'fields': { 1255 b'type': b'set', 1256 b'default': set, 1257 b'example': {b'parents', b'revision'}, 1258 b'validvalues': { 1259 b'firstchangeset', 1260 b'linknode', 1261 b'parents', 1262 b'revision', 1263 }, 1264 }, 1265 b'pathfilter': { 1266 b'type': b'dict', 1267 b'default': lambda: None, 1268 b'example': {b'include': [b'path:tests']}, 1269 }, 1270 b'revisions': { 1271 b'type': b'list', 1272 b'example': [ 1273 { 1274 b'type': b'changesetexplicit', 1275 b'nodes': [b'abcdef...'], 1276 } 1277 ], 1278 }, 1279 }, 1280 permission=b'pull', 1281 # TODO censoring a file revision won't invalidate the cache. 1282 # Figure out a way to take censoring into account when deriving 1283 # the cache key. 1284 cachekeyfn=makecommandcachekeyfn(b'filesdata', 1, allargs=True), 1285 extracapabilitiesfn=filesdatacapabilities, 1286) 1287def filesdata(repo, proto, haveparents, fields, pathfilter, revisions): 1288 # TODO This should operate on a repo that exposes obsolete changesets. There 1289 # is a race between a client making a push that obsoletes a changeset and 1290 # another client fetching files data for that changeset. If a client has a 1291 # changeset, it should probably be allowed to access files data for that 1292 # changeset. 1293 1294 outgoing = resolvenodes(repo, revisions) 1295 filematcher = makefilematcher(repo, pathfilter) 1296 1297 # path -> {fnode: linknode} 1298 fnodes = collections.defaultdict(dict) 1299 1300 # We collect the set of relevant file revisions by iterating the changeset 1301 # revisions and either walking the set of files recorded in the changeset 1302 # or by walking the manifest at that revision. There is probably room for a 1303 # storage-level API to request this data, as it can be expensive to compute 1304 # and would benefit from caching or alternate storage from what revlogs 1305 # provide. 1306 for node in outgoing: 1307 ctx = repo[node] 1308 mctx = ctx.manifestctx() 1309 md = mctx.read() 1310 1311 if haveparents: 1312 checkpaths = ctx.files() 1313 else: 1314 checkpaths = md.keys() 1315 1316 for path in checkpaths: 1317 fnode = md[path] 1318 1319 if path in fnodes and fnode in fnodes[path]: 1320 continue 1321 1322 if not filematcher(path): 1323 continue 1324 1325 fnodes[path].setdefault(fnode, node) 1326 1327 yield { 1328 b'totalpaths': len(fnodes), 1329 b'totalitems': sum(len(v) for v in fnodes.values()), 1330 } 1331 1332 for path, filenodes in sorted(fnodes.items()): 1333 try: 1334 store = getfilestore(repo, proto, path) 1335 except FileAccessError as e: 1336 raise error.WireprotoCommandError(e.msg, e.args) 1337 1338 yield { 1339 b'path': path, 1340 b'totalitems': len(filenodes), 1341 } 1342 1343 revisions = store.emitrevisions( 1344 filenodes.keys(), 1345 revisiondata=b'revision' in fields, 1346 assumehaveparentrevisions=haveparents, 1347 ) 1348 1349 for o in emitfilerevisions(repo, path, revisions, filenodes, fields): 1350 yield o 1351 1352 1353@wireprotocommand( 1354 b'heads', 1355 args={ 1356 b'publiconly': { 1357 b'type': b'bool', 1358 b'default': lambda: False, 1359 b'example': False, 1360 }, 1361 }, 1362 permission=b'pull', 1363) 1364def headsv2(repo, proto, publiconly): 1365 if publiconly: 1366 repo = repo.filtered(b'immutable') 1367 1368 yield repo.heads() 1369 1370 1371@wireprotocommand( 1372 b'known', 1373 args={ 1374 b'nodes': { 1375 b'type': b'list', 1376 b'default': list, 1377 b'example': [b'deadbeef'], 1378 }, 1379 }, 1380 permission=b'pull', 1381) 1382def knownv2(repo, proto, nodes): 1383 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes)) 1384 yield result 1385 1386 1387@wireprotocommand( 1388 b'listkeys', 1389 args={ 1390 b'namespace': { 1391 b'type': b'bytes', 1392 b'example': b'ns', 1393 }, 1394 }, 1395 permission=b'pull', 1396) 1397def listkeysv2(repo, proto, namespace): 1398 keys = repo.listkeys(encoding.tolocal(namespace)) 1399 keys = { 1400 encoding.fromlocal(k): encoding.fromlocal(v) 1401 for k, v in pycompat.iteritems(keys) 1402 } 1403 1404 yield keys 1405 1406 1407@wireprotocommand( 1408 b'lookup', 1409 args={ 1410 b'key': { 1411 b'type': b'bytes', 1412 b'example': b'foo', 1413 }, 1414 }, 1415 permission=b'pull', 1416) 1417def lookupv2(repo, proto, key): 1418 key = encoding.tolocal(key) 1419 1420 # TODO handle exception. 1421 node = repo.lookup(key) 1422 1423 yield node 1424 1425 1426def manifestdatacapabilities(repo, proto): 1427 batchsize = repo.ui.configint( 1428 b'experimental', b'server.manifestdata.recommended-batch-size' 1429 ) 1430 1431 return { 1432 b'recommendedbatchsize': batchsize, 1433 } 1434 1435 1436@wireprotocommand( 1437 b'manifestdata', 1438 args={ 1439 b'nodes': { 1440 b'type': b'list', 1441 b'example': [b'0123456...'], 1442 }, 1443 b'haveparents': { 1444 b'type': b'bool', 1445 b'default': lambda: False, 1446 b'example': True, 1447 }, 1448 b'fields': { 1449 b'type': b'set', 1450 b'default': set, 1451 b'example': {b'parents', b'revision'}, 1452 b'validvalues': {b'parents', b'revision'}, 1453 }, 1454 b'tree': { 1455 b'type': b'bytes', 1456 b'example': b'', 1457 }, 1458 }, 1459 permission=b'pull', 1460 cachekeyfn=makecommandcachekeyfn(b'manifestdata', 1, allargs=True), 1461 extracapabilitiesfn=manifestdatacapabilities, 1462) 1463def manifestdata(repo, proto, haveparents, nodes, fields, tree): 1464 store = repo.manifestlog.getstorage(tree) 1465 1466 # Validate the node is known and abort on unknown revisions. 1467 for node in nodes: 1468 try: 1469 store.rev(node) 1470 except error.LookupError: 1471 raise error.WireprotoCommandError(b'unknown node: %s', (node,)) 1472 1473 revisions = store.emitrevisions( 1474 nodes, 1475 revisiondata=b'revision' in fields, 1476 assumehaveparentrevisions=haveparents, 1477 ) 1478 1479 yield { 1480 b'totalitems': len(nodes), 1481 } 1482 1483 for revision in revisions: 1484 d = { 1485 b'node': revision.node, 1486 } 1487 1488 if b'parents' in fields: 1489 d[b'parents'] = [revision.p1node, revision.p2node] 1490 1491 followingmeta = [] 1492 followingdata = [] 1493 1494 if b'revision' in fields: 1495 if revision.revision is not None: 1496 followingmeta.append((b'revision', len(revision.revision))) 1497 followingdata.append(revision.revision) 1498 else: 1499 d[b'deltabasenode'] = revision.basenode 1500 followingmeta.append((b'delta', len(revision.delta))) 1501 followingdata.append(revision.delta) 1502 1503 if followingmeta: 1504 d[b'fieldsfollowing'] = followingmeta 1505 1506 yield d 1507 1508 for extra in followingdata: 1509 yield extra 1510 1511 1512@wireprotocommand( 1513 b'pushkey', 1514 args={ 1515 b'namespace': { 1516 b'type': b'bytes', 1517 b'example': b'ns', 1518 }, 1519 b'key': { 1520 b'type': b'bytes', 1521 b'example': b'key', 1522 }, 1523 b'old': { 1524 b'type': b'bytes', 1525 b'example': b'old', 1526 }, 1527 b'new': { 1528 b'type': b'bytes', 1529 b'example': b'new', 1530 }, 1531 }, 1532 permission=b'push', 1533) 1534def pushkeyv2(repo, proto, namespace, key, old, new): 1535 # TODO handle ui output redirection 1536 yield repo.pushkey( 1537 encoding.tolocal(namespace), 1538 encoding.tolocal(key), 1539 encoding.tolocal(old), 1540 encoding.tolocal(new), 1541 ) 1542 1543 1544@wireprotocommand( 1545 b'rawstorefiledata', 1546 args={ 1547 b'files': { 1548 b'type': b'list', 1549 b'example': [b'changelog', b'manifestlog'], 1550 }, 1551 b'pathfilter': { 1552 b'type': b'list', 1553 b'default': lambda: None, 1554 b'example': {b'include': [b'path:tests']}, 1555 }, 1556 }, 1557 permission=b'pull', 1558) 1559def rawstorefiledata(repo, proto, files, pathfilter): 1560 if not streamclone.allowservergeneration(repo): 1561 raise error.WireprotoCommandError(b'stream clone is disabled') 1562 1563 # TODO support dynamically advertising what store files "sets" are 1564 # available. For now, we support changelog, manifestlog, and files. 1565 files = set(files) 1566 allowedfiles = {b'changelog', b'manifestlog'} 1567 1568 unsupported = files - allowedfiles 1569 if unsupported: 1570 raise error.WireprotoCommandError( 1571 b'unknown file type: %s', (b', '.join(sorted(unsupported)),) 1572 ) 1573 1574 with repo.lock(): 1575 topfiles = list(repo.store.topfiles()) 1576 1577 sendfiles = [] 1578 totalsize = 0 1579 1580 # TODO this is a bunch of storage layer interface abstractions because 1581 # it assumes revlogs. 1582 for rl_type, name, size in topfiles: 1583 # XXX use the `rl_type` for that 1584 if b'changelog' in files and name.startswith(b'00changelog'): 1585 pass 1586 elif b'manifestlog' in files and name.startswith(b'00manifest'): 1587 pass 1588 else: 1589 continue 1590 1591 sendfiles.append((b'store', name, size)) 1592 totalsize += size 1593 1594 yield { 1595 b'filecount': len(sendfiles), 1596 b'totalsize': totalsize, 1597 } 1598 1599 for location, name, size in sendfiles: 1600 yield { 1601 b'location': location, 1602 b'path': name, 1603 b'size': size, 1604 } 1605 1606 # We have to use a closure for this to ensure the context manager is 1607 # closed only after sending the final chunk. 1608 def getfiledata(): 1609 with repo.svfs(name, b'rb', auditpath=False) as fh: 1610 for chunk in util.filechunkiter(fh, limit=size): 1611 yield chunk 1612 1613 yield wireprototypes.indefinitebytestringresponse(getfiledata()) 1614