1# FIXME: This addon is currently not compatible with mitmproxy 7 and above.
2
3"""
4This inline script allows conditional TLS Interception based
5on a user-defined strategy.
6
7Example:
8
9    > mitmdump -s tls_passthrough.py
10
11    1. curl --proxy http://localhost:8080 https://example.com --insecure
12    // works - we'll also see the contents in mitmproxy
13
14    2. curl --proxy http://localhost:8080 https://example.com --insecure
15    // still works - we'll also see the contents in mitmproxy
16
17    3. curl --proxy http://localhost:8080 https://example.com
18    // fails with a certificate error, which we will also see in mitmproxy
19
20    4. curl --proxy http://localhost:8080 https://example.com
21    // works again, but mitmproxy does not intercept and we do *not* see the contents
22
23Authors: Maximilian Hils, Matthew Tuusberg
24"""
25import collections
26import random
27
28from enum import Enum
29
30import mitmproxy
31from mitmproxy import ctx
32from mitmproxy.exceptions import TlsProtocolException
33from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
34
35
36class InterceptionResult(Enum):
37    success = True
38    failure = False
39    skipped = None
40
41
42class _TlsStrategy:
43    """
44    Abstract base class for interception strategies.
45    """
46
47    def __init__(self):
48        # A server_address -> interception results mapping
49        self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))
50
51    def should_intercept(self, server_address):
52        """
53        Returns:
54            True, if we should attempt to intercept the connection.
55            False, if we want to employ pass-through instead.
56        """
57        raise NotImplementedError()
58
59    def record_success(self, server_address):
60        self.history[server_address].append(InterceptionResult.success)
61
62    def record_failure(self, server_address):
63        self.history[server_address].append(InterceptionResult.failure)
64
65    def record_skipped(self, server_address):
66        self.history[server_address].append(InterceptionResult.skipped)
67
68
69class ConservativeStrategy(_TlsStrategy):
70    """
71    Conservative Interception Strategy - only intercept if there haven't been any failed attempts
72    in the history.
73    """
74
75    def should_intercept(self, server_address):
76        if InterceptionResult.failure in self.history[server_address]:
77            return False
78        return True
79
80
81class ProbabilisticStrategy(_TlsStrategy):
82    """
83    Fixed probability that we intercept a given connection.
84    """
85
86    def __init__(self, p):
87        self.p = p
88        super().__init__()
89
90    def should_intercept(self, server_address):
91        return random.uniform(0, 1) < self.p
92
93
94class TlsFeedback(TlsLayer):
95    """
96    Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
97    successfully on the client connection (which may fail due to cert pinning).
98    """
99
100    def _establish_tls_with_client(self):
101        server_address = self.server_conn.address
102
103        try:
104            super()._establish_tls_with_client()
105        except TlsProtocolException as e:
106            tls_strategy.record_failure(server_address)
107            raise e
108        else:
109            tls_strategy.record_success(server_address)
110
111
112# inline script hooks below.
113
114tls_strategy = None
115
116
117def load(l):
118    l.add_option(
119        "tlsstrat", int, 0, "TLS passthrough strategy (0-100)",
120    )
121
122
123def configure(updated):
124    global tls_strategy
125    if ctx.options.tlsstrat > 0:
126        tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
127    else:
128        tls_strategy = ConservativeStrategy()
129
130
131def next_layer(next_layer):
132    """
133    This hook does the actual magic - if the next layer is planned to be a TLS layer,
134    we check if we want to enter pass-through mode instead.
135    """
136    if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
137        server_address = next_layer.server_conn.address
138
139        if tls_strategy.should_intercept(server_address):
140            # We try to intercept.
141            # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
142            next_layer.__class__ = TlsFeedback
143        else:
144            # We don't intercept - reply with a pass-through layer and add a "skipped" entry.
145            mitmproxy.ctx.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info")
146            next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True)
147            next_layer.reply.send(next_layer_replacement)
148            tls_strategy.record_skipped(server_address)
149