1from unittest import mock
2
3import pytest
4
5from mitmproxy import addonmanager
6from mitmproxy import addons
7from mitmproxy import command
8from mitmproxy import exceptions
9from mitmproxy import hooks
10from mitmproxy import master
11from mitmproxy import options
12from mitmproxy.proxy.layers.http import HttpRequestHook, HttpResponseHook
13from mitmproxy.test import taddons
14from mitmproxy.test import tflow
15
16
17class TAddon:
18    def __init__(self, name, addons=None):
19        self.name = name
20        self.response = True
21        self.running_called = False
22        if addons:
23            self.addons = addons
24
25    @command.command("test.command")
26    def testcommand(self) -> str:
27        return "here"
28
29    def __repr__(self):
30        return "Addon(%s)" % self.name
31
32    def done(self):
33        pass
34
35    def running(self):
36        self.running_called = True
37
38
39class THalt:
40    def running(self):
41        raise exceptions.AddonHalt
42
43
44class AOption:
45    def load(self, l):
46        l.add_option("custom_option", bool, False, "help")
47
48
49class AOldAPI:
50    def clientconnect(self):
51        pass
52
53
54def test_command():
55    with taddons.context() as tctx:
56        tctx.master.addons.add(TAddon("test"))
57        assert tctx.master.commands.execute("test.command") == "here"
58
59
60def test_halt():
61    o = options.Options()
62    m = master.Master(o)
63    a = addonmanager.AddonManager(m)
64    halt = THalt()
65    end = TAddon("end")
66    a.add(halt)
67    a.add(end)
68
69    assert not end.running_called
70    a.trigger(hooks.RunningHook())
71    assert not end.running_called
72
73    a.remove(halt)
74    a.trigger(hooks.RunningHook())
75    assert end.running_called
76
77
78@pytest.mark.asyncio
79async def test_lifecycle():
80    o = options.Options()
81    m = master.Master(o)
82    a = addonmanager.AddonManager(m)
83    a.add(TAddon("one"))
84
85    with pytest.raises(exceptions.AddonManagerError):
86        a.add(TAddon("one"))
87    with pytest.raises(exceptions.AddonManagerError):
88        a.remove(TAddon("nonexistent"))
89
90    f = tflow.tflow()
91    await a.handle_lifecycle(HttpRequestHook(f))
92
93    a._configure_all(o, o.keys())
94
95
96def test_defaults():
97    assert addons.default_addons()
98
99
100@pytest.mark.asyncio
101async def test_loader():
102    with taddons.context() as tctx:
103        with mock.patch("mitmproxy.ctx.log.warn") as warn:
104            l = addonmanager.Loader(tctx.master)
105            l.add_option("custom_option", bool, False, "help")
106            assert "custom_option" in l.master.options
107
108            # calling this again with the same signature is a no-op.
109            l.add_option("custom_option", bool, False, "help")
110            assert not warn.called
111
112            # a different signature should emit a warning though.
113            l.add_option("custom_option", bool, True, "help")
114            assert warn.called
115
116            def cmd(a: str) -> str:
117                return "foo"
118
119            l.add_command("test.command", cmd)
120
121
122@pytest.mark.asyncio
123async def test_simple():
124    with taddons.context(loadcore=False) as tctx:
125        a = tctx.master.addons
126
127        assert len(a) == 0
128        a.add(TAddon("one"))
129        assert a.get("one")
130        assert not a.get("two")
131        assert len(a) == 1
132        a.clear()
133        assert len(a) == 0
134        assert not a.chain
135
136        a.add(TAddon("one"))
137
138        a.trigger("nonexistent")
139        await tctx.master.await_log("AssertionError")
140
141        f = tflow.tflow()
142        a.trigger(hooks.RunningHook())
143        a.trigger(HttpResponseHook(f))
144        await tctx.master.await_log("not callable")
145
146        tctx.master.clear()
147        a.get("one").response = addons
148        a.trigger(HttpResponseHook(f))
149        with pytest.raises(AssertionError):
150            await tctx.master.await_log("not callable", timeout=0.01)
151
152        a.remove(a.get("one"))
153        assert not a.get("one")
154
155        ta = TAddon("one")
156        a.add(ta)
157        a.trigger(hooks.RunningHook())
158        assert ta.running_called
159
160        assert ta in a
161
162
163def test_load_option():
164    o = options.Options()
165    m = master.Master(o)
166    a = addonmanager.AddonManager(m)
167    a.add(AOption())
168    assert "custom_option" in m.options._options
169
170
171def test_nesting():
172    o = options.Options()
173    m = master.Master(o)
174    a = addonmanager.AddonManager(m)
175
176    a.add(
177        TAddon(
178            "one",
179            addons=[
180                TAddon("two"),
181                TAddon("three", addons=[TAddon("four")])
182            ]
183        )
184    )
185    assert len(a.chain) == 1
186    assert a.get("one")
187    assert a.get("two")
188    assert a.get("three")
189    assert a.get("four")
190
191    a.trigger(hooks.RunningHook())
192    assert a.get("one").running_called
193    assert a.get("two").running_called
194    assert a.get("three").running_called
195    assert a.get("four").running_called
196
197    a.remove(a.get("three"))
198    assert not a.get("three")
199    assert not a.get("four")
200
201
202@pytest.mark.asyncio
203async def test_old_api():
204    with taddons.context(loadcore=False) as tctx:
205        tctx.master.addons.add(AOldAPI())
206        await tctx.master.await_log("clientconnect event has been removed")
207