1#!/usr/bin/env python
2
3#############################################################################
4#
5# Zope Public License (ZPL) Version 2.0
6# -----------------------------------------------
7#
8# This software is Copyright (c) Zope Corporation (tm) and
9# Contributors. All rights reserved.
10#
11# This license has been certified as open source. It has also
12# been designated as GPL compatible by the Free Software
13# Foundation (FSF).
14#
15# Redistribution and use in source and binary forms, with or
16# without modification, are permitted provided that the
17# following conditions are met:
18#
19# 1. Redistributions in source code must retain the above
20#    copyright notice, this list of conditions, and the following
21#    disclaimer.
22#
23# 2. Redistributions in binary form must reproduce the above
24#    copyright notice, this list of conditions, and the following
25#    disclaimer in the documentation and/or other materials
26#    provided with the distribution.
27#
28# 3. The name Zope Corporation (tm) must not be used to
29#    endorse or promote products derived from this software
30#    without prior written permission from Zope Corporation.
31#
32# 4. The right to distribute this software or to use it for
33#    any purpose does not give you the right to use Servicemarks
34#    (sm) or Trademarks (tm) of Zope Corporation. Use of them is
35#    covered in a separate agreement (see
36#    http://www.zope.com/Marks).
37#
38# 5. If any files are modified, you must cause the modified
39#    files to carry prominent notices stating that you changed
40#    the files and the date of any change.
41#
42# Disclaimer
43#
44#   THIS SOFTWARE IS PROVIDED BY ZOPE CORPORATION ``AS IS''
45#   AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
46#   NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
47#   AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN
48#   NO EVENT SHALL ZOPE CORPORATION OR ITS CONTRIBUTORS BE
49#   LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
50#   EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
51#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
52#   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
53#   HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
54#   CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
55#   OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
56#   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
57#   DAMAGE.
58#
59#
60# This software consists of contributions made by Zope
61# Corporation and many individuals on behalf of Zope
62# Corporation.  Specific attributions are listed in the
63# accompanying credits file.
64#
65#############################################################################
66"""TCPWatch, a connection forwarder and HTTP proxy for monitoring connections.
67
68Requires Python 2.1 or above.
69
70Revision information:
71$Id: tcpwatch.py,v 1.9 2004/06/17 00:03:46 shane Exp $
72"""
73
74from __future__ import nested_scopes
75
VERSION = '1.3'
# Banner text; used e.g. at the top of the Tk window's startup text.
COPYRIGHT = ('TCPWatch %s Copyright 2001 Shane Hathaway, Zope Corporation'
             % (VERSION,))
80
81import sys
82import os
83import socket
84import asyncore
85import getopt
86from time import time, localtime
87
88
# Maximum number of bytes requested from a socket per recv() call.
RECV_BUFFER_SIZE = 8 * 1024
# When false, carriage returns are removed from data before it is displayed.
show_cr = 0
91
92
93#############################################################################
94#
95# Connection forwarder
96#
97#############################################################################
98
99
class ForwardingEndpoint (asyncore.dispatcher):
    """Socket wrapper that both emits and accepts stream messages.

    Data read from the socket is fanned out to every destination set with
    set_dests(); data passed to write() is queued and sent over the socket
    once it is connected.
    """
    _dests = ()

    def __init__(self, conn=None):
        # Strings waiting to be sent, oldest first.
        self._outbuf = []
        asyncore.dispatcher.__init__(self, conn)

    def set_dests(self, dests):
        """Register the sequence of streams that receive our events."""
        self._dests = dests

    def write(self, data):
        """Queue non-empty data and try to send it immediately."""
        if not data:
            return
        self._outbuf.append(data)
        self.handle_write()

    def readable(self):
        return 1

    def writable(self):
        # Writable while unconnected (handle_write waits for the
        # connection) or while data is still queued.
        return (not self.connected) or bool(self._outbuf)

    def handle_connect(self):
        # An empty write tells each destination that we just connected.
        for dest in self._dests:
            dest.write('')

    def received(self, data):
        """Forward a non-empty chunk of data to every destination."""
        if not data:
            return
        for dest in self._dests:
            dest.write(data)

    def handle_read(self):
        self.received(self.recv(RECV_BUFFER_SIZE))

    def handle_write(self):
        """Send as much queued data as the socket accepts right now."""
        if not self.connected:
            return  # Nothing can be sent until the connection is up.
        pending = self._outbuf
        while pending:
            chunk = pending.pop(0)
            if not chunk:
                continue
            sent = self.send(chunk)
            if sent < len(chunk):
                # Partial send: put the unsent tail back at the front.
                pending.insert(0, chunk[sent:])
                break

    def handle_close (self):
        # Clear the destinations before closing them so that a re-entered
        # handle_close() does not close them a second time.
        dests, self._dests = self._dests, ()
        for dest in dests:
            dest.close()
        self.close()

    def handle_error(self):
        """Report the current exception to destinations that accept
        errors, then close everything."""
        exc_type, exc_value = sys.exc_info()[:2]
        for dest in self._dests:
            if hasattr(dest, 'error'):
                dest.error(exc_type, exc_value)
        self.handle_close()
164
165
166
class EndpointObserver:
    """Adapter that forwards stream events to a ConnectionObserver.

    A stream only reports "data", "connected" (an empty write), "closed"
    and "error"; a ConnectionObserver also needs to know which side the
    event came from, so this adapter attaches a fixed from_client flag.
    """

    def __init__(self, obs, from_client):
        self.obs = obs
        self.from_client = from_client

    def write(self, data):
        # An empty write is the endpoint's "just connected" signal.
        if not data:
            self.obs.connected(self.from_client)
            return
        self.obs.received(data, self.from_client)

    def close(self):
        self.obs.closed(self.from_client)

    def error(self, t, v):
        self.obs.error(self.from_client, t, v)
189
190
191
class ForwardedConnectionInfo:
    """Description of one forwarded connection (or one HTTP transaction
    on it): its number, the two endpoint addresses and when it opened."""

    # Transaction number within the connection; callers may override it.
    transaction = 1

    def __init__(self, connection_number, client_addr, server_addr=None):
        self.connection_number = connection_number
        self.client_addr = client_addr
        self.server_addr = server_addr
        self.opened = time()

    def dup(self):
        """Return a copy sharing the connection number and addresses.

        The copy gets a fresh 'opened' timestamp and the default
        transaction number.
        """
        cls = ForwardedConnectionInfo
        return cls(self.connection_number, self.client_addr, self.server_addr)
205
206
207
class ForwardingService (asyncore.dispatcher):
    """Listens on a local address and forwards every accepted connection
    to a fixed destination, optionally reporting traffic to an observer.
    """

    # Number of connections accepted so far; also numbers new connections.
    _counter = 0

    def __init__(self, listen_host, listen_port, dest_host, dest_port,
                 observer_factory=None):
        """Start listening on (listen_host, listen_port).

        Accepted connections are forwarded to (dest_host, dest_port).  If
        observer_factory is given, it is called with a
        ForwardedConnectionInfo for each accepted connection and should
        return an object providing the IConnectionObserver methods.
        """
        self._obs_factory = observer_factory
        self._dest = (dest_host, dest_port)
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((listen_host, listen_port))
        self.listen(5)

    def handle_accept(self):
        """Pair an accepted client connection with a new outgoing
        connection to the destination."""
        info = self.accept()
        if info:
            # Got a connection.
            conn, addr = info
            conn.setblocking(0)

            ep1 = ForwardingEndpoint()  # connects client to self
            ep2 = ForwardingEndpoint()  # connects self to server

            counter = self._counter + 1
            self._counter = counter
            factory = self._obs_factory
            if factory is not None:
                fci = ForwardedConnectionInfo(counter, addr, self._dest)
                obs = factory(fci)
                dests1 = (ep2, EndpointObserver(obs, 1))
                dests2 = (ep1, EndpointObserver(obs, 0))
            else:
                dests1 = (ep2,)
                dests2 = (ep1,)

            ep1.set_dests(dests1)
            ep2.set_dests(dests2)

            # Now everything is hooked up.  Let data pass.
            ep2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            ep1.set_socket(conn)
            ep1.connected = 1  # We already know the client connected.
            try:
                ep2.connect(self._dest)
            except socket.error:
                # connect() can fail synchronously, e.g. when the host name
                # does not resolve.  Previously the exception escaped to
                # this service's handle_error(), which only prints it,
                # leaving the client connection open with nowhere to go.
                # ep2.handle_error() reports the error to the observer (if
                # any) and closes both endpoints.
                ep2.handle_error()

    def handle_error(self):
        # Don't stop the server.
        import traceback
        traceback.print_exc()
257
258
259
class IConnectionObserver:
    """Protocol for objects that watch one forwarded connection.

    from_client is true for events on the client side of the connection
    and false for events on the server side.  The methods are declared
    without 'self'; this class apparently only documents the protocol.
    """

    def connected(from_client):
        """One side (client or server) has established its connection.
        """

    def received(data, from_client):
        """One side (client or server) has sent 'data'.
        """

    def closed(from_client):
        """One side (client or server) has closed its channel.
        """

    def error(from_client, type, value):
        """An exception ('type', 'value') occurred on one side's channel.
        """
277
278
279#############################################################################
280#
281# Basic abstract connection observer and stdout observer
282#
283#############################################################################
284
285
def escape(s):
    """Return str(s) with the backslash escapes repr() would use, but
    without the surrounding quotes.

    A prefix containing both quote characters is prepended so that repr()
    always chooses single quotes and renders the prefix as the same four
    characters (quote, '"', backslash, quote).  Those, plus the closing
    quote, are then sliced off.
    """
    quoted = repr('"\'' + str(s))
    lead_in = 4  # opening quote + the escaped '"\'' prefix
    return quoted[lead_in:len(quoted) - 1]
289
290
class BasicObserver:
    """Base observer that renders connection events as annotated text.

    Subclasses must provide write(s) and flush().  Status events become
    '[MM:SS.sss - <client|server> <message>]' lines, timed from the
    creation of the observer.  Data lines are prefixed with '==>' when
    they come from the client and '<==' when they come from the server.
    """

    # -1 when no data line is open; otherwise the from_client value of the
    # side whose data line has not yet been ended by a newline.
    continuing_line = -1
    # Data prefixes, indexed by from_client.
    arrows = ('<==', '==>')

    def __init__(self):
        self._start = time()

    def _output_message(self, m, from_client):
        """Write a timestamped status line about the client or server."""
        if self.continuing_line >= 0:
            # End the open data line before writing the status line.
            self.write('\n')
            self.continuing_line = -1
        who = from_client and 'client' or 'server'
        minutes, seconds = divmod(time() - self._start, 60)
        self.write('[%02d:%06.3f - %s %s]\n' % (minutes, seconds, who, m))
        self.flush()

    def connection_from(self, fci):
        """Announce the connection (and HTTP transaction) described by fci."""
        client = tuple(fci.client_addr)
        if fci.server_addr is None:
            self._output_message('connection from %s:%s' % client, 1)
        else:
            endpoints = client + tuple(fci.server_addr)
            self._output_message('%s:%s forwarded to %s:%s' % endpoints, 1)
        if fci.transaction > 1:
            self._output_message('HTTP transaction #%d' % fci.transaction, 1)

    def connected(self, from_client):
        self._output_message('connected', from_client)

    def received(self, data, from_client):
        """Write data, one arrow-prefixed line per input line."""
        arrow = self.arrows[from_client]
        open_side = self.continuing_line
        if open_side < 0:
            self.write(arrow)
        elif open_side != from_client:
            # The other side's line is still open: end it, start ours.
            self.write('\n' + arrow)

        newline = data.endswith('\n')
        if newline:
            data = data[:-1]
        if not show_cr:
            data = data.replace('\r', '')
        separator = '\n' + arrow
        self.write(separator.join([escape(line) for line in data.split('\n')]))

        if newline:
            self.write('\n')
            self.continuing_line = -1
        else:
            self.continuing_line = from_client
        self.flush()

    def closed(self, from_client):
        self._output_message('closed', from_client)

    def error(self, from_client, type, value):
        self._output_message(
            'connection error %s: %s' % (type, value), from_client)

    def write(self, s):
        raise NotImplementedError

    def flush(self):
        raise NotImplementedError
372
373
class StdoutObserver (BasicObserver):
    """Observer that prints the annotated traffic of one connection to
    standard output."""

    # __implements__ = IConnectionObserver

    def __init__(self, fci):
        BasicObserver.__init__(self)
        # Announce the connection as soon as the observer exists.
        self.connection_from(fci)

    def write(self, s):
        out = sys.stdout
        out.write(s)

    def flush(self):
        out = sys.stdout
        out.flush()
387
388
# 'log_number' counts the log numbers handed out so far; each
# RecordingObserver takes the next value to name its log files.
log_number = 0

def nextLogNumber():
    """Advance the module-wide log counter and return its new value."""
    global log_number
    log_number += 1
    return log_number
396
397
class RecordingObserver (BasicObserver):
    """Log request to a file.

    o Decorates an underlying observer, created by calling the passed
      'sub_factory' with the ForwardedConnectionInfo 'fci'; every event is
      also forwarded to it.

    o Each instance takes a distinct number from nextLogNumber() and uses
      it in its file names: '<prefix><number>.<extension>', with the
      number zero-padded to four digits and the extension one of
      'request', 'response' or 'errors'.

    o Files are created in the supplied 'record_directory' and are only
      ever appended to.

    o Client data is always recorded; server data ('record_responses') and
      errors ('record_errors') are recorded unless suppressed.
    """
    _ERROR_SOURCES = ('Server', 'Client')

    # __implements__ = IConnectionObserver

    def __init__(self, fci, sub_factory, record_directory,
                 record_prefix='watch', record_responses=1, record_errors=1):
        self._log_number = nextLogNumber()
        self._decorated = sub_factory(fci)
        self._directory = record_directory
        self._prefix = record_prefix
        self._response = record_responses
        self._errors = record_errors

    def connected(self, from_client):
        """See IConnectionObserver.
        """
        self._decorated.connected(from_client)

    def received(self, data, from_client):
        """See IConnectionObserver.

        Appends client data to the '.request' file and, when recording
        responses, server data to the '.response' file.
        """
        if from_client or self._response:
            extension = from_client and 'request' or 'response'
            self._append(extension, data)
        self._decorated.received(data, from_client)

    def closed(self, from_client):
        """See IConnectionObserver.
        """
        self._decorated.closed(from_client)

    def error(self, from_client, type, value):
        """See IConnectionObserver.

        When recording errors, appends a '(Client|Server) type: value' line
        to the '.errors' file.
        """
        if self._errors:
            self._append('errors',
                         '(%s) %s: %s\n' % (self._ERROR_SOURCES[from_client],
                                            type, value))
        self._decorated.error(from_client, type, value)

    def _append(self, extension, text):
        """Append text to this observer's file with the given extension.

        The file is closed even if the write fails, so no handle is leaked
        and the data is flushed to disk right away.
        """
        f = self._openForAppend(extension=extension)
        try:
            f.write(text)
        finally:
            f.close()

    def _openForAppend(self, extension):
        """Open a file with the given extension for appending.

        o File is in the directory indicated by self._directory.

        o File has the name '<prefix><log #>.<extension>', where the log
          number is zero-padded to four digits.
        """
        filename = '%s%04d.%s' % (self._prefix, self._log_number, extension)
        fqpath = os.path.join(self._directory, filename)
        return open(fqpath, 'a')
462
463
464#############################################################################
465#
466# Tkinter GUI
467#
468#############################################################################
469
470
def setupTk(titlepart, config_info, colorized=1):
    """Creates the Tk GUI and returns (observer_factory, mainloop).

    titlepart is shown in the window title and config_info is appended to
    the startup text.  observer_factory(fci) returns a TkConnectionObserver
    that displays one connection in the window; mainloop is the Tk main
    loop, which is not started here and is left to the caller.

    colorized is passed to every TkConnectionObserver: a false value
    renders data like BasicObserver (arrows, escaped text), 2 colors data
    by direction and also highlights escaped characters, and any other
    true value (e.g. 1) colors data by direction only.
    """

    # Python 2 module names.
    import Tkinter
    from ScrolledText import ScrolledText
    from Queue import Queue, Empty
    # NOTE(review): StringIO does not appear to be used in this function.
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

    startup_text = COPYRIGHT + ("""

Use your client to connect to the proxied port(s) then click
the list on the left to see the data transferred.

%s
""" % config_info)


    class TkTCPWatch (Tkinter.Frame):
        '''The tcpwatch top-level window.

        Shows the list of connection ids on the left and the captured
        data of the selected connection on the right.
        '''
        def __init__(self, master):
            Tkinter.Frame.__init__(self, master)
            self.createWidgets()
            # connections maps ids to TkConnectionObservers.
            self.connections = {}
            # Id of the connection shown in the text box ('' for none).
            self.showingid = ''
            # (callable, args) pairs posted by observers; processQueue()
            # runs them from the Tk event loop.
            self.queue = Queue()
            self.processQueue()

        def createWidgets(self):
            """Build the connection list (left) and the scrolled text box
            (right), with one text tag per display style."""
            listframe = Tkinter.Frame(self)
            listframe.pack(side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1)
            scrollbar = Tkinter.Scrollbar(listframe, orient=Tkinter.VERTICAL)
            self.connectlist = Tkinter.Listbox(
                listframe, yscrollcommand=scrollbar.set, exportselection=0)
            scrollbar.config(command=self.connectlist.yview)
            scrollbar.pack(side=Tkinter.RIGHT, fill=Tkinter.Y)
            self.connectlist.pack(
                side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1)
            self.connectlist.bind('<Button-1>', self.mouseListSelect)
            self.textbox = ScrolledText(self, background="#ffffff")
            # Styles: status messages, client data, server data, and
            # escaped client/server data (shaded background).
            self.textbox.tag_config("message", foreground="#000000")
            self.textbox.tag_config("client", foreground="#007700")
            self.textbox.tag_config(
                "clientesc", foreground="#007700", background="#dddddd")
            self.textbox.tag_config("server", foreground="#770000")
            self.textbox.tag_config(
                "serveresc", foreground="#770000", background="#dddddd")
            self.textbox.insert(Tkinter.END, startup_text, "message")
            self.textbox.pack(side='right', fill=Tkinter.BOTH, expand=1)
            self.pack(fill=Tkinter.BOTH, expand=1)

        def addConnection(self, id, conn):
            """Remember observer conn under id and list id on the left."""
            self.connections[id] = conn
            connectlist = self.connectlist
            connectlist.insert(Tkinter.END, id)

        def updateConnection(self, id, output):
            """Append output, a list of (text, style) pairs, to the text
            box if connection id is the one being shown."""
            if id == self.showingid:
                textbox = self.textbox
                for data, style in output:
                    textbox.insert(Tkinter.END, data, style)

        def mouseListSelect(self, event=None):
            """Show the full output of the connection under the pointer."""
            # NOTE(review): event defaults to None but is dereferenced
            # below; this is only bound to <Button-1>, which passes one.
            connectlist = self.connectlist
            idx = connectlist.nearest(event.y)
            sel = connectlist.get(idx)
            connections = self.connections
            # has_key is the Python 2 dict API.
            if connections.has_key(sel):
                # Clear showingid while the text box is rebuilt; it is
                # set to the selection again once the redraw is done.
                self.showingid = ''
                output = connections[sel].getOutput()
                self.textbox.delete(1.0, Tkinter.END)
                for data, style in output:
                    self.textbox.insert(Tkinter.END, data, style)
                self.showingid = sel

        def processQueue(self):
            """Run queued GUI updates, then reschedule itself in 50 ms.

            Observers post updates through the queue instead of touching
            the widgets directly, presumably because they are called from
            outside the Tk event loop (e.g. the asyncore loop) -- verify
            against the caller.
            """
            try:
                if not self.queue.empty():
                    # Process messages for up to 1/4 second
                    # (this local import repeats the module-level one).
                    from time import time
                    limit = time() + 0.25
                    while time() < limit:
                        try:
                            f, args = self.queue.get_nowait()
                        except Empty:
                            break
                        f(*args)
            finally:
                # Always reschedule, even if an update raised.
                self.master.after(50, self.processQueue)


    class TkConnectionObserver (BasicObserver):
        '''A connection observer which shows captured data in a TCPWatch
        frame.  The data is mangled for presentation.

        All output is kept in self._output (for redisplay when the
        connection is selected) and also posted to the frame's queue.
        '''
        # __implements__ = IConnectionObserver

        def __init__(self, frame, fci, colorized=1):
            BasicObserver.__init__(self)
            self._output = []  # list of tuples containing (text, style)
            self._frame = frame
            self._colorized = colorized
            # List id: connection number, plus the transaction number for
            # HTTP transactions after the first, plus the local time the
            # connection info was created (HH:MM:SS).
            t = localtime(fci.opened)
            if fci.transaction > 1:
                base_id = '%03d-%02d' % (
                    fci.connection_number, fci.transaction)
            else:
                base_id = '%03d' % fci.connection_number
            id = '%s (%02d:%02d:%02d)' % (base_id, t[3], t[4], t[5])
            self._id = id
            frame.queue.put((frame.addConnection, (id, self)))
            self.connection_from(fci)

        def write(self, s):
            # Text written by BasicObserver is shown in "message" style.
            output = [(s, "message")]
            self._output.extend(output)
            self._frame.queue.put(
                (self._frame.updateConnection, (self._id, output)))

        def flush(self):
            pass

        def received(self, data, from_client):
            """Show data colored by direction.

            When colorized == 2, runs of escaped characters also get the
            shaded "clientesc"/"serveresc" styles.  When colorized is
            false, BasicObserver's plain rendering is used instead.
            """
            if not self._colorized:
                BasicObserver.received(self, data, from_client)
                return

            if not show_cr:
                data = data.replace('\r', '')

            output = []

            extra_color = (self._colorized == 2)

            # append(ss, escaped) collects one run of characters; values
            # are bound through default arguments.
            if extra_color:
                # 4 colors: Change the color client/server and escaped chars
                def append(ss, escaped, output=output,
                           from_client=from_client, escape=escape):
                    if escaped:
                        output.append((escape(ss), from_client
                                       and 'clientesc' or 'serveresc'))
                    else:
                        output.append((ss, from_client
                                       and 'client' or 'server'))
            else:
                # 2 colors: Only change color for client/server
                # (runs are still escaped, then joined into one piece below).
                segments = []
                def append(ss, escaped, segments=segments,
                           escape=escape):
                    if escaped:
                        segments.append(escape(ss))
                    else:
                        segments.append(ss)

            # Escape the input data.
            # Split data into runs of plain characters and runs of
            # characters to escape: control characters other than '\n',
            # and characters >= '\x80'.
            was_escaped = 0
            start_idx = 0
            for idx in xrange(len(data)):
                c = data[idx]
                escaped = (c < ' ' and c != '\n') or c >= '\x80'
                if was_escaped != escaped:
                    ss = data[start_idx:idx]
                    if ss:
                        append(ss, was_escaped)
                    was_escaped = escaped
                    start_idx = idx
            ss = data[start_idx:]
            if ss:
                append(ss, was_escaped)

            if not extra_color:
                output.append((''.join(segments),
                               from_client and 'client' or 'server'))

            # Send output to the frame.
            self._output.extend(output)
            self._frame.queue.put(
                (self._frame.updateConnection, (self._id, output)))
            # Track whether the line is left open, so that
            # _output_message() starts status lines on a new line.
            if data.endswith('\n'):
                self.continuing_line = -1
            else:
                self.continuing_line = from_client

        def getOutput(self):
            """Return every (text, style) pair produced so far."""
            return self._output


    def createApp(titlepart):
        """Create the Tk root window and its TkTCPWatch frame, setting the
        window title when a wm_title method is available."""
        master = Tkinter.Tk()
        app = TkTCPWatch(master)
        try:
            wm_title = app.master.wm_title
        except AttributeError:
            pass  # No wm_title method available.
        else:
            wm_title('TCPWatch [%s]' % titlepart)
        return app

    app = createApp(titlepart)

    # Default arguments bind the window and display mode for each call.
    def tkObserverFactory(fci, app=app, colorized=colorized):
        return TkConnectionObserver(app, fci, colorized)

    return tkObserverFactory, app.mainloop
680
681
682
683#############################################################################
684#
685# The HTTP splitter
686#
687# Derived from Zope.Server.HTTPServer.
688#
689#############################################################################
690
691
def find_double_newline(s):
    """Returns the position just after the first double newline in s.

    Both '\\n\\n' and '\\n\\r\\n' count as a double newline (the latter
    also covers '\\r\\n\\r\\n').  Returns -1 when neither is present.
    """
    ends = []
    for marker in ('\n\r\n', '\n\n'):
        found = s.find(marker)
        if found >= 0:
            ends.append(found + len(marker))
    if ends:
        return min(ends)
    return -1
708
709
710
class StreamedReceiver:
    """Accepts data up to a fixed byte count.

    'cl' is the number of bytes to accept; below 1 the receiver starts out
    completed.  When 'buf' is a list, each accepted piece is appended to it.
    """

    completed = 0

    def __init__(self, cl, buf=None):
        self.buf = buf
        self.remain = cl
        if cl < 1:
            self.completed = 1

    def received(self, data):
        """Take up to self.remain bytes of data; return how many were taken."""
        wanted = self.remain
        if wanted < 1:
            # Nothing left to accept; mark completed so that callers
            # looping until completion cannot spin.
            self.completed = 1
            return 0
        taken = data[:wanted]
        count = len(taken)
        if self.buf is not None:
            self.buf.append(taken)
        self.remain = wanted - count
        if not self.remain:
            self.completed = 1
        return count
740
741
742
class UnlimitedReceiver:
    """Accepts every byte offered; never marks itself completed."""

    completed = 0

    def received(self, data):
        """Consume all of data and report its full length."""
        consumed = len(data)
        return consumed
751
752
753
class ChunkedReceiver:
    """Accepts all chunks.

    Parses a body sent with chunked transfer encoding, incrementally.
    Each chunk starts with a control line holding its size in hexadecimal
    (optionally followed by ';' and extensions, which are discarded),
    followed by that many bytes of data.  A size of 0 ends the chunks; it
    is followed by an optional trailer that ends with a blank line.  When
    'buf' is a list, the chunk data (without framing) is appended to it.
    """

    chunk_remainder = 0      # Bytes still expected in the current chunk.
    control_line = ''        # Incomplete control line kept between calls.
    all_chunks_received = 0  # Set once the size-0 chunk has been seen.
    trailer = ''             # Trailer text accumulated so far.
    completed = 0            # Set once the trailer has been read.


    def __init__(self, buf=None):
        self.buf = buf

    def received(self, s):
        # Returns the number of bytes consumed.
        # Bytes after the end of the body are not consumed; they belong to
        # whatever follows this message in the stream.
        if self.completed:
            return 0
        orig_size = len(s)
        while s:
            rm = self.chunk_remainder
            if rm > 0:
                # Receive the remainder of a chunk.
                to_write = s[:rm]
                if self.buf is not None:
                    self.buf.append(to_write)
                written = len(to_write)
                s = s[written:]
                self.chunk_remainder -= written
            elif not self.all_chunks_received:
                # Receive a control line.
                # Prepend any partial control line saved by an earlier call.
                s = self.control_line + s
                pos = s.find('\n')
                if pos < 0:
                    # Control line not finished.
                    self.control_line = s
                    s = ''
                else:
                    # Control line finished.
                    line = s[:pos]
                    s = s[pos + 1:]
                    self.control_line = ''
                    line = line.strip()
                    if line:
                        # Begin a new chunk.
                        semi = line.find(';')
                        if semi >= 0:
                            # discard extension info.
                            line = line[:semi]
                        # A non-hexadecimal size raises ValueError here.
                        sz = int(line.strip(), 16)  # hexadecimal
                        if sz > 0:
                            # Start a new chunk.
                            self.chunk_remainder = sz
                        else:
                            # Finished chunks.
                            self.all_chunks_received = 1
                    # else expect a control line.
                    # (Blank lines, such as the line break that follows
                    # each chunk's data, are skipped.)
            else:
                # Receive the trailer.
                # In the returns below, the unconsumed tail is whatever
                # follows the end of the trailer inside 'trailer'; this
                # call consumed orig_size minus that tail.  Text saved in
                # self.trailer by earlier calls was already counted then.
                trailer = self.trailer + s
                if trailer[:2] == '\r\n':
                    # No trailer.
                    self.completed = 1
                    return orig_size - (len(trailer) - 2)
                elif trailer[:1] == '\n':
                    # No trailer.
                    self.completed = 1
                    return orig_size - (len(trailer) - 1)
                pos = find_double_newline(trailer)
                if pos < 0:
                    # Trailer not finished.
                    self.trailer = trailer
                    s = ''
                else:
                    # Finished the trailer.
                    self.completed = 1
                    self.trailer = trailer[:pos]
                    return orig_size - (len(trailer) - pos)
        return orig_size
832
833
834
class HTTPStreamParser:
    """A structure that parses the HTTP stream.

    One parser handles one HTTP message: a request when is_a_request is
    true, otherwise a response.  Feed it data with received(), which
    returns how much it consumed; 'completed' is set once the header and
    the body have both been received.  Body bytes are kept in body_data.
    """

    completed = 0    # Set once request is completed.
    empty = 0        # Set if no request was made.
    header_plus = ''
    chunked = 0
    content_length = 0
    body_rcv = None

    # headers is a mapping containing keys translated to uppercase
    # with dashes turned into underscores.

    # Response status codes that never carry a body, whatever the
    # headers say.
    _NO_BODY_STATUSES = ('204', '304')

    def __init__(self, is_a_request):
        self.headers = {}
        self.is_a_request = is_a_request
        self.body_data = []

    def received(self, data):
        """Receives the HTTP stream for one request.

        Returns the number of bytes consumed; bytes not consumed belong to
        the next message.  Sets the completed flag once both the header
        and the body have been received.
        """
        if self.completed:
            return 0  # Can't consume any more.
        datalen = len(data)
        br = self.body_rcv
        if br is None:
            # In header.
            s = self.header_plus + data
            index = find_double_newline(s)
            if index >= 0:
                # Header finished.
                header_plus = s[:index]
                # Count only this call's bytes; header text saved by
                # earlier calls was already reported as consumed.
                consumed = len(data) - (len(s) - index)
                self.in_header = 0
                # Remove preceding blank lines.
                header_plus = header_plus.lstrip()
                if not header_plus:
                    self.empty = 1
                    self.completed = 1
                else:
                    self.parse_header(header_plus)
                    if self.body_rcv is None or self.body_rcv.completed:
                        self.completed = 1
                return consumed
            else:
                # Header not finished yet.
                self.header_plus = s
                return datalen
        else:
            # In body.
            consumed = br.received(data)
            self.body_data.append(data[:consumed])
            if br.completed:
                self.completed = 1
            return consumed


    def parse_header(self, header_plus):
        """Parses the header_plus block of text.

        (header_plus is the headers plus the first line of the request).
        Fills in first_line, header and headers, then chooses the body
        receiver that decides where the body ends.
        """
        index = header_plus.find('\n')
        if index >= 0:
            first_line = header_plus[:index]
            header = header_plus[index + 1:]
        else:
            first_line = header_plus
            header = ''
        self.first_line = first_line
        self.header = header

        lines = self.get_header_lines()
        headers = self.headers
        for line in lines:
            index = line.find(':')
            if index > 0:
                key = line[:index]
                value = line[index + 1:].strip()
                key1 = key.upper().replace('-', '_')
                headers[key1] = value
            # else there's garbage in the headers?

        if not self.is_a_request:
            # 204 (No Content) and 304 (Not Modified) responses have no
            # body.  Without this check, such a response lacking
            # Content-Length would swallow the rest of the stream.
            parts = first_line.split()
            if len(parts) >= 2 and parts[1] in self._NO_BODY_STATUSES:
                # Expect no body.
                self.body_rcv = StreamedReceiver(0)
            # NOTE(review): 1xx responses and responses to HEAD requests
            # are also bodiless but are not detected here.

        if self.body_rcv is None:
            # Ignore the HTTP version and just assume
            # that the Transfer-Encoding header, when supplied, is valid.
            # Transfer codings are case-insensitive and may form a list
            # (e.g. "gzip, chunked"); the body is chunked when "chunked"
            # is the last coding.
            te = headers.get('TRANSFER_ENCODING', '')
            if te.lower().split(',')[-1].strip() == 'chunked':
                self.chunked = 1
                self.body_rcv = ChunkedReceiver()
            if not self.chunked:
                try:
                    cl = int(headers.get('CONTENT_LENGTH', -1))
                except ValueError:
                    # Malformed Content-Length: treat it as missing rather
                    # than propagating ValueError out of the parser.
                    cl = -1
                self.content_length = cl
                if cl >= 0 or self.is_a_request:
                    self.body_rcv = StreamedReceiver(cl)
                else:
                    # No content length and this is a response.
                    # We have to assume unlimited content length.
                    self.body_rcv = UnlimitedReceiver()


    def get_header_lines(self):
        """Splits the header into lines, putting multi-line headers together.

        A line starting with a space or tab continues the previous line
        (its first whitespace character is dropped).  Such a line with no
        previous line to continue is kept as an ordinary line instead of
        raising IndexError.
        """
        r = []
        lines = self.header.split('\n')
        for line in lines:
            if line.endswith('\r'):
                line = line[:-1]
            if r and line and line[0] in ' \t':
                r[-1] = r[-1] + line[1:]
            else:
                r.append(line)
        return r
961
962
963
class HTTPConnectionSplitter:
    """Makes a new observer for each HTTP subconnection and forwards events.

    Each request/response pair seen on the connection gets its own
    observer, built by sub_factory from a copy of the connection info
    whose 'transaction' attribute counts from 1.  Data is routed to the
    observer of the transaction it belongs to; connect, close and error
    events go to the newest observer.
    """

    # __implements__ = IConnectionObserver
    req_index = 0   # index into transactions for incoming client data
    resp_index = 0  # index into transactions for incoming server data

    def __init__(self, sub_factory, fci):
        self.sub_factory = sub_factory
        self.fci = fci
        # Entries are (observer, request parser, response parser) tuples.
        self.transactions = []
        self._newTransaction()

    def _newTransaction(self):
        """Appends a fresh (observer, request parser, response parser)."""
        info = self.fci.dup()
        info.transaction = len(self.transactions) + 1
        observer = self.sub_factory(info)
        self.transactions.append(
            (observer, HTTPStreamParser(1), HTTPStreamParser(0)))

    def _mostRecentObs(self):
        """Returns the observer of the newest transaction."""
        last_observer, _req, _resp = self.transactions[-1]
        return last_observer

    def connected(self, from_client):
        observer = self._mostRecentObs()
        observer.connected(from_client)

    def closed(self, from_client):
        observer = self._mostRecentObs()
        observer.closed(from_client)

    def error(self, from_client, type, value):
        observer = self._mostRecentObs()
        observer.error(from_client, type, value)

    def received(self, data, from_client):
        """Routes data from one side of the connection to its transactions.

        The chunk may span several messages.  Each piece is fed to the
        parser of the transaction it belongs to and passed to that
        transaction's observer.  When a parser completes, later data from
        the same side goes to the next transaction, created on demand.
        """
        if from_client:
            index_attr = 'req_index'
        else:
            index_attr = 'resp_index'
        while data:
            position = getattr(self, index_attr)
            if position >= len(self.transactions):
                self._newTransaction()
            observer, request_parser, response_parser = \
                self.transactions[position]
            if from_client:
                parser = request_parser
            else:
                parser = response_parser
            used = parser.received(data)
            observer.received(data[:used], from_client)
            data = data[used:]
            if parser.completed:
                setattr(self, index_attr, position + 1)
1023
1024
1025#############################################################################
1026#
1027# HTTP proxy
1028#
1029#############################################################################
1030
1031
class HTTPProxyToServerConnection (ForwardingEndpoint):
    """Ensures that responses to a persistent HTTP connection occur
    in the correct order.

    Each instance carries one proxied request.  Response bytes from the
    server are held back until this instance reaches the head of the
    client connection's _response_order queue.  The instance leaves the
    queue once its response has been completely parsed.
    """

    finished = 0  # set to 1 once the whole response has been parsed

    def __init__(self, proxy_conn, dests=()):
        ForwardingEndpoint.__init__(self)
        self.response_parser = HTTPStreamParser(0)
        self.proxy_conn = proxy_conn
        self.set_dests(dests)

        # Response data not yet written to the client because an earlier
        # response on the same client connection has not finished.
        self.held = []

    def _isMyTurn(self):
        """Returns a true value when the response queue is empty or this
        object is at its head, i.e. when it may write to the client."""
        queue = self.proxy_conn._response_order
        if not queue:
            return 1
        return queue[0] is self

    def received(self, data):
        """Receives data from the HTTP server to be sent back to the client.

        Each parsed piece is passed to the observers and queued for the
        client.  Once the response is complete, any extra data returned
        from the server is ignored.  Should it be? :-(
        """
        parser = self.response_parser
        while not parser.completed:
            if not data:
                return
            size = parser.received(data)
            piece, data = data[:size], data[size:]
            ForwardingEndpoint.received(self, piece)
            self.held.append(piece)
            self.flush()
        self.finished = 1
        self.flush()

    def flush(self):
        """Writes held data to the client if it is this response's turn.

        Once the response is finished, removes this object from the head
        of the queue (if it is there) and flushes whichever response is
        now first, so finished responses queued behind it get sent.
        """
        if self.held and self._isMyTurn():
            buffered = ''.join(self.held)
            del self.held[:]
            self.proxy_conn.write(buffered)
        if not self.finished:
            return
        queue = self.proxy_conn._response_order
        if queue and queue[0] is self:
            del queue[0]
        if queue:
            queue[0].flush()  # kick!

    def handle_close(self):
        """The HTTP server closed the connection.
        """
        ForwardingEndpoint.handle_close(self)
        if self.finished:
            return
        # The response is incomplete and the HTTP spec provides no way to
        # recover, so cancel the proxy connection even if other responses
        # are still pending.
        self.proxy_conn.close()

    def close(self):
        """Close the connection to the server.

        Held data is flushed first.  If the response was not finished,
        every destination that has an 'error' method is notified.
        """
        self.flush()
        if not self.finished:
            message = 'Closed without finishing response to client'
            for dest in self._dests:
                if hasattr(dest, 'error'):
                    dest.error(IOError, message)
        ForwardingEndpoint.close(self)
1112
1113
1114
class HTTPProxyToClientConnection (ForwardingEndpoint):
    """A connection from a client to the proxy server.

    Parses the client's requests one at a time.  For each complete request
    it opens an HTTPProxyToServerConnection and appends it to
    _response_order, which keeps responses to pipelined requests in
    request order.
    """

    _req_parser = None  # parser for the request in progress, or None
    _transaction = 0    # number of transactions started on this connection
    _obs = None         # observer for the current transaction, if any

    def __init__(self, conn, factory, counter, addr):
        ForwardingEndpoint.__init__(self, conn)
        self._obs_factory = factory
        self._counter = counter
        self._client_addr = addr
        self._response_order = []
        self._newRequest()

    def _newRequest(self):
        """Starts a new request on a persistent connection.

        Creates a request parser if none is pending.  When an observer
        factory is configured, also creates a new numbered observer and
        makes it the destination for the client's data.
        """
        if self._req_parser is None:
            self._req_parser = HTTPStreamParser(1)
        factory = self._obs_factory
        if factory is not None:
            fci = ForwardedConnectionInfo(self._counter, self._client_addr)
            self._transaction = self._transaction + 1
            fci.transaction = self._transaction
            obs = factory(fci)
            self._obs = obs
            self.set_dests((EndpointObserver(obs, 1),))

    def received(self, data):
        """Accepts data received from the client.

        Consumed bytes are forwarded to the observers.  Each completed
        request is proxied via openProxyConnection(), and any remaining
        data starts the next request.
        """
        while data:
            parser = self._req_parser
            if parser is None:
                # Begin another request.
                self._newRequest()
                parser = self._req_parser
            if not parser.completed:
                # Waiting for a complete request.
                consumed = parser.received(data)
                ForwardingEndpoint.received(self, data[:consumed])
                data = data[consumed:]
            if parser.completed:
                # Connect to a server.
                self.openProxyConnection(parser)
                # Expect a new request or a closed connection.
                self._req_parser = None

    def openProxyConnection(self, request):
        """Parses the client request and opens a connection to an
        HTTP server.

        Two forms are accepted: an absolute "http://host[:port]/path" URL
        (standard proxy, scheme matched case-insensitively), or a plain
        path plus a Host header (transparent proxy).  Any "user[:pw]@"
        prefix on the host is dropped.  The rewritten request line, the
        client's headers and any body are written to the new server
        connection, which is queued in _response_order before connecting.

        Raises ValueError for a request line without a space, when no
        host can be determined, or when the port is not an integer.
        """
        first_line = request.first_line.strip()
        if ' ' not in first_line:
            raise ValueError('Malformed request: %s' % first_line)
        command, url = first_line.split(' ', 1)
        pos = url.rfind(' HTTP/')
        if pos >= 0:
            protocol = url[pos + 1:]
            url = url[:pos].rstrip()
        else:
            protocol = 'HTTP/1.0'
        if url[:7].lower() == 'http://':
            # Standard proxy: the authority ends at the first '/' or '?'.
            urlpart = url[7:]
            end = len(urlpart)
            for sep in '/?':
                i = urlpart.find(sep)
                if 0 <= i < end:
                    end = i
            host = urlpart[:end]
            path = urlpart[end:]
            if not path.startswith('/'):
                path = '/' + path
        else:
            # Transparent proxy
            host = request.headers.get('HOST')
            path = url
        if not host:
            raise ValueError('Request type not supported: %s' % url)

        # Drop any "user[:password]@" prefix first.  The password may
        # contain ':', which would otherwise be mistaken for the port.
        at = host.rfind('@')
        if at >= 0:
            host = host[at + 1:]
        colon = host.rfind(':')
        if colon >= 0:
            port = int(host[colon + 1:])
            host = host[:colon]
        else:
            port = 80

        obs = self._obs
        if obs is not None:
            eo = EndpointObserver(obs, 0)
            ptos = HTTPProxyToServerConnection(self, (eo,))
        else:
            ptos = HTTPProxyToServerConnection(self)

        self._response_order.append(ptos)

        ptos.write('%s %s %s\r\n' % (command, path, protocol))
        # Duplicate the headers sent by the client.
        if request.header:
            ptos.write(request.header)
        else:
            ptos.write('\r\n')
        if request.body_data:
            ptos.write(''.join(request.body_data))
        ptos.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        ptos.connect((host, port))

    def close(self):
        """Closes the connection to the client.

        If there are open connections to proxy servers, the server
        connections are also closed.
        """
        ForwardingEndpoint.close(self)
        # Iterate over a copy: closing a server connection flushes it,
        # and its flush() may delete entries from _response_order.
        for ptos in self._response_order[:]:
            ptos.close()
        del self._response_order[:]
1231
1232
class HTTPProxyService (asyncore.dispatcher):
    """A minimal HTTP proxy server.

    Listens on (listen_host, listen_port).  Each accepted client gets a
    connection_class instance, numbered from 1.
    """

    connection_class = HTTPProxyToClientConnection

    _counter = 0  # number of client connections accepted so far

    def __init__(self, listen_host, listen_port, observer_factory=None):
        self._obs_factory = observer_factory
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        address = (listen_host, listen_port)
        self.bind(address)
        self.listen(5)

    def handle_accept(self):
        accepted = self.accept()
        if not accepted:
            return
        # Got a connection.
        sock, client_addr = accepted
        sock.setblocking(0)
        self._counter = self._counter + 1
        self.connection_class(sock, self._obs_factory, self._counter,
                              client_addr)

    def handle_error(self):
        """Prints the traceback and keeps the listening socket open."""
        from traceback import print_exc
        print_exc()
1262
1263
1264#############################################################################
1265#
1266# Command-line interface
1267#
1268#############################################################################
1269
def usage(exit_code=0):
    """Writes the copyright notice and command-line help to stderr, then
    exits the process with status exit_code (0 by default, as for --help).
    """
    sys.stderr.write(COPYRIGHT + '\n')
    sys.stderr.write(
        """TCP monitoring and logging tool with support for HTTP 1.1
Simple usage: tcpwatch.py -L listen_port:dest_hostname:dest_port

TCP forwarded connection setup:
  -L <listen_port>:<dest_port>
     Set up a local forwarded connection
  -L <listen_port>:<dest_host>:<dest_port>
     Set up a forwarded connection to a specified host
  -L <listen_host>:<listen_port>:<dest_host>:<dest_port>
     Set up a forwarded connection to a specified host, bound to an interface

HTTP setup:
  -h (or --http) Split forwarded HTTP persistent connections
  -p [<listen_host>:]<listen_port> Run an HTTP proxy

Output options:
  -s   Output to stdout instead of a Tkinter window
  -n   No color in GUI (faster and consumes less RAM)
  -c   Extra color (colorizes escaped characters)
  --cr     Show carriage returns (ASCII 13)
  --help   Show usage information

Recording options:
  -r <path>  (synonyms: -R, --record-directory)
    Write recorded data to <path>.  By default, creates request and
    response files for each request, and writes a corresponding error file
    for any error detected by tcpwatch.
  --record-prefix=<prefix>
    Use <prefix> as the file prefix for logged request / response / error
    files (defaults to 'watch').
  --no-record-responses
    Suppress writing '.response' files.
  --no-record-errors
    Suppress writing '.error' files.
""")
    sys.exit(exit_code)


def usageError(s):
    """Writes the error message s followed by the usage text to stderr.

    Exits with status 2 (the conventional code for command-line errors)
    so that scripts can distinguish a bad invocation from success.
    """
    sys.stderr.write(str(s) + '\n\n')
    usage(2)
1314
1315
def _parsePort(text, option):
    """Returns text, a port number given for option on the command line,
    as an int.  Calls usageError() (which exits) if it is not an integer,
    so a typo yields the usage message rather than a traceback.
    """
    try:
        return int(text)
    except ValueError:
        usageError('%s: invalid port number: %s' % (option, text))


def main(args):
    """Runs tcpwatch with the command-line arguments args (no program name).

    Parses the options and builds the observer factory: Tkinter unless -s
    is given, optionally wrapped by a RecordingObserver and, for
    forwarders, an HTTPConnectionSplitter.  Then starts one
    ForwardingService per -L option and one HTTPProxyService per -p
    option, and runs the asyncore loop until interrupted.  All services
    are closed on exit.
    """
    global show_cr

    try:
        optlist, _extra = getopt.getopt(args, 'chL:np:r:R:s',
                                        ['help', 'http', 'cr',
                                         'record-directory=',
                                         'record-prefix=',
                                         'no-record-responses',
                                         'no-record-errors',
                                        ])
    except getopt.GetoptError:
        # sys.exc_info() fetches the error on both old and new Pythons,
        # unlike the "except E, msg" / "except E as msg" spellings.
        usageError(sys.exc_info()[1])

    fwd_params = []
    proxy_params = []
    obs_factory = None
    show_config = 0
    split_http = 0
    colorized = 1
    record_directory = None
    record_prefix = 'watch'
    record_responses = 1
    record_errors = 1

    for option, value in optlist:
        if option == '--help':
            usage()
        elif option == '--http' or option == '-h':
            split_http = 1
        elif option == '-n':
            colorized = 0
        elif option == '-c':
            colorized = 2
        elif option == '--cr':
            show_cr = 1
        elif option == '-s':
            show_config = 1
            obs_factory = StdoutObserver
        elif option == '-p':
            # HTTP proxy
            info = value.split(':')
            listen_host = ''
            if len(info) == 1:
                listen_port = _parsePort(info[0], option)
            elif len(info) == 2:
                listen_host = info[0]
                listen_port = _parsePort(info[1], option)
            else:
                usageError('-p requires a port or a host:port parameter')
            proxy_params.append((listen_host, listen_port))
        elif option == '-L':
            # TCP forwarder
            info = value.split(':')
            listen_host = ''
            dest_host = ''
            if len(info) == 2:
                listen_port = _parsePort(info[0], option)
                dest_port = _parsePort(info[1], option)
            elif len(info) == 3:
                listen_port = _parsePort(info[0], option)
                dest_host = info[1]
                dest_port = _parsePort(info[2], option)
            elif len(info) == 4:
                listen_host = info[0]
                listen_port = _parsePort(info[1], option)
                dest_host = info[2]
                dest_port = _parsePort(info[3], option)
            else:
                usageError('-L requires 2, 3, or 4 colon-separated parameters')
            fwd_params.append(
                (listen_host, listen_port, dest_host, dest_port))
        elif (option == '-r'
              or option == '-R'
              or option == '--record-directory'):
            record_directory = value
        elif option == '--record-prefix':
            record_prefix = value
        elif option == '--no-record-responses':
            record_responses = 0
        elif option == '--no-record-errors':
            record_errors = 0

    if not fwd_params and not proxy_params:
        usageError("At least one -L or -p option is required.")

    # Prepare the configuration display.
    config_info_lines = []
    title_lst = []
    if fwd_params:
        config_info_lines.extend(
            ['Forwarding %s:%d -> %s:%d' % p for p in fwd_params])
        title_lst.extend(['%s:%d -> %s:%d' % p for p in fwd_params])
    if proxy_params:
        config_info_lines.extend(
            ['HTTP proxy listening on %s:%d' % p for p in proxy_params])
        title_lst.extend(['%s:%d -> proxy' % p for p in proxy_params])
    if split_http:
        config_info_lines.append('HTTP connection splitting enabled.')
    if record_directory:
        config_info_lines.append(
            'Recording to directory %s.' % record_directory)
    config_info = '\n'.join(config_info_lines)
    titlepart = ', '.join(title_lst)
    mainloop = None

    if obs_factory is None:
        # If no observer factory has been specified, use Tkinter.
        obs_factory, mainloop = setupTk(titlepart, config_info, colorized)

    if record_directory:
        def _decorateRecorder(fci, sub_factory=obs_factory,
                              record_directory=record_directory,
                              record_prefix=record_prefix,
                              record_responses=record_responses,
                              record_errors=record_errors):
            return RecordingObserver(fci, sub_factory, record_directory,
                                     record_prefix, record_responses,
                                     record_errors)
        obs_factory = _decorateRecorder

    chosen_factory = obs_factory
    if split_http:
        # Put an HTTPConnectionSplitter between the events and the output.
        def _factory(fci, sub_factory=obs_factory):
            return HTTPConnectionSplitter(sub_factory, fci)
        chosen_factory = _factory
    # obs_factory is the connection observer factory without HTTP
    # connection splitting, while chosen_factory may have connection
    # splitting.  Proxy services use obs_factory rather than the full
    # chosen_factory because proxy services perform connection
    # splitting internally.

    services = []
    try:
        # Start forwarding services.
        for params in fwd_params:
            args = params + (chosen_factory,)
            s = ForwardingService(*args)
            services.append(s)

        # Start proxy services.
        for params in proxy_params:
            args = params + (obs_factory,)
            s = HTTPProxyService(*args)
            services.append(s)

        if show_config:
            sys.stderr.write(config_info + '\n')

        # Run the main loop.
        try:
            if mainloop is not None:
                # The GUI loop owns this thread; asyncore runs beside it.
                import thread
                thread.start_new_thread(asyncore.loop, (), {'timeout': 1.0})
                mainloop()
            else:
                asyncore.loop(timeout=1.0)
        except KeyboardInterrupt:
            sys.stderr.write('TCPWatch finished.\n')
    finally:
        for s in services:
            s.close()
1482
1483
if __name__ == '__main__':
    # argv[0] is the script path; main() wants only the option words.
    command_line = sys.argv[1:]
    main(command_line)
1486