import enum
import math
import platform
import typing
from functools import lru_cache

import urwid
import urwid.util
from publicsuffix2 import get_sld, get_tld

from mitmproxy import flow
from mitmproxy.http import HTTPFlow
from mitmproxy.tcp import TCPFlow
from mitmproxy.utils import emoji, human

# Detect Windows Subsystem for Linux and Windows
IS_WINDOWS = "Microsoft" in platform.platform() or "Windows" in platform.platform()


def is_keypress(k):
    """
    Is this input event a keypress?

    An event counts as a keypress if and only if it is a ``str``.
    Always returns a real bool (never ``None``).
    """
    return isinstance(k, str)


def highlight_key(str, key, textattr="text", keyattr="key"):  # noqa: A002 - public parameter name
    """
    Build urwid text markup for *str* with the first occurrence of *key* highlighted.

    Args:
        str: The text to render. (The name shadows the builtin; it is kept
            because it is part of the function's public signature.)
        key: The substring to highlight.
        textattr: Display attribute for the text around the key.
        keyattr: Display attribute for the key itself.

    Returns:
        A list of ``(attribute, text)`` tuples. If *key* does not occur in
        *str*, the text is returned without any highlighted segment instead
        of raising ``IndexError``.
    """
    before, found, after = str.partition(key)
    if not found:
        return [(textattr, str)] if str else []

    markup = []
    if before:
        markup.append((textattr, before))
    markup.append((keyattr, key))
    if after:
        markup.append((textattr, after))
    return markup


KEY_MAX = 30


def format_keyvals(
    entries: typing.Iterable[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]],
    key_format: str = "key",
    value_format: str = "text",
    indent: int = 0
) -> typing.List[urwid.Columns]:
    """
    Format a list of (key, value) tuples.

    Args:
        entries: The list to format. keys must be strings, values can also be None or urwid widgets.
            The latter makes it possible to use the result of format_keyvals() as a value.
        key_format: The display attribute for the key.
        value_format: The display attribute for the value.
        indent: Additional indent to apply.

    Returns:
        One ``urwid.Columns`` row per entry; the key column is as wide as the
        longest key, capped at ``KEY_MAX``.
    """
    # entries is walked twice (key width, then rows); materialise it so that
    # one-shot iterables such as generators are not exhausted by the first pass.
    entries = list(entries)

    max_key_len = max((len(k) for k, v in entries if k is not None), default=0)
    max_key_len = min(max_key_len, KEY_MAX)

    if indent > 2:
        indent -= 2  # We use dividechars=2 below, which already adds two empty spaces

    ret = []
    for k, v in entries:
        if v is None:
            v = urwid.Text("")
        elif not isinstance(v, urwid.Widget):
            v = urwid.Text([(value_format, v)])
        ret.append(
            urwid.Columns(
                [
                    ("fixed", indent, urwid.Text("")),
                    (
                        "fixed",
                        max_key_len,
                        urwid.Text([(key_format, k)])
                    ),
                    v
                ],
                dividechars=2
            )
        )
    return ret


def fcol(s: str, attr: str) -> typing.Tuple[str, int, urwid.Text]:
    """
    Build a fixed-width column spec for ``urwid.Columns``.

    The column is exactly ``len(str(s))`` wide and shows *s* with display
    attribute *attr*.
    """
    s = str(s)
    return (
        "fixed",
        len(s),
        urwid.Text(
            [
                (attr, s)
            ]
        )
    )


if urwid.util.detected_encoding:
    SYMBOL_REPLAY = "\u21ba"
    SYMBOL_RETURN = "\u2190"
    SYMBOL_MARK = "\u25cf"
    SYMBOL_UP = "\u21E7"
    SYMBOL_DOWN = "\u21E9"
    SYMBOL_ELLIPSIS = "\u2026"
    SYMBOL_FROM_CLIENT = "\u21d2"
    SYMBOL_TO_CLIENT = "\u21d0"
else:
    SYMBOL_REPLAY = "[r]"
    SYMBOL_RETURN = "<-"
    SYMBOL_MARK = "#"
    SYMBOL_UP = "^"
    SYMBOL_DOWN = " "
    SYMBOL_ELLIPSIS = "~"
    SYMBOL_FROM_CLIENT = "->"
    SYMBOL_TO_CLIENT = "<-"

SCHEME_STYLES = {
    'http': 'scheme_http',
    'https': 'scheme_https',
    'ws': 'scheme_ws',
    'wss': 'scheme_wss',
    'tcp': 'scheme_tcp',
}
HTTP_REQUEST_METHOD_STYLES = {
    'GET': 'method_get',
    'POST': 'method_post',
    'DELETE': 'method_delete',
    'HEAD': 'method_head',
    'PUT': 'method_put'
}
# Keyed by the first digit of the status code (status_code // 100).
HTTP_RESPONSE_CODE_STYLE = {
    2: "code_200",
    3: "code_300",
    4: "code_400",
    5: "code_500",
}


class RenderMode(enum.Enum):
    TABLE = 1
    """The flow list in table format, i.e. one row per flow."""
    LIST = 2
    """The flow list in list format, i.e. potentially multiple rows per flow."""
    DETAILVIEW = 3
    """The top lines in the detail view."""


def fixlen(s: str, maxlen: int) -> str:
    """
    Return *s* left-aligned in exactly *maxlen* characters.

    Shorter strings are padded with spaces on the right; longer strings are
    cut and end in ``SYMBOL_ELLIPSIS``.
    """
    if len(s) <= maxlen:
        return s.ljust(maxlen)
    else:
        return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS


def fixlen_r(s: str, maxlen: int) -> str:
    """
    Return *s* right-aligned in exactly *maxlen* characters.

    Shorter strings are padded with spaces on the left; longer strings keep
    their tail and start with ``SYMBOL_ELLIPSIS``.
    """
    if len(s) <= maxlen:
        return s.rjust(maxlen)
    else:
        return SYMBOL_ELLIPSIS + s[len(s) - maxlen + len(SYMBOL_ELLIPSIS):]


def render_marker(marker: str) -> str:
    """
    Return the single glyph to display for *marker*.

    The marker is looked up in ``emoji.emoji``; unknown markers fall back to
    ``SYMBOL_MARK``.
    """
    rendered = emoji.emoji.get(marker, SYMBOL_MARK)

    # The marker can only be one glyph. Some emoji that use zero-width joiners (ZWJ)
    # will not be rendered as a single glyph and instead will show
    # multiple glyphs. Just use the first glyph as a fallback.
    # https://emojipedia.org/emoji-zwj-sequence/
    return rendered[0]


class TruncatedText(urwid.Widget):
    """
    A one-row text widget that truncates its content with an ellipsis.

    ``attr`` is a run-length encoded attribute list, i.e. a list of
    ``(attribute, run_length)`` tuples with run lengths counted in encoded
    bytes of ``text``. With ``align='right'`` the text is cut at its start
    rather than at its end.
    """

    def __init__(self, text, attr, align='left'):
        self.text = text
        self.attr = attr
        self.align = align
        super().__init__()

    def pack(self, size, focus=False):
        """Return the natural size: the full text length in one row."""
        return (len(self.text), 1)

    def rows(self, size, focus=False):
        """This widget always occupies exactly one row."""
        return 1

    def render(self, size, focus=False):
        """
        Render the text into a ``urwid.TextCanvas`` of width ``size[0]``.

        If *size* is ``None`` or empty, the canvas is as wide as the text.
        Text narrower than the width is padded with spaces; wider text is cut
        and marked with ``SYMBOL_ELLIPSIS`` (rendered with the ``focus``
        attribute).
        """
        text = self.text
        attr = self.attr
        right_aligned = self.align == 'right'
        if right_aligned:
            # Work on the mirrored text so that padding/truncation applies to
            # the left end; the result is mirrored back below.
            text = text[::-1]
            attr = attr[::-1]

        text_len = len(text)  # TODO: unicode?
        if size is not None and len(size) > 0:
            width = size[0]
        else:
            width = text_len

        if width >= text_len:
            remaining = width - text_len
            if remaining > 0:
                c_text = text + ' ' * remaining
                c_attr = attr + [('text', remaining)]
            else:
                c_text = text
                c_attr = attr
        else:
            # Never emit more than `width` characters: when the available
            # width is smaller than the ellipsis, the ellipsis itself is cut.
            # (A negative slice end would otherwise keep almost all of the text.)
            ellipsis = SYMBOL_ELLIPSIS[:width]
            visible_text = text[:width - len(ellipsis)]
            c_text = visible_text + ellipsis
            c_attr = urwid.util.rle_subseg(attr, 0, len(visible_text.encode()))
            if ellipsis:
                c_attr = c_attr + [('focus', len(ellipsis.encode()))]

        if right_aligned:
            c_text = c_text[::-1]
            c_attr = c_attr[::-1]

        return urwid.TextCanvas([c_text.encode()], [c_attr], maxcol=width)


def truncated_plain(text, attr, align='left'):
    """Return a ``TruncatedText`` showing *text* in the single attribute *attr*."""
    return TruncatedText(text, [(attr, len(text.encode()))], align)


# Work around https://github.com/urwid/urwid/pull/330
def rle_append_beginning_modify(rle, a_r):
    """
    Append (a, r) (unpacked from *a_r*) to BEGINNING of rle.
    Merge with first run when possible

    MODIFIES rle parameter contents. Returns None.
    """
    a, r = a_r
    if not rle:
        rle[:] = [(a, r)]
    else:
        al, run = rle[0]
        if a == al:
            rle[0] = (a, run + r)
        else:
            rle[0:0] = [(a, r)]


def colorize_host(host):
    """
    Return a run-length encoded attribute list for *host*.

    Walking the host from its last character backwards, the first
    ``len(get_tld(host))`` characters get ``url_domain``, the next character
    (the separator) gets ``text``, the following
    ``len(get_sld(host)) - len(get_tld(host))`` characters get
    ``url_extension`` and everything before that gets ``text``. Run lengths
    are counted in encoded bytes.
    """
    tld = get_tld(host)
    sld = get_sld(host)

    attr = []

    tld_size = len(tld)
    sld_size = len(sld) - tld_size

    for character in reversed(host):
        if tld_size > 0:
            style = 'url_domain'
            tld_size -= 1
        elif tld_size == 0:
            # The single character between the two parts; decrementing below
            # zero makes sure this branch is taken only once.
            style = 'text'
            tld_size -= 1
        elif sld_size > 0:
            sld_size -= 1
            style = 'url_extension'
        else:
            style = 'text'
        rle_append_beginning_modify(attr, (style, len(character.encode())))
    return attr


def colorize_req(s):
    """
    Return a run-length encoded attribute list for the request target *s*
    (path plus optional query string).

    Slashes, dots in the last path segment, ``?``, ``=`` and ``&`` are styled
    as punctuation; the last path segment is split into file name and
    extension; query keys and values get their own styles. Run lengths are
    counted in encoded bytes.
    """
    path = s.split('?', 2)[0]
    i_query = len(path)
    i_last_slash = path.rfind('/')
    i_ext = path[i_last_slash + 1:].rfind('.')
    i_ext = i_last_slash + i_ext if i_ext >= 0 else len(s)
    in_val = False
    attr = []
    for i, c in enumerate(s):
        if ((i < i_query and c == '/') or
                (i < i_query and i > i_last_slash and c == '.') or
                (i == i_query)):
            a = 'url_punctuation'
        elif i > i_query:
            if in_val:
                if c == '&':
                    in_val = False
                    a = 'url_punctuation'
                else:
                    a = 'url_query_value'
            else:
                if c == '=':
                    in_val = True
                    a = 'url_punctuation'
                else:
                    a = 'url_query_key'
        elif i > i_ext:
            a = 'url_extension'
        elif i > i_last_slash:
            a = 'url_filename'
        else:
            a = 'text'
        urwid.util.rle_append_modify(attr, (a, len(c.encode())))
    return attr


def colorize_url(url):
    """
    Return a run-length encoded attribute list for a full URL.

    The URL must have the form ``scheme://host/rest``; anything else is
    styled as ``error`` in its entirety.
    """
    parts = url.split('/', 3)
    if len(parts) < 4 or len(parts[1]) > 0 or parts[0][-1:] != ':':
        # Run lengths are byte counts everywhere else in this module
        # (see truncated_plain/colorize_host), so count bytes here as well.
        return [('error', len(url.encode()))]  # bad URL
    return [
        (SCHEME_STYLES.get(parts[0], "scheme_other"), len(parts[0]) - 1),
        ('url_punctuation', 3),  # ://
    ] + colorize_host(parts[2]) + colorize_req('/' + parts[3])


def format_http_content_type(content_type: str) -> typing.Tuple[str, str]:
    """
    Strip parameters from a Content-Type value and pick a display style.

    Returns:
        ``(content_type_without_parameters, style_name)``.
    """
    content_type = content_type.split(";")[0]
    if content_type.endswith('/javascript'):
        style = 'content_script'
    elif content_type.startswith('text/'):
        style = 'content_text'
    elif (content_type.startswith(('image/', 'video/', 'font/')) or
            "/x-font-" in content_type):
        style = 'content_media'
    elif content_type.endswith(('/json', '/xml')):
        style = 'content_data'
    elif content_type.startswith('application/'):
        style = 'content_raw'
    else:
        style = 'content_other'
    return content_type, style


def format_duration(duration: float) -> typing.Tuple[str, str]:
    """
    Return ``(pretty_duration, gradient_style)`` for a duration.

    The style is ``gradient_NN`` with NN falling on a log scale from 99
    (zero duration) down to 00.
    """
    pretty_duration = human.pretty_duration(duration)
    # Clamp at zero: log2 of a non-positive number raises ValueError, which a
    # duration below -1ms (e.g. from inconsistent timestamps) would trigger.
    scaled = math.log2(1 + 1000 * max(duration, 0)) / 12
    style = 'gradient_%02d' % int(99 - 100 * min(scaled, 0.99))
    return pretty_duration, style


def format_size(num_bytes: int) -> typing.Tuple[str, str]:
    """
    Return ``(pretty_size, gradient_style)`` for a byte count.

    The style is ``gradient_NN`` with NN falling on a log scale from 99
    (zero bytes) down to 00.
    """
    pretty_size = human.pretty_size(num_bytes)
    style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + num_bytes) / 20, 0.99))
    return pretty_size, style


def format_left_indicators(
    *,
    focused: bool,
    intercepted: bool,
    timestamp: float
):
    """
    Build the fixed 10-column left cell of a table row: the focus marker
    followed by the last eight characters of the formatted timestamp.
    """
    indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = []
    if focused:
        indicators.append(("focus", ">>"))
    else:
        # Same width as ">>" so the timestamp does not move when focus changes.
        indicators.append("  ")
    pretty_timestamp = human.format_timestamp(timestamp)[-8:]
    if intercepted:
        indicators.append(("intercept", pretty_timestamp))
    else:
        indicators.append(("text", pretty_timestamp))
    return "fixed", 10, urwid.Text(indicators)


def format_right_indicators(
    *,
    replay: bool,
    marked: str,
):
    """
    Build the fixed 3-column right cell of a row: replay symbol and marker,
    each replaced by a blank when not set.
    """
    indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = []
    if replay:
        indicators.append(("replay", SYMBOL_REPLAY))
    else:
        indicators.append(" ")
    if bool(marked):
        indicators.append(("mark", render_marker(marked)))
    else:
        indicators.append(" ")
    return "fixed", 3, urwid.Text(indicators)


@lru_cache(maxsize=800)
def format_http_flow_list(
    *,
    render_mode: RenderMode,
    focused: bool,
    marked: str,
    is_replay: typing.Optional[str],
    request_method: str,
    request_scheme: str,
    request_host: str,
    request_path: str,
    request_url: str,
    request_http_version: str,
    request_timestamp: float,
    request_is_push_promise: bool,
    intercepted: bool,
    response_code: typing.Optional[int],
    response_reason: typing.Optional[str],
    response_content_length: typing.Optional[int],
    response_content_type: typing.Optional[str],
    duration: typing.Optional[float],
    error_message: typing.Optional[str],
) -> urwid.Widget:
    """
    Render an HTTP flow as two rows (request line, response line).

    Used for ``RenderMode.LIST`` and ``RenderMode.DETAILVIEW``. All arguments
    are keyword-only display attributes extracted by ``format_flow``; the
    result is memoised on them, so every argument must be hashable.
    ``is_replay`` is only tested for truthiness and carries the same value
    that ``format_http_flow_table`` receives.
    """
    req = []

    if render_mode is RenderMode.DETAILVIEW:
        req.append(fcol(human.format_timestamp(request_timestamp), "highlight"))
    else:
        if focused:
            req.append(fcol(">>", "focus"))
        else:
            # fcol sizes the column by len(s): keep the placeholder as wide as
            # ">>" so rows do not shift when the focus moves.
            req.append(fcol("  ", "focus"))

    method_style = HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
    req.append(fcol(request_method, method_style))

    if request_is_push_promise:
        req.append(fcol('PUSH_PROMISE', 'method_http2_push'))

    # Total width of the columns so far plus the one-character dividers
    # between them; the response row is indented by this much.
    preamble_len = sum(x[1] for x in req) + len(req) - 1

    if request_http_version not in ("HTTP/1.0", "HTTP/1.1"):
        request_url += " " + request_http_version
    if intercepted and not response_code:
        url_style = "intercept"
    elif response_code or error_message:
        url_style = "text"
    else:
        url_style = "title"

    if render_mode is RenderMode.DETAILVIEW:
        req.append(
            urwid.Text([(url_style, request_url)])
        )
    else:
        req.append(truncated_plain(request_url, url_style))

    req.append(format_right_indicators(replay=bool(is_replay), marked=marked))

    resp = [
        ("fixed", preamble_len, urwid.Text(""))
    ]
    if response_code:
        # An intercepted response overrides every per-field style.
        if intercepted:
            style = "intercept"
        else:
            style = ""

        status_style = style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")
        resp.append(fcol(SYMBOL_RETURN, status_style))
        resp.append(fcol(str(response_code), status_style))
        if response_reason and render_mode is RenderMode.DETAILVIEW:
            resp.append(fcol(response_reason, status_style))

        if response_content_type:
            ct, ct_style = format_http_content_type(response_content_type)
            resp.append(fcol(ct, style or ct_style))

        if response_content_length:
            size, size_style = format_size(response_content_length)
        elif response_content_length == 0:
            size = "[no content]"
            size_style = "text"
        else:
            size = "[content missing]"
            size_style = "text"
        resp.append(fcol(size, style or size_style))

        if duration:
            dur, dur_style = format_duration(duration)
            resp.append(fcol(dur, style or dur_style))
    elif error_message:
        resp.append(fcol(SYMBOL_RETURN, "error"))
        resp.append(urwid.Text([("error", error_message)]))

    return urwid.Pile([
        urwid.Columns(req, dividechars=1),
        urwid.Columns(resp, dividechars=1)
    ])


@lru_cache(maxsize=800)
def format_http_flow_table(
    *,
    render_mode: RenderMode,
    focused: bool,
    marked: str,
    is_replay: typing.Optional[str],
    request_method: str,
    request_scheme: str,
    request_host: str,
    request_path: str,
    request_url: str,
    request_http_version: str,
    request_timestamp: float,
    request_is_push_promise: bool,
    intercepted: bool,
    response_code: typing.Optional[int],
    response_reason: typing.Optional[str],
    response_content_length: typing.Optional[int],
    response_content_type: typing.Optional[str],
    duration: typing.Optional[float],
    error_message: typing.Optional[str],
) -> urwid.Widget:
    """
    Render an HTTP flow as a single table row.

    Takes the same keyword-only display attributes as
    ``format_http_flow_list`` (some are unused here; the shared signature
    lets ``format_flow`` call either renderer the same way). The result is
    memoised on the arguments, so every argument must be hashable.
    """
    items = [
        format_left_indicators(
            focused=focused,
            intercepted=intercepted,
            timestamp=request_timestamp
        )
    ]

    # While only the request is intercepted, the request columns use the
    # "intercept" style instead of their individual styles.
    if intercepted and not response_code:
        request_style = "intercept"
    else:
        request_style = ""

    scheme_style = request_style or SCHEME_STYLES.get(request_scheme, "scheme_other")
    items.append(fcol(fixlen(request_scheme.upper(), 5), scheme_style))

    if request_is_push_promise:
        method_style = 'method_http2_push'
    else:
        method_style = request_style or HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
    items.append(fcol(fixlen(request_method, 4), method_style))

    items.append(('weight', 0.25, TruncatedText(request_host, colorize_host(request_host), 'right')))
    items.append(('weight', 1.0, TruncatedText(request_path, colorize_req(request_path), 'left')))

    # Once a response exists, interception highlights the response columns.
    if intercepted and response_code:
        response_style = "intercept"
    else:
        response_style = ""

    if response_code:

        status = str(response_code)
        status_style = response_style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")

        if response_content_length and response_content_type:
            content, content_style = format_http_content_type(response_content_type)
            content_style = response_style or content_style
        elif response_content_length:
            content = ''
            content_style = 'content_none'
        elif response_content_length == 0:
            content = "[no content]"
            content_style = 'content_none'
        else:
            content = "[content missing]"
            content_style = 'content_none'

    elif error_message:
        status = 'err'
        status_style = 'error'
        content = error_message
        content_style = 'error'

    else:
        status = ''
        status_style = 'text'
        content = ''
        content_style = ''

    items.append(fcol(fixlen(status, 3), status_style))
    items.append(('weight', 0.15, truncated_plain(content, content_style, 'right')))

    if response_content_length:
        size, size_style = format_size(response_content_length)
        items.append(fcol(fixlen_r(size, 5), response_style or size_style))
    else:
        items.append(("fixed", 5, urwid.Text("")))

    if duration:
        duration_pretty, duration_style = format_duration(duration)
        items.append(fcol(fixlen_r(duration_pretty, 5), response_style or duration_style))
    else:
        items.append(("fixed", 5, urwid.Text("")))

    items.append(format_right_indicators(
        replay=bool(is_replay),
        marked=marked,
    ))
    return urwid.Columns(items, dividechars=1, min_width=15)


@lru_cache(maxsize=800)
def format_tcp_flow(
    *,
    render_mode: RenderMode,
    focused: bool,
    timestamp_start: float,
    marked: str,
    client_address,
    server_address,
    total_size: int,
    duration: typing.Optional[float],
    error_message: typing.Optional[str],
):
    """
    Render a TCP flow as a single row wrapped in a ``urwid.Pile``.

    All arguments are keyword-only display attributes extracted by
    ``format_flow``; the result is memoised on them, so every argument
    (including both addresses) must be hashable.
    """
    conn = f"{human.format_address(client_address)} <-> {human.format_address(server_address)}"

    items = []

    if render_mode in (RenderMode.TABLE, RenderMode.DETAILVIEW):
        items.append(
            format_left_indicators(focused=focused, intercepted=False, timestamp=timestamp_start)
        )
    else:
        if focused:
            items.append(fcol(">>", "focus"))
        else:
            # fcol sizes the column by len(s): keep the placeholder as wide as
            # ">>" so rows do not shift when the focus moves.
            items.append(fcol("  ", "focus"))

    if render_mode is RenderMode.TABLE:
        # Same width as the scheme column of format_http_flow_table.
        items.append(fcol(fixlen("TCP", 5), SCHEME_STYLES["tcp"]))
    else:
        items.append(fcol("TCP", SCHEME_STYLES["tcp"]))

    items.append(('weight', 1.0, truncated_plain(conn, "text", 'left')))
    if error_message:
        items.append(('weight', 1.0, truncated_plain(error_message, "error", 'left')))

    if total_size:
        size, size_style = format_size(total_size)
        items.append(fcol(fixlen_r(size, 5), size_style))
    else:
        items.append(("fixed", 5, urwid.Text("")))

    if duration:
        duration_pretty, duration_style = format_duration(duration)
        items.append(fcol(fixlen_r(duration_pretty, 5), duration_style))
    else:
        items.append(("fixed", 5, urwid.Text("")))

    items.append(format_right_indicators(replay=False, marked=marked))

    return urwid.Pile([
        urwid.Columns(items, dividechars=1, min_width=15)
    ])


def format_flow(
    f: flow.Flow,
    *,
    render_mode: RenderMode,
    hostheader: bool = False,  # pass options directly if we need more stuff from them
    focused: bool = True,
) -> urwid.Widget:
    """
    This function calls the proper renderer depending on the flow type.
    We also want to cache the renderer output, so we extract all attributes
    relevant for display and call the render with only that. This assures that rows
    are updated if the flow is changed.

    Args:
        f: The flow to render; must be a TCPFlow or an HTTPFlow.
        render_mode: Which layout to produce.
        hostheader: If true, use the request's ``pretty_host``/``pretty_url``
            instead of ``host``/``url``.
        focused: Whether the row is drawn with the focus marker.

    Raises:
        NotImplementedError: If *f* is neither a TCPFlow nor an HTTPFlow.
    """
    duration: typing.Optional[float]
    error_message: typing.Optional[str]
    if f.error:
        error_message = f.error.msg
    else:
        error_message = None

    if isinstance(f, TCPFlow):
        total_size = sum(len(message.content) for message in f.messages)
        if f.messages:
            # Clamp at zero, like the HTTP branch below, so that inconsistent
            # timestamps never yield a negative duration.
            duration = max(f.messages[-1].timestamp - f.timestamp_start, 0)
        else:
            duration = None
        return format_tcp_flow(
            render_mode=render_mode,
            focused=focused,
            timestamp_start=f.timestamp_start,
            marked=f.marked,
            client_address=f.client_conn.peername,
            server_address=f.server_conn.address,
            total_size=total_size,
            duration=duration,
            error_message=error_message,
        )
    elif isinstance(f, HTTPFlow):
        # A flow whose reply is already committed is no longer shown as intercepted.
        intercepted = (
            f.intercepted and not (f.reply and f.reply.state == "committed")
        )
        response_content_length: typing.Optional[int]
        response_code: typing.Optional[int]
        response_reason: typing.Optional[str]
        if f.response:
            if f.response.raw_content is not None:
                response_content_length = len(f.response.raw_content)
            else:
                response_content_length = None
            response_code = f.response.status_code
            response_reason = f.response.reason
            response_content_type = f.response.headers.get("content-type")
            if f.response.timestamp_end:
                duration = max(f.response.timestamp_end - f.request.timestamp_start, 0)
            else:
                duration = None
        else:
            response_content_length = None
            response_code = None
            response_reason = None
            response_content_type = None
            duration = None

        # Flows that carry a websocket are shown with the ws/wss scheme.
        scheme = f.request.scheme
        if f.websocket is not None:
            if scheme == "https":
                scheme = "wss"
            elif scheme == "http":
                scheme = "ws"

        if render_mode in (RenderMode.LIST, RenderMode.DETAILVIEW):
            render_func = format_http_flow_list
        else:
            render_func = format_http_flow_table
        return render_func(
            render_mode=render_mode,
            focused=focused,
            marked=f.marked,
            is_replay=f.is_replay,
            request_method=f.request.method,
            request_scheme=scheme,
            request_host=f.request.pretty_host if hostheader else f.request.host,
            request_path=f.request.path,
            request_url=f.request.pretty_url if hostheader else f.request.url,
            request_http_version=f.request.http_version,
            request_timestamp=f.request.timestamp_start,
            request_is_push_promise='h2-pushed-stream' in f.metadata,
            intercepted=intercepted,
            response_code=response_code,
            response_reason=response_reason,
            response_content_length=response_content_length,
            response_content_type=response_content_type,
            duration=duration,
            error_message=error_message,
        )

    else:
        raise NotImplementedError()