1# Copyright 2011, Google Inc. 2# All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted provided that the following conditions are 6# met: 7# 8# * Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# * Redistributions in binary form must reproduce the above 11# copyright notice, this list of conditions and the following disclaimer 12# in the documentation and/or other materials provided with the 13# distribution. 14# * Neither the name of Google Inc. nor the names of its 15# contributors may be used to endorse or promote products derived from 16# this software without specific prior written permission. 17# 18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29"""This file provides classes and helper functions for parsing/building frames 30of the WebSocket protocol (RFC 6455). 31 32Specification: 33http://tools.ietf.org/html/rfc6455 34""" 35 36from __future__ import absolute_import, division 37 38from collections import deque 39import logging 40import os 41import struct 42import time 43import socket 44import six 45 46from mod_pywebsocket import common 47from mod_pywebsocket import util 48from mod_pywebsocket._stream_exceptions import BadOperationException 49from mod_pywebsocket._stream_exceptions import ConnectionTerminatedException 50from mod_pywebsocket._stream_exceptions import InvalidFrameException 51from mod_pywebsocket._stream_exceptions import InvalidUTF8Exception 52from mod_pywebsocket._stream_exceptions import UnsupportedFrameException 53 54_NOOP_MASKER = util.NoopMasker() 55 56 57class Frame(object): 58 def __init__(self, 59 fin=1, 60 rsv1=0, 61 rsv2=0, 62 rsv3=0, 63 opcode=None, 64 payload=b''): 65 self.fin = fin 66 self.rsv1 = rsv1 67 self.rsv2 = rsv2 68 self.rsv3 = rsv3 69 self.opcode = opcode 70 self.payload = payload 71 72 73# Helper functions made public to be used for writing unittests for WebSocket 74# clients. 75 76 77def create_length_header(length, mask): 78 """Creates a length header. 79 80 Args: 81 length: Frame length. Must be less than 2^63. 82 mask: Mask bit. Must be boolean. 83 84 Raises: 85 ValueError: when bad data is given. 86 """ 87 88 if mask: 89 mask_bit = 1 << 7 90 else: 91 mask_bit = 0 92 93 if length < 0: 94 raise ValueError('length must be non negative integer') 95 elif length <= 125: 96 return util.pack_byte(mask_bit | length) 97 elif length < (1 << 16): 98 return util.pack_byte(mask_bit | 126) + struct.pack('!H', length) 99 elif length < (1 << 63): 100 return util.pack_byte(mask_bit | 127) + struct.pack('!Q', length) 101 else: 102 raise ValueError('Payload is too big for one frame') 103 104 105def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask): 106 """Creates a frame header. 107 108 Raises: 109 Exception: when bad data is given. 110 """ 111 112 if opcode < 0 or 0xf < opcode: 113 raise ValueError('Opcode out of range') 114 115 if payload_length < 0 or (1 << 63) <= payload_length: 116 raise ValueError('payload_length out of range') 117 118 if (fin | rsv1 | rsv2 | rsv3) & ~1: 119 raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1') 120 121 header = b'' 122 123 first_byte = ((fin << 7) 124 | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4) 125 | opcode) 126 header += util.pack_byte(first_byte) 127 header += create_length_header(payload_length, mask) 128 129 return header 130 131 132def _build_frame(header, body, mask): 133 if not mask: 134 return header + body 135 136 masking_nonce = os.urandom(4) 137 masker = util.RepeatedXorMasker(masking_nonce) 138 139 return header + masking_nonce + masker.mask(body) 140 141 142def _filter_and_format_frame_object(frame, mask, frame_filters): 143 for frame_filter in frame_filters: 144 frame_filter.filter(frame) 145 146 header = create_header(frame.opcode, len(frame.payload), frame.fin, 147 frame.rsv1, frame.rsv2, frame.rsv3, mask) 148 return _build_frame(header, frame.payload, mask) 149 150 151def create_binary_frame(message, 152 opcode=common.OPCODE_BINARY, 153 fin=1, 154 mask=False, 155 frame_filters=[]): 156 """Creates a simple binary frame with no extension, reserved bit.""" 157 158 frame = Frame(fin=fin, opcode=opcode, payload=message) 159 return _filter_and_format_frame_object(frame, mask, frame_filters) 160 161 162def create_text_frame(message, 163 opcode=common.OPCODE_TEXT, 164 fin=1, 165 mask=False, 166 frame_filters=[]): 167 """Creates a simple text frame with no extension, reserved bit.""" 168 169 encoded_message = message.encode('utf-8') 170 return create_binary_frame(encoded_message, opcode, fin, mask, 171 frame_filters) 172 173 174def parse_frame(receive_bytes, 175 logger=None, 176 ws_version=common.VERSION_HYBI_LATEST, 177 unmask_receive=True): 178 """Parses a frame. Returns a tuple containing each header field and 179 payload. 180 181 Args: 182 receive_bytes: a function that reads frame data from a stream or 183 something similar. The function takes length of the bytes to be 184 read. The function must raise ConnectionTerminatedException if 185 there is not enough data to be read. 186 logger: a logging object. 187 ws_version: the version of WebSocket protocol. 188 unmask_receive: unmask received frames. When received unmasked 189 frame, raises InvalidFrameException. 190 191 Raises: 192 ConnectionTerminatedException: when receive_bytes raises it. 193 InvalidFrameException: when the frame contains invalid data. 194 """ 195 196 if not logger: 197 logger = logging.getLogger() 198 199 logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame') 200 201 first_byte = ord(receive_bytes(1)) 202 fin = (first_byte >> 7) & 1 203 rsv1 = (first_byte >> 6) & 1 204 rsv2 = (first_byte >> 5) & 1 205 rsv3 = (first_byte >> 4) & 1 206 opcode = first_byte & 0xf 207 208 second_byte = ord(receive_bytes(1)) 209 mask = (second_byte >> 7) & 1 210 payload_length = second_byte & 0x7f 211 212 logger.log( 213 common.LOGLEVEL_FINE, 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, ' 214 'Mask=%s, Payload_length=%s', fin, rsv1, rsv2, rsv3, opcode, mask, 215 payload_length) 216 217 if (mask == 1) != unmask_receive: 218 raise InvalidFrameException( 219 'Mask bit on the received frame did\'nt match masking ' 220 'configuration for received frames') 221 222 # The HyBi and later specs disallow putting a value in 0x0-0xFFFF 223 # into the 8-octet extended payload length field (or 0x0-0xFD in 224 # 2-octet field). 225 valid_length_encoding = True 226 length_encoding_bytes = 1 227 if payload_length == 127: 228 logger.log(common.LOGLEVEL_FINE, 229 'Receive 8-octet extended payload length') 230 231 extended_payload_length = receive_bytes(8) 232 payload_length = struct.unpack('!Q', extended_payload_length)[0] 233 if payload_length > 0x7FFFFFFFFFFFFFFF: 234 raise InvalidFrameException('Extended payload length >= 2^63') 235 if ws_version >= 13 and payload_length < 0x10000: 236 valid_length_encoding = False 237 length_encoding_bytes = 8 238 239 logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s', 240 payload_length) 241 elif payload_length == 126: 242 logger.log(common.LOGLEVEL_FINE, 243 'Receive 2-octet extended payload length') 244 245 extended_payload_length = receive_bytes(2) 246 payload_length = struct.unpack('!H', extended_payload_length)[0] 247 if ws_version >= 13 and payload_length < 126: 248 valid_length_encoding = False 249 length_encoding_bytes = 2 250 251 logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s', 252 payload_length) 253 254 if not valid_length_encoding: 255 logger.warning( 256 'Payload length is not encoded using the minimal number of ' 257 'bytes (%d is encoded using %d bytes)', payload_length, 258 length_encoding_bytes) 259 260 if mask == 1: 261 logger.log(common.LOGLEVEL_FINE, 'Receive mask') 262 263 masking_nonce = receive_bytes(4) 264 masker = util.RepeatedXorMasker(masking_nonce) 265 266 logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce) 267 else: 268 masker = _NOOP_MASKER 269 270 logger.log(common.LOGLEVEL_FINE, 'Receive payload data') 271 if logger.isEnabledFor(common.LOGLEVEL_FINE): 272 receive_start = time.time() 273 274 raw_payload_bytes = receive_bytes(payload_length) 275 276 if logger.isEnabledFor(common.LOGLEVEL_FINE): 277 # pylint --py3k W1619 278 logger.log( 279 common.LOGLEVEL_FINE, 'Done receiving payload data at %s MB/s', 280 payload_length / (time.time() - receive_start) / 1000 / 1000) 281 logger.log(common.LOGLEVEL_FINE, 'Unmask payload data') 282 283 if logger.isEnabledFor(common.LOGLEVEL_FINE): 284 unmask_start = time.time() 285 286 unmasked_bytes = masker.mask(raw_payload_bytes) 287 288 if logger.isEnabledFor(common.LOGLEVEL_FINE): 289 # pylint --py3k W1619 290 logger.log(common.LOGLEVEL_FINE, 291 'Done unmasking payload data at %s MB/s', 292 payload_length / (time.time() - unmask_start) / 1000 / 1000) 293 294 return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 295 296 297class FragmentedFrameBuilder(object): 298 """A stateful class to send a message as fragments.""" 299 def __init__(self, mask, frame_filters=[], encode_utf8=True): 300 """Constructs an instance.""" 301 302 self._mask = mask 303 self._frame_filters = frame_filters 304 # This is for skipping UTF-8 encoding when building text type frames 305 # from compressed data. 306 self._encode_utf8 = encode_utf8 307 308 self._started = False 309 310 # Hold opcode of the first frame in messages to verify types of other 311 # frames in the message are all the same. 312 self._opcode = common.OPCODE_TEXT 313 314 def build(self, payload_data, end, binary): 315 if binary: 316 frame_type = common.OPCODE_BINARY 317 else: 318 frame_type = common.OPCODE_TEXT 319 if self._started: 320 if self._opcode != frame_type: 321 raise ValueError('Message types are different in frames for ' 322 'the same message') 323 opcode = common.OPCODE_CONTINUATION 324 else: 325 opcode = frame_type 326 self._opcode = frame_type 327 328 if end: 329 self._started = False 330 fin = 1 331 else: 332 self._started = True 333 fin = 0 334 335 if binary or not self._encode_utf8: 336 return create_binary_frame(payload_data, opcode, fin, self._mask, 337 self._frame_filters) 338 else: 339 return create_text_frame(payload_data, opcode, fin, self._mask, 340 self._frame_filters) 341 342 343def _create_control_frame(opcode, body, mask, frame_filters): 344 frame = Frame(opcode=opcode, payload=body) 345 346 for frame_filter in frame_filters: 347 frame_filter.filter(frame) 348 349 if len(frame.payload) > 125: 350 raise BadOperationException( 351 'Payload data size of control frames must be 125 bytes or less') 352 353 header = create_header(frame.opcode, len(frame.payload), frame.fin, 354 frame.rsv1, frame.rsv2, frame.rsv3, mask) 355 return _build_frame(header, frame.payload, mask) 356 357 358def create_ping_frame(body, mask=False, frame_filters=[]): 359 return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters) 360 361 362def create_pong_frame(body, mask=False, frame_filters=[]): 363 return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters) 364 365 366def create_close_frame(body, mask=False, frame_filters=[]): 367 return _create_control_frame(common.OPCODE_CLOSE, body, mask, 368 frame_filters) 369 370 371def create_closing_handshake_body(code, reason): 372 body = b'' 373 if code is not None: 374 if (code > common.STATUS_USER_PRIVATE_MAX 375 or code < common.STATUS_NORMAL_CLOSURE): 376 raise BadOperationException('Status code is out of range') 377 if (code == common.STATUS_NO_STATUS_RECEIVED 378 or code == common.STATUS_ABNORMAL_CLOSURE 379 or code == common.STATUS_TLS_HANDSHAKE): 380 raise BadOperationException('Status code is reserved pseudo ' 381 'code') 382 encoded_reason = reason.encode('utf-8') 383 body = struct.pack('!H', code) + encoded_reason 384 return body 385 386 387class StreamOptions(object): 388 """Holds option values to configure Stream objects.""" 389 def __init__(self): 390 """Constructs StreamOptions.""" 391 392 # Filters applied to frames. 393 self.outgoing_frame_filters = [] 394 self.incoming_frame_filters = [] 395 396 # Filters applied to messages. Control frames are not affected by them. 397 self.outgoing_message_filters = [] 398 self.incoming_message_filters = [] 399 400 self.encode_text_message_to_utf8 = True 401 self.mask_send = False 402 self.unmask_receive = True 403 404 405class Stream(object): 406 """A class for parsing/building frames of the WebSocket protocol 407 (RFC 6455). 408 """ 409 def __init__(self, request, options): 410 """Constructs an instance. 411 412 Args: 413 request: mod_python request. 414 """ 415 416 self._logger = util.get_class_logger(self) 417 418 self._options = options 419 self._request = request 420 421 self._request.client_terminated = False 422 self._request.server_terminated = False 423 424 # Holds body of received fragments. 425 self._received_fragments = [] 426 # Holds the opcode of the first fragment. 427 self._original_opcode = None 428 429 self._writer = FragmentedFrameBuilder( 430 self._options.mask_send, self._options.outgoing_frame_filters, 431 self._options.encode_text_message_to_utf8) 432 433 self._ping_queue = deque() 434 435 def _read(self, length): 436 """Reads length bytes from connection. In case we catch any exception, 437 prepends remote address to the exception message and raise again. 438 439 Raises: 440 ConnectionTerminatedException: when read returns empty string. 441 """ 442 443 try: 444 read_bytes = self._request.connection.read(length) 445 if not read_bytes: 446 raise ConnectionTerminatedException( 447 'Receiving %d byte failed. Peer (%r) closed connection' % 448 (length, (self._request.connection.remote_addr, ))) 449 return read_bytes 450 except IOError as e: 451 # Also catch an IOError because mod_python throws it. 452 raise ConnectionTerminatedException( 453 'Receiving %d byte failed. IOError (%s) occurred' % 454 (length, e)) 455 456 def _write(self, bytes_to_write): 457 """Writes given bytes to connection. In case we catch any exception, 458 prepends remote address to the exception message and raise again. 459 """ 460 461 try: 462 self._request.connection.write(bytes_to_write) 463 except Exception as e: 464 util.prepend_message_to_exception( 465 'Failed to send message to %r: ' % 466 (self._request.connection.remote_addr, ), e) 467 raise 468 469 def receive_bytes(self, length): 470 """Receives multiple bytes. Retries read when we couldn't receive the 471 specified amount. This method returns byte strings. 472 473 Raises: 474 ConnectionTerminatedException: when read returns empty string. 475 """ 476 477 read_bytes = [] 478 while length > 0: 479 new_read_bytes = self._read(length) 480 read_bytes.append(new_read_bytes) 481 length -= len(new_read_bytes) 482 return b''.join(read_bytes) 483 484 def _read_until(self, delim_char): 485 """Reads bytes until we encounter delim_char. The result will not 486 contain delim_char. 487 488 Raises: 489 ConnectionTerminatedException: when read returns empty string. 490 """ 491 492 read_bytes = [] 493 while True: 494 ch = self._read(1) 495 if ch == delim_char: 496 break 497 read_bytes.append(ch) 498 return b''.join(read_bytes) 499 500 def _receive_frame(self): 501 """Receives a frame and return data in the frame as a tuple containing 502 each header field and payload separately. 503 504 Raises: 505 ConnectionTerminatedException: when read returns empty 506 string. 507 InvalidFrameException: when the frame contains invalid data. 508 """ 509 def _receive_bytes(length): 510 return self.receive_bytes(length) 511 512 return parse_frame(receive_bytes=_receive_bytes, 513 logger=self._logger, 514 ws_version=self._request.ws_version, 515 unmask_receive=self._options.unmask_receive) 516 517 def _receive_frame_as_frame_object(self): 518 opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame() 519 520 return Frame(fin=fin, 521 rsv1=rsv1, 522 rsv2=rsv2, 523 rsv3=rsv3, 524 opcode=opcode, 525 payload=unmasked_bytes) 526 527 def receive_filtered_frame(self): 528 """Receives a frame and applies frame filters and message filters. 529 The frame to be received must satisfy following conditions: 530 - The frame is not fragmented. 531 - The opcode of the frame is TEXT or BINARY. 532 533 DO NOT USE this method except for testing purpose. 534 """ 535 536 frame = self._receive_frame_as_frame_object() 537 if not frame.fin: 538 raise InvalidFrameException( 539 'Segmented frames must not be received via ' 540 'receive_filtered_frame()') 541 if (frame.opcode != common.OPCODE_TEXT 542 and frame.opcode != common.OPCODE_BINARY): 543 raise InvalidFrameException( 544 'Control frames must not be received via ' 545 'receive_filtered_frame()') 546 547 for frame_filter in self._options.incoming_frame_filters: 548 frame_filter.filter(frame) 549 for message_filter in self._options.incoming_message_filters: 550 frame.payload = message_filter.filter(frame.payload) 551 return frame 552 553 def send_message(self, message, end=True, binary=False): 554 """Send message. 555 556 Args: 557 message: text in unicode or binary in str to send. 558 binary: send message as binary frame. 559 560 Raises: 561 BadOperationException: when called on a server-terminated 562 connection or called with inconsistent message type or 563 binary parameter. 564 """ 565 566 if self._request.server_terminated: 567 raise BadOperationException( 568 'Requested send_message after sending out a closing handshake') 569 570 if binary and isinstance(message, six.text_type): 571 raise BadOperationException( 572 'Message for binary frame must not be instance of Unicode') 573 574 for message_filter in self._options.outgoing_message_filters: 575 message = message_filter.filter(message, end, binary) 576 577 try: 578 # Set this to any positive integer to limit maximum size of data in 579 # payload data of each frame. 580 MAX_PAYLOAD_DATA_SIZE = -1 581 582 if MAX_PAYLOAD_DATA_SIZE <= 0: 583 self._write(self._writer.build(message, end, binary)) 584 return 585 586 bytes_written = 0 587 while True: 588 end_for_this_frame = end 589 bytes_to_write = len(message) - bytes_written 590 if (MAX_PAYLOAD_DATA_SIZE > 0 591 and bytes_to_write > MAX_PAYLOAD_DATA_SIZE): 592 end_for_this_frame = False 593 bytes_to_write = MAX_PAYLOAD_DATA_SIZE 594 595 frame = self._writer.build( 596 message[bytes_written:bytes_written + bytes_to_write], 597 end_for_this_frame, binary) 598 self._write(frame) 599 600 bytes_written += bytes_to_write 601 602 # This if must be placed here (the end of while block) so that 603 # at least one frame is sent. 604 if len(message) <= bytes_written: 605 break 606 except ValueError as e: 607 raise BadOperationException(e) 608 609 def _get_message_from_frame(self, frame): 610 """Gets a message from frame. If the message is composed of fragmented 611 frames and the frame is not the last fragmented frame, this method 612 returns None. The whole message will be returned when the last 613 fragmented frame is passed to this method. 614 615 Raises: 616 InvalidFrameException: when the frame doesn't match defragmentation 617 context, or the frame contains invalid data. 618 """ 619 620 if frame.opcode == common.OPCODE_CONTINUATION: 621 if not self._received_fragments: 622 if frame.fin: 623 raise InvalidFrameException( 624 'Received a termination frame but fragmentation ' 625 'not started') 626 else: 627 raise InvalidFrameException( 628 'Received an intermediate frame but ' 629 'fragmentation not started') 630 631 if frame.fin: 632 # End of fragmentation frame 633 self._received_fragments.append(frame.payload) 634 message = b''.join(self._received_fragments) 635 self._received_fragments = [] 636 return message 637 else: 638 # Intermediate frame 639 self._received_fragments.append(frame.payload) 640 return None 641 else: 642 if self._received_fragments: 643 if frame.fin: 644 raise InvalidFrameException( 645 'Received an unfragmented frame without ' 646 'terminating existing fragmentation') 647 else: 648 raise InvalidFrameException( 649 'New fragmentation started without terminating ' 650 'existing fragmentation') 651 652 if frame.fin: 653 # Unfragmented frame 654 655 self._original_opcode = frame.opcode 656 return frame.payload 657 else: 658 # Start of fragmentation frame 659 660 if common.is_control_opcode(frame.opcode): 661 raise InvalidFrameException( 662 'Control frames must not be fragmented') 663 664 self._original_opcode = frame.opcode 665 self._received_fragments.append(frame.payload) 666 return None 667 668 def _process_close_message(self, message): 669 """Processes close message. 670 671 Args: 672 message: close message. 673 674 Raises: 675 InvalidFrameException: when the message is invalid. 676 """ 677 678 self._request.client_terminated = True 679 680 # Status code is optional. We can have status reason only if we 681 # have status code. Status reason can be empty string. So, 682 # allowed cases are 683 # - no application data: no code no reason 684 # - 2 octet of application data: has code but no reason 685 # - 3 or more octet of application data: both code and reason 686 if len(message) == 0: 687 self._logger.debug('Received close frame (empty body)') 688 self._request.ws_close_code = common.STATUS_NO_STATUS_RECEIVED 689 elif len(message) == 1: 690 raise InvalidFrameException( 691 'If a close frame has status code, the length of ' 692 'status code must be 2 octet') 693 elif len(message) >= 2: 694 self._request.ws_close_code = struct.unpack('!H', message[0:2])[0] 695 self._request.ws_close_reason = message[2:].decode( 696 'utf-8', 'replace') 697 self._logger.debug('Received close frame (code=%d, reason=%r)', 698 self._request.ws_close_code, 699 self._request.ws_close_reason) 700 701 # As we've received a close frame, no more data is coming over the 702 # socket. We can now safely close the socket without worrying about 703 # RST sending. 704 705 if self._request.server_terminated: 706 self._logger.debug( 707 'Received ack for server-initiated closing handshake') 708 return 709 710 self._logger.debug('Received client-initiated closing handshake') 711 712 code = common.STATUS_NORMAL_CLOSURE 713 reason = '' 714 if hasattr(self._request, '_dispatcher'): 715 dispatcher = self._request._dispatcher 716 code, reason = dispatcher.passive_closing_handshake(self._request) 717 if code is None and reason is not None and len(reason) > 0: 718 self._logger.warning( 719 'Handler specified reason despite code being None') 720 reason = '' 721 if reason is None: 722 reason = '' 723 self._send_closing_handshake(code, reason) 724 self._logger.debug( 725 'Acknowledged closing handshake initiated by the peer ' 726 '(code=%r, reason=%r)', code, reason) 727 728 def _process_ping_message(self, message): 729 """Processes ping message. 730 731 Args: 732 message: ping message. 733 """ 734 735 try: 736 handler = self._request.on_ping_handler 737 if handler: 738 handler(self._request, message) 739 return 740 except AttributeError: 741 pass 742 self._send_pong(message) 743 744 def _process_pong_message(self, message): 745 """Processes pong message. 746 747 Args: 748 message: pong message. 749 """ 750 751 # TODO(tyoshino): Add ping timeout handling. 752 753 inflight_pings = deque() 754 755 while True: 756 try: 757 expected_body = self._ping_queue.popleft() 758 if expected_body == message: 759 # inflight_pings contains pings ignored by the 760 # other peer. Just forget them. 761 self._logger.debug( 762 'Ping %r is acked (%d pings were ignored)', 763 expected_body, len(inflight_pings)) 764 break 765 else: 766 inflight_pings.append(expected_body) 767 except IndexError: 768 # The received pong was unsolicited pong. Keep the 769 # ping queue as is. 770 self._ping_queue = inflight_pings 771 self._logger.debug('Received a unsolicited pong') 772 break 773 774 try: 775 handler = self._request.on_pong_handler 776 if handler: 777 handler(self._request, message) 778 except AttributeError: 779 pass 780 781 def receive_message(self): 782 """Receive a WebSocket frame and return its payload as a text in 783 unicode or a binary in str. 784 785 Returns: 786 payload data of the frame 787 - as unicode instance if received text frame 788 - as str instance if received binary frame 789 or None iff received closing handshake. 790 Raises: 791 BadOperationException: when called on a client-terminated 792 connection. 793 ConnectionTerminatedException: when read returns empty 794 string. 795 InvalidFrameException: when the frame contains invalid 796 data. 797 UnsupportedFrameException: when the received frame has 798 flags, opcode we cannot handle. You can ignore this 799 exception and continue receiving the next frame. 800 """ 801 802 if self._request.client_terminated: 803 raise BadOperationException( 804 'Requested receive_message after receiving a closing ' 805 'handshake') 806 807 while True: 808 # mp_conn.read will block if no bytes are available. 809 810 frame = self._receive_frame_as_frame_object() 811 812 # Check the constraint on the payload size for control frames 813 # before extension processes the frame. 814 # See also http://tools.ietf.org/html/rfc6455#section-5.5 815 if (common.is_control_opcode(frame.opcode) 816 and len(frame.payload) > 125): 817 raise InvalidFrameException( 818 'Payload data size of control frames must be 125 bytes or ' 819 'less') 820 821 for frame_filter in self._options.incoming_frame_filters: 822 frame_filter.filter(frame) 823 824 if frame.rsv1 or frame.rsv2 or frame.rsv3: 825 raise UnsupportedFrameException( 826 'Unsupported flag is set (rsv = %d%d%d)' % 827 (frame.rsv1, frame.rsv2, frame.rsv3)) 828 829 message = self._get_message_from_frame(frame) 830 if message is None: 831 continue 832 833 for message_filter in self._options.incoming_message_filters: 834 message = message_filter.filter(message) 835 836 if self._original_opcode == common.OPCODE_TEXT: 837 # The WebSocket protocol section 4.4 specifies that invalid 838 # characters must be replaced with U+fffd REPLACEMENT 839 # CHARACTER. 840 try: 841 return message.decode('utf-8') 842 except UnicodeDecodeError as e: 843 raise InvalidUTF8Exception(e) 844 elif self._original_opcode == common.OPCODE_BINARY: 845 return message 846 elif self._original_opcode == common.OPCODE_CLOSE: 847 self._process_close_message(message) 848 return None 849 elif self._original_opcode == common.OPCODE_PING: 850 self._process_ping_message(message) 851 elif self._original_opcode == common.OPCODE_PONG: 852 self._process_pong_message(message) 853 else: 854 raise UnsupportedFrameException('Opcode %d is not supported' % 855 self._original_opcode) 856 857 def _send_closing_handshake(self, code, reason): 858 body = create_closing_handshake_body(code, reason) 859 frame = create_close_frame( 860 body, 861 mask=self._options.mask_send, 862 frame_filters=self._options.outgoing_frame_filters) 863 864 self._request.server_terminated = True 865 866 self._write(frame) 867 868 def close_connection(self, 869 code=common.STATUS_NORMAL_CLOSURE, 870 reason='', 871 wait_response=True): 872 """Closes a WebSocket connection. Note that this method blocks until 873 it receives acknowledgement to the closing handshake. 874 875 Args: 876 code: Status code for close frame. If code is None, a close 877 frame with empty body will be sent. 878 reason: string representing close reason. 879 wait_response: True when caller want to wait the response. 880 Raises: 881 BadOperationException: when reason is specified with code None 882 or reason is not an instance of both str and unicode. 883 """ 884 885 if self._request.server_terminated: 886 self._logger.debug( 887 'Requested close_connection but server is already terminated') 888 return 889 890 # When we receive a close frame, we call _process_close_message(). 891 # _process_close_message() immediately acknowledges to the 892 # server-initiated closing handshake and sets server_terminated to 893 # True. So, here we can assume that we haven't received any close 894 # frame. We're initiating a closing handshake. 895 896 if code is None: 897 if reason is not None and len(reason) > 0: 898 raise BadOperationException( 899 'close reason must not be specified if code is None') 900 reason = '' 901 else: 902 if not isinstance(reason, bytes) and not isinstance( 903 reason, six.text_type): 904 raise BadOperationException( 905 'close reason must be an instance of bytes or unicode') 906 907 self._send_closing_handshake(code, reason) 908 self._logger.debug('Initiated closing handshake (code=%r, reason=%r)', 909 code, reason) 910 911 if (code == common.STATUS_GOING_AWAY 912 or code == common.STATUS_PROTOCOL_ERROR) or not wait_response: 913 # It doesn't make sense to wait for a close frame if the reason is 914 # protocol error or that the server is going away. For some of 915 # other reasons, it might not make sense to wait for a close frame, 916 # but it's not clear, yet. 917 return 918 919 # TODO(ukai): 2. wait until the /client terminated/ flag has been set, 920 # or until a server-defined timeout expires. 921 # 922 # For now, we expect receiving closing handshake right after sending 923 # out closing handshake. 924 message = self.receive_message() 925 if message is not None: 926 raise ConnectionTerminatedException( 927 'Didn\'t receive valid ack for closing handshake') 928 # TODO: 3. close the WebSocket connection. 929 # note: mod_python Connection (mp_conn) doesn't have close method. 930 931 def send_ping(self, body, binary=False): 932 if not binary and isinstance(body, six.text_type): 933 body = body.encode('UTF-8') 934 frame = create_ping_frame(body, self._options.mask_send, 935 self._options.outgoing_frame_filters) 936 self._write(frame) 937 938 self._ping_queue.append(body) 939 940 def _send_pong(self, body): 941 frame = create_pong_frame(body, self._options.mask_send, 942 self._options.outgoing_frame_filters) 943 self._write(frame) 944 945 def get_last_received_opcode(self): 946 """Returns the opcode of the WebSocket message which the last received 947 frame belongs to. The return value is valid iff immediately after 948 receive_message call. 949 """ 950 951 return self._original_opcode 952 953 954# vi:sts=4 sw=4 et 955