1"""Generic socket server classes. 2 3This module tries to capture the various aspects of defining a server: 4 5For socket-based servers: 6 7- address family: 8 - AF_INET{,6}: IP (Internet Protocol) sockets (default) 9 - AF_UNIX: Unix domain sockets 10 - others, e.g. AF_DECNET are conceivable (see <socket.h> 11- socket type: 12 - SOCK_STREAM (reliable stream, e.g. TCP) 13 - SOCK_DGRAM (datagrams, e.g. UDP) 14 15For request-based servers (including socket-based): 16 17- client address verification before further looking at the request 18 (This is actually a hook for any processing that needs to look 19 at the request before anything else, e.g. logging) 20- how to handle multiple requests: 21 - synchronous (one request is handled at a time) 22 - forking (each request is handled by a new process) 23 - threading (each request is handled by a new thread) 24 25The classes in this module favor the server type that is simplest to 26write: a synchronous TCP/IP server. This is bad class design, but 27save some typing. (There's also the issue that a deep class hierarchy 28slows down method lookups.) 29 30There are five classes in an inheritance diagram, four of which represent 31synchronous servers of four types: 32 33 +------------+ 34 | BaseServer | 35 +------------+ 36 | 37 v 38 +-----------+ +------------------+ 39 | TCPServer |------->| UnixStreamServer | 40 +-----------+ +------------------+ 41 | 42 v 43 +-----------+ +--------------------+ 44 | UDPServer |------->| UnixDatagramServer | 45 +-----------+ +--------------------+ 46 47Note that UnixDatagramServer derives from UDPServer, not from 48UnixStreamServer -- the only difference between an IP and a Unix 49stream server is the address family, which is simply repeated in both 50unix server classes. 51 52Forking and threading versions of each type of server can be created 53using the ForkingMixIn and ThreadingMixIn mix-in classes. For 54instance, a threading UDP server class is created as follows: 55 56 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass 57 58The Mix-in class must come first, since it overrides a method defined 59in UDPServer! Setting the various member variables also changes 60the behavior of the underlying server mechanism. 61 62To implement a service, you must derive a class from 63BaseRequestHandler and redefine its handle() method. You can then run 64various versions of the service by combining one of the server classes 65with your request handler class. 66 67The request handler class must be different for datagram or stream 68services. This can be hidden by using the request handler 69subclasses StreamRequestHandler or DatagramRequestHandler. 70 71Of course, you still have to use your head! 72 73For instance, it makes no sense to use a forking server if the service 74contains state in memory that can be modified by requests (since the 75modifications in the child process would never reach the initial state 76kept in the parent process and passed to each child). In this case, 77you can use a threading server, but you will probably have to use 78locks to avoid two requests that come in nearly simultaneous to apply 79conflicting changes to the server state. 80 81On the other hand, if you are building e.g. an HTTP server, where all 82data is stored externally (e.g. in the file system), a synchronous 83class will essentially render the service "deaf" while one request is 84being handled -- which may be for a very long time if a client is slow 85to read all the data it has requested. Here a threading or forking 86server is appropriate. 87 88In some cases, it may be appropriate to process part of a request 89synchronously, but to finish processing in a forked child depending on 90the request data. This can be implemented by using a synchronous 91server and doing an explicit fork in the request handler class 92handle() method. 93 94Another approach to handling multiple simultaneous requests in an 95environment that supports neither threads nor fork (or where these are 96too expensive or inappropriate for the service) is to maintain an 97explicit table of partially finished requests and to use select() to 98decide which request to work on next (or whether to handle a new 99incoming request). This is particularly important for stream services 100where each client can potentially be connected for a long time (if 101threads or subprocesses cannot be used). 102 103Future work: 104- Standard classes for Sun RPC (which uses either UDP or TCP) 105- Standard mix-in classes to implement various authentication 106 and encryption schemes 107- Standard framework for select-based multiplexing 108 109XXX Open problems: 110- What to do with out-of-band data? 111 112BaseServer: 113- split generic "request" functionality out into BaseServer class. 114 Copyright (C) 2000 Luke Kenneth Casson Leighton <lkcl@samba.org> 115 116 example: read entries from a SQL database (requires overriding 117 get_request() to return a table entry from the database). 118 entry is processed by a RequestHandlerClass. 119 120""" 121 122# Author of the BaseServer patch: Luke Kenneth Casson Leighton 123 124# XXX Warning! 125# There is a test suite for this module, but it cannot be run by the 126# standard regression test. 127# To run it manually, run Lib/test/test_socketserver.py. 128 129from __future__ import (absolute_import, print_function) 130 131__version__ = "0.4" 132 133 134import socket 135import select 136import sys 137import os 138import errno 139try: 140 import threading 141except ImportError: 142 import dummy_threading as threading 143 144__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer", 145 "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler", 146 "StreamRequestHandler","DatagramRequestHandler", 147 "ThreadingMixIn", "ForkingMixIn"] 148if hasattr(socket, "AF_UNIX"): 149 __all__.extend(["UnixStreamServer","UnixDatagramServer", 150 "ThreadingUnixStreamServer", 151 "ThreadingUnixDatagramServer"]) 152 153def _eintr_retry(func, *args): 154 """restart a system call interrupted by EINTR""" 155 while True: 156 try: 157 return func(*args) 158 except OSError as e: 159 if e.errno != errno.EINTR: 160 raise 161 162class BaseServer(object): 163 164 """Base class for server classes. 165 166 Methods for the caller: 167 168 - __init__(server_address, RequestHandlerClass) 169 - serve_forever(poll_interval=0.5) 170 - shutdown() 171 - handle_request() # if you do not use serve_forever() 172 - fileno() -> int # for select() 173 174 Methods that may be overridden: 175 176 - server_bind() 177 - server_activate() 178 - get_request() -> request, client_address 179 - handle_timeout() 180 - verify_request(request, client_address) 181 - server_close() 182 - process_request(request, client_address) 183 - shutdown_request(request) 184 - close_request(request) 185 - service_actions() 186 - handle_error() 187 188 Methods for derived classes: 189 190 - finish_request(request, client_address) 191 192 Class variables that may be overridden by derived classes or 193 instances: 194 195 - timeout 196 - address_family 197 - socket_type 198 - allow_reuse_address 199 200 Instance variables: 201 202 - RequestHandlerClass 203 - socket 204 205 """ 206 207 timeout = None 208 209 def __init__(self, server_address, RequestHandlerClass): 210 """Constructor. May be extended, do not override.""" 211 self.server_address = server_address 212 self.RequestHandlerClass = RequestHandlerClass 213 self.__is_shut_down = threading.Event() 214 self.__shutdown_request = False 215 216 def server_activate(self): 217 """Called by constructor to activate the server. 218 219 May be overridden. 220 221 """ 222 pass 223 224 def serve_forever(self, poll_interval=0.5): 225 """Handle one request at a time until shutdown. 226 227 Polls for shutdown every poll_interval seconds. Ignores 228 self.timeout. If you need to do periodic tasks, do them in 229 another thread. 230 """ 231 self.__is_shut_down.clear() 232 try: 233 while not self.__shutdown_request: 234 # XXX: Consider using another file descriptor or 235 # connecting to the socket to wake this up instead of 236 # polling. Polling reduces our responsiveness to a 237 # shutdown request and wastes cpu at all other times. 238 r, w, e = _eintr_retry(select.select, [self], [], [], 239 poll_interval) 240 if self in r: 241 self._handle_request_noblock() 242 243 self.service_actions() 244 finally: 245 self.__shutdown_request = False 246 self.__is_shut_down.set() 247 248 def shutdown(self): 249 """Stops the serve_forever loop. 250 251 Blocks until the loop has finished. This must be called while 252 serve_forever() is running in another thread, or it will 253 deadlock. 254 """ 255 self.__shutdown_request = True 256 self.__is_shut_down.wait() 257 258 def service_actions(self): 259 """Called by the serve_forever() loop. 260 261 May be overridden by a subclass / Mixin to implement any code that 262 needs to be run during the loop. 263 """ 264 pass 265 266 # The distinction between handling, getting, processing and 267 # finishing a request is fairly arbitrary. Remember: 268 # 269 # - handle_request() is the top-level call. It calls 270 # select, get_request(), verify_request() and process_request() 271 # - get_request() is different for stream or datagram sockets 272 # - process_request() is the place that may fork a new process 273 # or create a new thread to finish the request 274 # - finish_request() instantiates the request handler class; 275 # this constructor will handle the request all by itself 276 277 def handle_request(self): 278 """Handle one request, possibly blocking. 279 280 Respects self.timeout. 281 """ 282 # Support people who used socket.settimeout() to escape 283 # handle_request before self.timeout was available. 284 timeout = self.socket.gettimeout() 285 if timeout is None: 286 timeout = self.timeout 287 elif self.timeout is not None: 288 timeout = min(timeout, self.timeout) 289 fd_sets = _eintr_retry(select.select, [self], [], [], timeout) 290 if not fd_sets[0]: 291 self.handle_timeout() 292 return 293 self._handle_request_noblock() 294 295 def _handle_request_noblock(self): 296 """Handle one request, without blocking. 297 298 I assume that select.select has returned that the socket is 299 readable before this function was called, so there should be 300 no risk of blocking in get_request(). 301 """ 302 try: 303 request, client_address = self.get_request() 304 except socket.error: 305 return 306 if self.verify_request(request, client_address): 307 try: 308 self.process_request(request, client_address) 309 except: 310 self.handle_error(request, client_address) 311 self.shutdown_request(request) 312 313 def handle_timeout(self): 314 """Called if no new request arrives within self.timeout. 315 316 Overridden by ForkingMixIn. 317 """ 318 pass 319 320 def verify_request(self, request, client_address): 321 """Verify the request. May be overridden. 322 323 Return True if we should proceed with this request. 324 325 """ 326 return True 327 328 def process_request(self, request, client_address): 329 """Call finish_request. 330 331 Overridden by ForkingMixIn and ThreadingMixIn. 332 333 """ 334 self.finish_request(request, client_address) 335 self.shutdown_request(request) 336 337 def server_close(self): 338 """Called to clean-up the server. 339 340 May be overridden. 341 342 """ 343 pass 344 345 def finish_request(self, request, client_address): 346 """Finish one request by instantiating RequestHandlerClass.""" 347 self.RequestHandlerClass(request, client_address, self) 348 349 def shutdown_request(self, request): 350 """Called to shutdown and close an individual request.""" 351 self.close_request(request) 352 353 def close_request(self, request): 354 """Called to clean up an individual request.""" 355 pass 356 357 def handle_error(self, request, client_address): 358 """Handle an error gracefully. May be overridden. 359 360 The default is to print a traceback and continue. 361 362 """ 363 print('-'*40) 364 print('Exception happened during processing of request from', end=' ') 365 print(client_address) 366 import traceback 367 traceback.print_exc() # XXX But this goes to stderr! 368 print('-'*40) 369 370 371class TCPServer(BaseServer): 372 373 """Base class for various socket-based server classes. 374 375 Defaults to synchronous IP stream (i.e., TCP). 376 377 Methods for the caller: 378 379 - __init__(server_address, RequestHandlerClass, bind_and_activate=True) 380 - serve_forever(poll_interval=0.5) 381 - shutdown() 382 - handle_request() # if you don't use serve_forever() 383 - fileno() -> int # for select() 384 385 Methods that may be overridden: 386 387 - server_bind() 388 - server_activate() 389 - get_request() -> request, client_address 390 - handle_timeout() 391 - verify_request(request, client_address) 392 - process_request(request, client_address) 393 - shutdown_request(request) 394 - close_request(request) 395 - handle_error() 396 397 Methods for derived classes: 398 399 - finish_request(request, client_address) 400 401 Class variables that may be overridden by derived classes or 402 instances: 403 404 - timeout 405 - address_family 406 - socket_type 407 - request_queue_size (only for stream sockets) 408 - allow_reuse_address 409 410 Instance variables: 411 412 - server_address 413 - RequestHandlerClass 414 - socket 415 416 """ 417 418 address_family = socket.AF_INET 419 420 socket_type = socket.SOCK_STREAM 421 422 request_queue_size = 5 423 424 allow_reuse_address = False 425 426 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): 427 """Constructor. May be extended, do not override.""" 428 BaseServer.__init__(self, server_address, RequestHandlerClass) 429 self.socket = socket.socket(self.address_family, 430 self.socket_type) 431 if bind_and_activate: 432 self.server_bind() 433 self.server_activate() 434 435 def server_bind(self): 436 """Called by constructor to bind the socket. 437 438 May be overridden. 439 440 """ 441 if self.allow_reuse_address: 442 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 443 self.socket.bind(self.server_address) 444 self.server_address = self.socket.getsockname() 445 446 def server_activate(self): 447 """Called by constructor to activate the server. 448 449 May be overridden. 450 451 """ 452 self.socket.listen(self.request_queue_size) 453 454 def server_close(self): 455 """Called to clean-up the server. 456 457 May be overridden. 458 459 """ 460 self.socket.close() 461 462 def fileno(self): 463 """Return socket file number. 464 465 Interface required by select(). 466 467 """ 468 return self.socket.fileno() 469 470 def get_request(self): 471 """Get the request and client address from the socket. 472 473 May be overridden. 474 475 """ 476 return self.socket.accept() 477 478 def shutdown_request(self, request): 479 """Called to shutdown and close an individual request.""" 480 try: 481 #explicitly shutdown. socket.close() merely releases 482 #the socket and waits for GC to perform the actual close. 483 request.shutdown(socket.SHUT_WR) 484 except socket.error: 485 pass #some platforms may raise ENOTCONN here 486 self.close_request(request) 487 488 def close_request(self, request): 489 """Called to clean up an individual request.""" 490 request.close() 491 492 493class UDPServer(TCPServer): 494 495 """UDP server class.""" 496 497 allow_reuse_address = False 498 499 socket_type = socket.SOCK_DGRAM 500 501 max_packet_size = 8192 502 503 def get_request(self): 504 data, client_addr = self.socket.recvfrom(self.max_packet_size) 505 return (data, self.socket), client_addr 506 507 def server_activate(self): 508 # No need to call listen() for UDP. 509 pass 510 511 def shutdown_request(self, request): 512 # No need to shutdown anything. 513 self.close_request(request) 514 515 def close_request(self, request): 516 # No need to close anything. 517 pass 518 519class ForkingMixIn(object): 520 521 """Mix-in class to handle each request in a new process.""" 522 523 timeout = 300 524 active_children = None 525 max_children = 40 526 527 def collect_children(self): 528 """Internal routine to wait for children that have exited.""" 529 if self.active_children is None: return 530 while len(self.active_children) >= self.max_children: 531 # XXX: This will wait for any child process, not just ones 532 # spawned by this library. This could confuse other 533 # libraries that expect to be able to wait for their own 534 # children. 535 try: 536 pid, status = os.waitpid(0, 0) 537 except os.error: 538 pid = None 539 if pid not in self.active_children: continue 540 self.active_children.remove(pid) 541 542 # XXX: This loop runs more system calls than it ought 543 # to. There should be a way to put the active_children into a 544 # process group and then use os.waitpid(-pgid) to wait for any 545 # of that set, but I couldn't find a way to allocate pgids 546 # that couldn't collide. 547 for child in self.active_children: 548 try: 549 pid, status = os.waitpid(child, os.WNOHANG) 550 except os.error: 551 pid = None 552 if not pid: continue 553 try: 554 self.active_children.remove(pid) 555 except ValueError as e: 556 raise ValueError('%s. x=%d and list=%r' % (e.message, pid, 557 self.active_children)) 558 559 def handle_timeout(self): 560 """Wait for zombies after self.timeout seconds of inactivity. 561 562 May be extended, do not override. 563 """ 564 self.collect_children() 565 566 def service_actions(self): 567 """Collect the zombie child processes regularly in the ForkingMixIn. 568 569 service_actions is called in the BaseServer's serve_forver loop. 570 """ 571 self.collect_children() 572 573 def process_request(self, request, client_address): 574 """Fork a new subprocess to process the request.""" 575 pid = os.fork() 576 if pid: 577 # Parent process 578 if self.active_children is None: 579 self.active_children = [] 580 self.active_children.append(pid) 581 self.close_request(request) 582 return 583 else: 584 # Child process. 585 # This must never return, hence os._exit()! 586 try: 587 self.finish_request(request, client_address) 588 self.shutdown_request(request) 589 os._exit(0) 590 except: 591 try: 592 self.handle_error(request, client_address) 593 self.shutdown_request(request) 594 finally: 595 os._exit(1) 596 597 598class ThreadingMixIn(object): 599 """Mix-in class to handle each request in a new thread.""" 600 601 # Decides how threads will act upon termination of the 602 # main process 603 daemon_threads = False 604 605 def process_request_thread(self, request, client_address): 606 """Same as in BaseServer but as a thread. 607 608 In addition, exception handling is done here. 609 610 """ 611 try: 612 self.finish_request(request, client_address) 613 self.shutdown_request(request) 614 except: 615 self.handle_error(request, client_address) 616 self.shutdown_request(request) 617 618 def process_request(self, request, client_address): 619 """Start a new thread to process the request.""" 620 t = threading.Thread(target = self.process_request_thread, 621 args = (request, client_address)) 622 t.daemon = self.daemon_threads 623 t.start() 624 625 626class ForkingUDPServer(ForkingMixIn, UDPServer): pass 627class ForkingTCPServer(ForkingMixIn, TCPServer): pass 628 629class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass 630class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass 631 632if hasattr(socket, 'AF_UNIX'): 633 634 class UnixStreamServer(TCPServer): 635 address_family = socket.AF_UNIX 636 637 class UnixDatagramServer(UDPServer): 638 address_family = socket.AF_UNIX 639 640 class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass 641 642 class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass 643 644class BaseRequestHandler(object): 645 646 """Base class for request handler classes. 647 648 This class is instantiated for each request to be handled. The 649 constructor sets the instance variables request, client_address 650 and server, and then calls the handle() method. To implement a 651 specific service, all you need to do is to derive a class which 652 defines a handle() method. 653 654 The handle() method can find the request as self.request, the 655 client address as self.client_address, and the server (in case it 656 needs access to per-server information) as self.server. Since a 657 separate instance is created for each request, the handle() method 658 can define arbitrary other instance variariables. 659 660 """ 661 662 def __init__(self, request, client_address, server): 663 self.request = request 664 self.client_address = client_address 665 self.server = server 666 self.setup() 667 try: 668 self.handle() 669 finally: 670 self.finish() 671 672 def setup(self): 673 pass 674 675 def handle(self): 676 pass 677 678 def finish(self): 679 pass 680 681 682# The following two classes make it possible to use the same service 683# class for stream or datagram servers. 684# Each class sets up these instance variables: 685# - rfile: a file object from which receives the request is read 686# - wfile: a file object to which the reply is written 687# When the handle() method returns, wfile is flushed properly 688 689 690class StreamRequestHandler(BaseRequestHandler): 691 692 """Define self.rfile and self.wfile for stream sockets.""" 693 694 # Default buffer sizes for rfile, wfile. 695 # We default rfile to buffered because otherwise it could be 696 # really slow for large data (a getc() call per byte); we make 697 # wfile unbuffered because (a) often after a write() we want to 698 # read and we need to flush the line; (b) big writes to unbuffered 699 # files are typically optimized by stdio even when big reads 700 # aren't. 701 rbufsize = -1 702 wbufsize = 0 703 704 # A timeout to apply to the request socket, if not None. 705 timeout = None 706 707 # Disable nagle algorithm for this socket, if True. 708 # Use only when wbufsize != 0, to avoid small packets. 709 disable_nagle_algorithm = False 710 711 def setup(self): 712 self.connection = self.request 713 if self.timeout is not None: 714 self.connection.settimeout(self.timeout) 715 if self.disable_nagle_algorithm: 716 self.connection.setsockopt(socket.IPPROTO_TCP, 717 socket.TCP_NODELAY, True) 718 self.rfile = self.connection.makefile('rb', self.rbufsize) 719 self.wfile = self.connection.makefile('wb', self.wbufsize) 720 721 def finish(self): 722 if not self.wfile.closed: 723 try: 724 self.wfile.flush() 725 except socket.error: 726 # An final socket error may have occurred here, such as 727 # the local error ECONNABORTED. 728 pass 729 self.wfile.close() 730 self.rfile.close() 731 732 733class DatagramRequestHandler(BaseRequestHandler): 734 735 # XXX Regrettably, I cannot get this working on Linux; 736 # s.recvfrom() doesn't return a meaningful client address. 737 738 """Define self.rfile and self.wfile for datagram sockets.""" 739 740 def setup(self): 741 from io import BytesIO 742 self.packet, self.socket = self.request 743 self.rfile = BytesIO(self.packet) 744 self.wfile = BytesIO() 745 746 def finish(self): 747 self.socket.sendto(self.wfile.getvalue(), self.client_address) 748