# -*- Mode: Python -*-
#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
#   Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

"""Basic infrastructure for asynchronous socket service clients and servers.

There are only two ways to have a program on a single processor do "more
than one thing at a time".  Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. It's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.

If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background."  Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.

NB: this is a fork of asyncore from the stdlib that we've (the waitress
developers) named 'wasyncore' to ensure forward compatibility, as asyncore
in the stdlib will be dropped soon.  It is neither a copy of the 2.7 asyncore
nor the 3.X asyncore; it is a version compatible with either 2.7 or 3.X.
"""

from errno import (
    EAGAIN,
    EALREADY,
    EBADF,
    ECONNABORTED,
    ECONNRESET,
    EINPROGRESS,
    EINTR,
    EINVAL,
    EISCONN,
    ENOTCONN,
    EPIPE,
    ESHUTDOWN,
    EWOULDBLOCK,
    errorcode,
)
import logging
import os
import select
import socket
import sys
import time
import warnings

from . import compat, utilities

# errno values that send(), recv() and readwrite() treat as "the peer is
# gone": they trigger handle_close() instead of being reported as errors.
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, EBADF})

# Keep an already-existing module-level map if the name is already bound
# (e.g. the module body is executed again); otherwise start with an empty one.
try:
    socket_map
except NameError:
    socket_map = {}


def _strerror(err):
    """Return a description of errno *err*.

    Falls back to ``"Unknown error <err>"`` when ``os.strerror`` cannot
    handle the value.
    """
    try:
        return os.strerror(err)
    except (TypeError, ValueError, OverflowError, NameError):
        return "Unknown error %s" % err


class ExitNow(Exception):
    """Exception that the event dispatch helpers always let propagate."""


# Exceptions that must never be swallowed by the dispatch helpers below.
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)


def read(obj):
    """Dispatch a read event to *obj*.

    Exceptions listed in ``_reraised_exceptions`` propagate; anything else
    is routed to ``obj.handle_error()``.
    """
    try:
        obj.handle_read_event()
    except _reraised_exceptions:
        raise
    except:  # deliberately bare: every other failure goes to handle_error()
        obj.handle_error()


def write(obj):
    """Dispatch a write event to *obj*.

    Exceptions listed in ``_reraised_exceptions`` propagate; anything else
    is routed to ``obj.handle_error()``.
    """
    try:
        obj.handle_write_event()
    except _reraised_exceptions:
        raise
    except:  # deliberately bare: every other failure goes to handle_error()
        obj.handle_error()


def _exception(obj):
    """Dispatch an exceptional-condition event to *obj*.

    Exceptions listed in ``_reraised_exceptions`` propagate; anything else
    is routed to ``obj.handle_error()``.
    """
    try:
        obj.handle_expt_event()
    except _reraised_exceptions:
        raise
    except:  # deliberately bare: every other failure goes to handle_error()
        obj.handle_error()


def readwrite(obj, flags):
    """Dispatch the events encoded in the ``select.POLL*`` bitmask *flags*.

    Handlers run in the order read, write, exceptional, close.  An
    ``OSError`` whose errno is in ``_DISCONNECTED`` closes the channel via
    ``obj.handle_close()``; any other ``OSError`` or unexpected exception
    goes to ``obj.handle_error()``.  Exceptions listed in
    ``_reraised_exceptions`` propagate.
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            obj.handle_close()
    except OSError as e:
        if e.args[0] not in _DISCONNECTED:
            obj.handle_error()
        else:
            obj.handle_close()
    except _reraised_exceptions:
        raise
    except:  # deliberately bare: every other failure goes to handle_error()
        obj.handle_error()


def poll(timeout=0.0, map=None):
    """Run one ``select.select()`` pass over the channels in *map*.

    *timeout* is in seconds.  *map* maps file descriptors to channel
    objects and defaults to the module-level ``socket_map``.  Returns
    ``None``.
    """
    if map is None:  # pragma: no cover
        map = socket_map
    if map:
        r = []
        w = []
        e = []
        for fd, obj in list(map.items()):  # list() call FBO py3
            is_r = obj.readable()
            is_w = obj.writable()
            if is_r:
                r.append(fd)
            # accepting sockets should not be writable
            if is_w and not obj.accepting:
                w.append(fd)
            if is_r or is_w:
                e.append(fd)
        if [] == r == w == e:
            # Nothing to select on: just wait out the timeout.
            # NOTE(review): timeout=None reaches time.sleep(None) here,
            # which raises TypeError, whereas poll2() accepts None.
            time.sleep(timeout)
            return

        try:
            r, w, e = select.select(r, w, e, timeout)
        except OSError as err:
            # An interrupted call is not an error; give up this pass.
            if err.args[0] != EINTR:
                raise
            else:
                return

        # A handler may remove channels from the map, hence the re-lookup
        # (and the None check) for every ready descriptor.
        for fd in r:
            obj = map.get(fd)
            if obj is None:  # pragma: no cover
                continue
            read(obj)

        for fd in w:
            obj = map.get(fd)
            if obj is None:  # pragma: no cover
                continue
            write(obj)

        for fd in e:
            obj = map.get(fd)
            if obj is None:  # pragma: no cover
                continue
            _exception(obj)


def poll2(timeout=0.0, map=None):
    """Run one ``select.poll()`` pass over the channels in *map*.

    *timeout* is in seconds (``None`` is passed through to the pollster
    unchanged).  *map* defaults to the module-level ``socket_map``.
    Returns ``None``.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:  # pragma: no cover
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if map:
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                pollster.register(fd, flags)

        try:
            r = pollster.poll(timeout)
        except OSError as err:
            # An interrupted call is treated as "no events".
            if err.args[0] != EINTR:
                raise
            r = []

        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:  # pragma: no cover
                continue
            readwrite(obj, flags)


poll3 = poll2  # Alias for backward compatibility


def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until *map* is empty or *count* passes have run.

    *timeout* is handed to each poll pass.  ``poll2`` is used when
    *use_poll* is true and ``select.poll`` exists, otherwise ``poll``.
    *map* defaults to the module-level ``socket_map``.  With *count* of
    ``None`` the loop only ends once the map is empty.
    """
    if map is None:  # pragma: no cover
        map = socket_map

    if use_poll and hasattr(select, "poll"):
        poll_fun = poll2
    else:
        poll_fun = poll

    if count is None:  # pragma: no cover
        while map:
            poll_fun(timeout, map)

    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1


def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ``((file, function, line), type, value, info)`` where the first
    tuple describes the innermost traceback frame and *info* is a
    space-joined string of ``[file|function|line]`` entries for every frame.

    Raises ``AssertionError`` when there is no current traceback.
    """
    t, v, tb = sys.exc_info()
    tbinfo = []
    if not tb:  # pragma: no cover
        raise AssertionError("traceback does not exist")
    while tb:
        tbinfo.append(
            (
                tb.tb_frame.f_code.co_filename,
                tb.tb_frame.f_code.co_name,
                str(tb.tb_lineno),
            )
        )
        tb = tb.tb_next

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = " ".join(["[%s|%s|%s]" % x for x in tbinfo])
    return (file, function, line), t, v, info


class dispatcher:
    """Thin wrapper around a socket that registers itself in a channel map.

    The polling functions in this module call the ``handle_*_event``
    methods; subclasses override ``handle_read``, ``handle_write``,
    ``handle_connect``, ``handle_accepted``, ``handle_close`` etc. to
    implement behaviour.
    """

    debug = False
    connected = False
    accepting = False
    connecting = False
    closing = False
    addr = None
    # Not consulted by any code in this module; presumably retained for
    # compatibility with stdlib asyncore subclasses.
    ignore_log_types = frozenset({"warning"})
    logger = utilities.logger
    compact_traceback = staticmethod(compact_traceback)  # for testing

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally wrapping an existing socket *sock*.

        *map* is the channel map to register in; ``None`` selects the
        module-level ``socket_map``.  Re-raises the ``OSError`` from
        ``sock.getpeername()`` unless it is ``ENOTCONN`` or ``EINVAL``.
        """
        if map is None:  # pragma: no cover
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(False)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except OSError as err:
                if err.args[0] in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return ``<module.Class [listening|connected] [host:port] at 0x..>``."""
        status = [self.__class__.__module__ + "." + self.__class__.__qualname__]
        if self.accepting and self.addr:
            status.append("listening")
        elif self.connected:
            status.append("connected")
        if self.addr is not None:
            try:
                status.append("%s:%d" % self.addr)
            except TypeError:  # pragma: no cover
                # addr is not a (host, port) pair; show it verbatim.
                status.append(repr(self.addr))
        return "<%s at %#x>" % (" ".join(status), id(self))

    __str__ = __repr__

    def add_channel(self, map=None):
        """Register this channel in *map* (default: its own map) by fileno."""
        # self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (default: its own map).

        The stored file number is reset to ``None`` whether or not the
        channel was present in the map.
        """
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            # self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a non-blocking socket and make it this channel's socket."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(False)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as this channel's socket and register it in *map*."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on ``SO_REUSEADDR``; an ``OSError`` is silently ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET,
                socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1,
            )
        except OSError:
            # Best effort only: the channel works without the option.
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return whether the channel wants read events (always true here)."""
        return True

    def writable(self):
        """Return whether the channel wants write events (always true here)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On ``os.name == "nt"`` the backlog is capped at 5.
        """
        self.accepting = True
        if os.name == "nt" and num > 5:  # pragma: no cover
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Record *addr* as the channel address and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns ``None``.  When the connection completes immediately,
        ``handle_connect_event()`` is invoked before returning.

        Raises ``OSError`` for any ``connect_ex`` result that is neither
        success nor "in progress".
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) or (
            err == EINVAL and os.name == "nt"
        ):  # pragma: no cover
            # Connection is still being established.
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # errorcode does not necessarily know every errno; fall back to
            # a textual description instead of masking the failure with a
            # KeyError.
            raise OSError(err, errorcode.get(err, _strerror(err)))

    def accept(self):
        """Accept a pending connection.

        Returns ``(conn, addr)``, or ``None`` when no connection could be
        accepted (``TypeError`` from the socket, or errno ``EWOULDBLOCK``,
        ``ECONNABORTED`` or ``EAGAIN``).  Other ``OSError``s propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except OSError as why:
            if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes written.

        Returns 0 when the socket would block, and also after calling
        ``handle_close()`` for an errno in ``_DISCONNECTED``.  Other
        ``OSError``s propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except OSError as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            elif why.args[0] in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        Returns ``b""`` after calling ``handle_close()`` when the peer has
        closed the connection or the errno is in ``_DISCONNECTED``.  Other
        ``OSError``s propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b""
            else:
                return data
        except OSError as why:
            # winsock sometimes raises ENOTCONN
            if why.args[0] in _DISCONNECTED:
                self.handle_close()
                return b""
            else:
                raise

    def close(self):
        """Reset state flags, unregister the channel and close the socket.

        ``ENOTCONN`` and ``EBADF`` from the socket's ``close()`` are
        ignored; other ``OSError``s propagate.
        """
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.args[0] not in (ENOTCONN, EBADF):
                    raise

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Log *message* at DEBUG level on ``self.logger``."""
        self.logger.log(logging.DEBUG, message)

    def log_info(self, message, type="info"):
        """Log *message* at the level named by *type*.

        *type* is ``"info"``, ``"warning"`` or ``"error"``; any other value
        is logged at INFO level.
        """
        severity = {
            "info": logging.INFO,
            "warning": logging.WARNING,
            "error": logging.ERROR,
        }
        self.logger.log(severity.get(type, logging.INFO), message)

    def handle_read_event(self):
        """Translate a low-level read event into accept/connect/read calls."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises ``OSError`` when ``SO_ERROR`` reports a failure; otherwise
        calls ``handle_connect()`` and then marks the channel connected.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Translate a low-level write event into connect/write calls."""
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Handle an exceptional condition reported for the socket.

        Closes the channel via ``handle_close()`` when ``SO_ERROR`` is set,
        otherwise calls ``handle_expt()``.
        """
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = self.compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:  # pragma: no cover
            self_repr = "<__repr__(self) failed for object at %0x>" % id(self)

        self.log_info(
            "uncaptured python exception, closing channel %s (%s:%s %s)"
            % (self_repr, t, v, tbinfo),
            "error",
        )
        self.handle_close()

    def handle_expt(self):
        """Default handler for priority data: log a warning."""
        self.log_info("unhandled incoming priority event", "warning")

    def handle_read(self):
        """Default handler for readable data: log a warning."""
        self.log_info("unhandled read event", "warning")

    def handle_write(self):
        """Default handler for a writable socket: log a warning."""
        self.log_info("unhandled write event", "warning")

    def handle_connect(self):
        """Default handler for a completed connect: log a warning."""
        self.log_info("unhandled connect event", "warning")

    def handle_accept(self):
        """Accept a connection and pass it to ``handle_accepted()``."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Default handler for an accepted connection: close it and warn."""
        sock.close()
        self.log_info("unhandled accepted event", "warning")

    def handle_close(self):
        """Default handler for a closed connection: warn, then ``close()``."""
        self.log_info("unhandled close event", "warning")
        self.close()


# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------


class dispatcher_with_send(dispatcher):
    """Dispatcher whose ``send()`` queues data in an output buffer."""

    def __init__(self, sock=None, map=None):
        """Initialise the dispatcher with an empty output buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b""

    def initiate_send(self):
        """Send up to 65536 buffered bytes and drop what was written."""
        num_sent = dispatcher.send(self, self.out_buffer[:65536])
        self.out_buffer = self.out_buffer[num_sent:]

    handle_write = initiate_send

    def writable(self):
        """Return a true value while unconnected or while data is buffered."""
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Append *data* to the output buffer and try to send immediately."""
        if self.debug:  # pragma: no cover
            self.log_info("sending %s" % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()


def close_all(map=None, ignore_all=False):
    """Close every channel in *map* and then empty the map.

    *map* defaults to the module-level ``socket_map``.  ``EBADF`` errors
    are always ignored.  Other exceptions propagate unless *ignore_all* is
    true; exceptions listed in ``_reraised_exceptions`` always propagate.
    """
    if map is None:  # pragma: no cover
        map = socket_map
    for x in list(map.values()):  # list() FBO py3
        try:
            x.close()
        except OSError as err:
            if err.args[0] == EBADF:
                pass
            elif not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except:  # deliberately bare: honours ignore_all for any failure
            if not ignore_all:
                raise
    map.clear()


# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't meant for doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o?  [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...

if os.name == "posix":

    class file_wrapper:
        """Make a file descriptor look like a socket for asyncore purposes.

        Only the methods needed by the dispatcher are provided.  The passed
        fd is automatically os.dup()'d.
        """

        # Class-level default so __del__ is a harmless no-op when os.dup()
        # fails in __init__ and the instance attribute was never set.
        fd = -1

        def __init__(self, fd):
            """Wrap a duplicate of file descriptor *fd*."""
            self.fd = os.dup(fd)

        def __del__(self):
            """Warn about, and close, a wrapper that was never closed."""
            if self.fd >= 0:
                warnings.warn("unclosed file %r" % self, ResourceWarning)
            self.close()

        def recv(self, *args):
            """Read from the descriptor; *args* are passed to ``os.read``."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; *args* are passed to ``os.write``."""
            return os.write(self.fd, *args)

        def getsockopt(self, level, optname, buflen=None):  # pragma: no cover
            """Answer the ``SO_ERROR`` query with 0 (no error).

            Raises ``NotImplementedError`` for every other option.
            """
            if level == socket.SOL_SOCKET and optname == socket.SO_ERROR and not buflen:
                return 0
            raise NotImplementedError("Only asyncore specific behaviour implemented.")

        read = recv
        write = send

        def close(self):
            """Close the descriptor; further calls are no-ops."""
            if self.fd < 0:
                return
            fd = self.fd
            # Mark as closed before closing so the descriptor is never
            # closed twice, even if os.close() raises.
            self.fd = -1
            os.close(fd)

        def fileno(self):
            """Return the wrapped descriptor (-1 once closed)."""
            return self.fd

    class file_dispatcher(dispatcher):
        """Dispatcher over a file descriptor instead of a socket."""

        def __init__(self, fd, map=None):
            """Wrap *fd* (an int or an object with ``fileno()``).

            The descriptor is duplicated by ``set_file`` and then *fd* is
            switched to non-blocking mode.
            """
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                # Already a plain file descriptor.
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            os.set_blocking(fd, False)

        def set_file(self, fd):
            """Wrap *fd* in a ``file_wrapper`` and register the channel."""
            self.socket = file_wrapper(fd)
            self._fileno = self.socket.fileno()
            self.add_channel()