# FIXME: This addon is currently not compatible with mitmproxy 7 and above.

"""
This inline script allows conditional TLS Interception based
on a user-defined strategy.

Example:

    > mitmdump -s tls_passthrough.py

    1. curl --proxy http://localhost:8080 https://example.com --insecure
    // works - we'll also see the contents in mitmproxy

    2. curl --proxy http://localhost:8080 https://example.com --insecure
    // still works - we'll also see the contents in mitmproxy

    3. curl --proxy http://localhost:8080 https://example.com
    // fails with a certificate error, which we will also see in mitmproxy

    4. curl --proxy http://localhost:8080 https://example.com
    // works again, but mitmproxy does not intercept and we do *not* see the contents

Authors: Maximilian Hils, Matthew Tuusberg
"""
import collections
import random

from enum import Enum

import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer


class InterceptionResult(Enum):
    """
    Outcome of a single connection, as stored in a strategy's history.
    """
    success = True
    failure = False
    skipped = None


class _TlsStrategy:
    """
    Abstract base class for interception strategies.

    Subclasses implement should_intercept(); the record_* methods append the
    outcome of each connection to a per-address history.
    """

    def __init__(self):
        # A server_address -> interception results mapping.
        # Each deque is bounded, so only the 200 most recent results per
        # address are kept.
        self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))

    def should_intercept(self, server_address):
        """
        Returns:
            True, if we should attempt to intercept the connection.
            False, if we want to employ pass-through instead.
        """
        raise NotImplementedError()

    def record_success(self, server_address):
        """Append a successful interception to the history of server_address."""
        self.history[server_address].append(InterceptionResult.success)

    def record_failure(self, server_address):
        """Append a failed interception to the history of server_address."""
        self.history[server_address].append(InterceptionResult.failure)

    def record_skipped(self, server_address):
        """Append a skipped (pass-through) connection to the history of server_address."""
        self.history[server_address].append(InterceptionResult.skipped)


class ConservativeStrategy(_TlsStrategy):
    """
    Conservative Interception Strategy - only intercept if there haven't been any failed attempts
    in the history.
    """

    def should_intercept(self, server_address):
        """
        Returns:
            False, if the recorded history for server_address contains a failure.
            True otherwise (including for addresses with no history yet).
        """
        return InterceptionResult.failure not in self.history[server_address]


class ProbabilisticStrategy(_TlsStrategy):
    """
    Fixed probability that we intercept a given connection.
    """

    def __init__(self, p):
        """
        Args:
            p: interception probability; a connection is intercepted when a
               uniform random draw from [0, 1] falls below it.
        """
        self.p = p
        super().__init__()

    def should_intercept(self, server_address):
        """
        Returns:
            True with probability p, independent of server_address and of the history.
        """
        return random.uniform(0, 1) < self.p


class TlsFeedback(TlsLayer):
    """
    Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
    successfully on the client connection (which may fail due to cert pinning).
    """

    def _establish_tls_with_client(self):
        """
        Run the parent's client TLS setup and report the outcome to the
        module-level tls_strategy.

        Raises:
            TlsProtocolException: re-raised unchanged after the failure has
                been recorded.
        """
        server_address = self.server_conn.address

        try:
            super()._establish_tls_with_client()
        except TlsProtocolException:
            tls_strategy.record_failure(server_address)
            # Bare raise propagates the original exception and traceback untouched.
            raise
        else:
            tls_strategy.record_success(server_address)


# inline script hooks below.
# The active interception strategy; set by configure().
tls_strategy = None


def load(l):
    """
    Register the "tlsstrat" option (int, default 0) on the given loader.
    """
    l.add_option(
        "tlsstrat", int, 0, "TLS passthrough strategy (0-100)",
    )


def configure(updated):
    """
    (Re)build the global TLS strategy from the "tlsstrat" option.

    A value above 0 selects ProbabilisticStrategy with probability
    tlsstrat / 100; otherwise ConservativeStrategy is used.

    Args:
        updated: collection of option names that changed, as passed to the
            configure hook by mitmproxy.
    """
    global tls_strategy
    # Rebuilding the strategy discards its recorded history, so only do it
    # when our own option changed. The first call always initialises.
    if tls_strategy is not None and "tlsstrat" not in updated:
        return

    percentage = ctx.options.tlsstrat
    if percentage > 0:
        tls_strategy = ProbabilisticStrategy(float(percentage) / 100.0)
    else:
        tls_strategy = ConservativeStrategy()


def next_layer(next_layer):
    """
    This hook does the actual magic - if the next layer is planned to be a TLS layer,
    we check if we want to enter pass-through mode instead.
    """
    if not (isinstance(next_layer, TlsLayer) and next_layer._client_tls):
        return

    server_address = next_layer.server_conn.address

    if tls_strategy.should_intercept(server_address):
        # We try to intercept.
        # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
        next_layer.__class__ = TlsFeedback
        return

    # We don't intercept - reply with a pass-through layer and add a "skipped" entry.
    ctx.log.info("TLS passthrough for %s" % repr(server_address))
    next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True)
    next_layer.reply.send(next_layer_replacement)
    tls_strategy.record_skipped(server_address)