1# Copyright (c) 2002, 2003, 2005, 2006 Allan Saddi <allan@saddi.com>
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions
6# are met:
7# 1. Redistributions of source code must retain the above copyright
8#    notice, this list of conditions and the following disclaimer.
9# 2. Redistributions in binary form must reproduce the above copyright
10#    notice, this list of conditions and the following disclaimer in the
11#    documentation and/or other materials provided with the distribution.
12#
13# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23# SUCH DAMAGE.
24#
25# $Id$
26
27__author__ = 'Allan Saddi <allan@saddi.com>'
28__version__ = '$Revision$'
29
30import sys
31import os
32import signal
33import struct
34import cStringIO as StringIO
35import select
36import socket
37import errno
38import traceback
39
40try:
41    import thread
42    import threading
43    thread_available = True
44except ImportError:
45    import dummy_thread as thread
46    import dummy_threading as threading
47    thread_available = False
48
# Apparently 2.3 doesn't define SHUT_WR? Assume it is 1 in this case.
if not hasattr(socket, 'SHUT_WR'):
    socket.SHUT_WR = 1

# Only the server base class is part of the public API of this module.
__all__ = ['BaseFCGIServer']

# Constants from the spec.
FCGI_LISTENSOCK_FILENO = 0

# Size in bytes of a packed record header (struct.calcsize(FCGI_Header)).
FCGI_HEADER_LEN = 8

# Protocol version written into every outgoing Record header.
FCGI_VERSION_1 = 1

# Record types (the 'type' field of the header); incoming records are
# dispatched on these in Connection.process_input().
FCGI_BEGIN_REQUEST = 1
FCGI_ABORT_REQUEST = 2
FCGI_END_REQUEST = 3
FCGI_PARAMS = 4
FCGI_STDIN = 5
FCGI_STDOUT = 6
FCGI_STDERR = 7
FCGI_DATA = 8
FCGI_GET_VALUES = 9
FCGI_GET_VALUES_RESULT = 10
FCGI_UNKNOWN_TYPE = 11
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE

# requestId of management records that are not tied to a request
# (e.g. the reply to an unknown record type).
FCGI_NULL_REQUEST_ID = 0

# Bit in the FCGI_BEGIN_REQUEST flags: when clear, the connection is closed
# once its last request ends (see Connection.end_request()).
FCGI_KEEP_CONN = 1

# Values of the role field of an FCGI_BEGIN_REQUEST body.
FCGI_RESPONDER = 1
FCGI_AUTHORIZER = 2
FCGI_FILTER = 3

# protocolStatus values carried by an FCGI_END_REQUEST record.
FCGI_REQUEST_COMPLETE = 0
FCGI_CANT_MPX_CONN = 1
FCGI_OVERLOADED = 2
FCGI_UNKNOWN_ROLE = 3

# Variable names a web server may query with FCGI_GET_VALUES; presumably
# these are the keys of server.capability (built elsewhere) -- confirm.
FCGI_MAX_CONNS = 'FCGI_MAX_CONNS'
FCGI_MAX_REQS = 'FCGI_MAX_REQS'
FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS'

# struct formats (network byte order):
#   header: version, type, requestId, contentLength, paddingLength, reserved
#   begin-request body: role, flags, 5 reserved bytes
#   end-request body: appStatus, protocolStatus, 3 reserved bytes
#   unknown-type body: type, 7 reserved bytes
FCGI_Header = '!BBHHBx'
FCGI_BeginRequestBody = '!HB5x'
FCGI_EndRequestBody = '!LB3x'
FCGI_UnknownTypeBody = '!B7x'

# Packed body sizes, used as contentLength of the corresponding records.
FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody)
FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody)
99
if __debug__:
    import time

    # Set non-zero to write debug output to a file. Messages whose level is
    # greater than DEBUG are discarded.
    DEBUG = 0
    DEBUGLOG = '/tmp/fcgi.log'

    def _debug(level, msg):
        """
        Append msg, prefixed with a timestamp, to DEBUGLOG if level <= DEBUG.

        Logging is strictly best-effort: any failure to open or write the
        log file is ignored so that debugging can never break a request.
        """
        if DEBUG < level:
            return

        try:
            f = open(DEBUGLOG, 'a')
            try:
                f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg))
            finally:
                # Close even if write() fails so the descriptor isn't leaked.
                f.close()
        except:
            pass
117
class InputStream(object):
    """
    File-like object representing FastCGI input streams (FCGI_STDIN and
    FCGI_DATA). Supports the minimum methods required by WSGI spec.

    Data is fed in with add_data() (an empty string marks EOF). Incoming
    chunks are queued in _bufList and only joined into _buf when a reader
    needs them. Already-consumed data is discarded once the read position
    passes the server's inputStreamShrinkThreshold, since the stream cannot
    be rewound.
    """
    def __init__(self, conn):
        self._conn = conn

        # See Server.
        self._shrinkThreshold = conn.server.inputStreamShrinkThreshold

        self._buf = ''
        self._bufList = []
        self._pos = 0 # Current read position within _buf.
        # Total bytes held in _buf plus _bufList (offset of the end of the
        # received data, relative to the start of _buf).
        self._avail = 0

        self._eof = False # True when server has sent EOF notification.

    def _shrinkBuffer(self):
        """Gets rid of already read data (since we can't rewind)."""
        if self._pos >= self._shrinkThreshold:
            self._buf = self._buf[self._pos:]
            self._avail -= self._pos
            self._pos = 0

            assert self._avail >= 0

    def _mergeBuffers(self):
        """Join any queued chunks from add_data() onto _buf."""
        if self._bufList:
            self._buf += ''.join(self._bufList)
            self._bufList = []

    def _waitForData(self):
        """Waits for more data to become available."""
        # Pump the connection: processing one record may call add_data().
        self._conn.process_input()

    def read(self, n=-1):
        """
        Read up to n bytes; a negative n reads until EOF.

        Blocks until n bytes are available or EOF is reached. Returns ''
        at EOF.
        """
        if self._pos == self._avail and self._eof:
            return ''
        while True:
            if n < 0 or (self._avail - self._pos) < n:
                # Not enough data available.
                if self._eof:
                    # And there's no more coming.
                    newPos = self._avail
                    break
                else:
                    # Wait for more data.
                    self._waitForData()
                    continue
            else:
                newPos = self._pos + n
                break
        self._mergeBuffers()
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readline(self, length=None):
        """
        Read one line, including the trailing newline if present.

        If length is given and non-negative, at most length bytes are
        returned (the line may then be incomplete). None or a negative
        length means no limit. Returns '' at EOF.
        """
        if self._pos == self._avail and self._eof:
            return ''
        if length is not None and length < 0:
            # Match file.readline(): a negative size means "no limit".
            length = None
        while True:
            # Unfortunately, we need to merge the buffer list early.
            self._mergeBuffers()
            # Find newline.
            i = self._buf.find('\n', self._pos)
            if i >= 0:
                newPos = i + 1
                break
            if self._eof:
                # No newline and no more data coming.
                newPos = self._avail
                break
            if length is not None and len(self._buf) >= self._pos + length:
                # Enough data already buffered to satisfy the size limit.
                newPos = self._pos + length
                break
            # Wait for more to come.
            self._waitForData()
        # Never return more than the caller asked for, even if a newline
        # (or EOF) lies further on.
        if length is not None and newPos > self._pos + length:
            newPos = self._pos + length
        r = self._buf[self._pos:newPos]
        self._pos = newPos
        self._shrinkBuffer()
        return r

    def readlines(self, sizehint=0):
        """
        Read lines until EOF, stopping early once at least sizehint bytes
        have been collected (when sizehint > 0).
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def __iter__(self):
        return self

    def next(self):
        """Return the next line; raise StopIteration at EOF."""
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def add_data(self, data):
        """Queue a chunk received from the server; '' signals EOF."""
        if not data:
            self._eof = True
        else:
            self._bufList.append(data)
            self._avail += len(data)
233
class MultiplexedInputStream(InputStream):
    """
    InputStream variant for MultiplexedConnection.

    The connection thread produces data through add_data() while a
    separate Request thread consumes it through read()/readline(). All
    access is serialized by a Condition built on a re-entrant lock; a
    reader that runs out of data waits on that condition until add_data()
    notifies it.
    """
    def __init__(self, conn):
        super(MultiplexedInputStream, self).__init__(conn)

        # Guards the buffers (shared by the Request and its Connection) and
        # doubles as the "new data available" notification channel.
        self._lock = threading.Condition(threading.RLock())

    def _withLock(self, method, *args):
        """Call method(*args) while holding self._lock."""
        self._lock.acquire()
        try:
            return method(*args)
        finally:
            self._lock.release()

    def _waitForData(self):
        # Reached from the base-class read()/readline(), which the wrappers
        # below only call with self._lock held; block until add_data()
        # posts a notification.
        self._lock.wait()

    def read(self, n=-1):
        return self._withLock(super(MultiplexedInputStream, self).read, n)

    def readline(self, length=None):
        return self._withLock(super(MultiplexedInputStream, self).readline,
                              length)

    def add_data(self, data):
        self._lock.acquire()
        try:
            super(MultiplexedInputStream, self).add_data(data)
            # Wake a reader blocked in _waitForData(), if any.
            self._lock.notify()
        finally:
            self._lock.release()
275
class OutputStream(object):
    """
    FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to
    write() or writelines() immediately result in Records being sent back
    to the server. Buffering should be done in a higher level!

    With buffered=True, writes are collected in memory and only sent on
    flush() or close().
    """
    def __init__(self, conn, req, type, buffered=False):
        self._conn = conn
        self._req = req
        self._type = type # Record type used for outgoing data.
        self._buffered = buffered
        self._bufList = [] # Used if buffered is True
        self.dataWritten = False # True once any non-empty write happened.
        self.closed = False

    def _write(self, data):
        """
        Send data as one or more records of this stream's type.

        Each record carries at most server.maxwrite - FCGI_HEADER_LEN bytes
        of content, so header plus content stay within maxwrite.
        """
        length = len(data)
        chunkSize = self._req.server.maxwrite - FCGI_HEADER_LEN
        # Walk the data by offset rather than re-slicing the remainder on
        # every iteration, which copied the tail each time (quadratic in the
        # size of large writes).
        offset = 0
        while offset < length:
            chunk = data[offset:offset + chunkSize]

            rec = Record(self._type, self._req.requestId)
            rec.contentLength = len(chunk)
            rec.contentData = chunk
            self._conn.writeRecord(rec)

            offset += len(chunk)

    def write(self, data):
        """Write data; empty strings are ignored."""
        assert not self.closed

        if not data:
            return

        self.dataWritten = True

        if self._buffered:
            self._bufList.append(data)
        else:
            self._write(data)

    def writelines(self, lines):
        """Write each string in lines (no separators are added)."""
        assert not self.closed

        for line in lines:
            self.write(line)

    def flush(self):
        """Send any buffered data; a no-op for unbuffered streams."""
        # Only need to flush if this OutputStream is actually buffered.
        if self._buffered:
            data = ''.join(self._bufList)
            self._bufList = []
            self._write(data)

    # Though available, the following should NOT be called by WSGI apps.
    def close(self):
        """
        Sends end-of-stream notification, if necessary.

        The terminating empty record is only sent if data was written; a
        stream that never carried data sends nothing and stays open.
        """
        if not self.closed and self.dataWritten:
            self.flush()
            rec = Record(self._type, self._req.requestId)
            self._conn.writeRecord(rec)
            self.closed = True
338
class TeeOutputStream(object):
    """
    Fan-out writer: every write()/flush() is forwarded, in list order, to
    each file-like object in streamList.
    """
    def __init__(self, streamList):
        self._streamList = streamList

    def write(self, data):
        """Write data to every underlying stream."""
        for target in self._streamList:
            target.write(data)

    def writelines(self, lines):
        """Write each item of lines via write(); no newlines are added."""
        for chunk in lines:
            self.write(chunk)

    def flush(self):
        """Flush every underlying stream."""
        for target in self._streamList:
            target.flush()
358
class StdoutWrapper(object):
    """
    Proxy around a file-like object (sys.stdout for CGIRequest) that sets
    dataWritten once any non-empty string is written. Every other attribute
    is looked up on the wrapped object.
    """
    def __init__(self, stdout):
        self._file = stdout
        self.dataWritten = False

    def write(self, data):
        """Forward data to the wrapped file, recording non-empty writes."""
        nonEmpty = bool(data)
        if nonEmpty:
            self.dataWritten = True
        self._file.write(data)

    def writelines(self, lines):
        """Write each item of lines through write()."""
        for piece in lines:
            self.write(piece)

    def __getattr__(self, name):
        # Only called for attributes not found on the wrapper itself.
        return getattr(self._file, name)
378
def decode_pair(s, pos=0):
    """
    Decodes a name/value pair from s, starting at offset pos.

    Each of the two lengths is a single byte when its high bit is clear,
    otherwise a 4-byte big-endian value with the high bit masked off.

    Returns (endPos, (name, value)), where endPos is the offset just past
    the pair (it equals the number of bytes decoded only when pos is 0).
    No bounds checking is done: truncated input yields short strings, or
    an IndexError if a length byte itself is missing.
    """
    def readLength(offset):
        # Returns (decoded length, offset of the byte after the length).
        if (ord(s[offset]) & 128) == 0:
            return ord(s[offset]), offset + 1
        packed = s[offset:offset+4]
        return struct.unpack('!L', packed)[0] & 0x7fffffff, offset + 4

    nameLength, pos = readLength(pos)
    valueLength, nameStart = readLength(pos)
    valueStart = nameStart + nameLength
    end = valueStart + valueLength
    return (end, (s[nameStart:valueStart], s[valueStart:end]))
406
407def encode_pair(name, value):
408    """
409    Encodes a name/value pair.
410
411    The encoded string is returned.
412    """
413    nameLength = len(name)
414    if nameLength < 128:
415        s = chr(nameLength)
416    else:
417        s = struct.pack('!L', nameLength | 0x80000000L)
418
419    valueLength = len(value)
420    if valueLength < 128:
421        s += chr(valueLength)
422    else:
423        s += struct.pack('!L', valueLength | 0x80000000L)
424
425    return s + name + value
426
427class Record(object):
428    """
429    A FastCGI Record.
430
431    Used for encoding/decoding records.
432    """
433    def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID):
434        self.version = FCGI_VERSION_1
435        self.type = type
436        self.requestId = requestId
437        self.contentLength = 0
438        self.paddingLength = 0
439        self.contentData = ''
440
441    def _recvall(sock, length):
442        """
443        Attempts to receive length bytes from a socket, blocking if necessary.
444        (Socket may be blocking or non-blocking.)
445        """
446        dataList = []
447        recvLen = 0
448        while length:
449            try:
450                data = sock.recv(length)
451            except socket.error, e:
452                if e[0] == errno.EAGAIN:
453                    select.select([sock], [], [])
454                    continue
455                else:
456                    raise
457            if not data: # EOF
458                break
459            dataList.append(data)
460            dataLen = len(data)
461            recvLen += dataLen
462            length -= dataLen
463        return ''.join(dataList), recvLen
464    _recvall = staticmethod(_recvall)
465
466    def read(self, sock):
467        """Read and decode a Record from a socket."""
468        try:
469            header, length = self._recvall(sock, FCGI_HEADER_LEN)
470        except:
471            raise EOFError
472
473        if length < FCGI_HEADER_LEN:
474            raise EOFError
475
476        self.version, self.type, self.requestId, self.contentLength, \
477                      self.paddingLength = struct.unpack(FCGI_Header, header)
478
479        if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, '
480                             'contentLength = %d' %
481                             (sock.fileno(), self.type, self.requestId,
482                              self.contentLength))
483
484        if self.contentLength:
485            try:
486                self.contentData, length = self._recvall(sock,
487                                                         self.contentLength)
488            except:
489                raise EOFError
490
491            if length < self.contentLength:
492                raise EOFError
493
494        if self.paddingLength:
495            try:
496                self._recvall(sock, self.paddingLength)
497            except:
498                raise EOFError
499
500    def _sendall(sock, data):
501        """
502        Writes data to a socket and does not return until all the data is sent.
503        """
504        length = len(data)
505        while length:
506            try:
507                sent = sock.send(data)
508            except socket.error, e:
509                if e[0] == errno.EAGAIN:
510                    select.select([], [sock], [])
511                    continue
512                else:
513                    raise
514            data = data[sent:]
515            length -= sent
516    _sendall = staticmethod(_sendall)
517
518    def write(self, sock):
519        """Encode and write a Record to a socket."""
520        self.paddingLength = -self.contentLength & 7
521
522        if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, '
523                             'contentLength = %d' %
524                             (sock.fileno(), self.type, self.requestId,
525                              self.contentLength))
526
527        header = struct.pack(FCGI_Header, self.version, self.type,
528                             self.requestId, self.contentLength,
529                             self.paddingLength)
530        self._sendall(sock, header)
531        if self.contentLength:
532            self._sendall(sock, self.contentData)
533        if self.paddingLength:
534            self._sendall(sock, '\x00'*self.paddingLength)
535
class TimeoutException(Exception):
    """Raised by Request.timeout_handler when a request runs out of time."""
538
539class Request(object):
540    """
541    Represents a single FastCGI request.
542
543    These objects are passed to your handler and is the main interface
544    between your handler and the fcgi module. The methods should not
545    be called by your handler. However, server, params, stdin, stdout,
546    stderr, and data are free for your handler's use.
547    """
548    def __init__(self, conn, inputStreamClass, timeout):
549        self._conn = conn
550        self._timeout = timeout
551
552        self.server = conn.server
553        self.params = {}
554        self.stdin = inputStreamClass(conn)
555        self.stdout = OutputStream(conn, self, FCGI_STDOUT)
556        self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True)
557        self.data = inputStreamClass(conn)
558
559    def timeout_handler(self, signum, frame):
560        self.stderr.write('Timeout Exceeded\n')
561        self.stderr.write("\n".join(traceback.format_stack(frame)))
562        self.stderr.flush()
563
564        raise TimeoutException
565
566    def run(self):
567        """Runs the handler, flushes the streams, and ends the request."""
568        # If there is a timeout
569        if self._timeout:
570            old_alarm = signal.signal(signal.SIGALRM, self.timeout_handler)
571            signal.alarm(self._timeout)
572
573        try:
574            protocolStatus, appStatus = self.server.handler(self)
575        except:
576            traceback.print_exc(file=self.stderr)
577            self.stderr.flush()
578            if not self.stdout.dataWritten:
579                self.server.error(self)
580
581            protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0
582
583        if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' %
584                             (protocolStatus, appStatus))
585
586        # Restore old handler if timeout was given
587        if self._timeout:
588            signal.alarm(0)
589            signal.signal(signal.SIGALRM, old_alarm)
590
591        try:
592            self._flush()
593            self._end(appStatus, protocolStatus)
594        except socket.error, e:
595            if e[0] != errno.EPIPE:
596                raise
597
598    def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
599        self._conn.end_request(self, appStatus, protocolStatus)
600
601    def _flush(self):
602        self.stdout.close()
603        self.stderr.close()
604
605class CGIRequest(Request):
606    """A normal CGI request disguised as a FastCGI request."""
607    def __init__(self, server):
608        # These are normally filled in by Connection.
609        self.requestId = 1
610        self.role = FCGI_RESPONDER
611        self.flags = 0
612        self.aborted = False
613
614        self.server = server
615        self.params = dict(os.environ)
616        self.stdin = sys.stdin
617        self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity!
618        self.stderr = sys.stderr
619        self.data = StringIO.StringIO()
620        self._timeout = 0
621
622    def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE):
623        sys.exit(appStatus)
624
625    def _flush(self):
626        # Not buffered, do nothing.
627        pass
628
629class Connection(object):
630    """
631    A Connection with the web server.
632
633    Each Connection is associated with a single socket (which is
634    connected to the web server) and is responsible for handling all
635    the FastCGI message processing for that socket.
636    """
637    _multiplexed = False
638    _inputStreamClass = InputStream
639
640    def __init__(self, sock, addr, server, timeout):
641        self._sock = sock
642        self._addr = addr
643        self.server = server
644        self._timeout = timeout
645
646        # Active Requests for this Connection, mapped by request ID.
647        self._requests = {}
648
649    def _cleanupSocket(self):
650        """Close the Connection's socket."""
651        try:
652            self._sock.shutdown(socket.SHUT_WR)
653        except:
654            return
655        try:
656            while True:
657                r, w, e = select.select([self._sock], [], [])
658                if not r or not self._sock.recv(1024):
659                    break
660        except:
661            pass
662        self._sock.close()
663
664    def run(self):
665        """Begin processing data from the socket."""
666        self._keepGoing = True
667        while self._keepGoing:
668            try:
669                self.process_input()
670            except (EOFError, KeyboardInterrupt):
671                break
672            except (select.error, socket.error), e:
673                if e[0] == errno.EBADF: # Socket was closed by Request.
674                    break
675                raise
676
677        self._cleanupSocket()
678
679    def process_input(self):
680        """Attempt to read a single Record from the socket and process it."""
681        # Currently, any children Request threads notify this Connection
682        # that it is no longer needed by closing the Connection's socket.
683        # We need to put a timeout on select, otherwise we might get
684        # stuck in it indefinitely... (I don't like this solution.)
685        while self._keepGoing:
686            try:
687                r, w, e = select.select([self._sock], [], [], 1.0)
688            except ValueError:
689                # Sigh. ValueError gets thrown sometimes when passing select
690                # a closed socket.
691                raise EOFError
692            if r: break
693        if not self._keepGoing:
694            return
695        rec = Record()
696        rec.read(self._sock)
697
698        if rec.type == FCGI_GET_VALUES:
699            self._do_get_values(rec)
700        elif rec.type == FCGI_BEGIN_REQUEST:
701            self._do_begin_request(rec)
702        elif rec.type == FCGI_ABORT_REQUEST:
703            self._do_abort_request(rec)
704        elif rec.type == FCGI_PARAMS:
705            self._do_params(rec)
706        elif rec.type == FCGI_STDIN:
707            self._do_stdin(rec)
708        elif rec.type == FCGI_DATA:
709            self._do_data(rec)
710        elif rec.requestId == FCGI_NULL_REQUEST_ID:
711            self._do_unknown_type(rec)
712        else:
713            # Need to complain about this.
714            pass
715
716    def writeRecord(self, rec):
717        """
718        Write a Record to the socket.
719        """
720        rec.write(self._sock)
721
722    def end_request(self, req, appStatus=0L,
723                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
724        """
725        End a Request.
726
727        Called by Request objects. An FCGI_END_REQUEST Record is
728        sent to the web server. If the web server no longer requires
729        the connection, the socket is closed, thereby ending this
730        Connection (run() returns).
731        """
732        rec = Record(FCGI_END_REQUEST, req.requestId)
733        rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus,
734                                      protocolStatus)
735        rec.contentLength = FCGI_EndRequestBody_LEN
736        self.writeRecord(rec)
737
738        if remove:
739            del self._requests[req.requestId]
740
741        if __debug__: _debug(2, 'end_request: flags = %d' % req.flags)
742
743        if not (req.flags & FCGI_KEEP_CONN) and not self._requests:
744            self._cleanupSocket()
745            self._keepGoing = False
746
747    def _do_get_values(self, inrec):
748        """Handle an FCGI_GET_VALUES request from the web server."""
749        outrec = Record(FCGI_GET_VALUES_RESULT)
750
751        pos = 0
752        while pos < inrec.contentLength:
753            pos, (name, value) = decode_pair(inrec.contentData, pos)
754            cap = self.server.capability.get(name)
755            if cap is not None:
756                outrec.contentData += encode_pair(name, str(cap))
757
758        outrec.contentLength = len(outrec.contentData)
759        self.writeRecord(outrec)
760
761    def _do_begin_request(self, inrec):
762        """Handle an FCGI_BEGIN_REQUEST from the web server."""
763        role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData)
764
765        req = self.server.request_class(self, self._inputStreamClass,
766                                        self._timeout)
767        req.requestId, req.role, req.flags = inrec.requestId, role, flags
768        req.aborted = False
769
770        if not self._multiplexed and self._requests:
771            # Can't multiplex requests.
772            self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False)
773        else:
774            self._requests[inrec.requestId] = req
775
776    def _do_abort_request(self, inrec):
777        """
778        Handle an FCGI_ABORT_REQUEST from the web server.
779
780        We just mark a flag in the associated Request.
781        """
782        req = self._requests.get(inrec.requestId)
783        if req is not None:
784            req.aborted = True
785
786    def _start_request(self, req):
787        """Run the request."""
788        # Not multiplexed, so run it inline.
789        req.run()
790
791    def _do_params(self, inrec):
792        """
793        Handle an FCGI_PARAMS Record.
794
795        If the last FCGI_PARAMS Record is received, start the request.
796        """
797        req = self._requests.get(inrec.requestId)
798        if req is not None:
799            if inrec.contentLength:
800                pos = 0
801                while pos < inrec.contentLength:
802                    pos, (name, value) = decode_pair(inrec.contentData, pos)
803                    req.params[name] = value
804            else:
805                self._start_request(req)
806
807    def _do_stdin(self, inrec):
808        """Handle the FCGI_STDIN stream."""
809        req = self._requests.get(inrec.requestId)
810        if req is not None:
811            req.stdin.add_data(inrec.contentData)
812
813    def _do_data(self, inrec):
814        """Handle the FCGI_DATA stream."""
815        req = self._requests.get(inrec.requestId)
816        if req is not None:
817            req.data.add_data(inrec.contentData)
818
819    def _do_unknown_type(self, inrec):
820        """Handle an unknown request type. Respond accordingly."""
821        outrec = Record(FCGI_UNKNOWN_TYPE)
822        outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type)
823        outrec.contentLength = FCGI_UnknownTypeBody_LEN
824        self.writeRecord(outrec)
825
826class MultiplexedConnection(Connection):
827    """
828    A version of Connection capable of handling multiple requests
829    simultaneously.
830    """
831    _multiplexed = True
832    _inputStreamClass = MultiplexedInputStream
833
834    def __init__(self, sock, addr, server, timeout):
835        super(MultiplexedConnection, self).__init__(sock, addr, server,
836                                                    timeout)
837
838        # Used to arbitrate access to self._requests.
839        lock = threading.RLock()
840
841        # Notification is posted everytime a request completes, allowing us
842        # to quit cleanly.
843        self._lock = threading.Condition(lock)
844
845    def _cleanupSocket(self):
846        # Wait for any outstanding requests before closing the socket.
847        self._lock.acquire()
848        while self._requests:
849            self._lock.wait()
850        self._lock.release()
851
852        super(MultiplexedConnection, self)._cleanupSocket()
853
854    def writeRecord(self, rec):
855        # Must use locking to prevent intermingling of Records from different
856        # threads.
857        self._lock.acquire()
858        try:
859            # Probably faster than calling super. ;)
860            rec.write(self._sock)
861        finally:
862            self._lock.release()
863
864    def end_request(self, req, appStatus=0L,
865                    protocolStatus=FCGI_REQUEST_COMPLETE, remove=True):
866        self._lock.acquire()
867        try:
868            super(MultiplexedConnection, self).end_request(req, appStatus,
869                                                           protocolStatus,
870                                                           remove)
871            self._lock.notify()
872        finally:
873            self._lock.release()
874
875    def _do_begin_request(self, inrec):
876        self._lock.acquire()
877        try:
878            super(MultiplexedConnection, self)._do_begin_request(inrec)
879        finally:
880            self._lock.release()
881
882    def _do_abort_request(self, inrec):
883        self._lock.acquire()
884        try:
885            super(MultiplexedConnection, self)._do_abort_request(inrec)
886        finally:
887            self._lock.release()
888
889    def _start_request(self, req):
890        try:
891            thread.start_new_thread(req.run, ())
892        except thread.error, e:
893            self.end_request(req, 0L, FCGI_OVERLOADED, remove=True)
894
895    def _do_params(self, inrec):
896        self._lock.acquire()
897        try:
898            super(MultiplexedConnection, self)._do_params(inrec)
899        finally:
900            self._lock.release()
901
902    def _do_stdin(self, inrec):
903        self._lock.acquire()
904        try:
905            super(MultiplexedConnection, self)._do_stdin(inrec)
906        finally:
907            self._lock.release()
908
909    def _do_data(self, inrec):
910        self._lock.acquire()
911        try:
912            super(MultiplexedConnection, self)._do_data(inrec)
913        finally:
914            self._lock.release()
915
916class BaseFCGIServer(object):
917    request_class = Request
918    cgirequest_class = CGIRequest
919
920    # The maximum number of bytes (per Record) to write to the server.
921    # I've noticed mod_fastcgi has a relatively small receive buffer (8K or
922    # so).
923    maxwrite = 8192
924
925    # Limits the size of the InputStream's string buffer to this size + the
926    # server's maximum Record size. Since the InputStream is not seekable,
927    # we throw away already-read data once this certain amount has been read.
928    inputStreamShrinkThreshold = 102400 - 8192
929
930    def __init__(self, application, environ=None,
931                 multithreaded=True, multiprocess=False,
932                 bindAddress=None, umask=None, multiplexed=False,
933                 debug=False, roles=(FCGI_RESPONDER,),
934                 forceCGI=False):
935        """
936        bindAddress, if present, must either be a string or a 2-tuple. If
937        present, run() will open its own listening socket. You would use
938        this if you wanted to run your application as an 'external' FastCGI
939        app. (i.e. the webserver would no longer be responsible for starting
940        your app) If a string, it will be interpreted as a filename and a UNIX
941        socket will be opened. If a tuple, the first element, a string,
942        is the interface name/IP to bind to, and the second element (an int)
943        is the port number.
944
945        If binding to a UNIX socket, umask may be set to specify what
946        the umask is to be changed to before the socket is created in the
947        filesystem. After the socket is created, the previous umask is
948        restored.
949
950        Set multiplexed to True if you want to handle multiple requests
951        per connection. Some FastCGI backends (namely mod_fastcgi) don't
952        multiplex requests at all, so by default this is off (which saves
953        on thread creation/locking overhead). If threads aren't available,
954        this keyword is ignored; it's not possible to multiplex requests
955        at all.
956        """
957        if environ is None:
958            environ = {}
959
960        self.application = application
961        self.environ = environ
962        self.multithreaded = multithreaded
963        self.multiprocess = multiprocess
964        self.debug = debug
965        self.roles = roles
966        self.forceCGI = forceCGI
967
968        self._bindAddress = bindAddress
969        self._umask = umask
970
971        # Used to force single-threadedness
972        self._appLock = thread.allocate_lock()
973
974        if thread_available:
975            try:
976                import resource
977                # Attempt to glean the maximum number of connections
978                # from the OS.
979                maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
980            except ImportError:
981                maxConns = 100 # Just some made up number.
982            maxReqs = maxConns
983            if multiplexed:
984                self._connectionClass = MultiplexedConnection
985                maxReqs *= 5 # Another made up number.
986            else:
987                self._connectionClass = Connection
988            self.capability = {
989                FCGI_MAX_CONNS: maxConns,
990                FCGI_MAX_REQS: maxReqs,
991                FCGI_MPXS_CONNS: multiplexed and 1 or 0
992                }
993        else:
994            self._connectionClass = Connection
995            self.capability = {
996                # If threads aren't available, these are pretty much correct.
997                FCGI_MAX_CONNS: 1,
998                FCGI_MAX_REQS: 1,
999                FCGI_MPXS_CONNS: 0
1000                }
1001
1002    def _setupSocket(self):
1003        if self._bindAddress is None:
1004            # Run as a normal FastCGI?
1005
1006            # FastCGI/CGI discrimination is broken on Mac OS X.
1007            # Set the environment variable FCGI_FORCE_CGI to "Y" or "y"
1008            # if you want to run your app as a simple CGI. (You can do
1009            # this with Apache's mod_env [not loaded by default in OS X
1010            # client, ha ha] and the SetEnv directive.)
1011            forceCGI = self.forceCGI or \
1012               os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y')
1013
1014            if forceCGI:
1015                isFCGI = False
1016            else:
1017                if not hasattr(socket, 'fromfd'):
1018                    # can happen on win32, no socket.fromfd there!
1019                    raise ValueError(
1020                        'If you want FCGI, please create an external FCGI server '
1021                        'by providing a valid bindAddress. '
1022                        'If you want CGI, please force CGI operation. Use '
1023                        'FCGI_FORCE_CGI=Y environment or forceCGI parameter.')
1024                sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET,
1025                                     socket.SOCK_STREAM)
1026                isFCGI = True
1027                try:
1028                    sock.getpeername()
1029                except socket.error, e:
1030                    if e[0] == errno.ENOTSOCK:
1031                        # Not a socket, assume CGI context.
1032                        isFCGI = False
1033                    elif e[0] != errno.ENOTCONN:
1034                        raise
1035
1036            if not isFCGI:
1037                req = self.cgirequest_class(self)
1038                req.run()
1039                sys.exit(0)
1040        else:
1041            # Run as a server
1042            oldUmask = None
1043            if type(self._bindAddress) is str:
1044                # Unix socket
1045                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1046                try:
1047                    os.unlink(self._bindAddress)
1048                except OSError:
1049                    pass
1050                if self._umask is not None:
1051                    oldUmask = os.umask(self._umask)
1052            else:
1053                # INET socket
1054                assert type(self._bindAddress) is tuple
1055                assert len(self._bindAddress) == 2
1056                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1057                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1058
1059            sock.bind(self._bindAddress)
1060            sock.listen(socket.SOMAXCONN)
1061
1062            if oldUmask is not None:
1063                os.umask(oldUmask)
1064
1065        return sock
1066
1067    def _cleanupSocket(self, sock):
1068        """Closes the main socket."""
1069        sock.close()
1070
    def handler(self, req):
        """
        Special handler for WSGI.

        Builds a WSGI environ for req, calls self.application with it and
        writes the response to req.stdout as CGI-style output: a
        'Status:' line, the response headers, a blank line, then the body.

        Returns a 2-tuple whose second item is always 0. The first item is
        FCGI_UNKNOWN_ROLE when req.role is not in self.roles (the
        application is not called in that case), or FCGI_REQUEST_COMPLETE
        otherwise.

        Exceptions raised by the application, by iterating its result or
        by the result's close() propagate to the caller. The exception is
        socket.error with errno EPIPE, which is swallowed.
        """
        if req.role not in self.roles:
            return FCGI_UNKNOWN_ROLE, 0

        # Mostly taken from example CGI gateway.
        # NOTE: req.params itself is updated and used as the environ;
        # server-wide self.environ entries override the request's params.
        environ = req.params
        environ.update(self.environ)

        environ['wsgi.version'] = (1,0)
        environ['wsgi.input'] = req.stdin
        # When running as an external server (bindAddress given), error
        # output is also copied to this process's own stderr.
        if self._bindAddress is None:
            stderr = req.stderr
        else:
            stderr = TeeOutputStream((sys.stderr, req.stderr))
        environ['wsgi.errors'] = stderr
        # CGI requests are single-shot; FastCGI requests may share this
        # process with other (possibly threaded) requests.
        environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \
                                      thread_available and self.multithreaded
        environ['wsgi.multiprocess'] = isinstance(req, CGIRequest) or \
                                       self.multiprocess
        environ['wsgi.run_once'] = isinstance(req, CGIRequest)

        # Only the exact (case-sensitive) values 'on' and '1' select https.
        if environ.get('HTTPS', 'off') in ('on', '1'):
            environ['wsgi.url_scheme'] = 'https'
        else:
            environ['wsgi.url_scheme'] = 'http'

        self._sanitizeEnv(environ)

        # State shared by the write/start_response closures below:
        # headers_set holds [status, headers] from start_response, and
        # headers_sent receives the same pair once the headers are written.
        headers_set = []
        headers_sent = []
        result = None

        def write(data):
            # The WSGI write() callable; also used below to send the body.
            assert type(data) is str, 'write() argument must be string'
            assert headers_set, 'write() before start_response()'

            if not headers_sent:
                # First write: emit the status line and the headers.
                status, responseHeaders = headers_sent[:] = headers_set
                found = False
                for header,value in responseHeaders:
                    if header.lower() == 'content-length':
                        found = True
                        break
                # Without an explicit Content-Length, a one-element result
                # means this first write is normally that element (or ''
                # when it is empty), so its length is the whole body's.
                # result is still None while the application callable is
                # running (e.g. if it calls write() itself).
                # NOTE: this appends to the application's own header list.
                if not found and result is not None:
                    try:
                        if len(result) == 1:
                            responseHeaders.append(('Content-Length',
                                                    str(len(data))))
                    except:
                        # e.g. result has no len() (a generator): just skip
                        # Content-Length. NOTE(review): this bare except
                        # swallows any error, not only TypeError.
                        pass
                s = 'Status: %s\r\n' % status
                for header in responseHeaders:
                    s += '%s: %s\r\n' % header
                s += '\r\n'
                req.stdout.write(s)

            req.stdout.write(data)
            req.stdout.flush()

        def start_response(status, response_headers, exc_info=None):
            # WSGI start_response(). With exc_info, the headers may be
            # replaced as long as none have been sent yet; once they have
            # been sent, the application's exception is re-raised.
            if exc_info:
                try:
                    if headers_sent:
                        # Re-raise if too late
                        raise exc_info[0], exc_info[1], exc_info[2]
                finally:
                    exc_info = None # avoid dangling circular ref
            else:
                assert not headers_set, 'Headers already set!'

            # Sanity checks only; assert statements are skipped under -O.
            assert type(status) is str, 'Status must be a string'
            assert len(status) >= 4, 'Status must be at least 4 characters'
            assert int(status[:3]), 'Status must begin with 3-digit code'
            assert status[3] == ' ', 'Status must have a space after code'
            assert type(response_headers) is list, 'Headers must be a list'
            if __debug__:
                for name,val in response_headers:
                    assert type(name) is str, 'Header name "%s" must be a string' % name
                    assert type(val) is str, 'Value of header "%s" must be a string' % name

            headers_set[:] = [status, response_headers]
            return write

        # Serialize calls into the application unless the server was
        # configured as multithreaded.
        if not self.multithreaded:
            self._appLock.acquire()
        try:
            try:
                result = self.application(environ, start_response)
                try:
                    for data in result:
                        if data:
                            write(data)
                    if not headers_sent:
                        write('') # in case body was empty
                finally:
                    # Always give the result a chance to release resources.
                    if hasattr(result, 'close'):
                        result.close()
            except socket.error, e:
                # Only EPIPE (the peer closed the connection) is swallowed.
                if e[0] != errno.EPIPE:
                    raise # Don't let EPIPE propagate beyond server
        finally:
            if not self.multithreaded:
                self._appLock.release()

        return FCGI_REQUEST_COMPLETE, 0
1177
1178    def _sanitizeEnv(self, environ):
1179        """Ensure certain values are present, if required by WSGI."""
1180        if not environ.has_key('SCRIPT_NAME'):
1181            environ['SCRIPT_NAME'] = ''
1182
1183        reqUri = None
1184        if environ.has_key('REQUEST_URI'):
1185            reqUri = environ['REQUEST_URI'].split('?', 1)
1186
1187        if not environ.has_key('PATH_INFO') or not environ['PATH_INFO']:
1188            if reqUri is not None:
1189                scriptName = environ['SCRIPT_NAME']
1190                if not reqUri[0].startswith(scriptName):
1191                    environ['wsgi.errors'].write('WARNING: SCRIPT_NAME does not match REQUEST_URI')
1192                environ['PATH_INFO'] = reqUri[0][len(scriptName):]
1193            else:
1194                environ['PATH_INFO'] = ''
1195        if not environ.has_key('QUERY_STRING') or not environ['QUERY_STRING']:
1196            if reqUri is not None and len(reqUri) > 1:
1197                environ['QUERY_STRING'] = reqUri[1]
1198            else:
1199                environ['QUERY_STRING'] = ''
1200
1201        # If any of these are missing, it probably signifies a broken
1202        # server...
1203        for name,default in [('REQUEST_METHOD', 'GET'),
1204                             ('SERVER_NAME', 'localhost'),
1205                             ('SERVER_PORT', '80'),
1206                             ('SERVER_PROTOCOL', 'HTTP/1.0')]:
1207            if not environ.has_key(name):
1208                environ['wsgi.errors'].write('%s: missing FastCGI param %s '
1209                                             'required by WSGI!\n' %
1210                                             (self.__class__.__name__, name))
1211                environ[name] = default
1212
1213    def error(self, req):
1214        """
1215        Called by Request if an exception occurs within the handler. May and
1216        should be overridden.
1217        """
1218        if self.debug:
1219            import cgitb
1220            req.stdout.write('Status: 500 Internal Server Error\r\n' +
1221                             'Content-Type: text/html\r\n\r\n' +
1222                             cgitb.html(sys.exc_info()))
1223        else:
1224            errorpage = """<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
1225<html><head>
1226<title>Unhandled Exception</title>
1227</head><body>
1228<h1>Unhandled Exception</h1>
1229<p>An unhandled exception was thrown by the application.</p>
1230</body></html>
1231"""
1232            req.stdout.write('Status: 500 Internal Server Error\r\n' +
1233                             'Content-Type: text/html\r\n\r\n' +
1234                             errorpage)
1235