1import enum
2import platform
3import typing
4import math
5from functools import lru_cache
6from publicsuffix2 import get_sld, get_tld
7
8import urwid
9import urwid.util
10
11from mitmproxy import flow
12from mitmproxy.http import HTTPFlow
13from mitmproxy.utils import human, emoji
14from mitmproxy.tcp import TCPFlow
15
16# Detect Windows Subsystem for Linux and Windows
17IS_WINDOWS = "Microsoft" in platform.platform() or "Windows" in platform.platform()
18
19
20def is_keypress(k):
21    """
22        Is this input event a keypress?
23    """
24    if isinstance(k, str):
25        return True
26
27
28def highlight_key(str, key, textattr="text", keyattr="key"):
29    l = []
30    parts = str.split(key, 1)
31    if parts[0]:
32        l.append((textattr, parts[0]))
33    l.append((keyattr, key))
34    if parts[1]:
35        l.append((textattr, parts[1]))
36    return l
37
38
39KEY_MAX = 30
40
41
42def format_keyvals(
43        entries: typing.Iterable[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]],
44        key_format: str = "key",
45        value_format: str = "text",
46        indent: int = 0
47) -> typing.List[urwid.Columns]:
48    """
49    Format a list of (key, value) tuples.
50
51    Args:
52        entries: The list to format. keys must be strings, values can also be None or urwid widgets.
53            The latter makes it possible to use the result of format_keyvals() as a value.
54        key_format: The display attribute for the key.
55        value_format: The display attribute for the value.
56        indent: Additional indent to apply.
57    """
58    max_key_len = max((len(k) for k, v in entries if k is not None), default=0)
59    max_key_len = min(max_key_len, KEY_MAX)
60
61    if indent > 2:
62        indent -= 2  # We use dividechars=2 below, which already adds two empty spaces
63
64    ret = []
65    for k, v in entries:
66        if v is None:
67            v = urwid.Text("")
68        elif not isinstance(v, urwid.Widget):
69            v = urwid.Text([(value_format, v)])
70        ret.append(
71            urwid.Columns(
72                [
73                    ("fixed", indent, urwid.Text("")),
74                    (
75                        "fixed",
76                        max_key_len,
77                        urwid.Text([(key_format, k)])
78                    ),
79                    v
80                ],
81                dividechars=2
82            )
83        )
84    return ret
85
86
87def fcol(s: str, attr: str) -> typing.Tuple[str, int, urwid.Text]:
88    s = str(s)
89    return (
90        "fixed",
91        len(s),
92        urwid.Text(
93            [
94                (attr, s)
95            ]
96        )
97    )
98
99
100if urwid.util.detected_encoding:
101    SYMBOL_REPLAY = "\u21ba"
102    SYMBOL_RETURN = "\u2190"
103    SYMBOL_MARK = "\u25cf"
104    SYMBOL_UP = "\u21E7"
105    SYMBOL_DOWN = "\u21E9"
106    SYMBOL_ELLIPSIS = "\u2026"
107    SYMBOL_FROM_CLIENT = "\u21d2"
108    SYMBOL_TO_CLIENT = "\u21d0"
109else:
110    SYMBOL_REPLAY = "[r]"
111    SYMBOL_RETURN = "<-"
112    SYMBOL_MARK = "#"
113    SYMBOL_UP = "^"
114    SYMBOL_DOWN = " "
115    SYMBOL_ELLIPSIS = "~"
116    SYMBOL_FROM_CLIENT = "->"
117    SYMBOL_TO_CLIENT = "<-"
118
119SCHEME_STYLES = {
120    'http': 'scheme_http',
121    'https': 'scheme_https',
122    'ws': 'scheme_ws',
123    'wss': 'scheme_wss',
124    'tcp': 'scheme_tcp',
125}
126HTTP_REQUEST_METHOD_STYLES = {
127    'GET': 'method_get',
128    'POST': 'method_post',
129    'DELETE': 'method_delete',
130    'HEAD': 'method_head',
131    'PUT': 'method_put'
132}
133HTTP_RESPONSE_CODE_STYLE = {
134    2: "code_200",
135    3: "code_300",
136    4: "code_400",
137    5: "code_500",
138}
139
140
141class RenderMode(enum.Enum):
142    TABLE = 1
143    """The flow list in table format, i.e. one row per flow."""
144    LIST = 2
145    """The flow list in list format, i.e. potentially multiple rows per flow."""
146    DETAILVIEW = 3
147    """The top lines in the detail view."""
148
149
150def fixlen(s: str, maxlen: int) -> str:
151    if len(s) <= maxlen:
152        return s.ljust(maxlen)
153    else:
154        return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS
155
156
157def fixlen_r(s: str, maxlen: int) -> str:
158    if len(s) <= maxlen:
159        return s.rjust(maxlen)
160    else:
161        return SYMBOL_ELLIPSIS + s[len(s) - maxlen + len(SYMBOL_ELLIPSIS):]
162
163
164def render_marker(marker: str) -> str:
165    rendered = emoji.emoji.get(marker, SYMBOL_MARK)
166
167    # The marker can only be one glyph. Some emoji that use zero-width joiners (ZWJ)
168    # will not be rendered as a single glyph and instead will show
169    # multiple glyphs. Just use the first glyph as a fallback.
170    # https://emojipedia.org/emoji-zwj-sequence/
171    return rendered[0]
172
173
174class TruncatedText(urwid.Widget):
175    def __init__(self, text, attr, align='left'):
176        self.text = text
177        self.attr = attr
178        self.align = align
179        super().__init__()
180
181    def pack(self, size, focus=False):
182        return (len(self.text), 1)
183
184    def rows(self, size, focus=False):
185        return 1
186
187    def render(self, size, focus=False):
188        text = self.text
189        attr = self.attr
190        if self.align == 'right':
191            text = text[::-1]
192            attr = attr[::-1]
193
194        text_len = len(text)  # TODO: unicode?
195        if size is not None and len(size) > 0:
196            width = size[0]
197        else:
198            width = text_len
199
200        if width >= text_len:
201            remaining = width - text_len
202            if remaining > 0:
203                c_text = text + ' ' * remaining
204                c_attr = attr + [('text', remaining)]
205            else:
206                c_text = text
207                c_attr = attr
208        else:
209            visible_len = width - len(SYMBOL_ELLIPSIS)
210            visible_text = text[0:visible_len]
211            c_text = visible_text + SYMBOL_ELLIPSIS
212            c_attr = (urwid.util.rle_subseg(attr, 0, len(visible_text.encode())) +
213                      [('focus', len(SYMBOL_ELLIPSIS.encode()))])
214
215        if self.align == 'right':
216            c_text = c_text[::-1]
217            c_attr = c_attr[::-1]
218
219        return urwid.TextCanvas([c_text.encode()], [c_attr], maxcol=width)
220
221
222def truncated_plain(text, attr, align='left'):
223    return TruncatedText(text, [(attr, len(text.encode()))], align)
224
225
226# Work around https://github.com/urwid/urwid/pull/330
227def rle_append_beginning_modify(rle, a_r):
228    """
229    Append (a, r) (unpacked from *a_r*) to BEGINNING of rle.
230    Merge with first run when possible
231
232    MODIFIES rle parameter contents. Returns None.
233    """
234    a, r = a_r
235    if not rle:
236        rle[:] = [(a, r)]
237    else:
238        al, run = rle[0]
239        if a == al:
240            rle[0] = (a, run + r)
241        else:
242            rle[0:0] = [(a, r)]
243
244
245def colorize_host(host):
246    tld = get_tld(host)
247    sld = get_sld(host)
248
249    attr = []
250
251    tld_size = len(tld)
252    sld_size = len(sld) - tld_size
253
254    for letter in reversed(range(len(host))):
255        character = host[letter]
256        if tld_size > 0:
257            style = 'url_domain'
258            tld_size -= 1
259        elif tld_size == 0:
260            style = 'text'
261            tld_size -= 1
262        elif sld_size > 0:
263            sld_size -= 1
264            style = 'url_extension'
265        else:
266            style = 'text'
267        rle_append_beginning_modify(attr, (style, len(character.encode())))
268    return attr
269
270
271def colorize_req(s):
272    path = s.split('?', 2)[0]
273    i_query = len(path)
274    i_last_slash = path.rfind('/')
275    i_ext = path[i_last_slash + 1:].rfind('.')
276    i_ext = i_last_slash + i_ext if i_ext >= 0 else len(s)
277    in_val = False
278    attr = []
279    for i in range(len(s)):
280        c = s[i]
281        if ((i < i_query and c == '/') or
282                (i < i_query and i > i_last_slash and c == '.') or
283                (i == i_query)):
284            a = 'url_punctuation'
285        elif i > i_query:
286            if in_val:
287                if c == '&':
288                    in_val = False
289                    a = 'url_punctuation'
290                else:
291                    a = 'url_query_value'
292            else:
293                if c == '=':
294                    in_val = True
295                    a = 'url_punctuation'
296                else:
297                    a = 'url_query_key'
298        elif i > i_ext:
299            a = 'url_extension'
300        elif i > i_last_slash:
301            a = 'url_filename'
302        else:
303            a = 'text'
304        urwid.util.rle_append_modify(attr, (a, len(c.encode())))
305    return attr
306
307
308def colorize_url(url):
309    parts = url.split('/', 3)
310    if len(parts) < 4 or len(parts[1]) > 0 or parts[0][-1:] != ':':
311        return [('error', len(url))]  # bad URL
312    return [
313               (SCHEME_STYLES.get(parts[0], "scheme_other"), len(parts[0]) - 1),
314               ('url_punctuation', 3),  # ://
315           ] + colorize_host(parts[2]) + colorize_req('/' + parts[3])
316
317
318def format_http_content_type(content_type: str) -> typing.Tuple[str, str]:
319    content_type = content_type.split(";")[0]
320    if content_type.endswith('/javascript'):
321        style = 'content_script'
322    elif content_type.startswith('text/'):
323        style = 'content_text'
324    elif (content_type.startswith('image/') or
325          content_type.startswith('video/') or
326          content_type.startswith('font/') or
327          "/x-font-" in content_type):
328        style = 'content_media'
329    elif content_type.endswith('/json') or content_type.endswith('/xml'):
330        style = 'content_data'
331    elif content_type.startswith('application/'):
332        style = 'content_raw'
333    else:
334        style = 'content_other'
335    return content_type, style
336
337
338def format_duration(duration: float) -> typing.Tuple[str, str]:
339    pretty_duration = human.pretty_duration(duration)
340    style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * duration) / 12, 0.99))
341    return pretty_duration, style
342
343
344def format_size(num_bytes: int) -> typing.Tuple[str, str]:
345    pretty_size = human.pretty_size(num_bytes)
346    style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + num_bytes) / 20, 0.99))
347    return pretty_size, style
348
349
350def format_left_indicators(
351        *,
352        focused: bool,
353        intercepted: bool,
354        timestamp: float
355):
356    indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = []
357    if focused:
358        indicators.append(("focus", ">>"))
359    else:
360        indicators.append("  ")
361    pretty_timestamp = human.format_timestamp(timestamp)[-8:]
362    if intercepted:
363        indicators.append(("intercept", pretty_timestamp))
364    else:
365        indicators.append(("text", pretty_timestamp))
366    return "fixed", 10, urwid.Text(indicators)
367
368
369def format_right_indicators(
370        *,
371        replay: bool,
372        marked: str,
373):
374    indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = []
375    if replay:
376        indicators.append(("replay", SYMBOL_REPLAY))
377    else:
378        indicators.append(" ")
379    if bool(marked):
380        indicators.append(("mark", render_marker(marked)))
381    else:
382        indicators.append("  ")
383    return "fixed", 3, urwid.Text(indicators)
384
385
386@lru_cache(maxsize=800)
387def format_http_flow_list(
388        *,
389        render_mode: RenderMode,
390        focused: bool,
391        marked: str,
392        is_replay: bool,
393        request_method: str,
394        request_scheme: str,
395        request_host: str,
396        request_path: str,
397        request_url: str,
398        request_http_version: str,
399        request_timestamp: float,
400        request_is_push_promise: bool,
401        intercepted: bool,
402        response_code: typing.Optional[int],
403        response_reason: typing.Optional[str],
404        response_content_length: typing.Optional[int],
405        response_content_type: typing.Optional[str],
406        duration: typing.Optional[float],
407        error_message: typing.Optional[str],
408) -> urwid.Widget:
409    req = []
410
411    if render_mode is RenderMode.DETAILVIEW:
412        req.append(fcol(human.format_timestamp(request_timestamp), "highlight"))
413    else:
414        if focused:
415            req.append(fcol(">>", "focus"))
416        else:
417            req.append(fcol("  ", "focus"))
418
419    method_style = HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
420    req.append(fcol(request_method, method_style))
421
422    if request_is_push_promise:
423        req.append(fcol('PUSH_PROMISE', 'method_http2_push'))
424
425    preamble_len = sum(x[1] for x in req) + len(req) - 1
426
427    if request_http_version not in ("HTTP/1.0", "HTTP/1.1"):
428        request_url += " " + request_http_version
429    if intercepted and not response_code:
430        url_style = "intercept"
431    elif response_code or error_message:
432        url_style = "text"
433    else:
434        url_style = "title"
435
436    if render_mode is RenderMode.DETAILVIEW:
437        req.append(
438            urwid.Text([(url_style, request_url)])
439        )
440    else:
441        req.append(truncated_plain(request_url, url_style))
442
443    req.append(format_right_indicators(replay=is_replay, marked=marked))
444
445    resp = [
446        ("fixed", preamble_len, urwid.Text(""))
447    ]
448    if response_code:
449        if intercepted:
450            style = "intercept"
451        else:
452            style = ""
453
454        status_style = style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")
455        resp.append(fcol(SYMBOL_RETURN, status_style))
456        resp.append(fcol(str(response_code), status_style))
457        if response_reason and render_mode is RenderMode.DETAILVIEW:
458            resp.append(fcol(response_reason, status_style))
459
460        if response_content_type:
461            ct, ct_style = format_http_content_type(response_content_type)
462            resp.append(fcol(ct, style or ct_style))
463
464        if response_content_length:
465            size, size_style = format_size(response_content_length)
466        elif response_content_length == 0:
467            size = "[no content]"
468            size_style = "text"
469        else:
470            size = "[content missing]"
471            size_style = "text"
472        resp.append(fcol(size, style or size_style))
473
474        if duration:
475            dur, dur_style = format_duration(duration)
476            resp.append(fcol(dur, style or dur_style))
477    elif error_message:
478        resp.append(fcol(SYMBOL_RETURN, "error"))
479        resp.append(urwid.Text([("error", error_message)]))
480
481    return urwid.Pile([
482        urwid.Columns(req, dividechars=1),
483        urwid.Columns(resp, dividechars=1)
484    ])
485
486
487@lru_cache(maxsize=800)
488def format_http_flow_table(
489        *,
490        render_mode: RenderMode,
491        focused: bool,
492        marked: str,
493        is_replay: typing.Optional[str],
494        request_method: str,
495        request_scheme: str,
496        request_host: str,
497        request_path: str,
498        request_url: str,
499        request_http_version: str,
500        request_timestamp: float,
501        request_is_push_promise: bool,
502        intercepted: bool,
503        response_code: typing.Optional[int],
504        response_reason: typing.Optional[str],
505        response_content_length: typing.Optional[int],
506        response_content_type: typing.Optional[str],
507        duration: typing.Optional[float],
508        error_message: typing.Optional[str],
509) -> urwid.Widget:
510    items = [
511        format_left_indicators(
512            focused=focused,
513            intercepted=intercepted,
514            timestamp=request_timestamp
515        )
516    ]
517
518    if intercepted and not response_code:
519        request_style = "intercept"
520    else:
521        request_style = ""
522
523    scheme_style = request_style or SCHEME_STYLES.get(request_scheme, "scheme_other")
524    items.append(fcol(fixlen(request_scheme.upper(), 5), scheme_style))
525
526    if request_is_push_promise:
527        method_style = 'method_http2_push'
528    else:
529        method_style = request_style or HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
530    items.append(fcol(fixlen(request_method, 4), method_style))
531
532    items.append(('weight', 0.25, TruncatedText(request_host, colorize_host(request_host), 'right')))
533    items.append(('weight', 1.0, TruncatedText(request_path, colorize_req(request_path), 'left')))
534
535    if intercepted and response_code:
536        response_style = "intercept"
537    else:
538        response_style = ""
539
540    if response_code:
541
542        status = str(response_code)
543        status_style = response_style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")
544
545        if response_content_length and response_content_type:
546            content, content_style = format_http_content_type(response_content_type)
547            content_style = response_style or content_style
548        elif response_content_length:
549            content = ''
550            content_style = 'content_none'
551        elif response_content_length == 0:
552            content = "[no content]"
553            content_style = 'content_none'
554        else:
555            content = "[content missing]"
556            content_style = 'content_none'
557
558    elif error_message:
559        status = 'err'
560        status_style = 'error'
561        content = error_message
562        content_style = 'error'
563
564    else:
565        status = ''
566        status_style = 'text'
567        content = ''
568        content_style = ''
569
570    items.append(fcol(fixlen(status, 3), status_style))
571    items.append(('weight', 0.15, truncated_plain(content, content_style, 'right')))
572
573    if response_content_length:
574        size, size_style = format_size(response_content_length)
575        items.append(fcol(fixlen_r(size, 5), response_style or size_style))
576    else:
577        items.append(("fixed", 5, urwid.Text("")))
578
579    if duration:
580        duration_pretty, duration_style = format_duration(duration)
581        items.append(fcol(fixlen_r(duration_pretty, 5), response_style or duration_style))
582    else:
583        items.append(("fixed", 5, urwid.Text("")))
584
585    items.append(format_right_indicators(
586        replay=bool(is_replay),
587        marked=marked,
588    ))
589    return urwid.Columns(items, dividechars=1, min_width=15)
590
591
592@lru_cache(maxsize=800)
593def format_tcp_flow(
594        *,
595        render_mode: RenderMode,
596        focused: bool,
597        timestamp_start: float,
598        marked: str,
599        client_address,
600        server_address,
601        total_size: int,
602        duration: typing.Optional[float],
603        error_message: typing.Optional[str],
604):
605    conn = f"{human.format_address(client_address)} <-> {human.format_address(server_address)}"
606
607    items = []
608
609    if render_mode in (RenderMode.TABLE, RenderMode.DETAILVIEW):
610        items.append(
611            format_left_indicators(focused=focused, intercepted=False, timestamp=timestamp_start)
612        )
613    else:
614        if focused:
615            items.append(fcol(">>", "focus"))
616        else:
617            items.append(fcol("  ", "focus"))
618
619    if render_mode is RenderMode.TABLE:
620        items.append(fcol("TCP  ", SCHEME_STYLES["tcp"]))
621    else:
622        items.append(fcol("TCP", SCHEME_STYLES["tcp"]))
623
624    items.append(('weight', 1.0, truncated_plain(conn, "text", 'left')))
625    if error_message:
626        items.append(('weight', 1.0, truncated_plain(error_message, "error", 'left')))
627
628    if total_size:
629        size, size_style = format_size(total_size)
630        items.append(fcol(fixlen_r(size, 5), size_style))
631    else:
632        items.append(("fixed", 5, urwid.Text("")))
633
634    if duration:
635        duration_pretty, duration_style = format_duration(duration)
636        items.append(fcol(fixlen_r(duration_pretty, 5), duration_style))
637    else:
638        items.append(("fixed", 5, urwid.Text("")))
639
640    items.append(format_right_indicators(replay=False, marked=marked))
641
642    return urwid.Pile([
643        urwid.Columns(items, dividechars=1, min_width=15)
644    ])
645
646
647def format_flow(
648        f: flow.Flow,
649        *,
650        render_mode: RenderMode,
651        hostheader: bool = False,  # pass options directly if we need more stuff from them
652        focused: bool = True,
653) -> urwid.Widget:
654    """
655    This functions calls the proper renderer depending on the flow type.
656    We also want to cache the renderer output, so we extract all attributes
657    relevant for display and call the render with only that. This assures that rows
658    are updated if the flow is changed.
659    """
660    duration: typing.Optional[float]
661    error_message: typing.Optional[str]
662    if f.error:
663        error_message = f.error.msg
664    else:
665        error_message = None
666
667    if isinstance(f, TCPFlow):
668        total_size = 0
669        for message in f.messages:
670            total_size += len(message.content)
671        if f.messages:
672            duration = f.messages[-1].timestamp - f.timestamp_start
673        else:
674            duration = None
675        return format_tcp_flow(
676            render_mode=render_mode,
677            focused=focused,
678            timestamp_start=f.timestamp_start,
679            marked=f.marked,
680            client_address=f.client_conn.peername,
681            server_address=f.server_conn.address,
682            total_size=total_size,
683            duration=duration,
684            error_message=error_message,
685        )
686    elif isinstance(f, HTTPFlow):
687        intercepted = (
688                f.intercepted and not (f.reply and f.reply.state == "committed")
689        )
690        response_content_length: typing.Optional[int]
691        if f.response:
692            if f.response.raw_content is not None:
693                response_content_length = len(f.response.raw_content)
694            else:
695                response_content_length = None
696            response_code: typing.Optional[int] = f.response.status_code
697            response_reason: typing.Optional[str] = f.response.reason
698            response_content_type = f.response.headers.get("content-type")
699            if f.response.timestamp_end:
700                duration = max([f.response.timestamp_end - f.request.timestamp_start, 0])
701            else:
702                duration = None
703        else:
704            response_content_length = None
705            response_code = None
706            response_reason = None
707            response_content_type = None
708            duration = None
709
710        scheme = f.request.scheme
711        if f.websocket is not None:
712            if scheme == "https":
713                scheme = "wss"
714            elif scheme == "http":
715                scheme = "ws"
716
717        if render_mode in (RenderMode.LIST, RenderMode.DETAILVIEW):
718            render_func = format_http_flow_list
719        else:
720            render_func = format_http_flow_table
721        return render_func(
722            render_mode=render_mode,
723            focused=focused,
724            marked=f.marked,
725            is_replay=f.is_replay,
726            request_method=f.request.method,
727            request_scheme=scheme,
728            request_host=f.request.pretty_host if hostheader else f.request.host,
729            request_path=f.request.path,
730            request_url=f.request.pretty_url if hostheader else f.request.url,
731            request_http_version=f.request.http_version,
732            request_timestamp=f.request.timestamp_start,
733            request_is_push_promise='h2-pushed-stream' in f.metadata,
734            intercepted=intercepted,
735            response_code=response_code,
736            response_reason=response_reason,
737            response_content_length=response_content_length,
738            response_content_type=response_content_type,
739            duration=duration,
740            error_message=error_message,
741        )
742
743    else:
744        raise NotImplementedError()
745