1import itertools
2import logging
3import random
4import signal
5import threading
6
7import engineio
8
9from . import exceptions
10from . import namespace
11from . import packet
12
# Logger used by every Client whose ``logger`` argument is a boolean.
default_logger = logging.getLogger('socketio.client')
# Clients that are currently running their reconnect loop. The SIGINT
# handler below walks this list to tell each of them to abort.
reconnecting_clients = []
15
16
def signal_handler(sig, frame):  # pragma: no cover
    """Handle SIGINT on behalf of all Socket.IO clients.

    Every client that is currently in its reconnect loop is told to abort
    by setting its ``_reconnect_abort`` event. Other disconnection tasks are
    handled at the engine.io level. The signal is then passed on to the
    handler that was active before this one was installed, or to
    ``signal.default_int_handler`` when that one is not callable.

    :param sig: The signal number, as received from the ``signal`` module.
    :param frame: The interrupted stack frame.
    """
    # work on a snapshot of the list, not on the shared list itself
    for pending in list(reconnecting_clients):
        pending._reconnect_abort.set()

    previous = original_signal_handler
    if not callable(previous):  # pragma: no cover
        # Handle case where no original SIGINT handler was present.
        return signal.default_int_handler(sig, frame)
    return previous(sig, frame)
30
31
# Value returned by ``signal.signal()`` when ``Client.__init__`` installed
# ``signal_handler`` for SIGINT; stays ``None`` until that happens.
original_signal_handler = None
33
34
35class Client(object):
36    """A Socket.IO client.
37
38    This class implements a fully compliant Socket.IO web client with support
39    for websocket and long-polling transports.
40
41    :param reconnection: ``True`` if the client should automatically attempt to
42                         reconnect to the server after an interruption, or
43                         ``False`` to not reconnect. The default is ``True``.
    :param reconnection_attempts: How many reconnection attempts to issue
                                  before giving up, or 0 for infinite
                                  attempts. The default is 0.
47    :param reconnection_delay: How long to wait in seconds before the first
48                               reconnection attempt. Each successive attempt
49                               doubles this delay.
50    :param reconnection_delay_max: The maximum delay between reconnection
51                                   attempts.
52    :param randomization_factor: Randomization amount for each delay between
53                                 reconnection attempts. The default is 0.5,
54                                 which means that each delay is randomly
55                                 adjusted by +/- 50%.
56    :param logger: To enable logging set to ``True`` or pass a logger object to
57                   use. To disable logging set to ``False``. The default is
58                   ``False``. Note that fatal errors are logged even when
59                   ``logger`` is ``False``.
60    :param json: An alternative json module to use for encoding and decoding
61                 packets. Custom json modules must have ``dumps`` and ``loads``
62                 functions that are compatible with the standard library
63                 versions.
64
65    The Engine.IO configuration supports the following settings:
66
67    :param request_timeout: A timeout in seconds for requests. The default is
68                            5 seconds.
69    :param http_session: an initialized ``requests.Session`` object to be used
70                         when sending requests to the server. Use it if you
71                         need to add special client options such as proxy
72                         servers, SSL certificates, etc.
73    :param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to
74                       skip SSL certificate verification, allowing
75                       connections to servers with self signed certificates.
76                       The default is ``True``.
77    :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
78                            a logger object to use. To disable logging set to
79                            ``False``. The default is ``False``. Note that
80                            fatal errors are logged even when
81                            ``engineio_logger`` is ``False``.
82    """
83    def __init__(self, reconnection=True, reconnection_attempts=0,
84                 reconnection_delay=1, reconnection_delay_max=5,
85                 randomization_factor=0.5, logger=False, json=None, **kwargs):
86        global original_signal_handler
87        if original_signal_handler is None and \
88                threading.current_thread() == threading.main_thread():
89            original_signal_handler = signal.signal(signal.SIGINT,
90                                                    signal_handler)
91        self.reconnection = reconnection
92        self.reconnection_attempts = reconnection_attempts
93        self.reconnection_delay = reconnection_delay
94        self.reconnection_delay_max = reconnection_delay_max
95        self.randomization_factor = randomization_factor
96
97        engineio_options = kwargs
98        engineio_logger = engineio_options.pop('engineio_logger', None)
99        if engineio_logger is not None:
100            engineio_options['logger'] = engineio_logger
101        if json is not None:
102            packet.Packet.json = json
103            engineio_options['json'] = json
104
105        self.eio = self._engineio_client_class()(**engineio_options)
106        self.eio.on('connect', self._handle_eio_connect)
107        self.eio.on('message', self._handle_eio_message)
108        self.eio.on('disconnect', self._handle_eio_disconnect)
109
110        if not isinstance(logger, bool):
111            self.logger = logger
112        else:
113            self.logger = default_logger
114            if self.logger.level == logging.NOTSET:
115                if logger:
116                    self.logger.setLevel(logging.INFO)
117                else:
118                    self.logger.setLevel(logging.ERROR)
119                self.logger.addHandler(logging.StreamHandler())
120
121        self.connection_url = None
122        self.connection_headers = None
123        self.connection_transports = None
124        self.connection_namespaces = []
125        self.socketio_path = None
126        self.sid = None
127
128        self.connected = False
129        self.namespaces = {}
130        self.handlers = {}
131        self.namespace_handlers = {}
132        self.callbacks = {}
133        self._binary_packet = None
134        self._connect_event = None
135        self._reconnect_task = None
136        self._reconnect_abort = None
137
138    def is_asyncio_based(self):
139        return False
140
141    def on(self, event, handler=None, namespace=None):
142        """Register an event handler.
143
144        :param event: The event name. It can be any string. The event names
145                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
146                      reserved and should not be used.
147        :param handler: The function that should be invoked to handle the
148                        event. When this parameter is not given, the method
149                        acts as a decorator for the handler function.
150        :param namespace: The Socket.IO namespace for the event. If this
151                          argument is omitted the handler is associated with
152                          the default namespace.
153
154        Example usage::
155
156            # as a decorator:
157            @sio.on('connect')
158            def connect_handler():
159                print('Connected!')
160
161            # as a method:
162            def message_handler(msg):
163                print('Received message: ', msg)
164                sio.send( 'response')
165            sio.on('message', message_handler)
166
167        The ``'connect'`` event handler receives no arguments. The
168        ``'message'`` handler and handlers for custom event names receive the
169        message payload as only argument. Any values returned from a message
170        handler will be passed to the client's acknowledgement callback
171        function if it exists. The ``'disconnect'`` handler does not take
172        arguments.
173        """
174        namespace = namespace or '/'
175
176        def set_handler(handler):
177            if namespace not in self.handlers:
178                self.handlers[namespace] = {}
179            self.handlers[namespace][event] = handler
180            return handler
181
182        if handler is None:
183            return set_handler
184        set_handler(handler)
185
186    def event(self, *args, **kwargs):
187        """Decorator to register an event handler.
188
189        This is a simplified version of the ``on()`` method that takes the
190        event name from the decorated function.
191
192        Example usage::
193
194            @sio.event
195            def my_event(data):
196                print('Received data: ', data)
197
198        The above example is equivalent to::
199
200            @sio.on('my_event')
201            def my_event(data):
202                print('Received data: ', data)
203
204        A custom namespace can be given as an argument to the decorator::
205
206            @sio.event(namespace='/test')
207            def my_event(data):
208                print('Received data: ', data)
209        """
210        if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
211            # the decorator was invoked without arguments
212            # args[0] is the decorated function
213            return self.on(args[0].__name__)(args[0])
214        else:
215            # the decorator was invoked with arguments
216            def set_handler(handler):
217                return self.on(handler.__name__, *args, **kwargs)(handler)
218
219            return set_handler
220
221    def register_namespace(self, namespace_handler):
222        """Register a namespace handler object.
223
224        :param namespace_handler: An instance of a :class:`Namespace`
225                                  subclass that handles all the event traffic
226                                  for a namespace.
227        """
228        if not isinstance(namespace_handler, namespace.ClientNamespace):
229            raise ValueError('Not a namespace instance')
230        if self.is_asyncio_based() != namespace_handler.is_asyncio_based():
231            raise ValueError('Not a valid namespace class for this client')
232        namespace_handler._set_client(self)
233        self.namespace_handlers[namespace_handler.namespace] = \
234            namespace_handler
235
236    def connect(self, url, headers={}, transports=None,
237                namespaces=None, socketio_path='socket.io', wait=True,
238                wait_timeout=1):
239        """Connect to a Socket.IO server.
240
241        :param url: The URL of the Socket.IO server. It can include custom
242                    query string parameters if required by the server.
243        :param headers: A dictionary with custom headers to send with the
244                        connection request.
245        :param transports: The list of allowed transports. Valid transports
246                           are ``'polling'`` and ``'websocket'``. If not
247                           given, the polling transport is connected first,
248                           then an upgrade to websocket is attempted.
249        :param namespaces: The namespaces to connect as a string or list of
250                           strings. If not given, the namespaces that have
251                           registered event handlers are connected.
252        :param socketio_path: The endpoint where the Socket.IO server is
253                              installed. The default value is appropriate for
254                              most cases.
255        :param wait: if set to ``True`` (the default) the call only returns
256                     when all the namespaces are connected. If set to
257                     ``False``, the call returns as soon as the Engine.IO
258                     transport is connected, and the namespaces will connect
259                     in the background.
260        :param wait_timeout: How long the client should wait for the
261                             connection. The default is 1 second. This
262                             argument is only considered when ``wait`` is set
263                             to ``True``.
264
265        Example usage::
266
267            sio = socketio.Client()
268            sio.connect('http://localhost:5000')
269        """
270        if self.connected:
271            raise exceptions.ConnectionError('Already connected')
272
273        self.connection_url = url
274        self.connection_headers = headers
275        self.connection_transports = transports
276        self.connection_namespaces = namespaces
277        self.socketio_path = socketio_path
278
279        if namespaces is None:
280            namespaces = list(set(self.handlers.keys()).union(
281                set(self.namespace_handlers.keys())))
282            if len(namespaces) == 0:
283                namespaces = ['/']
284        elif isinstance(namespaces, str):
285            namespaces = [namespaces]
286        self.connection_namespaces = namespaces
287        self.namespaces = {}
288        if self._connect_event is None:
289            self._connect_event = self.eio.create_event()
290        else:
291            self._connect_event.clear()
292        try:
293            self.eio.connect(url, headers=headers, transports=transports,
294                             engineio_path=socketio_path)
295        except engineio.exceptions.ConnectionError as exc:
296            self._trigger_event(
297                'connect_error', '/',
298                exc.args[1] if len(exc.args) > 1 else exc.args[0])
299            raise exceptions.ConnectionError(exc.args[0]) from None
300
301        if wait:
302            while self._connect_event.wait(timeout=wait_timeout):
303                self._connect_event.clear()
304                if set(self.namespaces) == set(self.connection_namespaces):
305                    break
306            if set(self.namespaces) != set(self.connection_namespaces):
307                self.disconnect()
308                raise exceptions.ConnectionError(
309                    'One or more namespaces failed to connect')
310
311        self.connected = True
312
313    def wait(self):
314        """Wait until the connection with the server ends.
315
316        Client applications can use this function to block the main thread
317        during the life of the connection.
318        """
319        while True:
320            self.eio.wait()
321            self.sleep(1)  # give the reconnect task time to start up
322            if not self._reconnect_task:
323                break
324            self._reconnect_task.join()
325            if self.eio.state != 'connected':
326                break
327
328    def emit(self, event, data=None, namespace=None, callback=None):
329        """Emit a custom event to one or more connected clients.
330
331        :param event: The event name. It can be any string. The event names
332                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
333                      reserved and should not be used.
334        :param data: The data to send to the server. Data can be of
335                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
336                     multiple arguments, use a tuple where each element is of
337                     one of the types indicated above.
338        :param namespace: The Socket.IO namespace for the event. If this
339                          argument is omitted the event is emitted to the
340                          default namespace.
341        :param callback: If given, this function will be called to acknowledge
342                         the the server has received the message. The arguments
343                         that will be passed to the function are those provided
344                         by the server.
345
346        Note: this method is not thread safe. If multiple threads are emitting
347        at the same time on the same client connection, messages composed of
348        multiple packets may end up being sent in an incorrect sequence. Use
349        standard concurrency solutions (such as a Lock object) to prevent this
350        situation.
351        """
352        namespace = namespace or '/'
353        if namespace not in self.namespaces:
354            raise exceptions.BadNamespaceError(
355                namespace + ' is not a connected namespace.')
356        self.logger.info('Emitting event "%s" [%s]', event, namespace)
357        if callback is not None:
358            id = self._generate_ack_id(namespace, callback)
359        else:
360            id = None
361        # tuples are expanded to multiple arguments, everything else is sent
362        # as a single argument
363        if isinstance(data, tuple):
364            data = list(data)
365        elif data is not None:
366            data = [data]
367        else:
368            data = []
369        self._send_packet(packet.Packet(packet.EVENT, namespace=namespace,
370                                        data=[event] + data, id=id))
371
372    def send(self, data, namespace=None, callback=None):
373        """Send a message to one or more connected clients.
374
375        This function emits an event with the name ``'message'``. Use
376        :func:`emit` to issue custom event names.
377
378        :param data: The data to send to the server. Data can be of
379                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
380                     multiple arguments, use a tuple where each element is of
381                     one of the types indicated above.
382        :param namespace: The Socket.IO namespace for the event. If this
383                          argument is omitted the event is emitted to the
384                          default namespace.
385        :param callback: If given, this function will be called to acknowledge
386                         the the server has received the message. The arguments
387                         that will be passed to the function are those provided
388                         by the server.
389        """
390        self.emit('message', data=data, namespace=namespace,
391                  callback=callback)
392
393    def call(self, event, data=None, namespace=None, timeout=60):
394        """Emit a custom event to a client and wait for the response.
395
396        :param event: The event name. It can be any string. The event names
397                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
398                      reserved and should not be used.
399        :param data: The data to send to the server. Data can be of
400                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
401                     multiple arguments, use a tuple where each element is of
402                     one of the types indicated above.
403        :param namespace: The Socket.IO namespace for the event. If this
404                          argument is omitted the event is emitted to the
405                          default namespace.
406        :param timeout: The waiting timeout. If the timeout is reached before
407                        the client acknowledges the event, then a
408                        ``TimeoutError`` exception is raised.
409
410        Note: this method is not thread safe. If multiple threads are emitting
411        at the same time on the same client connection, messages composed of
412        multiple packets may end up being sent in an incorrect sequence. Use
413        standard concurrency solutions (such as a Lock object) to prevent this
414        situation.
415        """
416        callback_event = self.eio.create_event()
417        callback_args = []
418
419        def event_callback(*args):
420            callback_args.append(args)
421            callback_event.set()
422
423        self.emit(event, data=data, namespace=namespace,
424                  callback=event_callback)
425        if not callback_event.wait(timeout=timeout):
426            raise exceptions.TimeoutError()
427        return callback_args[0] if len(callback_args[0]) > 1 \
428            else callback_args[0][0] if len(callback_args[0]) == 1 \
429            else None
430
431    def disconnect(self):
432        """Disconnect from the server."""
433        # here we just request the disconnection
434        # later in _handle_eio_disconnect we invoke the disconnect handler
435        for n in self.namespaces:
436            self._send_packet(packet.Packet(packet.DISCONNECT, namespace=n))
437        self.eio.disconnect(abort=True)
438
439    def get_sid(self, namespace=None):
440        """Return the ``sid`` associated with a connection.
441
442        :param namespace: The Socket.IO namespace. If this argument is omitted
443                          the handler is associated with the default
444                          namespace. Note that unlike previous versions, the
445                          current version of the Socket.IO protocol uses
446                          different ``sid`` values per namespace.
447
448        This method returns the ``sid`` for the requested namespace as a
449        string.
450        """
451        return self.namespaces.get(namespace or '/')
452
453    def transport(self):
454        """Return the name of the transport used by the client.
455
456        The two possible values returned by this function are ``'polling'``
457        and ``'websocket'``.
458        """
459        return self.eio.transport()
460
461    def start_background_task(self, target, *args, **kwargs):
462        """Start a background task using the appropriate async model.
463
464        This is a utility function that applications can use to start a
465        background task using the method that is compatible with the
466        selected async mode.
467
468        :param target: the target function to execute.
469        :param args: arguments to pass to the function.
470        :param kwargs: keyword arguments to pass to the function.
471
472        This function returns an object compatible with the `Thread` class in
473        the Python standard library. The `start()` method on this object is
474        already called by this function.
475        """
476        return self.eio.start_background_task(target, *args, **kwargs)
477
478    def sleep(self, seconds=0):
479        """Sleep for the requested amount of time using the appropriate async
480        model.
481
482        This is a utility function that applications can use to put a task to
483        sleep without having to worry about using the correct call for the
484        selected async mode.
485        """
486        return self.eio.sleep(seconds)
487
488    def _send_packet(self, pkt):
489        """Send a Socket.IO packet to the server."""
490        encoded_packet = pkt.encode()
491        if isinstance(encoded_packet, list):
492            for ep in encoded_packet:
493                self.eio.send(ep)
494        else:
495            self.eio.send(encoded_packet)
496
497    def _generate_ack_id(self, namespace, callback):
498        """Generate a unique identifier for an ACK packet."""
499        namespace = namespace or '/'
500        if namespace not in self.callbacks:
501            self.callbacks[namespace] = {0: itertools.count(1)}
502        id = next(self.callbacks[namespace][0])
503        self.callbacks[namespace][id] = callback
504        return id
505
506    def _handle_connect(self, namespace, data):
507        namespace = namespace or '/'
508        if namespace not in self.namespaces:
509            self.logger.info('Namespace {} is connected'.format(namespace))
510            self.namespaces[namespace] = (data or {}).get('sid', self.sid)
511            self._trigger_event('connect', namespace=namespace)
512            self._connect_event.set()
513
514    def _handle_disconnect(self, namespace):
515        if not self.connected:
516            return
517        namespace = namespace or '/'
518        self._trigger_event('disconnect', namespace=namespace)
519        if namespace in self.namespaces:
520            del self.namespaces[namespace]
521        if not self.namespaces:
522            self.connected = False
523            self.eio.disconnect(abort=True)
524
525    def _handle_event(self, namespace, id, data):
526        namespace = namespace or '/'
527        self.logger.info('Received event "%s" [%s]', data[0], namespace)
528        r = self._trigger_event(data[0], namespace, *data[1:])
529        if id is not None:
530            # send ACK packet with the response returned by the handler
531            # tuples are expanded as multiple arguments
532            if r is None:
533                data = []
534            elif isinstance(r, tuple):
535                data = list(r)
536            else:
537                data = [r]
538            self._send_packet(packet.Packet(packet.ACK, namespace=namespace,
539                              id=id, data=data))
540
541    def _handle_ack(self, namespace, id, data):
542        namespace = namespace or '/'
543        self.logger.info('Received ack [%s]', namespace)
544        callback = None
545        try:
546            callback = self.callbacks[namespace][id]
547        except KeyError:
548            # if we get an unknown callback we just ignore it
549            self.logger.warning('Unknown callback received, ignoring.')
550        else:
551            del self.callbacks[namespace][id]
552        if callback is not None:
553            callback(*data)
554
555    def _handle_error(self, namespace, data):
556        namespace = namespace or '/'
557        self.logger.info('Connection to namespace {} was rejected'.format(
558            namespace))
559        if data is None:
560            data = tuple()
561        elif not isinstance(data, (tuple, list)):
562            data = (data,)
563        self._trigger_event('connect_error', namespace, *data)
564        self._connect_event.set()
565        if namespace in self.namespaces:
566            del self.namespaces[namespace]
567        if namespace == '/':
568            self.namespaces = {}
569            self.connected = False
570
571    def _trigger_event(self, event, namespace, *args):
572        """Invoke an application event handler."""
573        # first see if we have an explicit handler for the event
574        if namespace in self.handlers and event in self.handlers[namespace]:
575            return self.handlers[namespace][event](*args)
576
577        # or else, forward the event to a namespace handler if one exists
578        elif namespace in self.namespace_handlers:
579            return self.namespace_handlers[namespace].trigger_event(
580                event, *args)
581
582    def _handle_reconnect(self):
583        if self._reconnect_abort is None:  # pragma: no cover
584            self._reconnect_abort = self.eio.create_event()
585        self._reconnect_abort.clear()
586        reconnecting_clients.append(self)
587        attempt_count = 0
588        current_delay = self.reconnection_delay
589        while True:
590            delay = current_delay
591            current_delay *= 2
592            if delay > self.reconnection_delay_max:
593                delay = self.reconnection_delay_max
594            delay += self.randomization_factor * (2 * random.random() - 1)
595            self.logger.info(
596                'Connection failed, new attempt in {:.02f} seconds'.format(
597                    delay))
598            if self._reconnect_abort.wait(delay):
599                self.logger.info('Reconnect task aborted')
600                break
601            attempt_count += 1
602            try:
603                self.connect(self.connection_url,
604                             headers=self.connection_headers,
605                             transports=self.connection_transports,
606                             namespaces=self.connection_namespaces,
607                             socketio_path=self.socketio_path)
608            except (exceptions.ConnectionError, ValueError):
609                pass
610            else:
611                self.logger.info('Reconnection successful')
612                self._reconnect_task = None
613                break
614            if self.reconnection_attempts and \
615                    attempt_count >= self.reconnection_attempts:
616                self.logger.info(
617                    'Maximum reconnection attempts reached, giving up')
618                break
619        reconnecting_clients.remove(self)
620
621    def _handle_eio_connect(self):
622        """Handle the Engine.IO connection event."""
623        self.logger.info('Engine.IO connection established')
624        self.sid = self.eio.sid
625        for n in self.connection_namespaces:
626            self._send_packet(packet.Packet(packet.CONNECT, namespace=n))
627
628    def _handle_eio_message(self, data):
629        """Dispatch Engine.IO messages."""
630        if self._binary_packet:
631            pkt = self._binary_packet
632            if pkt.add_attachment(data):
633                self._binary_packet = None
634                if pkt.packet_type == packet.BINARY_EVENT:
635                    self._handle_event(pkt.namespace, pkt.id, pkt.data)
636                else:
637                    self._handle_ack(pkt.namespace, pkt.id, pkt.data)
638        else:
639            pkt = packet.Packet(encoded_packet=data)
640            if pkt.packet_type == packet.CONNECT:
641                self._handle_connect(pkt.namespace, pkt.data)
642            elif pkt.packet_type == packet.DISCONNECT:
643                self._handle_disconnect(pkt.namespace)
644            elif pkt.packet_type == packet.EVENT:
645                self._handle_event(pkt.namespace, pkt.id, pkt.data)
646            elif pkt.packet_type == packet.ACK:
647                self._handle_ack(pkt.namespace, pkt.id, pkt.data)
648            elif pkt.packet_type == packet.BINARY_EVENT or \
649                    pkt.packet_type == packet.BINARY_ACK:
650                self._binary_packet = pkt
651            elif pkt.packet_type == packet.CONNECT_ERROR:
652                self._handle_error(pkt.namespace, pkt.data)
653            else:
654                raise ValueError('Unknown packet type.')
655
656    def _handle_eio_disconnect(self):
657        """Handle the Engine.IO disconnection event."""
658        self.logger.info('Engine.IO connection dropped')
659        if self.connected:
660            for n in self.namespaces:
661                self._trigger_event('disconnect', namespace=n)
662            self.namespaces = {}
663            self.connected = False
664        self.callbacks = {}
665        self._binary_packet = None
666        self.sid = None
667        if self.eio.state == 'connected' and self.reconnection:
668            self._reconnect_task = self.start_background_task(
669                self._handle_reconnect)
670
671    def _engineio_client_class(self):
672        return engineio.Client
673