"""Tests for the ``mitmproxy.addons.serverplayback`` addon."""
import urllib.parse

import pytest

import mitmproxy.test.tutils
from mitmproxy import exceptions
from mitmproxy import io
from mitmproxy.addons import serverplayback
from mitmproxy.test import taddons
from mitmproxy.test import tflow


def tdump(path, flows):
    """Write every flow in *flows* to the file at *path* via ``io.FlowWriter``."""
    with open(path, "wb") as f:
        w = io.FlowWriter(f)
        for flow in flows:
            w.add(flow)


def test_load_file(tmpdir):
    """``load_file`` fills ``flowmap`` from a dump and raises on a missing path."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s):
        fpath = str(tmpdir.join("flows"))
        tdump(fpath, [tflow.tflow(resp=True)])
        s.load_file(fpath)
        assert s.flowmap
        with pytest.raises(exceptions.CommandError):
            s.load_file("/nonexistent")


def test_config(tmpdir):
    """Configuring ``server_replay`` with a directory raises ``OptionsError``."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        fpath = str(tmpdir.join("flows"))
        tdump(fpath, [tflow.tflow(resp=True)])
        tctx.configure(s, server_replay=[fpath])
        # Reset the flag so the second configure call is processed again.
        s.configured = False
        with pytest.raises(exceptions.OptionsError):
            tctx.configure(s, server_replay=[str(tmpdir)])


def test_server_playback():
    """``flowmap`` is emptied both by consuming the only flow and by ``clear``."""
    sp = serverplayback.ServerPlayback()
    with taddons.context(sp) as tctx:
        tctx.configure(sp)
        f = tflow.tflow(resp=True)

        assert not sp.flowmap

        sp.load_flows([f])
        assert sp.flowmap
        assert sp.next_flow(f)
        assert not sp.flowmap

        sp.load_flows([f])
        assert sp.flowmap
        sp.clear()
        assert not sp.flowmap


def test_ignore_host():
    """With ``server_replay_ignore_host`` set, the host does not change the hash."""
    sp = serverplayback.ServerPlayback()
    with taddons.context(sp) as tctx:
        tctx.configure(sp, server_replay_ignore_host=True)

        r = tflow.tflow(resp=True)
        r2 = tflow.tflow(resp=True)

        r.request.host = "address"
        r2.request.host = "address"
        assert sp._hash(r) == sp._hash(r2)
        r2.request.host = "wrong_address"
        assert sp._hash(r) == sp._hash(r2)


def test_ignore_content():
    """Request content affects the hash only when ignore_content is off."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(s, server_replay_ignore_content=False)

        r = tflow.tflow(resp=True)
        r2 = tflow.tflow(resp=True)

        r.request.content = b"foo"
        r2.request.content = b"foo"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = b"bar"
        assert s._hash(r) != s._hash(r2)

        tctx.configure(s, server_replay_ignore_content=True)
        r = tflow.tflow(resp=True)
        r2 = tflow.tflow(resp=True)
        r.request.content = b"foo"
        r2.request.content = b"foo"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = b"bar"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = b""
        assert s._hash(r) == s._hash(r2)
        r2.request.content = None
        assert s._hash(r) == s._hash(r2)


def test_ignore_content_wins_over_params():
    """Bodies differing in a non-ignored param still hash equal with ignore_content."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(
            s,
            server_replay_ignore_content=True,
            server_replay_ignore_payload_params=[
                "param1", "param2"
            ]
        )

        # NOTE: parameters are mutually exclusive in options
        r = tflow.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r.request.content = b"paramx=y"

        r2 = tflow.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r2.request.content = b"paramx=x"

        # same parameters
        assert s._hash(r) == s._hash(r2)


def test_ignore_payload_params_other_content_type():
    """Ignored payload params have no effect on an ``application/json`` body."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(
            s,
            server_replay_ignore_content=False,
            server_replay_ignore_payload_params=[
                "param1", "param2"
            ]
        )

        r = tflow.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/json"
        r.request.content = b'{"param1":"1"}'
        r2 = tflow.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/json"
        r2.request.content = b'{"param1":"1"}'
        # same content
        assert s._hash(r) == s._hash(r2)
        # distinct content (note only x-www-form-urlencoded payload is analysed)
        r2.request.content = b'{"param1":"2"}'
        assert s._hash(r) != s._hash(r2)


def test_hash():
    """By default the hash ignores request headers but depends on the path."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(s)

        r = tflow.tflow()
        r2 = tflow.tflow()

        assert s._hash(r)
        assert s._hash(r) == s._hash(r2)
        r.request.headers["foo"] = "bar"
        assert s._hash(r) == s._hash(r2)
        r.request.path = "voing"
        assert s._hash(r) != s._hash(r2)

        # A query key without a value must not hash like an empty query.
        r.request.path = "path?blank_value"
        r2.request.path = "path?"
        assert s._hash(r) != s._hash(r2)


def test_headers():
    """Only headers listed in ``server_replay_use_headers`` influence the hash."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(s, server_replay_use_headers=["foo"])

        r = tflow.tflow(resp=True)
        r.request.headers["foo"] = "bar"
        r2 = tflow.tflow(resp=True)
        assert s._hash(r) != s._hash(r2)
        r2.request.headers["foo"] = "bar"
        assert s._hash(r) == s._hash(r2)
        r2.request.headers["oink"] = "bar"
        assert s._hash(r) == s._hash(r2)

        r = tflow.tflow(resp=True)
        r2 = tflow.tflow(resp=True)
        assert s._hash(r) == s._hash(r2)


def test_load():
    """Flows with the same hash are handed out in load order and then exhausted."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(s)

        r = tflow.tflow(resp=True)
        r.request.headers["key"] = "one"

        r2 = tflow.tflow(resp=True)
        r2.request.headers["key"] = "two"

        s.load_flows([r, r2])

        assert s.count() == 2

        n = s.next_flow(r)
        assert n.request.headers["key"] == "one"
        assert s.count() == 1

        n = s.next_flow(r)
        assert n.request.headers["key"] == "two"
        assert not s.flowmap
        assert s.count() == 0

        assert not s.next_flow(r)


def test_load_with_server_replay_nopop():
    """With ``server_replay_nopop`` set, ``next_flow`` leaves the count unchanged."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(s, server_replay_nopop=True)

        r = tflow.tflow(resp=True)
        r.request.headers["key"] = "one"

        r2 = tflow.tflow(resp=True)
        r2.request.headers["key"] = "two"

        s.load_flows([r, r2])

        assert s.count() == 2
        s.next_flow(r)
        assert s.count() == 2


def test_ignore_params():
    """Query params in ``server_replay_ignore_params`` do not change the hash."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(
            s,
            server_replay_ignore_params=["param1", "param2"]
        )

        r = tflow.tflow(resp=True)
        r.request.path = "/test?param1=1"
        r2 = tflow.tflow(resp=True)
        r2.request.path = "/test"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param1=2"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param2=1"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param3=2"
        assert s._hash(r) != s._hash(r2)


def thash(r, r2, setter):
    """Shared assertions for ``server_replay_ignore_payload_params``.

    *setter* is called as ``setter(flow, **params)`` and must write the given
    parameters into the flow's request body. *r* is the reference flow and
    *r2* is varied against it.
    """
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(
            s,
            server_replay_ignore_payload_params=["param1", "param2"]
        )

        setter(r, paramx="x", param1="1")

        setter(r2, paramx="x", param1="1")
        # same parameters
        assert s._hash(r) == s._hash(r2)
        # ignored parameters !=
        setter(r2, paramx="x", param1="2")
        assert s._hash(r) == s._hash(r2)
        # missing parameter
        setter(r2, paramx="x")
        assert s._hash(r) == s._hash(r2)
        # ignorable parameter added
        setter(r2, paramx="x", param1="2")
        assert s._hash(r) == s._hash(r2)
        # not ignorable parameter changed
        setter(r2, paramx="y", param1="1")
        assert s._hash(r) != s._hash(r2)
        # not ignorable parameter missing
        setter(r2, param1="1")
        # The body written by the setter is then replaced with a raw
        # urlencoded payload before hashing.
        r2.request.content = b"param1=1"
        assert s._hash(r) != s._hash(r2)


def test_ignore_payload_params():
    """Run ``thash`` against urlencoded and multipart form bodies."""
    def urlencode_setter(r, **kwargs):
        """Store *kwargs* as an urlencoded request body."""
        r.request.content = urllib.parse.urlencode(kwargs).encode()

    r = tflow.tflow(resp=True)
    r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
    r2 = tflow.tflow(resp=True)
    r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
    thash(r, r2, urlencode_setter)

    boundary = 'somefancyboundary'

    def multipart_setter(r, **kwargs):
        """Store *kwargs* as a multipart/form-data body and set the header."""
        delimiter = f"--{boundary}\n"
        parts = [
            f'Content-Disposition: form-data; name="{k}"\n\n{v}\n'
            for k, v in kwargs.items()
        ]
        # The delimiter opens the body, separates the parts and closes it.
        body = delimiter + delimiter.join(parts) + delimiter
        r.request.content = body.encode()
        r.request.headers["content-type"] = (
            f"multipart/form-data; boundary={boundary}"
        )

    r = tflow.tflow(resp=True)
    r2 = tflow.tflow(resp=True)
    thash(r, r2, multipart_setter)


def test_server_playback_full():
    """``request`` fills in a recorded response on a match and nothing otherwise."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(
            s,
            server_replay_refresh=True,
        )

        f = tflow.tflow()
        f.response = mitmproxy.test.tutils.tresp(content=f.request.content)
        s.load_flows([f, f])

        tf = tflow.tflow()
        assert not tf.response
        s.request(tf)
        assert tf.response.data == f.response.data

        # A request with different content has no recorded counterpart.
        tf = tflow.tflow()
        tf.request.content = b"gibble"
        assert not tf.response
        s.request(tf)
        assert not tf.response


def test_server_playback_kill():
    """With ``server_replay_kill_extra``, an unmatched flow ends up with an error."""
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(
            s,
            server_replay_refresh=True,
            server_replay_kill_extra=True
        )

        f = tflow.tflow()
        f.response = mitmproxy.test.tutils.tresp(content=f.request.content)
        s.load_flows([f])

        f = tflow.tflow()
        f.request.host = "nonexistent"
        tctx.cycle(s, f)
        assert f.error


def test_server_playback_response_deleted():
    """
    The server playback addon holds references to flows that can be modified by the user in the meantime.
    One thing that can happen is that users remove the response object. This happens for example when doing a client
    replay at the same time.
    """
    sp = serverplayback.ServerPlayback()
    with taddons.context(sp) as tctx:
        tctx.configure(sp)
        f1 = tflow.tflow(resp=True)
        f2 = tflow.tflow(resp=True)

        assert not sp.flowmap

        sp.load_flows([f1, f2])
        assert sp.flowmap

        f1.response = f2.response = None
        assert not sp.next_flow(f1)
        assert not sp.flowmap