"""Generic socket server classes.

This module tries to capture the various aspects of defining a server:

For socket-based servers:

- address family:
        - AF_INET{,6}: IP (Internet Protocol) sockets (default)
        - AF_UNIX: Unix domain sockets
        - others, e.g. AF_DECNET are conceivable (see <socket.h>)
- socket type:
        - SOCK_STREAM (reliable stream, e.g. TCP)
        - SOCK_DGRAM (datagrams, e.g. UDP)

For request-based servers (including socket-based):

- client address verification before further looking at the request
        (This is actually a hook for any processing that needs to look
         at the request before anything else, e.g. logging)
- how to handle multiple requests:
        - synchronous (one request is handled at a time)
        - forking (each request is handled by a new process)
        - threading (each request is handled by a new thread)

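The client address verification hook listed above can be sketched as
follows. This is only an illustration, assuming the Python 3 standard
library module socketserver, which exposes the same class names as this
module; LoopbackOnlyServer and is_allowed are hypothetical names.

```python
import socketserver

class LoopbackOnlyServer(socketserver.TCPServer):
    # Hypothetical policy: accept requests from the local host only.
    def verify_request(self, request, client_address):
        host = client_address[0]
        return host == "127.0.0.1" or host == "::1"

def is_allowed(host):
    # bind_and_activate=False creates the socket without binding it,
    # so the hook can be exercised without opening a port.
    server = LoopbackOnlyServer(("127.0.0.1", 0),
                                socketserver.BaseRequestHandler,
                                bind_and_activate=False)
    try:
        return server.verify_request(None, (host, 12345))
    finally:
        server.server_close()
```

A request for which verify_request() returns a false value is dropped
before process_request() is called.
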
The classes in this module favor the server type that is simplest to
write: a synchronous TCP/IP server.  This is bad class design, but
saves some typing.  (There's also the issue that a deep class hierarchy
slows down method lookups.)

There are five classes in an inheritance diagram, four of which represent
synchronous servers of four types:

        +------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+

Note that UnixDatagramServer derives from UDPServer, not from
UnixStreamServer -- the only difference between an IP and a Unix
stream server is the address family, which is simply repeated in both
Unix server classes.

Forking and threading versions of each type of server can be created
using the ForkingMixIn and ThreadingMixIn mix-in classes.  For
instance, a threading UDP server class is created as follows:

        class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass

The mix-in class must come first, since it overrides a method defined
in UDPServer! Setting the various member variables also changes
the behavior of the underlying server mechanism.

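Why the mix-in must come first can be checked by looking at which
process_request() each ordering resolves to. A small sketch, assuming
the Python 3 standard library module socketserver; WrongOrderUDPServer
is a hypothetical class used only for contrast.

```python
import socketserver

class ThreadingUDPServer(socketserver.ThreadingMixIn,
                         socketserver.UDPServer):
    pass

class WrongOrderUDPServer(socketserver.UDPServer,
                          socketserver.ThreadingMixIn):
    pass

# With the mix-in listed first, its process_request() is found first.
mixin_wins = (ThreadingUDPServer.process_request
              is socketserver.ThreadingMixIn.process_request)

# With the server class listed first, the synchronous version defined
# in BaseServer is found first and the mix-in has no effect.
base_wins = (WrongOrderUDPServer.process_request
             is socketserver.BaseServer.process_request)
```
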
To implement a service, you must derive a class from
BaseRequestHandler and redefine its handle() method.  You can then run
various versions of the service by combining one of the server classes
with your request handler class.

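A minimal sketch of such a service, assuming the Python 3 standard
library module socketserver (whose server classes can be used as
context managers since Python 3.6). EchoHandler and echo_once are
hypothetical names.

```python
import socket
import socketserver
import threading

class EchoHandler(socketserver.BaseRequestHandler):
    # Hypothetical service: send back what the client sent, upper-cased.
    def handle(self):
        data = self.request.recv(1024)
        self.request.sendall(data.upper())

def echo_once(message):
    # Port 0 asks the OS for a free port; server_address holds the
    # address that was actually bound.
    with socketserver.TCPServer(("127.0.0.1", 0), EchoHandler) as server:
        worker = threading.Thread(target=server.serve_forever)
        worker.start()
        try:
            address = server.server_address
            with socket.create_connection(address, timeout=5) as client:
                client.sendall(message)
                return client.recv(1024)
        finally:
            # shutdown() must run in a different thread than
            # serve_forever(), which is the case here.
            server.shutdown()
            worker.join()
```

Swapping TCPServer for ThreadingTCPServer would change how requests are
dispatched without touching the handler.
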
The request handler class must be different for datagram or stream
services.  This can be hidden by using the request handler
subclasses StreamRequestHandler or DatagramRequestHandler.

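One way to read this: the service logic is written once against
self.rfile and self.wfile and combined with either handler subclass.
The sketch below assumes the Python 3 standard library module
socketserver and a hypothetical fixed-size, 4-byte request; ShoutMixin,
StreamShout, DatagramShout and ask are illustrative names.

```python
import socket
import socketserver
import threading

class ShoutMixin:
    # Hypothetical service logic, independent of the socket type.
    def handle(self):
        self.wfile.write(self.rfile.read(4).upper())

class StreamShout(ShoutMixin, socketserver.StreamRequestHandler):
    pass

class DatagramShout(ShoutMixin, socketserver.DatagramRequestHandler):
    pass

def ask(server_class, handler_class, socket_type):
    with server_class(("127.0.0.1", 0), handler_class) as server:
        worker = threading.Thread(target=server.serve_forever)
        worker.start()
        try:
            with socket.socket(socket.AF_INET, socket_type) as client:
                client.settimeout(5)
                client.connect(server.server_address)
                client.sendall(b"ping")
                return client.recv(1024)
        finally:
            server.shutdown()
            worker.join()
```

For example, ask(socketserver.TCPServer, StreamShout,
socket.SOCK_STREAM) and ask(socketserver.UDPServer, DatagramShout,
socket.SOCK_DGRAM) exercise the same handle() method.
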
Of course, you still have to use your head!

For instance, it makes no sense to use a forking server if the service
contains state in memory that can be modified by requests (since the
modifications in the child process would never reach the initial state
kept in the parent process and passed to each child).  In this case,
you can use a threading server, but you will probably have to use
locks to keep two requests that arrive nearly simultaneously from
applying conflicting changes to the server state.

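A sketch of the threading-plus-lock approach, assuming the Python 3
standard library module socketserver. CountingServer, HitHandler and
count_hits are hypothetical names; the in-memory state is a simple hit
counter stored on the server object.

```python
import socket
import socketserver
import threading

class CountingServer(socketserver.ThreadingTCPServer):
    # Hypothetical server that keeps a hit counter in memory.
    daemon_threads = True

    def __init__(self, server_address, handler_class):
        super().__init__(server_address, handler_class)
        self.hits = 0
        self.hits_lock = threading.Lock()

class HitHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # The lock makes the read-modify-write of the counter atomic
        # across handler threads.
        with self.server.hits_lock:
            self.server.hits += 1
            current = self.server.hits
        self.request.sendall(str(current).encode("ascii"))

def count_hits(n):
    with CountingServer(("127.0.0.1", 0), HitHandler) as server:
        worker = threading.Thread(target=server.serve_forever)
        worker.start()
        replies = []
        try:
            for _ in range(n):
                address = server.server_address
                with socket.create_connection(address, timeout=5) as c:
                    replies.append(int(c.recv(16)))
        finally:
            server.shutdown()
            worker.join()
        return replies
```

With a forking server the same handler would increment a private copy
of the counter in each child, so the parent's value would never change.
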
On the other hand, if you are building e.g. an HTTP server, where all
data is stored externally (e.g. in the file system), a synchronous
class will essentially render the service "deaf" while one request is
being handled -- which may be for a very long time if a client is slow
to read all the data it has requested.  Here a threading or forking
server is appropriate.

In some cases, it may be appropriate to process part of a request
synchronously, but to finish processing in a forked child depending on
the request data.  This can be implemented by using a synchronous
server and doing an explicit fork in the request handler class
handle() method.

Another approach to handling multiple simultaneous requests in an
environment that supports neither threads nor fork (or where these are
too expensive or inappropriate for the service) is to maintain an
explicit table of partially finished requests and to use select() to
decide which request to work on next (or whether to handle a new
incoming request).  This is particularly important for stream services
where each client can potentially be connected for a long time (if
threads or subprocesses cannot be used).

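The select()-based approach is not implemented by this module. A rough
sketch of the idea using plain sockets, under the assumption that a
request is complete once the client closes its sending side;
serve_with_select is a hypothetical helper.

```python
import select
import socket

def serve_with_select(listener, expected):
    # Explicit table of partially finished requests:
    # connection -> bytes received so far.
    pending = {}
    finished = 0
    while finished < expected:
        watched = [listener] + list(pending)
        readable, _, _ = select.select(watched, [], [], 5.0)
        if not readable:
            break  # nothing happened within the timeout
        for sock in readable:
            if sock is listener:
                # A new incoming request: add it to the table.
                connection, _ = listener.accept()
                pending[connection] = b""
                continue
            chunk = sock.recv(1024)
            if chunk:
                pending[sock] += chunk
            else:
                # The client is done sending: reply, retire the entry.
                sock.sendall(pending.pop(sock).upper())
                sock.close()
                finished += 1
    return finished
```

A single thread serves every connection, so one slow client does not
block the others as long as each step works on data that is already
available.
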
Future work:
- Standard classes for Sun RPC (which uses either UDP or TCP)
- Standard mix-in classes to implement various authentication
  and encryption schemes
- Standard framework for select-based multiplexing

XXX Open problems:
- What to do with out-of-band data?

BaseServer:
- split generic "request" functionality out into BaseServer class.
  Copyright (C) 2000  Luke Kenneth Casson Leighton <lkcl@samba.org>

  example: read entries from a SQL database (requires overriding
  get_request() to return a table entry from the database).
  entry is processed by a RequestHandlerClass.

"""

# Author of the BaseServer patch: Luke Kenneth Casson Leighton

# XXX Warning!
# There is a test suite for this module, but it cannot be run by the
# standard regression test.
# To run it manually, run Lib/test/test_socketserver.py.

__version__ = "0.4"


from _pydev_imps._pydev_saved_modules import socket
from _pydev_imps._pydev_saved_modules import select
import sys
import os
try:
    from _pydev_imps._pydev_saved_modules import threading
except ImportError:
    import dummy_threading as threading

__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
           "StreamRequestHandler","DatagramRequestHandler",
           "ThreadingMixIn", "ForkingMixIn"]
if hasattr(socket, "AF_UNIX"):
    __all__.extend(["UnixStreamServer","UnixDatagramServer",
                    "ThreadingUnixStreamServer",
                    "ThreadingUnixDatagramServer"])

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()  # @UndefinedVariable
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = select.select([self], [], [], poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = select.select([self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print('-'*40)
        print('Exception happened during processing of request from')
        print(client_address)
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print('-'*40)


class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            self.server_bind()
            self.server_activate()

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()


class UDPServer(TCPServer):

    """UDP server class."""

    allow_reuse_address = False

    socket_type = socket.SOCK_DGRAM

    max_packet_size = 8192

    def get_request(self):
        data, client_addr = self.socket.recvfrom(self.max_packet_size)
        return (data, self.socket), client_addr

    def server_activate(self):
        # No need to call listen() for UDP.
        pass

    def shutdown_request(self, request):
        # No need to shutdown anything.
        self.close_request(request)

    def close_request(self, request):
        # No need to close anything.
        pass

class ForkingMixIn:

    """Mix-in class to handle each request in a new process."""

    timeout = 300
    active_children = None
    max_children = 40

    def collect_children(self):
        """Internal routine to wait for children that have exited."""
        if self.active_children is None: return
        while len(self.active_children) >= self.max_children:
            # XXX: This will wait for any child process, not just ones
            # spawned by this library. This could confuse other
            # libraries that expect to be able to wait for their own
            # children.
            try:
                pid, status = os.waitpid(0, 0)
            except os.error:
                pid = None
            if pid not in self.active_children: continue
            self.active_children.remove(pid)

        # XXX: This loop runs more system calls than it ought
        # to. There should be a way to put the active_children into a
        # process group and then use os.waitpid(-pgid) to wait for any
        # of that set, but I couldn't find a way to allocate pgids
        # that couldn't collide.
        # Iterate over a copy: entries are removed inside the loop, and
        # mutating the list being iterated would skip children.
        for child in list(self.active_children):
            try:
                pid, status = os.waitpid(child, os.WNOHANG)  # @UndefinedVariable
            except os.error:
                pid = None
            if not pid: continue
            try:
                self.active_children.remove(pid)
            except ValueError as e:
                # Format the exception itself; e.message does not exist
                # on Python 3.
                raise ValueError('%s. x=%d and list=%r' % (e, pid,
                                                           self.active_children))

    def handle_timeout(self):
        """Wait for zombies after self.timeout seconds of inactivity.

        May be extended, do not override.
        """
        self.collect_children()

    def process_request(self, request, client_address):
        """Fork a new subprocess to process the request."""
        self.collect_children()
        pid = os.fork()  # @UndefinedVariable
        if pid:
            # Parent process
            if self.active_children is None:
                self.active_children = []
            self.active_children.append(pid)
            self.close_request(request) #close handle in parent process
            return
        else:
            # Child process.
            # This must never return, hence os._exit()!
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                os._exit(0)
            except:
                try:
                    self.handle_error(request, client_address)
                    self.shutdown_request(request)
                finally:
                    os._exit(1)


class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,  # @UndefinedVariable
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()


class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

if hasattr(socket, 'AF_UNIX'):

    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX  # @UndefinedVariable

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX  # @UndefinedVariable

    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass

    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass


# The following two classes make it possible to use the same service
# class for stream or datagram servers.
# Each class sets up these instance variables:
# - rfile: a file object from which the request is read
# - wfile: a file object to which the reply is written
# When the handle() method returns, wfile is flushed properly.


class StreamRequestHandler(BaseRequestHandler):

    """Define self.rfile and self.wfile for stream sockets."""

    # Default buffer sizes for rfile, wfile.
    # We default rfile to buffered because otherwise it could be
    # really slow for large data (a getc() call per byte); we make
    # wfile unbuffered because (a) often after a write() we want to
    # read and we need to flush the line; (b) big writes to unbuffered
    # files are typically optimized by stdio even when big reads
    # aren't.
    rbufsize = -1
    wbufsize = 0

    # A timeout to apply to the request socket, if not None.
    timeout = None

    # Disable nagle algorithm for this socket, if True.
    # Use only when wbufsize != 0, to avoid small packets.
    disable_nagle_algorithm = False

    def setup(self):
        self.connection = self.request
        if self.timeout is not None:
            self.connection.settimeout(self.timeout)
        if self.disable_nagle_algorithm:
            self.connection.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, True)
        self.rfile = self.connection.makefile('rb', self.rbufsize)
        self.wfile = self.connection.makefile('wb', self.wbufsize)

    def finish(self):
        if not self.wfile.closed:
            self.wfile.flush()
        self.wfile.close()
        self.rfile.close()


class DatagramRequestHandler(BaseRequestHandler):

    # XXX Regrettably, I cannot get this working on Linux;
    # s.recvfrom() doesn't return a meaningful client address.

    """Define self.rfile and self.wfile for datagram sockets."""

    def setup(self):
        try:
            from cStringIO import StringIO
        except ImportError:
            try:
                from StringIO import StringIO
            except ImportError:
                # Python 3: neither module exists and packets are bytes.
                from io import BytesIO as StringIO
        self.packet, self.socket = self.request
        self.rfile = StringIO(self.packet)
        self.wfile = StringIO()

    def finish(self):
        self.socket.sendto(self.wfile.getvalue(), self.client_address)
