1""" 2 3""" 4 5""" 6_app.py 7websocket - WebSocket client library for Python 8 9Copyright 2021 engn33r 10 11Licensed under the Apache License, Version 2.0 (the "License"); 12you may not use this file except in compliance with the License. 13You may obtain a copy of the License at 14 15 http://www.apache.org/licenses/LICENSE-2.0 16 17Unless required by applicable law or agreed to in writing, software 18distributed under the License is distributed on an "AS IS" BASIS, 19WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 20See the License for the specific language governing permissions and 21limitations under the License. 22""" 23import selectors 24import sys 25import threading 26import time 27import traceback 28from ._abnf import ABNF 29from ._core import WebSocket, getdefaulttimeout 30from ._exceptions import * 31from . import _logging 32 33 34__all__ = ["WebSocketApp"] 35 36 37class Dispatcher: 38 """ 39 Dispatcher 40 """ 41 def __init__(self, app, ping_timeout): 42 self.app = app 43 self.ping_timeout = ping_timeout 44 45 def read(self, sock, read_callback, check_callback): 46 while self.app.keep_running: 47 sel = selectors.DefaultSelector() 48 sel.register(self.app.sock.sock, selectors.EVENT_READ) 49 50 r = sel.select(self.ping_timeout) 51 if r: 52 if not read_callback(): 53 break 54 check_callback() 55 sel.close() 56 57 58class SSLDispatcher: 59 """ 60 SSLDispatcher 61 """ 62 def __init__(self, app, ping_timeout): 63 self.app = app 64 self.ping_timeout = ping_timeout 65 66 def read(self, sock, read_callback, check_callback): 67 while self.app.keep_running: 68 r = self.select() 69 if r: 70 if not read_callback(): 71 break 72 check_callback() 73 74 def select(self): 75 sock = self.app.sock.sock 76 if sock.pending(): 77 return [sock,] 78 79 sel = selectors.DefaultSelector() 80 sel.register(sock, selectors.EVENT_READ) 81 82 r = sel.select(self.ping_timeout) 83 sel.close() 84 85 if len(r) > 0: 86 return r[0][0] 87 88 89class WebSocketApp(object): 90 """ 91 Higher level of APIs are provided. The interface is like JavaScript WebSocket object. 92 """ 93 94 def __init__(self, url, header=None, 95 on_open=None, on_message=None, on_error=None, 96 on_close=None, on_ping=None, on_pong=None, 97 on_cont_message=None, 98 keep_running=True, get_mask_key=None, cookie=None, 99 subprotocols=None, 100 on_data=None): 101 """ 102 WebSocketApp initialization 103 104 Parameters 105 ---------- 106 url: str 107 Websocket url. 108 header: list or dict 109 Custom header for websocket handshake. 110 on_open: function 111 Callback object which is called at opening websocket. 112 on_open has one argument. 113 The 1st argument is this class object. 114 on_message: function 115 Callback object which is called when received data. 116 on_message has 2 arguments. 117 The 1st argument is this class object. 118 The 2nd argument is utf-8 data received from the server. 119 on_error: function 120 Callback object which is called when we get error. 121 on_error has 2 arguments. 122 The 1st argument is this class object. 123 The 2nd argument is exception object. 124 on_close: function 125 Callback object which is called when connection is closed. 126 on_close has 3 arguments. 127 The 1st argument is this class object. 128 The 2nd argument is close_status_code. 129 The 3rd argument is close_msg. 130 on_cont_message: function 131 Callback object which is called when a continuation 132 frame is received. 133 on_cont_message has 3 arguments. 134 The 1st argument is this class object. 135 The 2nd argument is utf-8 string which we get from the server. 136 The 3rd argument is continue flag. if 0, the data continue 137 to next frame data 138 on_data: function 139 Callback object which is called when a message received. 140 This is called before on_message or on_cont_message, 141 and then on_message or on_cont_message is called. 142 on_data has 4 argument. 143 The 1st argument is this class object. 144 The 2nd argument is utf-8 string which we get from the server. 145 The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came. 146 The 4th argument is continue flag. If 0, the data continue 147 keep_running: bool 148 This parameter is obsolete and ignored. 149 get_mask_key: function 150 A callable function to get new mask keys, see the 151 WebSocket.set_mask_key's docstring for more information. 152 cookie: str 153 Cookie value. 154 subprotocols: list 155 List of available sub protocols. Default is None. 156 """ 157 self.url = url 158 self.header = header if header is not None else [] 159 self.cookie = cookie 160 161 self.on_open = on_open 162 self.on_message = on_message 163 self.on_data = on_data 164 self.on_error = on_error 165 self.on_close = on_close 166 self.on_ping = on_ping 167 self.on_pong = on_pong 168 self.on_cont_message = on_cont_message 169 self.keep_running = False 170 self.get_mask_key = get_mask_key 171 self.sock = None 172 self.last_ping_tm = 0 173 self.last_pong_tm = 0 174 self.subprotocols = subprotocols 175 176 def send(self, data, opcode=ABNF.OPCODE_TEXT): 177 """ 178 send message 179 180 Parameters 181 ---------- 182 data: str 183 Message to send. If you set opcode to OPCODE_TEXT, 184 data must be utf-8 string or unicode. 185 opcode: int 186 Operation code of data. Default is OPCODE_TEXT. 187 """ 188 189 if not self.sock or self.sock.send(data, opcode) == 0: 190 raise WebSocketConnectionClosedException( 191 "Connection is already closed.") 192 193 def close(self, **kwargs): 194 """ 195 Close websocket connection. 196 """ 197 self.keep_running = False 198 if self.sock: 199 self.sock.close(**kwargs) 200 self.sock = None 201 202 def _send_ping(self, interval, event, payload): 203 while not event.wait(interval): 204 self.last_ping_tm = time.time() 205 if self.sock: 206 try: 207 self.sock.ping(payload) 208 except Exception as ex: 209 _logging.warning("send_ping routine terminated: {}".format(ex)) 210 break 211 212 def run_forever(self, sockopt=None, sslopt=None, 213 ping_interval=0, ping_timeout=None, 214 ping_payload="", 215 http_proxy_host=None, http_proxy_port=None, 216 http_no_proxy=None, http_proxy_auth=None, 217 skip_utf8_validation=False, 218 host=None, origin=None, dispatcher=None, 219 suppress_origin=False, proxy_type=None): 220 """ 221 Run event loop for WebSocket framework. 222 223 This loop is an infinite loop and is alive while websocket is available. 224 225 Parameters 226 ---------- 227 sockopt: tuple 228 Values for socket.setsockopt. 229 sockopt must be tuple 230 and each element is argument of sock.setsockopt. 231 sslopt: dict 232 Optional dict object for ssl socket option. 233 ping_interval: int or float 234 Automatically send "ping" command 235 every specified period (in seconds). 236 If set to 0, no ping is sent periodically. 237 ping_timeout: int or float 238 Timeout (in seconds) if the pong message is not received. 239 ping_payload: str 240 Payload message to send with each ping. 241 http_proxy_host: str 242 HTTP proxy host name. 243 http_proxy_port: int or str 244 HTTP proxy port. If not set, set to 80. 245 http_no_proxy: list 246 Whitelisted host names that don't use the proxy. 247 skip_utf8_validation: bool 248 skip utf8 validation. 249 host: str 250 update host header. 251 origin: str 252 update origin header. 253 dispatcher: Dispatcher object 254 customize reading data from socket. 255 suppress_origin: bool 256 suppress outputting origin header. 257 258 Returns 259 ------- 260 teardown: bool 261 False if caught KeyboardInterrupt, True if other exception was raised during a loop 262 """ 263 264 if ping_timeout is not None and ping_timeout <= 0: 265 raise WebSocketException("Ensure ping_timeout > 0") 266 if ping_interval is not None and ping_interval < 0: 267 raise WebSocketException("Ensure ping_interval >= 0") 268 if ping_timeout and ping_interval and ping_interval <= ping_timeout: 269 raise WebSocketException("Ensure ping_interval > ping_timeout") 270 if not sockopt: 271 sockopt = [] 272 if not sslopt: 273 sslopt = {} 274 if self.sock: 275 raise WebSocketException("socket is already opened") 276 thread = None 277 self.keep_running = True 278 self.last_ping_tm = 0 279 self.last_pong_tm = 0 280 281 def teardown(close_frame=None): 282 """ 283 Tears down the connection. 284 285 Parameters 286 ---------- 287 close_frame: ABNF frame 288 If close_frame is set, the on_close handler is invoked 289 with the statusCode and reason from the provided frame. 290 """ 291 292 if thread and thread.is_alive(): 293 event.set() 294 thread.join() 295 self.keep_running = False 296 if self.sock: 297 self.sock.close() 298 close_status_code, close_reason = self._get_close_args( 299 close_frame if close_frame else None) 300 self.sock = None 301 302 # Finally call the callback AFTER all teardown is complete 303 self._callback(self.on_close, close_status_code, close_reason) 304 305 try: 306 self.sock = WebSocket( 307 self.get_mask_key, sockopt=sockopt, sslopt=sslopt, 308 fire_cont_frame=self.on_cont_message is not None, 309 skip_utf8_validation=skip_utf8_validation, 310 enable_multithread=True) 311 self.sock.settimeout(getdefaulttimeout()) 312 self.sock.connect( 313 self.url, header=self.header, cookie=self.cookie, 314 http_proxy_host=http_proxy_host, 315 http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy, 316 http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols, 317 host=host, origin=origin, suppress_origin=suppress_origin, 318 proxy_type=proxy_type) 319 if not dispatcher: 320 dispatcher = self.create_dispatcher(ping_timeout) 321 322 self._callback(self.on_open) 323 324 if ping_interval: 325 event = threading.Event() 326 thread = threading.Thread( 327 target=self._send_ping, args=(ping_interval, event, ping_payload)) 328 thread.daemon = True 329 thread.start() 330 331 def read(): 332 if not self.keep_running: 333 return teardown() 334 335 op_code, frame = self.sock.recv_data_frame(True) 336 if op_code == ABNF.OPCODE_CLOSE: 337 return teardown(frame) 338 elif op_code == ABNF.OPCODE_PING: 339 self._callback(self.on_ping, frame.data) 340 elif op_code == ABNF.OPCODE_PONG: 341 self.last_pong_tm = time.time() 342 self._callback(self.on_pong, frame.data) 343 elif op_code == ABNF.OPCODE_CONT and self.on_cont_message: 344 self._callback(self.on_data, frame.data, 345 frame.opcode, frame.fin) 346 self._callback(self.on_cont_message, 347 frame.data, frame.fin) 348 else: 349 data = frame.data 350 if op_code == ABNF.OPCODE_TEXT: 351 data = data.decode("utf-8") 352 self._callback(self.on_data, data, frame.opcode, True) 353 self._callback(self.on_message, data) 354 355 return True 356 357 def check(): 358 if (ping_timeout): 359 has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout 360 has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0 361 has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout 362 363 if (self.last_ping_tm and 364 has_timeout_expired and 365 (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)): 366 raise WebSocketTimeoutException("ping/pong timed out") 367 return True 368 369 dispatcher.read(self.sock.sock, read, check) 370 except (Exception, KeyboardInterrupt, SystemExit) as e: 371 self._callback(self.on_error, e) 372 if isinstance(e, SystemExit): 373 # propagate SystemExit further 374 raise 375 teardown() 376 return not isinstance(e, KeyboardInterrupt) 377 378 def create_dispatcher(self, ping_timeout): 379 timeout = ping_timeout or 10 380 if self.sock.is_ssl(): 381 return SSLDispatcher(self, timeout) 382 383 return Dispatcher(self, timeout) 384 385 def _get_close_args(self, close_frame): 386 """ 387 _get_close_args extracts the close code and reason from the close body 388 if it exists (RFC6455 says WebSocket Connection Close Code is optional) 389 """ 390 # Need to catch the case where close_frame is None 391 # Otherwise the following if statement causes an error 392 if not self.on_close or not close_frame: 393 return [None, None] 394 395 # Extract close frame status code 396 if close_frame.data and len(close_frame.data) >= 2: 397 close_status_code = 256 * close_frame.data[0] + close_frame.data[1] 398 reason = close_frame.data[2:].decode('utf-8') 399 return [close_status_code, reason] 400 else: 401 # Most likely reached this because len(close_frame_data.data) < 2 402 return [None, None] 403 404 def _callback(self, callback, *args): 405 if callback: 406 try: 407 callback(self, *args) 408 409 except Exception as e: 410 _logging.error("error from callback {}: {}".format(callback, e)) 411 if self.on_error: 412 self.on_error(self, e) 413