1from unittest import mock 2 3import pytest 4 5from mitmproxy import addonmanager 6from mitmproxy import addons 7from mitmproxy import command 8from mitmproxy import exceptions 9from mitmproxy import hooks 10from mitmproxy import master 11from mitmproxy import options 12from mitmproxy.proxy.layers.http import HttpRequestHook, HttpResponseHook 13from mitmproxy.test import taddons 14from mitmproxy.test import tflow 15 16 17class TAddon: 18 def __init__(self, name, addons=None): 19 self.name = name 20 self.response = True 21 self.running_called = False 22 if addons: 23 self.addons = addons 24 25 @command.command("test.command") 26 def testcommand(self) -> str: 27 return "here" 28 29 def __repr__(self): 30 return "Addon(%s)" % self.name 31 32 def done(self): 33 pass 34 35 def running(self): 36 self.running_called = True 37 38 39class THalt: 40 def running(self): 41 raise exceptions.AddonHalt 42 43 44class AOption: 45 def load(self, l): 46 l.add_option("custom_option", bool, False, "help") 47 48 49class AOldAPI: 50 def clientconnect(self): 51 pass 52 53 54def test_command(): 55 with taddons.context() as tctx: 56 tctx.master.addons.add(TAddon("test")) 57 assert tctx.master.commands.execute("test.command") == "here" 58 59 60def test_halt(): 61 o = options.Options() 62 m = master.Master(o) 63 a = addonmanager.AddonManager(m) 64 halt = THalt() 65 end = TAddon("end") 66 a.add(halt) 67 a.add(end) 68 69 assert not end.running_called 70 a.trigger(hooks.RunningHook()) 71 assert not end.running_called 72 73 a.remove(halt) 74 a.trigger(hooks.RunningHook()) 75 assert end.running_called 76 77 78@pytest.mark.asyncio 79async def test_lifecycle(): 80 o = options.Options() 81 m = master.Master(o) 82 a = addonmanager.AddonManager(m) 83 a.add(TAddon("one")) 84 85 with pytest.raises(exceptions.AddonManagerError): 86 a.add(TAddon("one")) 87 with pytest.raises(exceptions.AddonManagerError): 88 a.remove(TAddon("nonexistent")) 89 90 f = tflow.tflow() 91 await a.handle_lifecycle(HttpRequestHook(f)) 92 93 a._configure_all(o, o.keys()) 94 95 96def test_defaults(): 97 assert addons.default_addons() 98 99 100@pytest.mark.asyncio 101async def test_loader(): 102 with taddons.context() as tctx: 103 with mock.patch("mitmproxy.ctx.log.warn") as warn: 104 l = addonmanager.Loader(tctx.master) 105 l.add_option("custom_option", bool, False, "help") 106 assert "custom_option" in l.master.options 107 108 # calling this again with the same signature is a no-op. 109 l.add_option("custom_option", bool, False, "help") 110 assert not warn.called 111 112 # a different signature should emit a warning though. 113 l.add_option("custom_option", bool, True, "help") 114 assert warn.called 115 116 def cmd(a: str) -> str: 117 return "foo" 118 119 l.add_command("test.command", cmd) 120 121 122@pytest.mark.asyncio 123async def test_simple(): 124 with taddons.context(loadcore=False) as tctx: 125 a = tctx.master.addons 126 127 assert len(a) == 0 128 a.add(TAddon("one")) 129 assert a.get("one") 130 assert not a.get("two") 131 assert len(a) == 1 132 a.clear() 133 assert len(a) == 0 134 assert not a.chain 135 136 a.add(TAddon("one")) 137 138 a.trigger("nonexistent") 139 await tctx.master.await_log("AssertionError") 140 141 f = tflow.tflow() 142 a.trigger(hooks.RunningHook()) 143 a.trigger(HttpResponseHook(f)) 144 await tctx.master.await_log("not callable") 145 146 tctx.master.clear() 147 a.get("one").response = addons 148 a.trigger(HttpResponseHook(f)) 149 with pytest.raises(AssertionError): 150 await tctx.master.await_log("not callable", timeout=0.01) 151 152 a.remove(a.get("one")) 153 assert not a.get("one") 154 155 ta = TAddon("one") 156 a.add(ta) 157 a.trigger(hooks.RunningHook()) 158 assert ta.running_called 159 160 assert ta in a 161 162 163def test_load_option(): 164 o = options.Options() 165 m = master.Master(o) 166 a = addonmanager.AddonManager(m) 167 a.add(AOption()) 168 assert "custom_option" in m.options._options 169 170 171def test_nesting(): 172 o = options.Options() 173 m = master.Master(o) 174 a = addonmanager.AddonManager(m) 175 176 a.add( 177 TAddon( 178 "one", 179 addons=[ 180 TAddon("two"), 181 TAddon("three", addons=[TAddon("four")]) 182 ] 183 ) 184 ) 185 assert len(a.chain) == 1 186 assert a.get("one") 187 assert a.get("two") 188 assert a.get("three") 189 assert a.get("four") 190 191 a.trigger(hooks.RunningHook()) 192 assert a.get("one").running_called 193 assert a.get("two").running_called 194 assert a.get("three").running_called 195 assert a.get("four").running_called 196 197 a.remove(a.get("three")) 198 assert not a.get("three") 199 assert not a.get("four") 200 201 202@pytest.mark.asyncio 203async def test_old_api(): 204 with taddons.context(loadcore=False) as tctx: 205 tctx.master.addons.add(AOldAPI()) 206 await tctx.master.await_log("clientconnect event has been removed") 207