1import math 2import sys 3from functools import lru_cache 4from typing import Optional, Union # noqa 5 6import mitmproxy.flow 7import mitmproxy.tools.console.master # noqa 8import urwid 9from mitmproxy import contentviews 10from mitmproxy import ctx 11from mitmproxy import http 12from mitmproxy import tcp 13from mitmproxy.tools.console import common 14from mitmproxy.tools.console import flowdetailview 15from mitmproxy.tools.console import layoutwidget 16from mitmproxy.tools.console import searchable 17from mitmproxy.tools.console import tabs 18from mitmproxy.utils import strutils 19 20 21class SearchError(Exception): 22 pass 23 24 25class FlowViewHeader(urwid.WidgetWrap): 26 27 def __init__( 28 self, 29 master: "mitmproxy.tools.console.master.ConsoleMaster", 30 ) -> None: 31 self.master = master 32 self.focus_changed() 33 34 def focus_changed(self): 35 cols, _ = self.master.ui.get_cols_rows() 36 if self.master.view.focus.flow: 37 self._w = common.format_flow( 38 self.master.view.focus.flow, 39 render_mode=common.RenderMode.DETAILVIEW, 40 hostheader=self.master.options.showhost, 41 ) 42 else: 43 self._w = urwid.Pile([]) 44 45 46class FlowDetails(tabs.Tabs): 47 def __init__(self, master): 48 self.master = master 49 super().__init__([]) 50 self.show() 51 self.last_displayed_body = None 52 53 @property 54 def view(self): 55 return self.master.view 56 57 @property 58 def flow(self) -> mitmproxy.flow.Flow: 59 return self.master.view.focus.flow 60 61 def focus_changed(self): 62 f = self.flow 63 if f: 64 if isinstance(f, http.HTTPFlow): 65 if f.websocket: 66 self.tabs = [ 67 (self.tab_http_request, self.view_request), 68 (self.tab_http_response, self.view_response), 69 (self.tab_websocket_messages, self.view_websocket_messages), 70 (self.tab_details, self.view_details), 71 ] 72 else: 73 self.tabs = [ 74 (self.tab_http_request, self.view_request), 75 (self.tab_http_response, self.view_response), 76 (self.tab_details, self.view_details), 77 ] 78 elif isinstance(f, tcp.TCPFlow): 79 self.tabs = [ 80 (self.tab_tcp_stream, self.view_tcp_stream), 81 (self.tab_details, self.view_details), 82 ] 83 self.show() 84 else: 85 self.master.window.pop() 86 87 def tab_http_request(self): 88 flow = self.flow 89 assert isinstance(flow, http.HTTPFlow) 90 if self.flow.intercepted and not flow.response: 91 return "Request intercepted" 92 else: 93 return "Request" 94 95 def tab_http_response(self): 96 flow = self.flow 97 assert isinstance(flow, http.HTTPFlow) 98 if self.flow.intercepted and flow.response: 99 return "Response intercepted" 100 else: 101 return "Response" 102 103 def tab_tcp_stream(self): 104 return "TCP Stream" 105 106 def tab_websocket_messages(self): 107 return "WebSocket Messages" 108 109 def tab_details(self): 110 return "Detail" 111 112 def view_request(self): 113 flow = self.flow 114 assert isinstance(flow, http.HTTPFlow) 115 return self.conn_text(flow.request) 116 117 def view_response(self): 118 flow = self.flow 119 assert isinstance(flow, http.HTTPFlow) 120 return self.conn_text(flow.response) 121 122 def _contentview_status_bar(self, description: str, viewmode: str): 123 cols = [ 124 urwid.Text( 125 [ 126 ("heading", description), 127 ] 128 ), 129 urwid.Text( 130 [ 131 " ", 132 ('heading', "["), 133 ('heading_key', "m"), 134 ('heading', (":%s]" % viewmode)), 135 ], 136 align="right" 137 ) 138 ] 139 contentview_status_bar = urwid.AttrWrap(urwid.Columns(cols), "heading") 140 return contentview_status_bar 141 142 FROM_CLIENT_MARKER = ("from_client", f"{common.SYMBOL_FROM_CLIENT} ") 143 TO_CLIENT_MARKER = ("to_client", f"{common.SYMBOL_TO_CLIENT} ") 144 145 def view_websocket_messages(self): 146 flow = self.flow 147 assert isinstance(flow, http.HTTPFlow) 148 assert flow.websocket is not None 149 150 if not flow.websocket.messages: 151 return searchable.Searchable([urwid.Text(("highlight", "No messages."))]) 152 153 viewmode = self.master.commands.call("console.flowview.mode") 154 155 widget_lines = [] 156 for m in flow.websocket.messages: 157 _, lines, _ = contentviews.get_message_content_view(viewmode, m, flow) 158 159 for line in lines: 160 if m.from_client: 161 line.insert(0, self.FROM_CLIENT_MARKER) 162 else: 163 line.insert(0, self.TO_CLIENT_MARKER) 164 165 widget_lines.append(urwid.Text(line)) 166 167 if flow.websocket.closed_by_client is not None: 168 widget_lines.append(urwid.Text([ 169 (self.FROM_CLIENT_MARKER if flow.websocket.closed_by_client else self.TO_CLIENT_MARKER), 170 ("alert" if flow.websocket.close_code in (1000, 1001, 1005) else "error", 171 f"Connection closed: {flow.websocket.close_code} {flow.websocket.close_reason}") 172 ])) 173 174 if flow.intercepted: 175 markup = widget_lines[-1].get_text()[0] 176 widget_lines[-1].set_text(("intercept", markup)) 177 178 widget_lines.insert(0, self._contentview_status_bar(viewmode.capitalize(), viewmode)) 179 180 return searchable.Searchable(widget_lines) 181 182 def view_tcp_stream(self) -> urwid.Widget: 183 flow = self.flow 184 assert isinstance(flow, tcp.TCPFlow) 185 186 if not flow.messages: 187 return searchable.Searchable([urwid.Text(("highlight", "No messages."))]) 188 189 viewmode = self.master.commands.call("console.flowview.mode") 190 191 # Merge adjacent TCP "messages". For detailed explanation of this code block see: 192 # https://github.com/mitmproxy/mitmproxy/pull/3970/files/469bd32582f764f9a29607efa4f5b04bd87961fb#r418670880 193 from_client = None 194 messages = [] 195 for message in flow.messages: 196 if message.from_client is not from_client: 197 messages.append(message.content) 198 from_client = message.from_client 199 else: 200 messages[-1] += message.content 201 202 widget_lines = [] 203 204 from_client = flow.messages[0].from_client 205 for m in messages: 206 _, lines, _ = contentviews.get_tcp_content_view(viewmode, m, flow) 207 208 for line in lines: 209 if from_client: 210 line.insert(0, self.FROM_CLIENT_MARKER) 211 else: 212 line.insert(0, self.TO_CLIENT_MARKER) 213 214 widget_lines.append(urwid.Text(line)) 215 216 from_client = not from_client 217 218 if flow.intercepted: 219 markup = widget_lines[-1].get_text()[0] 220 widget_lines[-1].set_text(("intercept", markup)) 221 222 widget_lines.insert(0, self._contentview_status_bar(viewmode.capitalize(), viewmode)) 223 224 return searchable.Searchable(widget_lines) 225 226 def view_details(self): 227 return flowdetailview.flowdetails(self.view, self.flow) 228 229 def content_view(self, viewmode, message): 230 if message.raw_content is None: 231 msg, body = "", [urwid.Text([("error", "[content missing]")])] 232 return msg, body 233 else: 234 full = self.master.commands.execute("view.settings.getval @focus fullcontents false") 235 if full == "true": 236 limit = sys.maxsize 237 else: 238 limit = ctx.options.content_view_lines_cutoff 239 240 flow_modify_cache_invalidation = hash(( 241 message.raw_content, 242 message.headers.fields, 243 getattr(message, "path", None), 244 )) 245 # we need to pass the message off-band because it's not hashable 246 self._get_content_view_message = message 247 return self._get_content_view(viewmode, limit, flow_modify_cache_invalidation) 248 249 @lru_cache(maxsize=200) 250 def _get_content_view(self, viewmode, max_lines, _): 251 message = self._get_content_view_message 252 self._get_content_view_message = None 253 description, lines, error = contentviews.get_message_content_view( 254 viewmode, message, self.flow 255 ) 256 if error: 257 self.master.log.debug(error) 258 # Give hint that you have to tab for the response. 259 if description == "No content" and isinstance(message, http.Request): 260 description = "No request content" 261 262 # If the users has a wide terminal, he gets fewer lines; this should not be an issue. 263 chars_per_line = 80 264 max_chars = max_lines * chars_per_line 265 total_chars = 0 266 text_objects = [] 267 for line in lines: 268 txt = [] 269 for (style, text) in line: 270 if total_chars + len(text) > max_chars: 271 text = text[:max_chars - total_chars] 272 txt.append((style, text)) 273 total_chars += len(text) 274 if total_chars == max_chars: 275 break 276 277 # round up to the next line. 278 total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line) 279 280 text_objects.append(urwid.Text(txt)) 281 if total_chars == max_chars: 282 text_objects.append(urwid.Text([ 283 ("highlight", "Stopped displaying data after %d lines. Press " % max_lines), 284 ("key", "f"), 285 ("highlight", " to load all data.") 286 ])) 287 break 288 289 return description, text_objects 290 291 def conn_text(self, conn): 292 if conn: 293 hdrs = [] 294 for k, v in conn.headers.fields: 295 # This will always force an ascii representation of headers. For example, if the server sends a 296 # 297 # X-Authors: Made with ❤ in Hamburg 298 # 299 # header, mitmproxy will display the following: 300 # 301 # X-Authors: Made with \xe2\x9d\xa4 in Hamburg. 302 # 303 # The alternative would be to just use the header's UTF-8 representation and maybe 304 # do `str.replace("\t", "\\t")` to exempt tabs from urwid's special characters escaping [1]. 305 # That would in some terminals allow rendering UTF-8 characters, but the mapping 306 # wouldn't be bijective, i.e. a user couldn't distinguish "\\t" and "\t". 307 # Also, from a security perspective, a mitmproxy user couldn't be fooled by homoglyphs. 308 # 309 # 1) https://github.com/mitmproxy/mitmproxy/issues/1833 310 # https://github.com/urwid/urwid/blob/6608ee2c9932d264abd1171468d833b7a4082e13/urwid/display_common.py#L35-L36, 311 312 k = strutils.bytes_to_escaped_str(k) + ":" 313 v = strutils.bytes_to_escaped_str(v) 314 hdrs.append((k, v)) 315 txt = common.format_keyvals( 316 hdrs, 317 key_format="header" 318 ) 319 viewmode = self.master.commands.call("console.flowview.mode") 320 msg, body = self.content_view(viewmode, conn) 321 322 cols = [ 323 urwid.Text( 324 [ 325 ("heading", msg), 326 ] 327 ), 328 urwid.Text( 329 [ 330 " ", 331 ('heading', "["), 332 ('heading_key', "m"), 333 ('heading', (":%s]" % viewmode)), 334 ], 335 align="right" 336 ) 337 ] 338 title = urwid.AttrWrap(urwid.Columns(cols), "heading") 339 340 txt.append(title) 341 txt.extend(body) 342 else: 343 txt = [ 344 urwid.Text(""), 345 urwid.Text( 346 [ 347 ("highlight", "No response. Press "), 348 ("key", "e"), 349 ("highlight", " and edit any aspect to add one."), 350 ] 351 ) 352 ] 353 return searchable.Searchable(txt) 354 355 356class FlowView(urwid.Frame, layoutwidget.LayoutWidget): 357 keyctx = "flowview" 358 title = "Flow Details" 359 360 def __init__(self, master): 361 super().__init__( 362 FlowDetails(master), 363 header=FlowViewHeader(master), 364 ) 365 self.master = master 366 367 def focus_changed(self, *args, **kwargs): 368 self.body.focus_changed() 369 self.header.focus_changed() 370