1# wireprotoframing.py - unified framing protocol for wire protocol
2#
3# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4#
5# This software may be used and distributed according to the terms of the
6# GNU General Public License version 2 or any later version.
7
8# This file contains functionality to support the unified frame-based wire
9# protocol. For details about the protocol, see
10# `hg help internals.wireprotocol`.
11
12from __future__ import absolute_import
13
14import collections
15import struct
16
17from .i18n import _
18from .pycompat import getattr
19from .thirdparty import attr
20from . import (
21    encoding,
22    error,
23    pycompat,
24    util,
25    wireprototypes,
26)
27from .utils import (
28    cborutil,
29    stringutil,
30)
31
# Every frame starts with a fixed-size header: 24 bits payload length,
# 16 bits request ID, 8 bits stream ID, 8 bits stream flags, then one byte
# holding the frame type (high 4 bits) and frame flags (low 4 bits).
FRAME_HEADER_SIZE = 8
# Default payload size limit used by the frame creation helpers in this
# module when splitting data across frames or rejecting oversized payloads.
DEFAULT_MAX_FRAME_SIZE = 32768

# Bits for the 8-bit stream flags header field; values are OR'd together.
STREAM_FLAG_BEGIN_STREAM = 0x01
STREAM_FLAG_END_STREAM = 0x02
STREAM_FLAG_ENCODING_APPLIED = 0x04

# Human-readable names for the stream flag bits (used by humanflags() when
# rendering frames and by makeframefromhumanstring() when parsing).
STREAM_FLAGS = {
    b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
    b'stream-end': STREAM_FLAG_END_STREAM,
    b'encoded': STREAM_FLAG_ENCODING_APPLIED,
}

# Frame type IDs. The type is stored in a 4-bit header field, so values
# must be in the range 0-15. (0x04 is not assigned in this table.)
FRAME_TYPE_COMMAND_REQUEST = 0x01
FRAME_TYPE_COMMAND_DATA = 0x02
FRAME_TYPE_COMMAND_RESPONSE = 0x03
FRAME_TYPE_ERROR_RESPONSE = 0x05
FRAME_TYPE_TEXT_OUTPUT = 0x06
FRAME_TYPE_PROGRESS = 0x07
FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
FRAME_TYPE_STREAM_SETTINGS = 0x09

# Human-readable names for the frame types.
FRAME_TYPES = {
    b'command-request': FRAME_TYPE_COMMAND_REQUEST,
    b'command-data': FRAME_TYPE_COMMAND_DATA,
    b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
    b'error-response': FRAME_TYPE_ERROR_RESPONSE,
    b'text-output': FRAME_TYPE_TEXT_OUTPUT,
    b'progress': FRAME_TYPE_PROGRESS,
    b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
    b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
}

# The 4-bit frame flags field is interpreted per frame type. Each group
# below defines the bits (and their human-readable names) for one type.

# Flags for command-request frames.
FLAG_COMMAND_REQUEST_NEW = 0x01
FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08

FLAGS_COMMAND_REQUEST = {
    b'new': FLAG_COMMAND_REQUEST_NEW,
    b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
    b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
    b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
}

# Flags for command-data frames.
FLAG_COMMAND_DATA_CONTINUATION = 0x01
FLAG_COMMAND_DATA_EOS = 0x02

FLAGS_COMMAND_DATA = {
    b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
    b'eos': FLAG_COMMAND_DATA_EOS,
}

# Flags for command-response frames.
FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
FLAG_COMMAND_RESPONSE_EOS = 0x02

FLAGS_COMMAND_RESPONSE = {
    b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
    b'eos': FLAG_COMMAND_RESPONSE_EOS,
}

# Flags for sender-protocol-settings frames.
FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02

FLAGS_SENDER_PROTOCOL_SETTINGS = {
    b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
    b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
}

# Flags for stream-settings frames.
FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02

FLAGS_STREAM_ENCODING_SETTINGS = {
    b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
    b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
}

# Maps frame types to their available flags.
FRAME_TYPE_FLAGS = {
    FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
    FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
    FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
    FRAME_TYPE_ERROR_RESPONSE: {},
    FRAME_TYPE_TEXT_OUTPUT: {},
    FRAME_TYPE_PROGRESS: {},
    FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
    FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
}

# Packs/unpacks two little-endian unsigned 16-bit integers.
# NOTE(review): not referenced in the visible portion of this module --
# confirm its remaining users before changing it.
ARGUMENT_RECORD_HEADER = struct.Struct('<HH')
122
123
def humanflags(mapping, value):
    """Render the bits set in ``value`` as a ``|``-separated byte string.

    ``mapping`` maps flag names to single-bit integer values. Bits are
    visited from least to most significant. A set bit with no name in
    ``mapping`` renders as ``<unknown 0xNN>``. Returns ``b''`` when no
    bits are set.
    """
    bitnames = dict((bit, name) for name, bit in pycompat.iteritems(mapping))

    parts = []
    bit = 1
    while bit <= value:
        if bit & value:
            parts.append(bitnames.get(bit, b'<unknown 0x%02x>' % bit))
        bit = bit << 1

    return b'|'.join(parts)
135
136
@attr.s(slots=True)
class frameheader(object):
    """Represents the data in a frame header.

    Field order matters: ``parseheader()`` constructs instances
    positionally in this order.
    """

    # Payload length in bytes (24-bit field on the wire).
    length = attr.ib()
    # Request ID (16-bit field on the wire).
    requestid = attr.ib()
    # Stream ID (8-bit field on the wire).
    streamid = attr.ib()
    # Stream flags (8-bit field; STREAM_FLAG_* bits).
    streamflags = attr.ib()
    # Frame type (high 4 bits of the last header byte; FRAME_TYPE_* value).
    typeid = attr.ib()
    # Frame flags (low 4 bits of the last header byte); meaning depends on
    # ``typeid`` (see FRAME_TYPE_FLAGS).
    flags = attr.ib()
147
148
@attr.s(slots=True, repr=False)
class frame(object):
    """A parsed frame: the decoded header fields plus the raw payload.

    ``repr()`` shows the payload size and renders the frame type, stream
    flags and frame flags with their symbolic names where known.
    """

    requestid = attr.ib()
    streamid = attr.ib()
    streamflags = attr.ib()
    typeid = attr.ib()
    flags = attr.ib()
    payload = attr.ib()

    @encoding.strmethod
    def __repr__(self):
        # Symbolic frame type name, or a hex placeholder when the type is
        # not listed in FRAME_TYPES.
        fallback = b'<unknown 0x%02x>' % self.typeid
        typename = next(
            (
                name
                for name, value in pycompat.iteritems(FRAME_TYPES)
                if value == self.typeid
            ),
            fallback,
        )

        streamflagnames = humanflags(STREAM_FLAGS, self.streamflags)
        typeflags = FRAME_TYPE_FLAGS.get(self.typeid, {})
        flagnames = humanflags(typeflags, self.flags)

        fields = (
            len(self.payload),
            self.requestid,
            self.streamid,
            streamflagnames,
            typename,
            flagnames,
        )
        return (
            b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
            b'type=%s; flags=%s)' % fields
        )
180
181
def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
    """Assemble a frame into a byte array.

    The header (FRAME_HEADER_SIZE bytes) is followed by the payload:

    * 24 bits payload length (little endian)
    * 16 bits request id (little endian)
    * 8 bits stream id
    * 8 bits stream flags
    * 4 bits frame type (high nibble of the last header byte)
    * 4 bits frame flags (low nibble of the last header byte)

    Returns a ``bytearray``.

    Raises ``ValueError`` if the payload is too large for the 24-bit length
    field, or if ``typeid`` or ``flags`` do not fit in 4 bits. Without these
    checks, either case would produce a corrupt header. ``struct.error`` is
    raised if ``requestid``, ``streamid`` or ``streamflags`` overflow their
    fields.
    """
    payloadlength = len(payload)

    # Without these checks the length would be silently truncated to its
    # low 24 bits, and out-of-range flags would bleed into the type nibble.
    if payloadlength > 0xFFFFFF:
        raise ValueError(
            b'frame payload too large: %d bytes (maximum %d)'
            % (payloadlength, 0xFFFFFF)
        )
    if not 0 <= typeid <= 0x0F:
        raise ValueError(b'frame type out of range: %d' % typeid)
    if not 0 <= flags <= 0x0F:
        raise ValueError(b'frame flags out of range: %d' % flags)

    # Named ``buf`` so the module-level ``frame`` class is not shadowed.
    buf = bytearray(FRAME_HEADER_SIZE + payloadlength)

    # Keep only the low three bytes of the little endian 32-bit length.
    buf[0:3] = struct.pack('<I', payloadlength)[0:3]
    struct.pack_into('<HBB', buf, 3, requestid, streamid, streamflags)
    buf[7] = (typeid << 4) | flags
    buf[FRAME_HEADER_SIZE:] = payload

    return buf
201
202
def makeframefromhumanstring(s):
    """Create a frame from a human readable string

    Strings have the form:

        <request-id> <stream-id> <stream-flags> <type> <flags> <payload>

    This can be used by user-facing applications and tests for creating
    frames easily without having to type out a bunch of constants.

    Request ID and stream IDs are integers.

    Stream flags, frame type, and flags can be specified by integer or
    named constant. An integer frame type with no entry in
    ``FRAME_TYPE_FLAGS`` is accepted; its flags must then be integers.

    Flags can be delimited by `|` to bitwise OR them together.

    If the payload begins with ``cbor:``, the following string will be
    evaluated as Python literal and the resulting object will be fed into
    a CBOR encoder. Otherwise, the payload is interpreted as a Python
    byte string literal.

    Returns the assembled frame as produced by ``makeframe()``.
    """
    fields = s.split(b' ', 5)
    requestid, streamid, streamflags, frametype, frameflags, payload = fields

    requestid = int(requestid)
    streamid = int(streamid)

    finalstreamflags = 0
    for flag in streamflags.split(b'|'):
        if flag in STREAM_FLAGS:
            finalstreamflags |= STREAM_FLAGS[flag]
        else:
            finalstreamflags |= int(flag)

    if frametype in FRAME_TYPES:
        frametype = FRAME_TYPES[frametype]
    else:
        frametype = int(frametype)

    finalflags = 0
    # Frame types without a flag table have no named flags. Falling back to
    # an empty mapping (as frame.__repr__ does) lets integer flags work for
    # them instead of raising KeyError.
    validflags = FRAME_TYPE_FLAGS.get(frametype, {})
    for flag in frameflags.split(b'|'):
        if flag in validflags:
            finalflags |= validflags[flag]
        else:
            finalflags |= int(flag)

    if payload.startswith(b'cbor:'):
        payload = b''.join(
            cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
        )

    else:
        payload = stringutil.unescapestr(payload)

    return makeframe(
        requestid=requestid,
        streamid=streamid,
        streamflags=finalstreamflags,
        typeid=frametype,
        flags=finalflags,
        payload=payload,
    )
267
268
def parseheader(data):
    """Parse a unified framing protocol frame header from a buffer.

    The header is expected to be in the buffer at offset 0 and the
    buffer is expected to be large enough to hold a full header
    (FRAME_HEADER_SIZE bytes). Indexing ``data`` must yield integers
    (e.g. a ``bytearray``).

    Returns a ``frameheader`` instance.
    """
    # 24 bits payload length (little endian)
    # 16 bits request ID
    # 8 bits stream ID
    # 8 bits stream flags
    # 4 bits frame type
    # 4 bits frame flags
    # ... payload

    # The third length byte carries bits 16-23, so it is weighted by 2**16
    # (it was previously weighted by 16384). This mirrors makeframe(), which
    # writes the low three bytes of struct.pack('<I', length).
    framelength = data[0] | (data[1] << 8) | (data[2] << 16)
    requestid, streamid, streamflags = struct.unpack_from('<HBB', data, 3)
    typeflags = data[7]

    frametype = (typeflags & 0xF0) >> 4
    frameflags = typeflags & 0x0F

    return frameheader(
        framelength, requestid, streamid, streamflags, frametype, frameflags
    )
292
293
def readframe(fh):
    """Read a unified framing protocol frame from a file object.

    ``fh`` must provide ``readinto()`` (used for the header) and ``read()``
    (used for the payload).

    Returns a ``frame`` instance for the decoded frame, or None if
    ``readinto()`` reports that zero header bytes were read (EOF).

    Raises ``error.Abort`` if only part of a header, or fewer payload bytes
    than the header announces, could be read.
    """
    header = bytearray(FRAME_HEADER_SIZE)

    readcount = fh.readinto(header)

    if readcount == 0:
        return None

    if readcount != FRAME_HEADER_SIZE:
        raise error.Abort(
            _(b'received incomplete frame: got %d bytes: %s')
            % (readcount, header)
        )

    h = parseheader(header)

    payload = fh.read(h.length)
    if len(payload) != h.length:
        raise error.Abort(
            _(b'frame length error: expected %d; got %d')
            % (h.length, len(payload))
        )

    return frame(
        h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
    )
326
327
def createcommandframes(
    stream,
    requestid,
    cmd,
    args,
    datafh=None,
    maxframesize=DEFAULT_MAX_FRAME_SIZE,
    redirect=None,
):
    """Create frames necessary to transmit a request to run a command.

    This is a generator of bytearrays. Each item represents a frame
    ready to be sent over the wire to a peer.

    The command name, plus ``args`` and ``redirect`` when given, are CBOR
    encoded into a single map. That map is split across one or more
    ``command-request`` frames of at most ``maxframesize`` payload bytes,
    and every frame except the last carries the ``more`` flag.

    If ``datafh`` is given, every request frame carries the ``have-data``
    flag. Afterwards, ``datafh`` is read in chunks of at most
    ``maxframesize`` bytes and sent as ``command-data`` frames; the last of
    these carries the ``eos`` flag.
    """
    data = {b'name': cmd}
    if args:
        data[b'args'] = args

    if redirect:
        data[b'redirect'] = redirect

    data = b''.join(cborutil.streamencode(data))

    offset = 0

    while True:
        flags = 0

        # Must set new or continuation flag.
        if not offset:
            flags |= FLAG_COMMAND_REQUEST_NEW
        else:
            flags |= FLAG_COMMAND_REQUEST_CONTINUATION

        # The "have data" flag is set on all request frames.
        if datafh:
            flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA

        payload = data[offset : offset + maxframesize]
        offset += len(payload)

        if len(payload) == maxframesize and offset < len(data):
            flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES

        yield stream.makeframe(
            requestid=requestid,
            typeid=FRAME_TYPE_COMMAND_REQUEST,
            flags=flags,
            payload=payload,
        )

        if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
            break

    if datafh:
        while True:
            # Data frames honor the same caller-supplied size limit as the
            # request frames above.
            data = datafh.read(maxframesize)

            done = False
            if len(data) == maxframesize:
                flags = FLAG_COMMAND_DATA_CONTINUATION
            else:
                # A short read means the input is exhausted.
                flags = FLAG_COMMAND_DATA_EOS
                assert datafh.read(1) == b''
                done = True

            yield stream.makeframe(
                requestid=requestid,
                typeid=FRAME_TYPE_COMMAND_DATA,
                flags=flags,
                payload=data,
            )

            if done:
                break
403
404
def createcommandresponseokframe(stream, requestid):
    """Build the frame that opens a successful command response.

    The payload is the CBOR encoding of ``{b'status': b'ok'}``. Once the
    stream settings have been sent, the payload is passed through
    ``stream.encode()`` and the frame is marked as encoded. If the encoder
    produces no output yet, ``None`` is returned instead of a frame.
    """
    payload = b''.join(cborutil.streamencode({b'status': b'ok'}))

    encoded = bool(stream.streamsettingssent)
    if encoded:
        payload = stream.encode(payload)
        if not payload:
            # Everything is still buffered inside the encoder.
            return None

    return stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
        payload=payload,
        encoded=encoded,
    )
424
425
def createcommandresponseeosframes(
    stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
):
    """Generate the frame(s) that terminate a command response.

    Whatever ``stream.flush()`` returns is split into frames of at most
    ``maxframesize`` bytes. The last frame carries the EOS flag and the
    others carry the continuation flag. With nothing to flush, a single
    empty EOS frame is generated. Frames are marked as encoded only when
    the flushed data is non-empty.
    """
    pending = stream.flush()
    isencoded = pending != b''
    total = len(pending)

    start = 0
    while True:
        chunk = pending[start : start + maxframesize]
        start += len(chunk)
        last = start == total

        if last:
            frameflags = FLAG_COMMAND_RESPONSE_EOS
        else:
            frameflags = FLAG_COMMAND_RESPONSE_CONTINUATION

        yield stream.makeframe(
            requestid=requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=frameflags,
            payload=chunk,
            encoded=isencoded,
        )

        if last:
            break
454
455
def createalternatelocationresponseframe(stream, requestid, location):
    """Build a command response frame redirecting the client elsewhere.

    ``location`` must expose ``url`` and ``mediatype`` attributes plus
    ``size``, ``fullhashes``, ``fullhashseed``, ``serverdercerts`` and
    ``servercadercerts``. Any of the latter that are ``None`` are left out
    of the CBOR payload. The payload is passed through ``stream.encode()``
    when the stream settings have already been sent.
    """
    locationinfo = {
        b'url': location.url,
        b'mediatype': location.mediatype,
    }

    optionalattrs = (
        'size',
        'fullhashes',
        'fullhashseed',
        'serverdercerts',
        'servercadercerts',
    )
    for attrname in optionalattrs:
        value = getattr(location, attrname)
        if value is not None:
            locationinfo[pycompat.bytestr(attrname)] = value

    response = {b'status': b'redirect', b'location': locationinfo}
    payload = b''.join(cborutil.streamencode(response))

    encoded = bool(stream.streamsettingssent)
    if encoded:
        payload = stream.encode(payload)

    return stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
        payload=payload,
        encoded=encoded,
    )
491
492
def createcommanderrorresponse(stream, requestid, message, args=None):
    """Generate the single EOS command-response frame for a failed command.

    The CBOR payload is ``{b'status': b'error', b'error': {...}}``. The
    inner map holds ``message`` and, when ``args`` is truthy, ``args``.
    The payload is not passed through the stream encoder.
    """
    # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
    # formatting works consistently?
    errorinfo = {b'message': message}
    if args:
        errorinfo[b'args'] = args

    response = {b'status': b'error', b'error': errorinfo}
    payload = b''.join(cborutil.streamencode(response))

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_COMMAND_RESPONSE,
        flags=FLAG_COMMAND_RESPONSE_EOS,
        payload=payload,
    )
514
515
def createerrorframe(stream, requestid, msg, errtype):
    """Generate a single error-response frame.

    The CBOR payload is ``{b'type': errtype, b'message': [{b'msg': msg}]}``.
    """
    # TODO properly handle frame size limits.
    # NOTE(review): this bounds ``msg`` only, not the CBOR payload, so the
    # frame can still slightly exceed DEFAULT_MAX_FRAME_SIZE.
    assert len(msg) <= DEFAULT_MAX_FRAME_SIZE

    messages = [{b'msg': msg}]
    errorinfo = {b'type': errtype, b'message': messages}
    payload = b''.join(cborutil.streamencode(errorinfo))

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_ERROR_RESPONSE,
        flags=0,
        payload=payload,
    )
535
536
def createtextoutputframe(
    stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
):
    """Create a text output frame to render text to people.

    ``atoms`` is an iterable of 3-tuples of (formatting string, args, labels).

    The formatting string contains ``%s`` tokens to be replaced by the
    corresponding indexed entry in ``args``. ``labels`` is an iterable of
    formatters to be applied at rendering time. In terms of the ``ui``
    class, each atom corresponds to a ``ui.write()``.

    All values must be bytes. Non-ASCII bytes in formatting strings are
    replaced with ``?``, and invalid UTF-8 in arguments is replaced with
    U+FFFD. Labels must be ASCII; otherwise ``UnicodeDecodeError`` is
    raised.

    Raises ``ValueError`` (when the generator is iterated) if a value is not
    bytes or if the encoded atoms do not fit in one frame of
    ``maxframesize`` bytes.
    """
    atomdicts = []

    for formatting, args, labels in atoms:
        # TODO look for localstr, other types here?

        if not isinstance(formatting, bytes):
            raise ValueError(b'must use bytes formatting strings')
        for arg in args:
            if not isinstance(arg, bytes):
                raise ValueError(b'must use bytes for arguments')
        for label in labels:
            if not isinstance(label, bytes):
                raise ValueError(b'must use bytes for labels')

        # Formatting string must be ASCII. Decoding with 'replace' turns
        # non-ASCII bytes into U+FFFD, which the ASCII codec can only emit
        # (as '?') if encoding also uses 'replace'. A strict encode raised
        # UnicodeEncodeError here.
        formatting = formatting.decode('ascii', 'replace').encode(
            'ascii', 'replace'
        )

        # Arguments must be UTF-8.
        args = [a.decode('utf-8', 'replace').encode('utf-8') for a in args]

        # Labels must be ASCII.
        labels = [l.decode('ascii', 'strict').encode('ascii') for l in labels]

        atom = {b'msg': formatting}
        if args:
            atom[b'args'] = args
        if labels:
            atom[b'labels'] = labels

        atomdicts.append(atom)

    payload = b''.join(cborutil.streamencode(atomdicts))

    if len(payload) > maxframesize:
        raise ValueError(b'cannot encode data in a single frame')

    yield stream.makeframe(
        requestid=requestid,
        typeid=FRAME_TYPE_TEXT_OUTPUT,
        flags=0,
        payload=payload,
    )
591
592
class bufferingcommandresponseemitter(object):
    """Coalesce small command response chunks into larger frames.

    Command handlers tend to produce output in pieces much smaller than a
    frame can hold. Each piece given to ``send()`` is first passed through
    ``stream.encode()``. The encoded pieces are buffered while their total
    size stays below ``maxframesize``, and are then emitted together as one
    ``command-response`` frame. Encoded pieces larger than ``maxframesize``
    are split across several frames immediately.

    Every emitted frame carries the continuation flag and is marked as
    encoded. When ``stream`` is an ``outputstream``, its stream settings
    must therefore already have been sent, or ``makeframe()`` raises
    ``error.ProgrammingError``.

    NOTE: this buffering could arguably live at the stream level instead.
    """

    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
        self._stream = stream
        self._requestid = requestid
        self._maxsize = maxframesize
        # Encoded chunks waiting to be emitted, and their combined length.
        self._chunks = []
        self._chunkssize = 0

    def _makeresponseframe(self, payload):
        """Build one encoded command-response continuation frame."""
        return self._stream.makeframe(
            self._requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
            payload=payload,
            encoded=True,
        )

    def send(self, data):
        """Feed ``data`` to the emitter, yielding any frames it produces.

        Passing ``None`` flushes all buffered data into frames.
        """
        if data is None:
            for f in self._flush():
                yield f
            return

        encoded = self._stream.encode(data)
        if not encoded:
            # Nothing to emit (the encoder may not have produced output yet).
            return

        size = len(encoded)

        if size > self._maxsize:
            # Too big for a single frame. Drain the buffer first so output
            # order is preserved, then slice this chunk across frames.
            for f in self._flush():
                yield f

            start = 0
            while True:
                piece = encoded[start : start + self._maxsize]
                start += len(piece)
                yield self._makeresponseframe(piece)
                if start == size:
                    return

        if self._chunkssize + size < self._maxsize:
            self._chunks.append(encoded)
            self._chunkssize += size
            return

        # Buffering this chunk would reach the limit: emit what is buffered
        # and start a fresh buffer with this chunk. Splitting the chunk to
        # top up the current frame is possible but kept out for simplicity.
        for f in self._flush():
            yield f

        self._chunks.append(encoded)
        self._chunkssize = size

    def _flush(self):
        """Emit all buffered chunks as one frame, if anything is buffered."""
        payload = b''.join(self._chunks)
        assert len(payload) <= self._maxsize

        del self._chunks[:]
        self._chunkssize = 0

        if payload:
            yield self._makeresponseframe(payload)
697
698
699# TODO consider defining encoders/decoders using the util.compressionengine
700# mechanism.
701
702
class identityencoder(object):
    """Pass-through encoder for the "identity" stream encoding profile.

    Data is returned unchanged and nothing is ever buffered, so flushing
    and finishing always produce empty output.
    """

    def __init__(self, ui):
        # No configuration needed; ``ui`` is accepted so every encoder class
        # shares the same constructor signature.
        pass

    def encode(self, data):
        return data

    def flush(self):
        return b''

    def finish(self):
        # Nothing is ever held back, so finishing is the same as flushing.
        return self.flush()
717
718
class identitydecoder(object):
    """Pass-through decoder for the "identity" stream encoding profile."""

    def __init__(self, ui, extraobjs):
        # The identity profile takes no extra settings objects.
        if not extraobjs:
            return

        raise error.Abort(
            _(b'identity decoder received unexpected additional values')
        )

    def decode(self, data):
        return data
730
731
class zlibencoder(object):
    """Encoder for the "zlib" stream encoding profile.

    A single compression context spans all data fed to the encoder.
    """

    def __init__(self, ui):
        # zlib is imported here rather than at module level.
        import zlib

        self._zlib = zlib
        self._compressor = zlib.compressobj()

    def encode(self, data):
        """Compress ``data``; output may be empty while zlib buffers input."""
        compressor = self._compressor
        return compressor.compress(data)

    def flush(self):
        """Return all pending compressed output without ending the stream."""
        # Z_SYNC_FLUSH emits everything pending but, unlike Z_FULL_FLUSH,
        # doesn't reset the compression context.
        syncmode = self._zlib.Z_SYNC_FLUSH
        return self._compressor.flush(syncmode)

    def finish(self):
        """Terminate the zlib stream and return its final bytes.

        The encoder cannot be used afterwards.
        """
        trailer = self._compressor.flush(self._zlib.Z_FINISH)
        self._compressor = None
        return trailer
751
752
class zlibdecoder(object):
    """Decoder for the "zlib" stream encoding profile."""

    def __init__(self, ui, extraobjs):
        import zlib

        # The zlib profile takes no extra settings objects.
        if extraobjs:
            raise error.Abort(
                _(b'zlib decoder received unexpected additional values')
            )

        self._decompressor = zlib.decompressobj()

    def decode(self, data):
        """Decompress ``data`` and return whatever output is available."""
        # Python 2's zlib module doesn't use the buffer protocol and can't
        # handle all bytes-like types, so hand it real bytes there.
        if isinstance(data, bytearray) and not pycompat.ispy3:
            data = bytes(data)

        return self._decompressor.decompress(data)
771
772
class zstdbaseencoder(object):
    """Shared implementation of zstandard stream encoders.

    Subclasses choose the compression ``level``.
    """

    def __init__(self, level):
        from . import zstd

        self._zstd = zstd
        self._compressor = zstd.ZstdCompressor(level=level).compressobj()

    def encode(self, data):
        """Feed ``data`` to the compressor; output may be empty."""
        return self._compressor.compress(data)

    def flush(self):
        """Return compressed output for everything fed in so far."""
        # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
        # compressor and allows a decompressor to access all encoded data
        # up to this point.
        blockmode = self._zstd.COMPRESSOBJ_FLUSH_BLOCK
        return self._compressor.flush(blockmode)

    def finish(self):
        """End the zstd frame and return its final bytes.

        The encoder cannot be used afterwards.
        """
        trailer = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
        self._compressor = None
        return trailer
794
795
class zstd8mbencoder(zstdbaseencoder):
    """Encoder for the "zstd-8mb" stream profile (compression level 3)."""

    def __init__(self, ui):
        super(zstd8mbencoder, self).__init__(level=3)
799
800
class zstdbasedecoder(object):
    """Shared implementation of zstandard stream decoders.

    ``maxwindowsize`` is passed to ``ZstdDecompressor`` as
    ``max_window_size``.
    """

    def __init__(self, maxwindowsize):
        from . import zstd

        decompressctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
        self._decompressor = decompressctx.decompressobj()

    def decode(self, data):
        """Decompress ``data`` and return whatever output is available."""
        decompressor = self._decompressor
        return decompressor.decompress(data)
810
811
class zstd8mbdecoder(zstdbasedecoder):
    """Decoder for the "zstd-8mb" stream profile (8 MiB max window)."""

    def __init__(self, ui, extraobjs):
        # The zstd-8mb profile takes no extra settings objects.
        if extraobjs:
            raise error.Abort(
                _(b'zstd8mb decoder received unexpected additional values')
            )

        # 8 << 20 == 8 * 1048576 bytes, i.e. 8 MiB.
        super(zstd8mbdecoder, self).__init__(maxwindowsize=8 << 20)
820
821
# Maps stream encoding profile names (bytes) to (encoder class, decoder
# class) tuples. STREAM_ENCODERS_ORDER records the names in registration
# order; populatestreamencoders() registers the preferred profile first.
# We lazily populate these (via populatestreamencoders()) to avoid
# excessive module imports when importing this module.
STREAM_ENCODERS = {}
STREAM_ENCODERS_ORDER = []
826
827
def populatestreamencoders():
    """Register the available stream encoding profiles, once.

    Profiles are registered in preference order: zstd-8mb (only when the
    zstd module can be imported), then zlib, then identity. Each profile
    is added to STREAM_ENCODERS and appended to STREAM_ENCODERS_ORDER.
    Calling this again after registration is a no-op.
    """
    if STREAM_ENCODERS:
        return

    # Touching an attribute presumably forces a lazily-imported module to
    # actually load, so a missing zstd shows up as ImportError here.
    try:
        from . import zstd

        zstd.__version__
    except ImportError:
        zstd = None

    # zstandard is fastest and is preferred.
    profiles = []
    if zstd:
        profiles.append((b'zstd-8mb', zstd8mbencoder, zstd8mbdecoder))
    profiles.append((b'zlib', zlibencoder, zlibdecoder))
    profiles.append((b'identity', identityencoder, identitydecoder))

    for name, encodercls, decodercls in profiles:
        STREAM_ENCODERS[name] = (encodercls, decodercls)
        STREAM_ENCODERS_ORDER.append(name)
849
850
class stream(object):
    """A logical, one-directional series of frames sharing a stream ID."""

    def __init__(self, streamid, active=False):
        self.streamid = streamid
        self._active = active

    def makeframe(self, requestid, typeid, flags, payload):
        """Serialize a frame for this stream without sending it.

        The first frame made on an inactive stream carries the
        STREAM_FLAG_BEGIN_STREAM flag and marks the stream active. Returns
        the serialized frame from the module-level ``makeframe()``.
        """
        if self._active:
            streamflags = 0
        else:
            streamflags = STREAM_FLAG_BEGIN_STREAM
            self._active = True

        return makeframe(
            requestid, self.streamid, streamflags, typeid, flags, payload
        )
871
872
class inputstream(stream):
    """Represents a stream used for receiving data.

    Payloads are decoded with the decoder configured by ``setdecoder()``.
    Without a decoder, data passes through unchanged.
    """

    def __init__(self, streamid, active=False):
        super(inputstream, self).__init__(streamid, active=active)
        self._decoder = None

    def setdecoder(self, ui, name, extraobjs):
        """Set the decoder for this stream.

        Receives the stream profile name and any additional CBOR objects
        decoded from the stream encoding settings frame payloads.

        Raises ``error.Abort`` if ``name`` is not registered in
        ``STREAM_ENCODERS``.
        """
        if name not in STREAM_ENCODERS:
            raise error.Abort(_(b'unknown stream decoder: %s') % name)

        # STREAM_ENCODERS values are (encoder class, decoder class) tuples.
        self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)

    def decode(self, data):
        """Decode ``data`` with the configured decoder, if any."""
        # Default is identity decoder. We don't bother instantiating one
        # because it is trivial.
        if not self._decoder:
            return data

        return self._decoder.decode(data)

    def flush(self):
        """Return any output still buffered in the decoder.

        Returns ``b''`` when no decoder is set or when the decoder does not
        implement ``flush()``.
        """
        if not self._decoder:
            return b''

        # Decoders are not required to implement flush(); the ones
        # registered by populatestreamencoders() only provide decode(), so
        # calling flush() unconditionally raised AttributeError.
        flush = getattr(self._decoder, 'flush', None)
        if flush is None:
            return b''

        return flush()
904
905
class outputstream(stream):
    """Represents a stream used for sending data.

    An encoder may be attached with ``setencoder()``. Frames marked as
    encoded may only be made after a stream-settings frame with the EOS
    flag has been made (see ``makestreamsettingsframe()``).
    """

    def __init__(self, streamid, active=False):
        super(outputstream, self).__init__(streamid, active=active)
        # Becomes True once a stream-settings frame with the EOS flag is made.
        self.streamsettingssent = False
        self._encoder = None
        self._encodername = None

    def setencoder(self, ui, name):
        """Set the encoder for this stream.

        Receives the stream profile name. Raises ``error.Abort`` if ``name``
        is not registered in ``STREAM_ENCODERS``.
        """
        if name not in STREAM_ENCODERS:
            raise error.Abort(_(b'unknown stream encoder: %s') % name)

        # STREAM_ENCODERS values are (encoder class, decoder class) tuples.
        self._encoder = STREAM_ENCODERS[name][0](ui)
        self._encodername = name

    def encode(self, data):
        """Encode ``data``; it is returned unchanged if no encoder is set."""
        if not self._encoder:
            return data

        return self._encoder.encode(data)

    def flush(self):
        """Return output buffered by the encoder (``b''`` without one)."""
        if not self._encoder:
            return b''

        return self._encoder.flush()

    def finish(self):
        """Finalize the encoder and return its trailing output.

        Returns ``b''`` when no encoder is set.
        """
        if not self._encoder:
            return b''

        # Return the encoder's final bytes rather than discarding them;
        # dropping them would truncate the encoded stream.
        return self._encoder.finish()

    def makeframe(self, requestid, typeid, flags, payload, encoded=False):
        """Create a frame to be sent out over this stream.

        Only returns the serialized frame (a bytearray). Does not actually
        send it.

        The first frame made on an inactive stream carries
        STREAM_FLAG_BEGIN_STREAM. ``encoded`` sets STREAM_FLAG_ENCODING_APPLIED
        and raises ``error.ProgrammingError`` unless stream settings have
        already been sent. Making a stream-settings frame with the EOS flag
        records that the settings have been sent.
        """
        streamflags = 0
        if not self._active:
            streamflags |= STREAM_FLAG_BEGIN_STREAM
            self._active = True

        if encoded:
            if not self.streamsettingssent:
                raise error.ProgrammingError(
                    b'attempting to send encoded frame without sending stream '
                    b'settings'
                )

            streamflags |= STREAM_FLAG_ENCODING_APPLIED

        if (
            typeid == FRAME_TYPE_STREAM_SETTINGS
            and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
        ):
            self.streamsettingssent = True

        return makeframe(
            requestid, self.streamid, streamflags, typeid, flags, payload
        )

    def makestreamsettingsframe(self, requestid):
        """Create a stream settings frame for this stream.

        The payload is the CBOR-encoded encoder profile name.

        Returns frame data or None if no stream settings frame is needed or has
        already been sent.
        """
        if not self._encoder or self.streamsettingssent:
            return None

        payload = b''.join(cborutil.streamencode(self._encodername))
        return self.makeframe(
            requestid,
            FRAME_TYPE_STREAM_SETTINGS,
            FLAG_STREAM_ENCODING_SETTINGS_EOS,
            payload,
        )
989
990
def ensureserverstream(stream):
    """Check that ``stream`` has an even stream ID, as server streams must.

    Raises ``error.ProgrammingError`` for odd stream IDs. Returns None
    otherwise.
    """
    if not stream.streamid % 2:
        return

    raise error.ProgrammingError(
        b'server should only write to even '
        b'numbered streams; %d is not even' % stream.streamid
    )
997
998
# Default protocol settings: only the identity content encoding is listed.
# NOTE(review): presumably these apply to a peer that has not sent a
# sender-protocol-settings frame; the consumers are outside this view, so
# confirm against serverreactor before relying on that.
DEFAULT_PROTOCOL_SETTINGS = {
    b'contentencodings': [b'identity'],
}
1002
1003
class serverreactor(object):
    """Holds state of a server handling frame-based protocol requests.

    This class is the "brain" of the unified frame-based protocol server
    component. While the protocol is stateless from the perspective of
    requests/commands, something needs to track which frames have been
    received, what frames to expect, etc. This class is that thing.

    Instances are modeled as a state machine of sorts. Instances are also
    reactionary to external events. The point of this class is to encapsulate
    the state of the connection and the exchange of frames, not to perform
    work. Instead, callers tell this class when something occurs, like a
    frame arriving. If that activity is worthy of a follow-up action (say
    *run a command*), the return value of that handler will say so.

    I/O and CPU intensive operations are purposefully delegated outside of
    this class.

    Consumers are expected to tell instances when events occur. They do so by
    calling the various ``on*`` methods. These methods return a 2-tuple
    describing any follow-up action(s) to take. The first element is the
    name of an action to perform. The second is a data structure (usually
    a dict) specific to that action that contains more information. e.g.
    if the server wants to send frames back to the client, the data structure
    will contain a reference to those frames.

    Valid actions that consumers can be instructed to take are:

    sendframes
       Indicates that frames should be sent to the client. The ``framegen``
       key contains a generator of frames that should be sent. The server
       assumes that all frames are sent to the client.

    error
       Indicates that an error occurred. The ``message`` key contains an
       error message. Consumer should probably abort.

    runcommand
       Indicates that the consumer should run a wire protocol command. Details
       of the command to run are given in the data structure (``requestid``,
       ``command``, ``args``, ``redirect`` and ``data`` keys).

    wantframe
       Indicates that nothing of interest happened and the server is waiting on
       more frames from the client before anything interesting can be done.
       The ``state`` key holds the reactor's current state.

    noop
       Indicates no additional action is required.

    Known Issues
    ------------

    There are no limits to the number of partially received commands or their
    size. A malicious client could stream command request data and exhaust the
    server's memory.

    Partially received commands are not acted upon when end of input is
    reached. Should the server error if it receives a partial request?
    Should the client send a message to abort a partially transmitted request
    to facilitate graceful shutdown?

    Request IDs of dispatched commands stay "active" until the frame
    generator producing their response has been fully consumed; a new request
    reusing an active ID is rejected. A consumer that abandons a response
    generator part way through therefore leaves its request ID active.
    """

    def __init__(self, ui, deferoutput=False):
        """Construct a new server reactor.

        ``ui`` is passed to stream encoders configured on output streams
        created by ``makeoutputstream()``.

        ``deferoutput`` can be used to indicate that no output frames should be
        instructed to be sent until input has been exhausted. In this mode,
        events that would normally generate output frames (such as a command
        response being ready) will instead defer instructing the consumer to
        send those frames. This is useful for half-duplex transports where the
        sender cannot receive until all data has been transmitted.
        """
        self._ui = ui
        self._deferoutput = deferoutput
        # Current state; dispatched on by onframerecv().
        self._state = b'initial'
        # Server-originated streams use even IDs (see ensureserverstream()).
        self._nextoutgoingstreamid = 2
        # Frame generators held back while ``deferoutput`` is in effect.
        self._bufferedframegens = []
        # stream id -> stream instance for all active streams from the client.
        self._incomingstreams = {}
        # stream id -> outputstream created by makeoutputstream().
        self._outgoingstreams = {}
        # request id -> dict of commands that are actively being received.
        self._receivingcommands = {}
        # Request IDs that have been received and are actively being processed.
        # Once all output for a request has been sent, it is removed from this
        # set.
        self._activecommands = set()

        # Incremental CBOR decoder used while in the
        # "protocol-settings-receiving" state; None otherwise.
        self._protocolsettingsdecoder = None

        # Sender protocol settings are optional. Set implied default values.
        self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)

        populatestreamencoders()

    def onframerecv(self, frame):
        """Process a frame that has been received off the wire.

        Returns a 2-tuple of ``(action, data)`` describing what action, if
        any, the consumer should take next (see the class docstring).

        Raises ``error.ProgrammingError`` if the frame's stream has payload
        encoding applied (decoding is not implemented) or if the reactor is
        in an unknown state.
        """
        # Client-originated streams must use odd stream IDs.
        if not frame.streamid % 2:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'received frame with even numbered stream ID: %d')
                % frame.streamid
            )

        if frame.streamid not in self._incomingstreams:
            if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
                self._state = b'errored'
                return self._makeerrorresult(
                    _(
                        b'received frame on unknown inactive stream without '
                        b'beginning of stream flag set'
                    )
                )

            self._incomingstreams[frame.streamid] = inputstream(frame.streamid)

        if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
            # TODO handle decoding frames
            self._state = b'errored'
            raise error.ProgrammingError(
                b'support for decoding stream payloads not yet implemented'
            )

        if frame.streamflags & STREAM_FLAG_END_STREAM:
            del self._incomingstreams[frame.streamid]

        handlers = {
            b'initial': self._onframeinitial,
            b'protocol-settings-receiving': self._onframeprotocolsettings,
            b'idle': self._onframeidle,
            b'command-receiving': self._onframecommandreceiving,
            b'errored': self._onframeerrored,
        }

        meth = handlers.get(self._state)
        if not meth:
            raise error.ProgrammingError(b'unhandled state: %s' % self._state)

        return meth(frame)

    def oncommandresponsereadyobjects(self, stream, requestid, objs):
        """Signal that objects are ready to be sent to the client.

        ``objs`` is an iterable of objects (typically a generator) that will
        be encoded via CBOR and added to frames, which will be sent to the
        client.

        Besides plain CBOR-serializable values, ``objs`` may yield
        ``wireprototypes.encodedresponse`` (data sent as-is),
        ``wireprototypes.indefinitebytestringresponse`` (chunks streamed as an
        indefinite-length CBOR byte string) or, as its first and only item,
        a ``wireprototypes.alternatelocationresponse``.

        ``requestid`` is removed from the active commands once the returned
        frame generator has been exhausted.
        """
        ensureserverstream(stream)

        # Accept any iterable (list, tuple, generator, ...). iter() is applied
        # lazily, inside the frame generator, so a non-iterable ``objs`` is
        # reported to the client as a server error frame like any other
        # iteration failure instead of raising here.
        def iterobjs():
            for obj in objs:
                yield obj

        objsiter = iterobjs()

        # We need to take care over exception handling. Uncaught exceptions
        # when generating frames could lead to premature end of the frame
        # stream and the possibility of the server or client process getting
        # in a bad state.
        #
        # Keep in mind that if ``objs`` is a generator, advancing it could
        # raise exceptions that originated in e.g. wire protocol command
        # functions. That is why we differentiate between exceptions raised
        # when iterating versus other exceptions that occur.
        #
        # In all cases, when the function finishes, the request is fully
        # handled and no new frames for it should be seen.

        def sendframes():
            emitted = False
            alternatelocationsent = False
            emitter = bufferingcommandresponseemitter(stream, requestid)
            while True:
                try:
                    o = next(objsiter)
                except StopIteration:
                    # Flush anything the emitter is still buffering.
                    for frame in emitter.send(None):
                        yield frame

                    if emitted:
                        for frame in createcommandresponseeosframes(
                            stream, requestid
                        ):
                            yield frame
                    break

                except error.WireprotoCommandError as e:
                    for frame in createcommanderrorresponse(
                        stream, requestid, e.message, e.messageargs
                    ):
                        yield frame
                    break

                except Exception as e:
                    for frame in createerrorframe(
                        stream,
                        requestid,
                        stringutil.forcebytestr(e),
                        errtype=b'server',
                    ):
                        yield frame

                    break

                try:
                    # Alternate location responses can only be the first and
                    # only object in the output stream.
                    if isinstance(o, wireprototypes.alternatelocationresponse):
                        if emitted:
                            raise error.ProgrammingError(
                                b'alternatelocationresponse seen after initial '
                                b'output object'
                            )

                        frame = stream.makestreamsettingsframe(requestid)
                        if frame:
                            yield frame

                        yield createalternatelocationresponseframe(
                            stream, requestid, o
                        )

                        alternatelocationsent = True
                        emitted = True
                        continue

                    if alternatelocationsent:
                        raise error.ProgrammingError(
                            b'object follows alternatelocationresponse'
                        )

                    if not emitted:
                        # Frame is optional.
                        frame = stream.makestreamsettingsframe(requestid)
                        if frame:
                            yield frame

                        # May be None if empty frame (due to encoding).
                        frame = createcommandresponseokframe(stream, requestid)
                        if frame:
                            yield frame

                        emitted = True

                    # Objects emitted by command functions can be serializable
                    # data structures or special types.
                    # TODO consider extracting the content normalization to a
                    # standalone function, as it may be useful for e.g. cachers.

                    # A pre-encoded object is sent directly to the emitter.
                    if isinstance(o, wireprototypes.encodedresponse):
                        for frame in emitter.send(o.data):
                            yield frame

                    elif isinstance(
                        o, wireprototypes.indefinitebytestringresponse
                    ):
                        for chunk in cborutil.streamencodebytestringfromiter(
                            o.chunks
                        ):
                            for frame in emitter.send(chunk):
                                yield frame

                    # A regular object is CBOR encoded.
                    else:
                        for chunk in cborutil.streamencode(o):
                            for frame in emitter.send(chunk):
                                yield frame

                except Exception as e:
                    # ``b'%s' % e`` fails on Python 3 for exceptions without
                    # __bytes__ (e.g. ValueError from CBOR encoding), so
                    # convert explicitly like the iteration handler above.
                    for frame in createerrorframe(
                        stream,
                        requestid,
                        stringutil.forcebytestr(e),
                        errtype=b'server',
                    ):
                        yield frame

                    break

            self._activecommands.remove(requestid)

        return self._handlesendframes(sendframes())

    def oninputeof(self):
        """Signals that end of input has been received.

        No more frames will be received. All pending activity should be
        completed.

        When output was deferred, returns a ``sendframes`` action whose
        generator yields the frames of every buffered generator, in the
        order they were buffered. Otherwise returns ``noop``.
        """
        # TODO should we do anything about in-flight commands?

        if not self._deferoutput or not self._bufferedframegens:
            return b'noop', {}

        # If we buffered all our responses, emit those.
        def makegen():
            for gen in self._bufferedframegens:
                for frame in gen:
                    yield frame

        return b'sendframes', {
            b'framegen': makegen(),
        }

    def _handlesendframes(self, framegen):
        """Either buffer ``framegen`` (deferred output) or ask to send it."""
        if self._deferoutput:
            self._bufferedframegens.append(framegen)
            return b'noop', {}
        else:
            return b'sendframes', {
                b'framegen': framegen,
            }

    def onservererror(self, stream, requestid, msg):
        """Signal that a server error occurred while handling ``requestid``.

        The returned frames carry ``msg`` as a ``server`` error. The request
        is removed from the active commands once they have all been produced.
        """
        ensureserverstream(stream)

        def sendframes():
            for frame in createerrorframe(
                stream, requestid, msg, errtype=b'server'
            ):
                yield frame

            self._activecommands.remove(requestid)

        return self._handlesendframes(sendframes())

    def oncommanderror(self, stream, requestid, message, args=None):
        """Called when a command encountered an error before sending output."""
        ensureserverstream(stream)

        def sendframes():
            for frame in createcommanderrorresponse(
                stream, requestid, message, args
            ):
                yield frame

            self._activecommands.remove(requestid)

        return self._handlesendframes(sendframes())

    def makeoutputstream(self):
        """Create a stream to be used for sending data to the client.

        If this is called before protocol settings frames are received, we
        don't know what stream encodings are supported by the client and
        we will default to identity.
        """
        streamid = self._nextoutgoingstreamid
        self._nextoutgoingstreamid += 2

        s = outputstream(streamid)
        self._outgoingstreams[streamid] = s

        # Always use the *server's* preferred encoder over the client's,
        # as servers have more to lose from sub-optimal encoders being used.
        for name in STREAM_ENCODERS_ORDER:
            if name in self._sendersettings[b'contentencodings']:
                s.setencoder(self._ui, name)
                break

        return s

    def _makeerrorresult(self, msg):
        """Build an ``error`` action carrying ``msg``."""
        return b'error', {
            b'message': msg,
        }

    def _makeruncommandresult(self, requestid):
        """Finish receiving ``requestid`` and build a ``runcommand`` action.

        Must only be called once all command request frames for the request
        have been received (``requestdone``). Returns an ``error`` action if
        the request payload is not a CBOR map with a ``name`` field.
        """
        entry = self._receivingcommands[requestid]

        if not entry[b'requestdone']:
            self._state = b'errored'
            raise error.ProgrammingError(
                b'should not be called without requestdone set'
            )

        del self._receivingcommands[requestid]

        if self._receivingcommands:
            self._state = b'command-receiving'
        else:
            self._state = b'idle'

        # Decode the payloads as CBOR. The payload comes from the peer, so
        # malformed content is reported as a protocol error (like sender
        # protocol settings decoding) instead of raising.
        entry[b'payload'].seek(0)
        try:
            decoded = cborutil.decodeall(entry[b'payload'].getvalue())
        except Exception as e:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'error decoding CBOR from command request frame: %s')
                % stringutil.forcebytestr(e)
            )

        if not decoded or not isinstance(decoded[0], dict):
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'command request frame did not contain a CBOR map')
            )

        request = decoded[0]

        if b'name' not in request:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'command request missing "name" field')
            )

        if b'args' not in request:
            request[b'args'] = {}

        assert requestid not in self._activecommands
        self._activecommands.add(requestid)

        return (
            b'runcommand',
            {
                b'requestid': requestid,
                b'command': request[b'name'],
                b'args': request[b'args'],
                b'redirect': request.get(b'redirect'),
                b'data': entry[b'data'].getvalue() if entry[b'data'] else None,
            },
        )

    def _makewantframeresult(self):
        """Build a ``wantframe`` action reporting the current state."""
        return b'wantframe', {
            b'state': self._state,
        }

    def _validatecommandrequestframe(self, frame):
        """Return an error action if exactly one of new/continuation is unset.

        Returns None when the frame's flags are valid.
        """
        new = frame.flags & FLAG_COMMAND_REQUEST_NEW
        continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION

        if new and continuation:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'received command request frame with both new and '
                    b'continuation flags set'
                )
            )

        if not new and not continuation:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'received command request frame with neither new nor '
                    b'continuation flags set'
                )
            )

    def _onframeinitial(self, frame):
        # Called when we receive a frame when in the "initial" state.
        if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
            self._state = b'protocol-settings-receiving'
            self._protocolsettingsdecoder = cborutil.bufferingdecoder()
            return self._onframeprotocolsettings(frame)

        elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
            self._state = b'idle'
            return self._onframeidle(frame)

        else:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'expected sender protocol settings or command request '
                    b'frame; got %d'
                )
                % frame.typeid
            )

    def _onframeprotocolsettings(self, frame):
        # Called for each frame while receiving sender protocol settings.
        assert self._state == b'protocol-settings-receiving'
        assert self._protocolsettingsdecoder is not None

        if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'expected sender protocol settings frame; got %d')
                % frame.typeid
            )

        more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
        eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS

        if more and eos:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'sender protocol settings frame cannot have both '
                    b'continuation and end of stream flags set'
                )
            )

        if not more and not eos:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'sender protocol settings frame must have continuation or '
                    b'end of stream flag set'
                )
            )

        # TODO establish limits for maximum amount of data that can be
        # buffered.
        try:
            self._protocolsettingsdecoder.decode(frame.payload)
        except Exception as e:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'error decoding CBOR from sender protocol settings frame: %s'
                )
                % stringutil.forcebytestr(e)
            )

        if more:
            return self._makewantframeresult()

        assert eos

        decoded = self._protocolsettingsdecoder.getavailable()
        self._protocolsettingsdecoder = None

        if not decoded:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'sender protocol settings frame did not contain CBOR data')
            )
        elif len(decoded) > 1:
            self._state = b'errored'
            return self._makeerrorresult(
                _(
                    b'sender protocol settings frame contained multiple CBOR '
                    b'values'
                )
            )

        d = decoded[0]

        # The value comes from the peer; a non-map would otherwise make the
        # membership test below raise TypeError.
        if not isinstance(d, dict):
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'sender protocol settings frame did not contain a CBOR map')
            )

        if b'contentencodings' in d:
            self._sendersettings[b'contentencodings'] = d[b'contentencodings']

        self._state = b'idle'

        return self._makewantframeresult()

    def _onframeidle(self, frame):
        # The only frame type that should be received in this state is a
        # command request.
        if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'expected command request frame; got %d') % frame.typeid
            )

        res = self._validatecommandrequestframe(frame)
        if res:
            return res

        if frame.requestid in self._receivingcommands:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'request with ID %d already received') % frame.requestid
            )

        if frame.requestid in self._activecommands:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'request with ID %d is already active') % frame.requestid
            )

        new = frame.flags & FLAG_COMMAND_REQUEST_NEW
        moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
        expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA

        if not new:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'received command request frame without new flag set')
            )

        payload = util.bytesio()
        payload.write(frame.payload)

        self._receivingcommands[frame.requestid] = {
            b'payload': payload,
            b'data': None,
            b'requestdone': not moreframes,
            b'expectingdata': bool(expectingdata),
        }

        # This is the final frame for this request. Dispatch it.
        if not moreframes and not expectingdata:
            return self._makeruncommandresult(frame.requestid)

        assert moreframes or expectingdata
        self._state = b'command-receiving'
        return self._makewantframeresult()

    def _onframecommandreceiving(self, frame):
        # Called when at least one command is partially received.
        if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
            # Process new command requests as such.
            if frame.flags & FLAG_COMMAND_REQUEST_NEW:
                return self._onframeidle(frame)

            res = self._validatecommandrequestframe(frame)
            if res:
                return res

        # All other frames should be related to a command that is currently
        # receiving but is not active.
        if frame.requestid in self._activecommands:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'received frame for request that is still active: %d')
                % frame.requestid
            )

        if frame.requestid not in self._receivingcommands:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'received frame for request that is not receiving: %d')
                % frame.requestid
            )

        entry = self._receivingcommands[frame.requestid]

        if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
            moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
            expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)

            if entry[b'requestdone']:
                self._state = b'errored'
                return self._makeerrorresult(
                    _(
                        b'received command request frame when request frames '
                        b'were supposedly done'
                    )
                )

            if expectingdata != entry[b'expectingdata']:
                self._state = b'errored'
                return self._makeerrorresult(
                    _(b'mismatch between expect data flag and previous frame')
                )

            entry[b'payload'].write(frame.payload)

            if not moreframes:
                entry[b'requestdone'] = True

            if not moreframes and not expectingdata:
                return self._makeruncommandresult(frame.requestid)

            return self._makewantframeresult()

        elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
            if not entry[b'expectingdata']:
                self._state = b'errored'
                return self._makeerrorresult(
                    _(
                        b'received command data frame for request that is not '
                        b'expecting data: %d'
                    )
                    % frame.requestid
                )

            if entry[b'data'] is None:
                entry[b'data'] = util.bytesio()

            return self._handlecommanddataframe(frame, entry)
        else:
            self._state = b'errored'
            return self._makeerrorresult(
                _(b'received unexpected frame type: %d') % frame.typeid
            )

    def _handlecommanddataframe(self, frame, entry):
        # Buffer a command data frame; dispatch the command at end of data.
        assert frame.typeid == FRAME_TYPE_COMMAND_DATA

        # TODO support streaming data instead of buffering it.
        entry[b'data'].write(frame.payload)

        if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
            return self._makewantframeresult()
        elif frame.flags & FLAG_COMMAND_DATA_EOS:
            if not entry[b'requestdone']:
                # A peer ending command data before its command request
                # frames are complete is a protocol violation;
                # _makeruncommandresult() would raise ProgrammingError here.
                self._state = b'errored'
                return self._makeerrorresult(
                    _(
                        b'received command data end of stream before command '
                        b'request frames were done: %d'
                    )
                    % frame.requestid
                )

            entry[b'data'].seek(0)
            return self._makeruncommandresult(frame.requestid)
        else:
            self._state = b'errored'
            return self._makeerrorresult(_(b'command data frame without flags'))

    def _onframeerrored(self, frame):
        # Once errored, every subsequent frame yields an error.
        return self._makeerrorresult(_(b'server already errored'))
1690
1691
class commandrequest(object):
    """A single command invocation issued by a client.

    Carries the request ID, command name and argument dict, an optional file
    object with raw command data (``datafh``), an optional ``redirect``
    value, and a ``state`` marker that starts out as ``b'pending'``.
    """

    def __init__(self, requestid, name, args, datafh=None, redirect=None):
        self.requestid, self.name, self.args = requestid, name, args
        self.datafh, self.redirect = datafh, redirect
        # Advanced by the client reactor (e.g. to b'sending') as frames for
        # this request are produced.
        self.state = b'pending'
1702
1703
1704class clientreactor(object):
1705    """Holds state of a client issuing frame-based protocol requests.
1706
1707    This is like ``serverreactor`` but for client-side state.
1708
1709    Each instance is bound to the lifetime of a connection. For persistent
1710    connection transports using e.g. TCP sockets and speaking the raw
1711    framing protocol, there will be a single instance for the lifetime of
1712    the TCP socket. For transports where there are multiple discrete
1713    interactions (say tunneled within in HTTP request), there will be a
1714    separate instance for each distinct interaction.
1715
1716    Consumers are expected to tell instances when events occur by calling
1717    various methods. These methods return a 2-tuple describing any follow-up
1718    action(s) to take. The first element is the name of an action to
1719    perform. The second is a data structure (usually a dict) specific to
1720    that action that contains more information. e.g. if the reactor wants
1721    to send frames to the server, the data structure will contain a reference
1722    to those frames.
1723
1724    Valid actions that consumers can be instructed to take are:
1725
1726    noop
1727       Indicates no additional action is required.
1728
1729    sendframes
1730       Indicates that frames should be sent to the server. The ``framegen``
1731       key contains a generator of frames that should be sent. The reactor
1732       assumes that all frames in this generator are sent to the server.
1733
1734    error
1735       Indicates that an error occurred. The ``message`` key contains an
1736       error message describing the failure.
1737
1738    responsedata
1739       Indicates a response to a previously-issued command was received.
1740
1741       The ``request`` key contains the ``commandrequest`` instance that
1742       represents the request this data is for.
1743
1744       The ``data`` key contains the decoded data from the server.
1745
1746       ``expectmore`` and ``eos`` evaluate to True when more response data
1747       is expected to follow or we're at the end of the response stream,
1748       respectively.
1749    """
1750
1751    def __init__(
1752        self,
1753        ui,
1754        hasmultiplesend=False,
1755        buffersends=True,
1756        clientcontentencoders=None,
1757    ):
1758        """Create a new instance.
1759
1760        ``hasmultiplesend`` indicates whether multiple sends are supported
1761        by the transport. When True, it is possible to send commands immediately
1762        instead of buffering until the caller signals an intent to finish a
1763        send operation.
1764
1765        ``buffercommands`` indicates whether sends should be buffered until the
1766        last request has been issued.
1767
1768        ``clientcontentencoders`` is an iterable of content encoders the client
1769        will advertise to the server and that the server can use for encoding
1770        data. If not defined, the client will not advertise content encoders
1771        to the server.
1772        """
1773        self._ui = ui
1774        self._hasmultiplesend = hasmultiplesend
1775        self._buffersends = buffersends
1776        self._clientcontentencoders = clientcontentencoders
1777
1778        self._canissuecommands = True
1779        self._cansend = True
1780        self._protocolsettingssent = False
1781
1782        self._nextrequestid = 1
1783        # We only support a single outgoing stream for now.
1784        self._outgoingstream = outputstream(1)
1785        self._pendingrequests = collections.deque()
1786        self._activerequests = {}
1787        self._incomingstreams = {}
1788        self._streamsettingsdecoders = {}
1789
1790        populatestreamencoders()
1791
1792    def callcommand(self, name, args, datafh=None, redirect=None):
1793        """Request that a command be executed.
1794
1795        Receives the command name, a dict of arguments to pass to the command,
1796        and an optional file object containing the raw data for the command.
1797
1798        Returns a 3-tuple of (request, action, action data).
1799        """
1800        if not self._canissuecommands:
1801            raise error.ProgrammingError(b'cannot issue new commands')
1802
1803        requestid = self._nextrequestid
1804        self._nextrequestid += 2
1805
1806        request = commandrequest(
1807            requestid, name, args, datafh=datafh, redirect=redirect
1808        )
1809
1810        if self._buffersends:
1811            self._pendingrequests.append(request)
1812            return request, b'noop', {}
1813        else:
1814            if not self._cansend:
1815                raise error.ProgrammingError(
1816                    b'sends cannot be performed on this instance'
1817                )
1818
1819            if not self._hasmultiplesend:
1820                self._cansend = False
1821                self._canissuecommands = False
1822
1823            return (
1824                request,
1825                b'sendframes',
1826                {
1827                    b'framegen': self._makecommandframes(request),
1828                },
1829            )
1830
1831    def flushcommands(self):
1832        """Request that all queued commands be sent.
1833
1834        If any commands are buffered, this will instruct the caller to send
1835        them over the wire. If no commands are buffered it instructs the client
1836        to no-op.
1837
1838        If instances aren't configured for multiple sends, no new command
1839        requests are allowed after this is called.
1840        """
1841        if not self._pendingrequests:
1842            return b'noop', {}
1843
1844        if not self._cansend:
1845            raise error.ProgrammingError(
1846                b'sends cannot be performed on this instance'
1847            )
1848
1849        # If the instance only allows sending once, mark that we have fired
1850        # our one shot.
1851        if not self._hasmultiplesend:
1852            self._canissuecommands = False
1853            self._cansend = False
1854
1855        def makeframes():
1856            while self._pendingrequests:
1857                request = self._pendingrequests.popleft()
1858                for frame in self._makecommandframes(request):
1859                    yield frame
1860
1861        return b'sendframes', {
1862            b'framegen': makeframes(),
1863        }
1864
1865    def _makecommandframes(self, request):
1866        """Emit frames to issue a command request.
1867
1868        As a side-effect, update request accounting to reflect its changed
1869        state.
1870        """
1871        self._activerequests[request.requestid] = request
1872        request.state = b'sending'
1873
1874        if not self._protocolsettingssent and self._clientcontentencoders:
1875            self._protocolsettingssent = True
1876
1877            payload = b''.join(
1878                cborutil.streamencode(
1879                    {
1880                        b'contentencodings': self._clientcontentencoders,
1881                    }
1882                )
1883            )
1884
1885            yield self._outgoingstream.makeframe(
1886                requestid=request.requestid,
1887                typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
1888                flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
1889                payload=payload,
1890            )
1891
1892        res = createcommandframes(
1893            self._outgoingstream,
1894            request.requestid,
1895            request.name,
1896            request.args,
1897            datafh=request.datafh,
1898            redirect=request.redirect,
1899        )
1900
1901        for frame in res:
1902            yield frame
1903
1904        request.state = b'sent'
1905
1906    def onframerecv(self, frame):
1907        """Process a frame that has been received off the wire.
1908
1909        Returns a 2-tuple of (action, meta) describing further action the
1910        caller needs to take as a result of receiving this frame.
1911        """
1912        if frame.streamid % 2:
1913            return (
1914                b'error',
1915                {
1916                    b'message': (
1917                        _(b'received frame with odd numbered stream ID: %d')
1918                        % frame.streamid
1919                    ),
1920                },
1921            )
1922
1923        if frame.streamid not in self._incomingstreams:
1924            if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1925                return (
1926                    b'error',
1927                    {
1928                        b'message': _(
1929                            b'received frame on unknown stream '
1930                            b'without beginning of stream flag set'
1931                        ),
1932                    },
1933                )
1934
1935            self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
1936
1937        stream = self._incomingstreams[frame.streamid]
1938
1939        # If the payload is encoded, ask the stream to decode it. We
1940        # merely substitute the decoded result into the frame payload as
1941        # if it had been transferred all along.
1942        if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1943            frame.payload = stream.decode(frame.payload)
1944
1945        if frame.streamflags & STREAM_FLAG_END_STREAM:
1946            del self._incomingstreams[frame.streamid]
1947
1948        if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1949            return self._onstreamsettingsframe(frame)
1950
1951        if frame.requestid not in self._activerequests:
1952            return (
1953                b'error',
1954                {
1955                    b'message': (
1956                        _(b'received frame for inactive request ID: %d')
1957                        % frame.requestid
1958                    ),
1959                },
1960            )
1961
1962        request = self._activerequests[frame.requestid]
1963        request.state = b'receiving'
1964
1965        handlers = {
1966            FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1967            FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1968        }
1969
1970        meth = handlers.get(frame.typeid)
1971        if not meth:
1972            raise error.ProgrammingError(
1973                b'unhandled frame type: %d' % frame.typeid
1974            )
1975
1976        return meth(request, frame)
1977
1978    def _onstreamsettingsframe(self, frame):
1979        assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1980
1981        more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1982        eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1983
1984        if more and eos:
1985            return (
1986                b'error',
1987                {
1988                    b'message': (
1989                        _(
1990                            b'stream encoding settings frame cannot have both '
1991                            b'continuation and end of stream flags set'
1992                        )
1993                    ),
1994                },
1995            )
1996
1997        if not more and not eos:
1998            return (
1999                b'error',
2000                {
2001                    b'message': _(
2002                        b'stream encoding settings frame must have '
2003                        b'continuation or end of stream flag set'
2004                    ),
2005                },
2006            )
2007
2008        if frame.streamid not in self._streamsettingsdecoders:
2009            decoder = cborutil.bufferingdecoder()
2010            self._streamsettingsdecoders[frame.streamid] = decoder
2011
2012        decoder = self._streamsettingsdecoders[frame.streamid]
2013
2014        try:
2015            decoder.decode(frame.payload)
2016        except Exception as e:
2017            return (
2018                b'error',
2019                {
2020                    b'message': (
2021                        _(
2022                            b'error decoding CBOR from stream encoding '
2023                            b'settings frame: %s'
2024                        )
2025                        % stringutil.forcebytestr(e)
2026                    ),
2027                },
2028            )
2029
2030        if more:
2031            return b'noop', {}
2032
2033        assert eos
2034
2035        decoded = decoder.getavailable()
2036        del self._streamsettingsdecoders[frame.streamid]
2037
2038        if not decoded:
2039            return (
2040                b'error',
2041                {
2042                    b'message': _(
2043                        b'stream encoding settings frame did not contain '
2044                        b'CBOR data'
2045                    ),
2046                },
2047            )
2048
2049        try:
2050            self._incomingstreams[frame.streamid].setdecoder(
2051                self._ui, decoded[0], decoded[1:]
2052            )
2053        except Exception as e:
2054            return (
2055                b'error',
2056                {
2057                    b'message': (
2058                        _(b'error setting stream decoder: %s')
2059                        % stringutil.forcebytestr(e)
2060                    ),
2061                },
2062            )
2063
2064        return b'noop', {}
2065
2066    def _oncommandresponseframe(self, request, frame):
2067        if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
2068            request.state = b'received'
2069            del self._activerequests[request.requestid]
2070
2071        return (
2072            b'responsedata',
2073            {
2074                b'request': request,
2075                b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
2076                b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
2077                b'data': frame.payload,
2078            },
2079        )
2080
2081    def _onerrorresponseframe(self, request, frame):
2082        request.state = b'errored'
2083        del self._activerequests[request.requestid]
2084
2085        # The payload should be a CBOR map.
2086        m = cborutil.decodeall(frame.payload)[0]
2087
2088        return (
2089            b'error',
2090            {
2091                b'request': request,
2092                b'type': m[b'type'],
2093                b'message': m[b'message'],
2094            },
2095        )
2096