1# Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2# Copyright 2005-2007 Olivia Mackall <olivia@selenic.com>
3#
4# This software may be used and distributed according to the terms of the
5# GNU General Public License version 2 or any later version.
6
7from __future__ import absolute_import
8
9import collections
10import contextlib
11
12from .i18n import _
13from .node import hex
14from . import (
15    discovery,
16    encoding,
17    error,
18    match as matchmod,
19    narrowspec,
20    pycompat,
21    streamclone,
22    templatefilters,
23    util,
24    wireprotoframing,
25    wireprototypes,
26)
27from .interfaces import util as interfaceutil
28from .utils import (
29    cborutil,
30    hashutil,
31    stringutil,
32)
33
# Media type of the unified frame-based protocol. Clients must send it as
# both the Accept and Content-Type request header values, and framed
# responses are served with it as their Content-Type.
FRAMINGTYPE = b'application/mercurial-exp-framing-0006'

# Transport name of version 2 of the HTTP wire protocol. It is the ``name``
# of the protocol handler below and is mixed into cache keys.
HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2

# Registry of version 2 wire protocol commands, keyed by command name and
# populated by the ``wireprotocommand`` decorator.
COMMANDS = wireprototypes.commanddict()

# Value inserted into the cache key computation function. Change the value
# to force new cache keys for every command request. This should be done
# when there is a change to how caching works, etc.
GLOBAL_CACHE_VERSION = 1
44
45
def handlehttpv2request(rctx, req, res, checkperm, urlparts):
    """Entry point for requests to the HTTP version 2 wire protocol API.

    ``urlparts`` is the remainder of the request path split into components.
    The URL space looks like ``<permission>/<command>``, where
    ``<permission>`` is ``ro`` or ``rw`` to signal read-only or read-write
    access, respectively.

    ``checkperm`` is called as ``checkperm(rctx, req, perm)`` with ``perm``
    being ``pull`` (for ``ro``) or ``push`` (for ``rw``). An
    ``ErrorResponse`` raised by it is turned into an error response.

    Validation of the URL, permission, HTTP method, command name and media
    types happens here. Valid requests are handed to
    ``_processhttpv2request``. Every outcome is written to ``res``. Nothing
    is returned.
    """
    from .hgweb import common as hgwebcommon

    def sendtext(status, body):
        # Every validation failure below answers with a plain text body.
        res.status = status
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(body)

    # Root URL does nothing meaningful... yet.
    if not urlparts:
        sendtext(b'200 OK', _(b'HTTP version 2 API handler'))
        return

    if len(urlparts) == 1:
        sendtext(
            b'404 Not Found',
            _(b'do not know how to process %s\n') % req.dispatchpath,
        )
        return

    permission, command = urlparts[0:2]

    if permission not in (b'ro', b'rw'):
        sendtext(b'404 Not Found', _(b'unknown permission: %s') % permission)
        return

    if req.method != b'POST':
        res.headers[b'Allow'] = b'POST'
        # Declare the media type of the body like every other error path.
        sendtext(
            b'405 Method Not Allowed', _(b'commands require POST requests')
        )
        return

    # At some point we'll want to use our own API instead of recycling the
    # behavior of version 1 of the wire protocol...
    # TODO return reasonable responses - not responses that overload the
    # HTTP status line message for error reporting.
    try:
        checkperm(rctx, req, b'pull' if permission == b'ro' else b'push')
    except hgwebcommon.ErrorResponse as e:
        res.status = hgwebcommon.statusmessage(
            e.code, stringutil.forcebytestr(e)
        )
        # Set the default media type first so any header carried by the
        # error (including its own Content-Type) takes precedence.
        res.headers[b'Content-Type'] = b'text/plain'
        for k, v in e.headers:
            res.headers[k] = v
        res.setbodybytes(b'permission denied')
        return

    # We have a special endpoint to reflect the request back at the client.
    if command == b'debugreflect':
        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
        return

    # Extra commands that we handle that aren't really wire protocol
    # commands. Think extra hard before making this hackery available to
    # extension.
    extracommands = {b'multirequest'}

    if command not in COMMANDS and command not in extracommands:
        sendtext(
            b'404 Not Found',
            _(b'unknown wire protocol command: %s\n') % command,
        )
        return

    repo = rctx.repo
    ui = repo.ui

    proto = httpv2protocolhandler(req, ui)

    # Extra commands are not registered, so only ask the registry about
    # real wire protocol commands.
    if command not in extracommands and not COMMANDS.commandavailable(
        command, proto
    ):
        sendtext(
            b'404 Not Found',
            _(b'invalid wire protocol command: %s') % command,
        )
        return

    # TODO consider cases where proxies may add additional Accept headers.
    if req.headers.get(b'Accept') != FRAMINGTYPE:
        sendtext(
            b'406 Not Acceptable',
            _(b'client MUST specify Accept header with value: %s\n')
            % FRAMINGTYPE,
        )
        return

    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
        # TODO we should send a response with appropriate media type,
        # since client does Accept it.
        sendtext(
            b'415 Unsupported Media Type',
            _(b'client MUST send Content-Type header with value: %s\n')
            % FRAMINGTYPE,
        )
        return

    _processhttpv2request(ui, repo, req, res, permission, command, proto)
148
149
def _processhttpv2reflectrequest(ui, repo, req, res):
    """Decode a unified frame protocol request and echo the state log back.

    This is a debugging aid for the wire protocol. The request is not
    dispatched. Instead, every frame in the body is read and fed to a fresh
    server reactor. The response is a ``text/plain`` body holding a
    newline-separated log of each received frame, each reactor result, and
    the final end-of-input result.

    The endpoint answers 404 unless ``experimental.web.api.debugreflect`` is
    enabled. ``repo`` is not used.
    """
    # Reflection APIs have a history of being abused and of accidentally
    # disclosing sensitive data, hence the opt-in config knob.
    if not ui.configbool(b'experimental', b'web.api.debugreflect'):
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_(b'debugreflect service not available'))
        return

    # The body is assumed to be a unified framing protocol request.
    reactor = wireprotoframing.serverreactor(ui)
    log = []

    frame = wireprotoframing.readframe(req.bodyfh)
    while frame:
        log.append(
            b'received: %d %d %d %s'
            % (frame.typeid, frame.flags, frame.requestid, frame.payload)
        )
        action, meta = reactor.onframerecv(frame)
        log.append(templatefilters.json((action, meta)))
        frame = wireprotoframing.readframe(req.bodyfh)

    # The body ran out of frames.
    log.append(b'received: <no frame>')

    eofaction, eofmeta = reactor.oninputeof()
    eofmeta[b'action'] = eofaction
    log.append(templatefilters.json(eofmeta))

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = b'text/plain'
    res.setbodybytes(b'\n'.join(log))
195
196
def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
    """Run the frames of an already-validated HTTPv2 request.

    Frames are read from the request body and fed to a server reactor with
    deferred output. Each ``runcommand`` action is executed through
    ``_httpv2runcommand()``, and processing stops as soon as that reports a
    response was set. A reactor ``error`` action produces a plain text
    response. Once the body is exhausted, frames still held by the reactor
    become the response body.

    ``proto`` is accepted but not consulted here.

    Raises ``error.ProgrammingError`` on a reactor action this function does
    not know how to handle.
    """
    # TODO Some HTTP clients are full duplex and can receive data before
    # the entire request is transmitted. Figure out a way to indicate support
    # for that so we can opt into full duplex mode.
    reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
    outstream = None
    commandseen = False

    frame = wireprotoframing.readframe(req.bodyfh)
    while frame:
        action, meta = reactor.onframerecv(frame)

        if action == b'runcommand':
            # The output stream is created lazily so protocol settings
            # frames received earlier can select the proper encoding.
            if not outstream:
                outstream = reactor.makeoutputstream()

            responded = _httpv2runcommand(
                ui,
                repo,
                req,
                res,
                authedperm,
                reqcommand,
                reactor,
                outstream,
                meta,
                issubsequent=commandseen,
            )
            if responded:
                return

            commandseen = True
        elif action == b'error':
            # TODO define proper error mechanism.
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(meta[b'message'] + b'\n')
            return
        elif action != b'wantframe':
            raise error.ProgrammingError(
                b'unhandled action from frame processor: %s' % action
            )

        # ``wantframe`` (and a deferred command) simply ask for more input.
        frame = wireprotoframing.readframe(req.bodyfh)

    action, meta = reactor.oninputeof()
    if action == b'noop':
        return

    if action != b'sendframes':
        raise error.ProgrammingError(
            b'unhandled action from frame processor: %s' % action
        )

    # We assume we haven't started sending the response yet. If we're
    # wrong, the response type will raise an exception.
    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE
    res.setbodygen(meta[b'framegen'])
269
270
def _httpv2runcommand(
    ui,
    repo,
    req,
    res,
    authedperm,
    reqcommand,
    reactor,
    outstream,
    command,
    issubsequent,
):
    """Dispatch a wire protocol command made from HTTPv2 requests.

    The authenticated permission (``authedperm``) along with the original
    command from the URL (``reqcommand``) are passed in. ``command`` is the
    command request produced by the frame reactor. Its ``command``,
    ``args``, ``redirect`` and ``requestid`` keys are used here.
    ``issubsequent`` is true when an earlier command in the same HTTP
    request has already been processed.

    Returns True when the HTTP response has been set and the caller must
    stop processing. Returns False when the reactor deferred output
    (``noop``), so more frames may be processed.

    Raises ``error.ProgrammingError`` if an internal permission invariant is
    violated or the reactor returns an unknown action.
    """
    # We already validated that the session has permissions to perform the
    # actions in ``authedperm``. In the unified frame protocol, the canonical
    # command to run is expressed in a frame. However, the URL also requested
    # to run a specific command. We need to be careful that the command we
    # run doesn't have permissions requirements greater than what was granted
    # by ``authedperm``.
    #
    # Our rule for this is we only allow one command per HTTP request and
    # that command must match the command in the URL. However, we make
    # an exception for the ``multirequest`` URL. This URL is allowed to
    # execute multiple commands. We double check permissions of each command
    # as it is invoked to ensure there is no privilege escalation.
    # TODO consider allowing multiple commands to regular command URLs
    # iff each command is the same.

    proto = httpv2protocolhandler(req, ui, args=command[b'args'])

    if reqcommand == b'multirequest':
        if not COMMANDS.commandavailable(command[b'command'], proto):
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(
                _(b'wire protocol command not available: %s')
                % command[b'command']
            )
            return True

        # The permission check below relies on these invariants. They are
        # checked explicitly because ``assert`` statements are stripped when
        # Python runs with -O, which would leave the check unguarded.
        if authedperm not in (b'ro', b'rw'):
            raise error.ProgrammingError(
                b'unexpected authenticated permission: %s' % authedperm
            )

        wirecommand = COMMANDS[command[b'command']]
        if wirecommand.permission not in (b'push', b'pull'):
            raise error.ProgrammingError(
                b'invalid permission for command %s: %s'
                % (command[b'command'], wirecommand.permission)
            )

        if authedperm == b'ro' and wirecommand.permission != b'pull':
            # TODO proper error mechanism
            res.status = b'403 Forbidden'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(
                _(b'insufficient permissions to execute command: %s')
                % command[b'command']
            )
            return True

        # TODO should we also call checkperm() here? Maybe not if we're going
        # to overhaul that API. The granted scope from the URL check should
        # be good enough.

    else:
        # Don't allow multiple commands outside of ``multirequest`` URL.
        if issubsequent:
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(
                _(b'multiple commands cannot be issued to this URL')
            )
            return True

        if reqcommand != command[b'command']:
            # TODO define proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_(b'command in frame must match command in URL'))
            return True

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE

    try:
        objs = dispatch(repo, proto, command[b'command'], command[b'redirect'])

        action, meta = reactor.oncommandresponsereadyobjects(
            outstream, command[b'requestid'], objs
        )

    except error.WireprotoCommandError as e:
        action, meta = reactor.oncommanderror(
            outstream, command[b'requestid'], e.message, e.messageargs
        )

    except Exception as e:
        # Any other failure is reported to the client as a server error
        # frame rather than propagating to the HTTP server.
        action, meta = reactor.onservererror(
            outstream,
            command[b'requestid'],
            _(b'exception when invoking command: %s')
            % stringutil.forcebytestr(e),
        )

    if action == b'sendframes':
        res.setbodygen(meta[b'framegen'])
        return True
    elif action == b'noop':
        return False
    else:
        raise error.ProgrammingError(
            b'unhandled event from reactor: %s' % action
        )
385
386
def getdispatchrepo(repo, proto, command):
    """Return the repository view that a wire protocol command runs against.

    The view name comes from the ``server.view`` config option and is passed
    to ``repo.filtered()``. ``proto`` and ``command`` are not consulted by
    this implementation.
    """
    return repo.filtered(repo.ui.config(b'server', b'view'))
390
391
def dispatch(repo, proto, command, redirect):
    """Run a wire protocol command.

    Returns an iterable of objects that will be sent to the client.

    This is a generator. Nothing runs until the result is first iterated,
    including argument validation by ``proto.getargs()``, which may raise
    ``error.WireprotoCommandError``.

    ``redirect`` is either a false value or a dict with ``b'targets'`` and
    ``b'hashes'`` entries. Those entries are forwarded to
    ``makeresponsecacher()``. When the command declares a ``cachekeyfn`` and
    a cacher is available, the response may be served from the cache, or
    the command's output is routed through the cacher.
    """
    repo = getdispatchrepo(repo, proto, command)

    entry = COMMANDS[command]
    func = entry.func
    spec = entry.args

    args = proto.getargs(spec)

    # There is some duplicate boilerplate code here for calling the command and
    # emitting objects. It is either that or a lot of indented code that looks
    # like a pyramid (since there are a lot of code paths that result in not
    # using the cacher).
    def callcommand():
        return func(repo, proto, **pycompat.strkwargs(args))

    # Request is not cacheable. Don't bother instantiating a cacher.
    if not entry.cachekeyfn:
        for o in callcommand():
            yield o
        return

    if redirect:
        redirecttargets = redirect[b'targets']
        redirecthashes = redirect[b'hashes']
    else:
        redirecttargets = []
        redirecthashes = []

    cacher = makeresponsecacher(
        repo,
        proto,
        command,
        args,
        cborutil.streamencode,
        redirecttargets=redirecttargets,
        redirecthashes=redirecthashes,
    )

    # But we have no cacher. Do default handling.
    if not cacher:
        for o in callcommand():
            yield o
        return

    with cacher:
        cachekey = entry.cachekeyfn(
            repo, proto, cacher, **pycompat.strkwargs(args)
        )

        # No cache key or the cacher doesn't like it. Do default handling.
        if cachekey is None or not cacher.setcachekey(cachekey):
            for o in callcommand():
                yield o
            return

        # Serve it from the cache, if possible.
        cached = cacher.lookup()

        if cached:
            for o in cached[b'objs']:
                yield o
            return

        # Else call the command and feed its output into the cacher, allowing
        # the cacher to buffer/mutate objects as it desires. A distinct name
        # is used for what the cacher emits so the command's object is not
        # shadowed.
        for o in callcommand():
            for emitted in cacher.onobject(o):
                yield emitted

        for o in cacher.onfinished():
            yield o
467
468
@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class httpv2protocolhandler(object):
    """Protocol handler for HTTP version 2 wire protocol requests.

    ``args`` is the dict of command arguments received from the client. It
    is ``None`` when the handler is built only to check command availability
    or to compute capabilities.
    """

    def __init__(self, req, ui, args=None):
        self._req = req
        self._ui = ui
        self._args = args

    @property
    def name(self):
        return HTTP_WIREPROTO_V2

    def getargs(self, args):
        """Validate client arguments against a command's argument spec.

        ``args`` is the command's argument specification, mapping argument
        names to metadata dicts with ``required``, ``default`` and ``type``
        entries. Returns a dict mapping every declared argument to the value
        to pass to the command. That value is the client-supplied one, or
        the result of the argument's ``default`` callable when the client
        omitted it. List values for ``set`` arguments are converted to sets.

        Raises ``error.WireprotoCommandError`` if the client sent arguments
        the command does not declare, or omitted required ones.
        """
        # A handler built without arguments behaves as if the client sent
        # none, instead of failing with a TypeError on ``set(None)``.
        received = self._args if self._args is not None else {}

        # First look for args that were passed but aren't registered on this
        # command.
        extra = set(received) - set(args)
        if extra:
            raise error.WireprotoCommandError(
                b'unsupported argument to command: %s'
                % b', '.join(sorted(extra))
            )

        # And look for required arguments that are missing.
        missing = {a for a in args if args[a][b'required']} - set(received)

        if missing:
            raise error.WireprotoCommandError(
                b'missing required arguments: %s' % b', '.join(sorted(missing))
            )

        # Now derive the arguments to pass to the command, taking into
        # account the arguments specified by the client.
        data = {}
        for k, meta in sorted(args.items()):
            # This argument wasn't passed by the client.
            if k not in received:
                data[k] = meta[b'default']()
                continue

            v = received[k]

            # Sets may be expressed as lists. Silently normalize.
            if meta[b'type'] == b'set' and isinstance(v, list):
                v = set(v)

            # TODO consider more/stronger type validation.

            data[k] = v

        return data

    def getprotocaps(self):
        # Protocol capabilities are currently not implemented for HTTP V2.
        return set()

    def getpayload(self):
        raise NotImplementedError

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        raise NotImplementedError

    def client(self):
        raise NotImplementedError

    def addcapabilities(self, repo, caps):
        # No transport-specific capabilities are added.
        return caps

    def checkperm(self, perm):
        raise NotImplementedError
538
539
def httpv2apidescriptor(req, repo):
    """Return the capabilities descriptor for the HTTP version 2 API.

    The capabilities are computed by ``_capabilitiesv2()``, using a protocol
    handler for ``req`` that carries no command arguments.
    """
    handler = httpv2protocolhandler(req, repo.ui)
    return _capabilitiesv2(repo, handler)
544
545
def _capabilitiesv2(repo, proto):
    """Obtain the set of capabilities for version 2 transports.

    These capabilities are distinct from the capabilities for version 1
    transports.

    The returned dict has these keys:

    * ``commands``: per-command argument descriptions and required
      permissions, plus any extra command capabilities.
    * ``framingmediatypes``.
    * ``pathfilterprefixes``.
    * ``rawrepoformats``.
    * ``redirect``: present only when content redirect targets are
      advertised.

    The dict is passed through ``proto.addcapabilities()`` before being
    returned.
    """
    caps = {
        b'commands': {},
        b'framingmediatypes': [FRAMINGTYPE],
        b'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
    }

    for command, entry in COMMANDS.items():
        # Decide whether the command is advertised at all before describing
        # it, so no work (including calling default factories) is wasted.
        # TODO this type of check should be defined in a per-command callback.
        if (
            command == b'rawstorefiledata'
            and not streamclone.allowservergeneration(repo)
        ):
            continue

        args = {}

        for arg, meta in entry.args.items():
            argdesc = {
                # TODO should this be a normalized type using CBOR's
                # terminology?
                b'type': meta[b'type'],
                b'required': meta[b'required'],
            }

            if not meta[b'required']:
                argdesc[b'default'] = meta[b'default']()

            if meta[b'validvalues']:
                argdesc[b'validvalues'] = meta[b'validvalues']

            args[arg] = argdesc

        commanddesc = {
            b'args': args,
            b'permissions': [entry.permission],
        }

        if entry.extracapabilitiesfn:
            commanddesc.update(entry.extracapabilitiesfn(repo, proto))

        caps[b'commands'][command] = commanddesc

    caps[b'rawrepoformats'] = sorted(repo.requirements & repo.supportedformats)

    targets = getadvertisedredirecttargets(repo, proto)
    if targets:
        advertised = []

        for target in targets:
            # Distinct name: ``entry`` above is a command registry entry.
            descriptor = {
                b'name': target[b'name'],
                b'protocol': target[b'protocol'],
                b'uris': target[b'uris'],
            }

            for key in (b'snirequired', b'tlsversions'):
                if key in target:
                    descriptor[key] = target[key]

            advertised.append(descriptor)

        caps[b'redirect'] = {
            b'targets': advertised,
            b'hashes': [b'sha256', b'sha1'],
        }

    return proto.addcapabilities(repo, caps)
614
615
def getadvertisedredirecttargets(repo, proto):
    """Return the content redirect targets to advertise in capabilities.

    This default implementation advertises no targets. It returns a new
    empty list on every call, and ``repo`` and ``proto`` are not consulted.

    Each target in the returned list MUST be a dict with these keys:

    name
       Identifier of the target. Clients use it to refer to the target, and
       it is transferred as part of every command request.

    protocol
       Network protocol spoken by the target. This is typically the scheme
       in front of ``://`` in a URL, e.g. ``https``.

    uris
       List of representative URIs for the target. Clients can use them to
       test URI parsing for compatibility, or to decide which target they
       prefer.

    These optional keys are also recognized:

    snirequired
       Bool indicating whether Server Name Indication (SNI) is required to
       connect to the target.

    tlsversions
       List of bytes naming the TLS versions the target supports.

    By default, clients keep the order the server advertises, and servers
    use the first target the client advertises. List targets in the order
    the server prefers them to be used.
    """
    targets = []
    return targets
653
654
def wireprotocommand(
    name,
    args=None,
    permission=b'push',
    cachekeyfn=None,
    extracapabilitiesfn=None,
):
    """Decorator to declare a wire protocol command.

    ``name`` is the name of the wire protocol command being provided.

    ``args`` is a dict defining arguments accepted by the command. Keys are
    the argument name. Values are dicts with the following keys:

       ``type``
          The argument data type. Must be one of the following string
          literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
          or ``bool``.

       ``default``
          A callable returning the default value for this argument. An
          argument that does not specify a default is required, and requests
          omitting it are rejected.

       ``example``
          An example value for this argument. Mandatory.

       ``validvalues``
          Set of recognized values for this argument. Optional.

    The argument metadata dicts are copied before being normalized. The
    dicts passed in are never modified and may be shared between commands.

    ``permission`` defines the permission type needed to run this command.
    Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
    respectively. Default is to assume command requires ``push`` permissions
    because otherwise commands not declaring their permissions could modify
    a repository that is supposed to be read-only.

    ``cachekeyfn`` defines an optional callable that can derive the
    cache key for this request.

    ``extracapabilitiesfn`` defines an optional callable that defines extra
    command capabilities/parameters that are advertised next to the command
    in the capabilities data structure describing the server. The callable
    receives as arguments the repository and protocol objects. It returns
    a dict of extra fields to add to the command descriptor.

    Wire protocol commands are generators of objects to be serialized and
    sent to the client.

    If a command raises an uncaught exception, this will be translated into
    a command error.

    All commands can opt in to being cacheable by defining a function
    (``cachekeyfn``) that is called to derive a cache key. This function
    receives the same arguments as the command itself plus a ``cacher``
    argument containing the active cacher for the request and returns a bytes
    containing the key in a cache the response to this command may be cached
    under.

    Raises ``error.ProgrammingError`` for invalid declarations, and from the
    returned decorator if ``name`` is already registered.
    """
    transports = {
        k for k, v in wireprototypes.TRANSPORTS.items() if v[b'version'] == 2
    }

    if permission not in (b'push', b'pull'):
        raise error.ProgrammingError(
            b'invalid wire protocol permission; '
            b'got %s; expected "push" or "pull"' % permission
        )

    if args is None:
        args = {}

    if not isinstance(args, dict):
        raise error.ProgrammingError(
            b'arguments for version 2 commands must be declared as dicts'
        )

    # Normalize copies rather than the caller's dicts. In-place
    # normalization would break specs shared between commands: after the
    # first registration ``default`` has been filled in, so a second
    # registration would silently treat every argument as optional.
    normalized = {}

    for arg, meta in args.items():
        if arg == b'*':
            raise error.ProgrammingError(
                b'* argument name not allowed on version 2 commands'
            )

        if not isinstance(meta, dict):
            raise error.ProgrammingError(
                b'arguments for version 2 commands '
                b'must declare metadata as a dict'
            )

        meta = dict(meta)

        if b'type' not in meta:
            raise error.ProgrammingError(
                b'%s argument for command %s does not '
                b'declare type field' % (arg, name)
            )

        if meta[b'type'] not in (
            b'bytes',
            b'int',
            b'list',
            b'dict',
            b'set',
            b'bool',
        ):
            raise error.ProgrammingError(
                b'%s argument for command %s has '
                b'illegal type: %s' % (arg, name, meta[b'type'])
            )

        if b'example' not in meta:
            raise error.ProgrammingError(
                b'%s argument for command %s does not '
                b'declare example field' % (arg, name)
            )

        # Defaults are invoked when building arguments and capabilities.
        # Reject non-callables here instead of failing later per request.
        if b'default' in meta and not callable(meta[b'default']):
            raise error.ProgrammingError(
                b'%s argument for command %s declares a default '
                b'that is not callable' % (arg, name)
            )

        meta[b'required'] = b'default' not in meta

        meta.setdefault(b'default', lambda: None)
        meta.setdefault(b'validvalues', None)

        normalized[arg] = meta

    def register(func):
        if name in COMMANDS:
            raise error.ProgrammingError(
                b'%s command already registered for version 2' % name
            )

        COMMANDS[name] = wireprototypes.commandentry(
            func,
            args=normalized,
            transports=transports,
            permission=permission,
            cachekeyfn=cachekeyfn,
            extracapabilitiesfn=extracapabilitiesfn,
        )

        return func

    return register
790
791
def makecommandcachekeyfn(command, localversion=None, allargs=False):
    """Build a cache key derivation function for ``command``.

    The returned callable has the ``cachekeyfn`` signature used by
    ``wireprotocommand``: ``(repo, proto, cacher, **args)``. It returns
    ``None`` ("do not cache") when ``command`` is registered with ``push``
    permission.

    Otherwise it returns the hex SHA-1 digest, as bytes, of the CBOR
    encoding of a state dict holding:

    * The command name.
    * A global cache version number.
    * A local cache version number (passed via ``localversion``).
    * All the arguments passed to the command.
    * The media type used.
    * Wire protocol version string.
    * The repository path.

    ``cacher.adjustcachekeystate()`` may modify that state before it is
    hashed.

    Only ``allargs=True`` is supported, and ``localversion`` is mandatory.
    ``error.ProgrammingError`` is raised otherwise.
    """
    if not allargs:
        raise error.ProgrammingError(
            b'only allargs=True is currently supported'
        )
    if localversion is None:
        raise error.ProgrammingError(b'must set localversion argument value')

    def cachekeyfn(repo, proto, cacher, **args):
        # Responses of commands that may mutate the repository are never
        # cached.
        if COMMANDS[command].permission == b'push':
            return None

        # TODO config option to disable caching.

        # Everything that could influence the response is gathered into one
        # structure, and its CBOR encoding is hashed. CBOR delimits values,
        # so distinct inputs cannot collide the way plain concatenation can
        # ("foo" + "bar" == "fo" + "obar").
        state = {
            # Bumping this invalidates every cache key.
            b'globalversion': GLOBAL_CACHE_VERSION,
            # Bumping this invalidates keys of this command only.
            b'localversion': localversion,
            b'command': command,
            # Changes to exchange semantics must invalidate cached entries.
            b'mediatype': FRAMINGTYPE,
            b'version': HTTP_WIREPROTO_V2,
            # Identical requests to different repos must not share keys.
            b'repo': repo.root,
            # Arguments arrive already normalized (defaults filled in), so
            # explicit and implicit defaults hash identically. ``allargs``
            # is guaranteed true by the check in the enclosing function.
            b'args': pycompat.byteskwargs(args),
        }

        cacher.adjustcachekeystate(state)

        digest = hashutil.sha1()
        for chunk in cborutil.streamencode(state):
            digest.update(chunk)

        return pycompat.sysbytes(digest.hexdigest())

    return cachekeyfn
867
868
def makeresponsecacher(
    repo, proto, command, args, objencoderfn, redirecttargets, redirecthashes
):
    """Return a response cacher for a cacheable command, or ``None``.

    A returned cacher is an ``iwireprotocolcommandcacher`` instance.
    ``dispatch()`` passes these values:

    * the filtered repository,
    * the protocol handler,
    * the command name,
    * its normalized arguments,
    * an object encoder function,
    * the client's redirect targets and hashes.

    This default implementation has no caching backend and always returns
    ``None``, so ``dispatch()`` runs commands directly. Extensions can
    monkeypatch this function to provide custom caching backends.
    """
    # No built-in cache backend.
    cacher = None
    return cacher
880
881
def resolvenodes(repo, revisions):
    """Resolve nodes from a revisions specifier data structure.

    ``revisions`` must be a list of specifier mappings. The ``type`` key of
    each mapping selects how it is interpreted:

    ``changesetexplicit``
       the nodes listed under ``nodes``.
    ``changesetexplicitdepth``
       the revset ``ancestors(nodes, depth - 1)``.
    ``changesetdagrange``
       the ``missing`` set computed by ``discovery.outgoing`` from the
       known ``roots`` to ``heads``.

    Returns a list of changeset nodes without duplicates, in the order they
    were first produced. Raises ``error.WireprotoCommandError`` when a
    specifier is malformed.
    """
    cl = repo.changelog
    clhasnode = cl.hasnode

    ordered = []
    emitted = set()

    def collect(candidates):
        # Append each candidate unless it was already emitted.
        for candidate in candidates:
            if candidate in emitted:
                continue
            emitted.add(candidate)
            ordered.append(candidate)

    if not isinstance(revisions, list):
        raise error.WireprotoCommandError(
            b'revisions must be defined as an array'
        )

    for spec in revisions:
        if b'type' not in spec:
            raise error.WireprotoCommandError(
                b'type key not present in revision specifier'
            )

        kind = spec[b'type']

        if kind == b'changesetexplicit':
            if b'nodes' not in spec:
                raise error.WireprotoCommandError(
                    b'nodes key not present in changesetexplicit revision '
                    b'specifier'
                )

            collect(spec[b'nodes'])
            continue

        if kind == b'changesetexplicitdepth':
            for required in (b'nodes', b'depth'):
                if required not in spec:
                    raise error.WireprotoCommandError(
                        b'%s key not present in changesetexplicitdepth revision '
                        b'specifier',
                        (required,),
                    )

            ancestorrevs = repo.revs(
                b'ancestors(%ln, %s)', spec[b'nodes'], spec[b'depth'] - 1
            )
            collect(cl.node(rev) for rev in ancestorrevs)
            continue

        if kind == b'changesetdagrange':
            for required in (b'roots', b'heads'):
                if required not in spec:
                    raise error.WireprotoCommandError(
                        b'%s key not present in changesetdagrange revision '
                        b'specifier',
                        (required,),
                    )

            if not spec[b'heads']:
                raise error.WireprotoCommandError(
                    b'heads key in changesetdagrange cannot be empty'
                )

            # Roots unknown locally are ignored; with no roots at all the
            # range starts from the null revision.
            if spec[b'roots']:
                common = [n for n in spec[b'roots'] if clhasnode(n)]
            else:
                common = [repo.nullid]

            collect(discovery.outgoing(repo, common, spec[b'heads']).missing)
            continue

        raise error.WireprotoCommandError(
            b'unknown revision specifier type: %s', (kind,)
        )

    return ordered
963
964
@wireprotocommand(b'branchmap', permission=b'pull')
def branchmapv2(repo, proto):
    # Branch names are converted with encoding.fromlocal before being sent;
    # the values from repo.branchmap() are passed through unchanged.
    converted = {}
    for name, heads in pycompat.iteritems(repo.branchmap()):
        converted[encoding.fromlocal(name)] = heads
    yield converted
971
972
@wireprotocommand(b'capabilities', permission=b'pull')
def capabilitiesv2(repo, proto):
    # The whole capabilities map, as built by _capabilitiesv2, is emitted
    # as a single response object.
    caps = _capabilitiesv2(repo, proto)
    yield caps
976
977
@wireprotocommand(
    b'changesetdata',
    args={
        b'revisions': {
            b'type': b'list',
            b'example': [
                {
                    b'type': b'changesetexplicit',
                    b'nodes': [b'abcdef...'],
                }
            ],
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
        },
    },
    permission=b'pull',
)
def changesetdata(repo, proto, revisions, fields):
    """Emit changelog data for the changesets selected by ``revisions``.

    The command yields these objects in order:

    1. A header map containing ``totalitems``.
    2. One map per resolved changeset. Each map contains ``node`` plus
       whichever of ``parents``, ``phase`` and ``bookmarks`` were requested.
       When ``revision`` is requested, the map announces it in
       ``fieldsfollowing`` and the raw changelog revision is yielded right
       after the map.
    3. If ``bookmarks`` was requested, one map for each bookmarked node that
       was not among the emitted changesets.
    """
    # TODO look for unknown fields and abort when they can't be serviced.
    # This could probably be validated by dispatcher using validvalues.

    cl = repo.changelog
    outgoing = resolvenodes(repo, revisions)
    publishing = repo.publishing()
    wantbookmarks = b'bookmarks' in fields

    if outgoing:
        repo.hook(b'preoutgoing', throw=True, source=b'serve')

    yield {
        b'totalitems': len(outgoing),
    }

    # The phases of nodes already transferred to the client may have changed
    # since the client last requested data. We send phase-only records
    # for these revisions, if requested.
    # TODO actually do this. We'll probably want to emit phase heads
    # in the ancestry set of the outgoing revisions. This will ensure
    # that phase updates within that set are seen.

    # node -> set of bookmark names pointing at it. Only built when
    # bookmarks were requested; otherwise nothing below reads it.
    nodebookmarks = {}
    if wantbookmarks:
        for mark, node in repo._bookmarks.items():
            nodebookmarks.setdefault(node, set()).add(mark)

    # It is already topologically sorted by revision number.
    for node in outgoing:
        d = {
            b'node': node,
        }

        if b'parents' in fields:
            d[b'parents'] = cl.parents(node)

        if b'phase' in fields:
            # A publishing repository reports every changeset as public
            # without looking up its phase.
            if publishing:
                d[b'phase'] = b'public'
            else:
                ctx = repo[node]
                d[b'phase'] = ctx.phasestr()

        if wantbookmarks and node in nodebookmarks:
            # Popping the entry leaves only bookmarks on nodes that were not
            # sent; those are emitted after this loop.
            d[b'bookmarks'] = sorted(nodebookmarks.pop(node))

        followingmeta = []
        followingdata = []

        if b'revision' in fields:
            revisiondata = cl.revision(node)
            followingmeta.append((b'revision', len(revisiondata)))
            followingdata.append(revisiondata)

        # TODO make it possible for extensions to wrap a function or register
        # a handler to service custom fields.

        if followingmeta:
            d[b'fieldsfollowing'] = followingmeta

        yield d

        for extra in followingdata:
            yield extra

    # If requested, send bookmarks from nodes that didn't have revision
    # data sent so receiver is aware of any bookmark updates.
    if wantbookmarks:
        for node, marks in sorted(pycompat.iteritems(nodebookmarks)):
            yield {
                b'node': node,
                b'bookmarks': sorted(marks),
            }
1074
1075
class FileAccessError(Exception):
    """Raised when the storage for a specific file cannot be accessed.

    ``path`` is the offending file path. ``msg`` and ``args`` are a format
    string and its arguments, in the form ``error.WireprotoCommandError``
    expects.
    """

    def __init__(self, path, msg, args):
        # ``args`` overrides the tuple BaseException would otherwise record
        # from the constructor arguments.
        self.args = args
        self.msg = msg
        self.path = path
1083
1084
def getfilestore(repo, proto, path):
    """Return the file storage object for ``path``, for wire protocol use.

    Kept as a standalone function so extensions can monkeypatch it to add
    access control.

    Raises ``FileAccessError`` when the store contains no revisions.
    """
    store = repo.file(path)

    # repo.file() appears to succeed even for files that do not exist, so a
    # store without any revisions is reported as an unknown file.
    if len(store) == 0:
        raise FileAccessError(path, b'unknown file: %s', (path,))

    return store
1099
1100
def emitfilerevisions(repo, path, revisions, linknodes, fields):
    """Yield wire objects describing each revision in ``revisions``.

    For every revision, a map holding ``node`` (plus ``parents`` and/or
    ``linknode`` when requested) is yielded. When ``revision`` is in
    ``fields``, the map announces a payload in ``fieldsfollowing``, and that
    payload is yielded right after the map. The payload is the fulltext when
    the revision carries one; otherwise it is a delta against
    ``deltabasenode``.

    ``linknodes`` maps file nodes to changeset nodes and is only read when
    ``linknode`` is requested. ``repo`` and ``path`` are not used here.
    """
    wantparents = b'parents' in fields
    wantlinknode = b'linknode' in fields
    wantrevision = b'revision' in fields

    for rev in revisions:
        meta = {b'node': rev.node}

        if wantparents:
            meta[b'parents'] = [rev.p1node, rev.p2node]

        if wantlinknode:
            meta[b'linknode'] = linknodes[rev.node]

        payloads = []
        if wantrevision:
            if rev.revision is None:
                meta[b'deltabasenode'] = rev.basenode
                payloads.append((b'delta', rev.delta))
            else:
                payloads.append((b'revision', rev.revision))

        if payloads:
            meta[b'fieldsfollowing'] = [
                (kind, len(data)) for kind, data in payloads
            ]

        yield meta

        for _kind, data in payloads:
            yield data
1132
1133
def makefilematcher(repo, pathfilter):
    """Construct a matcher from a path filter dict.

    An empty or ``None`` ``pathfilter`` produces a matcher built with no
    patterns. Otherwise ``pathfilter`` may contain ``include`` and
    ``exclude`` pattern lists. Every pattern must begin with ``path:`` or
    ``rootfilesin:``; if one does not, ``error.WireprotoCommandError`` is
    raised. The matcher is passed through ``repo.narrowmatch`` before it is
    returned.
    """
    if not pathfilter:
        matcher = matchmod.match(repo.root, b'')
    else:
        # Validate values.
        for key in (b'include', b'exclude'):
            for pattern in pathfilter.get(key, []):
                if pattern.startswith((b'path:', b'rootfilesin:')):
                    continue
                raise error.WireprotoCommandError(
                    b'%s pattern must begin with `path:` or `rootfilesin:`; '
                    b'got %s',
                    (key, pattern),
                )

        matcher = matchmod.match(
            repo.root,
            b'',
            include=pathfilter.get(b'include', []),
            exclude=pathfilter.get(b'exclude', []),
        )

    # Requested patterns could include files not in the local store. So
    # filter those out.
    return repo.narrowmatch(matcher)
1161
1162
@wireprotocommand(
    b'filedata',
    args={
        b'haveparents': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': True,
        },
        b'nodes': {
            b'type': b'list',
            b'example': [b'0123456...'],
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {b'parents', b'revision', b'linknode'},
        },
        b'path': {
            b'type': b'bytes',
            b'example': b'foo.txt',
        },
    },
    permission=b'pull',
    # TODO censoring a file revision won't invalidate the cache.
    # Figure out a way to take censoring into account when deriving
    # the cache key.
    cachekeyfn=makecommandcachekeyfn(b'filedata', 1, allargs=True),
)
def filedata(repo, proto, haveparents, nodes, fields, path):
    """Emit the requested revisions of a single file.

    Yields a header map with ``totalitems`` and then the objects produced by
    ``emitfilerevisions``. Raises ``error.WireprotoCommandError`` when the
    file or any requested file node is unknown.
    """
    # TODO this API allows access to file revisions that are attached to
    # secret changesets. filesdata does not have this problem. Maybe this
    # API should be deleted?

    try:
        # Extensions may wish to access the protocol handler.
        store = getfilestore(repo, proto, path)
    except FileAccessError as e:
        raise error.WireprotoCommandError(e.msg, e.args)

    tochangesetnode = repo.changelog.node
    linknodes = {}

    for fnode in nodes:
        # Reject file nodes unknown to the store before doing any work.
        try:
            store.rev(fnode)
        except error.LookupError:
            raise error.WireprotoCommandError(
                b'unknown file node: %s', (hex(fnode),)
            )

        # TODO by creating the filectx against a specific file revision
        # instead of changeset, linkrev() is always used. This is wrong for
        # cases where linkrev() may refer to a hidden changeset. But since this
        # API doesn't know anything about changesets, we're not sure how to
        # disambiguate the linknode. Perhaps we should delete this API?
        fctx = repo.filectx(path, fileid=fnode)
        linknodes[fnode] = tochangesetnode(fctx.introrev())

    filerevs = store.emitrevisions(
        nodes,
        revisiondata=b'revision' in fields,
        assumehaveparentrevisions=haveparents,
    )

    yield {
        b'totalitems': len(nodes),
    }

    for obj in emitfilerevisions(repo, path, filerevs, linknodes, fields):
        yield obj
1235
1236
def filesdatacapabilities(repo, proto):
    """Return the extra capabilities advertised for ``filesdata``.

    ``recommendedbatchsize`` is read from the
    ``experimental.server.filesdata.recommended-batch-size`` config option.
    """
    section = b'experimental'
    option = b'server.filesdata.recommended-batch-size'
    return {b'recommendedbatchsize': repo.ui.configint(section, option)}
1244
1245
@wireprotocommand(
    b'filesdata',
    args={
        b'haveparents': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': True,
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {
                b'firstchangeset',
                b'linknode',
                b'parents',
                b'revision',
            },
        },
        b'pathfilter': {
            b'type': b'dict',
            b'default': lambda: None,
            b'example': {b'include': [b'path:tests']},
        },
        b'revisions': {
            b'type': b'list',
            b'example': [
                {
                    b'type': b'changesetexplicit',
                    b'nodes': [b'abcdef...'],
                }
            ],
        },
    },
    permission=b'pull',
    # TODO censoring a file revision won't invalidate the cache.
    # Figure out a way to take censoring into account when deriving
    # the cache key.
    cachekeyfn=makecommandcachekeyfn(b'filesdata', 1, allargs=True),
    extracapabilitiesfn=filesdatacapabilities,
)
def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
    """Emit file revision data for the changesets selected by ``revisions``.

    The command yields these objects in order:

    1. A header map with ``totalpaths`` and ``totalitems``.
    2. For each matching path, in sorted order, a map with ``path`` and
       ``totalitems``, followed by the objects from ``emitfilerevisions``.

    Each file node's linknode is the first changeset in ``revisions`` in
    which that file node was seen.
    """
    # TODO This should operate on a repo that exposes obsolete changesets. There
    # is a race between a client making a push that obsoletes a changeset and
    # another client fetching files data for that changeset. If a client has a
    # changeset, it should probably be allowed to access files data for that
    # changeset.

    outgoing = resolvenodes(repo, revisions)
    filematcher = makefilematcher(repo, pathfilter)

    # path -> {fnode: linknode}
    fnodes = collections.defaultdict(dict)

    # We collect the set of relevant file revisions by iterating the changeset
    # revisions and either walking the set of files recorded in the changeset
    # or by walking the manifest at that revision. There is probably room for a
    # storage-level API to request this data, as it can be expensive to compute
    # and would benefit from caching or alternate storage from what revlogs
    # provide.
    for node in outgoing:
        ctx = repo[node]
        mctx = ctx.manifestctx()
        md = mctx.read()

        if haveparents:
            checkpaths = ctx.files()
        else:
            checkpaths = md.keys()

        for path in checkpaths:
            # ctx.files() can name paths that are not in this changeset's
            # manifest (files removed by the changeset). Such paths have no
            # file revision to send; indexing md[path] would raise KeyError.
            fnode = md.get(path)
            if fnode is None:
                continue

            if path in fnodes and fnode in fnodes[path]:
                continue

            if not filematcher(path):
                continue

            fnodes[path].setdefault(fnode, node)

    yield {
        b'totalpaths': len(fnodes),
        b'totalitems': sum(len(v) for v in fnodes.values()),
    }

    for path, filenodes in sorted(fnodes.items()):
        try:
            store = getfilestore(repo, proto, path)
        except FileAccessError as e:
            raise error.WireprotoCommandError(e.msg, e.args)

        yield {
            b'path': path,
            b'totalitems': len(filenodes),
        }

        filerevs = store.emitrevisions(
            filenodes.keys(),
            revisiondata=b'revision' in fields,
            assumehaveparentrevisions=haveparents,
        )

        for o in emitfilerevisions(repo, path, filerevs, filenodes, fields):
            yield o
1351
1352
@wireprotocommand(
    b'heads',
    args={
        b'publiconly': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': False,
        },
    },
    permission=b'pull',
)
def headsv2(repo, proto, publiconly):
    # With ``publiconly`` the heads come from the ``immutable`` filtered
    # view of the repository instead of the repository as given.
    view = repo.filtered(b'immutable') if publiconly else repo
    yield view.heads()
1369
1370
@wireprotocommand(
    b'known',
    args={
        b'nodes': {
            b'type': b'list',
            b'default': list,
            b'example': [b'deadbeef'],
        },
    },
    permission=b'pull',
)
def knownv2(repo, proto, nodes):
    # One byte per requested node, in order: ``1`` when repo.known() reports
    # it as known, ``0`` otherwise.
    flags = []
    for isknown in repo.known(nodes):
        flags.append(b'1' if isknown else b'0')
    yield b''.join(flags)
1385
1386
@wireprotocommand(
    b'listkeys',
    args={
        b'namespace': {
            b'type': b'bytes',
            b'example': b'ns',
        },
    },
    permission=b'pull',
)
def listkeysv2(repo, proto, namespace):
    # The namespace is converted to the local encoding for the lookup; keys
    # and values are converted back with encoding.fromlocal before sending.
    localnamespace = encoding.tolocal(namespace)
    fromlocal = encoding.fromlocal

    converted = {}
    for name, value in pycompat.iteritems(repo.listkeys(localnamespace)):
        converted[fromlocal(name)] = fromlocal(value)

    yield converted
1405
1406
@wireprotocommand(
    b'lookup',
    args={
        b'key': {
            b'type': b'bytes',
            b'example': b'foo',
        },
    },
    permission=b'pull',
)
def lookupv2(repo, proto, key):
    # TODO handle exception.
    # Any error raised by repo.lookup() currently propagates unchanged.
    yield repo.lookup(encoding.tolocal(key))
1424
1425
def manifestdatacapabilities(repo, proto):
    """Return the extra capabilities advertised for ``manifestdata``.

    ``recommendedbatchsize`` is read from the
    ``experimental.server.manifestdata.recommended-batch-size`` option.
    """
    section = b'experimental'
    option = b'server.manifestdata.recommended-batch-size'
    return {b'recommendedbatchsize': repo.ui.configint(section, option)}
1434
1435
@wireprotocommand(
    b'manifestdata',
    args={
        b'nodes': {
            b'type': b'list',
            b'example': [b'0123456...'],
        },
        b'haveparents': {
            b'type': b'bool',
            b'default': lambda: False,
            b'example': True,
        },
        b'fields': {
            b'type': b'set',
            b'default': set,
            b'example': {b'parents', b'revision'},
            b'validvalues': {b'parents', b'revision'},
        },
        b'tree': {
            b'type': b'bytes',
            b'example': b'',
        },
    },
    permission=b'pull',
    cachekeyfn=makecommandcachekeyfn(b'manifestdata', 1, allargs=True),
    extracapabilitiesfn=manifestdatacapabilities,
)
def manifestdata(repo, proto, haveparents, nodes, fields, tree):
    """Emit revisions of the ``tree`` manifest storage for ``nodes``.

    The command yields these objects in order:

    1. A header map with ``totalitems``.
    2. For each revision, a map with ``node`` (plus ``parents`` when
       requested). When ``revision`` is requested, the map is followed by
       the fulltext, or by a delta against ``deltabasenode``.

    Raises ``error.WireprotoCommandError`` if a requested node is unknown.
    """
    store = repo.manifestlog.getstorage(tree)

    # Validate the node is known and abort on unknown revisions.
    for node in nodes:
        try:
            store.rev(node)
        except error.LookupError:
            # Report the node in hex, as filedata does; the raw binary node
            # is unreadable in an error message.
            raise error.WireprotoCommandError(
                b'unknown node: %s', (hex(node),)
            )

    revisions = store.emitrevisions(
        nodes,
        revisiondata=b'revision' in fields,
        assumehaveparentrevisions=haveparents,
    )

    yield {
        b'totalitems': len(nodes),
    }

    for revision in revisions:
        d = {
            b'node': revision.node,
        }

        if b'parents' in fields:
            d[b'parents'] = [revision.p1node, revision.p2node]

        followingmeta = []
        followingdata = []

        if b'revision' in fields:
            if revision.revision is not None:
                followingmeta.append((b'revision', len(revision.revision)))
                followingdata.append(revision.revision)
            else:
                d[b'deltabasenode'] = revision.basenode
                followingmeta.append((b'delta', len(revision.delta)))
                followingdata.append(revision.delta)

        if followingmeta:
            d[b'fieldsfollowing'] = followingmeta

        yield d

        for extra in followingdata:
            yield extra
1510
1511
@wireprotocommand(
    b'pushkey',
    args={
        b'namespace': {
            b'type': b'bytes',
            b'example': b'ns',
        },
        b'key': {
            b'type': b'bytes',
            b'example': b'key',
        },
        b'old': {
            b'type': b'bytes',
            b'example': b'old',
        },
        b'new': {
            b'type': b'bytes',
            b'example': b'new',
        },
    },
    permission=b'push',
)
def pushkeyv2(repo, proto, namespace, key, old, new):
    # TODO handle ui output redirection
    # All four arguments are converted to the local encoding, in order,
    # before being handed to repo.pushkey().
    localargs = [encoding.tolocal(v) for v in (namespace, key, old, new)]
    yield repo.pushkey(*localargs)
1542
1543
@wireprotocommand(
    b'rawstorefiledata',
    args={
        b'files': {
            b'type': b'list',
            b'example': [b'changelog', b'manifestlog'],
        },
        b'pathfilter': {
            # NOTE(review): declared as a list although the example is a dict
            # (as for filesdata). The argument is currently unused.
            b'type': b'list',
            b'default': lambda: None,
            b'example': {b'include': [b'path:tests']},
        },
    },
    permission=b'pull',
)
def rawstorefiledata(repo, proto, files, pathfilter):
    """Stream raw store files for the requested file sets.

    ``files`` may contain ``changelog`` and/or ``manifestlog``. Any other
    value raises ``error.WireprotoCommandError``, as does stream clone
    generation being disallowed. The command yields a header map with
    ``filecount`` and ``totalsize``. Then, for each file, it yields a map
    with ``location``, ``path`` and ``size``, followed by an indefinite-length
    bytestring response carrying the file's contents. ``pathfilter`` is
    accepted but not used.
    """
    if not streamclone.allowservergeneration(repo):
        raise error.WireprotoCommandError(b'stream clone is disabled')

    # TODO support dynamically advertising what store files "sets" are
    # available. For now, we support changelog, manifestlog, and files.
    files = set(files)
    allowedfiles = {b'changelog', b'manifestlog'}

    unsupported = files - allowedfiles
    if unsupported:
        raise error.WireprotoCommandError(
            b'unknown file type: %s', (b', '.join(sorted(unsupported)),)
        )

    with repo.lock():
        topfiles = list(repo.store.topfiles())

    sendfiles = []
    totalsize = 0

    # TODO this is a bunch of storage layer interface abstractions because
    # it assumes revlogs.
    for rl_type, name, size in topfiles:
        # XXX use the `rl_type` for that
        if b'changelog' in files and name.startswith(b'00changelog'):
            pass
        elif b'manifestlog' in files and name.startswith(b'00manifest'):
            pass
        else:
            continue

        sendfiles.append((b'store', name, size))
        totalsize += size

    yield {
        b'filecount': len(sendfiles),
        b'totalsize': totalsize,
    }

    # A generator is used so the context manager closes the file only after
    # the final chunk is sent. ``name`` and ``size`` are passed as arguments
    # instead of being closed over from the loop below. The generator body
    # runs lazily, when the response consumer iterates it. A closure over
    # the loop variables would read whatever values they hold at that time.
    def getfiledata(name, size):
        with repo.svfs(name, b'rb', auditpath=False) as fh:
            for chunk in util.filechunkiter(fh, limit=size):
                yield chunk

    for location, name, size in sendfiles:
        yield {
            b'location': location,
            b'path': name,
            b'size': size,
        }

        yield wireprototypes.indefinitebytestringresponse(
            getfiledata(name, size)
        )
1614