1#!/usr/bin/env python 2 3############################################################################# 4# 5# Zope Public License (ZPL) Version 2.0 6# ----------------------------------------------- 7# 8# This software is Copyright (c) Zope Corporation (tm) and 9# Contributors. All rights reserved. 10# 11# This license has been certified as open source. It has also 12# been designated as GPL compatible by the Free Software 13# Foundation (FSF). 14# 15# Redistribution and use in source and binary forms, with or 16# without modification, are permitted provided that the 17# following conditions are met: 18# 19# 1. Redistributions in source code must retain the above 20# copyright notice, this list of conditions, and the following 21# disclaimer. 22# 23# 2. Redistributions in binary form must reproduce the above 24# copyright notice, this list of conditions, and the following 25# disclaimer in the documentation and/or other materials 26# provided with the distribution. 27# 28# 3. The name Zope Corporation (tm) must not be used to 29# endorse or promote products derived from this software 30# without prior written permission from Zope Corporation. 31# 32# 4. The right to distribute this software or to use it for 33# any purpose does not give you the right to use Servicemarks 34# (sm) or Trademarks (tm) of Zope Corporation. Use of them is 35# covered in a separate agreement (see 36# http://www.zope.com/Marks). 37# 38# 5. If any files are modified, you must cause the modified 39# files to carry prominent notices stating that you changed 40# the files and the date of any change. 41# 42# Disclaimer 43# 44# THIS SOFTWARE IS PROVIDED BY ZOPE CORPORATION ``AS IS'' 45# AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT 46# NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 47# AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN 48# NO EVENT SHALL ZOPE CORPORATION OR ITS CONTRIBUTORS BE 49# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 50# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 51# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 52# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 53# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 54# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE 55# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 56# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 57# DAMAGE. 58# 59# 60# This software consists of contributions made by Zope 61# Corporation and many individuals on behalf of Zope 62# Corporation. Specific attributions are listed in the 63# accompanying credits file. 64# 65############################################################################# 66"""TCPWatch, a connection forwarder and HTTP proxy for monitoring connections. 67 68Requires Python 2.1 or above. 69 70Revision information: 71$Id: tcpwatch.py,v 1.9 2004/06/17 00:03:46 shane Exp $ 72""" 73 74from __future__ import nested_scopes 75 76VERSION = '1.3' 77COPYRIGHT = ( 78 'TCPWatch %s Copyright 2001 Shane Hathaway, Zope Corporation' 79 % VERSION) 80 81import sys 82import os 83import socket 84import asyncore 85import getopt 86from time import time, localtime 87 88 89RECV_BUFFER_SIZE = 8192 90show_cr = 0 91 92 93############################################################################# 94# 95# Connection forwarder 96# 97############################################################################# 98 99 100class ForwardingEndpoint (asyncore.dispatcher): 101 """A socket wrapper that accepts and generates stream messages. 102 """ 103 _dests = () 104 105 def __init__(self, conn=None): 106 self._outbuf = [] 107 asyncore.dispatcher.__init__(self, conn) 108 109 def set_dests(self, dests): 110 """Sets the destination streams. 111 """ 112 self._dests = dests 113 114 def write(self, data): 115 if data: 116 self._outbuf.append(data) 117 self.handle_write() 118 119 def readable(self): 120 return 1 121 122 def writable(self): 123 return not self.connected or len(self._outbuf) > 0 124 125 def handle_connect(self): 126 for d in self._dests: 127 d.write('') # A blank string means the socket just connected. 128 129 def received(self, data): 130 if data: 131 for d in self._dests: 132 d.write(data) 133 134 def handle_read(self): 135 data = self.recv(RECV_BUFFER_SIZE) 136 self.received(data) 137 138 def handle_write(self): 139 if not self.connected: 140 # Wait for a connection. 141 return 142 buf = self._outbuf 143 while buf: 144 data = buf.pop(0) 145 if data: 146 sent = self.send(data) 147 if sent < len(data): 148 buf.insert(0, data[sent:]) 149 break 150 151 def handle_close (self): 152 dests = self._dests 153 self._dests = () 154 for d in dests: 155 d.close() 156 self.close() 157 158 def handle_error(self): 159 t, v = sys.exc_info()[:2] 160 for d in self._dests: 161 if hasattr(d, 'error'): 162 d.error(t, v) 163 self.handle_close() 164 165 166 167class EndpointObserver: 168 """Sends stream events to a ConnectionObserver. 169 170 Streams don't distinguish sources, while ConnectionObservers do. 171 This adapter adds source information to stream events. 172 """ 173 174 def __init__(self, obs, from_client): 175 self.obs = obs 176 self.from_client = from_client 177 178 def write(self, data): 179 if data: 180 self.obs.received(data, self.from_client) 181 else: 182 self.obs.connected(self.from_client) 183 184 def close(self): 185 self.obs.closed(self.from_client) 186 187 def error(self, t, v): 188 self.obs.error(self.from_client, t, v) 189 190 191 192class ForwardedConnectionInfo: 193 transaction = 1 194 195 def __init__(self, connection_number, client_addr, server_addr=None): 196 self.opened = time() 197 self.connection_number = connection_number 198 self.client_addr = client_addr 199 self.server_addr = server_addr 200 201 def dup(self): 202 return ForwardedConnectionInfo(self.connection_number, 203 self.client_addr, 204 self.server_addr) 205 206 207 208class ForwardingService (asyncore.dispatcher): 209 210 _counter = 0 211 212 def __init__(self, listen_host, listen_port, dest_host, dest_port, 213 observer_factory=None): 214 self._obs_factory = observer_factory 215 self._dest = (dest_host, dest_port) 216 asyncore.dispatcher.__init__(self) 217 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 218 self.set_reuse_addr() 219 self.bind((listen_host, listen_port)) 220 self.listen(5) 221 222 def handle_accept(self): 223 info = self.accept() 224 if info: 225 # Got a connection. 226 conn, addr = info 227 conn.setblocking(0) 228 229 ep1 = ForwardingEndpoint() # connects client to self 230 ep2 = ForwardingEndpoint() # connects self to server 231 232 counter = self._counter + 1 233 self._counter = counter 234 factory = self._obs_factory 235 if factory is not None: 236 fci = ForwardedConnectionInfo(counter, addr, self._dest) 237 obs = factory(fci) 238 dests1 = (ep2, EndpointObserver(obs, 1)) 239 dests2 = (ep1, EndpointObserver(obs, 0)) 240 else: 241 dests1 = (ep2,) 242 dests2 = (ep1,) 243 244 ep1.set_dests(dests1) 245 ep2.set_dests(dests2) 246 247 # Now everything is hooked up. Let data pass. 248 ep2.create_socket(socket.AF_INET, socket.SOCK_STREAM) 249 ep1.set_socket(conn) 250 ep1.connected = 1 # We already know the client connected. 251 ep2.connect(self._dest) 252 253 def handle_error(self): 254 # Don't stop the server. 255 import traceback 256 traceback.print_exc() 257 258 259 260class IConnectionObserver: 261 262 def connected(from_client): 263 """Called when the client or the server connects. 264 """ 265 266 def received(data, from_client): 267 """Called when the client or the server sends data. 268 """ 269 270 def closed(from_client): 271 """Called when the client or the server closes the channel. 272 """ 273 274 def error(from_client, type, value): 275 """Called when an error occurs in the client or the server channel. 276 """ 277 278 279############################################################################# 280# 281# Basic abstract connection observer and stdout observer 282# 283############################################################################# 284 285 286def escape(s): 287 # XXX This might be a brittle trick. :-( 288 return repr('"\'' + str(s))[4:-1] 289 290 291class BasicObserver: 292 293 continuing_line = -1 # Tracks when a line isn't finished. 294 arrows = ('<==', '==>') 295 296 def __init__(self): 297 self._start = time() 298 299 def _output_message(self, m, from_client): 300 if self.continuing_line >= 0: 301 self.write('\n') 302 self.continuing_line = -1 303 if from_client: 304 who = 'client' 305 else: 306 who = 'server' 307 308 t = time() - self._start 309 min, sec = divmod(t, 60) 310 self.write('[%02d:%06.3f - %s %s]\n' % (min, sec, who, m)) 311 self.flush() 312 313 def connection_from(self, fci): 314 if fci.server_addr is not None: 315 self._output_message( 316 '%s:%s forwarded to %s:%s' % 317 (tuple(fci.client_addr) + tuple(fci.server_addr)), 1) 318 else: 319 self._output_message( 320 'connection from %s:%s' % 321 (tuple(fci.client_addr)), 1) 322 323 if fci.transaction > 1: 324 self._output_message( 325 ('HTTP transaction #%d' % fci.transaction), 1) 326 327 def connected(self, from_client): 328 self._output_message('connected', from_client) 329 330 def received(self, data, from_client): 331 arrow = self.arrows[from_client] 332 cl = self.continuing_line 333 if cl >= 0: 334 if cl != from_client: 335 # Switching directions. 336 self.write('\n%s' % arrow) 337 else: 338 self.write(arrow) 339 340 if data.endswith('\n'): 341 data = data[:-1] 342 newline = 1 343 else: 344 newline = 0 345 346 if not show_cr: 347 data = data.replace('\r', '') 348 lines = data.split('\n') 349 lines = map(escape, lines) 350 s = ('\n%s' % arrow).join(lines) 351 self.write(s) 352 353 if newline: 354 self.write('\n') 355 self.continuing_line = -1 356 else: 357 self.continuing_line = from_client 358 self.flush() 359 360 def closed(self, from_client): 361 self._output_message('closed', from_client) 362 363 def error(self, from_client, type, value): 364 self._output_message( 365 'connection error %s: %s' % (type, value), from_client) 366 367 def write(self, s): 368 raise NotImplementedError 369 370 def flush(self): 371 raise NotImplementedError 372 373 374class StdoutObserver (BasicObserver): 375 376 # __implements__ = IConnectionObserver 377 378 def __init__(self, fci): 379 BasicObserver.__init__(self) 380 self.connection_from(fci) 381 382 def write(self, s): 383 sys.stdout.write(s) 384 385 def flush(self): 386 sys.stdout.flush() 387 388 389# 'log_number' is a log file counter used for naming log files. 390log_number = 0 391 392def nextLogNumber(): 393 global log_number 394 log_number = log_number + 1 395 return log_number 396 397 398class RecordingObserver (BasicObserver): 399 """Log request to a file. 400 401 o Filenames mangle connection and transaction numbers from the 402 ForwardedConnectionInfo passed as 'fci'. 403 404 o Decorates an underlying observer, created via the passed 'sub_factory'. 405 406 o Files are created in the supplied 'record_directory'. 407 408 o Unless suppressed, log response and error to corresponding files. 409 """ 410 _ERROR_SOURCES = ('Server', 'Client') 411 412 # __implements__ = IConnectionObserver 413 414 def __init__(self, fci, sub_factory, record_directory, 415 record_prefix='watch', record_responses=1, record_errors=1): 416 self._log_number = nextLogNumber() 417 self._decorated = sub_factory(fci) 418 self._directory = record_directory 419 self._prefix = record_prefix 420 self._response = record_responses 421 self._errors = record_errors 422 423 def connected(self, from_client): 424 """See IConnectionObserver. 425 """ 426 self._decorated.connected(from_client) 427 428 def received(self, data, from_client): 429 """See IConnectionObserver. 430 """ 431 if from_client or self._response: 432 extension = from_client and 'request' or 'response' 433 file = self._openForAppend(extension=extension) 434 file.write(data) 435 file.close() 436 self._decorated.received(data, from_client) 437 438 def closed(self, from_client): 439 """See IConnectionObserver. 440 """ 441 self._decorated.closed(from_client) 442 443 def error(self, from_client, type, value): 444 """See IConnectionObserver. 445 """ 446 if self._errors: 447 file = self._openForAppend(extension='errors') 448 file.write('(%s) %s: %s\n' % (self._ERROR_SOURCES[from_client], 449 type, value)) 450 self._decorated.error(from_client, type, value) 451 452 def _openForAppend(self, extension): 453 """Open a file with the given extension for appending. 454 455 o File should be in the directory indicated by self._directory. 456 457 o File should have a filename '<prefix>_<conn #>.<extension>'. 458 """ 459 filename = '%s%04d.%s' % (self._prefix, self._log_number, extension) 460 fqpath = os.path.join(self._directory, filename) 461 return open(fqpath, 'a') 462 463 464############################################################################# 465# 466# Tkinter GUI 467# 468############################################################################# 469 470 471def setupTk(titlepart, config_info, colorized=1): 472 """Starts the Tk application and returns an observer factory. 473 """ 474 475 import Tkinter 476 from ScrolledText import ScrolledText 477 from Queue import Queue, Empty 478 try: 479 from cStringIO import StringIO 480 except ImportError: 481 from StringIO import StringIO 482 483 startup_text = COPYRIGHT + (""" 484 485Use your client to connect to the proxied port(s) then click 486the list on the left to see the data transferred. 487 488%s 489""" % config_info) 490 491 492 class TkTCPWatch (Tkinter.Frame): 493 '''The tcpwatch top-level window. 494 ''' 495 def __init__(self, master): 496 Tkinter.Frame.__init__(self, master) 497 self.createWidgets() 498 # connections maps ids to TkConnectionObservers. 499 self.connections = {} 500 self.showingid = '' 501 self.queue = Queue() 502 self.processQueue() 503 504 def createWidgets(self): 505 listframe = Tkinter.Frame(self) 506 listframe.pack(side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1) 507 scrollbar = Tkinter.Scrollbar(listframe, orient=Tkinter.VERTICAL) 508 self.connectlist = Tkinter.Listbox( 509 listframe, yscrollcommand=scrollbar.set, exportselection=0) 510 scrollbar.config(command=self.connectlist.yview) 511 scrollbar.pack(side=Tkinter.RIGHT, fill=Tkinter.Y) 512 self.connectlist.pack( 513 side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1) 514 self.connectlist.bind('<Button-1>', self.mouseListSelect) 515 self.textbox = ScrolledText(self, background="#ffffff") 516 self.textbox.tag_config("message", foreground="#000000") 517 self.textbox.tag_config("client", foreground="#007700") 518 self.textbox.tag_config( 519 "clientesc", foreground="#007700", background="#dddddd") 520 self.textbox.tag_config("server", foreground="#770000") 521 self.textbox.tag_config( 522 "serveresc", foreground="#770000", background="#dddddd") 523 self.textbox.insert(Tkinter.END, startup_text, "message") 524 self.textbox.pack(side='right', fill=Tkinter.BOTH, expand=1) 525 self.pack(fill=Tkinter.BOTH, expand=1) 526 527 def addConnection(self, id, conn): 528 self.connections[id] = conn 529 connectlist = self.connectlist 530 connectlist.insert(Tkinter.END, id) 531 532 def updateConnection(self, id, output): 533 if id == self.showingid: 534 textbox = self.textbox 535 for data, style in output: 536 textbox.insert(Tkinter.END, data, style) 537 538 def mouseListSelect(self, event=None): 539 connectlist = self.connectlist 540 idx = connectlist.nearest(event.y) 541 sel = connectlist.get(idx) 542 connections = self.connections 543 if connections.has_key(sel): 544 self.showingid = '' 545 output = connections[sel].getOutput() 546 self.textbox.delete(1.0, Tkinter.END) 547 for data, style in output: 548 self.textbox.insert(Tkinter.END, data, style) 549 self.showingid = sel 550 551 def processQueue(self): 552 try: 553 if not self.queue.empty(): 554 # Process messages for up to 1/4 second 555 from time import time 556 limit = time() + 0.25 557 while time() < limit: 558 try: 559 f, args = self.queue.get_nowait() 560 except Empty: 561 break 562 f(*args) 563 finally: 564 self.master.after(50, self.processQueue) 565 566 567 class TkConnectionObserver (BasicObserver): 568 '''A connection observer which shows captured data in a TCPWatch 569 frame. The data is mangled for presentation. 570 ''' 571 # __implements__ = IConnectionObserver 572 573 def __init__(self, frame, fci, colorized=1): 574 BasicObserver.__init__(self) 575 self._output = [] # list of tuples containing (text, style) 576 self._frame = frame 577 self._colorized = colorized 578 t = localtime(fci.opened) 579 if fci.transaction > 1: 580 base_id = '%03d-%02d' % ( 581 fci.connection_number, fci.transaction) 582 else: 583 base_id = '%03d' % fci.connection_number 584 id = '%s (%02d:%02d:%02d)' % (base_id, t[3], t[4], t[5]) 585 self._id = id 586 frame.queue.put((frame.addConnection, (id, self))) 587 self.connection_from(fci) 588 589 def write(self, s): 590 output = [(s, "message")] 591 self._output.extend(output) 592 self._frame.queue.put( 593 (self._frame.updateConnection, (self._id, output))) 594 595 def flush(self): 596 pass 597 598 def received(self, data, from_client): 599 if not self._colorized: 600 BasicObserver.received(self, data, from_client) 601 return 602 603 if not show_cr: 604 data = data.replace('\r', '') 605 606 output = [] 607 608 extra_color = (self._colorized == 2) 609 610 if extra_color: 611 # 4 colors: Change the color client/server and escaped chars 612 def append(ss, escaped, output=output, 613 from_client=from_client, escape=escape): 614 if escaped: 615 output.append((escape(ss), from_client 616 and 'clientesc' or 'serveresc')) 617 else: 618 output.append((ss, from_client 619 and 'client' or 'server')) 620 else: 621 # 2 colors: Only change color for client/server 622 segments = [] 623 def append(ss, escaped, segments=segments, 624 escape=escape): 625 if escaped: 626 segments.append(escape(ss)) 627 else: 628 segments.append(ss) 629 630 # Escape the input data. 631 was_escaped = 0 632 start_idx = 0 633 for idx in xrange(len(data)): 634 c = data[idx] 635 escaped = (c < ' ' and c != '\n') or c >= '\x80' 636 if was_escaped != escaped: 637 ss = data[start_idx:idx] 638 if ss: 639 append(ss, was_escaped) 640 was_escaped = escaped 641 start_idx = idx 642 ss = data[start_idx:] 643 if ss: 644 append(ss, was_escaped) 645 646 if not extra_color: 647 output.append((''.join(segments), 648 from_client and 'client' or 'server')) 649 650 # Send output to the frame. 651 self._output.extend(output) 652 self._frame.queue.put( 653 (self._frame.updateConnection, (self._id, output))) 654 if data.endswith('\n'): 655 self.continuing_line = -1 656 else: 657 self.continuing_line = from_client 658 659 def getOutput(self): 660 return self._output 661 662 663 def createApp(titlepart): 664 master = Tkinter.Tk() 665 app = TkTCPWatch(master) 666 try: 667 wm_title = app.master.wm_title 668 except AttributeError: 669 pass # No wm_title method available. 670 else: 671 wm_title('TCPWatch [%s]' % titlepart) 672 return app 673 674 app = createApp(titlepart) 675 676 def tkObserverFactory(fci, app=app, colorized=colorized): 677 return TkConnectionObserver(app, fci, colorized) 678 679 return tkObserverFactory, app.mainloop 680 681 682 683############################################################################# 684# 685# The HTTP splitter 686# 687# Derived from Zope.Server.HTTPServer. 688# 689############################################################################# 690 691 692def find_double_newline(s): 693 """Returns the position just after the double newline.""" 694 pos1 = s.find('\n\r\n') # One kind of double newline 695 if pos1 >= 0: 696 pos1 += 3 697 pos2 = s.find('\n\n') # Another kind of double newline 698 if pos2 >= 0: 699 pos2 += 2 700 701 if pos1 >= 0: 702 if pos2 >= 0: 703 return min(pos1, pos2) 704 else: 705 return pos1 706 else: 707 return pos2 708 709 710 711class StreamedReceiver: 712 """Accepts data up to a specific limit.""" 713 714 completed = 0 715 716 def __init__(self, cl, buf=None): 717 self.remain = cl 718 self.buf = buf 719 if cl < 1: 720 self.completed = 1 721 722 def received(self, data): 723 rm = self.remain 724 if rm < 1: 725 self.completed = 1 # Avoid any chance of spinning 726 return 0 727 buf = self.buf 728 datalen = len(data) 729 if rm <= datalen: 730 if buf is not None: 731 buf.append(data[:rm]) 732 self.remain = 0 733 self.completed = 1 734 return rm 735 else: 736 if buf is not None: 737 buf.append(data) 738 self.remain -= datalen 739 return datalen 740 741 742 743class UnlimitedReceiver: 744 """Accepts data without limits.""" 745 746 completed = 0 747 748 def received(self, data): 749 # always consume everything 750 return len(data) 751 752 753 754class ChunkedReceiver: 755 """Accepts all chunks.""" 756 757 chunk_remainder = 0 758 control_line = '' 759 all_chunks_received = 0 760 trailer = '' 761 completed = 0 762 763 764 def __init__(self, buf=None): 765 self.buf = buf 766 767 def received(self, s): 768 # Returns the number of bytes consumed. 769 if self.completed: 770 return 0 771 orig_size = len(s) 772 while s: 773 rm = self.chunk_remainder 774 if rm > 0: 775 # Receive the remainder of a chunk. 776 to_write = s[:rm] 777 if self.buf is not None: 778 self.buf.append(to_write) 779 written = len(to_write) 780 s = s[written:] 781 self.chunk_remainder -= written 782 elif not self.all_chunks_received: 783 # Receive a control line. 784 s = self.control_line + s 785 pos = s.find('\n') 786 if pos < 0: 787 # Control line not finished. 788 self.control_line = s 789 s = '' 790 else: 791 # Control line finished. 792 line = s[:pos] 793 s = s[pos + 1:] 794 self.control_line = '' 795 line = line.strip() 796 if line: 797 # Begin a new chunk. 798 semi = line.find(';') 799 if semi >= 0: 800 # discard extension info. 801 line = line[:semi] 802 sz = int(line.strip(), 16) # hexadecimal 803 if sz > 0: 804 # Start a new chunk. 805 self.chunk_remainder = sz 806 else: 807 # Finished chunks. 808 self.all_chunks_received = 1 809 # else expect a control line. 810 else: 811 # Receive the trailer. 812 trailer = self.trailer + s 813 if trailer[:2] == '\r\n': 814 # No trailer. 815 self.completed = 1 816 return orig_size - (len(trailer) - 2) 817 elif trailer[:1] == '\n': 818 # No trailer. 819 self.completed = 1 820 return orig_size - (len(trailer) - 1) 821 pos = find_double_newline(trailer) 822 if pos < 0: 823 # Trailer not finished. 824 self.trailer = trailer 825 s = '' 826 else: 827 # Finished the trailer. 828 self.completed = 1 829 self.trailer = trailer[:pos] 830 return orig_size - (len(trailer) - pos) 831 return orig_size 832 833 834 835class HTTPStreamParser: 836 """A structure that parses the HTTP stream. 837 """ 838 839 completed = 0 # Set once request is completed. 840 empty = 0 # Set if no request was made. 841 header_plus = '' 842 chunked = 0 843 content_length = 0 844 body_rcv = None 845 846 # headers is a mapping containing keys translated to uppercase 847 # with dashes turned into underscores. 848 849 def __init__(self, is_a_request): 850 self.headers = {} 851 self.is_a_request = is_a_request 852 self.body_data = [] 853 854 def received(self, data): 855 """Receives the HTTP stream for one request. 856 857 Returns the number of bytes consumed. 858 Sets the completed flag once both the header and the 859 body have been received. 860 """ 861 if self.completed: 862 return 0 # Can't consume any more. 863 datalen = len(data) 864 br = self.body_rcv 865 if br is None: 866 # In header. 867 s = self.header_plus + data 868 index = find_double_newline(s) 869 if index >= 0: 870 # Header finished. 871 header_plus = s[:index] 872 consumed = len(data) - (len(s) - index) 873 self.in_header = 0 874 # Remove preceeding blank lines. 875 header_plus = header_plus.lstrip() 876 if not header_plus: 877 self.empty = 1 878 self.completed = 1 879 else: 880 self.parse_header(header_plus) 881 if self.body_rcv is None or self.body_rcv.completed: 882 self.completed = 1 883 return consumed 884 else: 885 # Header not finished yet. 886 self.header_plus = s 887 return datalen 888 else: 889 # In body. 890 consumed = br.received(data) 891 self.body_data.append(data[:consumed]) 892 if br.completed: 893 self.completed = 1 894 return consumed 895 896 897 def parse_header(self, header_plus): 898 """Parses the header_plus block of text. 899 900 (header_plus is the headers plus the first line of the request). 901 """ 902 index = header_plus.find('\n') 903 if index >= 0: 904 first_line = header_plus[:index] 905 header = header_plus[index + 1:] 906 else: 907 first_line = header_plus 908 header = '' 909 self.first_line = first_line 910 self.header = header 911 912 lines = self.get_header_lines() 913 headers = self.headers 914 for line in lines: 915 index = line.find(':') 916 if index > 0: 917 key = line[:index] 918 value = line[index + 1:].strip() 919 key1 = key.upper().replace('-', '_') 920 headers[key1] = value 921 # else there's garbage in the headers? 922 923 if not self.is_a_request: 924 # Check for a 304 response. 925 parts = first_line.split() 926 if len(parts) >= 2 and parts[1] == '304': 927 # Expect no body. 928 self.body_rcv = StreamedReceiver(0) 929 930 if self.body_rcv is None: 931 # Ignore the HTTP version and just assume 932 # that the Transfer-Encoding header, when supplied, is valid. 933 te = headers.get('TRANSFER_ENCODING', '') 934 if te == 'chunked': 935 self.chunked = 1 936 self.body_rcv = ChunkedReceiver() 937 if not self.chunked: 938 cl = int(headers.get('CONTENT_LENGTH', -1)) 939 self.content_length = cl 940 if cl >= 0 or self.is_a_request: 941 self.body_rcv = StreamedReceiver(cl) 942 else: 943 # No content length and this is a response. 944 # We have to assume unlimited content length. 945 self.body_rcv = UnlimitedReceiver() 946 947 948 def get_header_lines(self): 949 """Splits the header into lines, putting multi-line headers together. 950 """ 951 r = [] 952 lines = self.header.split('\n') 953 for line in lines: 954 if line.endswith('\r'): 955 line = line[:-1] 956 if line and line[0] in ' \t': 957 r[-1] = r[-1] + line[1:] 958 else: 959 r.append(line) 960 return r 961 962 963 964class HTTPConnectionSplitter: 965 """Makes a new observer for each HTTP subconnection and forwards events. 966 """ 967 968 # __implements__ = IConnectionObserver 969 req_index = 0 970 resp_index = 0 971 972 def __init__(self, sub_factory, fci): 973 self.sub_factory = sub_factory 974 self.transactions = [] # (observer, request_data, response_data) 975 self.fci = fci 976 self._newTransaction() 977 978 def _newTransaction(self): 979 fci = self.fci.dup() 980 fci.transaction = len(self.transactions) + 1 981 obs = self.sub_factory(fci) 982 req = HTTPStreamParser(1) 983 resp = HTTPStreamParser(0) 984 self.transactions.append((obs, req, resp)) 985 986 def _mostRecentObs(self): 987 return self.transactions[-1][0] 988 989 def connected(self, from_client): 990 self._mostRecentObs().connected(from_client) 991 992 def closed(self, from_client): 993 self._mostRecentObs().closed(from_client) 994 995 def error(self, from_client, type, value): 996 self._mostRecentObs().error(from_client, type, value) 997 998 def received(self, data, from_client): 999 transactions = self.transactions 1000 while data: 1001 if from_client: 1002 index = self.req_index 1003 else: 1004 index = self.resp_index 1005 if index >= len(transactions): 1006 self._newTransaction() 1007 1008 obs, req, resp = transactions[index] 1009 if from_client: 1010 parser = req 1011 else: 1012 parser = resp 1013 1014 consumed = parser.received(data) 1015 obs.received(data[:consumed], from_client) 1016 data = data[consumed:] 1017 if parser.completed: 1018 new_index = index + 1 1019 if from_client: 1020 self.req_index = new_index 1021 else: 1022 self.resp_index = new_index 1023 1024 1025############################################################################# 1026# 1027# HTTP proxy 1028# 1029############################################################################# 1030 1031 1032class HTTPProxyToServerConnection (ForwardingEndpoint): 1033 """Ensures that responses to a persistent HTTP connection occur 1034 in the correct order.""" 1035 1036 finished = 0 1037 1038 def __init__(self, proxy_conn, dests=()): 1039 ForwardingEndpoint.__init__(self) 1040 self.response_parser = HTTPStreamParser(0) 1041 self.proxy_conn = proxy_conn 1042 self.set_dests(dests) 1043 1044 # Data for the client held until previous responses are sent 1045 self.held = [] 1046 1047 def _isMyTurn(self): 1048 """Returns a true value if it's time for this response 1049 to respond to the client.""" 1050 order = self.proxy_conn._response_order 1051 if order: 1052 return (order[0] is self) 1053 return 1 1054 1055 def received(self, data): 1056 """Receives data from the HTTP server to be sent back to the client.""" 1057 while 1: 1058 parser = self.response_parser 1059 if parser.completed: 1060 self.finished = 1 1061 self.flush() 1062 # Note that any extra data returned from the server is 1063 # ignored. Should it be? :-( 1064 return 1065 if not data: 1066 break 1067 consumed = parser.received(data) 1068 fragment = data[:consumed] 1069 data = data[consumed:] 1070 ForwardingEndpoint.received(self, fragment) 1071 self.held.append(fragment) 1072 self.flush() 1073 1074 def flush(self): 1075 """Flushes buffers and, if the response has been sent, allows 1076 the next response to take over. 1077 """ 1078 if self.held and self._isMyTurn(): 1079 data = ''.join(self.held) 1080 del self.held[:] 1081 self.proxy_conn.write(data) 1082 if self.finished: 1083 order = self.proxy_conn._response_order 1084 if order and order[0] is self: 1085 del order[0] 1086 if order: 1087 order[0].flush() # kick! 1088 1089 def handle_close(self): 1090 """The HTTP server closed the connection. 1091 """ 1092 ForwardingEndpoint.handle_close(self) 1093 if not self.finished: 1094 # Cancel the proxy connection, even if there are responses 1095 # pending, since the HTTP spec provides no way to recover 1096 # from an unfinished response. 1097 self.proxy_conn.close() 1098 1099 def close(self): 1100 """Close the connection to the server. 1101 1102 If there is unsent response data, an error is generated. 1103 """ 1104 self.flush() 1105 if not self.finished: 1106 t = IOError 1107 v = 'Closed without finishing response to client' 1108 for d in self._dests: 1109 if hasattr(d, 'error'): 1110 d.error(t, v) 1111 ForwardingEndpoint.close(self) 1112 1113 1114 1115class HTTPProxyToClientConnection (ForwardingEndpoint): 1116 """A connection from a client to the proxy server""" 1117 1118 _req_parser = None 1119 _transaction = 0 1120 _obs = None 1121 1122 def __init__(self, conn, factory, counter, addr): 1123 ForwardingEndpoint.__init__(self, conn) 1124 self._obs_factory = factory 1125 self._counter = counter 1126 self._client_addr = addr 1127 self._response_order = [] 1128 self._newRequest() 1129 1130 def _newRequest(self): 1131 """Starts a new request on a persistent connection.""" 1132 if self._req_parser is None: 1133 self._req_parser = HTTPStreamParser(1) 1134 factory = self._obs_factory 1135 if factory is not None: 1136 fci = ForwardedConnectionInfo(self._counter, self._client_addr) 1137 self._transaction = self._transaction + 1 1138 fci.transaction = self._transaction 1139 obs = factory(fci) 1140 self._obs = obs 1141 self.set_dests((EndpointObserver(obs, 1),)) 1142 1143 def received(self, data): 1144 """Accepts data received from the client.""" 1145 while data: 1146 parser = self._req_parser 1147 if parser is None: 1148 # Begin another request. 1149 self._newRequest() 1150 parser = self._req_parser 1151 if not parser.completed: 1152 # Waiting for a complete request. 1153 consumed = parser.received(data) 1154 ForwardingEndpoint.received(self, data[:consumed]) 1155 data = data[consumed:] 1156 if parser.completed: 1157 # Connect to a server. 1158 self.openProxyConnection(parser) 1159 # Expect a new request or a closed connection. 1160 self._req_parser = None 1161 1162 def openProxyConnection(self, request): 1163 """Parses the client connection and opens a connection to an 1164 HTTP server. 1165 """ 1166 first_line = request.first_line.strip() 1167 if not ' ' in first_line: 1168 raise ValueError, ('Malformed request: %s' % first_line) 1169 command, url = first_line.split(' ', 1) 1170 pos = url.rfind(' HTTP/') 1171 if pos >= 0: 1172 protocol = url[pos + 1:] 1173 url = url[:pos].rstrip() 1174 else: 1175 protocol = 'HTTP/1.0' 1176 if url.startswith('http://'): 1177 # Standard proxy 1178 urlpart = url[7:] 1179 if '/' in urlpart: 1180 host, path = url[7:].split('/', 1) 1181 path = '/' + path 1182 else: 1183 host = urlpart 1184 path = '/' 1185 else: 1186 # Transparent proxy 1187 host = request.headers.get('HOST') 1188 path = url 1189 if not host: 1190 raise ValueError, ('Request type not supported: %s' % url) 1191 1192 if ':' in host: 1193 host, port = host.split(':') 1194 port = int(port) 1195 else: 1196 port = 80 1197 1198 if '@' in host: 1199 username, host = host.split('@') 1200 1201 obs = self._obs 1202 if obs is not None: 1203 eo = EndpointObserver(obs, 0) 1204 ptos = HTTPProxyToServerConnection(self, (eo,)) 1205 else: 1206 ptos = HTTPProxyToServerConnection(self) 1207 1208 self._response_order.append(ptos) 1209 1210 ptos.write('%s %s %s\r\n' % (command, path, protocol)) 1211 # Duplicate the headers sent by the client. 1212 if request.header: 1213 ptos.write(request.header) 1214 else: 1215 ptos.write('\r\n') 1216 if request.body_data: 1217 ptos.write(''.join(request.body_data)) 1218 ptos.create_socket(socket.AF_INET, socket.SOCK_STREAM) 1219 ptos.connect((host, port)) 1220 1221 def close(self): 1222 """Closes the connection to the client. 1223 1224 If there are open connections to proxy servers, the server 1225 connections are also closed. 1226 """ 1227 ForwardingEndpoint.close(self) 1228 for ptos in self._response_order: 1229 ptos.close() 1230 del self._response_order[:] 1231 1232 1233class HTTPProxyService (asyncore.dispatcher): 1234 """A minimal HTTP proxy server""" 1235 1236 connection_class = HTTPProxyToClientConnection 1237 1238 _counter = 0 1239 1240 def __init__(self, listen_host, listen_port, observer_factory=None): 1241 self._obs_factory = observer_factory 1242 asyncore.dispatcher.__init__(self) 1243 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 1244 self.set_reuse_addr() 1245 self.bind((listen_host, listen_port)) 1246 self.listen(5) 1247 1248 def handle_accept(self): 1249 info = self.accept() 1250 if info: 1251 # Got a connection. 1252 conn, addr = info 1253 conn.setblocking(0) 1254 counter = self._counter + 1 1255 self._counter = counter 1256 self.connection_class(conn, self._obs_factory, counter, addr) 1257 1258 def handle_error(self): 1259 # Don't stop the server. 1260 import traceback 1261 traceback.print_exc() 1262 1263 1264############################################################################# 1265# 1266# Command-line interface 1267# 1268############################################################################# 1269 1270def usage(): 1271 sys.stderr.write(COPYRIGHT + '\n') 1272 sys.stderr.write( 1273 """TCP monitoring and logging tool with support for HTTP 1.1 1274Simple usage: tcpwatch.py -L listen_port:dest_hostname:dest_port 1275 1276TCP forwarded connection setup: 1277 -L <listen_port>:<dest_port> 1278 Set up a local forwarded connection 1279 -L <listen_port>:<dest_host>:<dest_port> 1280 Set up a forwarded connection to a specified host 1281 -L <listen_host>:<listen_port>:<dest_host>:<dest_port> 1282 Set up a forwarded connection to a specified host, bound to an interface 1283 1284HTTP setup: 1285 -h (or --http) Split forwarded HTTP persistent connections 1286 -p [<listen_host>:]<listen_port> Run an HTTP proxy 1287 1288Output options: 1289 -s Output to stdout instead of a Tkinter window 1290 -n No color in GUI (faster and consumes less RAM) 1291 -c Extra color (colorizes escaped characters) 1292 --cr Show carriage returns (ASCII 13) 1293 --help Show usage information 1294 1295Recording options: 1296 -r <path> (synonyms: -R, --record-directory) 1297 Write recorded data to <path>. By default, creates request and 1298 response files for each request, and writes a corresponding error file 1299 for any error detected by tcpwatch. 1300 --record-prefix=<prefix> 1301 Use <prefix> as the file prefix for logged request / response / error 1302 files (defaults to 'watch'). 1303 --no-record-responses 1304 Suppress writing '.response' files. 1305 --no-record-errors 1306 Suppress writing '.error' files. 1307""") 1308 sys.exit() 1309 1310 1311def usageError(s): 1312 sys.stderr.write(str(s) + '\n\n') 1313 usage() 1314 1315 1316def main(args): 1317 global show_cr 1318 1319 try: 1320 optlist, extra = getopt.getopt(args, 'chL:np:r:R:s', 1321 ['help', 'http', 'cr', 1322 'record-directory=', 1323 'record-prefix=', 1324 'no-record-responses', 1325 'no-record-errors', 1326 ]) 1327 except getopt.GetoptError, msg: 1328 usageError(msg) 1329 1330 fwd_params = [] 1331 proxy_params = [] 1332 obs_factory = None 1333 show_config = 0 1334 split_http = 0 1335 colorized = 1 1336 record_directory = None 1337 record_prefix = 'watch' 1338 record_responses = 1 1339 record_errors = 1 1340 recording = {} 1341 1342 for option, value in optlist: 1343 if option == '--help': 1344 usage() 1345 elif option == '--http' or option == '-h': 1346 split_http = 1 1347 elif option == '-n': 1348 colorized = 0 1349 elif option == '-c': 1350 colorized = 2 1351 elif option == '--cr': 1352 show_cr = 1 1353 elif option == '-s': 1354 show_config = 1 1355 obs_factory = StdoutObserver 1356 elif option == '-p': 1357 # HTTP proxy 1358 info = value.split(':') 1359 listen_host = '' 1360 if len(info) == 1: 1361 listen_port = int(info[0]) 1362 elif len(info) == 2: 1363 listen_host = info[0] 1364 listen_port = int(info[1]) 1365 else: 1366 usageError('-p requires a port or a host:port parameter') 1367 proxy_params.append((listen_host, listen_port)) 1368 elif option == '-L': 1369 # TCP forwarder 1370 info = value.split(':') 1371 listen_host = '' 1372 dest_host = '' 1373 if len(info) == 2: 1374 listen_port = int(info[0]) 1375 dest_port = int(info[1]) 1376 elif len(info) == 3: 1377 listen_port = int(info[0]) 1378 dest_host = info[1] 1379 dest_port = int(info[2]) 1380 elif len(info) == 4: 1381 listen_host = info[0] 1382 listen_port = int(info[1]) 1383 dest_host = info[2] 1384 dest_port = int(info[3]) 1385 else: 1386 usageError('-L requires 2, 3, or 4 colon-separated parameters') 1387 fwd_params.append( 1388 (listen_host, listen_port, dest_host, dest_port)) 1389 elif (option == '-r' 1390 or option == '-R' 1391 or option == '--record-directory'): 1392 record_directory = value 1393 elif option == '--record-prefix': 1394 record_prefix = value 1395 elif option == '--no-record-responses': 1396 record_responses = 0 1397 elif option == '--no-record-errors': 1398 record_errors = 0 1399 1400 if not fwd_params and not proxy_params: 1401 usageError("At least one -L or -p option is required.") 1402 1403 # Prepare the configuration display. 1404 config_info_lines = [] 1405 title_lst = [] 1406 if fwd_params: 1407 config_info_lines.extend(map( 1408 lambda args: 'Forwarding %s:%d -> %s:%d' % args, fwd_params)) 1409 title_lst.extend(map( 1410 lambda args: '%s:%d -> %s:%d' % args, fwd_params)) 1411 if proxy_params: 1412 config_info_lines.extend(map( 1413 lambda args: 'HTTP proxy listening on %s:%d' % args, proxy_params)) 1414 title_lst.extend(map( 1415 lambda args: '%s:%d -> proxy' % args, proxy_params)) 1416 if split_http: 1417 config_info_lines.append('HTTP connection splitting enabled.') 1418 if record_directory: 1419 config_info_lines.append( 1420 'Recording to directory %s.' % record_directory) 1421 config_info = '\n'.join(config_info_lines) 1422 titlepart = ', '.join(title_lst) 1423 mainloop = None 1424 1425 if obs_factory is None: 1426 # If no observer factory has been specified, use Tkinter. 1427 obs_factory, mainloop = setupTk(titlepart, config_info, colorized) 1428 1429 if record_directory: 1430 def _decorateRecorder(fci, sub_factory=obs_factory, 1431 record_directory=record_directory, 1432 record_prefix=record_prefix, 1433 record_responses=record_responses, 1434 record_errors=record_errors): 1435 return RecordingObserver(fci, sub_factory, record_directory, 1436 record_prefix, record_responses, 1437 record_errors) 1438 obs_factory = _decorateRecorder 1439 1440 chosen_factory = obs_factory 1441 if split_http: 1442 # Put an HTTPConnectionSplitter between the events and the output. 1443 def _factory(fci, sub_factory=obs_factory): 1444 return HTTPConnectionSplitter(sub_factory, fci) 1445 chosen_factory = _factory 1446 # obs_factory is the connection observer factory without HTTP 1447 # connection splitting, while chosen_factory may have connection 1448 # splitting. Proxy services use obs_factory rather than the full 1449 # chosen_factory because proxy services perform connection 1450 # splitting internally. 1451 1452 services = [] 1453 try: 1454 # Start forwarding services. 1455 for params in fwd_params: 1456 args = params + (chosen_factory,) 1457 s = ForwardingService(*args) 1458 services.append(s) 1459 1460 # Start proxy services. 1461 for params in proxy_params: 1462 args = params + (obs_factory,) 1463 s = HTTPProxyService(*args) 1464 services.append(s) 1465 1466 if show_config: 1467 sys.stderr.write(config_info + '\n') 1468 1469 # Run the main loop. 1470 try: 1471 if mainloop is not None: 1472 import thread 1473 thread.start_new_thread(asyncore.loop, (), {'timeout': 1.0}) 1474 mainloop() 1475 else: 1476 asyncore.loop(timeout=1.0) 1477 except KeyboardInterrupt: 1478 sys.stderr.write('TCPWatch finished.\n') 1479 finally: 1480 for s in services: 1481 s.close() 1482 1483 1484if __name__ == '__main__': 1485 main(sys.argv[1:]) 1486