1"""
2
3"""
4
5"""
6websocket - WebSocket client library for Python
7
8Copyright (C) 2010 Hiroki Ohtani(liris)
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2.1 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
23
24"""
25import inspect
26import select
27import sys
28import threading
29import time
30import traceback
31
32import six
33
34from ._abnf import ABNF
35from ._core import WebSocket, getdefaulttimeout
36from ._exceptions import *
37from . import _logging
38
39
40__all__ = ["WebSocketApp"]
41
class Dispatcher:
    """
    Default read loop for a WebSocketApp.

    Waits on the application's socket with ``select`` and drives the
    read/check callbacks for as long as ``app.keep_running`` is true.
    """
    def __init__(self, app, ping_timeout):
        # ``app`` supplies ``keep_running`` and ``sock.sock``;
        # ``ping_timeout`` is used as the timeout of every select() call.
        self.app = app
        self.ping_timeout = ping_timeout

    def read(self, sock, read_callback, check_callback):
        """
        Run the loop until ``app.keep_running`` is false or
        ``read_callback`` returns a falsy value.

        Parameters
        ----------
        sock:
            not used here; the socket is looked up through ``self.app``
            on every iteration.
        read_callback: callable
            called without arguments when the socket is readable.
            A falsy result ends the loop immediately.
        check_callback: callable
            called without arguments at the end of every iteration that
            did not end the loop.
        """
        while self.app.keep_running:
            # Only the "readable" list of the select() result is of interest.
            readable = select.select(
                (self.app.sock.sock,), (), (), self.ping_timeout)[0]
            if readable and not read_callback():
                break
            check_callback()
58
class SSLDispatcher:
    """
    Read loop for a WebSocketApp whose socket exposes ``pending()``.

    Same loop as the plain dispatcher, except that the socket's
    ``pending()`` is consulted before falling back to ``select``.
    """
    def __init__(self, app, ping_timeout):
        # ``app`` supplies ``keep_running`` and ``sock.sock``;
        # ``ping_timeout`` is used as the timeout of every select() call.
        self.app = app
        self.ping_timeout = ping_timeout

    def read(self, sock, read_callback, check_callback):
        """
        Run the loop until ``app.keep_running`` is false or
        ``read_callback`` returns a falsy value.

        Parameters
        ----------
        sock:
            not used here; the socket is looked up through ``self.app``.
        read_callback: callable
            called without arguments when ``self.select()`` reports the
            socket as ready. A falsy result ends the loop immediately.
        check_callback: callable
            called without arguments at the end of every iteration that
            did not end the loop.
        """
        while self.app.keep_running:
            if self.select() and not read_callback():
                break
            check_callback()

    def select(self):
        """
        Return a list holding the socket when it is ready for reading,
        otherwise whatever ``select.select`` reports as readable
        (an empty list on timeout).
        """
        ssl_sock = self.app.sock.sock
        if not ssl_sock.pending():
            # Nothing reported by pending(): wait on the socket itself.
            return select.select((ssl_sock,), (), (), self.ping_timeout)[0]

        # pending() is truthy: report the socket as ready without select().
        return [ssl_sock]
82
83
class WebSocketApp(object):
    """
    Higher level of APIs are provided. The interface is like JavaScript WebSocket object.

    Callbacks given to the constructor are invoked from run_forever();
    every callback receives this object as its first argument.
    """

    def __init__(self, url, header=None,
                 on_open=None, on_message=None, on_error=None,
                 on_close=None, on_ping=None, on_pong=None,
                 on_cont_message=None,
                 keep_running=True, get_mask_key=None, cookie=None,
                 subprotocols=None,
                 on_data=None):
        """
        WebSocketApp initialization

        Parameters
        ----------
        url: str
            websocket url.
        header: list or dict
            custom header for websocket handshake.
            Defaults to an empty list.
        on_open: callable
            callable object which is called at opening websocket.
            this function has one argument. The argument is this class object.
        on_message: callable
            callable object which is called when received data.
            on_message has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data we get from the server. On
            Python 3 the payload of a text frame is decoded from utf-8
            before it is passed on.
        on_error: callable
            callable object which is called when we get error.
            on_error has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is exception object.
        on_close: callable
            callable object which is called when closed the connection.
            If the callable accepts exactly three arguments (not counting
            the bound ``self`` of a method) it is called with this class
            object, the close status code and the close reason (both None
            when no close frame data is available). Otherwise it is
            called with this class object only.
        on_ping: callable
            callable object which is called when a ping frame is received.
            on_ping has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the ping frame.
        on_pong: callable
            callable object which is called when a pong frame is received.
            on_pong has 2 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the pong frame.
        on_cont_message: callable
            callback object which is called when receive continued
            frame data.
            on_cont_message has 3 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data of the frame.
            The 3rd argument is continue flag. if 0, the data continue
            to next frame data
        on_data: callable
            callback object which is called when a message received.
            This is called before on_message or on_cont_message,
            and then on_message or on_cont_message is called.
            on_data has 4 arguments.
            The 1st argument is this class object.
            The 2nd argument is the data we get from the server.
            The 3rd argument is data type (the opcode of the frame),
            e.g. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY.
            The 4th argument is continue flag. if 0, the data continue
            to next frame data
        keep_running: bool
            this parameter is obsolete and ignored.
        get_mask_key: func
            a callable to produce new mask keys,
            see the WebSocket.set_mask_key's docstring for more information
        cookie: str
            cookie value.
        subprotocols: list
            array of available sub protocols. default is None.
        """
        self.url = url
        self.header = header if header is not None else []
        self.cookie = cookie

        self.on_open = on_open
        self.on_message = on_message
        self.on_data = on_data
        self.on_error = on_error
        self.on_close = on_close
        self.on_ping = on_ping
        self.on_pong = on_pong
        self.on_cont_message = on_cont_message
        # The ``keep_running`` argument is deliberately ignored; the flag is
        # only raised by run_forever().
        self.keep_running = False
        self.get_mask_key = get_mask_key
        self.sock = None
        # Timestamps (time.time()) of the last ping sent / pong received;
        # 0 means "none yet".
        self.last_ping_tm = 0
        self.last_pong_tm = 0
        self.subprotocols = subprotocols

    def send(self, data, opcode=ABNF.OPCODE_TEXT):
        """
        send message

        Parameters
        ----------
        data: str or bytes
            Message to send. If you set opcode to OPCODE_TEXT,
            data must be utf-8 string or unicode.
        opcode: int
            Operation code of data. default is OPCODE_TEXT.

        Raises
        ------
        WebSocketConnectionClosedException
            if there is no socket, or the socket's send() returns 0.
        """

        if not self.sock or self.sock.send(data, opcode) == 0:
            raise WebSocketConnectionClosedException(
                "Connection is already closed.")

    def close(self, **kwargs):
        """
        Close websocket connection.

        Clears ``keep_running`` and, if a socket exists, closes it and
        drops the reference. Keyword arguments are forwarded to the
        socket's close().
        """
        self.keep_running = False
        if self.sock:
            self.sock.close(**kwargs)
            self.sock = None

    def _send_ping(self, interval, event):
        """
        Body of the ping thread: send a ping every ``interval`` seconds
        until ``event`` is set or sending a ping raises.
        """
        # event.wait() returns True once the event is set, which ends the loop.
        while not event.wait(interval):
            self.last_ping_tm = time.time()
            if self.sock:
                try:
                    self.sock.ping()
                except Exception as ex:
                    _logging.warning("send_ping routine terminated: {}".format(ex))
                    break

    def run_forever(self, sockopt=None, sslopt=None,
                    ping_interval=0, ping_timeout=None,
                    http_proxy_host=None, http_proxy_port=None,
                    http_no_proxy=None, http_proxy_auth=None,
                    skip_utf8_validation=False,
                    host=None, origin=None, dispatcher=None,
                    suppress_origin=False, proxy_type=None):
        """
        Run event loop for WebSocket framework.

        This loop is an infinite loop and is alive while websocket is available.

        Parameters
        ----------
        sockopt: tuple
            values for socket.setsockopt.
            sockopt must be tuple
            and each element is argument of sock.setsockopt.
        sslopt: dict
            optional dict object for ssl socket option.
        ping_interval: int or float
            automatically send "ping" command
            every specified period (in seconds)
            if set to 0, not send automatically.
        ping_timeout: int or float
            timeout (in seconds) if the pong message is not received.
            None, zero or a negative value disables the check.
        http_proxy_host: str
            http proxy host name.
        http_proxy_port: int
            http proxy port. If not set, set to 80.
        http_no_proxy: list
            host names, which doesn't use proxy.
        http_proxy_auth:
            forwarded unchanged to WebSocket.connect()
            (presumably proxy credentials -- TODO confirm expected shape).
        skip_utf8_validation: bool
            skip utf8 validation.
        host: str
            update host header.
        origin: str
            update origin header.
        dispatcher:
            customize reading data from socket. Must provide
            ``read(sock, read_callback, check_callback)``. When not
            given, create_dispatcher() chooses one.
        suppress_origin: bool
            suppress outputting origin header.
        proxy_type:
            forwarded unchanged to WebSocket.connect().

        Returns
        -------
        teardown: bool
            True if an exception other than KeyboardInterrupt was raised
            during the loop; False if KeyboardInterrupt was caught or the
            loop ended without an exception.

        Raises
        ------
        WebSocketException
            if ping_interval is not greater than ping_timeout, or if a
            socket is already open.
        SystemExit
            re-raised after on_error has been invoked.
        """

        if ping_timeout is not None and ping_timeout <= 0:
            ping_timeout = None
        if ping_timeout and ping_interval and ping_interval <= ping_timeout:
            raise WebSocketException("Ensure ping_interval > ping_timeout")
        if not sockopt:
            sockopt = []
        if not sslopt:
            sslopt = {}
        if self.sock:
            raise WebSocketException("socket is already opened")
        thread = None
        self.keep_running = True
        self.last_ping_tm = 0
        self.last_pong_tm = 0

        def teardown(close_frame=None):
            """
            Tears down the connection.

            If close_frame is set, we will invoke the on_close handler with the
            statusCode and reason from there.
            """
            # ``event`` is only bound when the ping thread was started, which
            # is exactly when ``thread`` is not None.
            if thread and thread.is_alive():
                event.set()
                thread.join()
            self.keep_running = False
            if self.sock:
                self.sock.close()
            close_args = self._get_close_args(
                close_frame.data if close_frame else None)
            self._callback(self.on_close, *close_args)
            self.sock = None

        try:
            self.sock = WebSocket(
                self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
                fire_cont_frame=self.on_cont_message is not None,
                skip_utf8_validation=skip_utf8_validation,
                enable_multithread=bool(ping_interval))
            self.sock.settimeout(getdefaulttimeout())
            self.sock.connect(
                self.url, header=self.header, cookie=self.cookie,
                http_proxy_host=http_proxy_host,
                http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
                http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
                host=host, origin=origin, suppress_origin=suppress_origin,
                proxy_type=proxy_type)
            if not dispatcher:
                dispatcher = self.create_dispatcher(ping_timeout)

            self._callback(self.on_open)

            if ping_interval:
                event = threading.Event()
                thread = threading.Thread(
                    target=self._send_ping, args=(ping_interval, event))
                # Attribute form instead of the deprecated setDaemon().
                thread.daemon = True
                thread.start()

            def read():
                """
                Receive one frame and dispatch it to the callbacks.

                Returns True to keep the dispatcher loop going; the falsy
                result of teardown() stops it.
                """
                if not self.keep_running:
                    return teardown()

                op_code, frame = self.sock.recv_data_frame(True)
                if op_code == ABNF.OPCODE_CLOSE:
                    return teardown(frame)
                elif op_code == ABNF.OPCODE_PING:
                    self._callback(self.on_ping, frame.data)
                elif op_code == ABNF.OPCODE_PONG:
                    self.last_pong_tm = time.time()
                    self._callback(self.on_pong, frame.data)
                elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
                    self._callback(self.on_data, frame.data,
                                   frame.opcode, frame.fin)
                    self._callback(self.on_cont_message,
                                   frame.data, frame.fin)
                else:
                    data = frame.data
                    if six.PY3 and op_code == ABNF.OPCODE_TEXT:
                        data = data.decode("utf-8")
                    self._callback(self.on_data, data, frame.opcode, True)
                    self._callback(self.on_message, data)

                return True

            def check():
                """
                Raise WebSocketTimeoutException when a ping was sent more
                than ping_timeout seconds ago and the last pong is either
                older than that ping or arrived more than ping_timeout
                seconds after it.
                """
                if (ping_timeout):
                    has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
                    has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
                    has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout

                    if (self.last_ping_tm
                            and has_timeout_expired
                            and (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
                        raise WebSocketTimeoutException("ping/pong timed out")
                return True

            dispatcher.read(self.sock.sock, read, check)
        except (Exception, KeyboardInterrupt, SystemExit) as e:
            self._callback(self.on_error, e)
            if isinstance(e, SystemExit):
                # propagate SystemExit further
                # (teardown() is not run on this path)
                raise
            teardown()
            return not isinstance(e, KeyboardInterrupt)

        # The dispatcher loop ended without an exception: report "no error"
        # as a real bool instead of falling off the end with None.
        return False

    def create_dispatcher(self, ping_timeout):
        """
        Build the default dispatcher for the current socket.

        The select timeout is ``ping_timeout``, or 10 seconds when that is
        falsy. SSLDispatcher is used when the socket reports is_ssl(),
        Dispatcher otherwise.
        """
        timeout = ping_timeout or 10
        if self.sock.is_ssl():
            return SSLDispatcher(self, timeout)

        return Dispatcher(self, timeout)

    def _get_close_args(self, data):
        """
        _get_close_args extracts the code, reason from the close body
        if they exist, and if self.on_close expects three arguments.

        Parameters
        ----------
        data: bytes or None
            payload of the close frame.

        Returns
        -------
        list
            ``[]`` if there is no on_close callback or it does not take
            exactly three arguments; ``[code, reason]`` if data holds at
            least two bytes; ``[None, None]`` otherwise.
        """
        if not self.on_close:
            return []

        if sys.version_info < (3, 0):
            arg_names = inspect.getargspec(self.on_close).args
        else:
            arg_names = inspect.getfullargspec(self.on_close).args

        arg_count = len(arg_names)
        # For a bound method the arg spec still lists the bound ``self``/
        # ``cls``, which the caller never supplies; leave it out of the count
        # so that methods taking (ws, code, reason) are recognised too.
        if (inspect.ismethod(self.on_close)
                and getattr(self.on_close, "__self__", None) is not None):
            arg_count -= 1

        # if the on_close callback is "old", just return empty list
        if arg_count != 3:
            return []

        if data and len(data) >= 2:
            # The first two bytes hold the status code, big-endian;
            # the rest is the utf-8 encoded reason.
            code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
            reason = data[2:].decode('utf-8')
            return [code, reason]

        return [None, None]

    def _callback(self, callback, *args):
        """
        Invoke ``callback(self, *args)`` if callback is set.

        Exceptions derived from Exception are logged instead of
        propagated; with debug logging enabled the traceback is printed
        as well.
        """
        if callback:
            try:
                callback(self, *args)

            except Exception as e:
                _logging.error("error from callback {}: {}".format(callback, e))
                if _logging.isEnabledForDebug():
                    _, _, tb = sys.exc_info()
                    traceback.print_tb(tb)
395