1import binascii
2import os
3import re
4import time
5import urllib.parse
6import json
7from dataclasses import dataclass
8from dataclasses import fields
9from email.utils import formatdate
10from email.utils import mktime_tz
11from email.utils import parsedate_tz
12from typing import Callable
13from typing import Dict
14from typing import Iterable
15from typing import Iterator
16from typing import List
17from typing import Mapping
18from typing import Optional
19from typing import Tuple
20from typing import Union
21from typing import cast
22from typing import Any
23
24from mitmproxy import flow
25from mitmproxy.websocket import WebSocketData
26from mitmproxy.coretypes import multidict
27from mitmproxy.coretypes import serializable
28from mitmproxy.net import encoding
29from mitmproxy.net.http import cookies
30from mitmproxy.net.http import multipart
31from mitmproxy.net.http import status_codes
32from mitmproxy.net.http import url
33from mitmproxy.net.http.headers import assemble_content_type
34from mitmproxy.net.http.headers import parse_content_type
35from mitmproxy.utils import human
36from mitmproxy.utils import strutils
37from mitmproxy.utils import typecheck
38from mitmproxy.utils.strutils import always_bytes
39from mitmproxy.utils.strutils import always_str
40
41
# Header bytes _should_ be plain ASCII, but UTF-8 encoded headers are common enough in practice.
def _native(x: bytes) -> str:
    """Decode raw header bytes as UTF-8; undecodable bytes become surrogate escapes."""
    return x.decode(encoding="utf-8", errors="surrogateescape")
45
46
def _always_bytes(x: Union[str, bytes]) -> bytes:
    """Coerce a header name/value to bytes via `strutils.always_bytes` (UTF-8, surrogateescape)."""
    as_bytes = strutils.always_bytes(x, "utf-8", "surrogateescape")
    return as_bytes
49
50
# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
class Headers(multidict.MultiDict):  # type: ignore
    """
    Header class which allows both convenient access to individual headers as well as
    direct access to the underlying raw data. Provides a full dictionary interface.

    Create headers with keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    Headers mostly behave like a normal dict:
    >>> h["Host"]
    "example.com"

    Headers are case insensitive:
    >>> h["host"]
    "example.com"

    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Multiple headers are folded into a single header as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Setting a header removes all existing headers with the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` returns an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    For full control, the raw header fields can be accessed:
    >>> h.fields

    Caveats:
     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    # The docstring below is a raw string: it contains ``\*``, which is an
    # invalid escape sequence in a regular string literal.
    def __init__(self, fields: Iterable[Tuple[bytes, bytes]] = (), **headers):
        r"""
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *\*\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
           For convenience, underscores in header names will be transformed to dashes -
           this behaviour does not extend to other methods.

        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
        the behavior is undefined.

        *Raises:*
         - `TypeError` if any name or value in `fields` is not bytes.
        """
        super().__init__(fields)

        for key, value in self.fields:
            if not isinstance(key, bytes) or not isinstance(value, bytes):
                raise TypeError("Header fields must be bytes.")

        # content_type -> content-type
        self.update({
            _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
            for name, value in headers.items()
        })

    fields: Tuple[Tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        """Fold several values of one header into a single comma-separated value."""
        # Headers can be folded
        return ", ".join(values)

    @staticmethod
    def _kconv(key) -> str:
        """Normalize a header name for comparison by lowercasing it."""
        # Headers are case-insensitive
        return key.lower()

    def __bytes__(self) -> bytes:
        """
        Serialize the raw fields as ``name: value`` lines, each terminated by CRLF.
        Returns ``b""`` if there are no fields.
        """
        if self.fields:
            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
        else:
            return b""

    def __delitem__(self, key: Union[str, bytes]) -> None:
        """Delete the header `key`; the name may be given as str or bytes."""
        key = _always_bytes(key)
        super().__delitem__(key)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names yielded by the base class, decoded to str."""
        for x in super().__iter__():
            yield _native(x)

    def get_all(self, name: Union[str, bytes]) -> List[str]:
        """
        Like `Headers.get`, but does not fold multiple headers into a single one.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        name = _always_bytes(name)
        return [
            _native(x) for x in
            super().get_all(name)
        ]

    def set_all(self, name: Union[str, bytes], values: List[Union[str, bytes]]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        name = _always_bytes(name)
        values = [_always_bytes(x) for x in values]
        return super().set_all(name, values)

    def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]):
        """Insert a header at position `index`; name and value are coerced to bytes first."""
        key = _always_bytes(key)
        value = _always_bytes(value)
        super().insert(index, key, value)

    def items(self, multi=False):
        """
        Return the header items.

        With ``multi=True``, every raw field is yielded as a decoded ``(name, value)``
        pair. Otherwise the base-class implementation is used.
        """
        if multi:
            return (
                (_native(k), _native(v))
                for k, v in self.fields
            )
        else:
            return super().items()
185
@dataclass
class MessageData(serializable.Serializable):
    """Raw, serializable fields shared by the request and response data classes."""
    http_version: bytes
    headers: Headers
    content: Optional[bytes]
    trailers: Optional[Headers]
    timestamp_start: float
    timestamp_end: Optional[float]

    # Field types are only validated when Python runs without -O.
    # noinspection PyUnreachableCode
    if __debug__:
        def __post_init__(self):
            """Check every field value against its declared type."""
            for field in fields(self):
                val = getattr(self, field.name)
                typecheck.check_option_type(field.name, val, field.type)

    def set_state(self, state):
        """Assign each item of `state` as an attribute, rebuilding `Headers` where needed."""
        for k, v in state.items():
            if k in ("headers", "trailers") and v is not None:
                v = Headers.from_state(v)
            setattr(self, k, v)

    def get_state(self):
        """Return a dict of all attributes with headers/trailers replaced by their state."""
        state = vars(self).copy()
        state["headers"] = state["headers"].get_state()
        if state["trailers"] is not None:
            state["trailers"] = state["trailers"].get_state()
        return state

    @classmethod
    def from_state(cls, state):
        """
        Build an instance from a state mapping as produced by `get_state`.

        The mapping passed in is left untouched.
        """
        # Work on a shallow copy so the caller's dict does not end up holding Headers objects.
        state = dict(state)
        state["headers"] = Headers.from_state(state["headers"])
        if state["trailers"] is not None:
            state["trailers"] = Headers.from_state(state["trailers"])
        return cls(**state)
221
222
@dataclass
class RequestData(MessageData):
    # Request-specific raw fields, stored in addition to those of MessageData.
    # `Request` exposes each of them through a property of the same name.
    host: str
    port: int
    # The remaining fields are kept as raw bytes.
    method: bytes
    scheme: bytes
    authority: bytes
    path: bytes
231
232
@dataclass
class ResponseData(MessageData):
    # Response-specific raw fields, stored in addition to those of MessageData.
    status_code: int
    # Kept as raw bytes.
    reason: bytes
237
238
class Message(serializable.Serializable):
    """Common base class of `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        """Construct a message by passing the state items as keyword arguments."""
        return cls(**state)

    def get_state(self):
        """Return the state of the underlying data object."""
        return self.data.get_state()

    def set_state(self, state):
        """Load `state` into the underlying data object."""
        self.data.set_state(state)

    data: MessageData
    stream: Union[Callable[[bytes], Union[Iterable[bytes], bytes]], bool] = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        The HTTP version as a string, e.g. `HTTP/1.1`.
        """
        return _native(self.data.http_version)

    @http_version.setter
    def http_version(self, http_version: Union[str, bytes]) -> None:
        self.data.http_version = _always_bytes(http_version)

    @property
    def is_http10(self) -> bool:
        """Whether the stored version is exactly `HTTP/1.0`."""
        return b"HTTP/1.0" == self.data.http_version

    @property
    def is_http11(self) -> bool:
        """Whether the stored version is exactly `HTTP/1.1`."""
        return b"HTTP/1.1" == self.data.http_version

    @property
    def is_http2(self) -> bool:
        """Whether the stored version is exactly `HTTP/2.0`."""
        return b"HTTP/2.0" == self.data.http_version

    @property
    def headers(self) -> Headers:
        """
        The message's HTTP headers.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Optional[Headers]:
        """
        The message's [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer),
        or `None`.
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Optional[Headers]) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> Optional[bytes]:
        """
        The message body exactly as stored (potentially still compressed).

        Unlike `Message.content` and `Message.text`, reading this property never raises.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: Optional[bytes]) -> None:
        self.data.content = content

    @property
    def content(self) -> Optional[bytes]:
        """
        The message body as bytes with the content-encoding removed.

        Reading this may raise a `ValueError` if the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: Optional[bytes]) -> None:
        self.set_content(value)

    @property
    def text(self) -> Optional[str]:
        """
        The message body as text, with content-encoding removed and charset decoded.

        Reading this may raise a `ValueError` if either the content-encoding or the charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: Optional[str]) -> None:
        self.set_text(value)

    def set_content(self, value: Optional[bytes]) -> None:
        """
        Store `value` as the body, encoded according to the current content-encoding header.

        `None` clears the body without touching any headers. If encoding fails, the
        content-encoding header is dropped and the bytes are stored unencoded.
        content-length is updated unless a transfer-encoding header is present.

        *Raises:*
         - `TypeError` if `value` is neither `None` nor bytes.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )

        content_encoding = self.headers.get("content-encoding")
        try:
            body = encoding.encode(value, content_encoding or "identity")
        except ValueError:
            # The content-encoding is unusable: remove the header and keep the plain bytes.
            del self.headers["content-encoding"]
            body = value
        self.raw_content = body

        # https://httpwg.org/specs/rfc7230.html#header.content-length
        # content-length must not be set when a transfer-encoding is provided.
        if "transfer-encoding" not in self.headers:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> Optional[bytes]:
        """
        Like `Message.content`, except that with `strict=False` a failed decode
        does not raise; the still-encoded body is returned unchanged instead.
        """
        raw = self.raw_content
        if raw is None:
            return None
        content_encoding = self.headers.get("content-encoding")
        if not content_encoding:
            return raw
        try:
            decoded = encoding.decode(raw, content_encoding)
            # A client may illegally specify a byte -> str encoding here (e.g. utf8)
            if isinstance(decoded, str):
                raise ValueError(f"Invalid Content-Encoding: {content_encoding}")
        except ValueError:
            if strict:
                raise
            return raw
        return decoded

    def _get_content_type_charset(self) -> Optional[str]:
        """Return the charset parameter of the content-type header, if there is one."""
        parsed = parse_content_type(self.headers.get("content-type", ""))
        return parsed[2].get("charset") if parsed else None

    def _guess_encoding(self, content: bytes = b"") -> str:
        """
        Pick a text encoding for the body.

        Sources are tried in this order: content-type charset, "utf8" for JSON
        content types, an HTML ``<meta ... charset=...>`` tag in `content`, a leading
        CSS ``@charset`` rule for text/css, and finally "latin-1".
        """
        content_type = self.headers.get("content-type", "")

        enc = self._get_content_type_charset()
        if not enc and "json" in content_type:
            enc = "utf8"
        if not enc:
            meta_match = re.search(rb"""<meta[^>]+charset=['"]?([^'">]+)""", content)
            if meta_match:
                enc = meta_match.group(1).decode("ascii", "ignore")
        if not enc and "text/css" in content_type:
            # @charset rule must be the very first thing.
            css_match = re.match(rb"""@charset "([^"]+)";""", content)
            if css_match:
                enc = css_match.group(1).decode("ascii", "ignore")
        if not enc:
            enc = "latin-1"

        # Use GB 18030 as the superset of GB2312 and GBK to fix common encoding problems on Chinese websites.
        if enc.lower() in ("gb2312", "gbk"):
            enc = "gb18030"
        return enc

    def set_text(self, text: Optional[str]) -> None:
        """
        Encode `text` with the guessed charset and store it as the body.

        `None` clears the body. If the guessed charset cannot encode the text, the
        content-type charset is rewritten to utf-8 and the text is stored as UTF-8.
        """
        if text is None:
            self.content = None
            return

        charset = self._guess_encoding()
        try:
            self.content = cast(bytes, encoding.encode(text, charset))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            parsed = parse_content_type(self.headers.get("content-type", "")) or ("text", "plain", {})
            parsed[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*parsed)
            charset = "utf8"
            self.content = text.encode(charset, "surrogateescape")

    def get_text(self, strict: bool = True) -> Optional[str]:
        """
        Like `Message.text`, except that with `strict=False` decoding problems do not
        raise; the body is returned as surrogate-escaped UTF-8 instead.
        """
        body = self.get_content(strict)
        if body is None:
            return None
        charset = self._guess_encoding(body)
        try:
            return cast(str, encoding.decode(body, charset))
        except ValueError:
            if not strict:
                return body.decode("utf8", "surrogateescape")
            raise

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> Optional[float]:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: Optional[float]):
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decode the body according to the current Content-Encoding header and
        remove that header afterwards. Without a Content-Encoding header the body
        is stored back unchanged.

        *Raises:*
         - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        plain = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = plain

    def encode(self, encoding: str) -> None:
        """
        Encode the body with `encoding`, which is one of "gzip", "deflate", "identity", "br", or "zstd".
        An existing content-encoding is overwritten; the content is not decoded beforehand.

        *Raises:*
         - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        self.content = self.raw_content
        # The content setter drops the header again if the encoding could not be applied.
        if "content-encoding" not in self.headers:
            raise ValueError(f"Invalid content encoding {encoding!r}")

    def json(self, **kwargs: Any) -> Any:
        """
        Parse the message body as JSON and return the result.
        `**kwargs` are passed through to `json.loads()`.

        The body is read with `get_content(strict=False)`, so an invalid
        content-encoding does not raise here.

        *Raises:*
         - `json.decoder.JSONDecodeError` if content is not valid JSON.
         - `TypeError` if the content is not available, for example because the message
            has been streamed.
        """
        body = self.get_content(strict=False)
        if body is None:
            raise TypeError('Message content is not available.')
        return json.loads(body, **kwargs)
532
533
534class Request(Message):
535    """
536    An HTTP request.
537    """
538    data: RequestData
539
540    def __init__(
541        self,
542        host: str,
543        port: int,
544        method: bytes,
545        scheme: bytes,
546        authority: bytes,
547        path: bytes,
548        http_version: bytes,
549        headers: Union[Headers, Tuple[Tuple[bytes, bytes], ...]],
550        content: Optional[bytes],
551        trailers: Union[Headers, Tuple[Tuple[bytes, bytes], ...], None],
552        timestamp_start: float,
553        timestamp_end: Optional[float],
554    ):
555        # auto-convert invalid types to retain compatibility with older code.
556        if isinstance(host, bytes):
557            host = host.decode("idna", "strict")
558        if isinstance(method, str):
559            method = method.encode("ascii", "strict")
560        if isinstance(scheme, str):
561            scheme = scheme.encode("ascii", "strict")
562        if isinstance(authority, str):
563            authority = authority.encode("ascii", "strict")
564        if isinstance(path, str):
565            path = path.encode("ascii", "strict")
566        if isinstance(http_version, str):
567            http_version = http_version.encode("ascii", "strict")
568
569        if isinstance(content, str):
570            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
571        if not isinstance(headers, Headers):
572            headers = Headers(headers)
573        if trailers is not None and not isinstance(trailers, Headers):
574            trailers = Headers(trailers)
575
576        self.data = RequestData(
577            host=host,
578            port=port,
579            method=method,
580            scheme=scheme,
581            authority=authority,
582            path=path,
583            http_version=http_version,
584            headers=headers,
585            content=content,
586            trailers=trailers,
587            timestamp_start=timestamp_start,
588            timestamp_end=timestamp_end,
589        )
590
591    def __repr__(self) -> str:
592        if self.host and self.port:
593            hostport = f"{self.host}:{self.port}"
594        else:
595            hostport = ""
596        path = self.path or ""
597        return f"Request({self.method} {hostport}{path})"
598
599    @classmethod
600    def make(
601        cls,
602        method: str,
603        url: str,
604        content: Union[bytes, str] = "",
605        headers: Union[Headers, Dict[Union[str, bytes], Union[str, bytes]], Iterable[Tuple[bytes, bytes]]] = ()
606    ) -> "Request":
607        """
608        Simplified API for creating request objects.
609        """
610        # Headers can be list or dict, we differentiate here.
611        if isinstance(headers, Headers):
612            pass
613        elif isinstance(headers, dict):
614            headers = Headers(
615                (always_bytes(k, "utf-8", "surrogateescape"),
616                 always_bytes(v, "utf-8", "surrogateescape"))
617                for k, v in headers.items()
618            )
619        elif isinstance(headers, Iterable):
620            headers = Headers(headers)  # type: ignore
621        else:
622            raise TypeError("Expected headers to be an iterable or dict, but is {}.".format(
623                type(headers).__name__
624            ))
625
626        req = cls(
627            "",
628            0,
629            method.encode("utf-8", "surrogateescape"),
630            b"",
631            b"",
632            b"",
633            b"HTTP/1.1",
634            headers,
635            b"",
636            None,
637            time.time(),
638            time.time(),
639        )
640
641        req.url = url
642        # Assign this manually to update the content-length header.
643        if isinstance(content, bytes):
644            req.content = content
645        elif isinstance(content, str):
646            req.text = content
647        else:
648            raise TypeError(f"Expected content to be str or bytes, but is {type(content).__name__}.")
649
650        return req
651
652    @property
653    def first_line_format(self) -> str:
654        """
655        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
656
657        origin-form and asterisk-form are subsumed as "relative".
658        """
659        if self.method == "CONNECT":
660            return "authority"
661        elif self.authority:
662            return "absolute"
663        else:
664            return "relative"
665
666    @property
667    def method(self) -> str:
668        """
669        HTTP request method, e.g. "GET".
670        """
671        return self.data.method.decode("utf-8", "surrogateescape").upper()
672
673    @method.setter
674    def method(self, val: Union[str, bytes]) -> None:
675        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
676
677    @property
678    def scheme(self) -> str:
679        """
680        HTTP request scheme, which should be "http" or "https".
681        """
682        return self.data.scheme.decode("utf-8", "surrogateescape")
683
684    @scheme.setter
685    def scheme(self, val: Union[str, bytes]) -> None:
686        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
687
688    @property
689    def authority(self) -> str:
690        """
691        HTTP request authority.
692
693        For HTTP/1, this is the authority portion of the request target
694        (in either absolute-form or authority-form).
695        For origin-form and asterisk-form requests, this property is set to an empty string.
696
697        For HTTP/2, this is the :authority pseudo header.
698
699        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
700        """
701        try:
702            return self.data.authority.decode("idna")
703        except UnicodeError:
704            return self.data.authority.decode("utf8", "surrogateescape")
705
706    @authority.setter
707    def authority(self, val: Union[str, bytes]) -> None:
708        if isinstance(val, str):
709            try:
710                val = val.encode("idna", "strict")
711            except UnicodeError:
712                val = val.encode("utf8", "surrogateescape")  # type: ignore
713        self.data.authority = val
714
715    @property
716    def host(self) -> str:
717        """
718        Target server for this request. This may be parsed from the raw request
719        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
720        or inferred from the proxy mode (e.g. an IP in transparent mode).
721
722        Setting the host attribute also updates the host header and authority information, if present.
723
724        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
725        """
726        return self.data.host
727
728    @host.setter
729    def host(self, val: Union[str, bytes]) -> None:
730        self.data.host = always_str(val, "idna", "strict")
731
732        # Update host header
733        if "Host" in self.data.headers:
734            self.data.headers["Host"] = val
735        # Update authority
736        if self.data.authority:
737            self.authority = url.hostport(self.scheme, self.host, self.port)
738
739    @property
740    def host_header(self) -> Optional[str]:
741        """
742        The request's host/authority header.
743
744        This property maps to either ``request.headers["Host"]`` or
745        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
746
747        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
748        """
749        if self.is_http2:
750            return self.authority or self.data.headers.get("Host", None)
751        else:
752            return self.data.headers.get("Host", None)
753
754    @host_header.setter
755    def host_header(self, val: Union[None, str, bytes]) -> None:
756        if val is None:
757            if self.is_http2:
758                self.data.authority = b""
759            self.headers.pop("Host", None)
760        else:
761            if self.is_http2:
762                self.authority = val  # type: ignore
763            if not self.is_http2 or "Host" in self.headers:
764                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
765                self.headers["Host"] = val
766
767    @property
768    def port(self) -> int:
769        """
770        Target port.
771        """
772        return self.data.port
773
774    @port.setter
775    def port(self, port: int) -> None:
776        self.data.port = port
777
778    @property
779    def path(self) -> str:
780        """
781        HTTP request path, e.g. "/index.html".
782        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
783        """
784        return self.data.path.decode("utf-8", "surrogateescape")
785
786    @path.setter
787    def path(self, val: Union[str, bytes]) -> None:
788        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
789
790    @property
791    def url(self) -> str:
792        """
793        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
794
795        Settings this property updates these attributes as well.
796        """
797        if self.first_line_format == "authority":
798            return f"{self.host}:{self.port}"
799        return url.unparse(self.scheme, self.host, self.port, self.path)
800
801    @url.setter
802    def url(self, val: Union[str, bytes]) -> None:
803        val = always_str(val, "utf-8", "surrogateescape")
804        self.scheme, self.host, self.port, self.path = url.parse(val)
805
806    @property
807    def pretty_host(self) -> str:
808        """
809        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
810        This is useful in transparent mode where `Request.host` is only an IP address.
811
812        *Warning:* When working in adversarial environments, this may not reflect the actual destination
813        as the Host header could be spoofed.
814        """
815        authority = self.host_header
816        if authority:
817            return url.parse_authority(authority, check=False)[0]
818        else:
819            return self.host
820
821    @property
822    def pretty_url(self) -> str:
823        """
824        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
825        """
826        if self.first_line_format == "authority":
827            return self.authority
828
829        host_header = self.host_header
830        if not host_header:
831            return self.url
832
833        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
834        pretty_port = pretty_port or url.default_port(self.scheme) or 443
835
836        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)
837
838    def _get_query(self):
839        query = urllib.parse.urlparse(self.url).query
840        return tuple(url.decode(query))
841
842    def _set_query(self, query_data):
843        query = url.encode(query_data)
844        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
845        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
846
847    @property
848    def query(self) -> multidict.MultiDictView[str, str]:
849        """
850        The request query as a mutable mapping view on the request's path.
851        For the most part, this behaves like a dictionary.
852        Modifications to the MultiDictView update `Request.path`, and vice versa.
853        """
854        return multidict.MultiDictView(
855            self._get_query,
856            self._set_query
857        )
858
859    @query.setter
860    def query(self, value):
861        self._set_query(value)
862
863    def _get_cookies(self):
864        h = self.headers.get_all("Cookie")
865        return tuple(cookies.parse_cookie_headers(h))
866
867    def _set_cookies(self, value):
868        self.headers["cookie"] = cookies.format_cookie_header(value)
869
870    @property
871    def cookies(self) -> multidict.MultiDictView[str, str]:
872        """
873        The request cookies.
874        For the most part, this behaves like a dictionary.
875        Modifications to the MultiDictView update `Request.headers`, and vice versa.
876        """
877        return multidict.MultiDictView(
878            self._get_cookies,
879            self._set_cookies
880        )
881
    @cookies.setter
    def cookies(self, value):
        # Assigning to `cookies` replaces all cookies: `value` is handed
        # to `_set_cookies` unchanged.
        self._set_cookies(value)
885
886    @property
887    def path_components(self) -> Tuple[str, ...]:
888        """
889        The URL's path components as a tuple of strings.
890        Components are unquoted.
891        """
892        path = urllib.parse.urlparse(self.url).path
893        # This needs to be a tuple so that it's immutable.
894        # Otherwise, this would fail silently:
895        #   request.path_components.append("foo")
896        return tuple(url.unquote(i) for i in path.split("/") if i)
897
898    @path_components.setter
899    def path_components(self, components: Iterable[str]):
900        components = map(lambda x: url.quote(x, safe=""), components)
901        path = "/" + "/".join(components)
902        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
903        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
904
905    def anticache(self) -> None:
906        """
907        Modifies this request to remove headers that might produce a cached response.
908        """
909        delheaders = (
910            "if-modified-since",
911            "if-none-match",
912        )
913        for i in delheaders:
914            self.headers.pop(i, None)
915
916    def anticomp(self) -> None:
917        """
918        Modify the Accept-Encoding header to only accept uncompressed responses.
919        """
920        self.headers["accept-encoding"] = "identity"
921
922    def constrain_encoding(self) -> None:
923        """
924        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
925        """
926        accept_encoding = self.headers.get("accept-encoding")
927        if accept_encoding:
928            self.headers["accept-encoding"] = (
929                ', '.join(
930                    e
931                    for e in {"gzip", "identity", "deflate", "br", "zstd"}
932                    if e in accept_encoding
933                )
934            )
935
936    def _get_urlencoded_form(self):
937        is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower()
938        if is_valid_content_type:
939            return tuple(url.decode(self.get_text(strict=False)))
940        return ()
941
942    def _set_urlencoded_form(self, form_data):
943        """
944        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
945        This will overwrite the existing content if there is one.
946        """
947        self.headers["content-type"] = "application/x-www-form-urlencoded"
948        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
949
950    @property
951    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
952        """
953        The URL-encoded form data.
954
955        If the content-type indicates non-form data or the form could not be parsed, this is set to
956        an empty `MultiDictView`.
957
958        Modifications to the MultiDictView update `Request.content`, and vice versa.
959        """
960        return multidict.MultiDictView(
961            self._get_urlencoded_form,
962            self._set_urlencoded_form
963        )
964
    @urlencoded_form.setter
    def urlencoded_form(self, value):
        # Assigning to `urlencoded_form` replaces the form: `value` is handed
        # to `_set_urlencoded_form` unchanged.
        self._set_urlencoded_form(value)
968
969    def _get_multipart_form(self):
970        is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
971        if is_valid_content_type:
972            try:
973                return multipart.decode(self.headers.get("content-type"), self.content)
974            except ValueError:
975                pass
976        return ()
977
978    def _set_multipart_form(self, value):
979        is_valid_content_type = self.headers.get("content-type", "").lower().startswith("multipart/form-data")
980        if not is_valid_content_type:
981            """
982            Generate a random boundary here.
983
984            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
985            on generating the boundary.
986            """
987            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
988            self.headers["content-type"] = f"multipart/form-data; boundary={boundary}"
989        self.content = multipart.encode(self.headers, value)
990
991    @property
992    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
993        """
994        The multipart form data.
995
996        If the content-type indicates non-form data or the form could not be parsed, this is set to
997        an empty `MultiDictView`.
998
999        Modifications to the MultiDictView update `Request.content`, and vice versa.
1000        """
1001        return multidict.MultiDictView(
1002            self._get_multipart_form,
1003            self._set_multipart_form
1004        )
1005
    @multipart_form.setter
    def multipart_form(self, value):
        # Assigning to `multipart_form` replaces the form: `value` is handed
        # to `_set_multipart_form` unchanged.
        self._set_multipart_form(value)
1009
1010
class Response(Message):
    """
    An HTTP response.
    """
    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Union[Headers, Tuple[Tuple[bytes, bytes], ...]],
        content: Optional[bytes],
        trailers: Union[None, Headers, Tuple[Tuple[bytes, bytes], ...]],
        timestamp_start: float,
        timestamp_end: Optional[float],
    ):
        """
        Create a response from its raw parts.

        `http_version` and `reason` passed as `str` are encoded as ASCII.
        `headers` and (non-None) `trailers` that are not already `Headers`
        instances are wrapped in `Headers`.

        Raises:
            ValueError: if `content` is a `str` instead of bytes.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError("Content must be bytes, not {}".format(type(content).__name__))
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: Union[bytes, str] = b"",
        headers: Union[Headers, Mapping[str, Union[str, bytes]], Iterable[Tuple[bytes, bytes]]] = ()
    ) -> "Response":
        """
        Simplified API for creating response objects.

        `headers` may be a `Headers` instance (used as is), a mapping whose
        keys and values are str or bytes, or an iterable of `(name, value)`
        byte pairs. `content` may be bytes or str.

        Raises:
            TypeError: if `headers` or `content` has an unsupported type.
        """
        if not isinstance(headers, Headers):
            # Accept any Mapping, as promised by the signature, not only `dict`.
            # A non-dict mapping would otherwise fall through to the iterable
            # branch, which would iterate over its keys only.
            if isinstance(headers, Mapping):
                headers = Headers(
                    (always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                     always_bytes(v, "utf-8", "surrogateescape"))
                    for k, v in headers.items()
                )
            elif isinstance(headers, Iterable):
                headers = Headers(headers)  # type: ignore
            else:
                raise TypeError("Expected headers to be an iterable or dict, but is {}.".format(
                    type(headers).__name__
                ))

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(f"Expected content to be str or bytes, but is {type(content).__name__}.")

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: Union[str, bytes]) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        """Parse all `set-cookie` headers into `(name, (value, attrs))` tuples."""
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple(
            (name, (value, attrs))
            for name, value, attrs in all_cookies
        )

    def _set_cookies(self, value):
        """Replace all `set-cookie` headers, emitting one header per `(name, (value, attrs))` item."""
        cookie_headers = [
            cookies.format_set_cookie_header([(k, v[0], v[1])])
            for k, v in value
        ]
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(self) -> multidict.MultiDictView[str, Tuple[str, multidict.MultiDict[str, Optional[str]]]]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(
            self._get_cookies,
            self._set_cookies
        )

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now=None):
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

         - It adjusts date, expires, and last-modified headers.
         - It adjusts cookie expiration.

        `now` is the reference time as a Unix timestamp; it defaults to the
        current time. All adjusted values are shifted by `now - timestamp_start`.
        Headers and cookies that cannot be parsed or shifted are left unchanged.
        """
        # Compare against None so that an explicit timestamp of 0 is honoured.
        if now is None:
            now = time.time()
        delta = now - self.timestamp_start
        refresh_headers = (
            "date",
            "expires",
            "last-modified",
        )
        for i in refresh_headers:
            if i in self.headers:
                d = parsedate_tz(self.headers[i])
                if d:
                    try:
                        refreshed_date = formatdate(mktime_tz(d) + delta, usegmt=True)
                    except (OverflowError, OSError, ValueError):
                        # The server sent a date that parses but is outside the
                        # representable range (e.g. year 99999). This is a
                        # best-effort function, so keep the original value,
                        # just like unparseable cookies below.
                        continue
                    self.headers[i] = refreshed_date
        c = []
        for set_cookie_header in self.headers.get_all("set-cookie"):
            try:
                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
            except ValueError:
                refreshed = set_cookie_header
            c.append(refreshed)
        if c:
            self.headers.set_all("set-cookie", c)
1198
1199
class HTTPFlow(flow.Flow):
    """
    A single HTTP transaction, represented as a collection of objects:
    the request plus, when present, the response, error and WebSocket data.
    """
    request: Request
    """The client's HTTP request."""
    response: Optional[Response] = None
    """The server's HTTP response."""
    error: Optional[flow.Error] = None
    """
    A connection or protocol error affecting this flow.

    Note that it's possible for a Flow to have both a response and an error
    object. This might happen, for instance, when a response was received
    from the server, but there was an error sending it back to the client.
    """

    websocket: Optional[WebSocketData] = None
    """
    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
    """

    def __init__(self, client_conn, server_conn, live=None, mode="regular"):
        super().__init__("http", client_conn, server_conn, live)
        self.mode = mode

    # Start from the base class's state attributes and add the HTTP-specific ones.
    _stateobject_attributes = flow.Flow._stateobject_attributes.copy()
    # mypy doesn't support update with kwargs
    _stateobject_attributes.update({
        "request": Request,
        "response": Response,
        "websocket": WebSocketData,
        "mode": str,
    })

    def __repr__(self):
        attribute_names = ("request", "response", "websocket", "error", "client_conn", "server_conn")
        # Build a format template with one line per attribute that is set and
        # truthy, then fill it in from this flow in a single `format` call.
        template_lines = [
            f"\r\n  {name} = {{flow.{name}}}"
            for name in attribute_names
            if getattr(self, name, False)
        ]
        template = "<HTTPFlow" + "".join(template_lines) + ">"
        return template.format(flow=self)

    @property
    def timestamp_start(self) -> float:
        """*Read-only:* An alias for `Request.timestamp_start`."""
        return self.request.timestamp_start

    def copy(self):
        """Return a copy of this flow whose request and response are copies as well."""
        duplicate = super().copy()
        if self.request:
            duplicate.request = self.request.copy()
        if self.response:
            duplicate.response = self.response.copy()
        return duplicate
1256
1257
# Names exported by `from <this module> import *`.
__all__ = [
    "HTTPFlow",
    "Message",
    "Request",
    "Response",
    "Headers",
]
1265