1"""
2
3"""
4
5"""
6_app.py
7websocket - WebSocket client library for Python
8
9Copyright 2021 engn33r
10
11Licensed under the Apache License, Version 2.0 (the "License");
12you may not use this file except in compliance with the License.
13You may obtain a copy of the License at
14
15    http://www.apache.org/licenses/LICENSE-2.0
16
17Unless required by applicable law or agreed to in writing, software
18distributed under the License is distributed on an "AS IS" BASIS,
19WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20See the License for the specific language governing permissions and
21limitations under the License.
22"""
23import selectors
24import sys
25import threading
26import time
27import traceback
28from ._abnf import ABNF
29from ._core import WebSocket, getdefaulttimeout
30from ._exceptions import *
31from . import _logging
32
33
34__all__ = ["WebSocketApp"]
35
36
37class Dispatcher:
38    """
39    Dispatcher
40    """
41    def __init__(self, app, ping_timeout):
42        self.app = app
43        self.ping_timeout = ping_timeout
44
45    def read(self, sock, read_callback, check_callback):
46        while self.app.keep_running:
47            sel = selectors.DefaultSelector()
48            sel.register(self.app.sock.sock, selectors.EVENT_READ)
49
50            r = sel.select(self.ping_timeout)
51            if r:
52                if not read_callback():
53                    break
54            check_callback()
55            sel.close()
56
57
58class SSLDispatcher:
59    """
60    SSLDispatcher
61    """
62    def __init__(self, app, ping_timeout):
63        self.app = app
64        self.ping_timeout = ping_timeout
65
66    def read(self, sock, read_callback, check_callback):
67        while self.app.keep_running:
68            r = self.select()
69            if r:
70                if not read_callback():
71                    break
72            check_callback()
73
74    def select(self):
75        sock = self.app.sock.sock
76        if sock.pending():
77            return [sock,]
78
79        sel = selectors.DefaultSelector()
80        sel.register(sock, selectors.EVENT_READ)
81
82        r = sel.select(self.ping_timeout)
83        sel.close()
84
85        if len(r) > 0:
86            return r[0][0]
87
88
89class WebSocketApp(object):
90    """
91    Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
92    """
93
94    def __init__(self, url, header=None,
95                 on_open=None, on_message=None, on_error=None,
96                 on_close=None, on_ping=None, on_pong=None,
97                 on_cont_message=None,
98                 keep_running=True, get_mask_key=None, cookie=None,
99                 subprotocols=None,
100                 on_data=None):
101        """
102        WebSocketApp initialization
103
104        Parameters
105        ----------
106        url: str
107            Websocket url.
108        header: list or dict
109            Custom header for websocket handshake.
110        on_open: function
111            Callback object which is called at opening websocket.
112            on_open has one argument.
113            The 1st argument is this class object.
114        on_message: function
115            Callback object which is called when received data.
116            on_message has 2 arguments.
117            The 1st argument is this class object.
118            The 2nd argument is utf-8 data received from the server.
119        on_error: function
120            Callback object which is called when we get error.
121            on_error has 2 arguments.
122            The 1st argument is this class object.
123            The 2nd argument is exception object.
124        on_close: function
125            Callback object which is called when connection is closed.
126            on_close has 3 arguments.
127            The 1st argument is this class object.
128            The 2nd argument is close_status_code.
129            The 3rd argument is close_msg.
130        on_cont_message: function
131            Callback object which is called when a continuation
132            frame is received.
133            on_cont_message has 3 arguments.
134            The 1st argument is this class object.
135            The 2nd argument is utf-8 string which we get from the server.
136            The 3rd argument is continue flag. if 0, the data continue
137            to next frame data
138        on_data: function
139            Callback object which is called when a message received.
140            This is called before on_message or on_cont_message,
141            and then on_message or on_cont_message is called.
142            on_data has 4 argument.
143            The 1st argument is this class object.
144            The 2nd argument is utf-8 string which we get from the server.
145            The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
146            The 4th argument is continue flag. If 0, the data continue
147        keep_running: bool
148            This parameter is obsolete and ignored.
149        get_mask_key: function
150            A callable function to get new mask keys, see the
151            WebSocket.set_mask_key's docstring for more information.
152        cookie: str
153            Cookie value.
154        subprotocols: list
155            List of available sub protocols. Default is None.
156        """
157        self.url = url
158        self.header = header if header is not None else []
159        self.cookie = cookie
160
161        self.on_open = on_open
162        self.on_message = on_message
163        self.on_data = on_data
164        self.on_error = on_error
165        self.on_close = on_close
166        self.on_ping = on_ping
167        self.on_pong = on_pong
168        self.on_cont_message = on_cont_message
169        self.keep_running = False
170        self.get_mask_key = get_mask_key
171        self.sock = None
172        self.last_ping_tm = 0
173        self.last_pong_tm = 0
174        self.subprotocols = subprotocols
175
176    def send(self, data, opcode=ABNF.OPCODE_TEXT):
177        """
178        send message
179
180        Parameters
181        ----------
182        data: str
183            Message to send. If you set opcode to OPCODE_TEXT,
184            data must be utf-8 string or unicode.
185        opcode: int
186            Operation code of data. Default is OPCODE_TEXT.
187        """
188
189        if not self.sock or self.sock.send(data, opcode) == 0:
190            raise WebSocketConnectionClosedException(
191                "Connection is already closed.")
192
193    def close(self, **kwargs):
194        """
195        Close websocket connection.
196        """
197        self.keep_running = False
198        if self.sock:
199            self.sock.close(**kwargs)
200            self.sock = None
201
202    def _send_ping(self, interval, event, payload):
203        while not event.wait(interval):
204            self.last_ping_tm = time.time()
205            if self.sock:
206                try:
207                    self.sock.ping(payload)
208                except Exception as ex:
209                    _logging.warning("send_ping routine terminated: {}".format(ex))
210                    break
211
212    def run_forever(self, sockopt=None, sslopt=None,
213                    ping_interval=0, ping_timeout=None,
214                    ping_payload="",
215                    http_proxy_host=None, http_proxy_port=None,
216                    http_no_proxy=None, http_proxy_auth=None,
217                    skip_utf8_validation=False,
218                    host=None, origin=None, dispatcher=None,
219                    suppress_origin=False, proxy_type=None):
220        """
221        Run event loop for WebSocket framework.
222
223        This loop is an infinite loop and is alive while websocket is available.
224
225        Parameters
226        ----------
227        sockopt: tuple
228            Values for socket.setsockopt.
229            sockopt must be tuple
230            and each element is argument of sock.setsockopt.
231        sslopt: dict
232            Optional dict object for ssl socket option.
233        ping_interval: int or float
234            Automatically send "ping" command
235            every specified period (in seconds).
236            If set to 0, no ping is sent periodically.
237        ping_timeout: int or float
238            Timeout (in seconds) if the pong message is not received.
239        ping_payload: str
240            Payload message to send with each ping.
241        http_proxy_host: str
242            HTTP proxy host name.
243        http_proxy_port: int or str
244            HTTP proxy port. If not set, set to 80.
245        http_no_proxy: list
246            Whitelisted host names that don't use the proxy.
247        skip_utf8_validation: bool
248            skip utf8 validation.
249        host: str
250            update host header.
251        origin: str
252            update origin header.
253        dispatcher: Dispatcher object
254            customize reading data from socket.
255        suppress_origin: bool
256            suppress outputting origin header.
257
258        Returns
259        -------
260        teardown: bool
261            False if caught KeyboardInterrupt, True if other exception was raised during a loop
262        """
263
264        if ping_timeout is not None and ping_timeout <= 0:
265            raise WebSocketException("Ensure ping_timeout > 0")
266        if ping_interval is not None and ping_interval < 0:
267            raise WebSocketException("Ensure ping_interval >= 0")
268        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
269            raise WebSocketException("Ensure ping_interval > ping_timeout")
270        if not sockopt:
271            sockopt = []
272        if not sslopt:
273            sslopt = {}
274        if self.sock:
275            raise WebSocketException("socket is already opened")
276        thread = None
277        self.keep_running = True
278        self.last_ping_tm = 0
279        self.last_pong_tm = 0
280
281        def teardown(close_frame=None):
282            """
283            Tears down the connection.
284
285            Parameters
286            ----------
287            close_frame: ABNF frame
288                If close_frame is set, the on_close handler is invoked
289                with the statusCode and reason from the provided frame.
290            """
291
292            if thread and thread.is_alive():
293                event.set()
294                thread.join()
295            self.keep_running = False
296            if self.sock:
297                self.sock.close()
298            close_status_code, close_reason = self._get_close_args(
299                close_frame if close_frame else None)
300            self.sock = None
301
302            # Finally call the callback AFTER all teardown is complete
303            self._callback(self.on_close, close_status_code, close_reason)
304
305        try:
306            self.sock = WebSocket(
307                self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
308                fire_cont_frame=self.on_cont_message is not None,
309                skip_utf8_validation=skip_utf8_validation,
310                enable_multithread=True)
311            self.sock.settimeout(getdefaulttimeout())
312            self.sock.connect(
313                self.url, header=self.header, cookie=self.cookie,
314                http_proxy_host=http_proxy_host,
315                http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
316                http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
317                host=host, origin=origin, suppress_origin=suppress_origin,
318                proxy_type=proxy_type)
319            if not dispatcher:
320                dispatcher = self.create_dispatcher(ping_timeout)
321
322            self._callback(self.on_open)
323
324            if ping_interval:
325                event = threading.Event()
326                thread = threading.Thread(
327                    target=self._send_ping, args=(ping_interval, event, ping_payload))
328                thread.daemon = True
329                thread.start()
330
331            def read():
332                if not self.keep_running:
333                    return teardown()
334
335                op_code, frame = self.sock.recv_data_frame(True)
336                if op_code == ABNF.OPCODE_CLOSE:
337                    return teardown(frame)
338                elif op_code == ABNF.OPCODE_PING:
339                    self._callback(self.on_ping, frame.data)
340                elif op_code == ABNF.OPCODE_PONG:
341                    self.last_pong_tm = time.time()
342                    self._callback(self.on_pong, frame.data)
343                elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
344                    self._callback(self.on_data, frame.data,
345                                   frame.opcode, frame.fin)
346                    self._callback(self.on_cont_message,
347                                   frame.data, frame.fin)
348                else:
349                    data = frame.data
350                    if op_code == ABNF.OPCODE_TEXT:
351                        data = data.decode("utf-8")
352                    self._callback(self.on_data, data, frame.opcode, True)
353                    self._callback(self.on_message, data)
354
355                return True
356
357            def check():
358                if (ping_timeout):
359                    has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
360                    has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
361                    has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout
362
363                    if (self.last_ping_tm and
364                            has_timeout_expired and
365                            (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
366                        raise WebSocketTimeoutException("ping/pong timed out")
367                return True
368
369            dispatcher.read(self.sock.sock, read, check)
370        except (Exception, KeyboardInterrupt, SystemExit) as e:
371            self._callback(self.on_error, e)
372            if isinstance(e, SystemExit):
373                # propagate SystemExit further
374                raise
375            teardown()
376            return not isinstance(e, KeyboardInterrupt)
377
378    def create_dispatcher(self, ping_timeout):
379        timeout = ping_timeout or 10
380        if self.sock.is_ssl():
381            return SSLDispatcher(self, timeout)
382
383        return Dispatcher(self, timeout)
384
385    def _get_close_args(self, close_frame):
386        """
387        _get_close_args extracts the close code and reason from the close body
388        if it exists (RFC6455 says WebSocket Connection Close Code is optional)
389        """
390        # Need to catch the case where close_frame is None
391        # Otherwise the following if statement causes an error
392        if not self.on_close or not close_frame:
393            return [None, None]
394
395        # Extract close frame status code
396        if close_frame.data and len(close_frame.data) >= 2:
397            close_status_code = 256 * close_frame.data[0] + close_frame.data[1]
398            reason = close_frame.data[2:].decode('utf-8')
399            return [close_status_code, reason]
400        else:
401            # Most likely reached this because len(close_frame_data.data) < 2
402            return [None, None]
403
404    def _callback(self, callback, *args):
405        if callback:
406            try:
407                callback(self, *args)
408
409            except Exception as e:
410                _logging.error("error from callback {}: {}".format(callback, e))
411                if self.on_error:
412                    self.on_error(self, e)
413