1import math
2import sys
3from functools import lru_cache
4from typing import Optional, Union  # noqa
5
6import mitmproxy.flow
7import mitmproxy.tools.console.master  # noqa
8import urwid
9from mitmproxy import contentviews
10from mitmproxy import ctx
11from mitmproxy import http
12from mitmproxy import tcp
13from mitmproxy.tools.console import common
14from mitmproxy.tools.console import flowdetailview
15from mitmproxy.tools.console import layoutwidget
16from mitmproxy.tools.console import searchable
17from mitmproxy.tools.console import tabs
18from mitmproxy.utils import strutils
19
20
class SearchError(Exception):
    """Exception type declared by this module for search-related failures.

    It adds nothing to ``Exception``; nothing in the code visible in this
    file raises it.
    """
23
24
class FlowViewHeader(urwid.WidgetWrap):
    """Header widget showing a one-flow summary of the currently focused flow.

    The wrapped widget (``self._w``) is rebuilt on every ``focus_changed``
    call: a formatted flow summary when a flow has focus, an empty pile
    otherwise.
    """

    def __init__(
        self,
        master: "mitmproxy.tools.console.master.ConsoleMaster",
    ) -> None:
        # Keep a handle on the console master, then render the initial state.
        self.master = master
        self.focus_changed()

    def focus_changed(self):
        """Re-render the header for whatever flow is focused right now."""
        # The terminal size is queried here, but the result is not used below.
        cols, _ = self.master.ui.get_cols_rows()

        focused = self.master.view.focus.flow
        if not focused:
            # Nothing focused: show an empty placeholder widget.
            self._w = urwid.Pile([])
            return

        self._w = common.format_flow(
            focused,
            render_mode=common.RenderMode.DETAILVIEW,
            hostheader=self.master.options.showhost,
        )
44
45
class FlowDetails(tabs.Tabs):
    """Tabbed detail pane for the flow that currently has focus.

    Which tabs are offered depends on the flow type: HTTP flows get request
    and response tabs (plus a WebSocket tab when ``flow.websocket`` is set),
    TCP flows get a stream tab, and both get a "Detail" tab. Each tab is a
    ``(title_callable, body_callable)`` pair stored in ``self.tabs``.
    """

    # Markup prefixes that mark which peer a rendered line came from.
    FROM_CLIENT_MARKER = ("from_client", f"{common.SYMBOL_FROM_CLIENT} ")
    TO_CLIENT_MARKER = ("to_client", f"{common.SYMBOL_TO_CLIENT} ")

    def __init__(self, master):
        self.master = master
        # Slot used to hand the (unhashable) message to the cached
        # `_get_content_view`; see `content_view`.
        self._get_content_view_message = None
        super().__init__([])
        self.show()
        self.last_displayed_body = None

    @property
    def view(self):
        """The master's flow view."""
        return self.master.view

    @property
    def flow(self) -> mitmproxy.flow.Flow:
        """The flow that currently has focus in the master's view."""
        return self.master.view.focus.flow

    def focus_changed(self):
        """Rebuild the tab list for the newly focused flow and redraw.

        When no flow is focused, the window is popped instead.
        """
        f = self.flow
        if not f:
            self.master.window.pop()
            return

        if isinstance(f, http.HTTPFlow):
            http_tabs = [
                (self.tab_http_request, self.view_request),
                (self.tab_http_response, self.view_response),
            ]
            if f.websocket:
                http_tabs.append(
                    (self.tab_websocket_messages, self.view_websocket_messages)
                )
            http_tabs.append((self.tab_details, self.view_details))
            self.tabs = http_tabs
        elif isinstance(f, tcp.TCPFlow):
            self.tabs = [
                (self.tab_tcp_stream, self.view_tcp_stream),
                (self.tab_details, self.view_details),
            ]
        # Any other flow type leaves the previously configured tabs in place.
        self.show()

    def tab_http_request(self):
        """Title of the request tab; flags an intercepted request."""
        flow = self.flow
        assert isinstance(flow, http.HTTPFlow)
        if flow.intercepted and not flow.response:
            return "Request intercepted"
        return "Request"

    def tab_http_response(self):
        """Title of the response tab; flags an intercepted response."""
        flow = self.flow
        assert isinstance(flow, http.HTTPFlow)
        if flow.intercepted and flow.response:
            return "Response intercepted"
        return "Response"

    def tab_tcp_stream(self):
        """Title of the TCP stream tab."""
        return "TCP Stream"

    def tab_websocket_messages(self):
        """Title of the WebSocket messages tab."""
        return "WebSocket Messages"

    def tab_details(self):
        """Title of the details tab."""
        return "Detail"

    def view_request(self):
        """Body of the request tab: headers plus rendered request content."""
        flow = self.flow
        assert isinstance(flow, http.HTTPFlow)
        return self.conn_text(flow.request)

    def view_response(self):
        """Body of the response tab; `conn_text` handles a missing response."""
        flow = self.flow
        assert isinstance(flow, http.HTTPFlow)
        return self.conn_text(flow.response)

    def _contentview_status_bar(self, description: str, viewmode: str):
        """Build the heading bar shown above rendered content.

        The left column shows ``description``; the right column shows the
        ``[m:<viewmode>]`` key hint, right-aligned.
        """
        cols = [
            urwid.Text(
                [
                    ("heading", description),
                ]
            ),
            urwid.Text(
                [
                    " ",
                    ('heading', "["),
                    ('heading_key', "m"),
                    ('heading', (":%s]" % viewmode)),
                ],
                align="right"
            )
        ]
        contentview_status_bar = urwid.AttrWrap(urwid.Columns(cols), "heading")
        return contentview_status_bar

    def _marked_lines(self, lines, from_client):
        """Turn content-view lines into Text widgets prefixed with a direction marker.

        A new markup list is built for every line, so the lists handed out by
        the content view are not modified.
        """
        marker = self.FROM_CLIENT_MARKER if from_client else self.TO_CLIENT_MARKER
        return [urwid.Text([marker, *line]) for line in lines]

    def _finish_message_view(self, widget_lines, viewmode, intercepted):
        """Apply intercept highlighting, prepend the status bar, wrap as Searchable."""
        # Guard against an empty list: every message may have rendered to zero
        # lines, in which case there is nothing to highlight.
        if intercepted and widget_lines:
            last = widget_lines[-1]
            # get_text()[0] is the plain text; re-style the whole line.
            last.set_text(("intercept", last.get_text()[0]))

        widget_lines.insert(0, self._contentview_status_bar(viewmode.capitalize(), viewmode))
        return searchable.Searchable(widget_lines)

    def view_websocket_messages(self):
        """Body of the WebSocket tab: every message rendered in the current view mode."""
        flow = self.flow
        assert isinstance(flow, http.HTTPFlow)
        assert flow.websocket is not None

        if not flow.websocket.messages:
            return searchable.Searchable([urwid.Text(("highlight", "No messages."))])

        viewmode = self.master.commands.call("console.flowview.mode")

        widget_lines = []
        for m in flow.websocket.messages:
            _, lines, _ = contentviews.get_message_content_view(viewmode, m, flow)
            widget_lines.extend(self._marked_lines(lines, m.from_client))

        if flow.websocket.closed_by_client is not None:
            # Close codes 1000, 1001 and 1005 are styled "alert"; all others "error".
            widget_lines.append(urwid.Text([
                (self.FROM_CLIENT_MARKER if flow.websocket.closed_by_client else self.TO_CLIENT_MARKER),
                ("alert" if flow.websocket.close_code in (1000, 1001, 1005) else "error",
                 f"Connection closed: {flow.websocket.close_code} {flow.websocket.close_reason}")
            ]))

        return self._finish_message_view(widget_lines, viewmode, flow.intercepted)

    def view_tcp_stream(self) -> urwid.Widget:
        """Body of the TCP tab: the stream rendered in the current view mode."""
        flow = self.flow
        assert isinstance(flow, tcp.TCPFlow)

        if not flow.messages:
            return searchable.Searchable([urwid.Text(("highlight", "No messages."))])

        viewmode = self.master.commands.call("console.flowview.mode")

        # Merge adjacent TCP "messages". For detailed explanation of this code block see:
        # https://github.com/mitmproxy/mitmproxy/pull/3970/files/469bd32582f764f9a29607efa4f5b04bd87961fb#r418670880
        # Each entry is [from_client, content]; the direction is stored with
        # the merged chunk so it does not have to be re-derived afterwards.
        merged = []
        for message in flow.messages:
            if merged and merged[-1][0] is message.from_client:
                merged[-1][1] += message.content
            else:
                merged.append([message.from_client, message.content])

        widget_lines = []
        for from_client, content in merged:
            _, lines, _ = contentviews.get_tcp_content_view(viewmode, content, flow)
            widget_lines.extend(self._marked_lines(lines, from_client))

        return self._finish_message_view(widget_lines, viewmode, flow.intercepted)

    def view_details(self):
        """Body of the details tab."""
        return flowdetailview.flowdetails(self.view, self.flow)

    def content_view(self, viewmode, message):
        """Render ``message`` in ``viewmode``.

        Returns a ``(description, widgets)`` pair. Messages whose
        ``raw_content`` is ``None`` yield an empty description and a single
        "[content missing]" line. Otherwise the rendering is delegated to the
        cached `_get_content_view`.
        """
        if message.raw_content is None:
            return "", [urwid.Text([("error", "[content missing]")])]

        full = self.master.commands.execute("view.settings.getval @focus fullcontents false")
        if full == "true":
            limit = sys.maxsize
        else:
            limit = ctx.options.content_view_lines_cutoff

        # Hash of everything whose modification must invalidate a cached rendering.
        flow_modify_cache_invalidation = hash((
            message.raw_content,
            message.headers.fields,
            getattr(message, "path", None),
        ))
        # we need to pass the message off-band because it's not hashable
        self._get_content_view_message = message
        try:
            return self._get_content_view(viewmode, limit, flow_modify_cache_invalidation)
        finally:
            # On a cache hit the cached function body does not run, so it
            # cannot clear the slot itself; drop the reference here.
            self._get_content_view_message = None

    # NOTE: lru_cache on a method includes `self` in the cache key and keeps
    # the instance referenced for as long as the entry stays in the cache.
    @lru_cache(maxsize=200)
    def _get_content_view(self, viewmode, max_lines, _):
        """Cached rendering behind `content_view`.

        The message is read from ``self._get_content_view_message`` rather
        than passed as an argument. ``_`` only takes part in the cache key.
        Output is limited to roughly ``max_lines`` lines of 80 characters.
        Returns ``(description, list_of_text_widgets)``.
        """
        message = self._get_content_view_message
        self._get_content_view_message = None
        description, lines, error = contentviews.get_message_content_view(
            viewmode, message, self.flow
        )
        if error:
            self.master.log.debug(error)
        # Give hint that you have to tab for the response.
        if description == "No content" and isinstance(message, http.Request):
            description = "No request content"

        # The budget is counted in characters, assuming 80 per line. Users
        # with a wider terminal therefore get fewer lines; this should not be
        # an issue.
        chars_per_line = 80
        max_chars = max_lines * chars_per_line
        total_chars = 0
        text_objects = []
        for line in lines:
            txt = []
            for (style, text) in line:
                if total_chars + len(text) > max_chars:
                    # Cut the fragment so the budget is hit exactly.
                    text = text[:max_chars - total_chars]
                txt.append((style, text))
                total_chars += len(text)
                if total_chars == max_chars:
                    break

            # round up to the next line.
            total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line)

            text_objects.append(urwid.Text(txt))
            if total_chars == max_chars:
                text_objects.append(urwid.Text([
                    ("highlight", "Stopped displaying data after %d lines. Press " % max_lines),
                    ("key", "f"),
                    ("highlight", " to load all data.")
                ]))
                break

        return description, text_objects

    def conn_text(self, conn):
        """Render one side of an HTTP exchange: headers, status bar, content.

        A falsy ``conn`` produces a hint that there is no response yet.
        """
        if not conn:
            return searchable.Searchable([
                urwid.Text(""),
                urwid.Text(
                    [
                        ("highlight", "No response. Press "),
                        ("key", "e"),
                        ("highlight", " and edit any aspect to add one."),
                    ]
                )
            ])

        # This will always force an ascii representation of headers. For example, if the server sends a
        #
        #     X-Authors: Made with ❤ in Hamburg
        #
        # header, mitmproxy will display the following:
        #
        #     X-Authors: Made with \xe2\x9d\xa4 in Hamburg.
        #
        # The alternative would be to just use the header's UTF-8 representation and maybe
        # do `str.replace("\t", "\\t")` to exempt tabs from urwid's special characters escaping [1].
        # That would in some terminals allow rendering UTF-8 characters, but the mapping
        # wouldn't be bijective, i.e. a user couldn't distinguish "\\t" and "\t".
        # Also, from a security perspective, a mitmproxy user couldn't be fooled by homoglyphs.
        #
        # 1) https://github.com/mitmproxy/mitmproxy/issues/1833
        #    https://github.com/urwid/urwid/blob/6608ee2c9932d264abd1171468d833b7a4082e13/urwid/display_common.py#L35-L36,
        hdrs = [
            (strutils.bytes_to_escaped_str(k) + ":", strutils.bytes_to_escaped_str(v))
            for k, v in conn.headers.fields
        ]
        txt = common.format_keyvals(
            hdrs,
            key_format="header"
        )
        viewmode = self.master.commands.call("console.flowview.mode")
        msg, body = self.content_view(viewmode, conn)

        # Same heading bar as the message views use.
        txt.append(self._contentview_status_bar(msg, viewmode))
        txt.extend(body)
        return searchable.Searchable(txt)
354
355
class FlowView(urwid.Frame, layoutwidget.LayoutWidget):
    """Frame combining the flow summary header with the tabbed detail body."""

    keyctx = "flowview"
    title = "Flow Details"

    def __init__(self, master):
        # Build the body first, then the header, and hand both to the frame.
        details = FlowDetails(master)
        banner = FlowViewHeader(master)
        super().__init__(details, header=banner)
        self.master = master

    def focus_changed(self, *args, **kwargs):
        """Forward a focus change to the body and then to the header.

        Any arguments passed in are accepted and ignored.
        """
        details = self.body
        details.focus_changed()
        banner = self.header
        banner.focus_changed()
370