1import asyncio 2 3import engineio 4 5from . import asyncio_manager 6from . import exceptions 7from . import packet 8from . import server 9 10 11class AsyncServer(server.Server): 12 """A Socket.IO server for asyncio. 13 14 This class implements a fully compliant Socket.IO web server with support 15 for websocket and long-polling transports, compatible with the asyncio 16 framework. 17 18 :param client_manager: The client manager instance that will manage the 19 client list. When this is omitted, the client list 20 is stored in an in-memory structure, so the use of 21 multiple connected servers is not possible. 22 :param logger: To enable logging set to ``True`` or pass a logger object to 23 use. To disable logging set to ``False``. Note that fatal 24 errors are logged even when ``logger`` is ``False``. 25 :param json: An alternative json module to use for encoding and decoding 26 packets. Custom json modules must have ``dumps`` and ``loads`` 27 functions that are compatible with the standard library 28 versions. 29 :param async_handlers: If set to ``True``, event handlers for a client are 30 executed in separate threads. To run handlers for a 31 client synchronously, set to ``False``. The default 32 is ``True``. 33 :param always_connect: When set to ``False``, new connections are 34 provisory until the connect handler returns 35 something other than ``False``, at which point they 36 are accepted. When set to ``True``, connections are 37 immediately accepted, and then if the connect 38 handler returns ``False`` a disconnect is issued. 39 Set to ``True`` if you need to emit events from the 40 connect handler and your client is confused when it 41 receives events before the connection acceptance. 42 In any other case use the default of ``False``. 43 :param kwargs: Connection parameters for the underlying Engine.IO server. 44 45 The Engine.IO configuration supports the following settings: 46 47 :param async_mode: The asynchronous model to use. See the Deployment 48 section in the documentation for a description of the 49 available options. Valid async modes are "threading", 50 "eventlet", "gevent" and "gevent_uwsgi". If this 51 argument is not given, "eventlet" is tried first, then 52 "gevent_uwsgi", then "gevent", and finally "threading". 53 The first async mode that has all its dependencies 54 installed is then one that is chosen. 55 :param ping_interval: The interval in seconds at which the server pings 56 the client. The default is 25 seconds. For advanced 57 control, a two element tuple can be given, where 58 the first number is the ping interval and the second 59 is a grace period added by the server. 60 :param ping_timeout: The time in seconds that the client waits for the 61 server to respond before disconnecting. The default 62 is 5 seconds. 63 :param max_http_buffer_size: The maximum size of a message when using the 64 polling transport. The default is 1,000,000 65 bytes. 66 :param allow_upgrades: Whether to allow transport upgrades or not. The 67 default is ``True``. 68 :param http_compression: Whether to compress packages when using the 69 polling transport. The default is ``True``. 70 :param compression_threshold: Only compress messages when their byte size 71 is greater than this value. The default is 72 1024 bytes. 73 :param cookie: If set to a string, it is the name of the HTTP cookie the 74 server sends back tot he client containing the client 75 session id. If set to a dictionary, the ``'name'`` key 76 contains the cookie name and other keys define cookie 77 attributes, where the value of each attribute can be a 78 string, a callable with no arguments, or a boolean. If set 79 to ``None`` (the default), a cookie is not sent to the 80 client. 81 :param cors_allowed_origins: Origin or list of origins that are allowed to 82 connect to this server. Only the same origin 83 is allowed by default. Set this argument to 84 ``'*'`` to allow all origins, or to ``[]`` to 85 disable CORS handling. 86 :param cors_credentials: Whether credentials (cookies, authentication) are 87 allowed in requests to this server. The default is 88 ``True``. 89 :param monitor_clients: If set to ``True``, a background task will ensure 90 inactive clients are closed. Set to ``False`` to 91 disable the monitoring task (not recommended). The 92 default is ``True``. 93 :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass 94 a logger object to use. To disable logging set to 95 ``False``. The default is ``False``. Note that 96 fatal errors are logged even when 97 ``engineio_logger`` is ``False``. 98 """ 99 def __init__(self, client_manager=None, logger=False, json=None, 100 async_handlers=True, **kwargs): 101 if client_manager is None: 102 client_manager = asyncio_manager.AsyncManager() 103 super().__init__(client_manager=client_manager, logger=logger, 104 json=json, async_handlers=async_handlers, **kwargs) 105 106 def is_asyncio_based(self): 107 return True 108 109 def attach(self, app, socketio_path='socket.io'): 110 """Attach the Socket.IO server to an application.""" 111 self.eio.attach(app, socketio_path) 112 113 async def emit(self, event, data=None, to=None, room=None, skip_sid=None, 114 namespace=None, callback=None, **kwargs): 115 """Emit a custom event to one or more connected clients. 116 117 :param event: The event name. It can be any string. The event names 118 ``'connect'``, ``'message'`` and ``'disconnect'`` are 119 reserved and should not be used. 120 :param data: The data to send to the client or clients. Data can be of 121 type ``str``, ``bytes``, ``list`` or ``dict``. To send 122 multiple arguments, use a tuple where each element is of 123 one of the types indicated above. 124 :param to: The recipient of the message. This can be set to the 125 session ID of a client to address only that client, or to 126 to any custom room created by the application to address all 127 the clients in that room, If this argument is omitted the 128 event is broadcasted to all connected clients. 129 :param room: Alias for the ``to`` parameter. 130 :param skip_sid: The session ID of a client to skip when broadcasting 131 to a room or to all clients. This can be used to 132 prevent a message from being sent to the sender. 133 :param namespace: The Socket.IO namespace for the event. If this 134 argument is omitted the event is emitted to the 135 default namespace. 136 :param callback: If given, this function will be called to acknowledge 137 the the client has received the message. The arguments 138 that will be passed to the function are those provided 139 by the client. Callback functions can only be used 140 when addressing an individual client. 141 :param ignore_queue: Only used when a message queue is configured. If 142 set to ``True``, the event is emitted to the 143 clients directly, without going through the queue. 144 This is more efficient, but only works when a 145 single server process is used. It is recommended 146 to always leave this parameter with its default 147 value of ``False``. 148 149 Note: this method is not designed to be used concurrently. If multiple 150 tasks are emitting at the same time to the same client connection, then 151 messages composed of multiple packets may end up being sent in an 152 incorrect sequence. Use standard concurrency solutions (such as a Lock 153 object) to prevent this situation. 154 155 Note 2: this method is a coroutine. 156 """ 157 namespace = namespace or '/' 158 room = to or room 159 self.logger.info('emitting event "%s" to %s [%s]', event, 160 room or 'all', namespace) 161 await self.manager.emit(event, data, namespace, room=room, 162 skip_sid=skip_sid, callback=callback, 163 **kwargs) 164 165 async def send(self, data, to=None, room=None, skip_sid=None, 166 namespace=None, callback=None, **kwargs): 167 """Send a message to one or more connected clients. 168 169 This function emits an event with the name ``'message'``. Use 170 :func:`emit` to issue custom event names. 171 172 :param data: The data to send to the client or clients. Data can be of 173 type ``str``, ``bytes``, ``list`` or ``dict``. To send 174 multiple arguments, use a tuple where each element is of 175 one of the types indicated above. 176 :param to: The recipient of the message. This can be set to the 177 session ID of a client to address only that client, or to 178 to any custom room created by the application to address all 179 the clients in that room, If this argument is omitted the 180 event is broadcasted to all connected clients. 181 :param room: Alias for the ``to`` parameter. 182 :param skip_sid: The session ID of a client to skip when broadcasting 183 to a room or to all clients. This can be used to 184 prevent a message from being sent to the sender. 185 :param namespace: The Socket.IO namespace for the event. If this 186 argument is omitted the event is emitted to the 187 default namespace. 188 :param callback: If given, this function will be called to acknowledge 189 the the client has received the message. The arguments 190 that will be passed to the function are those provided 191 by the client. Callback functions can only be used 192 when addressing an individual client. 193 :param ignore_queue: Only used when a message queue is configured. If 194 set to ``True``, the event is emitted to the 195 clients directly, without going through the queue. 196 This is more efficient, but only works when a 197 single server process is used. It is recommended 198 to always leave this parameter with its default 199 value of ``False``. 200 201 Note: this method is a coroutine. 202 """ 203 await self.emit('message', data=data, to=to, room=room, 204 skip_sid=skip_sid, namespace=namespace, 205 callback=callback, **kwargs) 206 207 async def call(self, event, data=None, to=None, sid=None, namespace=None, 208 timeout=60, **kwargs): 209 """Emit a custom event to a client and wait for the response. 210 211 :param event: The event name. It can be any string. The event names 212 ``'connect'``, ``'message'`` and ``'disconnect'`` are 213 reserved and should not be used. 214 :param data: The data to send to the client or clients. Data can be of 215 type ``str``, ``bytes``, ``list`` or ``dict``. To send 216 multiple arguments, use a tuple where each element is of 217 one of the types indicated above. 218 :param to: The session ID of the recipient client. 219 :param sid: Alias for the ``to`` parameter. 220 :param namespace: The Socket.IO namespace for the event. If this 221 argument is omitted the event is emitted to the 222 default namespace. 223 :param timeout: The waiting timeout. If the timeout is reached before 224 the client acknowledges the event, then a 225 ``TimeoutError`` exception is raised. 226 :param ignore_queue: Only used when a message queue is configured. If 227 set to ``True``, the event is emitted to the 228 client directly, without going through the queue. 229 This is more efficient, but only works when a 230 single server process is used. It is recommended 231 to always leave this parameter with its default 232 value of ``False``. 233 234 Note: this method is not designed to be used concurrently. If multiple 235 tasks are emitting at the same time to the same client connection, then 236 messages composed of multiple packets may end up being sent in an 237 incorrect sequence. Use standard concurrency solutions (such as a Lock 238 object) to prevent this situation. 239 240 Note 2: this method is a coroutine. 241 """ 242 if to is None and sid is None: 243 raise ValueError('Cannot use call() to broadcast.') 244 if not self.async_handlers: 245 raise RuntimeError( 246 'Cannot use call() when async_handlers is False.') 247 callback_event = self.eio.create_event() 248 callback_args = [] 249 250 def event_callback(*args): 251 callback_args.append(args) 252 callback_event.set() 253 254 await self.emit(event, data=data, room=to or sid, namespace=namespace, 255 callback=event_callback, **kwargs) 256 try: 257 await asyncio.wait_for(callback_event.wait(), timeout) 258 except asyncio.TimeoutError: 259 raise exceptions.TimeoutError() from None 260 return callback_args[0] if len(callback_args[0]) > 1 \ 261 else callback_args[0][0] if len(callback_args[0]) == 1 \ 262 else None 263 264 async def close_room(self, room, namespace=None): 265 """Close a room. 266 267 This function removes all the clients from the given room. 268 269 :param room: Room name. 270 :param namespace: The Socket.IO namespace for the event. If this 271 argument is omitted the default namespace is used. 272 273 Note: this method is a coroutine. 274 """ 275 namespace = namespace or '/' 276 self.logger.info('room %s is closing [%s]', room, namespace) 277 await self.manager.close_room(room, namespace) 278 279 async def get_session(self, sid, namespace=None): 280 """Return the user session for a client. 281 282 :param sid: The session id of the client. 283 :param namespace: The Socket.IO namespace. If this argument is omitted 284 the default namespace is used. 285 286 The return value is a dictionary. Modifications made to this 287 dictionary are not guaranteed to be preserved. If you want to modify 288 the user session, use the ``session`` context manager instead. 289 """ 290 namespace = namespace or '/' 291 eio_sid = self.manager.eio_sid_from_sid(sid, namespace) 292 eio_session = await self.eio.get_session(eio_sid) 293 return eio_session.setdefault(namespace, {}) 294 295 async def save_session(self, sid, session, namespace=None): 296 """Store the user session for a client. 297 298 :param sid: The session id of the client. 299 :param session: The session dictionary. 300 :param namespace: The Socket.IO namespace. If this argument is omitted 301 the default namespace is used. 302 """ 303 namespace = namespace or '/' 304 eio_sid = self.manager.eio_sid_from_sid(sid, namespace) 305 eio_session = await self.eio.get_session(eio_sid) 306 eio_session[namespace] = session 307 308 def session(self, sid, namespace=None): 309 """Return the user session for a client with context manager syntax. 310 311 :param sid: The session id of the client. 312 313 This is a context manager that returns the user session dictionary for 314 the client. Any changes that are made to this dictionary inside the 315 context manager block are saved back to the session. Example usage:: 316 317 @eio.on('connect') 318 def on_connect(sid, environ): 319 username = authenticate_user(environ) 320 if not username: 321 return False 322 with eio.session(sid) as session: 323 session['username'] = username 324 325 @eio.on('message') 326 def on_message(sid, msg): 327 async with eio.session(sid) as session: 328 print('received message from ', session['username']) 329 """ 330 class _session_context_manager(object): 331 def __init__(self, server, sid, namespace): 332 self.server = server 333 self.sid = sid 334 self.namespace = namespace 335 self.session = None 336 337 async def __aenter__(self): 338 self.session = await self.server.get_session( 339 sid, namespace=self.namespace) 340 return self.session 341 342 async def __aexit__(self, *args): 343 await self.server.save_session(sid, self.session, 344 namespace=self.namespace) 345 346 return _session_context_manager(self, sid, namespace) 347 348 async def disconnect(self, sid, namespace=None, ignore_queue=False): 349 """Disconnect a client. 350 351 :param sid: Session ID of the client. 352 :param namespace: The Socket.IO namespace to disconnect. If this 353 argument is omitted the default namespace is used. 354 :param ignore_queue: Only used when a message queue is configured. If 355 set to ``True``, the disconnect is processed 356 locally, without broadcasting on the queue. It is 357 recommended to always leave this parameter with 358 its default value of ``False``. 359 360 Note: this method is a coroutine. 361 """ 362 namespace = namespace or '/' 363 if ignore_queue: 364 delete_it = self.manager.is_connected(sid, namespace) 365 else: 366 delete_it = await self.manager.can_disconnect(sid, namespace) 367 if delete_it: 368 self.logger.info('Disconnecting %s [%s]', sid, namespace) 369 eio_sid = self.manager.pre_disconnect(sid, namespace=namespace) 370 await self._send_packet(eio_sid, packet.Packet( 371 packet.DISCONNECT, namespace=namespace)) 372 await self._trigger_event('disconnect', namespace, sid) 373 self.manager.disconnect(sid, namespace=namespace) 374 375 async def handle_request(self, *args, **kwargs): 376 """Handle an HTTP request from the client. 377 378 This is the entry point of the Socket.IO application. This function 379 returns the HTTP response body to deliver to the client. 380 381 Note: this method is a coroutine. 382 """ 383 return await self.eio.handle_request(*args, **kwargs) 384 385 def start_background_task(self, target, *args, **kwargs): 386 """Start a background task using the appropriate async model. 387 388 This is a utility function that applications can use to start a 389 background task using the method that is compatible with the 390 selected async mode. 391 392 :param target: the target function to execute. Must be a coroutine. 393 :param args: arguments to pass to the function. 394 :param kwargs: keyword arguments to pass to the function. 395 396 The return value is a ``asyncio.Task`` object. 397 398 Note: this method is a coroutine. 399 """ 400 return self.eio.start_background_task(target, *args, **kwargs) 401 402 async def sleep(self, seconds=0): 403 """Sleep for the requested amount of time using the appropriate async 404 model. 405 406 This is a utility function that applications can use to put a task to 407 sleep without having to worry about using the correct call for the 408 selected async mode. 409 410 Note: this method is a coroutine. 411 """ 412 return await self.eio.sleep(seconds) 413 414 async def _emit_internal(self, sid, event, data, namespace=None, id=None): 415 """Send a message to a client.""" 416 # tuples are expanded to multiple arguments, everything else is sent 417 # as a single argument 418 if isinstance(data, tuple): 419 data = list(data) 420 elif data is not None: 421 data = [data] 422 else: 423 data = [] 424 await self._send_packet(sid, packet.Packet( 425 packet.EVENT, namespace=namespace, data=[event] + data, id=id)) 426 427 async def _send_packet(self, eio_sid, pkt): 428 """Send a Socket.IO packet to a client.""" 429 encoded_packet = pkt.encode() 430 if isinstance(encoded_packet, list): 431 for ep in encoded_packet: 432 await self.eio.send(eio_sid, ep) 433 else: 434 await self.eio.send(eio_sid, encoded_packet) 435 436 async def _handle_connect(self, eio_sid, namespace, data): 437 """Handle a client connection request.""" 438 namespace = namespace or '/' 439 sid = self.manager.connect(eio_sid, namespace) 440 if self.always_connect: 441 await self._send_packet(eio_sid, packet.Packet( 442 packet.CONNECT, {'sid': sid}, namespace=namespace)) 443 fail_reason = exceptions.ConnectionRefusedError().error_args 444 try: 445 if data: 446 success = await self._trigger_event( 447 'connect', namespace, sid, self.environ[eio_sid], data) 448 else: 449 try: 450 success = await self._trigger_event( 451 'connect', namespace, sid, self.environ[eio_sid]) 452 except TypeError: 453 success = await self._trigger_event( 454 'connect', namespace, sid, self.environ[eio_sid], None) 455 except exceptions.ConnectionRefusedError as exc: 456 fail_reason = exc.error_args 457 success = False 458 459 if success is False: 460 if self.always_connect: 461 self.manager.pre_disconnect(sid, namespace) 462 await self._send_packet(eio_sid, packet.Packet( 463 packet.DISCONNECT, data=fail_reason, namespace=namespace)) 464 else: 465 await self._send_packet(eio_sid, packet.Packet( 466 packet.CONNECT_ERROR, data=fail_reason, 467 namespace=namespace)) 468 self.manager.disconnect(sid, namespace) 469 elif not self.always_connect: 470 await self._send_packet(eio_sid, packet.Packet( 471 packet.CONNECT, {'sid': sid}, namespace=namespace)) 472 473 async def _handle_disconnect(self, eio_sid, namespace): 474 """Handle a client disconnect.""" 475 namespace = namespace or '/' 476 sid = self.manager.sid_from_eio_sid(eio_sid, namespace) 477 if not self.manager.is_connected(sid, namespace): # pragma: no cover 478 return 479 self.manager.pre_disconnect(sid, namespace=namespace) 480 await self._trigger_event('disconnect', namespace, sid) 481 self.manager.disconnect(sid, namespace) 482 483 async def _handle_event(self, eio_sid, namespace, id, data): 484 """Handle an incoming client event.""" 485 namespace = namespace or '/' 486 sid = self.manager.sid_from_eio_sid(eio_sid, namespace) 487 self.logger.info('received event "%s" from %s [%s]', data[0], sid, 488 namespace) 489 if not self.manager.is_connected(sid, namespace): 490 self.logger.warning('%s is not connected to namespace %s', 491 sid, namespace) 492 return 493 if self.async_handlers: 494 self.start_background_task(self._handle_event_internal, self, sid, 495 eio_sid, data, namespace, id) 496 else: 497 await self._handle_event_internal(self, sid, eio_sid, data, 498 namespace, id) 499 500 async def _handle_event_internal(self, server, sid, eio_sid, data, 501 namespace, id): 502 r = await server._trigger_event(data[0], namespace, sid, *data[1:]) 503 if id is not None: 504 # send ACK packet with the response returned by the handler 505 # tuples are expanded as multiple arguments 506 if r is None: 507 data = [] 508 elif isinstance(r, tuple): 509 data = list(r) 510 else: 511 data = [r] 512 await server._send_packet(eio_sid, packet.Packet( 513 packet.ACK, namespace=namespace, id=id, data=data)) 514 515 async def _handle_ack(self, eio_sid, namespace, id, data): 516 """Handle ACK packets from the client.""" 517 namespace = namespace or '/' 518 sid = self.manager.sid_from_eio_sid(eio_sid, namespace) 519 self.logger.info('received ack from %s [%s]', sid, namespace) 520 await self.manager.trigger_callback(sid, id, data) 521 522 async def _trigger_event(self, event, namespace, *args): 523 """Invoke an application event handler.""" 524 # first see if we have an explicit handler for the event 525 if namespace in self.handlers and event in self.handlers[namespace]: 526 if asyncio.iscoroutinefunction(self.handlers[namespace][event]) \ 527 is True: 528 try: 529 ret = await self.handlers[namespace][event](*args) 530 except asyncio.CancelledError: # pragma: no cover 531 ret = None 532 else: 533 ret = self.handlers[namespace][event](*args) 534 return ret 535 536 # or else, forward the event to a namepsace handler if one exists 537 elif namespace in self.namespace_handlers: 538 return await self.namespace_handlers[namespace].trigger_event( 539 event, *args) 540 541 async def _handle_eio_connect(self, eio_sid, environ): 542 """Handle the Engine.IO connection event.""" 543 if not self.manager_initialized: 544 self.manager_initialized = True 545 self.manager.initialize() 546 self.environ[eio_sid] = environ 547 548 async def _handle_eio_message(self, eio_sid, data): 549 """Dispatch Engine.IO messages.""" 550 if eio_sid in self._binary_packet: 551 pkt = self._binary_packet[eio_sid] 552 if pkt.add_attachment(data): 553 del self._binary_packet[eio_sid] 554 if pkt.packet_type == packet.BINARY_EVENT: 555 await self._handle_event(eio_sid, pkt.namespace, pkt.id, 556 pkt.data) 557 else: 558 await self._handle_ack(eio_sid, pkt.namespace, pkt.id, 559 pkt.data) 560 else: 561 pkt = packet.Packet(encoded_packet=data) 562 if pkt.packet_type == packet.CONNECT: 563 await self._handle_connect(eio_sid, pkt.namespace, pkt.data) 564 elif pkt.packet_type == packet.DISCONNECT: 565 await self._handle_disconnect(eio_sid, pkt.namespace) 566 elif pkt.packet_type == packet.EVENT: 567 await self._handle_event(eio_sid, pkt.namespace, pkt.id, 568 pkt.data) 569 elif pkt.packet_type == packet.ACK: 570 await self._handle_ack(eio_sid, pkt.namespace, pkt.id, 571 pkt.data) 572 elif pkt.packet_type == packet.BINARY_EVENT or \ 573 pkt.packet_type == packet.BINARY_ACK: 574 self._binary_packet[eio_sid] = pkt 575 elif pkt.packet_type == packet.CONNECT_ERROR: 576 raise ValueError('Unexpected CONNECT_ERROR packet.') 577 else: 578 raise ValueError('Unknown packet type.') 579 580 async def _handle_eio_disconnect(self, eio_sid): 581 """Handle Engine.IO disconnect event.""" 582 for n in list(self.manager.get_namespaces()).copy(): 583 await self._handle_disconnect(eio_sid, n) 584 if eio_sid in self.environ: 585 del self.environ[eio_sid] 586 587 def _engineio_server_class(self): 588 return engineio.AsyncServer 589