1# wireprotoframing.py - unified framing protocol for wire protocol 2# 3# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> 4# 5# This software may be used and distributed according to the terms of the 6# GNU General Public License version 2 or any later version. 7 8# This file contains functionality to support the unified frame-based wire 9# protocol. For details about the protocol, see 10# `hg help internals.wireprotocol`. 11 12from __future__ import absolute_import 13 14import collections 15import struct 16 17from .i18n import _ 18from .pycompat import getattr 19from .thirdparty import attr 20from . import ( 21 encoding, 22 error, 23 pycompat, 24 util, 25 wireprototypes, 26) 27from .utils import ( 28 cborutil, 29 stringutil, 30) 31 32FRAME_HEADER_SIZE = 8 33DEFAULT_MAX_FRAME_SIZE = 32768 34 35STREAM_FLAG_BEGIN_STREAM = 0x01 36STREAM_FLAG_END_STREAM = 0x02 37STREAM_FLAG_ENCODING_APPLIED = 0x04 38 39STREAM_FLAGS = { 40 b'stream-begin': STREAM_FLAG_BEGIN_STREAM, 41 b'stream-end': STREAM_FLAG_END_STREAM, 42 b'encoded': STREAM_FLAG_ENCODING_APPLIED, 43} 44 45FRAME_TYPE_COMMAND_REQUEST = 0x01 46FRAME_TYPE_COMMAND_DATA = 0x02 47FRAME_TYPE_COMMAND_RESPONSE = 0x03 48FRAME_TYPE_ERROR_RESPONSE = 0x05 49FRAME_TYPE_TEXT_OUTPUT = 0x06 50FRAME_TYPE_PROGRESS = 0x07 51FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08 52FRAME_TYPE_STREAM_SETTINGS = 0x09 53 54FRAME_TYPES = { 55 b'command-request': FRAME_TYPE_COMMAND_REQUEST, 56 b'command-data': FRAME_TYPE_COMMAND_DATA, 57 b'command-response': FRAME_TYPE_COMMAND_RESPONSE, 58 b'error-response': FRAME_TYPE_ERROR_RESPONSE, 59 b'text-output': FRAME_TYPE_TEXT_OUTPUT, 60 b'progress': FRAME_TYPE_PROGRESS, 61 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS, 62 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS, 63} 64 65FLAG_COMMAND_REQUEST_NEW = 0x01 66FLAG_COMMAND_REQUEST_CONTINUATION = 0x02 67FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04 68FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08 69 70FLAGS_COMMAND_REQUEST = { 71 b'new': FLAG_COMMAND_REQUEST_NEW, 72 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION, 73 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES, 74 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA, 75} 76 77FLAG_COMMAND_DATA_CONTINUATION = 0x01 78FLAG_COMMAND_DATA_EOS = 0x02 79 80FLAGS_COMMAND_DATA = { 81 b'continuation': FLAG_COMMAND_DATA_CONTINUATION, 82 b'eos': FLAG_COMMAND_DATA_EOS, 83} 84 85FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01 86FLAG_COMMAND_RESPONSE_EOS = 0x02 87 88FLAGS_COMMAND_RESPONSE = { 89 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION, 90 b'eos': FLAG_COMMAND_RESPONSE_EOS, 91} 92 93FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01 94FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02 95 96FLAGS_SENDER_PROTOCOL_SETTINGS = { 97 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION, 98 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS, 99} 100 101FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01 102FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02 103 104FLAGS_STREAM_ENCODING_SETTINGS = { 105 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION, 106 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS, 107} 108 109# Maps frame types to their available flags. 110FRAME_TYPE_FLAGS = { 111 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST, 112 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA, 113 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE, 114 FRAME_TYPE_ERROR_RESPONSE: {}, 115 FRAME_TYPE_TEXT_OUTPUT: {}, 116 FRAME_TYPE_PROGRESS: {}, 117 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS, 118 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS, 119} 120 121ARGUMENT_RECORD_HEADER = struct.Struct('<HH') 122 123 124def humanflags(mapping, value): 125 """Convert a numeric flags value to a human value, using a mapping table.""" 126 namemap = {v: k for k, v in pycompat.iteritems(mapping)} 127 flags = [] 128 val = 1 129 while value >= val: 130 if value & val: 131 flags.append(namemap.get(val, b'<unknown 0x%02x>' % val)) 132 val <<= 1 133 134 return b'|'.join(flags) 135 136 137@attr.s(slots=True) 138class frameheader(object): 139 """Represents the data in a frame header.""" 140 141 length = attr.ib() 142 requestid = attr.ib() 143 streamid = attr.ib() 144 streamflags = attr.ib() 145 typeid = attr.ib() 146 flags = attr.ib() 147 148 149@attr.s(slots=True, repr=False) 150class frame(object): 151 """Represents a parsed frame.""" 152 153 requestid = attr.ib() 154 streamid = attr.ib() 155 streamflags = attr.ib() 156 typeid = attr.ib() 157 flags = attr.ib() 158 payload = attr.ib() 159 160 @encoding.strmethod 161 def __repr__(self): 162 typename = b'<unknown 0x%02x>' % self.typeid 163 for name, value in pycompat.iteritems(FRAME_TYPES): 164 if value == self.typeid: 165 typename = name 166 break 167 168 return ( 169 b'frame(size=%d; request=%d; stream=%d; streamflags=%s; ' 170 b'type=%s; flags=%s)' 171 % ( 172 len(self.payload), 173 self.requestid, 174 self.streamid, 175 humanflags(STREAM_FLAGS, self.streamflags), 176 typename, 177 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags), 178 ) 179 ) 180 181 182def makeframe(requestid, streamid, streamflags, typeid, flags, payload): 183 """Assemble a frame into a byte array.""" 184 # TODO assert size of payload. 185 frame = bytearray(FRAME_HEADER_SIZE + len(payload)) 186 187 # 24 bits length 188 # 16 bits request id 189 # 8 bits stream id 190 # 8 bits stream flags 191 # 4 bits type 192 # 4 bits flags 193 194 l = struct.pack('<I', len(payload)) 195 frame[0:3] = l[0:3] 196 struct.pack_into('<HBB', frame, 3, requestid, streamid, streamflags) 197 frame[7] = (typeid << 4) | flags 198 frame[8:] = payload 199 200 return frame 201 202 203def makeframefromhumanstring(s): 204 """Create a frame from a human readable string 205 206 Strings have the form: 207 208 <request-id> <stream-id> <stream-flags> <type> <flags> <payload> 209 210 This can be used by user-facing applications and tests for creating 211 frames easily without having to type out a bunch of constants. 212 213 Request ID and stream IDs are integers. 214 215 Stream flags, frame type, and flags can be specified by integer or 216 named constant. 217 218 Flags can be delimited by `|` to bitwise OR them together. 219 220 If the payload begins with ``cbor:``, the following string will be 221 evaluated as Python literal and the resulting object will be fed into 222 a CBOR encoder. Otherwise, the payload is interpreted as a Python 223 byte string literal. 224 """ 225 fields = s.split(b' ', 5) 226 requestid, streamid, streamflags, frametype, frameflags, payload = fields 227 228 requestid = int(requestid) 229 streamid = int(streamid) 230 231 finalstreamflags = 0 232 for flag in streamflags.split(b'|'): 233 if flag in STREAM_FLAGS: 234 finalstreamflags |= STREAM_FLAGS[flag] 235 else: 236 finalstreamflags |= int(flag) 237 238 if frametype in FRAME_TYPES: 239 frametype = FRAME_TYPES[frametype] 240 else: 241 frametype = int(frametype) 242 243 finalflags = 0 244 validflags = FRAME_TYPE_FLAGS[frametype] 245 for flag in frameflags.split(b'|'): 246 if flag in validflags: 247 finalflags |= validflags[flag] 248 else: 249 finalflags |= int(flag) 250 251 if payload.startswith(b'cbor:'): 252 payload = b''.join( 253 cborutil.streamencode(stringutil.evalpythonliteral(payload[5:])) 254 ) 255 256 else: 257 payload = stringutil.unescapestr(payload) 258 259 return makeframe( 260 requestid=requestid, 261 streamid=streamid, 262 streamflags=finalstreamflags, 263 typeid=frametype, 264 flags=finalflags, 265 payload=payload, 266 ) 267 268 269def parseheader(data): 270 """Parse a unified framing protocol frame header from a buffer. 271 272 The header is expected to be in the buffer at offset 0 and the 273 buffer is expected to be large enough to hold a full header. 274 """ 275 # 24 bits payload length (little endian) 276 # 16 bits request ID 277 # 8 bits stream ID 278 # 8 bits stream flags 279 # 4 bits frame type 280 # 4 bits frame flags 281 # ... payload 282 framelength = data[0] + 256 * data[1] + 16384 * data[2] 283 requestid, streamid, streamflags = struct.unpack_from('<HBB', data, 3) 284 typeflags = data[7] 285 286 frametype = (typeflags & 0xF0) >> 4 287 frameflags = typeflags & 0x0F 288 289 return frameheader( 290 framelength, requestid, streamid, streamflags, frametype, frameflags 291 ) 292 293 294def readframe(fh): 295 """Read a unified framing protocol frame from a file object. 296 297 Returns a 3-tuple of (type, flags, payload) for the decoded frame or 298 None if no frame is available. May raise if a malformed frame is 299 seen. 300 """ 301 header = bytearray(FRAME_HEADER_SIZE) 302 303 readcount = fh.readinto(header) 304 305 if readcount == 0: 306 return None 307 308 if readcount != FRAME_HEADER_SIZE: 309 raise error.Abort( 310 _(b'received incomplete frame: got %d bytes: %s') 311 % (readcount, header) 312 ) 313 314 h = parseheader(header) 315 316 payload = fh.read(h.length) 317 if len(payload) != h.length: 318 raise error.Abort( 319 _(b'frame length error: expected %d; got %d') 320 % (h.length, len(payload)) 321 ) 322 323 return frame( 324 h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload 325 ) 326 327 328def createcommandframes( 329 stream, 330 requestid, 331 cmd, 332 args, 333 datafh=None, 334 maxframesize=DEFAULT_MAX_FRAME_SIZE, 335 redirect=None, 336): 337 """Create frames necessary to transmit a request to run a command. 338 339 This is a generator of bytearrays. Each item represents a frame 340 ready to be sent over the wire to a peer. 341 """ 342 data = {b'name': cmd} 343 if args: 344 data[b'args'] = args 345 346 if redirect: 347 data[b'redirect'] = redirect 348 349 data = b''.join(cborutil.streamencode(data)) 350 351 offset = 0 352 353 while True: 354 flags = 0 355 356 # Must set new or continuation flag. 357 if not offset: 358 flags |= FLAG_COMMAND_REQUEST_NEW 359 else: 360 flags |= FLAG_COMMAND_REQUEST_CONTINUATION 361 362 # Data frames is set on all frames. 363 if datafh: 364 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA 365 366 payload = data[offset : offset + maxframesize] 367 offset += len(payload) 368 369 if len(payload) == maxframesize and offset < len(data): 370 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES 371 372 yield stream.makeframe( 373 requestid=requestid, 374 typeid=FRAME_TYPE_COMMAND_REQUEST, 375 flags=flags, 376 payload=payload, 377 ) 378 379 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES): 380 break 381 382 if datafh: 383 while True: 384 data = datafh.read(DEFAULT_MAX_FRAME_SIZE) 385 386 done = False 387 if len(data) == DEFAULT_MAX_FRAME_SIZE: 388 flags = FLAG_COMMAND_DATA_CONTINUATION 389 else: 390 flags = FLAG_COMMAND_DATA_EOS 391 assert datafh.read(1) == b'' 392 done = True 393 394 yield stream.makeframe( 395 requestid=requestid, 396 typeid=FRAME_TYPE_COMMAND_DATA, 397 flags=flags, 398 payload=data, 399 ) 400 401 if done: 402 break 403 404 405def createcommandresponseokframe(stream, requestid): 406 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) 407 408 if stream.streamsettingssent: 409 overall = stream.encode(overall) 410 encoded = True 411 412 if not overall: 413 return None 414 else: 415 encoded = False 416 417 return stream.makeframe( 418 requestid=requestid, 419 typeid=FRAME_TYPE_COMMAND_RESPONSE, 420 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 421 payload=overall, 422 encoded=encoded, 423 ) 424 425 426def createcommandresponseeosframes( 427 stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE 428): 429 """Create an empty payload frame representing command end-of-stream.""" 430 payload = stream.flush() 431 432 offset = 0 433 while True: 434 chunk = payload[offset : offset + maxframesize] 435 offset += len(chunk) 436 437 done = offset == len(payload) 438 439 if done: 440 flags = FLAG_COMMAND_RESPONSE_EOS 441 else: 442 flags = FLAG_COMMAND_RESPONSE_CONTINUATION 443 444 yield stream.makeframe( 445 requestid=requestid, 446 typeid=FRAME_TYPE_COMMAND_RESPONSE, 447 flags=flags, 448 payload=chunk, 449 encoded=payload != b'', 450 ) 451 452 if done: 453 break 454 455 456def createalternatelocationresponseframe(stream, requestid, location): 457 data = { 458 b'status': b'redirect', 459 b'location': { 460 b'url': location.url, 461 b'mediatype': location.mediatype, 462 }, 463 } 464 465 for a in ( 466 'size', 467 'fullhashes', 468 'fullhashseed', 469 'serverdercerts', 470 'servercadercerts', 471 ): 472 value = getattr(location, a) 473 if value is not None: 474 data[b'location'][pycompat.bytestr(a)] = value 475 476 payload = b''.join(cborutil.streamencode(data)) 477 478 if stream.streamsettingssent: 479 payload = stream.encode(payload) 480 encoded = True 481 else: 482 encoded = False 483 484 return stream.makeframe( 485 requestid=requestid, 486 typeid=FRAME_TYPE_COMMAND_RESPONSE, 487 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 488 payload=payload, 489 encoded=encoded, 490 ) 491 492 493def createcommanderrorresponse(stream, requestid, message, args=None): 494 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom 495 # formatting works consistently? 496 m = { 497 b'status': b'error', 498 b'error': { 499 b'message': message, 500 }, 501 } 502 503 if args: 504 m[b'error'][b'args'] = args 505 506 overall = b''.join(cborutil.streamencode(m)) 507 508 yield stream.makeframe( 509 requestid=requestid, 510 typeid=FRAME_TYPE_COMMAND_RESPONSE, 511 flags=FLAG_COMMAND_RESPONSE_EOS, 512 payload=overall, 513 ) 514 515 516def createerrorframe(stream, requestid, msg, errtype): 517 # TODO properly handle frame size limits. 518 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE 519 520 payload = b''.join( 521 cborutil.streamencode( 522 { 523 b'type': errtype, 524 b'message': [{b'msg': msg}], 525 } 526 ) 527 ) 528 529 yield stream.makeframe( 530 requestid=requestid, 531 typeid=FRAME_TYPE_ERROR_RESPONSE, 532 flags=0, 533 payload=payload, 534 ) 535 536 537def createtextoutputframe( 538 stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE 539): 540 """Create a text output frame to render text to people. 541 542 ``atoms`` is a 3-tuple of (formatting string, args, labels). 543 544 The formatting string contains ``%s`` tokens to be replaced by the 545 corresponding indexed entry in ``args``. ``labels`` is an iterable of 546 formatters to be applied at rendering time. In terms of the ``ui`` 547 class, each atom corresponds to a ``ui.write()``. 548 """ 549 atomdicts = [] 550 551 for (formatting, args, labels) in atoms: 552 # TODO look for localstr, other types here? 553 554 if not isinstance(formatting, bytes): 555 raise ValueError(b'must use bytes formatting strings') 556 for arg in args: 557 if not isinstance(arg, bytes): 558 raise ValueError(b'must use bytes for arguments') 559 for label in labels: 560 if not isinstance(label, bytes): 561 raise ValueError(b'must use bytes for labels') 562 563 # Formatting string must be ASCII. 564 formatting = formatting.decode('ascii', 'replace').encode('ascii') 565 566 # Arguments must be UTF-8. 567 args = [a.decode('utf-8', 'replace').encode('utf-8') for a in args] 568 569 # Labels must be ASCII. 570 labels = [l.decode('ascii', 'strict').encode('ascii') for l in labels] 571 572 atom = {b'msg': formatting} 573 if args: 574 atom[b'args'] = args 575 if labels: 576 atom[b'labels'] = labels 577 578 atomdicts.append(atom) 579 580 payload = b''.join(cborutil.streamencode(atomdicts)) 581 582 if len(payload) > maxframesize: 583 raise ValueError(b'cannot encode data in a single frame') 584 585 yield stream.makeframe( 586 requestid=requestid, 587 typeid=FRAME_TYPE_TEXT_OUTPUT, 588 flags=0, 589 payload=payload, 590 ) 591 592 593class bufferingcommandresponseemitter(object): 594 """Helper object to emit command response frames intelligently. 595 596 Raw command response data is likely emitted in chunks much smaller 597 than what can fit in a single frame. This class exists to buffer 598 chunks until enough data is available to fit in a single frame. 599 600 TODO we'll need something like this when compression is supported. 601 So it might make sense to implement this functionality at the stream 602 level. 603 """ 604 605 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE): 606 self._stream = stream 607 self._requestid = requestid 608 self._maxsize = maxframesize 609 self._chunks = [] 610 self._chunkssize = 0 611 612 def send(self, data): 613 """Send new data for emission. 614 615 Is a generator of new frames that were derived from the new input. 616 617 If the special input ``None`` is received, flushes all buffered 618 data to frames. 619 """ 620 621 if data is None: 622 for frame in self._flush(): 623 yield frame 624 return 625 626 data = self._stream.encode(data) 627 628 # There is a ton of potential to do more complicated things here. 629 # Our immediate goal is to coalesce small chunks into big frames, 630 # not achieve the fewest number of frames possible. So we go with 631 # a simple implementation: 632 # 633 # * If a chunk is too large for a frame, we flush and emit frames 634 # for the new chunk. 635 # * If a chunk can be buffered without total buffered size limits 636 # being exceeded, we do that. 637 # * If a chunk causes us to go over our buffering limit, we flush 638 # and then buffer the new chunk. 639 640 if not data: 641 return 642 643 if len(data) > self._maxsize: 644 for frame in self._flush(): 645 yield frame 646 647 # Now emit frames for the big chunk. 648 offset = 0 649 while True: 650 chunk = data[offset : offset + self._maxsize] 651 offset += len(chunk) 652 653 yield self._stream.makeframe( 654 self._requestid, 655 typeid=FRAME_TYPE_COMMAND_RESPONSE, 656 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 657 payload=chunk, 658 encoded=True, 659 ) 660 661 if offset == len(data): 662 return 663 664 # If we don't have enough to constitute a full frame, buffer and 665 # return. 666 if len(data) + self._chunkssize < self._maxsize: 667 self._chunks.append(data) 668 self._chunkssize += len(data) 669 return 670 671 # Else flush what we have and buffer the new chunk. We could do 672 # something more intelligent here, like break the chunk. Let's 673 # keep things simple for now. 674 for frame in self._flush(): 675 yield frame 676 677 self._chunks.append(data) 678 self._chunkssize = len(data) 679 680 def _flush(self): 681 payload = b''.join(self._chunks) 682 assert len(payload) <= self._maxsize 683 684 self._chunks[:] = [] 685 self._chunkssize = 0 686 687 if not payload: 688 return 689 690 yield self._stream.makeframe( 691 self._requestid, 692 typeid=FRAME_TYPE_COMMAND_RESPONSE, 693 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 694 payload=payload, 695 encoded=True, 696 ) 697 698 699# TODO consider defining encoders/decoders using the util.compressionengine 700# mechanism. 701 702 703class identityencoder(object): 704 """Encoder for the "identity" stream encoding profile.""" 705 706 def __init__(self, ui): 707 pass 708 709 def encode(self, data): 710 return data 711 712 def flush(self): 713 return b'' 714 715 def finish(self): 716 return b'' 717 718 719class identitydecoder(object): 720 """Decoder for the "identity" stream encoding profile.""" 721 722 def __init__(self, ui, extraobjs): 723 if extraobjs: 724 raise error.Abort( 725 _(b'identity decoder received unexpected additional values') 726 ) 727 728 def decode(self, data): 729 return data 730 731 732class zlibencoder(object): 733 def __init__(self, ui): 734 import zlib 735 736 self._zlib = zlib 737 self._compressor = zlib.compressobj() 738 739 def encode(self, data): 740 return self._compressor.compress(data) 741 742 def flush(self): 743 # Z_SYNC_FLUSH doesn't reset compression context, which is 744 # what we want. 745 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH) 746 747 def finish(self): 748 res = self._compressor.flush(self._zlib.Z_FINISH) 749 self._compressor = None 750 return res 751 752 753class zlibdecoder(object): 754 def __init__(self, ui, extraobjs): 755 import zlib 756 757 if extraobjs: 758 raise error.Abort( 759 _(b'zlib decoder received unexpected additional values') 760 ) 761 762 self._decompressor = zlib.decompressobj() 763 764 def decode(self, data): 765 # Python 2's zlib module doesn't use the buffer protocol and can't 766 # handle all bytes-like types. 767 if not pycompat.ispy3 and isinstance(data, bytearray): 768 data = bytes(data) 769 770 return self._decompressor.decompress(data) 771 772 773class zstdbaseencoder(object): 774 def __init__(self, level): 775 from . import zstd 776 777 self._zstd = zstd 778 cctx = zstd.ZstdCompressor(level=level) 779 self._compressor = cctx.compressobj() 780 781 def encode(self, data): 782 return self._compressor.compress(data) 783 784 def flush(self): 785 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the 786 # compressor and allows a decompressor to access all encoded data 787 # up to this point. 788 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK) 789 790 def finish(self): 791 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH) 792 self._compressor = None 793 return res 794 795 796class zstd8mbencoder(zstdbaseencoder): 797 def __init__(self, ui): 798 super(zstd8mbencoder, self).__init__(3) 799 800 801class zstdbasedecoder(object): 802 def __init__(self, maxwindowsize): 803 from . import zstd 804 805 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize) 806 self._decompressor = dctx.decompressobj() 807 808 def decode(self, data): 809 return self._decompressor.decompress(data) 810 811 812class zstd8mbdecoder(zstdbasedecoder): 813 def __init__(self, ui, extraobjs): 814 if extraobjs: 815 raise error.Abort( 816 _(b'zstd8mb decoder received unexpected additional values') 817 ) 818 819 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576) 820 821 822# We lazily populate this to avoid excessive module imports when importing 823# this module. 824STREAM_ENCODERS = {} 825STREAM_ENCODERS_ORDER = [] 826 827 828def populatestreamencoders(): 829 if STREAM_ENCODERS: 830 return 831 832 try: 833 from . import zstd 834 835 zstd.__version__ 836 except ImportError: 837 zstd = None 838 839 # zstandard is fastest and is preferred. 840 if zstd: 841 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder) 842 STREAM_ENCODERS_ORDER.append(b'zstd-8mb') 843 844 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder) 845 STREAM_ENCODERS_ORDER.append(b'zlib') 846 847 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder) 848 STREAM_ENCODERS_ORDER.append(b'identity') 849 850 851class stream(object): 852 """Represents a logical unidirectional series of frames.""" 853 854 def __init__(self, streamid, active=False): 855 self.streamid = streamid 856 self._active = active 857 858 def makeframe(self, requestid, typeid, flags, payload): 859 """Create a frame to be sent out over this stream. 860 861 Only returns the frame instance. Does not actually send it. 862 """ 863 streamflags = 0 864 if not self._active: 865 streamflags |= STREAM_FLAG_BEGIN_STREAM 866 self._active = True 867 868 return makeframe( 869 requestid, self.streamid, streamflags, typeid, flags, payload 870 ) 871 872 873class inputstream(stream): 874 """Represents a stream used for receiving data.""" 875 876 def __init__(self, streamid, active=False): 877 super(inputstream, self).__init__(streamid, active=active) 878 self._decoder = None 879 880 def setdecoder(self, ui, name, extraobjs): 881 """Set the decoder for this stream. 882 883 Receives the stream profile name and any additional CBOR objects 884 decoded from the stream encoding settings frame payloads. 885 """ 886 if name not in STREAM_ENCODERS: 887 raise error.Abort(_(b'unknown stream decoder: %s') % name) 888 889 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs) 890 891 def decode(self, data): 892 # Default is identity decoder. We don't bother instantiating one 893 # because it is trivial. 894 if not self._decoder: 895 return data 896 897 return self._decoder.decode(data) 898 899 def flush(self): 900 if not self._decoder: 901 return b'' 902 903 return self._decoder.flush() 904 905 906class outputstream(stream): 907 """Represents a stream used for sending data.""" 908 909 def __init__(self, streamid, active=False): 910 super(outputstream, self).__init__(streamid, active=active) 911 self.streamsettingssent = False 912 self._encoder = None 913 self._encodername = None 914 915 def setencoder(self, ui, name): 916 """Set the encoder for this stream. 917 918 Receives the stream profile name. 919 """ 920 if name not in STREAM_ENCODERS: 921 raise error.Abort(_(b'unknown stream encoder: %s') % name) 922 923 self._encoder = STREAM_ENCODERS[name][0](ui) 924 self._encodername = name 925 926 def encode(self, data): 927 if not self._encoder: 928 return data 929 930 return self._encoder.encode(data) 931 932 def flush(self): 933 if not self._encoder: 934 return b'' 935 936 return self._encoder.flush() 937 938 def finish(self): 939 if not self._encoder: 940 return b'' 941 942 self._encoder.finish() 943 944 def makeframe(self, requestid, typeid, flags, payload, encoded=False): 945 """Create a frame to be sent out over this stream. 946 947 Only returns the frame instance. Does not actually send it. 948 """ 949 streamflags = 0 950 if not self._active: 951 streamflags |= STREAM_FLAG_BEGIN_STREAM 952 self._active = True 953 954 if encoded: 955 if not self.streamsettingssent: 956 raise error.ProgrammingError( 957 b'attempting to send encoded frame without sending stream ' 958 b'settings' 959 ) 960 961 streamflags |= STREAM_FLAG_ENCODING_APPLIED 962 963 if ( 964 typeid == FRAME_TYPE_STREAM_SETTINGS 965 and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS 966 ): 967 self.streamsettingssent = True 968 969 return makeframe( 970 requestid, self.streamid, streamflags, typeid, flags, payload 971 ) 972 973 def makestreamsettingsframe(self, requestid): 974 """Create a stream settings frame for this stream. 975 976 Returns frame data or None if no stream settings frame is needed or has 977 already been sent. 978 """ 979 if not self._encoder or self.streamsettingssent: 980 return None 981 982 payload = b''.join(cborutil.streamencode(self._encodername)) 983 return self.makeframe( 984 requestid, 985 FRAME_TYPE_STREAM_SETTINGS, 986 FLAG_STREAM_ENCODING_SETTINGS_EOS, 987 payload, 988 ) 989 990 991def ensureserverstream(stream): 992 if stream.streamid % 2: 993 raise error.ProgrammingError( 994 b'server should only write to even ' 995 b'numbered streams; %d is not even' % stream.streamid 996 ) 997 998 999DEFAULT_PROTOCOL_SETTINGS = { 1000 b'contentencodings': [b'identity'], 1001} 1002 1003 1004class serverreactor(object): 1005 """Holds state of a server handling frame-based protocol requests. 1006 1007 This class is the "brain" of the unified frame-based protocol server 1008 component. While the protocol is stateless from the perspective of 1009 requests/commands, something needs to track which frames have been 1010 received, what frames to expect, etc. This class is that thing. 1011 1012 Instances are modeled as a state machine of sorts. Instances are also 1013 reactionary to external events. The point of this class is to encapsulate 1014 the state of the connection and the exchange of frames, not to perform 1015 work. Instead, callers tell this class when something occurs, like a 1016 frame arriving. If that activity is worthy of a follow-up action (say 1017 *run a command*), the return value of that handler will say so. 1018 1019 I/O and CPU intensive operations are purposefully delegated outside of 1020 this class. 1021 1022 Consumers are expected to tell instances when events occur. They do so by 1023 calling the various ``on*`` methods. These methods return a 2-tuple 1024 describing any follow-up action(s) to take. The first element is the 1025 name of an action to perform. The second is a data structure (usually 1026 a dict) specific to that action that contains more information. e.g. 1027 if the server wants to send frames back to the client, the data structure 1028 will contain a reference to those frames. 1029 1030 Valid actions that consumers can be instructed to take are: 1031 1032 sendframes 1033 Indicates that frames should be sent to the client. The ``framegen`` 1034 key contains a generator of frames that should be sent. The server 1035 assumes that all frames are sent to the client. 1036 1037 error 1038 Indicates that an error occurred. Consumer should probably abort. 1039 1040 runcommand 1041 Indicates that the consumer should run a wire protocol command. Details 1042 of the command to run are given in the data structure. 1043 1044 wantframe 1045 Indicates that nothing of interest happened and the server is waiting on 1046 more frames from the client before anything interesting can be done. 1047 1048 noop 1049 Indicates no additional action is required. 1050 1051 Known Issues 1052 ------------ 1053 1054 There are no limits to the number of partially received commands or their 1055 size. A malicious client could stream command request data and exhaust the 1056 server's memory. 1057 1058 Partially received commands are not acted upon when end of input is 1059 reached. Should the server error if it receives a partial request? 1060 Should the client send a message to abort a partially transmitted request 1061 to facilitate graceful shutdown? 1062 1063 Active requests that haven't been responded to aren't tracked. This means 1064 that if we receive a command and instruct its dispatch, another command 1065 with its request ID can come in over the wire and there will be a race 1066 between who responds to what. 1067 """ 1068 1069 def __init__(self, ui, deferoutput=False): 1070 """Construct a new server reactor. 1071 1072 ``deferoutput`` can be used to indicate that no output frames should be 1073 instructed to be sent until input has been exhausted. In this mode, 1074 events that would normally generate output frames (such as a command 1075 response being ready) will instead defer instructing the consumer to 1076 send those frames. This is useful for half-duplex transports where the 1077 sender cannot receive until all data has been transmitted. 1078 """ 1079 self._ui = ui 1080 self._deferoutput = deferoutput 1081 self._state = b'initial' 1082 self._nextoutgoingstreamid = 2 1083 self._bufferedframegens = [] 1084 # stream id -> stream instance for all active streams from the client. 1085 self._incomingstreams = {} 1086 self._outgoingstreams = {} 1087 # request id -> dict of commands that are actively being received. 1088 self._receivingcommands = {} 1089 # Request IDs that have been received and are actively being processed. 1090 # Once all output for a request has been sent, it is removed from this 1091 # set. 1092 self._activecommands = set() 1093 1094 self._protocolsettingsdecoder = None 1095 1096 # Sender protocol settings are optional. Set implied default values. 1097 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) 1098 1099 populatestreamencoders() 1100 1101 def onframerecv(self, frame): 1102 """Process a frame that has been received off the wire. 1103 1104 Returns a dict with an ``action`` key that details what action, 1105 if any, the consumer should take next. 1106 """ 1107 if not frame.streamid % 2: 1108 self._state = b'errored' 1109 return self._makeerrorresult( 1110 _(b'received frame with even numbered stream ID: %d') 1111 % frame.streamid 1112 ) 1113 1114 if frame.streamid not in self._incomingstreams: 1115 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: 1116 self._state = b'errored' 1117 return self._makeerrorresult( 1118 _( 1119 b'received frame on unknown inactive stream without ' 1120 b'beginning of stream flag set' 1121 ) 1122 ) 1123 1124 self._incomingstreams[frame.streamid] = inputstream(frame.streamid) 1125 1126 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: 1127 # TODO handle decoding frames 1128 self._state = b'errored' 1129 raise error.ProgrammingError( 1130 b'support for decoding stream payloads not yet implemented' 1131 ) 1132 1133 if frame.streamflags & STREAM_FLAG_END_STREAM: 1134 del self._incomingstreams[frame.streamid] 1135 1136 handlers = { 1137 b'initial': self._onframeinitial, 1138 b'protocol-settings-receiving': self._onframeprotocolsettings, 1139 b'idle': self._onframeidle, 1140 b'command-receiving': self._onframecommandreceiving, 1141 b'errored': self._onframeerrored, 1142 } 1143 1144 meth = handlers.get(self._state) 1145 if not meth: 1146 raise error.ProgrammingError(b'unhandled state: %s' % self._state) 1147 1148 return meth(frame) 1149 1150 def oncommandresponsereadyobjects(self, stream, requestid, objs): 1151 """Signal that objects are ready to be sent to the client. 1152 1153 ``objs`` is an iterable of objects (typically a generator) that will 1154 be encoded via CBOR and added to frames, which will be sent to the 1155 client. 1156 """ 1157 ensureserverstream(stream) 1158 1159 # A more robust solution would be to check for objs.{next,__next__}. 1160 if isinstance(objs, list): 1161 objs = iter(objs) 1162 1163 # We need to take care over exception handling. Uncaught exceptions 1164 # when generating frames could lead to premature end of the frame 1165 # stream and the possibility of the server or client process getting 1166 # in a bad state. 1167 # 1168 # Keep in mind that if ``objs`` is a generator, advancing it could 1169 # raise exceptions that originated in e.g. wire protocol command 1170 # functions. That is why we differentiate between exceptions raised 1171 # when iterating versus other exceptions that occur. 1172 # 1173 # In all cases, when the function finishes, the request is fully 1174 # handled and no new frames for it should be seen. 1175 1176 def sendframes(): 1177 emitted = False 1178 alternatelocationsent = False 1179 emitter = bufferingcommandresponseemitter(stream, requestid) 1180 while True: 1181 try: 1182 o = next(objs) 1183 except StopIteration: 1184 for frame in emitter.send(None): 1185 yield frame 1186 1187 if emitted: 1188 for frame in createcommandresponseeosframes( 1189 stream, requestid 1190 ): 1191 yield frame 1192 break 1193 1194 except error.WireprotoCommandError as e: 1195 for frame in createcommanderrorresponse( 1196 stream, requestid, e.message, e.messageargs 1197 ): 1198 yield frame 1199 break 1200 1201 except Exception as e: 1202 for frame in createerrorframe( 1203 stream, 1204 requestid, 1205 b'%s' % stringutil.forcebytestr(e), 1206 errtype=b'server', 1207 ): 1208 1209 yield frame 1210 1211 break 1212 1213 try: 1214 # Alternate location responses can only be the first and 1215 # only object in the output stream. 1216 if isinstance(o, wireprototypes.alternatelocationresponse): 1217 if emitted: 1218 raise error.ProgrammingError( 1219 b'alternatelocationresponse seen after initial ' 1220 b'output object' 1221 ) 1222 1223 frame = stream.makestreamsettingsframe(requestid) 1224 if frame: 1225 yield frame 1226 1227 yield createalternatelocationresponseframe( 1228 stream, requestid, o 1229 ) 1230 1231 alternatelocationsent = True 1232 emitted = True 1233 continue 1234 1235 if alternatelocationsent: 1236 raise error.ProgrammingError( 1237 b'object follows alternatelocationresponse' 1238 ) 1239 1240 if not emitted: 1241 # Frame is optional. 1242 frame = stream.makestreamsettingsframe(requestid) 1243 if frame: 1244 yield frame 1245 1246 # May be None if empty frame (due to encoding). 1247 frame = createcommandresponseokframe(stream, requestid) 1248 if frame: 1249 yield frame 1250 1251 emitted = True 1252 1253 # Objects emitted by command functions can be serializable 1254 # data structures or special types. 1255 # TODO consider extracting the content normalization to a 1256 # standalone function, as it may be useful for e.g. cachers. 1257 1258 # A pre-encoded object is sent directly to the emitter. 1259 if isinstance(o, wireprototypes.encodedresponse): 1260 for frame in emitter.send(o.data): 1261 yield frame 1262 1263 elif isinstance( 1264 o, wireprototypes.indefinitebytestringresponse 1265 ): 1266 for chunk in cborutil.streamencodebytestringfromiter( 1267 o.chunks 1268 ): 1269 1270 for frame in emitter.send(chunk): 1271 yield frame 1272 1273 # A regular object is CBOR encoded. 1274 else: 1275 for chunk in cborutil.streamencode(o): 1276 for frame in emitter.send(chunk): 1277 yield frame 1278 1279 except Exception as e: 1280 for frame in createerrorframe( 1281 stream, requestid, b'%s' % e, errtype=b'server' 1282 ): 1283 yield frame 1284 1285 break 1286 1287 self._activecommands.remove(requestid) 1288 1289 return self._handlesendframes(sendframes()) 1290 1291 def oninputeof(self): 1292 """Signals that end of input has been received. 1293 1294 No more frames will be received. All pending activity should be 1295 completed. 1296 """ 1297 # TODO should we do anything about in-flight commands? 1298 1299 if not self._deferoutput or not self._bufferedframegens: 1300 return b'noop', {} 1301 1302 # If we buffered all our responses, emit those. 1303 def makegen(): 1304 for gen in self._bufferedframegens: 1305 for frame in gen: 1306 yield frame 1307 1308 return b'sendframes', { 1309 b'framegen': makegen(), 1310 } 1311 1312 def _handlesendframes(self, framegen): 1313 if self._deferoutput: 1314 self._bufferedframegens.append(framegen) 1315 return b'noop', {} 1316 else: 1317 return b'sendframes', { 1318 b'framegen': framegen, 1319 } 1320 1321 def onservererror(self, stream, requestid, msg): 1322 ensureserverstream(stream) 1323 1324 def sendframes(): 1325 for frame in createerrorframe( 1326 stream, requestid, msg, errtype=b'server' 1327 ): 1328 yield frame 1329 1330 self._activecommands.remove(requestid) 1331 1332 return self._handlesendframes(sendframes()) 1333 1334 def oncommanderror(self, stream, requestid, message, args=None): 1335 """Called when a command encountered an error before sending output.""" 1336 ensureserverstream(stream) 1337 1338 def sendframes(): 1339 for frame in createcommanderrorresponse( 1340 stream, requestid, message, args 1341 ): 1342 yield frame 1343 1344 self._activecommands.remove(requestid) 1345 1346 return self._handlesendframes(sendframes()) 1347 1348 def makeoutputstream(self): 1349 """Create a stream to be used for sending data to the client. 1350 1351 If this is called before protocol settings frames are received, we 1352 don't know what stream encodings are supported by the client and 1353 we will default to identity. 1354 """ 1355 streamid = self._nextoutgoingstreamid 1356 self._nextoutgoingstreamid += 2 1357 1358 s = outputstream(streamid) 1359 self._outgoingstreams[streamid] = s 1360 1361 # Always use the *server's* preferred encoder over the client's, 1362 # as servers have more to lose from sub-optimal encoders being used. 1363 for name in STREAM_ENCODERS_ORDER: 1364 if name in self._sendersettings[b'contentencodings']: 1365 s.setencoder(self._ui, name) 1366 break 1367 1368 return s 1369 1370 def _makeerrorresult(self, msg): 1371 return b'error', { 1372 b'message': msg, 1373 } 1374 1375 def _makeruncommandresult(self, requestid): 1376 entry = self._receivingcommands[requestid] 1377 1378 if not entry[b'requestdone']: 1379 self._state = b'errored' 1380 raise error.ProgrammingError( 1381 b'should not be called without requestdone set' 1382 ) 1383 1384 del self._receivingcommands[requestid] 1385 1386 if self._receivingcommands: 1387 self._state = b'command-receiving' 1388 else: 1389 self._state = b'idle' 1390 1391 # Decode the payloads as CBOR. 1392 entry[b'payload'].seek(0) 1393 request = cborutil.decodeall(entry[b'payload'].getvalue())[0] 1394 1395 if b'name' not in request: 1396 self._state = b'errored' 1397 return self._makeerrorresult( 1398 _(b'command request missing "name" field') 1399 ) 1400 1401 if b'args' not in request: 1402 request[b'args'] = {} 1403 1404 assert requestid not in self._activecommands 1405 self._activecommands.add(requestid) 1406 1407 return ( 1408 b'runcommand', 1409 { 1410 b'requestid': requestid, 1411 b'command': request[b'name'], 1412 b'args': request[b'args'], 1413 b'redirect': request.get(b'redirect'), 1414 b'data': entry[b'data'].getvalue() if entry[b'data'] else None, 1415 }, 1416 ) 1417 1418 def _makewantframeresult(self): 1419 return b'wantframe', { 1420 b'state': self._state, 1421 } 1422 1423 def _validatecommandrequestframe(self, frame): 1424 new = frame.flags & FLAG_COMMAND_REQUEST_NEW 1425 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION 1426 1427 if new and continuation: 1428 self._state = b'errored' 1429 return self._makeerrorresult( 1430 _( 1431 b'received command request frame with both new and ' 1432 b'continuation flags set' 1433 ) 1434 ) 1435 1436 if not new and not continuation: 1437 self._state = b'errored' 1438 return self._makeerrorresult( 1439 _( 1440 b'received command request frame with neither new nor ' 1441 b'continuation flags set' 1442 ) 1443 ) 1444 1445 def _onframeinitial(self, frame): 1446 # Called when we receive a frame when in the "initial" state. 1447 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: 1448 self._state = b'protocol-settings-receiving' 1449 self._protocolsettingsdecoder = cborutil.bufferingdecoder() 1450 return self._onframeprotocolsettings(frame) 1451 1452 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST: 1453 self._state = b'idle' 1454 return self._onframeidle(frame) 1455 1456 else: 1457 self._state = b'errored' 1458 return self._makeerrorresult( 1459 _( 1460 b'expected sender protocol settings or command request ' 1461 b'frame; got %d' 1462 ) 1463 % frame.typeid 1464 ) 1465 1466 def _onframeprotocolsettings(self, frame): 1467 assert self._state == b'protocol-settings-receiving' 1468 assert self._protocolsettingsdecoder is not None 1469 1470 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: 1471 self._state = b'errored' 1472 return self._makeerrorresult( 1473 _(b'expected sender protocol settings frame; got %d') 1474 % frame.typeid 1475 ) 1476 1477 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION 1478 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS 1479 1480 if more and eos: 1481 self._state = b'errored' 1482 return self._makeerrorresult( 1483 _( 1484 b'sender protocol settings frame cannot have both ' 1485 b'continuation and end of stream flags set' 1486 ) 1487 ) 1488 1489 if not more and not eos: 1490 self._state = b'errored' 1491 return self._makeerrorresult( 1492 _( 1493 b'sender protocol settings frame must have continuation or ' 1494 b'end of stream flag set' 1495 ) 1496 ) 1497 1498 # TODO establish limits for maximum amount of data that can be 1499 # buffered. 1500 try: 1501 self._protocolsettingsdecoder.decode(frame.payload) 1502 except Exception as e: 1503 self._state = b'errored' 1504 return self._makeerrorresult( 1505 _( 1506 b'error decoding CBOR from sender protocol settings frame: %s' 1507 ) 1508 % stringutil.forcebytestr(e) 1509 ) 1510 1511 if more: 1512 return self._makewantframeresult() 1513 1514 assert eos 1515 1516 decoded = self._protocolsettingsdecoder.getavailable() 1517 self._protocolsettingsdecoder = None 1518 1519 if not decoded: 1520 self._state = b'errored' 1521 return self._makeerrorresult( 1522 _(b'sender protocol settings frame did not contain CBOR data') 1523 ) 1524 elif len(decoded) > 1: 1525 self._state = b'errored' 1526 return self._makeerrorresult( 1527 _( 1528 b'sender protocol settings frame contained multiple CBOR ' 1529 b'values' 1530 ) 1531 ) 1532 1533 d = decoded[0] 1534 1535 if b'contentencodings' in d: 1536 self._sendersettings[b'contentencodings'] = d[b'contentencodings'] 1537 1538 self._state = b'idle' 1539 1540 return self._makewantframeresult() 1541 1542 def _onframeidle(self, frame): 1543 # The only frame type that should be received in this state is a 1544 # command request. 1545 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST: 1546 self._state = b'errored' 1547 return self._makeerrorresult( 1548 _(b'expected command request frame; got %d') % frame.typeid 1549 ) 1550 1551 res = self._validatecommandrequestframe(frame) 1552 if res: 1553 return res 1554 1555 if frame.requestid in self._receivingcommands: 1556 self._state = b'errored' 1557 return self._makeerrorresult( 1558 _(b'request with ID %d already received') % frame.requestid 1559 ) 1560 1561 if frame.requestid in self._activecommands: 1562 self._state = b'errored' 1563 return self._makeerrorresult( 1564 _(b'request with ID %d is already active') % frame.requestid 1565 ) 1566 1567 new = frame.flags & FLAG_COMMAND_REQUEST_NEW 1568 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES 1569 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA 1570 1571 if not new: 1572 self._state = b'errored' 1573 return self._makeerrorresult( 1574 _(b'received command request frame without new flag set') 1575 ) 1576 1577 payload = util.bytesio() 1578 payload.write(frame.payload) 1579 1580 self._receivingcommands[frame.requestid] = { 1581 b'payload': payload, 1582 b'data': None, 1583 b'requestdone': not moreframes, 1584 b'expectingdata': bool(expectingdata), 1585 } 1586 1587 # This is the final frame for this request. Dispatch it. 1588 if not moreframes and not expectingdata: 1589 return self._makeruncommandresult(frame.requestid) 1590 1591 assert moreframes or expectingdata 1592 self._state = b'command-receiving' 1593 return self._makewantframeresult() 1594 1595 def _onframecommandreceiving(self, frame): 1596 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: 1597 # Process new command requests as such. 1598 if frame.flags & FLAG_COMMAND_REQUEST_NEW: 1599 return self._onframeidle(frame) 1600 1601 res = self._validatecommandrequestframe(frame) 1602 if res: 1603 return res 1604 1605 # All other frames should be related to a command that is currently 1606 # receiving but is not active. 1607 if frame.requestid in self._activecommands: 1608 self._state = b'errored' 1609 return self._makeerrorresult( 1610 _(b'received frame for request that is still active: %d') 1611 % frame.requestid 1612 ) 1613 1614 if frame.requestid not in self._receivingcommands: 1615 self._state = b'errored' 1616 return self._makeerrorresult( 1617 _(b'received frame for request that is not receiving: %d') 1618 % frame.requestid 1619 ) 1620 1621 entry = self._receivingcommands[frame.requestid] 1622 1623 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST: 1624 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES 1625 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA) 1626 1627 if entry[b'requestdone']: 1628 self._state = b'errored' 1629 return self._makeerrorresult( 1630 _( 1631 b'received command request frame when request frames ' 1632 b'were supposedly done' 1633 ) 1634 ) 1635 1636 if expectingdata != entry[b'expectingdata']: 1637 self._state = b'errored' 1638 return self._makeerrorresult( 1639 _(b'mismatch between expect data flag and previous frame') 1640 ) 1641 1642 entry[b'payload'].write(frame.payload) 1643 1644 if not moreframes: 1645 entry[b'requestdone'] = True 1646 1647 if not moreframes and not expectingdata: 1648 return self._makeruncommandresult(frame.requestid) 1649 1650 return self._makewantframeresult() 1651 1652 elif frame.typeid == FRAME_TYPE_COMMAND_DATA: 1653 if not entry[b'expectingdata']: 1654 self._state = b'errored' 1655 return self._makeerrorresult( 1656 _( 1657 b'received command data frame for request that is not ' 1658 b'expecting data: %d' 1659 ) 1660 % frame.requestid 1661 ) 1662 1663 if entry[b'data'] is None: 1664 entry[b'data'] = util.bytesio() 1665 1666 return self._handlecommanddataframe(frame, entry) 1667 else: 1668 self._state = b'errored' 1669 return self._makeerrorresult( 1670 _(b'received unexpected frame type: %d') % frame.typeid 1671 ) 1672 1673 def _handlecommanddataframe(self, frame, entry): 1674 assert frame.typeid == FRAME_TYPE_COMMAND_DATA 1675 1676 # TODO support streaming data instead of buffering it. 1677 entry[b'data'].write(frame.payload) 1678 1679 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION: 1680 return self._makewantframeresult() 1681 elif frame.flags & FLAG_COMMAND_DATA_EOS: 1682 entry[b'data'].seek(0) 1683 return self._makeruncommandresult(frame.requestid) 1684 else: 1685 self._state = b'errored' 1686 return self._makeerrorresult(_(b'command data frame without flags')) 1687 1688 def _onframeerrored(self, frame): 1689 return self._makeerrorresult(_(b'server already errored')) 1690 1691 1692class commandrequest(object): 1693 """Represents a request to run a command.""" 1694 1695 def __init__(self, requestid, name, args, datafh=None, redirect=None): 1696 self.requestid = requestid 1697 self.name = name 1698 self.args = args 1699 self.datafh = datafh 1700 self.redirect = redirect 1701 self.state = b'pending' 1702 1703 1704class clientreactor(object): 1705 """Holds state of a client issuing frame-based protocol requests. 1706 1707 This is like ``serverreactor`` but for client-side state. 1708 1709 Each instance is bound to the lifetime of a connection. For persistent 1710 connection transports using e.g. TCP sockets and speaking the raw 1711 framing protocol, there will be a single instance for the lifetime of 1712 the TCP socket. For transports where there are multiple discrete 1713 interactions (say tunneled within in HTTP request), there will be a 1714 separate instance for each distinct interaction. 1715 1716 Consumers are expected to tell instances when events occur by calling 1717 various methods. These methods return a 2-tuple describing any follow-up 1718 action(s) to take. The first element is the name of an action to 1719 perform. The second is a data structure (usually a dict) specific to 1720 that action that contains more information. e.g. if the reactor wants 1721 to send frames to the server, the data structure will contain a reference 1722 to those frames. 1723 1724 Valid actions that consumers can be instructed to take are: 1725 1726 noop 1727 Indicates no additional action is required. 1728 1729 sendframes 1730 Indicates that frames should be sent to the server. The ``framegen`` 1731 key contains a generator of frames that should be sent. The reactor 1732 assumes that all frames in this generator are sent to the server. 1733 1734 error 1735 Indicates that an error occurred. The ``message`` key contains an 1736 error message describing the failure. 1737 1738 responsedata 1739 Indicates a response to a previously-issued command was received. 1740 1741 The ``request`` key contains the ``commandrequest`` instance that 1742 represents the request this data is for. 1743 1744 The ``data`` key contains the decoded data from the server. 1745 1746 ``expectmore`` and ``eos`` evaluate to True when more response data 1747 is expected to follow or we're at the end of the response stream, 1748 respectively. 1749 """ 1750 1751 def __init__( 1752 self, 1753 ui, 1754 hasmultiplesend=False, 1755 buffersends=True, 1756 clientcontentencoders=None, 1757 ): 1758 """Create a new instance. 1759 1760 ``hasmultiplesend`` indicates whether multiple sends are supported 1761 by the transport. When True, it is possible to send commands immediately 1762 instead of buffering until the caller signals an intent to finish a 1763 send operation. 1764 1765 ``buffercommands`` indicates whether sends should be buffered until the 1766 last request has been issued. 1767 1768 ``clientcontentencoders`` is an iterable of content encoders the client 1769 will advertise to the server and that the server can use for encoding 1770 data. If not defined, the client will not advertise content encoders 1771 to the server. 1772 """ 1773 self._ui = ui 1774 self._hasmultiplesend = hasmultiplesend 1775 self._buffersends = buffersends 1776 self._clientcontentencoders = clientcontentencoders 1777 1778 self._canissuecommands = True 1779 self._cansend = True 1780 self._protocolsettingssent = False 1781 1782 self._nextrequestid = 1 1783 # We only support a single outgoing stream for now. 1784 self._outgoingstream = outputstream(1) 1785 self._pendingrequests = collections.deque() 1786 self._activerequests = {} 1787 self._incomingstreams = {} 1788 self._streamsettingsdecoders = {} 1789 1790 populatestreamencoders() 1791 1792 def callcommand(self, name, args, datafh=None, redirect=None): 1793 """Request that a command be executed. 1794 1795 Receives the command name, a dict of arguments to pass to the command, 1796 and an optional file object containing the raw data for the command. 1797 1798 Returns a 3-tuple of (request, action, action data). 1799 """ 1800 if not self._canissuecommands: 1801 raise error.ProgrammingError(b'cannot issue new commands') 1802 1803 requestid = self._nextrequestid 1804 self._nextrequestid += 2 1805 1806 request = commandrequest( 1807 requestid, name, args, datafh=datafh, redirect=redirect 1808 ) 1809 1810 if self._buffersends: 1811 self._pendingrequests.append(request) 1812 return request, b'noop', {} 1813 else: 1814 if not self._cansend: 1815 raise error.ProgrammingError( 1816 b'sends cannot be performed on this instance' 1817 ) 1818 1819 if not self._hasmultiplesend: 1820 self._cansend = False 1821 self._canissuecommands = False 1822 1823 return ( 1824 request, 1825 b'sendframes', 1826 { 1827 b'framegen': self._makecommandframes(request), 1828 }, 1829 ) 1830 1831 def flushcommands(self): 1832 """Request that all queued commands be sent. 1833 1834 If any commands are buffered, this will instruct the caller to send 1835 them over the wire. If no commands are buffered it instructs the client 1836 to no-op. 1837 1838 If instances aren't configured for multiple sends, no new command 1839 requests are allowed after this is called. 1840 """ 1841 if not self._pendingrequests: 1842 return b'noop', {} 1843 1844 if not self._cansend: 1845 raise error.ProgrammingError( 1846 b'sends cannot be performed on this instance' 1847 ) 1848 1849 # If the instance only allows sending once, mark that we have fired 1850 # our one shot. 1851 if not self._hasmultiplesend: 1852 self._canissuecommands = False 1853 self._cansend = False 1854 1855 def makeframes(): 1856 while self._pendingrequests: 1857 request = self._pendingrequests.popleft() 1858 for frame in self._makecommandframes(request): 1859 yield frame 1860 1861 return b'sendframes', { 1862 b'framegen': makeframes(), 1863 } 1864 1865 def _makecommandframes(self, request): 1866 """Emit frames to issue a command request. 1867 1868 As a side-effect, update request accounting to reflect its changed 1869 state. 1870 """ 1871 self._activerequests[request.requestid] = request 1872 request.state = b'sending' 1873 1874 if not self._protocolsettingssent and self._clientcontentencoders: 1875 self._protocolsettingssent = True 1876 1877 payload = b''.join( 1878 cborutil.streamencode( 1879 { 1880 b'contentencodings': self._clientcontentencoders, 1881 } 1882 ) 1883 ) 1884 1885 yield self._outgoingstream.makeframe( 1886 requestid=request.requestid, 1887 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS, 1888 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS, 1889 payload=payload, 1890 ) 1891 1892 res = createcommandframes( 1893 self._outgoingstream, 1894 request.requestid, 1895 request.name, 1896 request.args, 1897 datafh=request.datafh, 1898 redirect=request.redirect, 1899 ) 1900 1901 for frame in res: 1902 yield frame 1903 1904 request.state = b'sent' 1905 1906 def onframerecv(self, frame): 1907 """Process a frame that has been received off the wire. 1908 1909 Returns a 2-tuple of (action, meta) describing further action the 1910 caller needs to take as a result of receiving this frame. 1911 """ 1912 if frame.streamid % 2: 1913 return ( 1914 b'error', 1915 { 1916 b'message': ( 1917 _(b'received frame with odd numbered stream ID: %d') 1918 % frame.streamid 1919 ), 1920 }, 1921 ) 1922 1923 if frame.streamid not in self._incomingstreams: 1924 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM: 1925 return ( 1926 b'error', 1927 { 1928 b'message': _( 1929 b'received frame on unknown stream ' 1930 b'without beginning of stream flag set' 1931 ), 1932 }, 1933 ) 1934 1935 self._incomingstreams[frame.streamid] = inputstream(frame.streamid) 1936 1937 stream = self._incomingstreams[frame.streamid] 1938 1939 # If the payload is encoded, ask the stream to decode it. We 1940 # merely substitute the decoded result into the frame payload as 1941 # if it had been transferred all along. 1942 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED: 1943 frame.payload = stream.decode(frame.payload) 1944 1945 if frame.streamflags & STREAM_FLAG_END_STREAM: 1946 del self._incomingstreams[frame.streamid] 1947 1948 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS: 1949 return self._onstreamsettingsframe(frame) 1950 1951 if frame.requestid not in self._activerequests: 1952 return ( 1953 b'error', 1954 { 1955 b'message': ( 1956 _(b'received frame for inactive request ID: %d') 1957 % frame.requestid 1958 ), 1959 }, 1960 ) 1961 1962 request = self._activerequests[frame.requestid] 1963 request.state = b'receiving' 1964 1965 handlers = { 1966 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe, 1967 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe, 1968 } 1969 1970 meth = handlers.get(frame.typeid) 1971 if not meth: 1972 raise error.ProgrammingError( 1973 b'unhandled frame type: %d' % frame.typeid 1974 ) 1975 1976 return meth(request, frame) 1977 1978 def _onstreamsettingsframe(self, frame): 1979 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS 1980 1981 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION 1982 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS 1983 1984 if more and eos: 1985 return ( 1986 b'error', 1987 { 1988 b'message': ( 1989 _( 1990 b'stream encoding settings frame cannot have both ' 1991 b'continuation and end of stream flags set' 1992 ) 1993 ), 1994 }, 1995 ) 1996 1997 if not more and not eos: 1998 return ( 1999 b'error', 2000 { 2001 b'message': _( 2002 b'stream encoding settings frame must have ' 2003 b'continuation or end of stream flag set' 2004 ), 2005 }, 2006 ) 2007 2008 if frame.streamid not in self._streamsettingsdecoders: 2009 decoder = cborutil.bufferingdecoder() 2010 self._streamsettingsdecoders[frame.streamid] = decoder 2011 2012 decoder = self._streamsettingsdecoders[frame.streamid] 2013 2014 try: 2015 decoder.decode(frame.payload) 2016 except Exception as e: 2017 return ( 2018 b'error', 2019 { 2020 b'message': ( 2021 _( 2022 b'error decoding CBOR from stream encoding ' 2023 b'settings frame: %s' 2024 ) 2025 % stringutil.forcebytestr(e) 2026 ), 2027 }, 2028 ) 2029 2030 if more: 2031 return b'noop', {} 2032 2033 assert eos 2034 2035 decoded = decoder.getavailable() 2036 del self._streamsettingsdecoders[frame.streamid] 2037 2038 if not decoded: 2039 return ( 2040 b'error', 2041 { 2042 b'message': _( 2043 b'stream encoding settings frame did not contain ' 2044 b'CBOR data' 2045 ), 2046 }, 2047 ) 2048 2049 try: 2050 self._incomingstreams[frame.streamid].setdecoder( 2051 self._ui, decoded[0], decoded[1:] 2052 ) 2053 except Exception as e: 2054 return ( 2055 b'error', 2056 { 2057 b'message': ( 2058 _(b'error setting stream decoder: %s') 2059 % stringutil.forcebytestr(e) 2060 ), 2061 }, 2062 ) 2063 2064 return b'noop', {} 2065 2066 def _oncommandresponseframe(self, request, frame): 2067 if frame.flags & FLAG_COMMAND_RESPONSE_EOS: 2068 request.state = b'received' 2069 del self._activerequests[request.requestid] 2070 2071 return ( 2072 b'responsedata', 2073 { 2074 b'request': request, 2075 b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION, 2076 b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS, 2077 b'data': frame.payload, 2078 }, 2079 ) 2080 2081 def _onerrorresponseframe(self, request, frame): 2082 request.state = b'errored' 2083 del self._activerequests[request.requestid] 2084 2085 # The payload should be a CBOR map. 2086 m = cborutil.decodeall(frame.payload)[0] 2087 2088 return ( 2089 b'error', 2090 { 2091 b'request': request, 2092 b'type': m[b'type'], 2093 b'message': m[b'message'], 2094 }, 2095 ) 2096