# Copyright 2011, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""This file provides classes and helper functions for parsing/building frames
of the WebSocket protocol (RFC 6455).

Specification:
http://tools.ietf.org/html/rfc6455
"""

from collections import deque
import logging
import os
import struct
import time
import socket
import six

from mod_pywebsocket import common
from mod_pywebsocket import util
from mod_pywebsocket._stream_exceptions import BadOperationException
from mod_pywebsocket._stream_exceptions import ConnectionTerminatedException
from mod_pywebsocket._stream_exceptions import InvalidFrameException
from mod_pywebsocket._stream_exceptions import InvalidUTF8Exception
from mod_pywebsocket._stream_exceptions import UnsupportedFrameException

# Masker used for frames received without the mask bit set.
_NOOP_MASKER = util.NoopMasker()


class Frame(object):
    """Plain container for the header bits, opcode and payload of one frame.

    Frame filters receive instances of this class and may modify them in
    place.
    """
    def __init__(self,
                 fin=1,
                 rsv1=0,
                 rsv2=0,
                 rsv3=0,
                 opcode=None,
                 payload=b''):
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.payload = payload


# Helper functions made public to be used for writing unittests for WebSocket
# clients.


def create_length_header(length, mask):
    """Creates a length header.

    The shortest of the three encodings (7-bit, 16-bit, 64-bit) that can hold
    length is used.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Must be boolean.

    Returns:
        The encoded mask bit and payload length as a byte string.

    Raises:
        ValueError: when bad data is given.
    """

    if mask:
        mask_bit = 1 << 7
    else:
        mask_bit = 0

    if length < 0:
        raise ValueError('length must be non negative integer')
    elif length <= 125:
        return util.pack_byte(mask_bit | length)
    elif length < (1 << 16):
        return util.pack_byte(mask_bit | 126) + struct.pack('!H', length)
    elif length < (1 << 63):
        return util.pack_byte(mask_bit | 127) + struct.pack('!Q', length)
    else:
        raise ValueError('Payload is too big for one frame')


def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Creates a frame header.

    Args:
        opcode: Frame opcode, in the range 0x0-0xf.
        payload_length: Length of the payload. Must be less than 2^63.
        fin: FIN bit, 0 or 1.
        rsv1: RSV1 bit, 0 or 1.
        rsv2: RSV2 bit, 0 or 1.
        rsv3: RSV3 bit, 0 or 1.
        mask: Mask bit. Must be boolean.

    Returns:
        The first header byte followed by the length header. The masking
        nonce is not included.

    Raises:
        ValueError: when bad data is given.
    """

    if opcode < 0 or 0xf < opcode:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or (1 << 63) <= payload_length:
        raise ValueError('payload_length out of range')

    # Any bit other than the lowest one set in any flag makes this non-zero.
    if (fin | rsv1 | rsv2 | rsv3) & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    header = b''

    first_byte = ((fin << 7)
                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                  | opcode)
    header += util.pack_byte(first_byte)
    header += create_length_header(payload_length, mask)

    return header


def _build_frame(header, body, mask):
    """Concatenates header and body into a frame.

    When mask is true, a fresh 4-byte nonce from os.urandom is inserted after
    the header and the body is passed through util.RepeatedXorMasker.
    """
    if not mask:
        return header + body

    masking_nonce = os.urandom(4)
    masker = util.RepeatedXorMasker(masking_nonce)

    return header + masking_nonce + masker.mask(body)


def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Applies frame_filters to frame in order, then serializes the frame.

    frame_filters may be None, which is treated as no filters.
    """
    for frame_filter in frame_filters or ():
        frame_filter.filter(frame)

    header = create_header(frame.opcode, len(frame.payload), frame.fin,
                           frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_binary_frame(message,
                        opcode=common.OPCODE_BINARY,
                        fin=1,
                        mask=False,
                        frame_filters=None):
    """Creates a simple binary frame with no extension, reserved bit.

    Args:
        message: payload as a byte string.
        opcode: opcode to put in the header.
        fin: FIN bit, 0 or 1.
        mask: whether to mask the payload.
        frame_filters: filters applied to the frame before serialization.
            None means no filters.
    """

    frame = Frame(fin=fin, opcode=opcode, payload=message)
    return _filter_and_format_frame_object(frame, mask, frame_filters)


def create_text_frame(message,
                      opcode=common.OPCODE_TEXT,
                      fin=1,
                      mask=False,
                      frame_filters=None):
    """Creates a simple text frame with no extension, reserved bit.

    message is encoded to UTF-8; the remaining arguments are as for
    create_binary_frame.
    """

    encoded_message = message.encode('utf-8')
    return create_binary_frame(encoded_message, opcode, fin, mask,
                               frame_filters)


def _megabytes_per_second(num_bytes, start_time):
    """Returns the throughput in MB/s since start_time, for logging only.

    Returns float('inf') when the clock reports no elapsed time (coarse
    timers can do so for short operations) so that diagnostic logging can
    never raise ZeroDivisionError and abort frame parsing.
    """
    elapsed = time.time() - start_time
    if elapsed <= 0:
        return float('inf')
    return num_bytes / elapsed / 1000 / 1000


def parse_frame(receive_bytes,
                logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object.
        ws_version: the version of WebSocket protocol.
        unmask_receive: unmask received frames. When received unmasked
            frame, raises InvalidFrameException.

    Returns:
        A tuple (opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3).

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if not logger:
        logger = logging.getLogger()

    # Evaluated once so that the timing variables below are always bound when
    # they are used.
    log_fine = logger.isEnabledFor(common.LOGLEVEL_FINE)

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    first_byte = ord(receive_bytes(1))
    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    second_byte = ord(receive_bytes(1))
    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(
        common.LOGLEVEL_FINE, 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
        'Mask=%s, Payload_length=%s', fin, rsv1, rsv2, rsv3, opcode, mask,
        payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0xFD in
    # 2-octet field). Such frames are still accepted here; only a warning is
    # logged.
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack('!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException('Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack('!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)

    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)', payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    if log_fine:
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if log_fine:
        logger.log(common.LOGLEVEL_FINE,
                   'Done receiving payload data at %s MB/s',
                   _megabytes_per_second(payload_length, receive_start))
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if log_fine:
        unmask_start = time.time()

    unmasked_bytes = masker.mask(raw_payload_bytes)

    if log_fine:
        logger.log(common.LOGLEVEL_FINE,
                   'Done unmasking payload data at %s MB/s',
                   _megabytes_per_second(payload_length, unmask_start))

    return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3


class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""
    def __init__(self, mask, frame_filters=None, encode_utf8=True):
        """Constructs an instance.

        Args:
            mask: whether built frames are masked.
            frame_filters: filters applied to every built frame. A list
                passed here is stored as is (not copied), so later changes to
                it are seen by this builder. None means no filters.
            encode_utf8: whether text payloads are encoded to UTF-8.
        """

        self._mask = mask
        self._frame_filters = [] if frame_filters is None else frame_filters
        # This is for skipping UTF-8 encoding when building text type frames
        # from compressed data.
        self._encode_utf8 = encode_utf8

        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, payload_data, end, binary):
        """Builds the next frame of the current message.

        Args:
            payload_data: payload of this fragment.
            end: True if this is the last fragment of the message.
            binary: True for a binary message, False for a text message.

        Raises:
            ValueError: when binary differs from the value given for the
                first fragment of the message in progress.
        """
        if binary:
            frame_type = common.OPCODE_BINARY
        else:
            frame_type = common.OPCODE_TEXT
        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type
            self._opcode = frame_type

        if end:
            self._started = False
            fin = 1
        else:
            self._started = True
            fin = 0

        if binary or not self._encode_utf8:
            return create_binary_frame(payload_data, opcode, fin, self._mask,
                                       self._frame_filters)
        else:
            return create_text_frame(payload_data, opcode, fin, self._mask,
                                     self._frame_filters)


def _create_control_frame(opcode, body, mask, frame_filters):
    """Builds a control frame after applying frame_filters.

    frame_filters may be None, which is treated as no filters.

    Raises:
        BadOperationException: when the payload is longer than 125 bytes
            after the filters have been applied.
    """
    frame = Frame(opcode=opcode, payload=body)

    for frame_filter in frame_filters or ():
        frame_filter.filter(frame)

    if len(frame.payload) > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    header = create_header(frame.opcode, len(frame.payload), frame.fin,
                           frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_ping_frame(body, mask=False, frame_filters=None):
    """Creates a ping frame carrying body (a byte string)."""
    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)


def create_pong_frame(body, mask=False, frame_filters=None):
    """Creates a pong frame carrying body (a byte string)."""
    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)


def create_close_frame(body, mask=False, frame_filters=None):
    """Creates a close frame carrying body (a byte string)."""
    return _create_control_frame(common.OPCODE_CLOSE, body, mask,
                                 frame_filters)


def create_closing_handshake_body(code, reason):
    """Creates the payload of a close frame.

    Args:
        code: status code, or None to build an empty body (reason is then
            ignored).
        reason: close reason as text or bytes. Text is encoded to UTF-8,
            bytes are used as is, and None is treated as empty.

    Returns:
        The 2-byte status code followed by the reason, or b'' when code is
        None.

    Raises:
        BadOperationException: when code is out of range or is a reserved
            pseudo code.
    """
    body = b''
    if code is not None:
        if (code > common.STATUS_USER_PRIVATE_MAX
                or code < common.STATUS_NORMAL_CLOSURE):
            raise BadOperationException('Status code is out of range')
        if (code == common.STATUS_NO_STATUS_RECEIVED
                or code == common.STATUS_ABNORMAL_CLOSURE
                or code == common.STATUS_TLS_HANDSHAKE):
            raise BadOperationException('Status code is reserved pseudo '
                                        'code')
        # Stream.close_connection() accepts both bytes and text reasons, and
        # bytes has no encode() on Python 3, so only text is encoded.
        if reason is None:
            encoded_reason = b''
        elif isinstance(reason, six.text_type):
            encoded_reason = reason.encode('utf-8')
        else:
            encoded_reason = reason
        body = struct.pack('!H', code) + encoded_reason
    return body


class StreamOptions(object):
    """Holds option values to configure Stream objects."""
    def __init__(self):
        """Constructs StreamOptions."""

        # Filters applied to frames.
        self.outgoing_frame_filters = []
        self.incoming_frame_filters = []

        # Filters applied to messages. Control frames are not affected by them.
        self.outgoing_message_filters = []
        self.incoming_message_filters = []

        self.encode_text_message_to_utf8 = True
        self.mask_send = False
        self.unmask_receive = True


class Stream(object):
    """A class for parsing/building frames of the WebSocket protocol
    (RFC 6455).
    """
    def __init__(self, request, options):
        """Constructs an instance.

        Args:
            request: mod_python request.
            options: a StreamOptions instance configuring this stream.
        """

        self._logger = util.get_class_logger(self)

        self._options = options
        self._request = request

        self._request.client_terminated = False
        self._request.server_terminated = False

        # Holds body of received fragments.
        self._received_fragments = []
        # Holds the opcode of the first fragment.
        self._original_opcode = None

        self._writer = FragmentedFrameBuilder(
            self._options.mask_send, self._options.outgoing_frame_filters,
            self._options.encode_text_message_to_utf8)

        # Bodies of pings sent by send_ping() that have not been acked yet.
        self._ping_queue = deque()

    def _read(self, length):
        """Reads up to length bytes from connection with a single read call.

        An IOError from the connection is converted into
        ConnectionTerminatedException.

        Raises:
            ConnectionTerminatedException: when read returns empty string or
                raises IOError.
        """

        try:
            read_bytes = self._request.connection.read(length)
            if not read_bytes:
                raise ConnectionTerminatedException(
                    'Receiving %d byte failed. Peer (%r) closed connection' %
                    (length, (self._request.connection.remote_addr, )))
            return read_bytes
        except IOError as e:
            # Also catch an IOError because mod_python throws it.
            raise ConnectionTerminatedException(
                'Receiving %d byte failed. IOError (%s) occurred' %
                (length, e))

    def _write(self, bytes_to_write):
        """Writes given bytes to connection. In case we catch any exception,
        prepends remote address to the exception message and raise again.
        """

        try:
            self._request.connection.write(bytes_to_write)
        except Exception as e:
            util.prepend_message_to_exception(
                'Failed to send message to %r: ' %
                (self._request.connection.remote_addr, ), e)
            raise

    def receive_bytes(self, length):
        """Receives multiple bytes. Retries read when we couldn't receive the
        specified amount. This method returns byte strings.

        Raises:
            ConnectionTerminatedException: when read returns empty string.
        """

        read_bytes = []
        while length > 0:
            new_read_bytes = self._read(length)
            read_bytes.append(new_read_bytes)
            length -= len(new_read_bytes)
        return b''.join(read_bytes)

    def _read_until(self, delim_char):
        """Reads bytes until we encounter delim_char. The result will not
        contain delim_char.

        Raises:
            ConnectionTerminatedException: when read returns empty string.
        """

        read_bytes = []
        while True:
            ch = self._read(1)
            if ch == delim_char:
                break
            read_bytes.append(ch)
        return b''.join(read_bytes)

    def _receive_frame(self):
        """Receives a frame and return data in the frame as a tuple containing
        each header field and payload separately.

        Raises:
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid data.
        """
        def _receive_bytes(length):
            return self.receive_bytes(length)

        return parse_frame(receive_bytes=_receive_bytes,
                           logger=self._logger,
                           ws_version=self._request.ws_version,
                           unmask_receive=self._options.unmask_receive)

    def _receive_frame_as_frame_object(self):
        """Receives a frame and returns it as a Frame instance."""
        opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()

        return Frame(fin=fin,
                     rsv1=rsv1,
                     rsv2=rsv2,
                     rsv3=rsv3,
                     opcode=opcode,
                     payload=unmasked_bytes)

    def receive_filtered_frame(self):
        """Receives a frame and applies frame filters and message filters.
        The frame to be received must satisfy following conditions:
        - The frame is not fragmented.
        - The opcode of the frame is TEXT or BINARY.

        DO NOT USE this method except for testing purpose.

        Raises:
            InvalidFrameException: when the frame is fragmented or is not a
                TEXT or BINARY frame.
        """

        frame = self._receive_frame_as_frame_object()
        if not frame.fin:
            raise InvalidFrameException(
                'Segmented frames must not be received via '
                'receive_filtered_frame()')
        if (frame.opcode != common.OPCODE_TEXT
                and frame.opcode != common.OPCODE_BINARY):
            raise InvalidFrameException(
                'Control frames must not be received via '
                'receive_filtered_frame()')

        for frame_filter in self._options.incoming_frame_filters:
            frame_filter.filter(frame)
        for message_filter in self._options.incoming_message_filters:
            frame.payload = message_filter.filter(frame.payload)
        return frame

    def send_message(self, message, end=True, binary=False):
        """Send message.

        Args:
            message: text in unicode or binary in str to send.
            end: True if this is the last fragment of the message.
            binary: send message as binary frame.

        Raises:
            BadOperationException: when called on a server-terminated
                connection or called with inconsistent message type or
                binary parameter.
        """

        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, six.text_type):
            raise BadOperationException(
                'Message for binary frame must not be instance of Unicode')

        for message_filter in self._options.outgoing_message_filters:
            message = message_filter.filter(message, end, binary)

        try:
            # Set this to any positive integer to limit maximum size of data in
            # payload data of each frame.
            MAX_PAYLOAD_DATA_SIZE = -1

            if MAX_PAYLOAD_DATA_SIZE <= 0:
                self._write(self._writer.build(message, end, binary))
                return

            bytes_written = 0
            while True:
                end_for_this_frame = end
                bytes_to_write = len(message) - bytes_written
                if (MAX_PAYLOAD_DATA_SIZE > 0
                        and bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
                    end_for_this_frame = False
                    bytes_to_write = MAX_PAYLOAD_DATA_SIZE

                frame = self._writer.build(
                    message[bytes_written:bytes_written + bytes_to_write],
                    end_for_this_frame, binary)
                self._write(frame)

                bytes_written += bytes_to_write

                # This if must be placed here (the end of while block) so that
                # at least one frame is sent.
                if len(message) <= bytes_written:
                    break
        except ValueError as e:
            # Raised by the frame builder, e.g. for inconsistent message
            # types within one message.
            raise BadOperationException(e)

    def _get_message_from_frame(self, frame):
        """Gets a message from frame. If the message is composed of fragmented
        frames and the frame is not the last fragmented frame, this method
        returns None. The whole message will be returned when the last
        fragmented frame is passed to this method.

        Raises:
            InvalidFrameException: when the frame doesn't match defragmentation
                context, or the frame contains invalid data.
        """

        if frame.opcode == common.OPCODE_CONTINUATION:
            if not self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received a termination frame but fragmentation '
                        'not started')
                else:
                    raise InvalidFrameException(
                        'Received an intermediate frame but '
                        'fragmentation not started')

            if frame.fin:
                # End of fragmentation frame
                self._received_fragments.append(frame.payload)
                message = b''.join(self._received_fragments)
                self._received_fragments = []
                return message
            else:
                # Intermediate frame
                self._received_fragments.append(frame.payload)
                return None
        else:
            if self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received an unfragmented frame without '
                        'terminating existing fragmentation')
                else:
                    raise InvalidFrameException(
                        'New fragmentation started without terminating '
                        'existing fragmentation')

            if frame.fin:
                # Unfragmented frame

                self._original_opcode = frame.opcode
                return frame.payload
            else:
                # Start of fragmentation frame

                if common.is_control_opcode(frame.opcode):
                    raise InvalidFrameException(
                        'Control frames must not be fragmented')

                self._original_opcode = frame.opcode
                self._received_fragments.append(frame.payload)
                return None

    def _process_close_message(self, message):
        """Processes close message.

        Records the close code and reason on the request and, unless this is
        the ack for a closing handshake started by this side, replies with a
        close frame.

        Args:
            message: close message.

        Raises:
            InvalidFrameException: when the message is invalid.
        """

        self._request.client_terminated = True

        # Status code is optional. We can have status reason only if we
        # have status code. Status reason can be empty string. So,
        # allowed cases are
        # - no application data: no code no reason
        # - 2 octet of application data: has code but no reason
        # - 3 or more octet of application data: both code and reason
        if len(message) == 0:
            self._logger.debug('Received close frame (empty body)')
            self._request.ws_close_code = common.STATUS_NO_STATUS_RECEIVED
        elif len(message) == 1:
            raise InvalidFrameException(
                'If a close frame has status code, the length of '
                'status code must be 2 octet')
        elif len(message) >= 2:
            self._request.ws_close_code = struct.unpack('!H', message[0:2])[0]
            self._request.ws_close_reason = message[2:].decode(
                'utf-8', 'replace')
            self._logger.debug('Received close frame (code=%d, reason=%r)',
                               self._request.ws_close_code,
                               self._request.ws_close_reason)

        # As we've received a close frame, no more data is coming over the
        # socket. We can now safely close the socket without worrying about
        # RST sending.

        if self._request.server_terminated:
            self._logger.debug(
                'Received ack for server-initiated closing handshake')
            return

        self._logger.debug('Received client-initiated closing handshake')

        code = common.STATUS_NORMAL_CLOSURE
        reason = ''
        if hasattr(self._request, '_dispatcher'):
            dispatcher = self._request._dispatcher
            code, reason = dispatcher.passive_closing_handshake(self._request)
            if code is None and reason is not None and len(reason) > 0:
                self._logger.warning(
                    'Handler specified reason despite code being None')
                reason = ''
            if reason is None:
                reason = ''
        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Acknowledged closing handshake initiated by the peer '
            '(code=%r, reason=%r)', code, reason)

    def _process_ping_message(self, message):
        """Processes ping message.

        If the request has a truthy on_ping_handler attribute, it is called
        and is responsible for any reply; otherwise a pong carrying the same
        body is sent.

        Args:
            message: ping message.
        """

        # Only the attribute lookup may be missing. An AttributeError raised
        # inside the handler itself must propagate instead of being mistaken
        # for "no handler installed".
        handler = getattr(self._request, 'on_ping_handler', None)
        if handler:
            handler(self._request, message)
            return
        self._send_pong(message)

    def _process_pong_message(self, message):
        """Processes pong message.

        Matches the pong against the queue of outstanding pings and then
        calls the request's on_pong_handler, if it has a truthy one.

        Args:
            message: pong message.
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while True:
            try:
                expected_body = self._ping_queue.popleft()
                if expected_body == message:
                    # inflight_pings contains pings ignored by the
                    # other peer. Just forget them.
                    self._logger.debug(
                        'Ping %r is acked (%d pings were ignored)',
                        expected_body, len(inflight_pings))
                    break
                else:
                    inflight_pings.append(expected_body)
            except IndexError:
                # The received pong was unsolicited pong. Keep the
                # ping queue as is.
                self._ping_queue = inflight_pings
                self._logger.debug('Received a unsolicited pong')
                break

        # As in _process_ping_message, only a missing attribute is tolerated;
        # errors raised by the handler propagate.
        handler = getattr(self._request, 'on_pong_handler', None)
        if handler:
            handler(self._request, message)

    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Ping and pong frames are processed internally and do not cause this
        method to return.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            InvalidUTF8Exception: when a text message is not valid UTF-8.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.

            frame = self._receive_frame_as_frame_object()

            # Check the constraint on the payload size for control frames
            # before extension processes the frame.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            if (common.is_control_opcode(frame.opcode)
                    and len(frame.payload) > 125):
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            # Any reserved bit still set after the filters ran belongs to an
            # extension that nobody handled.
            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            message = self._get_message_from_frame(frame)
            if message is None:
                continue

            for message_filter in self._options.incoming_message_filters:
                message = message_filter.filter(message)

            if self._original_opcode == common.OPCODE_TEXT:
                # Invalid UTF-8 in a text message is reported to the caller
                # as InvalidUTF8Exception; it is not replaced with U+FFFD.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException('Opcode %d is not supported' %
                                                self._original_opcode)

    def _send_closing_handshake(self, code, reason):
        """Sends a close frame and marks the request as server-terminated."""
        body = create_closing_handshake_body(code, reason)
        frame = create_close_frame(
            body,
            mask=self._options.mask_send,
            frame_filters=self._options.outgoing_frame_filters)

        # Set before writing so that the flag is already up if the write
        # fails.
        self._request.server_terminated = True

        self._write(frame)

    def close_connection(self,
                         code=common.STATUS_NORMAL_CLOSURE,
                         reason='',
                         wait_response=True):
        """Closes a WebSocket connection. Note that this method blocks until
        it receives acknowledgement to the closing handshake.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
            wait_response: True when caller want to wait the response.
        Raises:
            BadOperationException: when reason is specified with code None
                or reason is not an instance of both str and unicode.
            ConnectionTerminatedException: when the peer answers with
                something other than a close frame.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        # When we receive a close frame, we call _process_close_message().
        # _process_close_message() immediately acknowledges to the
        # server-initiated closing handshake and sets server_terminated to
        # True. So, here we can assume that we haven't received any close
        # frame. We're initiating a closing handshake.

        if code is None:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''
        else:
            if not isinstance(reason, bytes) and not isinstance(
                    reason, six.text_type):
                raise BadOperationException(
                    'close reason must be an instance of bytes or unicode')

        self._send_closing_handshake(code, reason)
        self._logger.debug('Initiated closing handshake (code=%r, reason=%r)',
                           code, reason)

        if (code == common.STATUS_GOING_AWAY
                or code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
            # It doesn't make sense to wait for a close frame if the reason is
            # protocol error or that the server is going away. For some of
            # other reasons, it might not make sense to wait for a close frame,
            # but it's not clear, yet.
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake.
        message = self.receive_message()
        if message is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.

    def send_ping(self, body, binary=False):
        """Sends a ping frame and remembers its body for matching the pong.

        Args:
            body: ping payload. Text is encoded to UTF-8 unless binary is
                true.
            binary: when true, body is sent as is.
        """
        if not binary and isinstance(body, six.text_type):
            body = body.encode('UTF-8')
        frame = create_ping_frame(body, self._options.mask_send,
                                  self._options.outgoing_frame_filters)
        self._write(frame)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        """Sends a pong frame carrying body."""
        frame = create_pong_frame(body, self._options.mask_send,
                                  self._options.outgoing_frame_filters)
        self._write(frame)

    def get_last_received_opcode(self):
        """Returns the opcode of the WebSocket message which the last received
        frame belongs to. The return value is valid iff immediately after
        receive_message call.
        """

        return self._original_opcode


# vi:sts=4 sw=4 et