1# Copyright 2011, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8#     * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10#     * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14#     * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29"""This file provides classes and helper functions for parsing/building frames
30of the WebSocket protocol (RFC 6455).
31
32Specification:
33http://tools.ietf.org/html/rfc6455
34"""
35
36from __future__ import absolute_import, division
37
38from collections import deque
39import logging
40import os
41import struct
42import time
43import socket
44import six
45
46from mod_pywebsocket import common
47from mod_pywebsocket import util
48from mod_pywebsocket._stream_exceptions import BadOperationException
49from mod_pywebsocket._stream_exceptions import ConnectionTerminatedException
50from mod_pywebsocket._stream_exceptions import InvalidFrameException
51from mod_pywebsocket._stream_exceptions import InvalidUTF8Exception
52from mod_pywebsocket._stream_exceptions import UnsupportedFrameException
53
# Shared masker instance that parse_frame() uses for frames whose mask bit is
# not set; the payload is still passed through its mask() method.
# NOTE(review): presumably NoopMasker.mask() returns its input unchanged --
# confirm against mod_pywebsocket.util.
_NOOP_MASKER = util.NoopMasker()
55
56
class Frame(object):
    """Plain attribute holder for the fields of a single frame.

    Stores the FIN bit, the three reserved bits, the opcode and the payload
    exactly as given; nothing is validated here.
    """
    def __init__(self,
                 fin=1,
                 rsv1=0,
                 rsv2=0,
                 rsv3=0,
                 opcode=None,
                 payload=b''):
        # Flag bits of the frame header.
        self.fin, self.rsv1, self.rsv2, self.rsv3 = fin, rsv1, rsv2, rsv3
        # Frame type and frame body.
        self.opcode, self.payload = opcode, payload
71
72
73# Helper functions made public to be used for writing unittests for WebSocket
74# clients.
75
76
def create_length_header(length, mask):
    """Builds the mask bit / payload length part of a frame header.

    Lengths up to 125 are stored directly in the 7-bit length field. Larger
    lengths use the marker value 126 followed by a 2-octet length, or the
    marker value 127 followed by an 8-octet length, both in network byte
    order.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Evaluated for truthiness.

    Returns:
        The encoded octets.

    Raises:
        ValueError: when length is negative or not less than 2^63.
    """

    if length < 0:
        raise ValueError('length must be non negative integer')

    # The mask flag occupies the top bit of the first length octet.
    top_bit = (1 << 7) if mask else 0

    if length <= 125:
        return util.pack_byte(top_bit | length)
    if length < (1 << 16):
        return util.pack_byte(top_bit | 126) + struct.pack('!H', length)
    if length < (1 << 63):
        return util.pack_byte(top_bit | 127) + struct.pack('!Q', length)

    raise ValueError('Payload is too big for one frame')
103
104
def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Builds a complete frame header (first octet plus length header).

    The first octet packs FIN into bit 7, RSV1-RSV3 into bits 6-4 and the
    opcode into the low four bits. The remainder comes from
    create_length_header().

    Args:
        opcode: integer in the range 0x0-0xf.
        payload_length: integer in the range 0 to 2^63 - 1.
        fin: 0 or 1.
        rsv1: 0 or 1.
        rsv2: 0 or 1.
        rsv3: 0 or 1.
        mask: mask flag forwarded to create_length_header().

    Raises:
        ValueError: when any argument is out of range.
    """

    if opcode < 0 or opcode > 0xf:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or payload_length >= (1 << 63):
        raise ValueError('payload_length out of range')

    # OR-ing the four flags together lets one test catch any bit above bit 0.
    combined_flags = fin | rsv1 | rsv2 | rsv3
    if combined_flags & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    leading_octet = ((fin << 7)
                     | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                     | opcode)
    return (util.pack_byte(leading_octet) +
            create_length_header(payload_length, mask))
130
131
132def _build_frame(header, body, mask):
133    if not mask:
134        return header + body
135
136    masking_nonce = os.urandom(4)
137    masker = util.RepeatedXorMasker(masking_nonce)
138
139    return header + masking_nonce + masker.mask(body)
140
141
def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Runs frame through frame_filters and serializes the result.

    Each filter's filter() method is called with the frame, in order. The
    header is built from the frame's fields as they stand after filtering.
    """

    for current_filter in frame_filters:
        current_filter.filter(frame)

    frame_header = create_header(opcode=frame.opcode,
                                 payload_length=len(frame.payload),
                                 fin=frame.fin,
                                 rsv1=frame.rsv1,
                                 rsv2=frame.rsv2,
                                 rsv3=frame.rsv3,
                                 mask=mask)
    return _build_frame(frame_header, frame.payload, mask)
149
150
def create_binary_frame(message,
                        opcode=common.OPCODE_BINARY,
                        fin=1,
                        mask=False,
                        frame_filters=[]):
    """Creates a simple binary frame with no extension, reserved bit.

    The message is used as the payload as is. The reserved bits keep the
    Frame defaults unless one of frame_filters changes them.
    """

    return _filter_and_format_frame_object(
        Frame(payload=message, opcode=opcode, fin=fin), mask, frame_filters)
160
161
def create_text_frame(message,
                      opcode=common.OPCODE_TEXT,
                      fin=1,
                      mask=False,
                      frame_filters=[]):
    """Creates a simple text frame with no extension, reserved bit.

    The message is encoded as UTF-8 and the rest of the work is delegated to
    create_binary_frame() with the given opcode.
    """

    return create_binary_frame(message.encode('utf-8'),
                               opcode=opcode,
                               fin=fin,
                               mask=mask,
                               frame_filters=frame_filters)
172
173
def _transfer_rate_mbps(byte_count, started_at):
    """Returns the rate in MB/s for byte_count bytes moved since started_at.

    Only used for FINE-level diagnostics. The clock may report no elapsed
    time for a fast operation, so the zero-duration case is handled here
    instead of dividing by zero.
    """

    elapsed = time.time() - started_at
    if elapsed <= 0:
        return float('inf') if byte_count else 0.0
    # pylint --py3k W1619
    return byte_count / elapsed / 1000 / 1000


def parse_frame(receive_bytes,
                logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object. The root logger is used when omitted.
        ws_version: the version of WebSocket protocol.
        unmask_receive: whether received frames are expected to be masked.
            A frame whose mask bit disagrees with this setting causes
            InvalidFrameException.

    Returns:
        A tuple (opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3).

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if not logger:
        logger = logging.getLogger()

    # Decide once whether timing diagnostics are wanted, so the start
    # timestamps and the log lines that consume them cannot get out of sync
    # if the logger level changes while the frame is being read.
    fine_logging = logger.isEnabledFor(common.LOGLEVEL_FINE)

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    first_byte = ord(receive_bytes(1))
    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    second_byte = ord(receive_bytes(1))
    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(
        common.LOGLEVEL_FINE, 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
        'Mask=%s, Payload_length=%s', fin, rsv1, rsv2, rsv3, opcode, mask,
        payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0x7D in
    # 2-octet field). A violation is only reported as a warning below; the
    # frame is still accepted.
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack('!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException('Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack('!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)

    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)', payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    if fine_logging:
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if fine_logging:
        logger.log(common.LOGLEVEL_FINE,
                   'Done receiving payload data at %s MB/s',
                   _transfer_rate_mbps(payload_length, receive_start))
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if fine_logging:
        unmask_start = time.time()

    unmasked_bytes = masker.mask(raw_payload_bytes)

    if fine_logging:
        logger.log(common.LOGLEVEL_FINE,
                   'Done unmasking payload data at %s MB/s',
                   _transfer_rate_mbps(payload_length, unmask_start))

    return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3
295
296
class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""
    def __init__(self, mask, frame_filters=None, encode_utf8=True):
        """Constructs an instance.

        Args:
            mask: mask flag passed to the frame factory for every frame.
            frame_filters: filters passed to the frame factory for every
                frame. A list given by the caller is kept by reference.
                When omitted, each instance gets its own empty list, so
                builders never share a default list object.
            encode_utf8: when false, text frames are built from payload data
                without UTF-8 encoding it first.
        """

        self._mask = mask
        if frame_filters is None:
            frame_filters = []
        self._frame_filters = frame_filters
        # This is for skipping UTF-8 encoding when building text type frames
        # from compressed data.
        self._encode_utf8 = encode_utf8

        # True while a message has been started but its final fragment has
        # not been built yet.
        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, payload_data, end, binary):
        """Builds the next frame of the current message.

        The first frame of a message carries the text or binary opcode;
        following frames carry the continuation opcode.

        Args:
            payload_data: payload for this frame.
            end: true when this frame is the last one of the message.
            binary: true for a binary message, false for a text message.

        Returns:
            The serialized frame.

        Raises:
            ValueError: when binary disagrees with the type of the message
                that is currently in progress.
        """

        if binary:
            frame_type = common.OPCODE_BINARY
        else:
            frame_type = common.OPCODE_TEXT
        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type
            self._opcode = frame_type

        if end:
            self._started = False
            fin = 1
        else:
            self._started = True
            fin = 0

        if binary or not self._encode_utf8:
            return create_binary_frame(payload_data, opcode, fin, self._mask,
                                       self._frame_filters)
        else:
            return create_text_frame(payload_data, opcode, fin, self._mask,
                                     self._frame_filters)
341
342
def _create_control_frame(opcode, body, mask, frame_filters):
    """Builds a control frame with the given opcode and body.

    The size limit is checked after the filters have run, so it applies to
    the payload that is actually serialized.

    Raises:
        BadOperationException: when the filtered payload exceeds 125 bytes.
    """

    control_frame = Frame(payload=body, opcode=opcode)

    for current_filter in frame_filters:
        current_filter.filter(control_frame)

    if len(control_frame.payload) > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    frame_header = create_header(control_frame.opcode,
                                 len(control_frame.payload),
                                 control_frame.fin, control_frame.rsv1,
                                 control_frame.rsv2, control_frame.rsv3, mask)
    return _build_frame(frame_header, control_frame.payload, mask)
356
357
def create_ping_frame(body, mask=False, frame_filters=[]):
    """Builds a ping control frame carrying body."""
    return _create_control_frame(common.OPCODE_PING,
                                 body,
                                 mask=mask,
                                 frame_filters=frame_filters)
360
361
def create_pong_frame(body, mask=False, frame_filters=[]):
    """Builds a pong control frame carrying body."""
    return _create_control_frame(common.OPCODE_PONG,
                                 body,
                                 mask=mask,
                                 frame_filters=frame_filters)
364
365
def create_close_frame(body, mask=False, frame_filters=[]):
    """Builds a close control frame carrying body."""
    return _create_control_frame(common.OPCODE_CLOSE,
                                 body,
                                 mask=mask,
                                 frame_filters=frame_filters)
369
370
def create_closing_handshake_body(code, reason):
    """Builds the payload of a close frame.

    Args:
        code: status code, or None to produce an empty body (reason is then
            ignored).
        reason: text that is UTF-8 encoded and appended after the code.

    Returns:
        An empty byte string when code is None; otherwise the code packed as
        a 2-octet big-endian integer followed by the encoded reason.

    Raises:
        BadOperationException: when code is outside the accepted range or is
            one of the reserved pseudo codes.
    """

    if code is None:
        return b''

    if (code > common.STATUS_USER_PRIVATE_MAX
            or code < common.STATUS_NORMAL_CLOSURE):
        raise BadOperationException('Status code is out of range')

    reserved_pseudo_codes = (common.STATUS_NO_STATUS_RECEIVED,
                             common.STATUS_ABNORMAL_CLOSURE,
                             common.STATUS_TLS_HANDSHAKE)
    if code in reserved_pseudo_codes:
        raise BadOperationException('Status code is reserved pseudo '
                                    'code')

    reason_octets = reason.encode('utf-8')
    return struct.pack('!H', code) + reason_octets
385
386
class StreamOptions(object):
    """Holds option values to configure Stream objects."""
    def __init__(self):
        """Constructs StreamOptions with its default values."""

        # Masking defaults: outgoing frames are not masked, incoming frames
        # are expected to be masked.
        self.mask_send = False
        self.unmask_receive = True

        # Text messages are UTF-8 encoded when frames are built.
        self.encode_text_message_to_utf8 = True

        # Filters applied to frames.
        self.incoming_frame_filters = []
        self.outgoing_frame_filters = []

        # Filters applied to messages.
        self.incoming_message_filters = []
        self.outgoing_message_filters = []
403
404
405class Stream(object):
406    """A class for parsing/building frames of the WebSocket protocol
407    (RFC 6455).
408    """
409    def __init__(self, request, options):
410        """Constructs an instance.
411
412        Args:
413            request: mod_python request.
414        """
415
416        self._logger = util.get_class_logger(self)
417
418        self._options = options
419        self._request = request
420
421        self._request.client_terminated = False
422        self._request.server_terminated = False
423
424        # Holds body of received fragments.
425        self._received_fragments = []
426        # Holds the opcode of the first fragment.
427        self._original_opcode = None
428
429        self._writer = FragmentedFrameBuilder(
430            self._options.mask_send, self._options.outgoing_frame_filters,
431            self._options.encode_text_message_to_utf8)
432
433        self._ping_queue = deque()
434
435    def _read(self, length):
436        """Reads length bytes from connection. In case we catch any exception,
437        prepends remote address to the exception message and raise again.
438
439        Raises:
440            ConnectionTerminatedException: when read returns empty string.
441        """
442
443        try:
444            read_bytes = self._request.connection.read(length)
445            if not read_bytes:
446                raise ConnectionTerminatedException(
447                    'Receiving %d byte failed. Peer (%r) closed connection' %
448                    (length, (self._request.connection.remote_addr, )))
449            return read_bytes
450        except IOError as e:
451            # Also catch an IOError because mod_python throws it.
452            raise ConnectionTerminatedException(
453                'Receiving %d byte failed. IOError (%s) occurred' %
454                (length, e))
455
456    def _write(self, bytes_to_write):
457        """Writes given bytes to connection. In case we catch any exception,
458        prepends remote address to the exception message and raise again.
459        """
460
461        try:
462            self._request.connection.write(bytes_to_write)
463        except Exception as e:
464            util.prepend_message_to_exception(
465                'Failed to send message to %r: ' %
466                (self._request.connection.remote_addr, ), e)
467            raise
468
469    def receive_bytes(self, length):
470        """Receives multiple bytes. Retries read when we couldn't receive the
471        specified amount. This method returns byte strings.
472
473        Raises:
474            ConnectionTerminatedException: when read returns empty string.
475        """
476
477        read_bytes = []
478        while length > 0:
479            new_read_bytes = self._read(length)
480            read_bytes.append(new_read_bytes)
481            length -= len(new_read_bytes)
482        return b''.join(read_bytes)
483
484    def _read_until(self, delim_char):
485        """Reads bytes until we encounter delim_char. The result will not
486        contain delim_char.
487
488        Raises:
489            ConnectionTerminatedException: when read returns empty string.
490        """
491
492        read_bytes = []
493        while True:
494            ch = self._read(1)
495            if ch == delim_char:
496                break
497            read_bytes.append(ch)
498        return b''.join(read_bytes)
499
500    def _receive_frame(self):
501        """Receives a frame and return data in the frame as a tuple containing
502        each header field and payload separately.
503
504        Raises:
505            ConnectionTerminatedException: when read returns empty
506                string.
507            InvalidFrameException: when the frame contains invalid data.
508        """
509        def _receive_bytes(length):
510            return self.receive_bytes(length)
511
512        return parse_frame(receive_bytes=_receive_bytes,
513                           logger=self._logger,
514                           ws_version=self._request.ws_version,
515                           unmask_receive=self._options.unmask_receive)
516
517    def _receive_frame_as_frame_object(self):
518        opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
519
520        return Frame(fin=fin,
521                     rsv1=rsv1,
522                     rsv2=rsv2,
523                     rsv3=rsv3,
524                     opcode=opcode,
525                     payload=unmasked_bytes)
526
527    def receive_filtered_frame(self):
528        """Receives a frame and applies frame filters and message filters.
529        The frame to be received must satisfy following conditions:
530        - The frame is not fragmented.
531        - The opcode of the frame is TEXT or BINARY.
532
533        DO NOT USE this method except for testing purpose.
534        """
535
536        frame = self._receive_frame_as_frame_object()
537        if not frame.fin:
538            raise InvalidFrameException(
539                'Segmented frames must not be received via '
540                'receive_filtered_frame()')
541        if (frame.opcode != common.OPCODE_TEXT
542                and frame.opcode != common.OPCODE_BINARY):
543            raise InvalidFrameException(
544                'Control frames must not be received via '
545                'receive_filtered_frame()')
546
547        for frame_filter in self._options.incoming_frame_filters:
548            frame_filter.filter(frame)
549        for message_filter in self._options.incoming_message_filters:
550            frame.payload = message_filter.filter(frame.payload)
551        return frame
552
553    def send_message(self, message, end=True, binary=False):
554        """Send message.
555
556        Args:
557            message: text in unicode or binary in str to send.
558            binary: send message as binary frame.
559
560        Raises:
561            BadOperationException: when called on a server-terminated
562                connection or called with inconsistent message type or
563                binary parameter.
564        """
565
566        if self._request.server_terminated:
567            raise BadOperationException(
568                'Requested send_message after sending out a closing handshake')
569
570        if binary and isinstance(message, six.text_type):
571            raise BadOperationException(
572                'Message for binary frame must not be instance of Unicode')
573
574        for message_filter in self._options.outgoing_message_filters:
575            message = message_filter.filter(message, end, binary)
576
577        try:
578            # Set this to any positive integer to limit maximum size of data in
579            # payload data of each frame.
580            MAX_PAYLOAD_DATA_SIZE = -1
581
582            if MAX_PAYLOAD_DATA_SIZE <= 0:
583                self._write(self._writer.build(message, end, binary))
584                return
585
586            bytes_written = 0
587            while True:
588                end_for_this_frame = end
589                bytes_to_write = len(message) - bytes_written
590                if (MAX_PAYLOAD_DATA_SIZE > 0
591                        and bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
592                    end_for_this_frame = False
593                    bytes_to_write = MAX_PAYLOAD_DATA_SIZE
594
595                frame = self._writer.build(
596                    message[bytes_written:bytes_written + bytes_to_write],
597                    end_for_this_frame, binary)
598                self._write(frame)
599
600                bytes_written += bytes_to_write
601
602                # This if must be placed here (the end of while block) so that
603                # at least one frame is sent.
604                if len(message) <= bytes_written:
605                    break
606        except ValueError as e:
607            raise BadOperationException(e)
608
609    def _get_message_from_frame(self, frame):
610        """Gets a message from frame. If the message is composed of fragmented
611        frames and the frame is not the last fragmented frame, this method
612        returns None. The whole message will be returned when the last
613        fragmented frame is passed to this method.
614
615        Raises:
616            InvalidFrameException: when the frame doesn't match defragmentation
617                context, or the frame contains invalid data.
618        """
619
620        if frame.opcode == common.OPCODE_CONTINUATION:
621            if not self._received_fragments:
622                if frame.fin:
623                    raise InvalidFrameException(
624                        'Received a termination frame but fragmentation '
625                        'not started')
626                else:
627                    raise InvalidFrameException(
628                        'Received an intermediate frame but '
629                        'fragmentation not started')
630
631            if frame.fin:
632                # End of fragmentation frame
633                self._received_fragments.append(frame.payload)
634                message = b''.join(self._received_fragments)
635                self._received_fragments = []
636                return message
637            else:
638                # Intermediate frame
639                self._received_fragments.append(frame.payload)
640                return None
641        else:
642            if self._received_fragments:
643                if frame.fin:
644                    raise InvalidFrameException(
645                        'Received an unfragmented frame without '
646                        'terminating existing fragmentation')
647                else:
648                    raise InvalidFrameException(
649                        'New fragmentation started without terminating '
650                        'existing fragmentation')
651
652            if frame.fin:
653                # Unfragmented frame
654
655                self._original_opcode = frame.opcode
656                return frame.payload
657            else:
658                # Start of fragmentation frame
659
660                if common.is_control_opcode(frame.opcode):
661                    raise InvalidFrameException(
662                        'Control frames must not be fragmented')
663
664                self._original_opcode = frame.opcode
665                self._received_fragments.append(frame.payload)
666                return None
667
668    def _process_close_message(self, message):
669        """Processes close message.
670
671        Args:
672            message: close message.
673
674        Raises:
675            InvalidFrameException: when the message is invalid.
676        """
677
678        self._request.client_terminated = True
679
680        # Status code is optional. We can have status reason only if we
681        # have status code. Status reason can be empty string. So,
682        # allowed cases are
683        # - no application data: no code no reason
684        # - 2 octet of application data: has code but no reason
685        # - 3 or more octet of application data: both code and reason
686        if len(message) == 0:
687            self._logger.debug('Received close frame (empty body)')
688            self._request.ws_close_code = common.STATUS_NO_STATUS_RECEIVED
689        elif len(message) == 1:
690            raise InvalidFrameException(
691                'If a close frame has status code, the length of '
692                'status code must be 2 octet')
693        elif len(message) >= 2:
694            self._request.ws_close_code = struct.unpack('!H', message[0:2])[0]
695            self._request.ws_close_reason = message[2:].decode(
696                'utf-8', 'replace')
697            self._logger.debug('Received close frame (code=%d, reason=%r)',
698                               self._request.ws_close_code,
699                               self._request.ws_close_reason)
700
701        # As we've received a close frame, no more data is coming over the
702        # socket. We can now safely close the socket without worrying about
703        # RST sending.
704
705        if self._request.server_terminated:
706            self._logger.debug(
707                'Received ack for server-initiated closing handshake')
708            return
709
710        self._logger.debug('Received client-initiated closing handshake')
711
712        code = common.STATUS_NORMAL_CLOSURE
713        reason = ''
714        if hasattr(self._request, '_dispatcher'):
715            dispatcher = self._request._dispatcher
716            code, reason = dispatcher.passive_closing_handshake(self._request)
717            if code is None and reason is not None and len(reason) > 0:
718                self._logger.warning(
719                    'Handler specified reason despite code being None')
720                reason = ''
721            if reason is None:
722                reason = ''
723        self._send_closing_handshake(code, reason)
724        self._logger.debug(
725            'Acknowledged closing handshake initiated by the peer '
726            '(code=%r, reason=%r)', code, reason)
727
728    def _process_ping_message(self, message):
729        """Processes ping message.
730
731        Args:
732            message: ping message.
733        """
734
735        try:
736            handler = self._request.on_ping_handler
737            if handler:
738                handler(self._request, message)
739                return
740        except AttributeError:
741            pass
742        self._send_pong(message)
743
744    def _process_pong_message(self, message):
745        """Processes pong message.
746
747        Args:
748            message: pong message.
749        """
750
751        # TODO(tyoshino): Add ping timeout handling.
752
753        inflight_pings = deque()
754
755        while True:
756            try:
757                expected_body = self._ping_queue.popleft()
758                if expected_body == message:
759                    # inflight_pings contains pings ignored by the
760                    # other peer. Just forget them.
761                    self._logger.debug(
762                        'Ping %r is acked (%d pings were ignored)',
763                        expected_body, len(inflight_pings))
764                    break
765                else:
766                    inflight_pings.append(expected_body)
767            except IndexError:
768                # The received pong was unsolicited pong. Keep the
769                # ping queue as is.
770                self._ping_queue = inflight_pings
771                self._logger.debug('Received a unsolicited pong')
772                break
773
774        try:
775            handler = self._request.on_pong_handler
776            if handler:
777                handler(self._request, message)
778        except AttributeError:
779            pass
780
    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Frames are read in a loop until a complete text or binary message,
        or a close frame, has been received. Fragments are reassembled, and
        ping and pong frames are processed internally without ending the
        loop.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            InvalidUTF8Exception: when the payload of a text message is not
                valid UTF-8.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.

            frame = self._receive_frame_as_frame_object()

            # Check the constraint on the payload size for control frames
            # before extension processes the frame.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            if (common.is_control_opcode(frame.opcode)
                    and len(frame.payload) > 125):
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            # Reserved bits still set after the frame filters have run are
            # not understood by this implementation.
            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            # None means the message is still incomplete; read another frame.
            message = self._get_message_from_frame(frame)
            if message is None:
                continue

            # NOTE(review): the message filters run here for every completed
            # message, including the payload of unfragmented control frames
            # -- confirm that this is intended.
            for message_filter in self._options.incoming_message_filters:
                message = message_filter.filter(message)

            if self._original_opcode == common.OPCODE_TEXT:
                # The payload is decoded strictly: invalid UTF-8 is not
                # replaced with U+FFFD but reported to the caller as
                # InvalidUTF8Exception.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                # Handled here; the loop then goes on to the next frame.
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                # Handled here; the loop then goes on to the next frame.
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException('Opcode %d is not supported' %
                                                self._original_opcode)
856
857    def _send_closing_handshake(self, code, reason):
858        body = create_closing_handshake_body(code, reason)
859        frame = create_close_frame(
860            body,
861            mask=self._options.mask_send,
862            frame_filters=self._options.outgoing_frame_filters)
863
864        self._request.server_terminated = True
865
866        self._write(frame)
867
868    def close_connection(self,
869                         code=common.STATUS_NORMAL_CLOSURE,
870                         reason='',
871                         wait_response=True):
872        """Closes a WebSocket connection. Note that this method blocks until
873        it receives acknowledgement to the closing handshake.
874
875        Args:
876            code: Status code for close frame. If code is None, a close
877                frame with empty body will be sent.
878            reason: string representing close reason.
879            wait_response: True when caller want to wait the response.
880        Raises:
881            BadOperationException: when reason is specified with code None
882            or reason is not an instance of both str and unicode.
883        """
884
885        if self._request.server_terminated:
886            self._logger.debug(
887                'Requested close_connection but server is already terminated')
888            return
889
890        # When we receive a close frame, we call _process_close_message().
891        # _process_close_message() immediately acknowledges to the
892        # server-initiated closing handshake and sets server_terminated to
893        # True. So, here we can assume that we haven't received any close
894        # frame. We're initiating a closing handshake.
895
896        if code is None:
897            if reason is not None and len(reason) > 0:
898                raise BadOperationException(
899                    'close reason must not be specified if code is None')
900            reason = ''
901        else:
902            if not isinstance(reason, bytes) and not isinstance(
903                    reason, six.text_type):
904                raise BadOperationException(
905                    'close reason must be an instance of bytes or unicode')
906
907        self._send_closing_handshake(code, reason)
908        self._logger.debug('Initiated closing handshake (code=%r, reason=%r)',
909                           code, reason)
910
911        if (code == common.STATUS_GOING_AWAY
912                or code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
913            # It doesn't make sense to wait for a close frame if the reason is
914            # protocol error or that the server is going away. For some of
915            # other reasons, it might not make sense to wait for a close frame,
916            # but it's not clear, yet.
917            return
918
919        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
920        # or until a server-defined timeout expires.
921        #
922        # For now, we expect receiving closing handshake right after sending
923        # out closing handshake.
924        message = self.receive_message()
925        if message is not None:
926            raise ConnectionTerminatedException(
927                'Didn\'t receive valid ack for closing handshake')
928        # TODO: 3. close the WebSocket connection.
929        # note: mod_python Connection (mp_conn) doesn't have close method.
930
931    def send_ping(self, body, binary=False):
932        if not binary and isinstance(body, six.text_type):
933            body = body.encode('UTF-8')
934        frame = create_ping_frame(body, self._options.mask_send,
935                                  self._options.outgoing_frame_filters)
936        self._write(frame)
937
938        self._ping_queue.append(body)
939
940    def _send_pong(self, body):
941        frame = create_pong_frame(body, self._options.mask_send,
942                                  self._options.outgoing_frame_filters)
943        self._write(frame)
944
945    def get_last_received_opcode(self):
946        """Returns the opcode of the WebSocket message which the last received
947        frame belongs to. The return value is valid iff immediately after
948        receive_message call.
949        """
950
951        return self._original_opcode
952
953
954# vi:sts=4 sw=4 et
955