1import asyncio
2
3import engineio
4
5from . import asyncio_manager
6from . import exceptions
7from . import packet
8from . import server
9
10
11class AsyncServer(server.Server):
12    """A Socket.IO server for asyncio.
13
14    This class implements a fully compliant Socket.IO web server with support
15    for websocket and long-polling transports, compatible with the asyncio
16    framework.
17
18    :param client_manager: The client manager instance that will manage the
19                           client list. When this is omitted, the client list
20                           is stored in an in-memory structure, so the use of
21                           multiple connected servers is not possible.
22    :param logger: To enable logging set to ``True`` or pass a logger object to
23                   use. To disable logging set to ``False``. Note that fatal
24                   errors are logged even when ``logger`` is ``False``.
25    :param json: An alternative json module to use for encoding and decoding
26                 packets. Custom json modules must have ``dumps`` and ``loads``
27                 functions that are compatible with the standard library
28                 versions.
    :param async_handlers: If set to ``True``, event handlers for a client are
                           executed in separate asyncio tasks. To run handlers
                           for a client synchronously, set to ``False``. The
                           default is ``True``.
33    :param always_connect: When set to ``False``, new connections are
34                           provisory until the connect handler returns
35                           something other than ``False``, at which point they
36                           are accepted. When set to ``True``, connections are
37                           immediately accepted, and then if the connect
38                           handler returns ``False`` a disconnect is issued.
39                           Set to ``True`` if you need to emit events from the
40                           connect handler and your client is confused when it
41                           receives events before the connection acceptance.
42                           In any other case use the default of ``False``.
43    :param kwargs: Connection parameters for the underlying Engine.IO server.
44
45    The Engine.IO configuration supports the following settings:
46
    :param async_mode: The asynchronous model to use. See the Deployment
                       section in the documentation for a description of the
                       available options. Valid async modes are "aiohttp",
                       "sanic", "tornado" and "asgi". If this argument is not
                       given, "aiohttp" is tried first, followed by "sanic",
                       "tornado", and finally "asgi". The first async mode
                       that has all its dependencies installed is the one
                       that is chosen.
55    :param ping_interval: The interval in seconds at which the server pings
56                          the client. The default is 25 seconds. For advanced
57                          control, a two element tuple can be given, where
58                          the first number is the ping interval and the second
59                          is a grace period added by the server.
60    :param ping_timeout: The time in seconds that the client waits for the
61                         server to respond before disconnecting. The default
62                         is 5 seconds.
63    :param max_http_buffer_size: The maximum size of a message when using the
64                                 polling transport. The default is 1,000,000
65                                 bytes.
66    :param allow_upgrades: Whether to allow transport upgrades or not. The
67                           default is ``True``.
68    :param http_compression: Whether to compress packages when using the
69                             polling transport. The default is ``True``.
70    :param compression_threshold: Only compress messages when their byte size
71                                  is greater than this value. The default is
72                                  1024 bytes.
    :param cookie: If set to a string, it is the name of the HTTP cookie the
                   server sends back to the client containing the client
75                   session id. If set to a dictionary, the ``'name'`` key
76                   contains the cookie name and other keys define cookie
77                   attributes, where the value of each attribute can be a
78                   string, a callable with no arguments, or a boolean. If set
79                   to ``None`` (the default), a cookie is not sent to the
80                   client.
81    :param cors_allowed_origins: Origin or list of origins that are allowed to
82                                 connect to this server. Only the same origin
83                                 is allowed by default. Set this argument to
84                                 ``'*'`` to allow all origins, or to ``[]`` to
85                                 disable CORS handling.
86    :param cors_credentials: Whether credentials (cookies, authentication) are
87                             allowed in requests to this server. The default is
88                             ``True``.
89    :param monitor_clients: If set to ``True``, a background task will ensure
90                            inactive clients are closed. Set to ``False`` to
91                            disable the monitoring task (not recommended). The
92                            default is ``True``.
93    :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
94                            a logger object to use. To disable logging set to
95                            ``False``. The default is ``False``. Note that
96                            fatal errors are logged even when
97                            ``engineio_logger`` is ``False``.
98    """
99    def __init__(self, client_manager=None, logger=False, json=None,
100                 async_handlers=True, **kwargs):
101        if client_manager is None:
102            client_manager = asyncio_manager.AsyncManager()
103        super().__init__(client_manager=client_manager, logger=logger,
104                         json=json, async_handlers=async_handlers, **kwargs)
105
106    def is_asyncio_based(self):
107        return True
108
109    def attach(self, app, socketio_path='socket.io'):
110        """Attach the Socket.IO server to an application."""
111        self.eio.attach(app, socketio_path)
112
113    async def emit(self, event, data=None, to=None, room=None, skip_sid=None,
114                   namespace=None, callback=None, **kwargs):
115        """Emit a custom event to one or more connected clients.
116
117        :param event: The event name. It can be any string. The event names
118                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
119                      reserved and should not be used.
120        :param data: The data to send to the client or clients. Data can be of
121                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
122                     multiple arguments, use a tuple where each element is of
123                     one of the types indicated above.
124        :param to: The recipient of the message. This can be set to the
125                   session ID of a client to address only that client, or to
126                   to any custom room created by the application to address all
127                   the clients in that room, If this argument is omitted the
128                   event is broadcasted to all connected clients.
129        :param room: Alias for the ``to`` parameter.
130        :param skip_sid: The session ID of a client to skip when broadcasting
131                         to a room or to all clients. This can be used to
132                         prevent a message from being sent to the sender.
133        :param namespace: The Socket.IO namespace for the event. If this
134                          argument is omitted the event is emitted to the
135                          default namespace.
136        :param callback: If given, this function will be called to acknowledge
137                         the the client has received the message. The arguments
138                         that will be passed to the function are those provided
139                         by the client. Callback functions can only be used
140                         when addressing an individual client.
141        :param ignore_queue: Only used when a message queue is configured. If
142                             set to ``True``, the event is emitted to the
143                             clients directly, without going through the queue.
144                             This is more efficient, but only works when a
145                             single server process is used. It is recommended
146                             to always leave this parameter with its default
147                             value of ``False``.
148
149        Note: this method is not designed to be used concurrently. If multiple
150        tasks are emitting at the same time to the same client connection, then
151        messages composed of multiple packets may end up being sent in an
152        incorrect sequence. Use standard concurrency solutions (such as a Lock
153        object) to prevent this situation.
154
155        Note 2: this method is a coroutine.
156        """
157        namespace = namespace or '/'
158        room = to or room
159        self.logger.info('emitting event "%s" to %s [%s]', event,
160                         room or 'all', namespace)
161        await self.manager.emit(event, data, namespace, room=room,
162                                skip_sid=skip_sid, callback=callback,
163                                **kwargs)
164
165    async def send(self, data, to=None, room=None, skip_sid=None,
166                   namespace=None, callback=None, **kwargs):
167        """Send a message to one or more connected clients.
168
169        This function emits an event with the name ``'message'``. Use
170        :func:`emit` to issue custom event names.
171
172        :param data: The data to send to the client or clients. Data can be of
173                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
174                     multiple arguments, use a tuple where each element is of
175                     one of the types indicated above.
176        :param to: The recipient of the message. This can be set to the
177                   session ID of a client to address only that client, or to
178                   to any custom room created by the application to address all
179                   the clients in that room, If this argument is omitted the
180                   event is broadcasted to all connected clients.
181        :param room: Alias for the ``to`` parameter.
182        :param skip_sid: The session ID of a client to skip when broadcasting
183                         to a room or to all clients. This can be used to
184                         prevent a message from being sent to the sender.
185        :param namespace: The Socket.IO namespace for the event. If this
186                          argument is omitted the event is emitted to the
187                          default namespace.
188        :param callback: If given, this function will be called to acknowledge
189                         the the client has received the message. The arguments
190                         that will be passed to the function are those provided
191                         by the client. Callback functions can only be used
192                         when addressing an individual client.
193        :param ignore_queue: Only used when a message queue is configured. If
194                             set to ``True``, the event is emitted to the
195                             clients directly, without going through the queue.
196                             This is more efficient, but only works when a
197                             single server process is used. It is recommended
198                             to always leave this parameter with its default
199                             value of ``False``.
200
201        Note: this method is a coroutine.
202        """
203        await self.emit('message', data=data, to=to, room=room,
204                        skip_sid=skip_sid, namespace=namespace,
205                        callback=callback, **kwargs)
206
207    async def call(self, event, data=None, to=None, sid=None, namespace=None,
208                   timeout=60, **kwargs):
209        """Emit a custom event to a client and wait for the response.
210
211        :param event: The event name. It can be any string. The event names
212                      ``'connect'``, ``'message'`` and ``'disconnect'`` are
213                      reserved and should not be used.
214        :param data: The data to send to the client or clients. Data can be of
215                     type ``str``, ``bytes``, ``list`` or ``dict``. To send
216                     multiple arguments, use a tuple where each element is of
217                     one of the types indicated above.
218        :param to: The session ID of the recipient client.
219        :param sid: Alias for the ``to`` parameter.
220        :param namespace: The Socket.IO namespace for the event. If this
221                          argument is omitted the event is emitted to the
222                          default namespace.
223        :param timeout: The waiting timeout. If the timeout is reached before
224                        the client acknowledges the event, then a
225                        ``TimeoutError`` exception is raised.
226        :param ignore_queue: Only used when a message queue is configured. If
227                             set to ``True``, the event is emitted to the
228                             client directly, without going through the queue.
229                             This is more efficient, but only works when a
230                             single server process is used. It is recommended
231                             to always leave this parameter with its default
232                             value of ``False``.
233
234        Note: this method is not designed to be used concurrently. If multiple
235        tasks are emitting at the same time to the same client connection, then
236        messages composed of multiple packets may end up being sent in an
237        incorrect sequence. Use standard concurrency solutions (such as a Lock
238        object) to prevent this situation.
239
240        Note 2: this method is a coroutine.
241        """
242        if to is None and sid is None:
243            raise ValueError('Cannot use call() to broadcast.')
244        if not self.async_handlers:
245            raise RuntimeError(
246                'Cannot use call() when async_handlers is False.')
247        callback_event = self.eio.create_event()
248        callback_args = []
249
250        def event_callback(*args):
251            callback_args.append(args)
252            callback_event.set()
253
254        await self.emit(event, data=data, room=to or sid, namespace=namespace,
255                        callback=event_callback, **kwargs)
256        try:
257            await asyncio.wait_for(callback_event.wait(), timeout)
258        except asyncio.TimeoutError:
259            raise exceptions.TimeoutError() from None
260        return callback_args[0] if len(callback_args[0]) > 1 \
261            else callback_args[0][0] if len(callback_args[0]) == 1 \
262            else None
263
264    async def close_room(self, room, namespace=None):
265        """Close a room.
266
267        This function removes all the clients from the given room.
268
269        :param room: Room name.
270        :param namespace: The Socket.IO namespace for the event. If this
271                          argument is omitted the default namespace is used.
272
273        Note: this method is a coroutine.
274        """
275        namespace = namespace or '/'
276        self.logger.info('room %s is closing [%s]', room, namespace)
277        await self.manager.close_room(room, namespace)
278
279    async def get_session(self, sid, namespace=None):
280        """Return the user session for a client.
281
282        :param sid: The session id of the client.
283        :param namespace: The Socket.IO namespace. If this argument is omitted
284                          the default namespace is used.
285
286        The return value is a dictionary. Modifications made to this
287        dictionary are not guaranteed to be preserved. If you want to modify
288        the user session, use the ``session`` context manager instead.
289        """
290        namespace = namespace or '/'
291        eio_sid = self.manager.eio_sid_from_sid(sid, namespace)
292        eio_session = await self.eio.get_session(eio_sid)
293        return eio_session.setdefault(namespace, {})
294
295    async def save_session(self, sid, session, namespace=None):
296        """Store the user session for a client.
297
298        :param sid: The session id of the client.
299        :param session: The session dictionary.
300        :param namespace: The Socket.IO namespace. If this argument is omitted
301                          the default namespace is used.
302        """
303        namespace = namespace or '/'
304        eio_sid = self.manager.eio_sid_from_sid(sid, namespace)
305        eio_session = await self.eio.get_session(eio_sid)
306        eio_session[namespace] = session
307
308    def session(self, sid, namespace=None):
309        """Return the user session for a client with context manager syntax.
310
311        :param sid: The session id of the client.
312
313        This is a context manager that returns the user session dictionary for
314        the client. Any changes that are made to this dictionary inside the
315        context manager block are saved back to the session. Example usage::
316
317            @eio.on('connect')
318            def on_connect(sid, environ):
319                username = authenticate_user(environ)
320                if not username:
321                    return False
322                with eio.session(sid) as session:
323                    session['username'] = username
324
325            @eio.on('message')
326            def on_message(sid, msg):
327                async with eio.session(sid) as session:
328                    print('received message from ', session['username'])
329        """
330        class _session_context_manager(object):
331            def __init__(self, server, sid, namespace):
332                self.server = server
333                self.sid = sid
334                self.namespace = namespace
335                self.session = None
336
337            async def __aenter__(self):
338                self.session = await self.server.get_session(
339                    sid, namespace=self.namespace)
340                return self.session
341
342            async def __aexit__(self, *args):
343                await self.server.save_session(sid, self.session,
344                                               namespace=self.namespace)
345
346        return _session_context_manager(self, sid, namespace)
347
348    async def disconnect(self, sid, namespace=None, ignore_queue=False):
349        """Disconnect a client.
350
351        :param sid: Session ID of the client.
352        :param namespace: The Socket.IO namespace to disconnect. If this
353                          argument is omitted the default namespace is used.
354        :param ignore_queue: Only used when a message queue is configured. If
355                             set to ``True``, the disconnect is processed
356                             locally, without broadcasting on the queue. It is
357                             recommended to always leave this parameter with
358                             its default value of ``False``.
359
360        Note: this method is a coroutine.
361        """
362        namespace = namespace or '/'
363        if ignore_queue:
364            delete_it = self.manager.is_connected(sid, namespace)
365        else:
366            delete_it = await self.manager.can_disconnect(sid, namespace)
367        if delete_it:
368            self.logger.info('Disconnecting %s [%s]', sid, namespace)
369            eio_sid = self.manager.pre_disconnect(sid, namespace=namespace)
370            await self._send_packet(eio_sid, packet.Packet(
371                packet.DISCONNECT, namespace=namespace))
372            await self._trigger_event('disconnect', namespace, sid)
373            self.manager.disconnect(sid, namespace=namespace)
374
375    async def handle_request(self, *args, **kwargs):
376        """Handle an HTTP request from the client.
377
378        This is the entry point of the Socket.IO application. This function
379        returns the HTTP response body to deliver to the client.
380
381        Note: this method is a coroutine.
382        """
383        return await self.eio.handle_request(*args, **kwargs)
384
385    def start_background_task(self, target, *args, **kwargs):
386        """Start a background task using the appropriate async model.
387
388        This is a utility function that applications can use to start a
389        background task using the method that is compatible with the
390        selected async mode.
391
392        :param target: the target function to execute. Must be a coroutine.
393        :param args: arguments to pass to the function.
394        :param kwargs: keyword arguments to pass to the function.
395
396        The return value is a ``asyncio.Task`` object.
397
398        Note: this method is a coroutine.
399        """
400        return self.eio.start_background_task(target, *args, **kwargs)
401
402    async def sleep(self, seconds=0):
403        """Sleep for the requested amount of time using the appropriate async
404        model.
405
406        This is a utility function that applications can use to put a task to
407        sleep without having to worry about using the correct call for the
408        selected async mode.
409
410        Note: this method is a coroutine.
411        """
412        return await self.eio.sleep(seconds)
413
414    async def _emit_internal(self, sid, event, data, namespace=None, id=None):
415        """Send a message to a client."""
416        # tuples are expanded to multiple arguments, everything else is sent
417        # as a single argument
418        if isinstance(data, tuple):
419            data = list(data)
420        elif data is not None:
421            data = [data]
422        else:
423            data = []
424        await self._send_packet(sid, packet.Packet(
425            packet.EVENT, namespace=namespace, data=[event] + data, id=id))
426
427    async def _send_packet(self, eio_sid, pkt):
428        """Send a Socket.IO packet to a client."""
429        encoded_packet = pkt.encode()
430        if isinstance(encoded_packet, list):
431            for ep in encoded_packet:
432                await self.eio.send(eio_sid, ep)
433        else:
434            await self.eio.send(eio_sid, encoded_packet)
435
436    async def _handle_connect(self, eio_sid, namespace, data):
437        """Handle a client connection request."""
438        namespace = namespace or '/'
439        sid = self.manager.connect(eio_sid, namespace)
440        if self.always_connect:
441            await self._send_packet(eio_sid, packet.Packet(
442                packet.CONNECT, {'sid': sid}, namespace=namespace))
443        fail_reason = exceptions.ConnectionRefusedError().error_args
444        try:
445            if data:
446                success = await self._trigger_event(
447                    'connect', namespace, sid, self.environ[eio_sid], data)
448            else:
449                try:
450                    success = await self._trigger_event(
451                        'connect', namespace, sid, self.environ[eio_sid])
452                except TypeError:
453                    success = await self._trigger_event(
454                        'connect', namespace, sid, self.environ[eio_sid], None)
455        except exceptions.ConnectionRefusedError as exc:
456            fail_reason = exc.error_args
457            success = False
458
459        if success is False:
460            if self.always_connect:
461                self.manager.pre_disconnect(sid, namespace)
462                await self._send_packet(eio_sid, packet.Packet(
463                    packet.DISCONNECT, data=fail_reason, namespace=namespace))
464            else:
465                await self._send_packet(eio_sid, packet.Packet(
466                    packet.CONNECT_ERROR, data=fail_reason,
467                    namespace=namespace))
468            self.manager.disconnect(sid, namespace)
469        elif not self.always_connect:
470            await self._send_packet(eio_sid, packet.Packet(
471                packet.CONNECT, {'sid': sid}, namespace=namespace))
472
473    async def _handle_disconnect(self, eio_sid, namespace):
474        """Handle a client disconnect."""
475        namespace = namespace or '/'
476        sid = self.manager.sid_from_eio_sid(eio_sid, namespace)
477        if not self.manager.is_connected(sid, namespace):  # pragma: no cover
478            return
479        self.manager.pre_disconnect(sid, namespace=namespace)
480        await self._trigger_event('disconnect', namespace, sid)
481        self.manager.disconnect(sid, namespace)
482
483    async def _handle_event(self, eio_sid, namespace, id, data):
484        """Handle an incoming client event."""
485        namespace = namespace or '/'
486        sid = self.manager.sid_from_eio_sid(eio_sid, namespace)
487        self.logger.info('received event "%s" from %s [%s]', data[0], sid,
488                         namespace)
489        if not self.manager.is_connected(sid, namespace):
490            self.logger.warning('%s is not connected to namespace %s',
491                                sid, namespace)
492            return
493        if self.async_handlers:
494            self.start_background_task(self._handle_event_internal, self, sid,
495                                       eio_sid, data, namespace, id)
496        else:
497            await self._handle_event_internal(self, sid, eio_sid, data,
498                                              namespace, id)
499
500    async def _handle_event_internal(self, server, sid, eio_sid, data,
501                                     namespace, id):
502        r = await server._trigger_event(data[0], namespace, sid, *data[1:])
503        if id is not None:
504            # send ACK packet with the response returned by the handler
505            # tuples are expanded as multiple arguments
506            if r is None:
507                data = []
508            elif isinstance(r, tuple):
509                data = list(r)
510            else:
511                data = [r]
512            await server._send_packet(eio_sid, packet.Packet(
513                packet.ACK, namespace=namespace, id=id, data=data))
514
515    async def _handle_ack(self, eio_sid, namespace, id, data):
516        """Handle ACK packets from the client."""
517        namespace = namespace or '/'
518        sid = self.manager.sid_from_eio_sid(eio_sid, namespace)
519        self.logger.info('received ack from %s [%s]', sid, namespace)
520        await self.manager.trigger_callback(sid, id, data)
521
522    async def _trigger_event(self, event, namespace, *args):
523        """Invoke an application event handler."""
524        # first see if we have an explicit handler for the event
525        if namespace in self.handlers and event in self.handlers[namespace]:
526            if asyncio.iscoroutinefunction(self.handlers[namespace][event]) \
527                    is True:
528                try:
529                    ret = await self.handlers[namespace][event](*args)
530                except asyncio.CancelledError:  # pragma: no cover
531                    ret = None
532            else:
533                ret = self.handlers[namespace][event](*args)
534            return ret
535
536        # or else, forward the event to a namepsace handler if one exists
537        elif namespace in self.namespace_handlers:
538            return await self.namespace_handlers[namespace].trigger_event(
539                event, *args)
540
541    async def _handle_eio_connect(self, eio_sid, environ):
542        """Handle the Engine.IO connection event."""
543        if not self.manager_initialized:
544            self.manager_initialized = True
545            self.manager.initialize()
546        self.environ[eio_sid] = environ
547
548    async def _handle_eio_message(self, eio_sid, data):
549        """Dispatch Engine.IO messages."""
550        if eio_sid in self._binary_packet:
551            pkt = self._binary_packet[eio_sid]
552            if pkt.add_attachment(data):
553                del self._binary_packet[eio_sid]
554                if pkt.packet_type == packet.BINARY_EVENT:
555                    await self._handle_event(eio_sid, pkt.namespace, pkt.id,
556                                             pkt.data)
557                else:
558                    await self._handle_ack(eio_sid, pkt.namespace, pkt.id,
559                                           pkt.data)
560        else:
561            pkt = packet.Packet(encoded_packet=data)
562            if pkt.packet_type == packet.CONNECT:
563                await self._handle_connect(eio_sid, pkt.namespace, pkt.data)
564            elif pkt.packet_type == packet.DISCONNECT:
565                await self._handle_disconnect(eio_sid, pkt.namespace)
566            elif pkt.packet_type == packet.EVENT:
567                await self._handle_event(eio_sid, pkt.namespace, pkt.id,
568                                         pkt.data)
569            elif pkt.packet_type == packet.ACK:
570                await self._handle_ack(eio_sid, pkt.namespace, pkt.id,
571                                       pkt.data)
572            elif pkt.packet_type == packet.BINARY_EVENT or \
573                    pkt.packet_type == packet.BINARY_ACK:
574                self._binary_packet[eio_sid] = pkt
575            elif pkt.packet_type == packet.CONNECT_ERROR:
576                raise ValueError('Unexpected CONNECT_ERROR packet.')
577            else:
578                raise ValueError('Unknown packet type.')
579
580    async def _handle_eio_disconnect(self, eio_sid):
581        """Handle Engine.IO disconnect event."""
582        for n in list(self.manager.get_namespaces()).copy():
583            await self._handle_disconnect(eio_sid, n)
584        if eio_sid in self.environ:
585            del self.environ[eio_sid]
586
587    def _engineio_server_class(self):
588        return engineio.AsyncServer
589