import itertools
import logging
import random
import signal
import threading

import engineio

from . import exceptions
from . import namespace
from . import packet

default_logger = logging.getLogger('socketio.client')
reconnecting_clients = []


def signal_handler(sig, frame):  # pragma: no cover
    """SIGINT handler.

    Notify any clients that are in a reconnect loop to abort. Other
    disconnection tasks are handled at the engine.io level.
    """
    # iterate over a copy, reconnect tasks remove themselves from the list
    for client in reconnecting_clients[:]:
        client._reconnect_abort.set()
    if callable(original_signal_handler):
        return original_signal_handler(sig, frame)
    else:  # pragma: no cover
        # Handle case where no original SIGINT handler was present.
        return signal.default_int_handler(sig, frame)


original_signal_handler = None


class Client(object):
    """A Socket.IO client.

    This class implements a fully compliant Socket.IO web client with support
    for websocket and long-polling transports.

    :param reconnection: ``True`` if the client should automatically attempt to
                         reconnect to the server after an interruption, or
                         ``False`` to not reconnect. The default is ``True``.
    :param reconnection_attempts: How many reconnection attempts to issue
                                  before giving up, or 0 for an unlimited
                                  number of attempts. The default is 0.
    :param reconnection_delay: How long to wait in seconds before the first
                               reconnection attempt. Each successive attempt
                               doubles this delay.
    :param reconnection_delay_max: The maximum delay between reconnection
                                   attempts.
    :param randomization_factor: Randomization amount for each delay between
                                 reconnection attempts. The default is 0.5,
                                 which means that each delay is randomly
                                 adjusted by +/- 50%.
    :param logger: To enable logging set to ``True`` or pass a logger object to
                   use. To disable logging set to ``False``. The default is
                   ``False``. Note that fatal errors are logged even when
                   ``logger`` is ``False``.
    :param json: An alternative json module to use for encoding and decoding
                 packets. Custom json modules must have ``dumps`` and ``loads``
                 functions that are compatible with the standard library
                 versions.

    The Engine.IO configuration supports the following settings:

    :param request_timeout: A timeout in seconds for requests. The default is
                            5 seconds.
    :param http_session: an initialized ``requests.Session`` object to be used
                         when sending requests to the server. Use it if you
                         need to add special client options such as proxy
                         servers, SSL certificates, etc.
    :param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to
                       skip SSL certificate verification, allowing
                       connections to servers with self signed certificates.
                       The default is ``True``.
    :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
                            a logger object to use. To disable logging set to
                            ``False``. The default is ``False``. Note that
                            fatal errors are logged even when
                            ``engineio_logger`` is ``False``.
    """
    def __init__(self, reconnection=True, reconnection_attempts=0,
                 reconnection_delay=1, reconnection_delay_max=5,
                 randomization_factor=0.5, logger=False, json=None, **kwargs):
        global original_signal_handler
        # signal handlers can only be installed from the main thread, and
        # the handler is installed just once for all the clients
        if original_signal_handler is None and \
                threading.current_thread() == threading.main_thread():
            original_signal_handler = signal.signal(signal.SIGINT,
                                                    signal_handler)
        self.reconnection = reconnection
        self.reconnection_attempts = reconnection_attempts
        self.reconnection_delay = reconnection_delay
        self.reconnection_delay_max = reconnection_delay_max
        self.randomization_factor = randomization_factor

        # any remaining keyword arguments are passed to the Engine.IO client
        engineio_options = kwargs
        engineio_logger = engineio_options.pop('engineio_logger', None)
        if engineio_logger is not None:
            engineio_options['logger'] = engineio_logger
        if json is not None:
            # note that this is set on the Packet class, not per client
            packet.Packet.json = json
            engineio_options['json'] = json

        self.eio = self._engineio_client_class()(**engineio_options)
        self.eio.on('connect', self._handle_eio_connect)
        self.eio.on('message', self._handle_eio_message)
        self.eio.on('disconnect', self._handle_eio_disconnect)

        if not isinstance(logger, bool):
            self.logger = logger
        else:
            self.logger = default_logger
            # only configure the default logger if nobody has done it yet
            if self.logger.level == logging.NOTSET:
                if logger:
                    self.logger.setLevel(logging.INFO)
                else:
                    self.logger.setLevel(logging.ERROR)
                self.logger.addHandler(logging.StreamHandler())

        self.connection_url = None
        self.connection_headers = None
        self.connection_transports = None
        self.connection_namespaces = []
        self.socketio_path = None
        self.sid = None

        self.connected = False
        self.namespaces = {}
        self.handlers = {}
        self.namespace_handlers = {}
        self.callbacks = {}
        self._binary_packet = None
        self._connect_event = None
        self._reconnect_task = None
        self._reconnect_abort = None

    def is_asyncio_based(self):
        """Return ``False``, as this client is not asyncio based."""
        return False

    def on(self, event, handler=None, namespace=None):
        """Register an event handler.

        :param event: The event name. It can be any string. The event names
                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
                      reserved and should not be used.
        :param handler: The function that should be invoked to handle the
                        event. When this parameter is not given, the method
                        acts as a decorator for the handler function.
        :param namespace: The Socket.IO namespace for the event. If this
                          argument is omitted the handler is associated with
                          the default namespace.

        Example usage::

            # as a decorator:
            @sio.on('connect')
            def connect_handler():
                print('Connected!')

            # as a method:
            def message_handler(msg):
                print('Received message: ', msg)
                sio.send('response')
            sio.on('message', message_handler)

        The ``'connect'`` event handler receives no arguments. The
        ``'message'`` handler and handlers for custom event names receive the
        message payload as only argument. Any values returned from a message
        handler will be passed to the client's acknowledgement callback
        function if it exists. The ``'disconnect'`` handler does not take
        arguments.
        """
        namespace = namespace or '/'

        def set_handler(handler):
            if namespace not in self.handlers:
                self.handlers[namespace] = {}
            self.handlers[namespace][event] = handler
            return handler

        if handler is None:
            return set_handler
        set_handler(handler)

    def event(self, *args, **kwargs):
        """Decorator to register an event handler.

        This is a simplified version of the ``on()`` method that takes the
        event name from the decorated function.

        Example usage::

            @sio.event
            def my_event(data):
                print('Received data: ', data)

        The above example is equivalent to::

            @sio.on('my_event')
            def my_event(data):
                print('Received data: ', data)

        A custom namespace can be given as an argument to the decorator::

            @sio.event(namespace='/test')
            def my_event(data):
                print('Received data: ', data)
        """
        if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
            # the decorator was invoked without arguments
            # args[0] is the decorated function
            return self.on(args[0].__name__)(args[0])
        else:
            # the decorator was invoked with arguments
            def set_handler(handler):
                return self.on(handler.__name__, *args, **kwargs)(handler)

            return set_handler

    def register_namespace(self, namespace_handler):
        """Register a namespace handler object.

        :param namespace_handler: An instance of a :class:`Namespace`
                                  subclass that handles all the event traffic
                                  for a namespace.

        A ``ValueError`` is raised if the given object is not a client
        namespace instance, or if its asyncio mode does not match the mode
        of this client.
        """
        if not isinstance(namespace_handler, namespace.ClientNamespace):
            raise ValueError('Not a namespace instance')
        if self.is_asyncio_based() != namespace_handler.is_asyncio_based():
            raise ValueError('Not a valid namespace class for this client')
        namespace_handler._set_client(self)
        self.namespace_handlers[namespace_handler.namespace] = \
            namespace_handler

    def connect(self, url, headers=None, transports=None,
                namespaces=None, socketio_path='socket.io', wait=True,
                wait_timeout=1):
        """Connect to a Socket.IO server.

        :param url: The URL of the Socket.IO server. It can include custom
                    query string parameters if required by the server.
        :param headers: A dictionary with custom headers to send with the
                        connection request. If not given, no custom headers
                        are sent.
        :param transports: The list of allowed transports. Valid transports
                           are ``'polling'`` and ``'websocket'``. If not
                           given, the polling transport is connected first,
                           then an upgrade to websocket is attempted.
        :param namespaces: The namespaces to connect as a string or list of
                           strings. If not given, the namespaces that have
                           registered event handlers are connected.
        :param socketio_path: The endpoint where the Socket.IO server is
                              installed. The default value is appropriate for
                              most cases.
        :param wait: if set to ``True`` (the default) the call only returns
                     when all the namespaces are connected. If set to
                     ``False``, the call returns as soon as the Engine.IO
                     transport is connected, and the namespaces will connect
                     in the background.
        :param wait_timeout: How long the client should wait for the
                             connection. The default is 1 second. This
                             argument is only considered when ``wait`` is set
                             to ``True``.

        A ``ConnectionError`` exception is raised if the client is already
        connected, if the Engine.IO connection fails, or if ``wait`` is
        ``True`` and one or more namespaces did not connect in time.

        Example usage::

            sio = socketio.Client()
            sio.connect('http://localhost:5000')
        """
        if self.connected:
            raise exceptions.ConnectionError('Already connected')

        # use a new dictionary on each call, since the headers are stored in
        # the instance and must not be shared between clients
        if headers is None:
            headers = {}

        # the connection arguments are saved for use in reconnection attempts
        self.connection_url = url
        self.connection_headers = headers
        self.connection_transports = transports
        self.socketio_path = socketio_path

        if namespaces is None:
            namespaces = list(set(self.handlers.keys()).union(
                set(self.namespace_handlers.keys())))
            if len(namespaces) == 0:
                namespaces = ['/']
        elif isinstance(namespaces, str):
            namespaces = [namespaces]
        self.connection_namespaces = namespaces
        self.namespaces = {}
        if self._connect_event is None:
            self._connect_event = self.eio.create_event()
        else:
            self._connect_event.clear()
        try:
            self.eio.connect(url, headers=headers, transports=transports,
                             engineio_path=socketio_path)
        except engineio.exceptions.ConnectionError as exc:
            self._trigger_event(
                'connect_error', '/',
                exc.args[1] if len(exc.args) > 1 else exc.args[0])
            raise exceptions.ConnectionError(exc.args[0]) from None

        if wait:
            # the event is set each time a namespace connects or is rejected,
            # so keep waiting until all of them connect or the wait times out
            while self._connect_event.wait(timeout=wait_timeout):
                self._connect_event.clear()
                if set(self.namespaces) == set(self.connection_namespaces):
                    break
            if set(self.namespaces) != set(self.connection_namespaces):
                self.disconnect()
                raise exceptions.ConnectionError(
                    'One or more namespaces failed to connect')

        self.connected = True

    def wait(self):
        """Wait until the connection with the server ends.

        Client applications can use this function to block the main thread
        during the life of the connection.
        """
        while True:
            self.eio.wait()
            self.sleep(1)  # give the reconnect task time to start up
            # take a snapshot of the task, because the reconnect task resets
            # the attribute to None from its own thread when it succeeds
            reconnect_task = self._reconnect_task
            if not reconnect_task:
                break
            reconnect_task.join()
            if self.eio.state != 'connected':
                break

    def emit(self, event, data=None, namespace=None, callback=None):
        """Emit a custom event to the server.

        :param event: The event name. It can be any string. The event names
                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
                      reserved and should not be used.
        :param data: The data to send to the server. Data can be of
                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
                     multiple arguments, use a tuple where each element is of
                     one of the types indicated above.
        :param namespace: The Socket.IO namespace for the event. If this
                          argument is omitted the event is emitted to the
                          default namespace.
        :param callback: If given, this function will be called to acknowledge
                         that the server has received the message. The
                         arguments that will be passed to the function are
                         those provided by the server.

        A ``BadNamespaceError`` exception is raised if the namespace is not
        connected.

        Note: this method is not thread safe. If multiple threads are emitting
        at the same time on the same client connection, messages composed of
        multiple packets may end up being sent in an incorrect sequence. Use
        standard concurrency solutions (such as a Lock object) to prevent this
        situation.
        """
        namespace = namespace or '/'
        if namespace not in self.namespaces:
            raise exceptions.BadNamespaceError(
                namespace + ' is not a connected namespace.')
        self.logger.info('Emitting event "%s" [%s]', event, namespace)
        if callback is not None:
            ack_id = self._generate_ack_id(namespace, callback)
        else:
            ack_id = None
        # tuples are expanded to multiple arguments, everything else is sent
        # as a single argument
        if isinstance(data, tuple):
            data = list(data)
        elif data is not None:
            data = [data]
        else:
            data = []
        self._send_packet(packet.Packet(packet.EVENT, namespace=namespace,
                                        data=[event] + data, id=ack_id))

    def send(self, data, namespace=None, callback=None):
        """Send a message to the server.

        This function emits an event with the name ``'message'``. Use
        :func:`emit` to issue custom event names.

        :param data: The data to send to the server. Data can be of
                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
                     multiple arguments, use a tuple where each element is of
                     one of the types indicated above.
        :param namespace: The Socket.IO namespace for the event. If this
                          argument is omitted the event is emitted to the
                          default namespace.
        :param callback: If given, this function will be called to acknowledge
                         that the server has received the message. The
                         arguments that will be passed to the function are
                         those provided by the server.
        """
        self.emit('message', data=data, namespace=namespace,
                  callback=callback)

    def call(self, event, data=None, namespace=None, timeout=60):
        """Emit a custom event to the server and wait for the response.

        :param event: The event name. It can be any string. The event names
                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
                      reserved and should not be used.
        :param data: The data to send to the server. Data can be of
                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
                     multiple arguments, use a tuple where each element is of
                     one of the types indicated above.
        :param namespace: The Socket.IO namespace for the event. If this
                          argument is omitted the event is emitted to the
                          default namespace.
        :param timeout: The waiting timeout. If the timeout is reached before
                        the server acknowledges the event, then a
                        ``TimeoutError`` exception is raised.

        The return value is ``None`` if the acknowledgement has no arguments,
        the argument itself if there is only one, or a tuple with all the
        arguments if there are several.

        Note: this method is not thread safe. If multiple threads are emitting
        at the same time on the same client connection, messages composed of
        multiple packets may end up being sent in an incorrect sequence. Use
        standard concurrency solutions (such as a Lock object) to prevent this
        situation.
        """
        callback_event = self.eio.create_event()
        callback_args = []

        def event_callback(*args):
            callback_args.append(args)
            callback_event.set()

        self.emit(event, data=data, namespace=namespace,
                  callback=event_callback)
        if not callback_event.wait(timeout=timeout):
            raise exceptions.TimeoutError()
        return callback_args[0] if len(callback_args[0]) > 1 \
            else callback_args[0][0] if len(callback_args[0]) == 1 \
            else None

    def disconnect(self):
        """Disconnect from the server."""
        # here we just request the disconnection
        # later in _handle_eio_disconnect we invoke the disconnect handler
        for n in self.namespaces:
            self._send_packet(packet.Packet(packet.DISCONNECT, namespace=n))
        self.eio.disconnect(abort=True)

    def get_sid(self, namespace=None):
        """Return the ``sid`` associated with a connection.

        :param namespace: The Socket.IO namespace. If this argument is omitted
                          the handler is associated with the default
                          namespace. Note that unlike previous versions, the
                          current version of the Socket.IO protocol uses
                          different ``sid`` values per namespace.

        This method returns the ``sid`` for the requested namespace as a
        string, or ``None`` if the namespace is not connected.
        """
        return self.namespaces.get(namespace or '/')

    def transport(self):
        """Return the name of the transport used by the client.

        The two possible values returned by this function are ``'polling'``
        and ``'websocket'``.
        """
        return self.eio.transport()

    def start_background_task(self, target, *args, **kwargs):
        """Start a background task using the appropriate async model.

        This is a utility function that applications can use to start a
        background task using the method that is compatible with the
        selected async mode.

        :param target: the target function to execute.
        :param args: arguments to pass to the function.
        :param kwargs: keyword arguments to pass to the function.

        This function returns an object compatible with the `Thread` class in
        the Python standard library. The `start()` method on this object is
        already called by this function.
        """
        return self.eio.start_background_task(target, *args, **kwargs)

    def sleep(self, seconds=0):
        """Sleep for the requested amount of time using the appropriate async
        model.

        This is a utility function that applications can use to put a task to
        sleep without having to worry about using the correct call for the
        selected async mode.
        """
        return self.eio.sleep(seconds)

    def _send_packet(self, pkt):
        """Send a Socket.IO packet to the server."""
        encoded_packet = pkt.encode()
        if isinstance(encoded_packet, list):
            # the packet was encoded into several parts, send them in order
            for ep in encoded_packet:
                self.eio.send(ep)
        else:
            self.eio.send(encoded_packet)

    def _generate_ack_id(self, namespace, callback):
        """Generate a unique identifier for an ACK packet."""
        namespace = namespace or '/'
        if namespace not in self.callbacks:
            # key 0 holds the id generator, real ack ids start at 1
            self.callbacks[namespace] = {0: itertools.count(1)}
        ack_id = next(self.callbacks[namespace][0])
        self.callbacks[namespace][ack_id] = callback
        return ack_id

    def _handle_connect(self, namespace, data):
        """Handle a CONNECT packet for a namespace."""
        namespace = namespace or '/'
        if namespace not in self.namespaces:
            self.logger.info('Namespace {} is connected'.format(namespace))
            # fall back to the Engine.IO sid if the packet does not have one
            self.namespaces[namespace] = (data or {}).get('sid', self.sid)
            self._trigger_event('connect', namespace=namespace)
            self._connect_event.set()

    def _handle_disconnect(self, namespace):
        """Handle a DISCONNECT packet for a namespace."""
        if not self.connected:
            return
        namespace = namespace or '/'
        self._trigger_event('disconnect', namespace=namespace)
        if namespace in self.namespaces:
            del self.namespaces[namespace]
        if not self.namespaces:
            # no namespaces left, so the Engine.IO connection is closed too
            self.connected = False
            self.eio.disconnect(abort=True)

    def _handle_event(self, namespace, id, data):
        """Handle an EVENT packet, sending an ACK back if one is requested."""
        namespace = namespace or '/'
        self.logger.info('Received event "%s" [%s]', data[0], namespace)
        r = self._trigger_event(data[0], namespace, *data[1:])
        if id is not None:
            # send ACK packet with the response returned by the handler
            # tuples are expanded as multiple arguments
            if r is None:
                data = []
            elif isinstance(r, tuple):
                data = list(r)
            else:
                data = [r]
            self._send_packet(packet.Packet(packet.ACK, namespace=namespace,
                                            id=id, data=data))

    def _handle_ack(self, namespace, id, data):
        """Handle an ACK packet by invoking the registered callback."""
        namespace = namespace or '/'
        self.logger.info('Received ack [%s]', namespace)
        callback = None
        try:
            callback = self.callbacks[namespace][id]
        except KeyError:
            # if we get an unknown callback we just ignore it
            self.logger.warning('Unknown callback received, ignoring.')
        else:
            del self.callbacks[namespace][id]
        if callback is not None:
            callback(*data)

    def _handle_error(self, namespace, data):
        """Handle a CONNECT_ERROR packet for a namespace."""
        namespace = namespace or '/'
        self.logger.info('Connection to namespace {} was rejected'.format(
            namespace))
        if data is None:
            data = tuple()
        elif not isinstance(data, (tuple, list)):
            data = (data,)
        self._trigger_event('connect_error', namespace, *data)
        # wake up connect() so that it can check the state of the namespaces
        self._connect_event.set()
        if namespace in self.namespaces:
            del self.namespaces[namespace]
        if namespace == '/':
            self.namespaces = {}
            self.connected = False

    def _trigger_event(self, event, namespace, *args):
        """Invoke an application event handler."""
        # first see if we have an explicit handler for the event
        if namespace in self.handlers and event in self.handlers[namespace]:
            return self.handlers[namespace][event](*args)

        # or else, forward the event to a namespace handler if one exists
        elif namespace in self.namespace_handlers:
            return self.namespace_handlers[namespace].trigger_event(
                event, *args)

    def _handle_reconnect(self):
        """Attempt to reconnect to the server until one of the attempts
        succeeds, the task is aborted, or the maximum number of attempts is
        reached.
        """
        if self._reconnect_abort is None:  # pragma: no cover
            self._reconnect_abort = self.eio.create_event()
        self._reconnect_abort.clear()
        reconnecting_clients.append(self)
        try:
            attempt_count = 0
            current_delay = self.reconnection_delay
            while True:
                delay = min(current_delay, self.reconnection_delay_max)
                current_delay *= 2
                # the randomization is proportional to the delay, so the
                # default factor of 0.5 adjusts each delay by +/- 50%
                jitter = self.randomization_factor * (2 * random.random() - 1)
                delay = max(delay * (1 + jitter), 0)
                self.logger.info(
                    'Connection failed, new attempt in {:.02f} seconds'.format(
                        delay))
                if self._reconnect_abort.wait(delay):
                    self.logger.info('Reconnect task aborted')
                    break
                attempt_count += 1
                try:
                    self.connect(self.connection_url,
                                 headers=self.connection_headers,
                                 transports=self.connection_transports,
                                 namespaces=self.connection_namespaces,
                                 socketio_path=self.socketio_path)
                except (exceptions.ConnectionError, ValueError):
                    # the attempt failed, a new one is made unless the
                    # maximum number of attempts was reached
                    pass
                else:
                    self.logger.info('Reconnection successful')
                    self._reconnect_task = None
                    break
                if self.reconnection_attempts and \
                        attempt_count >= self.reconnection_attempts:
                    self.logger.info(
                        'Maximum reconnection attempts reached, giving up')
                    break
        finally:
            # never leave a stale client in the list used by the SIGINT
            # handler, even if an unexpected exception ends the task
            reconnecting_clients.remove(self)

    def _handle_eio_connect(self):
        """Handle the Engine.IO connection event."""
        self.logger.info('Engine.IO connection established')
        self.sid = self.eio.sid
        for n in self.connection_namespaces:
            self._send_packet(packet.Packet(packet.CONNECT, namespace=n))

    def _handle_eio_message(self, data):
        """Dispatch Engine.IO messages."""
        if self._binary_packet:
            # this message is an attachment for a pending binary packet
            pkt = self._binary_packet
            if pkt.add_attachment(data):
                self._binary_packet = None
                if pkt.packet_type == packet.BINARY_EVENT:
                    self._handle_event(pkt.namespace, pkt.id, pkt.data)
                else:
                    self._handle_ack(pkt.namespace, pkt.id, pkt.data)
        else:
            pkt = packet.Packet(encoded_packet=data)
            if pkt.packet_type == packet.CONNECT:
                self._handle_connect(pkt.namespace, pkt.data)
            elif pkt.packet_type == packet.DISCONNECT:
                self._handle_disconnect(pkt.namespace)
            elif pkt.packet_type == packet.EVENT:
                self._handle_event(pkt.namespace, pkt.id, pkt.data)
            elif pkt.packet_type == packet.ACK:
                self._handle_ack(pkt.namespace, pkt.id, pkt.data)
            elif pkt.packet_type == packet.BINARY_EVENT or \
                    pkt.packet_type == packet.BINARY_ACK:
                # hold the packet until its attachments are received
                self._binary_packet = pkt
            elif pkt.packet_type == packet.CONNECT_ERROR:
                self._handle_error(pkt.namespace, pkt.data)
            else:
                raise ValueError('Unknown packet type.')

    def _handle_eio_disconnect(self):
        """Handle the Engine.IO disconnection event."""
        self.logger.info('Engine.IO connection dropped')
        if self.connected:
            for n in self.namespaces:
                self._trigger_event('disconnect', namespace=n)
            self.namespaces = {}
            self.connected = False
        self.callbacks = {}
        self._binary_packet = None
        self.sid = None
        if self.eio.state == 'connected' and self.reconnection:
            self._reconnect_task = self.start_background_task(
                self._handle_reconnect)

    def _engineio_client_class(self):
        """Return the Engine.IO client class used by this client."""
        return engineio.Client