1# Copyright 2011, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8#     * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10#     * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14#     * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29"""This file provides classes and helper functions for parsing/building frames
30of the WebSocket protocol (RFC 6455).
31
32Specification:
33http://tools.ietf.org/html/rfc6455
34"""
35
36from collections import deque
37import logging
38import os
39import struct
40import time
41import socket
42import six
43
44from mod_pywebsocket import common
45from mod_pywebsocket import util
46from mod_pywebsocket._stream_exceptions import BadOperationException
47from mod_pywebsocket._stream_exceptions import ConnectionTerminatedException
48from mod_pywebsocket._stream_exceptions import InvalidFrameException
49from mod_pywebsocket._stream_exceptions import InvalidUTF8Exception
50from mod_pywebsocket._stream_exceptions import UnsupportedFrameException
51
# Masker instance reused by parse_frame for frames whose mask bit is not set
# (presumably leaves the payload unchanged -- see util.NoopMasker).
_NOOP_MASKER = util.NoopMasker()
53
54
class Frame(object):
    """Plain holder for the header bits, opcode and payload of one frame."""
    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0, opcode=None,
                 payload=b''):
        """Stores every argument on the instance under the same name."""
        self.fin, self.rsv1, self.rsv2, self.rsv3 = fin, rsv1, rsv2, rsv3
        self.opcode, self.payload = opcode, payload
69
70
71# Helper functions made public to be used for writing unittests for WebSocket
72# clients.
73
74
def create_length_header(length, mask):
    """Builds the mask-bit/payload-length portion of a frame header.

    Lengths up to 125 are packed into a single byte. Larger lengths use the
    marker value 126 followed by a 2-byte big-endian length, or the marker
    value 127 followed by an 8-byte big-endian length.

    Args:
        length: Frame length. Must be non negative and less than 2^63.
        mask: Mask bit. When true, the top bit of the first byte is set.

    Raises:
        ValueError: when length is negative or 2^63 or more.
    """

    mask_bit = (1 << 7) if mask else 0

    if length < 0:
        raise ValueError('length must be non negative integer')
    if length >= (1 << 63):
        raise ValueError('Payload is too big for one frame')

    if length <= 125:
        return util.pack_byte(mask_bit | length)
    if length < (1 << 16):
        return util.pack_byte(mask_bit | 126) + struct.pack('!H', length)
    return util.pack_byte(mask_bit | 127) + struct.pack('!Q', length)
101
102
def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Creates a frame header.

    The first byte carries FIN, RSV1-3 and the opcode; the remainder is the
    length header produced by create_length_header().

    Raises:
        ValueError: when opcode is outside 0..0xf, payload_length is outside
            0..2^63-1, or any of fin/rsv1/rsv2/rsv3 has a bit other than the
            lowest one set.
    """

    if not 0 <= opcode <= 0xf:
        raise ValueError('Opcode out of range')

    if not 0 <= payload_length < (1 << 63):
        raise ValueError('payload_length out of range')

    flag_bits = fin | rsv1 | rsv2 | rsv3
    if flag_bits & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    first_byte = ((fin << 7) | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                  | opcode)

    return (b'' + util.pack_byte(first_byte) +
            create_length_header(payload_length, mask))
128
129
130def _build_frame(header, body, mask):
131    if not mask:
132        return header + body
133
134    masking_nonce = os.urandom(4)
135    masker = util.RepeatedXorMasker(masking_nonce)
136
137    return header + masking_nonce + masker.mask(body)
138
139
def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Runs every filter over frame, then serializes the frame.

    Each filter's filter() method receives the frame object in turn. The
    header is built from the frame's fields as they stand after filtering.
    """
    for current_filter in frame_filters:
        current_filter.filter(frame)

    frame_header = create_header(opcode=frame.opcode,
                                 payload_length=len(frame.payload),
                                 fin=frame.fin,
                                 rsv1=frame.rsv1,
                                 rsv2=frame.rsv2,
                                 rsv3=frame.rsv3,
                                 mask=mask)
    return _build_frame(frame_header, frame.payload, mask)
147
148
def create_binary_frame(message,
                        opcode=common.OPCODE_BINARY,
                        fin=1,
                        mask=False,
                        frame_filters=[]):
    """Creates a simple binary frame with no extension, reserved bit.

    The message is wrapped in a Frame, passed through frame_filters and
    serialized.
    """

    return _filter_and_format_frame_object(
        Frame(fin=fin, opcode=opcode, payload=message), mask, frame_filters)
158
159
def create_text_frame(message,
                      opcode=common.OPCODE_TEXT,
                      fin=1,
                      mask=False,
                      frame_filters=[]):
    """Creates a simple text frame with no extension, reserved bit.

    The message is encoded as UTF-8 and the rest of the work is delegated
    to create_binary_frame().
    """

    return create_binary_frame(message.encode('utf-8'), opcode, fin, mask,
                               frame_filters)
170
171
def _transfer_rate_mb_per_sec(byte_count, elapsed_seconds):
    """Returns byte_count over elapsed_seconds expressed in MB/s.

    Returns float('inf') when no measurable time has elapsed, so that a
    coarse clock can never cause a division by zero in diagnostic logging.
    """
    if elapsed_seconds <= 0:
        return float('inf')
    return byte_count / elapsed_seconds / 1000 / 1000


def parse_frame(receive_bytes,
                logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object. The root logger is used when omitted.
        ws_version: the version of WebSocket protocol. The check for
            non-minimal length encoding is applied only when this is 13 or
            greater, and a violation is only logged as a warning.
        unmask_receive: whether received frames are expected to be masked.
            When the mask bit of the frame doesn't agree with this value,
            InvalidFrameException is raised.

    Returns:
        A tuple (opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3).

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if not logger:
        logger = logging.getLogger()

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    first_byte = ord(receive_bytes(1))
    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    second_byte = ord(receive_bytes(1))
    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(
        common.LOGLEVEL_FINE, 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
        'Mask=%s, Payload_length=%s', fin, rsv1, rsv2, rsv3, opcode, mask,
        payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0x7D in
    # 2-octet field).
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack('!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException('Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack('!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE, 'Decoded_payload_length=%s',
                   payload_length)

    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)', payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    # Decide once whether throughput is being measured so that the start
    # timestamps are guaranteed to exist wherever they are read below.
    measure_throughput = logger.isEnabledFor(common.LOGLEVEL_FINE)

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    if measure_throughput:
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if measure_throughput:
        logger.log(
            common.LOGLEVEL_FINE, 'Done receiving payload data at %s MB/s',
            _transfer_rate_mb_per_sec(payload_length,
                                      time.time() - receive_start))
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if measure_throughput:
        unmask_start = time.time()

    unmasked_bytes = masker.mask(raw_payload_bytes)

    if measure_throughput:
        logger.log(
            common.LOGLEVEL_FINE, 'Done unmasking payload data at %s MB/s',
            _transfer_rate_mb_per_sec(payload_length,
                                      time.time() - unmask_start))

    return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3
291
292
class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""
    def __init__(self, mask, frame_filters=[], encode_utf8=True):
        """Constructs an instance.

        Args:
            mask: passed to the frame creation functions as the mask flag.
            frame_filters: filters passed along to the frame creation
                functions.
            encode_utf8: when false, text payloads are not UTF-8 encoded
                here. This is for building text type frames from
                compressed data.
        """

        self._mask = mask
        self._frame_filters = frame_filters
        self._encode_utf8 = encode_utf8

        # True while a message has been started but not yet finished.
        self._started = False

        # Opcode of the first frame of the current message, remembered so
        # that the following frames can be checked to be of the same type.
        self._opcode = common.OPCODE_TEXT

    def build(self, payload_data, end, binary):
        """Builds the next frame of the current message.

        Args:
            payload_data: payload for this frame.
            end: true when this frame finishes the message.
            binary: true for a binary message, false for a text message.

        Raises:
            ValueError: when the message type differs from the one the
                current message was started with.
        """
        frame_type = common.OPCODE_BINARY if binary else common.OPCODE_TEXT

        if not self._started:
            # First frame of a message carries the real opcode.
            opcode = frame_type
            self._opcode = frame_type
        elif self._opcode == frame_type:
            opcode = common.OPCODE_CONTINUATION
        else:
            raise ValueError('Message types are different in frames for '
                             'the same message')

        self._started = not end
        fin = 1 if end else 0

        if binary or not self._encode_utf8:
            make_frame = create_binary_frame
        else:
            make_frame = create_text_frame
        return make_frame(payload_data, opcode, fin, self._mask,
                          self._frame_filters)
337
338
def _create_control_frame(opcode, body, mask, frame_filters):
    """Builds a control frame with the given opcode and body.

    The size limit is checked after the filters have run, i.e. against the
    payload that will actually be sent.

    Raises:
        BadOperationException: when the filtered payload exceeds 125 bytes.
    """
    control_frame = Frame(opcode=opcode, payload=body)

    for current_filter in frame_filters:
        current_filter.filter(control_frame)

    if len(control_frame.payload) > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    frame_header = create_header(control_frame.opcode,
                                 len(control_frame.payload),
                                 control_frame.fin, control_frame.rsv1,
                                 control_frame.rsv2, control_frame.rsv3, mask)
    return _build_frame(frame_header, control_frame.payload, mask)
352
353
def create_ping_frame(body, mask=False, frame_filters=[]):
    """Creates a ping control frame carrying body."""
    ping_opcode = common.OPCODE_PING
    return _create_control_frame(ping_opcode, body, mask, frame_filters)
356
357
def create_pong_frame(body, mask=False, frame_filters=[]):
    """Creates a pong control frame carrying body."""
    pong_opcode = common.OPCODE_PONG
    return _create_control_frame(pong_opcode, body, mask, frame_filters)
360
361
def create_close_frame(body, mask=False, frame_filters=[]):
    """Creates a close control frame carrying body."""
    close_opcode = common.OPCODE_CLOSE
    return _create_control_frame(close_opcode, body, mask, frame_filters)
365
366
def create_closing_handshake_body(code, reason):
    """Builds the body of a close frame.

    Args:
        code: status code, or None to produce an empty body (reason is
            ignored in that case).
        reason: text that is UTF-8 encoded and appended after the 2-byte
            big-endian status code.

    Raises:
        BadOperationException: when code is outside the range
            STATUS_NORMAL_CLOSURE..STATUS_USER_PRIVATE_MAX or is one of the
            reserved pseudo codes.
    """
    if code is None:
        return b''

    if (code > common.STATUS_USER_PRIVATE_MAX
            or code < common.STATUS_NORMAL_CLOSURE):
        raise BadOperationException('Status code is out of range')

    reserved_pseudo_codes = (common.STATUS_NO_STATUS_RECEIVED,
                             common.STATUS_ABNORMAL_CLOSURE,
                             common.STATUS_TLS_HANDSHAKE)
    if code in reserved_pseudo_codes:
        raise BadOperationException('Status code is reserved pseudo '
                                    'code')

    return struct.pack('!H', code) + reason.encode('utf-8')
381
382
class StreamOptions(object):
    """Holds option values to configure Stream objects."""
    def __init__(self):
        """Constructs StreamOptions with every filter list empty, text
        encoding enabled, outgoing masking off and incoming unmasking on.
        """

        # Filters applied to frames.
        self.outgoing_frame_filters, self.incoming_frame_filters = [], []

        # Filters applied to messages. Control frames are not affected by them.
        self.outgoing_message_filters, self.incoming_message_filters = [], []

        self.encode_text_message_to_utf8 = True
        self.mask_send, self.unmask_receive = False, True
399
400
401class Stream(object):
402    """A class for parsing/building frames of the WebSocket protocol
403    (RFC 6455).
404    """
405    def __init__(self, request, options):
406        """Constructs an instance.
407
408        Args:
409            request: mod_python request.
410        """
411
412        self._logger = util.get_class_logger(self)
413
414        self._options = options
415        self._request = request
416
417        self._request.client_terminated = False
418        self._request.server_terminated = False
419
420        # Holds body of received fragments.
421        self._received_fragments = []
422        # Holds the opcode of the first fragment.
423        self._original_opcode = None
424
425        self._writer = FragmentedFrameBuilder(
426            self._options.mask_send, self._options.outgoing_frame_filters,
427            self._options.encode_text_message_to_utf8)
428
429        self._ping_queue = deque()
430
431    def _read(self, length):
432        """Reads length bytes from connection. In case we catch any exception,
433        prepends remote address to the exception message and raise again.
434
435        Raises:
436            ConnectionTerminatedException: when read returns empty string.
437        """
438
439        try:
440            read_bytes = self._request.connection.read(length)
441            if not read_bytes:
442                raise ConnectionTerminatedException(
443                    'Receiving %d byte failed. Peer (%r) closed connection' %
444                    (length, (self._request.connection.remote_addr, )))
445            return read_bytes
446        except IOError as e:
447            # Also catch an IOError because mod_python throws it.
448            raise ConnectionTerminatedException(
449                'Receiving %d byte failed. IOError (%s) occurred' %
450                (length, e))
451
452    def _write(self, bytes_to_write):
453        """Writes given bytes to connection. In case we catch any exception,
454        prepends remote address to the exception message and raise again.
455        """
456
457        try:
458            self._request.connection.write(bytes_to_write)
459        except Exception as e:
460            util.prepend_message_to_exception(
461                'Failed to send message to %r: ' %
462                (self._request.connection.remote_addr, ), e)
463            raise
464
465    def receive_bytes(self, length):
466        """Receives multiple bytes. Retries read when we couldn't receive the
467        specified amount. This method returns byte strings.
468
469        Raises:
470            ConnectionTerminatedException: when read returns empty string.
471        """
472
473        read_bytes = []
474        while length > 0:
475            new_read_bytes = self._read(length)
476            read_bytes.append(new_read_bytes)
477            length -= len(new_read_bytes)
478        return b''.join(read_bytes)
479
480    def _read_until(self, delim_char):
481        """Reads bytes until we encounter delim_char. The result will not
482        contain delim_char.
483
484        Raises:
485            ConnectionTerminatedException: when read returns empty string.
486        """
487
488        read_bytes = []
489        while True:
490            ch = self._read(1)
491            if ch == delim_char:
492                break
493            read_bytes.append(ch)
494        return b''.join(read_bytes)
495
496    def _receive_frame(self):
497        """Receives a frame and return data in the frame as a tuple containing
498        each header field and payload separately.
499
500        Raises:
501            ConnectionTerminatedException: when read returns empty
502                string.
503            InvalidFrameException: when the frame contains invalid data.
504        """
505        def _receive_bytes(length):
506            return self.receive_bytes(length)
507
508        return parse_frame(receive_bytes=_receive_bytes,
509                           logger=self._logger,
510                           ws_version=self._request.ws_version,
511                           unmask_receive=self._options.unmask_receive)
512
513    def _receive_frame_as_frame_object(self):
514        opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
515
516        return Frame(fin=fin,
517                     rsv1=rsv1,
518                     rsv2=rsv2,
519                     rsv3=rsv3,
520                     opcode=opcode,
521                     payload=unmasked_bytes)
522
523    def receive_filtered_frame(self):
524        """Receives a frame and applies frame filters and message filters.
525        The frame to be received must satisfy following conditions:
526        - The frame is not fragmented.
527        - The opcode of the frame is TEXT or BINARY.
528
529        DO NOT USE this method except for testing purpose.
530        """
531
532        frame = self._receive_frame_as_frame_object()
533        if not frame.fin:
534            raise InvalidFrameException(
535                'Segmented frames must not be received via '
536                'receive_filtered_frame()')
537        if (frame.opcode != common.OPCODE_TEXT
538                and frame.opcode != common.OPCODE_BINARY):
539            raise InvalidFrameException(
540                'Control frames must not be received via '
541                'receive_filtered_frame()')
542
543        for frame_filter in self._options.incoming_frame_filters:
544            frame_filter.filter(frame)
545        for message_filter in self._options.incoming_message_filters:
546            frame.payload = message_filter.filter(frame.payload)
547        return frame
548
549    def send_message(self, message, end=True, binary=False):
550        """Send message.
551
552        Args:
553            message: text in unicode or binary in str to send.
554            binary: send message as binary frame.
555
556        Raises:
557            BadOperationException: when called on a server-terminated
558                connection or called with inconsistent message type or
559                binary parameter.
560        """
561
562        if self._request.server_terminated:
563            raise BadOperationException(
564                'Requested send_message after sending out a closing handshake')
565
566        if binary and isinstance(message, six.text_type):
567            raise BadOperationException(
568                'Message for binary frame must not be instance of Unicode')
569
570        for message_filter in self._options.outgoing_message_filters:
571            message = message_filter.filter(message, end, binary)
572
573        try:
574            # Set this to any positive integer to limit maximum size of data in
575            # payload data of each frame.
576            MAX_PAYLOAD_DATA_SIZE = -1
577
578            if MAX_PAYLOAD_DATA_SIZE <= 0:
579                self._write(self._writer.build(message, end, binary))
580                return
581
582            bytes_written = 0
583            while True:
584                end_for_this_frame = end
585                bytes_to_write = len(message) - bytes_written
586                if (MAX_PAYLOAD_DATA_SIZE > 0
587                        and bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
588                    end_for_this_frame = False
589                    bytes_to_write = MAX_PAYLOAD_DATA_SIZE
590
591                frame = self._writer.build(
592                    message[bytes_written:bytes_written + bytes_to_write],
593                    end_for_this_frame, binary)
594                self._write(frame)
595
596                bytes_written += bytes_to_write
597
598                # This if must be placed here (the end of while block) so that
599                # at least one frame is sent.
600                if len(message) <= bytes_written:
601                    break
602        except ValueError as e:
603            raise BadOperationException(e)
604
605    def _get_message_from_frame(self, frame):
606        """Gets a message from frame. If the message is composed of fragmented
607        frames and the frame is not the last fragmented frame, this method
608        returns None. The whole message will be returned when the last
609        fragmented frame is passed to this method.
610
611        Raises:
612            InvalidFrameException: when the frame doesn't match defragmentation
613                context, or the frame contains invalid data.
614        """
615
616        if frame.opcode == common.OPCODE_CONTINUATION:
617            if not self._received_fragments:
618                if frame.fin:
619                    raise InvalidFrameException(
620                        'Received a termination frame but fragmentation '
621                        'not started')
622                else:
623                    raise InvalidFrameException(
624                        'Received an intermediate frame but '
625                        'fragmentation not started')
626
627            if frame.fin:
628                # End of fragmentation frame
629                self._received_fragments.append(frame.payload)
630                message = b''.join(self._received_fragments)
631                self._received_fragments = []
632                return message
633            else:
634                # Intermediate frame
635                self._received_fragments.append(frame.payload)
636                return None
637        else:
638            if self._received_fragments:
639                if frame.fin:
640                    raise InvalidFrameException(
641                        'Received an unfragmented frame without '
642                        'terminating existing fragmentation')
643                else:
644                    raise InvalidFrameException(
645                        'New fragmentation started without terminating '
646                        'existing fragmentation')
647
648            if frame.fin:
649                # Unfragmented frame
650
651                self._original_opcode = frame.opcode
652                return frame.payload
653            else:
654                # Start of fragmentation frame
655
656                if common.is_control_opcode(frame.opcode):
657                    raise InvalidFrameException(
658                        'Control frames must not be fragmented')
659
660                self._original_opcode = frame.opcode
661                self._received_fragments.append(frame.payload)
662                return None
663
664    def _process_close_message(self, message):
665        """Processes close message.
666
667        Args:
668            message: close message.
669
670        Raises:
671            InvalidFrameException: when the message is invalid.
672        """
673
674        self._request.client_terminated = True
675
676        # Status code is optional. We can have status reason only if we
677        # have status code. Status reason can be empty string. So,
678        # allowed cases are
679        # - no application data: no code no reason
680        # - 2 octet of application data: has code but no reason
681        # - 3 or more octet of application data: both code and reason
682        if len(message) == 0:
683            self._logger.debug('Received close frame (empty body)')
684            self._request.ws_close_code = common.STATUS_NO_STATUS_RECEIVED
685        elif len(message) == 1:
686            raise InvalidFrameException(
687                'If a close frame has status code, the length of '
688                'status code must be 2 octet')
689        elif len(message) >= 2:
690            self._request.ws_close_code = struct.unpack('!H', message[0:2])[0]
691            self._request.ws_close_reason = message[2:].decode(
692                'utf-8', 'replace')
693            self._logger.debug('Received close frame (code=%d, reason=%r)',
694                               self._request.ws_close_code,
695                               self._request.ws_close_reason)
696
697        # As we've received a close frame, no more data is coming over the
698        # socket. We can now safely close the socket without worrying about
699        # RST sending.
700
701        if self._request.server_terminated:
702            self._logger.debug(
703                'Received ack for server-initiated closing handshake')
704            return
705
706        self._logger.debug('Received client-initiated closing handshake')
707
708        code = common.STATUS_NORMAL_CLOSURE
709        reason = ''
710        if hasattr(self._request, '_dispatcher'):
711            dispatcher = self._request._dispatcher
712            code, reason = dispatcher.passive_closing_handshake(self._request)
713            if code is None and reason is not None and len(reason) > 0:
714                self._logger.warning(
715                    'Handler specified reason despite code being None')
716                reason = ''
717            if reason is None:
718                reason = ''
719        self._send_closing_handshake(code, reason)
720        self._logger.debug(
721            'Acknowledged closing handshake initiated by the peer '
722            '(code=%r, reason=%r)', code, reason)
723
724    def _process_ping_message(self, message):
725        """Processes ping message.
726
727        Args:
728            message: ping message.
729        """
730
731        try:
732            handler = self._request.on_ping_handler
733            if handler:
734                handler(self._request, message)
735                return
736        except AttributeError:
737            pass
738        self._send_pong(message)
739
740    def _process_pong_message(self, message):
741        """Processes pong message.
742
743        Args:
744            message: pong message.
745        """
746
747        # TODO(tyoshino): Add ping timeout handling.
748
749        inflight_pings = deque()
750
751        while True:
752            try:
753                expected_body = self._ping_queue.popleft()
754                if expected_body == message:
755                    # inflight_pings contains pings ignored by the
756                    # other peer. Just forget them.
757                    self._logger.debug(
758                        'Ping %r is acked (%d pings were ignored)',
759                        expected_body, len(inflight_pings))
760                    break
761                else:
762                    inflight_pings.append(expected_body)
763            except IndexError:
764                # The received pong was unsolicited pong. Keep the
765                # ping queue as is.
766                self._ping_queue = inflight_pings
767                self._logger.debug('Received a unsolicited pong')
768                break
769
770        try:
771            handler = self._request.on_pong_handler
772            if handler:
773                handler(self._request, message)
774        except AttributeError:
775            pass
776
777    def receive_message(self):
778        """Receive a WebSocket frame and return its payload as a text in
779        unicode or a binary in str.
780
781        Returns:
782            payload data of the frame
783            - as unicode instance if received text frame
784            - as str instance if received binary frame
785            or None iff received closing handshake.
786        Raises:
787            BadOperationException: when called on a client-terminated
788                connection.
789            ConnectionTerminatedException: when read returns empty
790                string.
791            InvalidFrameException: when the frame contains invalid
792                data.
793            UnsupportedFrameException: when the received frame has
794                flags, opcode we cannot handle. You can ignore this
795                exception and continue receiving the next frame.
796        """
797
798        if self._request.client_terminated:
799            raise BadOperationException(
800                'Requested receive_message after receiving a closing '
801                'handshake')
802
803        while True:
804            # mp_conn.read will block if no bytes are available.
805
806            frame = self._receive_frame_as_frame_object()
807
808            # Check the constraint on the payload size for control frames
809            # before extension processes the frame.
810            # See also http://tools.ietf.org/html/rfc6455#section-5.5
811            if (common.is_control_opcode(frame.opcode)
812                    and len(frame.payload) > 125):
813                raise InvalidFrameException(
814                    'Payload data size of control frames must be 125 bytes or '
815                    'less')
816
817            for frame_filter in self._options.incoming_frame_filters:
818                frame_filter.filter(frame)
819
820            if frame.rsv1 or frame.rsv2 or frame.rsv3:
821                raise UnsupportedFrameException(
822                    'Unsupported flag is set (rsv = %d%d%d)' %
823                    (frame.rsv1, frame.rsv2, frame.rsv3))
824
825            message = self._get_message_from_frame(frame)
826            if message is None:
827                continue
828
829            for message_filter in self._options.incoming_message_filters:
830                message = message_filter.filter(message)
831
832            if self._original_opcode == common.OPCODE_TEXT:
833                # The WebSocket protocol section 4.4 specifies that invalid
834                # characters must be replaced with U+fffd REPLACEMENT
835                # CHARACTER.
836                try:
837                    return message.decode('utf-8')
838                except UnicodeDecodeError as e:
839                    raise InvalidUTF8Exception(e)
840            elif self._original_opcode == common.OPCODE_BINARY:
841                return message
842            elif self._original_opcode == common.OPCODE_CLOSE:
843                self._process_close_message(message)
844                return None
845            elif self._original_opcode == common.OPCODE_PING:
846                self._process_ping_message(message)
847            elif self._original_opcode == common.OPCODE_PONG:
848                self._process_pong_message(message)
849            else:
850                raise UnsupportedFrameException('Opcode %d is not supported' %
851                                                self._original_opcode)
852
853    def _send_closing_handshake(self, code, reason):
854        body = create_closing_handshake_body(code, reason)
855        frame = create_close_frame(
856            body,
857            mask=self._options.mask_send,
858            frame_filters=self._options.outgoing_frame_filters)
859
860        self._request.server_terminated = True
861
862        self._write(frame)
863
864    def close_connection(self,
865                         code=common.STATUS_NORMAL_CLOSURE,
866                         reason='',
867                         wait_response=True):
868        """Closes a WebSocket connection. Note that this method blocks until
869        it receives acknowledgement to the closing handshake.
870
871        Args:
872            code: Status code for close frame. If code is None, a close
873                frame with empty body will be sent.
874            reason: string representing close reason.
875            wait_response: True when caller want to wait the response.
876        Raises:
877            BadOperationException: when reason is specified with code None
878                or reason is not an instance of both str and unicode.
879        """
880
881        if self._request.server_terminated:
882            self._logger.debug(
883                'Requested close_connection but server is already terminated')
884            return
885
886        # When we receive a close frame, we call _process_close_message().
887        # _process_close_message() immediately acknowledges to the
888        # server-initiated closing handshake and sets server_terminated to
889        # True. So, here we can assume that we haven't received any close
890        # frame. We're initiating a closing handshake.
891
892        if code is None:
893            if reason is not None and len(reason) > 0:
894                raise BadOperationException(
895                    'close reason must not be specified if code is None')
896            reason = ''
897        else:
898            if not isinstance(reason, bytes) and not isinstance(
899                    reason, six.text_type):
900                raise BadOperationException(
901                    'close reason must be an instance of bytes or unicode')
902
903        self._send_closing_handshake(code, reason)
904        self._logger.debug('Initiated closing handshake (code=%r, reason=%r)',
905                           code, reason)
906
907        if (code == common.STATUS_GOING_AWAY
908                or code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
909            # It doesn't make sense to wait for a close frame if the reason is
910            # protocol error or that the server is going away. For some of
911            # other reasons, it might not make sense to wait for a close frame,
912            # but it's not clear, yet.
913            return
914
915        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
916        # or until a server-defined timeout expires.
917        #
918        # For now, we expect receiving closing handshake right after sending
919        # out closing handshake.
920        message = self.receive_message()
921        if message is not None:
922            raise ConnectionTerminatedException(
923                'Didn\'t receive valid ack for closing handshake')
924        # TODO: 3. close the WebSocket connection.
925        # note: mod_python Connection (mp_conn) doesn't have close method.
926
927    def send_ping(self, body, binary=False):
928        if not binary and isinstance(body, six.text_type):
929            body = body.encode('UTF-8')
930        frame = create_ping_frame(body, self._options.mask_send,
931                                  self._options.outgoing_frame_filters)
932        self._write(frame)
933
934        self._ping_queue.append(body)
935
936    def _send_pong(self, body):
937        frame = create_pong_frame(body, self._options.mask_send,
938                                  self._options.outgoing_frame_filters)
939        self._write(frame)
940
941    def get_last_received_opcode(self):
942        """Returns the opcode of the WebSocket message which the last received
943        frame belongs to. The return value is valid iff immediately after
944        receive_message call.
945        """
946
947        return self._original_opcode
948
949
950# vi:sts=4 sw=4 et
951