1import urllib
2
3import pytest
4
5import mitmproxy.test.tutils
6from mitmproxy import exceptions
7from mitmproxy import io
8from mitmproxy.addons import serverplayback
9from mitmproxy.test import taddons
10from mitmproxy.test import tflow
11
12
def tdump(path, flows):
    """Write every flow in ``flows`` to ``path`` through an ``io.FlowWriter``."""
    with open(path, "wb") as fp:
        writer = io.FlowWriter(fp)
        for flow in flows:
            writer.add(flow)
18
19
def test_load_file(tmpdir):
    """A dumped flow file populates the flowmap; a missing path raises CommandError."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback):
        dump_path = str(tmpdir.join("flows"))
        tdump(dump_path, [tflow.tflow(resp=True)])

        playback.load_file(dump_path)
        assert playback.flowmap

        with pytest.raises(exceptions.CommandError):
            playback.load_file("/nonexistent")
29
30
def test_config(tmpdir):
    """
    Configuring ``server_replay`` with a dumped flow file succeeds; after
    resetting ``configured``, passing ``str(tmpdir)`` raises OptionsError.
    """
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        dump_path = str(tmpdir.join("flows"))
        tdump(dump_path, [tflow.tflow(resp=True)])
        tctx.configure(playback, server_replay=[dump_path])

        playback.configured = False
        with pytest.raises(exceptions.OptionsError):
            tctx.configure(playback, server_replay=[str(tmpdir)])
40
41
def test_server_playback():
    """The flowmap fills on load and empties after ``next_flow`` or ``clear``."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback)
        flow = tflow.tflow(resp=True)

        # Nothing is loaded initially.
        assert not playback.flowmap

        # Consuming the only recorded flow drains the map.
        playback.load_flows([flow])
        assert playback.flowmap
        assert playback.next_flow(flow)
        assert not playback.flowmap

        # Clearing drains the map as well.
        playback.load_flows([flow])
        assert playback.flowmap
        playback.clear()
        assert not playback.flowmap
59
60
def test_ignore_host():
    """With ``server_replay_ignore_host`` set, the request host does not affect the hash."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback, server_replay_ignore_host=True)

        first = tflow.tflow(resp=True)
        second = tflow.tflow(resp=True)

        # Identical hosts.
        first.request.host = "address"
        second.request.host = "address"
        assert playback._hash(first) == playback._hash(second)

        # Differing hosts still hash the same.
        second.request.host = "wrong_address"
        assert playback._hash(first) == playback._hash(second)
74
75
def test_ignore_content():
    """Request bodies influence the hash unless ``server_replay_ignore_content`` is set."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        # Content is significant: equal bodies match, different bodies do not.
        tctx.configure(playback, server_replay_ignore_content=False)

        base = tflow.tflow(resp=True)
        other = tflow.tflow(resp=True)

        base.request.content = b"foo"
        other.request.content = b"foo"
        assert playback._hash(base) == playback._hash(other)
        other.request.content = b"bar"
        assert playback._hash(base) != playback._hash(other)

        # Content is ignored: equal, different, empty and absent bodies all match.
        tctx.configure(playback, server_replay_ignore_content=True)
        base = tflow.tflow(resp=True)
        other = tflow.tflow(resp=True)
        base.request.content = b"foo"
        for body in (b"foo", b"bar", b"", None):
            other.request.content = body
            assert playback._hash(base) == playback._hash(other)
102
103
def test_ignore_content_wins_over_params():
    """
    When both ``server_replay_ignore_content`` and
    ``server_replay_ignore_payload_params`` are set, differing payloads
    still produce the same hash.
    """
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        ignored_params = ["param1", "param2"]
        tctx.configure(
            playback,
            server_replay_ignore_content=True,
            server_replay_ignore_payload_params=ignored_params,
        )

        # NOTE: parameters are mutually exclusive in options
        first = tflow.tflow(resp=True)
        first.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        first.request.content = b"paramx=y"

        second = tflow.tflow(resp=True)
        second.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        second.request.content = b"paramx=x"

        # The non-ignored parameter differs, yet the hashes agree.
        assert playback._hash(first) == playback._hash(second)
126
127
def test_ignore_payload_params_other_content_type():
    """
    For an ``application/json`` body, a change to a parameter listed in
    ``server_replay_ignore_payload_params`` still changes the hash.
    """
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        ignored_params = ["param1", "param2"]
        tctx.configure(
            playback,
            server_replay_ignore_content=False,
            server_replay_ignore_payload_params=ignored_params,
        )

        first = tflow.tflow(resp=True)
        first.request.headers["Content-Type"] = "application/json"
        first.request.content = b'{"param1":"1"}'

        second = tflow.tflow(resp=True)
        second.request.headers["Content-Type"] = "application/json"
        second.request.content = b'{"param1":"1"}'

        # Same content.
        assert playback._hash(first) == playback._hash(second)

        # Distinct content (note only x-www-form-urlencoded payload is analysed).
        second.request.content = b'{"param1":"2"}'
        assert playback._hash(first) != playback._hash(second)
150
151
def test_hash():
    """With default options the hash ignores headers but depends on the request path."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback)

        first = tflow.tflow()
        second = tflow.tflow()

        # Two fresh flows produce the same, non-empty hash.
        assert playback._hash(first)
        assert playback._hash(first) == playback._hash(second)

        # An extra header makes no difference.
        first.request.headers["foo"] = "bar"
        assert playback._hash(first) == playback._hash(second)

        # A different path does.
        first.request.path = "voing"
        assert playback._hash(first) != playback._hash(second)

        # A valueless query key is distinguished from an empty query string.
        first.request.path = "path?blank_value"
        second.request.path = "path?"
        assert playback._hash(first) != playback._hash(second)
170
171
def test_headers():
    """Only headers listed in ``server_replay_use_headers`` take part in the hash."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback, server_replay_use_headers=["foo"])

        first = tflow.tflow(resp=True)
        first.request.headers["foo"] = "bar"
        second = tflow.tflow(resp=True)

        # The tracked header is present on one side only.
        assert playback._hash(first) != playback._hash(second)

        # The tracked header matches on both sides.
        second.request.headers["foo"] = "bar"
        assert playback._hash(first) == playback._hash(second)

        # An untracked header is irrelevant.
        second.request.headers["oink"] = "bar"
        assert playback._hash(first) == playback._hash(second)

        # Neither flow carries the tracked header.
        first = tflow.tflow(resp=True)
        second = tflow.tflow(resp=True)
        assert playback._hash(first) == playback._hash(second)
189
190
def test_load():
    """Flows with matching requests are handed out in load order until exhausted."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback)

        flow_one = tflow.tflow(resp=True)
        flow_one.request.headers["key"] = "one"

        flow_two = tflow.tflow(resp=True)
        flow_two.request.headers["key"] = "two"

        playback.load_flows([flow_one, flow_two])
        assert playback.count() == 2

        # First lookup yields the first recorded flow.
        replayed = playback.next_flow(flow_one)
        assert replayed.request.headers["key"] == "one"
        assert playback.count() == 1

        # Second lookup yields the second one and empties the map.
        replayed = playback.next_flow(flow_one)
        assert replayed.request.headers["key"] == "two"
        assert not playback.flowmap
        assert playback.count() == 0

        # Nothing is left for a third lookup.
        assert not playback.next_flow(flow_one)
216
217
def test_load_with_server_replay_nopop():
    """With ``server_replay_nopop`` set, ``next_flow`` leaves the flow count unchanged."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback, server_replay_nopop=True)

        flow_one = tflow.tflow(resp=True)
        flow_one.request.headers["key"] = "one"

        flow_two = tflow.tflow(resp=True)
        flow_two.request.headers["key"] = "two"

        playback.load_flows([flow_one, flow_two])
        assert playback.count() == 2

        playback.next_flow(flow_one)
        assert playback.count() == 2
234
235
def test_ignore_params():
    """Query parameters listed in ``server_replay_ignore_params`` do not affect the hash."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        ignored_params = ["param1", "param2"]
        tctx.configure(playback, server_replay_ignore_params=ignored_params)

        first = tflow.tflow(resp=True)
        first.request.path = "/test?param1=1"
        second = tflow.tflow(resp=True)

        # Ignored parameters may be absent, changed, or swapped for another ignored one.
        for path in ("/test", "/test?param1=2", "/test?param2=1"):
            second.request.path = path
            assert playback._hash(first) == playback._hash(second)

        # A parameter that is not ignored changes the hash.
        second.request.path = "/test?param3=2"
        assert playback._hash(first) != playback._hash(second)
255
256
def thash(r, r2, setter):
    """
    Shared assertions for ``server_replay_ignore_payload_params``.

    ``r`` and ``r2`` are flows whose request bodies are written by
    ``setter(flow, **params)``. The addon is configured to ignore ``param1``
    and ``param2``; the hashes of both flows must agree whenever only those
    parameters differ and disagree when ``paramx`` changes or is missing.
    """
    # Use a single addon instance: the one registered with the test context is
    # also the one that gets configured and queried.
    s = serverplayback.ServerPlayback()
    with taddons.context(s) as tctx:
        tctx.configure(
            s,
            server_replay_ignore_payload_params=["param1", "param2"]
        )

        setter(r, paramx="x", param1="1")

        setter(r2, paramx="x", param1="1")
        # same parameters
        assert s._hash(r) == s._hash(r2)
        # ignored parameters !=
        setter(r2, paramx="x", param1="2")
        assert s._hash(r) == s._hash(r2)
        # missing parameter
        setter(r2, paramx="x")
        assert s._hash(r) == s._hash(r2)
        # ignorable parameter added
        setter(r2, paramx="x", param1="2")
        assert s._hash(r) == s._hash(r2)
        # not ignorable parameter changed
        setter(r2, paramx="y", param1="1")
        assert s._hash(r) != s._hash(r2)
        # not ignorable parameter missing
        setter(r2, param1="1")
        # The body written by the setter is then replaced with a raw
        # urlencoded payload, whatever encoding the setter used.
        r2.request.content = b"param1=1"
        assert s._hash(r) != s._hash(r2)
287
288
def test_ignore_payload_params():
    """
    Run the shared ``thash`` assertions against urlencoded and multipart
    request bodies.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that the
    # ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urlencode

    def urlencode_setter(r, **kwargs):
        # Encode the parameters as an application/x-www-form-urlencoded body.
        r.request.content = urlencode(kwargs).encode()

    r = tflow.tflow(resp=True)
    r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
    r2 = tflow.tflow(resp=True)
    r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
    thash(r, r2, urlencode_setter)

    boundary = 'somefancyboundary'

    def multipart_setter(r, **kwargs):
        # Build a multipart/form-data body with one part per parameter, each
        # part preceded and the whole body terminated by the boundary line.
        delimiter = f"--{boundary}\n"
        parts = [
            f'Content-Disposition: form-data; name="{k}"\n\n{v}\n'
            for k, v in kwargs.items()
        ]
        body = delimiter + delimiter.join(parts) + delimiter
        r.request.content = body.encode()
        r.request.headers["content-type"] = f"multipart/form-data; boundary={boundary}"

    r = tflow.tflow(resp=True)
    r2 = tflow.tflow(resp=True)
    thash(r, r2, multipart_setter)
317
318
def test_server_playback_full():
    """``request`` fills in the recorded response for a matching flow and skips others."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback, server_replay_refresh=True)

        recorded = tflow.tflow()
        recorded.response = mitmproxy.test.tutils.tresp(
            content=recorded.request.content
        )
        playback.load_flows([recorded, recorded])

        # A matching request receives the recorded response data.
        matching = tflow.tflow()
        assert not matching.response
        playback.request(matching)
        assert matching.response.data == recorded.response.data

        # A request with a different body is left without a response.
        unmatched = tflow.tflow()
        unmatched.request.content = b"gibble"
        assert not unmatched.response
        playback.request(unmatched)
        assert not unmatched.response
341
342
def test_server_playback_kill():
    """With ``server_replay_kill_extra`` set, an unmatched flow ends up with an error."""
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(
            playback,
            server_replay_refresh=True,
            server_replay_kill_extra=True,
        )

        recorded = tflow.tflow()
        recorded.response = mitmproxy.test.tutils.tresp(
            content=recorded.request.content
        )
        playback.load_flows([recorded])

        # This host does not match the recorded flow.
        extra = tflow.tflow()
        extra.request.host = "nonexistent"
        tctx.cycle(playback, extra)
        assert extra.error
360
361
def test_server_playback_response_deleted():
    """
    The server playback addon keeps references to flows, and users may modify those flows in the meantime.
    In particular a user can remove the response object, which happens for example when a client replay
    runs at the same time. Such flows must not be returned by ``next_flow``.
    """
    playback = serverplayback.ServerPlayback()
    with taddons.context(playback) as tctx:
        tctx.configure(playback)
        first = tflow.tflow(resp=True)
        second = tflow.tflow(resp=True)

        assert not playback.flowmap

        playback.load_flows([first, second])
        assert playback.flowmap

        # Drop both responses after the flows have been loaded.
        first.response = second.response = None
        assert not playback.next_flow(first)
        assert not playback.flowmap
382