1""" 2 3""" 4 5""" 6websocket - WebSocket client library for Python 7 8Copyright (C) 2010 Hiroki Ohtani(liris) 9 10 This library is free software; you can redistribute it and/or 11 modify it under the terms of the GNU Lesser General Public 12 License as published by the Free Software Foundation; either 13 version 2.1 of the License, or (at your option) any later version. 14 15 This library is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 Lesser General Public License for more details. 19 20 You should have received a copy of the GNU Lesser General Public 21 License along with this library; if not, write to the Free Software 22 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 23 24""" 25import inspect 26import select 27import sys 28import threading 29import time 30import traceback 31 32import six 33 34from ._abnf import ABNF 35from ._core import WebSocket, getdefaulttimeout 36from ._exceptions import * 37from . import _logging 38 39 40__all__ = ["WebSocketApp"] 41 42class Dispatcher: 43 """ 44 Dispatcher 45 """ 46 def __init__(self, app, ping_timeout): 47 self.app = app 48 self.ping_timeout = ping_timeout 49 50 def read(self, sock, read_callback, check_callback): 51 while self.app.keep_running: 52 r, w, e = select.select( 53 (self.app.sock.sock, ), (), (), self.ping_timeout) 54 if r: 55 if not read_callback(): 56 break 57 check_callback() 58 59class SSLDispatcher: 60 """ 61 SSLDispatcher 62 """ 63 def __init__(self, app, ping_timeout): 64 self.app = app 65 self.ping_timeout = ping_timeout 66 67 def read(self, sock, read_callback, check_callback): 68 while self.app.keep_running: 69 r = self.select() 70 if r: 71 if not read_callback(): 72 break 73 check_callback() 74 75 def select(self): 76 sock = self.app.sock.sock 77 if sock.pending(): 78 return [sock,] 79 80 r, w, e = select.select((sock, ), (), (), self.ping_timeout) 81 return r 82 83 84class WebSocketApp(object): 85 """ 86 Higher level of APIs are provided. The interface is like JavaScript WebSocket object. 87 """ 88 89 def __init__(self, url, header=None, 90 on_open=None, on_message=None, on_error=None, 91 on_close=None, on_ping=None, on_pong=None, 92 on_cont_message=None, 93 keep_running=True, get_mask_key=None, cookie=None, 94 subprotocols=None, 95 on_data=None): 96 """ 97 WebSocketApp initialization 98 99 Parameters 100 ---------- 101 url: <type> 102 websocket url. 103 header: list or dict 104 custom header for websocket handshake. 105 on_open: <type> 106 callable object which is called at opening websocket. 107 this function has one argument. The argument is this class object. 108 on_message: <type> 109 callable object which is called when received data. 110 on_message has 2 arguments. 111 The 1st argument is this class object. 112 The 2nd argument is utf-8 string which we get from the server. 113 on_error: <type> 114 callable object which is called when we get error. 115 on_error has 2 arguments. 116 The 1st argument is this class object. 117 The 2nd argument is exception object. 118 on_close: <type> 119 callable object which is called when closed the connection. 120 this function has one argument. The argument is this class object. 121 on_cont_message: <type> 122 callback object which is called when receive continued 123 frame data. 124 on_cont_message has 3 arguments. 125 The 1st argument is this class object. 126 The 2nd argument is utf-8 string which we get from the server. 127 The 3rd argument is continue flag. if 0, the data continue 128 to next frame data 129 on_data: <type> 130 callback object which is called when a message received. 131 This is called before on_message or on_cont_message, 132 and then on_message or on_cont_message is called. 133 on_data has 4 argument. 134 The 1st argument is this class object. 135 The 2nd argument is utf-8 string which we get from the server. 136 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. 137 The 4th argument is continue flag. if 0, the data continue 138 keep_running: <type> 139 this parameter is obsolete and ignored. 140 get_mask_key: func 141 a callable to produce new mask keys, 142 see the WebSocket.set_mask_key's docstring for more information 143 cookie: str 144 cookie value. 145 subprotocols: <type> 146 array of available sub protocols. default is None. 147 """ 148 self.url = url 149 self.header = header if header is not None else [] 150 self.cookie = cookie 151 152 self.on_open = on_open 153 self.on_message = on_message 154 self.on_data = on_data 155 self.on_error = on_error 156 self.on_close = on_close 157 self.on_ping = on_ping 158 self.on_pong = on_pong 159 self.on_cont_message = on_cont_message 160 self.keep_running = False 161 self.get_mask_key = get_mask_key 162 self.sock = None 163 self.last_ping_tm = 0 164 self.last_pong_tm = 0 165 self.subprotocols = subprotocols 166 167 def send(self, data, opcode=ABNF.OPCODE_TEXT): 168 """ 169 send message 170 171 Parameters 172 ---------- 173 data: <type> 174 Message to send. If you set opcode to OPCODE_TEXT, 175 data must be utf-8 string or unicode. 176 opcode: <type> 177 Operation code of data. default is OPCODE_TEXT. 178 """ 179 180 if not self.sock or self.sock.send(data, opcode) == 0: 181 raise WebSocketConnectionClosedException( 182 "Connection is already closed.") 183 184 def close(self, **kwargs): 185 """ 186 Close websocket connection. 187 """ 188 self.keep_running = False 189 if self.sock: 190 self.sock.close(**kwargs) 191 self.sock = None 192 193 def _send_ping(self, interval, event): 194 while not event.wait(interval): 195 self.last_ping_tm = time.time() 196 if self.sock: 197 try: 198 self.sock.ping() 199 except Exception as ex: 200 _logging.warning("send_ping routine terminated: {}".format(ex)) 201 break 202 203 def run_forever(self, sockopt=None, sslopt=None, 204 ping_interval=0, ping_timeout=None, 205 http_proxy_host=None, http_proxy_port=None, 206 http_no_proxy=None, http_proxy_auth=None, 207 skip_utf8_validation=False, 208 host=None, origin=None, dispatcher=None, 209 suppress_origin=False, proxy_type=None): 210 """ 211 Run event loop for WebSocket framework. 212 213 This loop is an infinite loop and is alive while websocket is available. 214 215 Parameters 216 ---------- 217 sockopt: tuple 218 values for socket.setsockopt. 219 sockopt must be tuple 220 and each element is argument of sock.setsockopt. 221 sslopt: dict 222 optional dict object for ssl socket option. 223 ping_interval: int or float 224 automatically send "ping" command 225 every specified period (in seconds) 226 if set to 0, not send automatically. 227 ping_timeout: int or float 228 timeout (in seconds) if the pong message is not received. 229 http_proxy_host: <type> 230 http proxy host name. 231 http_proxy_port: <type> 232 http proxy port. If not set, set to 80. 233 http_no_proxy: <type> 234 host names, which doesn't use proxy. 235 skip_utf8_validation: bool 236 skip utf8 validation. 237 host: str 238 update host header. 239 origin: str 240 update origin header. 241 dispatcher: <type> 242 customize reading data from socket. 243 suppress_origin: bool 244 suppress outputting origin header. 245 246 Returns 247 ------- 248 teardown: bool 249 False if caught KeyboardInterrupt, True if other exception was raised during a loop 250 """ 251 252 if ping_timeout is not None and ping_timeout <= 0: 253 ping_timeout = None 254 if ping_timeout and ping_interval and ping_interval <= ping_timeout: 255 raise WebSocketException("Ensure ping_interval > ping_timeout") 256 if not sockopt: 257 sockopt = [] 258 if not sslopt: 259 sslopt = {} 260 if self.sock: 261 raise WebSocketException("socket is already opened") 262 thread = None 263 self.keep_running = True 264 self.last_ping_tm = 0 265 self.last_pong_tm = 0 266 267 def teardown(close_frame=None): 268 """ 269 Tears down the connection. 270 271 If close_frame is set, we will invoke the on_close handler with the 272 statusCode and reason from there. 273 """ 274 if thread and thread.is_alive(): 275 event.set() 276 thread.join() 277 self.keep_running = False 278 if self.sock: 279 self.sock.close() 280 close_args = self._get_close_args( 281 close_frame.data if close_frame else None) 282 self._callback(self.on_close, *close_args) 283 self.sock = None 284 285 try: 286 self.sock = WebSocket( 287 self.get_mask_key, sockopt=sockopt, sslopt=sslopt, 288 fire_cont_frame=self.on_cont_message is not None, 289 skip_utf8_validation=skip_utf8_validation, 290 enable_multithread=True if ping_interval else False) 291 self.sock.settimeout(getdefaulttimeout()) 292 self.sock.connect( 293 self.url, header=self.header, cookie=self.cookie, 294 http_proxy_host=http_proxy_host, 295 http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, 296 http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols, 297 host=host, origin=origin, suppress_origin=suppress_origin, 298 proxy_type=proxy_type) 299 if not dispatcher: 300 dispatcher = self.create_dispatcher(ping_timeout) 301 302 self._callback(self.on_open) 303 304 if ping_interval: 305 event = threading.Event() 306 thread = threading.Thread( 307 target=self._send_ping, args=(ping_interval, event)) 308 thread.setDaemon(True) 309 thread.start() 310 311 def read(): 312 if not self.keep_running: 313 return teardown() 314 315 op_code, frame = self.sock.recv_data_frame(True) 316 if op_code == ABNF.OPCODE_CLOSE: 317 return teardown(frame) 318 elif op_code == ABNF.OPCODE_PING: 319 self._callback(self.on_ping, frame.data) 320 elif op_code == ABNF.OPCODE_PONG: 321 self.last_pong_tm = time.time() 322 self._callback(self.on_pong, frame.data) 323 elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: 324 self._callback(self.on_data, frame.data, 325 frame.opcode, frame.fin) 326 self._callback(self.on_cont_message, 327 frame.data, frame.fin) 328 else: 329 data = frame.data 330 if six.PY3 and op_code == ABNF.OPCODE_TEXT: 331 data = data.decode("utf-8") 332 self._callback(self.on_data, data, frame.opcode, True) 333 self._callback(self.on_message, data) 334 335 return True 336 337 def check(): 338 if (ping_timeout): 339 has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout 340 has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0 341 has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout 342 343 if (self.last_ping_tm 344 and has_timeout_expired 345 and (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)): 346 raise WebSocketTimeoutException("ping/pong timed out") 347 return True 348 349 dispatcher.read(self.sock.sock, read, check) 350 except (Exception, KeyboardInterrupt, SystemExit) as e: 351 self._callback(self.on_error, e) 352 if isinstance(e, SystemExit): 353 # propagate SystemExit further 354 raise 355 teardown() 356 return not isinstance(e, KeyboardInterrupt) 357 358 def create_dispatcher(self, ping_timeout): 359 timeout = ping_timeout or 10 360 if self.sock.is_ssl(): 361 return SSLDispatcher(self, timeout) 362 363 return Dispatcher(self, timeout) 364 365 def _get_close_args(self, data): 366 """ 367 _get_close_args extracts the code, reason from the close body 368 if they exists, and if the self.on_close except three arguments 369 """ 370 # if the on_close callback is "old", just return empty list 371 if sys.version_info < (3, 0): 372 if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3: 373 return [] 374 else: 375 if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3: 376 return [] 377 378 if data and len(data) >= 2: 379 code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2]) 380 reason = data[2:].decode('utf-8') 381 return [code, reason] 382 383 return [None, None] 384 385 def _callback(self, callback, *args): 386 if callback: 387 try: 388 callback(self, *args) 389 390 except Exception as e: 391 _logging.error("error from callback {}: {}".format(callback, e)) 392 if _logging.isEnabledForDebug(): 393 _, _, tb = sys.exc_info() 394 traceback.print_tb(tb) 395