1# Copyright (c) 2002, 2003, 2005, 2006 Allan Saddi <allan@saddi.com> 2# All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted provided that the following conditions 6# are met: 7# 1. Redistributions of source code must retain the above copyright 8# notice, this list of conditions and the following disclaimer. 9# 2. Redistributions in binary form must reproduce the above copyright 10# notice, this list of conditions and the following disclaimer in the 11# documentation and/or other materials provided with the distribution. 12# 13# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 14# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 15# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 16# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 17# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 18# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 19# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 20# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 21# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 22# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 23# SUCH DAMAGE. 24# 25# $Id$ 26 27__author__ = 'Allan Saddi <allan@saddi.com>' 28__version__ = '$Revision$' 29 30import sys 31import os 32import signal 33import struct 34import cStringIO as StringIO 35import select 36import socket 37import errno 38import traceback 39 40try: 41 import thread 42 import threading 43 thread_available = True 44except ImportError: 45 import dummy_thread as thread 46 import dummy_threading as threading 47 thread_available = False 48 49# Apparently 2.3 doesn't define SHUT_WR? Assume it is 1 in this case. 50if not hasattr(socket, 'SHUT_WR'): 51 socket.SHUT_WR = 1 52 53__all__ = ['BaseFCGIServer'] 54 55# Constants from the spec. 56FCGI_LISTENSOCK_FILENO = 0 57 58FCGI_HEADER_LEN = 8 59 60FCGI_VERSION_1 = 1 61 62FCGI_BEGIN_REQUEST = 1 63FCGI_ABORT_REQUEST = 2 64FCGI_END_REQUEST = 3 65FCGI_PARAMS = 4 66FCGI_STDIN = 5 67FCGI_STDOUT = 6 68FCGI_STDERR = 7 69FCGI_DATA = 8 70FCGI_GET_VALUES = 9 71FCGI_GET_VALUES_RESULT = 10 72FCGI_UNKNOWN_TYPE = 11 73FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE 74 75FCGI_NULL_REQUEST_ID = 0 76 77FCGI_KEEP_CONN = 1 78 79FCGI_RESPONDER = 1 80FCGI_AUTHORIZER = 2 81FCGI_FILTER = 3 82 83FCGI_REQUEST_COMPLETE = 0 84FCGI_CANT_MPX_CONN = 1 85FCGI_OVERLOADED = 2 86FCGI_UNKNOWN_ROLE = 3 87 88FCGI_MAX_CONNS = 'FCGI_MAX_CONNS' 89FCGI_MAX_REQS = 'FCGI_MAX_REQS' 90FCGI_MPXS_CONNS = 'FCGI_MPXS_CONNS' 91 92FCGI_Header = '!BBHHBx' 93FCGI_BeginRequestBody = '!HB5x' 94FCGI_EndRequestBody = '!LB3x' 95FCGI_UnknownTypeBody = '!B7x' 96 97FCGI_EndRequestBody_LEN = struct.calcsize(FCGI_EndRequestBody) 98FCGI_UnknownTypeBody_LEN = struct.calcsize(FCGI_UnknownTypeBody) 99 100if __debug__: 101 import time 102 103 # Set non-zero to write debug output to a file. 104 DEBUG = 0 105 DEBUGLOG = '/tmp/fcgi.log' 106 107 def _debug(level, msg): 108 if DEBUG < level: 109 return 110 111 try: 112 f = open(DEBUGLOG, 'a') 113 f.write('%sfcgi: %s\n' % (time.ctime()[4:-4], msg)) 114 f.close() 115 except: 116 pass 117 118class InputStream(object): 119 """ 120 File-like object representing FastCGI input streams (FCGI_STDIN and 121 FCGI_DATA). Supports the minimum methods required by WSGI spec. 122 """ 123 def __init__(self, conn): 124 self._conn = conn 125 126 # See Server. 127 self._shrinkThreshold = conn.server.inputStreamShrinkThreshold 128 129 self._buf = '' 130 self._bufList = [] 131 self._pos = 0 # Current read position. 132 self._avail = 0 # Number of bytes currently available. 133 134 self._eof = False # True when server has sent EOF notification. 135 136 def _shrinkBuffer(self): 137 """Gets rid of already read data (since we can't rewind).""" 138 if self._pos >= self._shrinkThreshold: 139 self._buf = self._buf[self._pos:] 140 self._avail -= self._pos 141 self._pos = 0 142 143 assert self._avail >= 0 144 145 def _waitForData(self): 146 """Waits for more data to become available.""" 147 self._conn.process_input() 148 149 def read(self, n=-1): 150 if self._pos == self._avail and self._eof: 151 return '' 152 while True: 153 if n < 0 or (self._avail - self._pos) < n: 154 # Not enough data available. 155 if self._eof: 156 # And there's no more coming. 157 newPos = self._avail 158 break 159 else: 160 # Wait for more data. 161 self._waitForData() 162 continue 163 else: 164 newPos = self._pos + n 165 break 166 # Merge buffer list, if necessary. 167 if self._bufList: 168 self._buf += ''.join(self._bufList) 169 self._bufList = [] 170 r = self._buf[self._pos:newPos] 171 self._pos = newPos 172 self._shrinkBuffer() 173 return r 174 175 def readline(self, length=None): 176 if self._pos == self._avail and self._eof: 177 return '' 178 while True: 179 # Unfortunately, we need to merge the buffer list early. 180 if self._bufList: 181 self._buf += ''.join(self._bufList) 182 self._bufList = [] 183 # Find newline. 184 i = self._buf.find('\n', self._pos) 185 if i < 0: 186 # Not found? 187 if self._eof: 188 # No more data coming. 189 newPos = self._avail 190 break 191 else: 192 if length is not None and len(self._buf) >= length + self._pos: 193 newPos = self._pos + length 194 break 195 # Wait for more to come. 196 self._waitForData() 197 continue 198 else: 199 newPos = i + 1 200 break 201 r = self._buf[self._pos:newPos] 202 self._pos = newPos 203 self._shrinkBuffer() 204 return r 205 206 def readlines(self, sizehint=0): 207 total = 0 208 lines = [] 209 line = self.readline() 210 while line: 211 lines.append(line) 212 total += len(line) 213 if 0 < sizehint <= total: 214 break 215 line = self.readline() 216 return lines 217 218 def __iter__(self): 219 return self 220 221 def next(self): 222 r = self.readline() 223 if not r: 224 raise StopIteration 225 return r 226 227 def add_data(self, data): 228 if not data: 229 self._eof = True 230 else: 231 self._bufList.append(data) 232 self._avail += len(data) 233 234class MultiplexedInputStream(InputStream): 235 """ 236 A version of InputStream meant to be used with MultiplexedConnections. 237 Assumes the MultiplexedConnection (the producer) and the Request 238 (the consumer) are running in different threads. 239 """ 240 def __init__(self, conn): 241 super(MultiplexedInputStream, self).__init__(conn) 242 243 # Arbitrates access to this InputStream (it's used simultaneously 244 # by a Request and its owning Connection object). 245 lock = threading.RLock() 246 247 # Notifies Request thread that there is new data available. 248 self._lock = threading.Condition(lock) 249 250 def _waitForData(self): 251 # Wait for notification from add_data(). 252 self._lock.wait() 253 254 def read(self, n=-1): 255 self._lock.acquire() 256 try: 257 return super(MultiplexedInputStream, self).read(n) 258 finally: 259 self._lock.release() 260 261 def readline(self, length=None): 262 self._lock.acquire() 263 try: 264 return super(MultiplexedInputStream, self).readline(length) 265 finally: 266 self._lock.release() 267 268 def add_data(self, data): 269 self._lock.acquire() 270 try: 271 super(MultiplexedInputStream, self).add_data(data) 272 self._lock.notify() 273 finally: 274 self._lock.release() 275 276class OutputStream(object): 277 """ 278 FastCGI output stream (FCGI_STDOUT/FCGI_STDERR). By default, calls to 279 write() or writelines() immediately result in Records being sent back 280 to the server. Buffering should be done in a higher level! 281 """ 282 def __init__(self, conn, req, type, buffered=False): 283 self._conn = conn 284 self._req = req 285 self._type = type 286 self._buffered = buffered 287 self._bufList = [] # Used if buffered is True 288 self.dataWritten = False 289 self.closed = False 290 291 def _write(self, data): 292 length = len(data) 293 while length: 294 toWrite = min(length, self._req.server.maxwrite - FCGI_HEADER_LEN) 295 296 rec = Record(self._type, self._req.requestId) 297 rec.contentLength = toWrite 298 rec.contentData = data[:toWrite] 299 self._conn.writeRecord(rec) 300 301 data = data[toWrite:] 302 length -= toWrite 303 304 def write(self, data): 305 assert not self.closed 306 307 if not data: 308 return 309 310 self.dataWritten = True 311 312 if self._buffered: 313 self._bufList.append(data) 314 else: 315 self._write(data) 316 317 def writelines(self, lines): 318 assert not self.closed 319 320 for line in lines: 321 self.write(line) 322 323 def flush(self): 324 # Only need to flush if this OutputStream is actually buffered. 325 if self._buffered: 326 data = ''.join(self._bufList) 327 self._bufList = [] 328 self._write(data) 329 330 # Though available, the following should NOT be called by WSGI apps. 331 def close(self): 332 """Sends end-of-stream notification, if necessary.""" 333 if not self.closed and self.dataWritten: 334 self.flush() 335 rec = Record(self._type, self._req.requestId) 336 self._conn.writeRecord(rec) 337 self.closed = True 338 339class TeeOutputStream(object): 340 """ 341 Simple wrapper around two or more output file-like objects that copies 342 written data to all streams. 343 """ 344 def __init__(self, streamList): 345 self._streamList = streamList 346 347 def write(self, data): 348 for f in self._streamList: 349 f.write(data) 350 351 def writelines(self, lines): 352 for line in lines: 353 self.write(line) 354 355 def flush(self): 356 for f in self._streamList: 357 f.flush() 358 359class StdoutWrapper(object): 360 """ 361 Wrapper for sys.stdout so we know if data has actually been written. 362 """ 363 def __init__(self, stdout): 364 self._file = stdout 365 self.dataWritten = False 366 367 def write(self, data): 368 if data: 369 self.dataWritten = True 370 self._file.write(data) 371 372 def writelines(self, lines): 373 for line in lines: 374 self.write(line) 375 376 def __getattr__(self, name): 377 return getattr(self._file, name) 378 379def decode_pair(s, pos=0): 380 """ 381 Decodes a name/value pair. 382 383 The number of bytes decoded as well as the name/value pair 384 are returned. 385 """ 386 nameLength = ord(s[pos]) 387 if nameLength & 128: 388 nameLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff 389 pos += 4 390 else: 391 pos += 1 392 393 valueLength = ord(s[pos]) 394 if valueLength & 128: 395 valueLength = struct.unpack('!L', s[pos:pos+4])[0] & 0x7fffffff 396 pos += 4 397 else: 398 pos += 1 399 400 name = s[pos:pos+nameLength] 401 pos += nameLength 402 value = s[pos:pos+valueLength] 403 pos += valueLength 404 405 return (pos, (name, value)) 406 407def encode_pair(name, value): 408 """ 409 Encodes a name/value pair. 410 411 The encoded string is returned. 412 """ 413 nameLength = len(name) 414 if nameLength < 128: 415 s = chr(nameLength) 416 else: 417 s = struct.pack('!L', nameLength | 0x80000000L) 418 419 valueLength = len(value) 420 if valueLength < 128: 421 s += chr(valueLength) 422 else: 423 s += struct.pack('!L', valueLength | 0x80000000L) 424 425 return s + name + value 426 427class Record(object): 428 """ 429 A FastCGI Record. 430 431 Used for encoding/decoding records. 432 """ 433 def __init__(self, type=FCGI_UNKNOWN_TYPE, requestId=FCGI_NULL_REQUEST_ID): 434 self.version = FCGI_VERSION_1 435 self.type = type 436 self.requestId = requestId 437 self.contentLength = 0 438 self.paddingLength = 0 439 self.contentData = '' 440 441 def _recvall(sock, length): 442 """ 443 Attempts to receive length bytes from a socket, blocking if necessary. 444 (Socket may be blocking or non-blocking.) 445 """ 446 dataList = [] 447 recvLen = 0 448 while length: 449 try: 450 data = sock.recv(length) 451 except socket.error, e: 452 if e[0] == errno.EAGAIN: 453 select.select([sock], [], []) 454 continue 455 else: 456 raise 457 if not data: # EOF 458 break 459 dataList.append(data) 460 dataLen = len(data) 461 recvLen += dataLen 462 length -= dataLen 463 return ''.join(dataList), recvLen 464 _recvall = staticmethod(_recvall) 465 466 def read(self, sock): 467 """Read and decode a Record from a socket.""" 468 try: 469 header, length = self._recvall(sock, FCGI_HEADER_LEN) 470 except: 471 raise EOFError 472 473 if length < FCGI_HEADER_LEN: 474 raise EOFError 475 476 self.version, self.type, self.requestId, self.contentLength, \ 477 self.paddingLength = struct.unpack(FCGI_Header, header) 478 479 if __debug__: _debug(9, 'read: fd = %d, type = %d, requestId = %d, ' 480 'contentLength = %d' % 481 (sock.fileno(), self.type, self.requestId, 482 self.contentLength)) 483 484 if self.contentLength: 485 try: 486 self.contentData, length = self._recvall(sock, 487 self.contentLength) 488 except: 489 raise EOFError 490 491 if length < self.contentLength: 492 raise EOFError 493 494 if self.paddingLength: 495 try: 496 self._recvall(sock, self.paddingLength) 497 except: 498 raise EOFError 499 500 def _sendall(sock, data): 501 """ 502 Writes data to a socket and does not return until all the data is sent. 503 """ 504 length = len(data) 505 while length: 506 try: 507 sent = sock.send(data) 508 except socket.error, e: 509 if e[0] == errno.EAGAIN: 510 select.select([], [sock], []) 511 continue 512 else: 513 raise 514 data = data[sent:] 515 length -= sent 516 _sendall = staticmethod(_sendall) 517 518 def write(self, sock): 519 """Encode and write a Record to a socket.""" 520 self.paddingLength = -self.contentLength & 7 521 522 if __debug__: _debug(9, 'write: fd = %d, type = %d, requestId = %d, ' 523 'contentLength = %d' % 524 (sock.fileno(), self.type, self.requestId, 525 self.contentLength)) 526 527 header = struct.pack(FCGI_Header, self.version, self.type, 528 self.requestId, self.contentLength, 529 self.paddingLength) 530 self._sendall(sock, header) 531 if self.contentLength: 532 self._sendall(sock, self.contentData) 533 if self.paddingLength: 534 self._sendall(sock, '\x00'*self.paddingLength) 535 536class TimeoutException(Exception): 537 pass 538 539class Request(object): 540 """ 541 Represents a single FastCGI request. 542 543 These objects are passed to your handler and is the main interface 544 between your handler and the fcgi module. The methods should not 545 be called by your handler. However, server, params, stdin, stdout, 546 stderr, and data are free for your handler's use. 547 """ 548 def __init__(self, conn, inputStreamClass, timeout): 549 self._conn = conn 550 self._timeout = timeout 551 552 self.server = conn.server 553 self.params = {} 554 self.stdin = inputStreamClass(conn) 555 self.stdout = OutputStream(conn, self, FCGI_STDOUT) 556 self.stderr = OutputStream(conn, self, FCGI_STDERR, buffered=True) 557 self.data = inputStreamClass(conn) 558 559 def timeout_handler(self, signum, frame): 560 self.stderr.write('Timeout Exceeded\n') 561 self.stderr.write("\n".join(traceback.format_stack(frame))) 562 self.stderr.flush() 563 564 raise TimeoutException 565 566 def run(self): 567 """Runs the handler, flushes the streams, and ends the request.""" 568 # If there is a timeout 569 if self._timeout: 570 old_alarm = signal.signal(signal.SIGALRM, self.timeout_handler) 571 signal.alarm(self._timeout) 572 573 try: 574 protocolStatus, appStatus = self.server.handler(self) 575 except: 576 traceback.print_exc(file=self.stderr) 577 self.stderr.flush() 578 if not self.stdout.dataWritten: 579 self.server.error(self) 580 581 protocolStatus, appStatus = FCGI_REQUEST_COMPLETE, 0 582 583 if __debug__: _debug(1, 'protocolStatus = %d, appStatus = %d' % 584 (protocolStatus, appStatus)) 585 586 # Restore old handler if timeout was given 587 if self._timeout: 588 signal.alarm(0) 589 signal.signal(signal.SIGALRM, old_alarm) 590 591 try: 592 self._flush() 593 self._end(appStatus, protocolStatus) 594 except socket.error, e: 595 if e[0] != errno.EPIPE: 596 raise 597 598 def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): 599 self._conn.end_request(self, appStatus, protocolStatus) 600 601 def _flush(self): 602 self.stdout.close() 603 self.stderr.close() 604 605class CGIRequest(Request): 606 """A normal CGI request disguised as a FastCGI request.""" 607 def __init__(self, server): 608 # These are normally filled in by Connection. 609 self.requestId = 1 610 self.role = FCGI_RESPONDER 611 self.flags = 0 612 self.aborted = False 613 614 self.server = server 615 self.params = dict(os.environ) 616 self.stdin = sys.stdin 617 self.stdout = StdoutWrapper(sys.stdout) # Oh, the humanity! 618 self.stderr = sys.stderr 619 self.data = StringIO.StringIO() 620 self._timeout = 0 621 622 def _end(self, appStatus=0L, protocolStatus=FCGI_REQUEST_COMPLETE): 623 sys.exit(appStatus) 624 625 def _flush(self): 626 # Not buffered, do nothing. 627 pass 628 629class Connection(object): 630 """ 631 A Connection with the web server. 632 633 Each Connection is associated with a single socket (which is 634 connected to the web server) and is responsible for handling all 635 the FastCGI message processing for that socket. 636 """ 637 _multiplexed = False 638 _inputStreamClass = InputStream 639 640 def __init__(self, sock, addr, server, timeout): 641 self._sock = sock 642 self._addr = addr 643 self.server = server 644 self._timeout = timeout 645 646 # Active Requests for this Connection, mapped by request ID. 647 self._requests = {} 648 649 def _cleanupSocket(self): 650 """Close the Connection's socket.""" 651 try: 652 self._sock.shutdown(socket.SHUT_WR) 653 except: 654 return 655 try: 656 while True: 657 r, w, e = select.select([self._sock], [], []) 658 if not r or not self._sock.recv(1024): 659 break 660 except: 661 pass 662 self._sock.close() 663 664 def run(self): 665 """Begin processing data from the socket.""" 666 self._keepGoing = True 667 while self._keepGoing: 668 try: 669 self.process_input() 670 except (EOFError, KeyboardInterrupt): 671 break 672 except (select.error, socket.error), e: 673 if e[0] == errno.EBADF: # Socket was closed by Request. 674 break 675 raise 676 677 self._cleanupSocket() 678 679 def process_input(self): 680 """Attempt to read a single Record from the socket and process it.""" 681 # Currently, any children Request threads notify this Connection 682 # that it is no longer needed by closing the Connection's socket. 683 # We need to put a timeout on select, otherwise we might get 684 # stuck in it indefinitely... (I don't like this solution.) 685 while self._keepGoing: 686 try: 687 r, w, e = select.select([self._sock], [], [], 1.0) 688 except ValueError: 689 # Sigh. ValueError gets thrown sometimes when passing select 690 # a closed socket. 691 raise EOFError 692 if r: break 693 if not self._keepGoing: 694 return 695 rec = Record() 696 rec.read(self._sock) 697 698 if rec.type == FCGI_GET_VALUES: 699 self._do_get_values(rec) 700 elif rec.type == FCGI_BEGIN_REQUEST: 701 self._do_begin_request(rec) 702 elif rec.type == FCGI_ABORT_REQUEST: 703 self._do_abort_request(rec) 704 elif rec.type == FCGI_PARAMS: 705 self._do_params(rec) 706 elif rec.type == FCGI_STDIN: 707 self._do_stdin(rec) 708 elif rec.type == FCGI_DATA: 709 self._do_data(rec) 710 elif rec.requestId == FCGI_NULL_REQUEST_ID: 711 self._do_unknown_type(rec) 712 else: 713 # Need to complain about this. 714 pass 715 716 def writeRecord(self, rec): 717 """ 718 Write a Record to the socket. 719 """ 720 rec.write(self._sock) 721 722 def end_request(self, req, appStatus=0L, 723 protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): 724 """ 725 End a Request. 726 727 Called by Request objects. An FCGI_END_REQUEST Record is 728 sent to the web server. If the web server no longer requires 729 the connection, the socket is closed, thereby ending this 730 Connection (run() returns). 731 """ 732 rec = Record(FCGI_END_REQUEST, req.requestId) 733 rec.contentData = struct.pack(FCGI_EndRequestBody, appStatus, 734 protocolStatus) 735 rec.contentLength = FCGI_EndRequestBody_LEN 736 self.writeRecord(rec) 737 738 if remove: 739 del self._requests[req.requestId] 740 741 if __debug__: _debug(2, 'end_request: flags = %d' % req.flags) 742 743 if not (req.flags & FCGI_KEEP_CONN) and not self._requests: 744 self._cleanupSocket() 745 self._keepGoing = False 746 747 def _do_get_values(self, inrec): 748 """Handle an FCGI_GET_VALUES request from the web server.""" 749 outrec = Record(FCGI_GET_VALUES_RESULT) 750 751 pos = 0 752 while pos < inrec.contentLength: 753 pos, (name, value) = decode_pair(inrec.contentData, pos) 754 cap = self.server.capability.get(name) 755 if cap is not None: 756 outrec.contentData += encode_pair(name, str(cap)) 757 758 outrec.contentLength = len(outrec.contentData) 759 self.writeRecord(outrec) 760 761 def _do_begin_request(self, inrec): 762 """Handle an FCGI_BEGIN_REQUEST from the web server.""" 763 role, flags = struct.unpack(FCGI_BeginRequestBody, inrec.contentData) 764 765 req = self.server.request_class(self, self._inputStreamClass, 766 self._timeout) 767 req.requestId, req.role, req.flags = inrec.requestId, role, flags 768 req.aborted = False 769 770 if not self._multiplexed and self._requests: 771 # Can't multiplex requests. 772 self.end_request(req, 0L, FCGI_CANT_MPX_CONN, remove=False) 773 else: 774 self._requests[inrec.requestId] = req 775 776 def _do_abort_request(self, inrec): 777 """ 778 Handle an FCGI_ABORT_REQUEST from the web server. 779 780 We just mark a flag in the associated Request. 781 """ 782 req = self._requests.get(inrec.requestId) 783 if req is not None: 784 req.aborted = True 785 786 def _start_request(self, req): 787 """Run the request.""" 788 # Not multiplexed, so run it inline. 789 req.run() 790 791 def _do_params(self, inrec): 792 """ 793 Handle an FCGI_PARAMS Record. 794 795 If the last FCGI_PARAMS Record is received, start the request. 796 """ 797 req = self._requests.get(inrec.requestId) 798 if req is not None: 799 if inrec.contentLength: 800 pos = 0 801 while pos < inrec.contentLength: 802 pos, (name, value) = decode_pair(inrec.contentData, pos) 803 req.params[name] = value 804 else: 805 self._start_request(req) 806 807 def _do_stdin(self, inrec): 808 """Handle the FCGI_STDIN stream.""" 809 req = self._requests.get(inrec.requestId) 810 if req is not None: 811 req.stdin.add_data(inrec.contentData) 812 813 def _do_data(self, inrec): 814 """Handle the FCGI_DATA stream.""" 815 req = self._requests.get(inrec.requestId) 816 if req is not None: 817 req.data.add_data(inrec.contentData) 818 819 def _do_unknown_type(self, inrec): 820 """Handle an unknown request type. Respond accordingly.""" 821 outrec = Record(FCGI_UNKNOWN_TYPE) 822 outrec.contentData = struct.pack(FCGI_UnknownTypeBody, inrec.type) 823 outrec.contentLength = FCGI_UnknownTypeBody_LEN 824 self.writeRecord(outrec) 825 826class MultiplexedConnection(Connection): 827 """ 828 A version of Connection capable of handling multiple requests 829 simultaneously. 830 """ 831 _multiplexed = True 832 _inputStreamClass = MultiplexedInputStream 833 834 def __init__(self, sock, addr, server, timeout): 835 super(MultiplexedConnection, self).__init__(sock, addr, server, 836 timeout) 837 838 # Used to arbitrate access to self._requests. 839 lock = threading.RLock() 840 841 # Notification is posted everytime a request completes, allowing us 842 # to quit cleanly. 843 self._lock = threading.Condition(lock) 844 845 def _cleanupSocket(self): 846 # Wait for any outstanding requests before closing the socket. 847 self._lock.acquire() 848 while self._requests: 849 self._lock.wait() 850 self._lock.release() 851 852 super(MultiplexedConnection, self)._cleanupSocket() 853 854 def writeRecord(self, rec): 855 # Must use locking to prevent intermingling of Records from different 856 # threads. 857 self._lock.acquire() 858 try: 859 # Probably faster than calling super. ;) 860 rec.write(self._sock) 861 finally: 862 self._lock.release() 863 864 def end_request(self, req, appStatus=0L, 865 protocolStatus=FCGI_REQUEST_COMPLETE, remove=True): 866 self._lock.acquire() 867 try: 868 super(MultiplexedConnection, self).end_request(req, appStatus, 869 protocolStatus, 870 remove) 871 self._lock.notify() 872 finally: 873 self._lock.release() 874 875 def _do_begin_request(self, inrec): 876 self._lock.acquire() 877 try: 878 super(MultiplexedConnection, self)._do_begin_request(inrec) 879 finally: 880 self._lock.release() 881 882 def _do_abort_request(self, inrec): 883 self._lock.acquire() 884 try: 885 super(MultiplexedConnection, self)._do_abort_request(inrec) 886 finally: 887 self._lock.release() 888 889 def _start_request(self, req): 890 try: 891 thread.start_new_thread(req.run, ()) 892 except thread.error, e: 893 self.end_request(req, 0L, FCGI_OVERLOADED, remove=True) 894 895 def _do_params(self, inrec): 896 self._lock.acquire() 897 try: 898 super(MultiplexedConnection, self)._do_params(inrec) 899 finally: 900 self._lock.release() 901 902 def _do_stdin(self, inrec): 903 self._lock.acquire() 904 try: 905 super(MultiplexedConnection, self)._do_stdin(inrec) 906 finally: 907 self._lock.release() 908 909 def _do_data(self, inrec): 910 self._lock.acquire() 911 try: 912 super(MultiplexedConnection, self)._do_data(inrec) 913 finally: 914 self._lock.release() 915 916class BaseFCGIServer(object): 917 request_class = Request 918 cgirequest_class = CGIRequest 919 920 # The maximum number of bytes (per Record) to write to the server. 921 # I've noticed mod_fastcgi has a relatively small receive buffer (8K or 922 # so). 923 maxwrite = 8192 924 925 # Limits the size of the InputStream's string buffer to this size + the 926 # server's maximum Record size. Since the InputStream is not seekable, 927 # we throw away already-read data once this certain amount has been read. 928 inputStreamShrinkThreshold = 102400 - 8192 929 930 def __init__(self, application, environ=None, 931 multithreaded=True, multiprocess=False, 932 bindAddress=None, umask=None, multiplexed=False, 933 debug=False, roles=(FCGI_RESPONDER,), 934 forceCGI=False): 935 """ 936 bindAddress, if present, must either be a string or a 2-tuple. If 937 present, run() will open its own listening socket. You would use 938 this if you wanted to run your application as an 'external' FastCGI 939 app. (i.e. the webserver would no longer be responsible for starting 940 your app) If a string, it will be interpreted as a filename and a UNIX 941 socket will be opened. If a tuple, the first element, a string, 942 is the interface name/IP to bind to, and the second element (an int) 943 is the port number. 944 945 If binding to a UNIX socket, umask may be set to specify what 946 the umask is to be changed to before the socket is created in the 947 filesystem. After the socket is created, the previous umask is 948 restored. 949 950 Set multiplexed to True if you want to handle multiple requests 951 per connection. Some FastCGI backends (namely mod_fastcgi) don't 952 multiplex requests at all, so by default this is off (which saves 953 on thread creation/locking overhead). If threads aren't available, 954 this keyword is ignored; it's not possible to multiplex requests 955 at all. 956 """ 957 if environ is None: 958 environ = {} 959 960 self.application = application 961 self.environ = environ 962 self.multithreaded = multithreaded 963 self.multiprocess = multiprocess 964 self.debug = debug 965 self.roles = roles 966 self.forceCGI = forceCGI 967 968 self._bindAddress = bindAddress 969 self._umask = umask 970 971 # Used to force single-threadedness 972 self._appLock = thread.allocate_lock() 973 974 if thread_available: 975 try: 976 import resource 977 # Attempt to glean the maximum number of connections 978 # from the OS. 979 maxConns = resource.getrlimit(resource.RLIMIT_NOFILE)[0] 980 except ImportError: 981 maxConns = 100 # Just some made up number. 982 maxReqs = maxConns 983 if multiplexed: 984 self._connectionClass = MultiplexedConnection 985 maxReqs *= 5 # Another made up number. 986 else: 987 self._connectionClass = Connection 988 self.capability = { 989 FCGI_MAX_CONNS: maxConns, 990 FCGI_MAX_REQS: maxReqs, 991 FCGI_MPXS_CONNS: multiplexed and 1 or 0 992 } 993 else: 994 self._connectionClass = Connection 995 self.capability = { 996 # If threads aren't available, these are pretty much correct. 997 FCGI_MAX_CONNS: 1, 998 FCGI_MAX_REQS: 1, 999 FCGI_MPXS_CONNS: 0 1000 } 1001 1002 def _setupSocket(self): 1003 if self._bindAddress is None: 1004 # Run as a normal FastCGI? 1005 1006 # FastCGI/CGI discrimination is broken on Mac OS X. 1007 # Set the environment variable FCGI_FORCE_CGI to "Y" or "y" 1008 # if you want to run your app as a simple CGI. (You can do 1009 # this with Apache's mod_env [not loaded by default in OS X 1010 # client, ha ha] and the SetEnv directive.) 1011 forceCGI = self.forceCGI or \ 1012 os.environ.get('FCGI_FORCE_CGI', 'N').upper().startswith('Y') 1013 1014 if forceCGI: 1015 isFCGI = False 1016 else: 1017 if not hasattr(socket, 'fromfd'): 1018 # can happen on win32, no socket.fromfd there! 1019 raise ValueError( 1020 'If you want FCGI, please create an external FCGI server ' 1021 'by providing a valid bindAddress. ' 1022 'If you want CGI, please force CGI operation. Use ' 1023 'FCGI_FORCE_CGI=Y environment or forceCGI parameter.') 1024 sock = socket.fromfd(FCGI_LISTENSOCK_FILENO, socket.AF_INET, 1025 socket.SOCK_STREAM) 1026 isFCGI = True 1027 try: 1028 sock.getpeername() 1029 except socket.error, e: 1030 if e[0] == errno.ENOTSOCK: 1031 # Not a socket, assume CGI context. 1032 isFCGI = False 1033 elif e[0] != errno.ENOTCONN: 1034 raise 1035 1036 if not isFCGI: 1037 req = self.cgirequest_class(self) 1038 req.run() 1039 sys.exit(0) 1040 else: 1041 # Run as a server 1042 oldUmask = None 1043 if type(self._bindAddress) is str: 1044 # Unix socket 1045 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1046 try: 1047 os.unlink(self._bindAddress) 1048 except OSError: 1049 pass 1050 if self._umask is not None: 1051 oldUmask = os.umask(self._umask) 1052 else: 1053 # INET socket 1054 assert type(self._bindAddress) is tuple 1055 assert len(self._bindAddress) == 2 1056 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1057 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1058 1059 sock.bind(self._bindAddress) 1060 sock.listen(socket.SOMAXCONN) 1061 1062 if oldUmask is not None: 1063 os.umask(oldUmask) 1064 1065 return sock 1066 1067 def _cleanupSocket(self, sock): 1068 """Closes the main socket.""" 1069 sock.close() 1070 1071 def handler(self, req): 1072 """Special handler for WSGI.""" 1073 if req.role not in self.roles: 1074 return FCGI_UNKNOWN_ROLE, 0 1075 1076 # Mostly taken from example CGI gateway. 1077 environ = req.params 1078 environ.update(self.environ) 1079 1080 environ['wsgi.version'] = (1,0) 1081 environ['wsgi.input'] = req.stdin 1082 if self._bindAddress is None: 1083 stderr = req.stderr 1084 else: 1085 stderr = TeeOutputStream((sys.stderr, req.stderr)) 1086 environ['wsgi.errors'] = stderr 1087 environ['wsgi.multithread'] = not isinstance(req, CGIRequest) and \ 1088 thread_available and self.multithreaded 1089 environ['wsgi.multiprocess'] = isinstance(req, CGIRequest) or \ 1090 self.multiprocess 1091 environ['wsgi.run_once'] = isinstance(req, CGIRequest) 1092 1093 if environ.get('HTTPS', 'off') in ('on', '1'): 1094 environ['wsgi.url_scheme'] = 'https' 1095 else: 1096 environ['wsgi.url_scheme'] = 'http' 1097 1098 self._sanitizeEnv(environ) 1099 1100 headers_set = [] 1101 headers_sent = [] 1102 result = None 1103 1104 def write(data): 1105 assert type(data) is str, 'write() argument must be string' 1106 assert headers_set, 'write() before start_response()' 1107 1108 if not headers_sent: 1109 status, responseHeaders = headers_sent[:] = headers_set 1110 found = False 1111 for header,value in responseHeaders: 1112 if header.lower() == 'content-length': 1113 found = True 1114 break 1115 if not found and result is not None: 1116 try: 1117 if len(result) == 1: 1118 responseHeaders.append(('Content-Length', 1119 str(len(data)))) 1120 except: 1121 pass 1122 s = 'Status: %s\r\n' % status 1123 for header in responseHeaders: 1124 s += '%s: %s\r\n' % header 1125 s += '\r\n' 1126 req.stdout.write(s) 1127 1128 req.stdout.write(data) 1129 req.stdout.flush() 1130 1131 def start_response(status, response_headers, exc_info=None): 1132 if exc_info: 1133 try: 1134 if headers_sent: 1135 # Re-raise if too late 1136 raise exc_info[0], exc_info[1], exc_info[2] 1137 finally: 1138 exc_info = None # avoid dangling circular ref 1139 else: 1140 assert not headers_set, 'Headers already set!' 1141 1142 assert type(status) is str, 'Status must be a string' 1143 assert len(status) >= 4, 'Status must be at least 4 characters' 1144 assert int(status[:3]), 'Status must begin with 3-digit code' 1145 assert status[3] == ' ', 'Status must have a space after code' 1146 assert type(response_headers) is list, 'Headers must be a list' 1147 if __debug__: 1148 for name,val in response_headers: 1149 assert type(name) is str, 'Header name "%s" must be a string' % name 1150 assert type(val) is str, 'Value of header "%s" must be a string' % name 1151 1152 headers_set[:] = [status, response_headers] 1153 return write 1154 1155 if not self.multithreaded: 1156 self._appLock.acquire() 1157 try: 1158 try: 1159 result = self.application(environ, start_response) 1160 try: 1161 for data in result: 1162 if data: 1163 write(data) 1164 if not headers_sent: 1165 write('') # in case body was empty 1166 finally: 1167 if hasattr(result, 'close'): 1168 result.close() 1169 except socket.error, e: 1170 if e[0] != errno.EPIPE: 1171 raise # Don't let EPIPE propagate beyond server 1172 finally: 1173 if not self.multithreaded: 1174 self._appLock.release() 1175 1176 return FCGI_REQUEST_COMPLETE, 0 1177 1178 def _sanitizeEnv(self, environ): 1179 """Ensure certain values are present, if required by WSGI.""" 1180 if not environ.has_key('SCRIPT_NAME'): 1181 environ['SCRIPT_NAME'] = '' 1182 1183 reqUri = None 1184 if environ.has_key('REQUEST_URI'): 1185 reqUri = environ['REQUEST_URI'].split('?', 1) 1186 1187 if not environ.has_key('PATH_INFO') or not environ['PATH_INFO']: 1188 if reqUri is not None: 1189 scriptName = environ['SCRIPT_NAME'] 1190 if not reqUri[0].startswith(scriptName): 1191 environ['wsgi.errors'].write('WARNING: SCRIPT_NAME does not match REQUEST_URI') 1192 environ['PATH_INFO'] = reqUri[0][len(scriptName):] 1193 else: 1194 environ['PATH_INFO'] = '' 1195 if not environ.has_key('QUERY_STRING') or not environ['QUERY_STRING']: 1196 if reqUri is not None and len(reqUri) > 1: 1197 environ['QUERY_STRING'] = reqUri[1] 1198 else: 1199 environ['QUERY_STRING'] = '' 1200 1201 # If any of these are missing, it probably signifies a broken 1202 # server... 1203 for name,default in [('REQUEST_METHOD', 'GET'), 1204 ('SERVER_NAME', 'localhost'), 1205 ('SERVER_PORT', '80'), 1206 ('SERVER_PROTOCOL', 'HTTP/1.0')]: 1207 if not environ.has_key(name): 1208 environ['wsgi.errors'].write('%s: missing FastCGI param %s ' 1209 'required by WSGI!\n' % 1210 (self.__class__.__name__, name)) 1211 environ[name] = default 1212 1213 def error(self, req): 1214 """ 1215 Called by Request if an exception occurs within the handler. May and 1216 should be overridden. 1217 """ 1218 if self.debug: 1219 import cgitb 1220 req.stdout.write('Status: 500 Internal Server Error\r\n' + 1221 'Content-Type: text/html\r\n\r\n' + 1222 cgitb.html(sys.exc_info())) 1223 else: 1224 errorpage = """<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN"> 1225<html><head> 1226<title>Unhandled Exception</title> 1227</head><body> 1228<h1>Unhandled Exception</h1> 1229<p>An unhandled exception was thrown by the application.</p> 1230</body></html> 1231""" 1232 req.stdout.write('Status: 500 Internal Server Error\r\n' + 1233 'Content-Type: text/html\r\n\r\n' + 1234 errorpage) 1235