1import binascii 2import os 3import re 4import time 5import urllib.parse 6import json 7from dataclasses import dataclass 8from dataclasses import fields 9from email.utils import formatdate 10from email.utils import mktime_tz 11from email.utils import parsedate_tz 12from typing import Callable 13from typing import Dict 14from typing import Iterable 15from typing import Iterator 16from typing import List 17from typing import Mapping 18from typing import Optional 19from typing import Tuple 20from typing import Union 21from typing import cast 22from typing import Any 23 24from mitmproxy import flow 25from mitmproxy.websocket import WebSocketData 26from mitmproxy.coretypes import multidict 27from mitmproxy.coretypes import serializable 28from mitmproxy.net import encoding 29from mitmproxy.net.http import cookies 30from mitmproxy.net.http import multipart 31from mitmproxy.net.http import status_codes 32from mitmproxy.net.http import url 33from mitmproxy.net.http.headers import assemble_content_type 34from mitmproxy.net.http.headers import parse_content_type 35from mitmproxy.utils import human 36from mitmproxy.utils import strutils 37from mitmproxy.utils import typecheck 38from mitmproxy.utils.strutils import always_bytes 39from mitmproxy.utils.strutils import always_str 40 41 42# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded. 43def _native(x: bytes) -> str: 44 return x.decode("utf-8", "surrogateescape") 45 46 47def _always_bytes(x: Union[str, bytes]) -> bytes: 48 return strutils.always_bytes(x, "utf-8", "surrogateescape") 49 50 51# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types. 52class Headers(multidict.MultiDict): # type: ignore 53 """ 54 Header class which allows both convenient access to individual headers as well as 55 direct access to the underlying raw data. Provides a full dictionary interface. 56 57 Create headers with keyword arguments: 58 >>> h = Headers(host="example.com", content_type="application/xml") 59 60 Headers mostly behave like a normal dict: 61 >>> h["Host"] 62 "example.com" 63 64 Headers are case insensitive: 65 >>> h["host"] 66 "example.com" 67 68 Headers can also be created from a list of raw (header_name, header_value) byte tuples: 69 >>> h = Headers([ 70 (b"Host",b"example.com"), 71 (b"Accept",b"text/html"), 72 (b"accept",b"application/xml") 73 ]) 74 75 Multiple headers are folded into a single header as per RFC 7230: 76 >>> h["Accept"] 77 "text/html, application/xml" 78 79 Setting a header removes all existing headers with the same name: 80 >>> h["Accept"] = "application/text" 81 >>> h["Accept"] 82 "application/text" 83 84 `bytes(h)` returns an HTTP/1 header block: 85 >>> print(bytes(h)) 86 Host: example.com 87 Accept: application/text 88 89 For full control, the raw header fields can be accessed: 90 >>> h.fields 91 92 Caveats: 93 - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`. 94 """ 95 96 def __init__(self, fields: Iterable[Tuple[bytes, bytes]] = (), **headers): 97 """ 98 *Args:* 99 - *fields:* (optional) list of ``(name, value)`` header byte tuples, 100 e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes. 101 - *\*\*headers:* Additional headers to set. Will overwrite existing values from `fields`. 102 For convenience, underscores in header names will be transformed to dashes - 103 this behaviour does not extend to other methods. 104 105 If ``**headers`` contains multiple keys that have equal ``.lower()`` representations, 106 the behavior is undefined. 107 """ 108 super().__init__(fields) 109 110 for key, value in self.fields: 111 if not isinstance(key, bytes) or not isinstance(value, bytes): 112 raise TypeError("Header fields must be bytes.") 113 114 # content_type -> content-type 115 self.update({ 116 _always_bytes(name).replace(b"_", b"-"): _always_bytes(value) 117 for name, value in headers.items() 118 }) 119 120 fields: Tuple[Tuple[bytes, bytes], ...] 121 122 @staticmethod 123 def _reduce_values(values) -> str: 124 # Headers can be folded 125 return ", ".join(values) 126 127 @staticmethod 128 def _kconv(key) -> str: 129 # Headers are case-insensitive 130 return key.lower() 131 132 def __bytes__(self) -> bytes: 133 if self.fields: 134 return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n" 135 else: 136 return b"" 137 138 def __delitem__(self, key: Union[str, bytes]) -> None: 139 key = _always_bytes(key) 140 super().__delitem__(key) 141 142 def __iter__(self) -> Iterator[str]: 143 for x in super().__iter__(): 144 yield _native(x) 145 146 def get_all(self, name: Union[str, bytes]) -> List[str]: 147 """ 148 Like `Headers.get`, but does not fold multiple headers into a single one. 149 This is useful for Set-Cookie and Cookie headers, which do not support folding. 150 151 *See also:* 152 - <https://tools.ietf.org/html/rfc7230#section-3.2.2> 153 - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4> 154 - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5> 155 """ 156 name = _always_bytes(name) 157 return [ 158 _native(x) for x in 159 super().get_all(name) 160 ] 161 162 def set_all(self, name: Union[str, bytes], values: List[Union[str, bytes]]): 163 """ 164 Explicitly set multiple headers for the given key. 165 See `Headers.get_all`. 166 """ 167 name = _always_bytes(name) 168 values = [_always_bytes(x) for x in values] 169 return super().set_all(name, values) 170 171 def insert(self, index: int, key: Union[str, bytes], value: Union[str, bytes]): 172 key = _always_bytes(key) 173 value = _always_bytes(value) 174 super().insert(index, key, value) 175 176 def items(self, multi=False): 177 if multi: 178 return ( 179 (_native(k), _native(v)) 180 for k, v in self.fields 181 ) 182 else: 183 return super().items() 184 185 186@dataclass 187class MessageData(serializable.Serializable): 188 http_version: bytes 189 headers: Headers 190 content: Optional[bytes] 191 trailers: Optional[Headers] 192 timestamp_start: float 193 timestamp_end: Optional[float] 194 195 # noinspection PyUnreachableCode 196 if __debug__: 197 def __post_init__(self): 198 for field in fields(self): 199 val = getattr(self, field.name) 200 typecheck.check_option_type(field.name, val, field.type) 201 202 def set_state(self, state): 203 for k, v in state.items(): 204 if k in ("headers", "trailers") and v is not None: 205 v = Headers.from_state(v) 206 setattr(self, k, v) 207 208 def get_state(self): 209 state = vars(self).copy() 210 state["headers"] = state["headers"].get_state() 211 if state["trailers"] is not None: 212 state["trailers"] = state["trailers"].get_state() 213 return state 214 215 @classmethod 216 def from_state(cls, state): 217 state["headers"] = Headers.from_state(state["headers"]) 218 if state["trailers"] is not None: 219 state["trailers"] = Headers.from_state(state["trailers"]) 220 return cls(**state) 221 222 223@dataclass 224class RequestData(MessageData): 225 host: str 226 port: int 227 method: bytes 228 scheme: bytes 229 authority: bytes 230 path: bytes 231 232 233@dataclass 234class ResponseData(MessageData): 235 status_code: int 236 reason: bytes 237 238 239class Message(serializable.Serializable): 240 """Base class for `Request` and `Response`.""" 241 242 @classmethod 243 def from_state(cls, state): 244 return cls(**state) 245 246 def get_state(self): 247 return self.data.get_state() 248 249 def set_state(self, state): 250 self.data.set_state(state) 251 252 data: MessageData 253 stream: Union[Callable[[bytes], Union[Iterable[bytes], bytes]], bool] = False 254 """ 255 This attribute controls if the message body should be streamed. 256 257 If `False`, mitmproxy will buffer the entire body before forwarding it to the destination. 258 This makes it possible to perform string replacements on the entire body. 259 If `True`, the message body will not be buffered on the proxy 260 but immediately forwarded instead. 261 Alternatively, a transformation function can be specified, which will be called for each chunk of data. 262 Please note that packet boundaries generally should not be relied upon. 263 264 This attribute must be set in the `requestheaders` or `responseheaders` hook. 265 Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already. 266 """ 267 268 @property 269 def http_version(self) -> str: 270 """ 271 HTTP version string, for example `HTTP/1.1`. 272 """ 273 return self.data.http_version.decode("utf-8", "surrogateescape") 274 275 @http_version.setter 276 def http_version(self, http_version: Union[str, bytes]) -> None: 277 self.data.http_version = strutils.always_bytes(http_version, "utf-8", "surrogateescape") 278 279 @property 280 def is_http10(self) -> bool: 281 return self.data.http_version == b"HTTP/1.0" 282 283 @property 284 def is_http11(self) -> bool: 285 return self.data.http_version == b"HTTP/1.1" 286 287 @property 288 def is_http2(self) -> bool: 289 return self.data.http_version == b"HTTP/2.0" 290 291 @property 292 def headers(self) -> Headers: 293 """ 294 The HTTP headers. 295 """ 296 return self.data.headers 297 298 @headers.setter 299 def headers(self, h: Headers) -> None: 300 self.data.headers = h 301 302 @property 303 def trailers(self) -> Optional[Headers]: 304 """ 305 The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer). 306 """ 307 return self.data.trailers 308 309 @trailers.setter 310 def trailers(self, h: Optional[Headers]) -> None: 311 self.data.trailers = h 312 313 @property 314 def raw_content(self) -> Optional[bytes]: 315 """ 316 The raw (potentially compressed) HTTP message body. 317 318 In contrast to `Message.content` and `Message.text`, accessing this property never raises. 319 320 *See also:* `Message.content`, `Message.text` 321 """ 322 return self.data.content 323 324 @raw_content.setter 325 def raw_content(self, content: Optional[bytes]) -> None: 326 self.data.content = content 327 328 @property 329 def content(self) -> Optional[bytes]: 330 """ 331 The uncompressed HTTP message body as bytes. 332 333 Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid. 334 335 *See also:* `Message.raw_content`, `Message.text` 336 """ 337 return self.get_content() 338 339 @content.setter 340 def content(self, value: Optional[bytes]) -> None: 341 self.set_content(value) 342 343 @property 344 def text(self) -> Optional[str]: 345 """ 346 The uncompressed and decoded HTTP message body as text. 347 348 Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid. 349 350 *See also:* `Message.raw_content`, `Message.content` 351 """ 352 return self.get_text() 353 354 @text.setter 355 def text(self, value: Optional[str]) -> None: 356 self.set_text(value) 357 358 def set_content(self, value: Optional[bytes]) -> None: 359 if value is None: 360 self.raw_content = None 361 return 362 if not isinstance(value, bytes): 363 raise TypeError( 364 f"Message content must be bytes, not {type(value).__name__}. " 365 "Please use .text if you want to assign a str." 366 ) 367 ce = self.headers.get("content-encoding") 368 try: 369 self.raw_content = encoding.encode(value, ce or "identity") 370 except ValueError: 371 # So we have an invalid content-encoding? 372 # Let's remove it! 373 del self.headers["content-encoding"] 374 self.raw_content = value 375 376 if "transfer-encoding" in self.headers: 377 # https://httpwg.org/specs/rfc7230.html#header.content-length 378 # don't set content-length if a transfer-encoding is provided 379 pass 380 else: 381 self.headers["content-length"] = str(len(self.raw_content)) 382 383 def get_content(self, strict: bool = True) -> Optional[bytes]: 384 """ 385 Similar to `Message.content`, but does not raise if `strict` is `False`. 386 Instead, the compressed message body is returned as-is. 387 """ 388 if self.raw_content is None: 389 return None 390 ce = self.headers.get("content-encoding") 391 if ce: 392 try: 393 content = encoding.decode(self.raw_content, ce) 394 # A client may illegally specify a byte -> str encoding here (e.g. utf8) 395 if isinstance(content, str): 396 raise ValueError(f"Invalid Content-Encoding: {ce}") 397 return content 398 except ValueError: 399 if strict: 400 raise 401 return self.raw_content 402 else: 403 return self.raw_content 404 405 def _get_content_type_charset(self) -> Optional[str]: 406 ct = parse_content_type(self.headers.get("content-type", "")) 407 if ct: 408 return ct[2].get("charset") 409 return None 410 411 def _guess_encoding(self, content: bytes = b"") -> str: 412 enc = self._get_content_type_charset() 413 if not enc: 414 if "json" in self.headers.get("content-type", ""): 415 enc = "utf8" 416 if not enc: 417 meta_charset = re.search(rb"""<meta[^>]+charset=['"]?([^'">]+)""", content) 418 if meta_charset: 419 enc = meta_charset.group(1).decode("ascii", "ignore") 420 if not enc: 421 if "text/css" in self.headers.get("content-type", ""): 422 # @charset rule must be the very first thing. 423 css_charset = re.match(rb"""@charset "([^"]+)";""", content) 424 if css_charset: 425 enc = css_charset.group(1).decode("ascii", "ignore") 426 if not enc: 427 enc = "latin-1" 428 # Use GB 18030 as the superset of GB2312 and GBK to fix common encoding problems on Chinese websites. 429 if enc.lower() in ("gb2312", "gbk"): 430 enc = "gb18030" 431 432 return enc 433 434 def set_text(self, text: Optional[str]) -> None: 435 if text is None: 436 self.content = None 437 return 438 enc = self._guess_encoding() 439 440 try: 441 self.content = cast(bytes, encoding.encode(text, enc)) 442 except ValueError: 443 # Fall back to UTF-8 and update the content-type header. 444 ct = parse_content_type(self.headers.get("content-type", "")) or ("text", "plain", {}) 445 ct[2]["charset"] = "utf-8" 446 self.headers["content-type"] = assemble_content_type(*ct) 447 enc = "utf8" 448 self.content = text.encode(enc, "surrogateescape") 449 450 def get_text(self, strict: bool = True) -> Optional[str]: 451 """ 452 Similar to `Message.text`, but does not raise if `strict` is `False`. 453 Instead, the message body is returned as surrogate-escaped UTF-8. 454 """ 455 content = self.get_content(strict) 456 if content is None: 457 return None 458 enc = self._guess_encoding(content) 459 try: 460 return cast(str, encoding.decode(content, enc)) 461 except ValueError: 462 if strict: 463 raise 464 return content.decode("utf8", "surrogateescape") 465 466 @property 467 def timestamp_start(self) -> float: 468 """ 469 *Timestamp:* Headers received. 470 """ 471 return self.data.timestamp_start 472 473 @timestamp_start.setter 474 def timestamp_start(self, timestamp_start: float) -> None: 475 self.data.timestamp_start = timestamp_start 476 477 @property 478 def timestamp_end(self) -> Optional[float]: 479 """ 480 *Timestamp:* Last byte received. 481 """ 482 return self.data.timestamp_end 483 484 @timestamp_end.setter 485 def timestamp_end(self, timestamp_end: Optional[float]): 486 self.data.timestamp_end = timestamp_end 487 488 def decode(self, strict: bool = True) -> None: 489 """ 490 Decodes body based on the current Content-Encoding header, then 491 removes the header. If there is no Content-Encoding header, no 492 action is taken. 493 494 *Raises:* 495 - `ValueError`, when the content-encoding is invalid and strict is True. 496 """ 497 decoded = self.get_content(strict) 498 self.headers.pop("content-encoding", None) 499 self.content = decoded 500 501 def encode(self, encoding: str) -> None: 502 """ 503 Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd". 504 Any existing content-encodings are overwritten, the content is not decoded beforehand. 505 506 *Raises:* 507 - `ValueError`, when the specified content-encoding is invalid. 508 """ 509 self.headers["content-encoding"] = encoding 510 self.content = self.raw_content 511 if "content-encoding" not in self.headers: 512 raise ValueError("Invalid content encoding {}".format(repr(encoding))) 513 514 def json(self, **kwargs: Any) -> Any: 515 """ 516 Returns the JSON encoded content of the response, if any. 517 `**kwargs` are optional arguments that will be 518 passed to `json.loads()`. 519 520 Will raise if the content can not be decoded and then parsed as JSON. 521 522 *Raises:* 523 - `json.decoder.JSONDecodeError` if content is not valid JSON. 524 - `TypeError` if the content is not available, for example because the response 525 has been streamed. 526 """ 527 content = self.get_content(strict=False) 528 if content is None: 529 raise TypeError('Message content is not available.') 530 else: 531 return json.loads(content, **kwargs) 532 533 534class Request(Message): 535 """ 536 An HTTP request. 537 """ 538 data: RequestData 539 540 def __init__( 541 self, 542 host: str, 543 port: int, 544 method: bytes, 545 scheme: bytes, 546 authority: bytes, 547 path: bytes, 548 http_version: bytes, 549 headers: Union[Headers, Tuple[Tuple[bytes, bytes], ...]], 550 content: Optional[bytes], 551 trailers: Union[Headers, Tuple[Tuple[bytes, bytes], ...], None], 552 timestamp_start: float, 553 timestamp_end: Optional[float], 554 ): 555 # auto-convert invalid types to retain compatibility with older code. 556 if isinstance(host, bytes): 557 host = host.decode("idna", "strict") 558 if isinstance(method, str): 559 method = method.encode("ascii", "strict") 560 if isinstance(scheme, str): 561 scheme = scheme.encode("ascii", "strict") 562 if isinstance(authority, str): 563 authority = authority.encode("ascii", "strict") 564 if isinstance(path, str): 565 path = path.encode("ascii", "strict") 566 if isinstance(http_version, str): 567 http_version = http_version.encode("ascii", "strict") 568 569 if isinstance(content, str): 570 raise ValueError(f"Content must be bytes, not {type(content).__name__}") 571 if not isinstance(headers, Headers): 572 headers = Headers(headers) 573 if trailers is not None and not isinstance(trailers, Headers): 574 trailers = Headers(trailers) 575 576 self.data = RequestData( 577 host=host, 578 port=port, 579 method=method, 580 scheme=scheme, 581 authority=authority, 582 path=path, 583 http_version=http_version, 584 headers=headers, 585 content=content, 586 trailers=trailers, 587 timestamp_start=timestamp_start, 588 timestamp_end=timestamp_end, 589 ) 590 591 def __repr__(self) -> str: 592 if self.host and self.port: 593 hostport = f"{self.host}:{self.port}" 594 else: 595 hostport = "" 596 path = self.path or "" 597 return f"Request({self.method} {hostport}{path})" 598 599 @classmethod 600 def make( 601 cls, 602 method: str, 603 url: str, 604 content: Union[bytes, str] = "", 605 headers: Union[Headers, Dict[Union[str, bytes], Union[str, bytes]], Iterable[Tuple[bytes, bytes]]] = () 606 ) -> "Request": 607 """ 608 Simplified API for creating request objects. 609 """ 610 # Headers can be list or dict, we differentiate here. 611 if isinstance(headers, Headers): 612 pass 613 elif isinstance(headers, dict): 614 headers = Headers( 615 (always_bytes(k, "utf-8", "surrogateescape"), 616 always_bytes(v, "utf-8", "surrogateescape")) 617 for k, v in headers.items() 618 ) 619 elif isinstance(headers, Iterable): 620 headers = Headers(headers) # type: ignore 621 else: 622 raise TypeError("Expected headers to be an iterable or dict, but is {}.".format( 623 type(headers).__name__ 624 )) 625 626 req = cls( 627 "", 628 0, 629 method.encode("utf-8", "surrogateescape"), 630 b"", 631 b"", 632 b"", 633 b"HTTP/1.1", 634 headers, 635 b"", 636 None, 637 time.time(), 638 time.time(), 639 ) 640 641 req.url = url 642 # Assign this manually to update the content-length header. 643 if isinstance(content, bytes): 644 req.content = content 645 elif isinstance(content, str): 646 req.text = content 647 else: 648 raise TypeError(f"Expected content to be str or bytes, but is {type(content).__name__}.") 649 650 return req 651 652 @property 653 def first_line_format(self) -> str: 654 """ 655 *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3). 656 657 origin-form and asterisk-form are subsumed as "relative". 658 """ 659 if self.method == "CONNECT": 660 return "authority" 661 elif self.authority: 662 return "absolute" 663 else: 664 return "relative" 665 666 @property 667 def method(self) -> str: 668 """ 669 HTTP request method, e.g. "GET". 670 """ 671 return self.data.method.decode("utf-8", "surrogateescape").upper() 672 673 @method.setter 674 def method(self, val: Union[str, bytes]) -> None: 675 self.data.method = always_bytes(val, "utf-8", "surrogateescape") 676 677 @property 678 def scheme(self) -> str: 679 """ 680 HTTP request scheme, which should be "http" or "https". 681 """ 682 return self.data.scheme.decode("utf-8", "surrogateescape") 683 684 @scheme.setter 685 def scheme(self, val: Union[str, bytes]) -> None: 686 self.data.scheme = always_bytes(val, "utf-8", "surrogateescape") 687 688 @property 689 def authority(self) -> str: 690 """ 691 HTTP request authority. 692 693 For HTTP/1, this is the authority portion of the request target 694 (in either absolute-form or authority-form). 695 For origin-form and asterisk-form requests, this property is set to an empty string. 696 697 For HTTP/2, this is the :authority pseudo header. 698 699 *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host` 700 """ 701 try: 702 return self.data.authority.decode("idna") 703 except UnicodeError: 704 return self.data.authority.decode("utf8", "surrogateescape") 705 706 @authority.setter 707 def authority(self, val: Union[str, bytes]) -> None: 708 if isinstance(val, str): 709 try: 710 val = val.encode("idna", "strict") 711 except UnicodeError: 712 val = val.encode("utf8", "surrogateescape") # type: ignore 713 self.data.authority = val 714 715 @property 716 def host(self) -> str: 717 """ 718 Target server for this request. This may be parsed from the raw request 719 (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) 720 or inferred from the proxy mode (e.g. an IP in transparent mode). 721 722 Setting the host attribute also updates the host header and authority information, if present. 723 724 *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host` 725 """ 726 return self.data.host 727 728 @host.setter 729 def host(self, val: Union[str, bytes]) -> None: 730 self.data.host = always_str(val, "idna", "strict") 731 732 # Update host header 733 if "Host" in self.data.headers: 734 self.data.headers["Host"] = val 735 # Update authority 736 if self.data.authority: 737 self.authority = url.hostport(self.scheme, self.host, self.port) 738 739 @property 740 def host_header(self) -> Optional[str]: 741 """ 742 The request's host/authority header. 743 744 This property maps to either ``request.headers["Host"]`` or 745 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 746 747 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 748 """ 749 if self.is_http2: 750 return self.authority or self.data.headers.get("Host", None) 751 else: 752 return self.data.headers.get("Host", None) 753 754 @host_header.setter 755 def host_header(self, val: Union[None, str, bytes]) -> None: 756 if val is None: 757 if self.is_http2: 758 self.data.authority = b"" 759 self.headers.pop("Host", None) 760 else: 761 if self.is_http2: 762 self.authority = val # type: ignore 763 if not self.is_http2 or "Host" in self.headers: 764 # For h2, we only overwrite, but not create, as :authority is the h2 host header. 765 self.headers["Host"] = val 766 767 @property 768 def port(self) -> int: 769 """ 770 Target port. 771 """ 772 return self.data.port 773 774 @port.setter 775 def port(self, port: int) -> None: 776 self.data.port = port 777 778 @property 779 def path(self) -> str: 780 """ 781 HTTP request path, e.g. "/index.html". 782 Usually starts with a slash, except for OPTIONS requests, which may just be "*". 783 """ 784 return self.data.path.decode("utf-8", "surrogateescape") 785 786 @path.setter 787 def path(self, val: Union[str, bytes]) -> None: 788 self.data.path = always_bytes(val, "utf-8", "surrogateescape") 789 790 @property 791 def url(self) -> str: 792 """ 793 The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`. 794 795 Settings this property updates these attributes as well. 796 """ 797 if self.first_line_format == "authority": 798 return f"{self.host}:{self.port}" 799 return url.unparse(self.scheme, self.host, self.port, self.path) 800 801 @url.setter 802 def url(self, val: Union[str, bytes]) -> None: 803 val = always_str(val, "utf-8", "surrogateescape") 804 self.scheme, self.host, self.port, self.path = url.parse(val) 805 806 @property 807 def pretty_host(self) -> str: 808 """ 809 *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source. 810 This is useful in transparent mode where `Request.host` is only an IP address. 811 812 *Warning:* When working in adversarial environments, this may not reflect the actual destination 813 as the Host header could be spoofed. 814 """ 815 authority = self.host_header 816 if authority: 817 return url.parse_authority(authority, check=False)[0] 818 else: 819 return self.host 820 821 @property 822 def pretty_url(self) -> str: 823 """ 824 *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`. 825 """ 826 if self.first_line_format == "authority": 827 return self.authority 828 829 host_header = self.host_header 830 if not host_header: 831 return self.url 832 833 pretty_host, pretty_port = url.parse_authority(host_header, check=False) 834 pretty_port = pretty_port or url.default_port(self.scheme) or 443 835 836 return url.unparse(self.scheme, pretty_host, pretty_port, self.path) 837 838 def _get_query(self): 839 query = urllib.parse.urlparse(self.url).query 840 return tuple(url.decode(query)) 841 842 def _set_query(self, query_data): 843 query = url.encode(query_data) 844 _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) 845 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 846 847 @property 848 def query(self) -> multidict.MultiDictView[str, str]: 849 """ 850 The request query as a mutable mapping view on the request's path. 851 For the most part, this behaves like a dictionary. 852 Modifications to the MultiDictView update `Request.path`, and vice versa. 853 """ 854 return multidict.MultiDictView( 855 self._get_query, 856 self._set_query 857 ) 858 859 @query.setter 860 def query(self, value): 861 self._set_query(value) 862 863 def _get_cookies(self): 864 h = self.headers.get_all("Cookie") 865 return tuple(cookies.parse_cookie_headers(h)) 866 867 def _set_cookies(self, value): 868 self.headers["cookie"] = cookies.format_cookie_header(value) 869 870 @property 871 def cookies(self) -> multidict.MultiDictView[str, str]: 872 """ 873 The request cookies. 874 For the most part, this behaves like a dictionary. 875 Modifications to the MultiDictView update `Request.headers`, and vice versa. 876 """ 877 return multidict.MultiDictView( 878 self._get_cookies, 879 self._set_cookies 880 ) 881 882 @cookies.setter 883 def cookies(self, value): 884 self._set_cookies(value) 885 886 @property 887 def path_components(self) -> Tuple[str, ...]: 888 """ 889 The URL's path components as a tuple of strings. 890 Components are unquoted. 891 """ 892 path = urllib.parse.urlparse(self.url).path 893 # This needs to be a tuple so that it's immutable. 894 # Otherwise, this would fail silently: 895 # request.path_components.append("foo") 896 return tuple(url.unquote(i) for i in path.split("/") if i) 897 898 @path_components.setter 899 def path_components(self, components: Iterable[str]): 900 components = map(lambda x: url.quote(x, safe=""), components) 901 path = "/" + "/".join(components) 902 _, _, _, params, query, fragment = urllib.parse.urlparse(self.url) 903 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 904 905 def anticache(self) -> None: 906 """ 907 Modifies this request to remove headers that might produce a cached response. 908 """ 909 delheaders = ( 910 "if-modified-since", 911 "if-none-match", 912 ) 913 for i in delheaders: 914 self.headers.pop(i, None) 915 916 def anticomp(self) -> None: 917 """ 918 Modify the Accept-Encoding header to only accept uncompressed responses. 919 """ 920 self.headers["accept-encoding"] = "identity" 921 922 def constrain_encoding(self) -> None: 923 """ 924 Limits the permissible Accept-Encoding values, based on what we can decode appropriately. 925 """ 926 accept_encoding = self.headers.get("accept-encoding") 927 if accept_encoding: 928 self.headers["accept-encoding"] = ( 929 ', '.join( 930 e 931 for e in {"gzip", "identity", "deflate", "br", "zstd"} 932 if e in accept_encoding 933 ) 934 ) 935 936 def _get_urlencoded_form(self): 937 is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower() 938 if is_valid_content_type: 939 return tuple(url.decode(self.get_text(strict=False))) 940 return () 941 942 def _set_urlencoded_form(self, form_data): 943 """ 944 Sets the body to the URL-encoded form data, and adds the appropriate content-type header. 945 This will overwrite the existing content if there is one. 946 """ 947 self.headers["content-type"] = "application/x-www-form-urlencoded" 948 self.content = url.encode(form_data, self.get_text(strict=False)).encode() 949 950 @property 951 def urlencoded_form(self) -> multidict.MultiDictView[str, str]: 952 """ 953 The URL-encoded form data. 954 955 If the content-type indicates non-form data or the form could not be parsed, this is set to 956 an empty `MultiDictView`. 957 958 Modifications to the MultiDictView update `Request.content`, and vice versa. 959 """ 960 return multidict.MultiDictView( 961 self._get_urlencoded_form, 962 self._set_urlencoded_form 963 ) 964 965 @urlencoded_form.setter 966 def urlencoded_form(self, value): 967 self._set_urlencoded_form(value) 968 969 def _get_multipart_form(self): 970 is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower() 971 if is_valid_content_type: 972 try: 973 return multipart.decode(self.headers.get("content-type"), self.content) 974 except ValueError: 975 pass 976 return () 977 978 def _set_multipart_form(self, value): 979 is_valid_content_type = self.headers.get("content-type", "").lower().startswith("multipart/form-data") 980 if not is_valid_content_type: 981 """ 982 Generate a random boundary here. 983 984 See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications 985 on generating the boundary. 986 """ 987 boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode() 988 self.headers["content-type"] = f"multipart/form-data; boundary={boundary}" 989 self.content = multipart.encode(self.headers, value) 990 991 @property 992 def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]: 993 """ 994 The multipart form data. 995 996 If the content-type indicates non-form data or the form could not be parsed, this is set to 997 an empty `MultiDictView`. 998 999 Modifications to the MultiDictView update `Request.content`, and vice versa. 1000 """ 1001 return multidict.MultiDictView( 1002 self._get_multipart_form, 1003 self._set_multipart_form 1004 ) 1005 1006 @multipart_form.setter 1007 def multipart_form(self, value): 1008 self._set_multipart_form(value) 1009 1010 1011class Response(Message): 1012 """ 1013 An HTTP response. 1014 """ 1015 data: ResponseData 1016 1017 def __init__( 1018 self, 1019 http_version: bytes, 1020 status_code: int, 1021 reason: bytes, 1022 headers: Union[Headers, Tuple[Tuple[bytes, bytes], ...]], 1023 content: Optional[bytes], 1024 trailers: Union[None, Headers, Tuple[Tuple[bytes, bytes], ...]], 1025 timestamp_start: float, 1026 timestamp_end: Optional[float], 1027 ): 1028 # auto-convert invalid types to retain compatibility with older code. 1029 if isinstance(http_version, str): 1030 http_version = http_version.encode("ascii", "strict") 1031 if isinstance(reason, str): 1032 reason = reason.encode("ascii", "strict") 1033 1034 if isinstance(content, str): 1035 raise ValueError("Content must be bytes, not {}".format(type(content).__name__)) 1036 if not isinstance(headers, Headers): 1037 headers = Headers(headers) 1038 if trailers is not None and not isinstance(trailers, Headers): 1039 trailers = Headers(trailers) 1040 1041 self.data = ResponseData( 1042 http_version=http_version, 1043 status_code=status_code, 1044 reason=reason, 1045 headers=headers, 1046 content=content, 1047 trailers=trailers, 1048 timestamp_start=timestamp_start, 1049 timestamp_end=timestamp_end, 1050 ) 1051 1052 def __repr__(self) -> str: 1053 if self.raw_content: 1054 ct = self.headers.get("content-type", "unknown content type") 1055 size = human.pretty_size(len(self.raw_content)) 1056 details = f"{ct}, {size}" 1057 else: 1058 details = "no content" 1059 return f"Response({self.status_code}, {details})" 1060 1061 @classmethod 1062 def make( 1063 cls, 1064 status_code: int = 200, 1065 content: Union[bytes, str] = b"", 1066 headers: Union[Headers, Mapping[str, Union[str, bytes]], Iterable[Tuple[bytes, bytes]]] = () 1067 ) -> "Response": 1068 """ 1069 Simplified API for creating response objects. 1070 """ 1071 if isinstance(headers, Headers): 1072 headers = headers 1073 elif isinstance(headers, dict): 1074 headers = Headers( 1075 (always_bytes(k, "utf-8", "surrogateescape"), # type: ignore 1076 always_bytes(v, "utf-8", "surrogateescape")) 1077 for k, v in headers.items() 1078 ) 1079 elif isinstance(headers, Iterable): 1080 headers = Headers(headers) # type: ignore 1081 else: 1082 raise TypeError("Expected headers to be an iterable or dict, but is {}.".format( 1083 type(headers).__name__ 1084 )) 1085 1086 resp = cls( 1087 b"HTTP/1.1", 1088 status_code, 1089 status_codes.RESPONSES.get(status_code, "").encode(), 1090 headers, 1091 None, 1092 None, 1093 time.time(), 1094 time.time(), 1095 ) 1096 1097 # Assign this manually to update the content-length header. 1098 if isinstance(content, bytes): 1099 resp.content = content 1100 elif isinstance(content, str): 1101 resp.text = content 1102 else: 1103 raise TypeError(f"Expected content to be str or bytes, but is {type(content).__name__}.") 1104 1105 return resp 1106 1107 @property 1108 def status_code(self) -> int: 1109 """ 1110 HTTP Status Code, e.g. ``200``. 1111 """ 1112 return self.data.status_code 1113 1114 @status_code.setter 1115 def status_code(self, status_code: int) -> None: 1116 self.data.status_code = status_code 1117 1118 @property 1119 def reason(self) -> str: 1120 """ 1121 HTTP reason phrase, for example "Not Found". 1122 1123 HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead. 1124 """ 1125 # Encoding: http://stackoverflow.com/a/16674906/934719 1126 return self.data.reason.decode("ISO-8859-1") 1127 1128 @reason.setter 1129 def reason(self, reason: Union[str, bytes]) -> None: 1130 self.data.reason = strutils.always_bytes(reason, "ISO-8859-1") 1131 1132 def _get_cookies(self): 1133 h = self.headers.get_all("set-cookie") 1134 all_cookies = cookies.parse_set_cookie_headers(h) 1135 return tuple( 1136 (name, (value, attrs)) 1137 for name, value, attrs in all_cookies 1138 ) 1139 1140 def _set_cookies(self, value): 1141 cookie_headers = [] 1142 for k, v in value: 1143 header = cookies.format_set_cookie_header([(k, v[0], v[1])]) 1144 cookie_headers.append(header) 1145 self.headers.set_all("set-cookie", cookie_headers) 1146 1147 @property 1148 def cookies(self) -> multidict.MultiDictView[str, Tuple[str, multidict.MultiDict[str, Optional[str]]]]: 1149 """ 1150 The response cookies. A possibly empty `MultiDictView`, where the keys are cookie 1151 name strings, and values are `(cookie value, attributes)` tuples. Within 1152 attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value. 1153 Modifications to the MultiDictView update `Response.headers`, and vice versa. 1154 1155 *Warning:* Changes to `attributes` will not be picked up unless you also reassign 1156 the `(cookie value, attributes)` tuple directly in the `MultiDictView`. 1157 """ 1158 return multidict.MultiDictView( 1159 self._get_cookies, 1160 self._set_cookies 1161 ) 1162 1163 @cookies.setter 1164 def cookies(self, value): 1165 self._set_cookies(value) 1166 1167 def refresh(self, now=None): 1168 """ 1169 This fairly complex and heuristic function refreshes a server 1170 response for replay. 1171 1172 - It adjusts date, expires, and last-modified headers. 1173 - It adjusts cookie expiration. 1174 """ 1175 if not now: 1176 now = time.time() 1177 delta = now - self.timestamp_start 1178 refresh_headers = [ 1179 "date", 1180 "expires", 1181 "last-modified", 1182 ] 1183 for i in refresh_headers: 1184 if i in self.headers: 1185 d = parsedate_tz(self.headers[i]) 1186 if d: 1187 new = mktime_tz(d) + delta 1188 self.headers[i] = formatdate(new, usegmt=True) 1189 c = [] 1190 for set_cookie_header in self.headers.get_all("set-cookie"): 1191 try: 1192 refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta) 1193 except ValueError: 1194 refreshed = set_cookie_header 1195 c.append(refreshed) 1196 if c: 1197 self.headers.set_all("set-cookie", c) 1198 1199 1200class HTTPFlow(flow.Flow): 1201 """ 1202 An HTTPFlow is a collection of objects representing a single HTTP 1203 transaction. 1204 """ 1205 request: Request 1206 """The client's HTTP request.""" 1207 response: Optional[Response] = None 1208 """The server's HTTP response.""" 1209 error: Optional[flow.Error] = None 1210 """ 1211 A connection or protocol error affecting this flow. 1212 1213 Note that it's possible for a Flow to have both a response and an error 1214 object. This might happen, for instance, when a response was received 1215 from the server, but there was an error sending it back to the client. 1216 """ 1217 1218 websocket: Optional[WebSocketData] = None 1219 """ 1220 If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data. 1221 """ 1222 1223 def __init__(self, client_conn, server_conn, live=None, mode="regular"): 1224 super().__init__("http", client_conn, server_conn, live) 1225 self.mode = mode 1226 1227 _stateobject_attributes = flow.Flow._stateobject_attributes.copy() 1228 # mypy doesn't support update with kwargs 1229 _stateobject_attributes.update(dict( 1230 request=Request, 1231 response=Response, 1232 websocket=WebSocketData, 1233 mode=str 1234 )) 1235 1236 def __repr__(self): 1237 s = "<HTTPFlow" 1238 for a in ("request", "response", "websocket", "error", "client_conn", "server_conn"): 1239 if getattr(self, a, False): 1240 s += f"\r\n {a} = {{flow.{a}}}" 1241 s += ">" 1242 return s.format(flow=self) 1243 1244 @property 1245 def timestamp_start(self) -> float: 1246 """*Read-only:* An alias for `Request.timestamp_start`.""" 1247 return self.request.timestamp_start 1248 1249 def copy(self): 1250 f = super().copy() 1251 if self.request: 1252 f.request = self.request.copy() 1253 if self.response: 1254 f.response = self.response.copy() 1255 return f 1256 1257 1258__all__ = [ 1259 "HTTPFlow", 1260 "Message", 1261 "Request", 1262 "Response", 1263 "Headers", 1264] 1265