# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import cgi
import codecs
import logging
import random
import sys
import typing
import urllib.parse
from http import HTTPStatus
from io import BytesIO, StringIO
from typing import (
    TYPE_CHECKING,
    Callable,
    Dict,
    Generic,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
    overload,
)

import attr
import treq
from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
from typing_extensions import Literal

from twisted.internet import defer
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import IReactorTime
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer, IResponse

import synapse.metrics
import synapse.util.retryutils
from synapse.api.errors import (
    Codes,
    FederationDeniedError,
    HttpResponseException,
    RequestSendFailed,
    SynapseError,
)
from synapse.http import QuieterFileBodyProducer
from synapse.http.client import (
    BlacklistingAgentWrapper,
    BodyExceededMaxSize,
    ByteWriteable,
    encode_query_args,
    read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.types import JsonDict
from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

outgoing_requests_counter = Counter(
    "synapse_http_matrixfederationclient_requests", "", ["method"]
)
incoming_responses_counter = Counter(
    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)

# a federation response can be rather large (eg a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE = 100 * 1024 * 1024

MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
MAXINT = sys.maxsize


# Counter used to build per-request transaction IDs for logging; wraps before
# reaching MAXINT.
_next_id = 1


QueryArgs = Dict[str, Union[str, List[str]]]


T = TypeVar("T")


class ByteParser(ByteWriteable, Generic[T], abc.ABC):
    """A `ByteWriteable` that has an additional `finish` function that returns
    the parsed data.
    """

    CONTENT_TYPE: str = abc.abstractproperty()  # type: ignore
    """The expected content type of the response, e.g. `application/json`. If
    the content type doesn't match we fail the request.
    """

    @abc.abstractmethod
    def finish(self) -> T:
        """Called when response has finished streaming and the parser should
        return the final result (or error).
        """
        pass


@attr.s(slots=True, frozen=True)
class MatrixFederationRequest:
    """An immutable description of a single outgoing federation request.

    `txn_id` and `uri` are computed in `__attrs_post_init__`.
    """

    method = attr.ib(type=str)
    """HTTP method
    """

    path = attr.ib(type=str)
    """HTTP path
    """

    destination = attr.ib(type=str)
    """The remote server to send the HTTP request to.
    """

    json = attr.ib(default=None, type=Optional[JsonDict])
    """JSON to send in the body.
    """

    json_callback = attr.ib(default=None, type=Optional[Callable[[], JsonDict]])
    """A callback to generate the JSON.
    """

    query = attr.ib(default=None, type=Optional[dict])
    """Query arguments.
    """

    # NB: always overwritten by __attrs_post_init__, so a value passed in here
    # is ignored.
    txn_id = attr.ib(default=None, type=Optional[str])
    """Unique ID for this request (for logging)
    """

    uri = attr.ib(init=False, type=bytes)
    """The URI of this request
    """

    def __attrs_post_init__(self) -> None:
        """Assign a fresh transaction ID and pre-compute the `matrix://` URI."""
        global _next_id
        txn_id = "%s-O-%s" % (self.method, _next_id)
        _next_id = (_next_id + 1) % (MAXINT - 1)

        # The class is frozen, so attributes must be set via object.__setattr__.
        object.__setattr__(self, "txn_id", txn_id)

        destination_bytes = self.destination.encode("ascii")
        path_bytes = self.path.encode("ascii")
        if self.query:
            query_bytes = encode_query_args(self.query)
        else:
            query_bytes = b""

        # The object is frozen so we can pre-compute this.
        uri = urllib.parse.urlunparse(
            (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
        )
        object.__setattr__(self, "uri", uri)

    def get_json(self) -> Optional[JsonDict]:
        """Return the request body: the callback's result if set, else `json`."""
        if self.json_callback:
            return self.json_callback()
        return self.json


class JsonParser(ByteParser[Union[JsonDict, list]]):
    """A parser that buffers the response and tries to parse it as JSON."""

    CONTENT_TYPE = "application/json"

    def __init__(self):
        self._buffer = StringIO()
        # Decodes incoming UTF-8 bytes incrementally into the text buffer.
        self._binary_wrapper = BinaryIOWrapper(self._buffer)

    def write(self, data: bytes) -> int:
        return self._binary_wrapper.write(data)

    def finish(self) -> Union[JsonDict, list]:
        """Decode the buffered body as JSON.

        Raises:
            ValueError: if the body is not valid UTF-8 (including a truncated
                trailing multi-byte sequence) or not valid JSON.
        """
        # Flush the incremental decoder so that a body ending part-way through
        # a multi-byte character raises (UnicodeDecodeError is a ValueError)
        # instead of the dangling bytes being silently discarded.
        self._binary_wrapper.flush_decoder()
        return json_decoder.decode(self._buffer.getvalue())


async def _handle_response(
    reactor: IReactorTime,
    timeout_sec: float,
    request: MatrixFederationRequest,
    response: IResponse,
    start_ms: int,
    parser: ByteParser[T],
    max_response_size: Optional[int] = None,
) -> T:
    """
    Reads the body of a response with a timeout and sends it to a parser

    Args:
        reactor: twisted reactor, for the timeout
        timeout_sec: number of seconds to wait for response to complete
        request: the request that triggered the response
        response: response to the request
        start_ms: Timestamp when request was made
        parser: The parser for the response
        max_response_size: The maximum size to read from the response, if None
            uses the default.

    Returns:
        The parsed response

    Raises:
        RequestSendFailed: if the body is too large or cannot be parsed
            (can_retry=False), or if reading it times out or the connection
            fails (can_retry=True).
        Exception: any other error is logged and re-raised unchanged.
    """

    if max_response_size is None:
        max_response_size = MAX_RESPONSE_SIZE

    try:
        check_content_type_is(response.headers, parser.CONTENT_TYPE)

        d = read_body_with_max_size(response, parser, max_response_size)
        d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)

        length = await make_deferred_yieldable(d)

        value = parser.finish()
    except BodyExceededMaxSize as e:
        # The response was too big.
        logger.warning(
            "{%s} [%s] JSON response exceeded max size %i - %s %s",
            request.txn_id,
            request.destination,
            # Log the limit actually applied, which may be caller-supplied.
            max_response_size,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except ValueError as e:
        # The content was invalid.
        logger.warning(
            "{%s} [%s] Failed to parse response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except defer.TimeoutError as e:
        logger.warning(
            "{%s} [%s] Timed out reading response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except ResponseFailed as e:
        logger.warning(
            "{%s} [%s] Failed to read response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except Exception as e:
        logger.warning(
            "{%s} [%s] Error reading response %s %s: %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
            e,
        )
        raise

    time_taken_secs = reactor.seconds() - start_ms / 1000

    logger.info(
        "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s",
        request.txn_id,
        request.destination,
        response.code,
        response.phrase.decode("ascii", errors="replace"),
        time_taken_secs,
        length,
        request.method,
        request.uri.decode("ascii"),
    )
    return value


class BinaryIOWrapper:
    """A wrapper for a TextIO which converts from bytes on the fly."""

    def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
        self.decoder = codecs.getincrementaldecoder(encoding)(errors)
        self.file = file

    def write(self, b: Union[bytes, bytearray]) -> int:
        """Decode `b` and write the resulting text; returns the bytes consumed."""
        self.file.write(self.decoder.decode(b))
        return len(b)

    def flush_decoder(self) -> None:
        """Signal end of input to the decoder and write out any final text.

        Raises:
            UnicodeDecodeError: (with the default "strict" errors) if the input
                ended part-way through a multi-byte character.
        """
        self.file.write(self.decoder.decode(b"", final=True))


class MatrixFederationHttpClient:
    """HTTP client used to talk to other homeservers over the federation
    protocol. Signs requests with this server's signing key.

    Attributes:
        agent (twisted.web.client.Agent): The agent used to send the requests
            (a `BlacklistingAgentWrapper` around a `MatrixFederationAgent`).
    """

    def __init__(self, hs: "HomeServer", tls_client_options_factory):
        self.hs = hs
        self.signing_key = hs.signing_key
        self.server_name = hs.hostname

        self.reactor = hs.get_reactor()

        user_agent = hs.version_string
        if hs.config.server.user_agent_suffix:
            user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
        user_agent = user_agent.encode("ascii")

        federation_agent = MatrixFederationAgent(
            self.reactor,
            tls_client_options_factory,
            user_agent,
            hs.config.server.federation_ip_range_whitelist,
            hs.config.server.federation_ip_range_blacklist,
        )

        # Use a BlacklistingAgentWrapper to prevent circumventing the IP
        # blacklist via IP literals in server names
        self.agent = BlacklistingAgentWrapper(
            federation_agent,
            ip_blacklist=hs.config.server.federation_ip_range_blacklist,
        )

        self.clock = hs.get_clock()
        self._store = hs.get_datastore()
        # NOTE(review): `_send_request` sets the User-Agent header from this
        # value, which does not include `user_agent_suffix` (unlike the
        # `user_agent` given to the agent above). Confirm whether the suffix is
        # meant to apply to federation requests too.
        self.version_string_bytes = hs.version_string.encode("ascii")
        # Default per-attempt timeout, in seconds.
        self.default_timeout = 60

        def schedule(x):
            self.reactor.callLater(_EPSILON, x)

        # Cooperator used to drive request body producers from the reactor.
        self._cooperator = Cooperator(scheduler=schedule)

    async def _send_request_with_optional_trailing_slash(
        self,
        request: MatrixFederationRequest,
        try_trailing_slash_on_400: bool = False,
        **send_request_args,
    ) -> IResponse:
        """Wrapper for _send_request which can optionally retry the request
        upon receiving a combination of a 400 HTTP response code and a
        'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
        due to #3622.

        Args:
            request: details of request to be sent
            try_trailing_slash_on_400: Whether on receiving a 400
                'M_UNRECOGNIZED' from the server to retry the request with a
                trailing slash appended to the request path.
            send_request_args: A dictionary of arguments to pass to `_send_request()`.

        Raises:
            HttpResponseException: If we get an HTTP response code >= 300
                (except 429).

        Returns:
            The HTTP response object; its body has not yet been read.
        """
        try:
            response = await self._send_request(request, **send_request_args)
        except HttpResponseException as e:
            # Received an HTTP error > 300. Check if it meets the requirements
            # to retry with a trailing slash
            if not try_trailing_slash_on_400:
                raise

            if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
                raise

            # Retry with a trailing slash if we received a 400 with
            # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
            # trailing slash on Synapse <= v0.99.3.
            logger.info("Retrying request with trailing slash")

            # Request is frozen so we create a new instance
            request = attr.evolve(request, path=request.path + "/")

            response = await self._send_request(request, **send_request_args)

        return response

    async def _send_request(
        self,
        request: MatrixFederationRequest,
        retry_on_dns_fail: bool = True,
        timeout: Optional[int] = None,
        long_retries: bool = False,
        ignore_backoff: bool = False,
        backoff_on_404: bool = False,
    ) -> IResponse:
        """
        Sends a request to the given server.

        Args:
            request: details of request to be sent

            retry_on_dns_fail: true if the request should be retried on DNS failures

            timeout: number of milliseconds to wait for the response headers
                (including connecting to the server), *for each attempt*.
                60s by default. If a (non-zero) timeout is given, failed
                attempts are not retried.

            long_retries: whether to use the long retry algorithm.

                The regular retry algorithm makes 4 attempts, with intervals
                [0.5s, 1s, 2s].

                The long retry algorithm makes 11 attempts, with intervals
                [4s, 16s, 60s, 60s, ...]

                Both algorithms add -20%/+40% jitter to the retry intervals.

                Note that the above intervals are *in addition* to the time spent
                waiting for the request to complete (up to `timeout` ms).

                NB: the long retry algorithm takes over 20 minutes to complete, with
                a default timeout of 60s!

            ignore_backoff: true to ignore the historical backoff data
                and try the request anyway.

            backoff_on_404: Back off if we get a 404

        Returns:
            Resolves with the HTTP response object on success.

        Raises:
            HttpResponseException: If we get an HTTP response code >= 300
                (except 429).
            NotRetryingDestination: If we are not yet ready to retry this
                server.
            FederationDeniedError: If this destination is not on our
                federation whitelist
            RequestSendFailed: If there were problems connecting to the
                remote, due to e.g. DNS failures, connection timeouts etc.
        """
        if timeout:
            _sec_timeout = timeout / 1000
        else:
            _sec_timeout = self.default_timeout

        if (
            self.hs.config.federation.federation_domain_whitelist is not None
            and request.destination
            not in self.hs.config.federation.federation_domain_whitelist
        ):
            raise FederationDeniedError(request.destination)

        limiter = await synapse.util.retryutils.get_retry_limiter(
            request.destination,
            self.clock,
            self._store,
            backoff_on_404=backoff_on_404,
            ignore_backoff=ignore_backoff,
        )

        method_bytes = request.method.encode("ascii")
        destination_bytes = request.destination.encode("ascii")
        path_bytes = request.path.encode("ascii")
        if request.query:
            query_bytes = encode_query_args(request.query)
        else:
            query_bytes = b""

        scope = start_active_span(
            "outgoing-federation-request",
            tags={
                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
                tags.PEER_ADDRESS: request.destination,
                tags.HTTP_METHOD: request.method,
                tags.HTTP_URL: request.path,
            },
            finish_on_close=True,
        )

        # Inject the span into the headers
        headers_dict: Dict[bytes, List[bytes]] = {}
        opentracing.inject_header_dict(headers_dict, request.destination)

        headers_dict[b"User-Agent"] = [self.version_string_bytes]

        with limiter, scope:
            # XXX: Would be much nicer to retry only at the transaction-layer
            # (once we have reliable transactions in place)
            if long_retries:
                retries_left = MAX_LONG_RETRIES
            else:
                retries_left = MAX_SHORT_RETRIES

            url_bytes = request.uri
            url_str = url_bytes.decode("ascii")

            # Only the path and query are covered by the request signature.
            url_to_sign_bytes = urllib.parse.urlunparse(
                (b"", b"", path_bytes, None, query_bytes, b"")
            )

            while True:
                try:
                    # The body (and hence the signature) is rebuilt on every
                    # attempt, so a json_callback is invoked once per attempt.
                    json = request.get_json()
                    if json:
                        headers_dict[b"Content-Type"] = [b"application/json"]
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes, json
                        )
                        data = encode_canonical_json(json)
                        producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
                            BytesIO(data), cooperator=self._cooperator
                        )
                    else:
                        producer = None
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes
                        )

                    headers_dict[b"Authorization"] = auth_headers

                    logger.debug(
                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _sec_timeout,
                    )

                    outgoing_requests_counter.labels(request.method).inc()

                    try:
                        with Measure(self.clock, "outbound_request"):
                            # we don't want all the fancy cookie and redirect handling
                            # that treq.request gives: just use the raw Agent.

                            # To preserve the logging context, the timeout is treated
                            # in a similar way to `defer.gatherResults`:
                            # * Each logging context-preserving fork is wrapped in
                            #   `run_in_background`. In this case there is only one,
                            #   since the timeout fork is not logging-context aware.
                            # * The `Deferred` that joins the forks back together is
                            #   wrapped in `make_deferred_yieldable` to restore the
                            #   logging context regardless of the path taken.
                            request_deferred = run_in_background(
                                self.agent.request,
                                method_bytes,
                                url_bytes,
                                headers=Headers(headers_dict),
                                bodyProducer=producer,
                            )
                            request_deferred = timeout_deferred(
                                request_deferred,
                                timeout=_sec_timeout,
                                reactor=self.reactor,
                            )

                            response = await make_deferred_yieldable(request_deferred)
                    except DNSLookupError as e:
                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                    except Exception as e:
                        raise RequestSendFailed(e, can_retry=True) from e

                    incoming_responses_counter.labels(
                        request.method, response.code
                    ).inc()

                    set_tag(tags.HTTP_STATUS_CODE, response.code)
                    response_phrase = response.phrase.decode("ascii", errors="replace")

                    if 200 <= response.code < 300:
                        logger.debug(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                    else:
                        logger.info(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        # :'(
                        # Update transactions table?
                        d = treq.content(response)
                        d = timeout_deferred(
                            d, timeout=_sec_timeout, reactor=self.reactor
                        )

                        try:
                            body = await make_deferred_yieldable(d)
                        except Exception as e:
                            # Eh, we're already going to raise an exception so lets
                            # ignore if this fails.
                            logger.warning(
                                "{%s} [%s] Failed to get error response: %s %s: %s",
                                request.txn_id,
                                request.destination,
                                request.method,
                                url_str,
                                _flatten_response_never_received(e),
                            )
                            body = None

                        exc = HttpResponseException(
                            response.code, response_phrase, body
                        )

                        # Retry if the error is a 5xx or a 429 (Too Many
                        # Requests), otherwise just raise a standard
                        # `HttpResponseException`
                        if 500 <= response.code < 600 or response.code == 429:
                            raise RequestSendFailed(exc, can_retry=True) from exc
                        else:
                            raise exc

                    break
                except RequestSendFailed as e:
                    logger.info(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e.inner_exception),
                    )

                    if not e.can_retry:
                        raise

                    # Callers that pass an explicit timeout get a single attempt.
                    if retries_left and not timeout:
                        if long_retries:
                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
                            delay = min(delay, 60)
                            delay *= random.uniform(0.8, 1.4)
                        else:
                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
                            delay = min(delay, 2)
                            delay *= random.uniform(0.8, 1.4)

                        logger.debug(
                            "{%s} [%s] Waiting %ss before re-sending...",
                            request.txn_id,
                            request.destination,
                            delay,
                        )

                        await self.clock.sleep(delay)
                        retries_left -= 1
                    else:
                        raise

                except Exception as e:
                    logger.warning(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e),
                    )
                    raise
        return response

    def build_auth_headers(
        self,
        destination: Optional[bytes],
        method: bytes,
        url_bytes: bytes,
        content: Optional[JsonDict] = None,
        destination_is: Optional[bytes] = None,
    ) -> List[bytes]:
        """
        Builds the Authorization headers for a federation request
        Args:
            destination: The destination homeserver of the request.
                May be None if the destination is an identity server, in which case
                destination_is must be non-None.
            method: The HTTP method of the request
            url_bytes: The URI path of the request
            content: The body of the request
            destination_is: As 'destination', but if the destination is an
                identity server

        Returns:
            A list of headers to be added as "Authorization:" headers, one per
            signature this server produced.
        """
        request: JsonDict = {
            "method": method.decode("ascii"),
            "uri": url_bytes.decode("ascii"),
            "origin": self.server_name,
        }

        if destination is not None:
            request["destination"] = destination.decode("ascii")

        if destination_is is not None:
            request["destination_is"] = destination_is.decode("ascii")

        if content is not None:
            request["content"] = content

        request = sign_json(request, self.server_name, self.signing_key)

        auth_headers = []

        for key, sig in request["signatures"][self.server_name].items():
            auth_headers.append(
                (
                    'X-Matrix origin=%s,key="%s",sig="%s"'
                    % (self.server_name, key, sig)
                ).encode("ascii")
            )
        return auth_headers

    # Overload: no parser given, so the response body is decoded as JSON.
    @overload
    async def put_json(
        self,
        destination: str,
        path: str,
        args: Optional[QueryArgs] = None,
        data: Optional[JsonDict] = None,
        json_data_callback: Optional[Callable[[], JsonDict]] = None,
        long_retries: bool = False,
        timeout: Optional[int] = None,
        ignore_backoff: bool = False,
        backoff_on_404: bool = False,
        try_trailing_slash_on_400: bool = False,
        parser: Literal[None] = None,
        max_response_size: Optional[int] = None,
    ) -> Union[JsonDict, list]:
        ...
764 765 @overload 766 async def put_json( 767 self, 768 destination: str, 769 path: str, 770 args: Optional[QueryArgs] = None, 771 data: Optional[JsonDict] = None, 772 json_data_callback: Optional[Callable[[], JsonDict]] = None, 773 long_retries: bool = False, 774 timeout: Optional[int] = None, 775 ignore_backoff: bool = False, 776 backoff_on_404: bool = False, 777 try_trailing_slash_on_400: bool = False, 778 parser: Optional[ByteParser[T]] = None, 779 max_response_size: Optional[int] = None, 780 ) -> T: 781 ... 782 783 async def put_json( 784 self, 785 destination: str, 786 path: str, 787 args: Optional[QueryArgs] = None, 788 data: Optional[JsonDict] = None, 789 json_data_callback: Optional[Callable[[], JsonDict]] = None, 790 long_retries: bool = False, 791 timeout: Optional[int] = None, 792 ignore_backoff: bool = False, 793 backoff_on_404: bool = False, 794 try_trailing_slash_on_400: bool = False, 795 parser: Optional[ByteParser] = None, 796 max_response_size: Optional[int] = None, 797 ): 798 """Sends the specified json data using PUT 799 800 Args: 801 destination: The remote server to send the HTTP request to. 802 path: The HTTP path. 803 args: query params 804 data: A dict containing the data that will be used as 805 the request body. This will be encoded as JSON. 806 json_data_callback: A callable returning the dict to 807 use as the request body. 808 809 long_retries: whether to use the long retry algorithm. See 810 docs on _send_request for details. 811 812 timeout: number of milliseconds to wait for the response. 813 self._default_timeout (60s) by default. 814 815 Note that we may make several attempts to send the request; this 816 timeout applies to the time spent waiting for response headers for 817 *each* attempt (including connection time) as well as the time spent 818 reading the response body after a 200 response. 819 820 ignore_backoff: true to ignore the historical backoff data 821 and try the request anyway. 
822 backoff_on_404: True if we should count a 404 response as 823 a failure of the server (and should therefore back off future 824 requests). 825 try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED 826 response we should try appending a trailing slash to the end 827 of the request. Workaround for #3622 in Synapse <= v0.99.3. This 828 will be attempted before backing off if backing off has been 829 enabled. 830 parser: The parser to use to decode the response. Defaults to 831 parsing as JSON. 832 max_response_size: The maximum size to read from the response, if None 833 uses the default. 834 835 Returns: 836 Succeeds when we get a 2xx HTTP response. The 837 result will be the decoded JSON body. 838 839 Raises: 840 HttpResponseException: If we get an HTTP response code >= 300 841 (except 429). 842 NotRetryingDestination: If we are not yet ready to retry this 843 server. 844 FederationDeniedError: If this destination is not on our 845 federation whitelist 846 RequestSendFailed: If there were problems connecting to the 847 remote, due to e.g. DNS failures, connection timeouts etc. 
848 """ 849 request = MatrixFederationRequest( 850 method="PUT", 851 destination=destination, 852 path=path, 853 query=args, 854 json_callback=json_data_callback, 855 json=data, 856 ) 857 858 start_ms = self.clock.time_msec() 859 860 response = await self._send_request_with_optional_trailing_slash( 861 request, 862 try_trailing_slash_on_400, 863 backoff_on_404=backoff_on_404, 864 ignore_backoff=ignore_backoff, 865 long_retries=long_retries, 866 timeout=timeout, 867 ) 868 869 if timeout is not None: 870 _sec_timeout = timeout / 1000 871 else: 872 _sec_timeout = self.default_timeout 873 874 if parser is None: 875 parser = JsonParser() 876 877 body = await _handle_response( 878 self.reactor, 879 _sec_timeout, 880 request, 881 response, 882 start_ms, 883 parser=parser, 884 max_response_size=max_response_size, 885 ) 886 887 return body 888 889 async def post_json( 890 self, 891 destination: str, 892 path: str, 893 data: Optional[JsonDict] = None, 894 long_retries: bool = False, 895 timeout: Optional[int] = None, 896 ignore_backoff: bool = False, 897 args: Optional[QueryArgs] = None, 898 ) -> Union[JsonDict, list]: 899 """Sends the specified json data using POST 900 901 Args: 902 destination: The remote server to send the HTTP request to. 903 904 path: The HTTP path. 905 906 data: A dict containing the data that will be used as 907 the request body. This will be encoded as JSON. 908 909 long_retries: whether to use the long retry algorithm. See 910 docs on _send_request for details. 911 912 timeout: number of milliseconds to wait for the response. 913 self._default_timeout (60s) by default. 914 915 Note that we may make several attempts to send the request; this 916 timeout applies to the time spent waiting for response headers for 917 *each* attempt (including connection time) as well as the time spent 918 reading the response body after a 200 response. 919 920 ignore_backoff: true to ignore the historical backoff data and 921 try the request anyway. 
922 923 args: query params 924 Returns: 925 dict|list: Succeeds when we get a 2xx HTTP response. The 926 result will be the decoded JSON body. 927 928 Raises: 929 HttpResponseException: If we get an HTTP response code >= 300 930 (except 429). 931 NotRetryingDestination: If we are not yet ready to retry this 932 server. 933 FederationDeniedError: If this destination is not on our 934 federation whitelist 935 RequestSendFailed: If there were problems connecting to the 936 remote, due to e.g. DNS failures, connection timeouts etc. 937 """ 938 939 request = MatrixFederationRequest( 940 method="POST", destination=destination, path=path, query=args, json=data 941 ) 942 943 start_ms = self.clock.time_msec() 944 945 response = await self._send_request( 946 request, 947 long_retries=long_retries, 948 timeout=timeout, 949 ignore_backoff=ignore_backoff, 950 ) 951 952 if timeout: 953 _sec_timeout = timeout / 1000 954 else: 955 _sec_timeout = self.default_timeout 956 957 body = await _handle_response( 958 self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() 959 ) 960 return body 961 962 async def get_json( 963 self, 964 destination: str, 965 path: str, 966 args: Optional[QueryArgs] = None, 967 retry_on_dns_fail: bool = True, 968 timeout: Optional[int] = None, 969 ignore_backoff: bool = False, 970 try_trailing_slash_on_400: bool = False, 971 ) -> Union[JsonDict, list]: 972 """GETs some json from the given host homeserver and path 973 974 Args: 975 destination: The remote server to send the HTTP request to. 976 977 path: The HTTP path. 978 979 args: A dictionary used to create query strings, defaults to 980 None. 981 982 timeout: number of milliseconds to wait for the response. 983 self._default_timeout (60s) by default. 
984 985 Note that we may make several attempts to send the request; this 986 timeout applies to the time spent waiting for response headers for 987 *each* attempt (including connection time) as well as the time spent 988 reading the response body after a 200 response. 989 990 ignore_backoff: true to ignore the historical backoff data 991 and try the request anyway. 992 993 try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED 994 response we should try appending a trailing slash to the end of 995 the request. Workaround for #3622 in Synapse <= v0.99.3. 996 Returns: 997 Succeeds when we get a 2xx HTTP response. The 998 result will be the decoded JSON body. 999 1000 Raises: 1001 HttpResponseException: If we get an HTTP response code >= 300 1002 (except 429). 1003 NotRetryingDestination: If we are not yet ready to retry this 1004 server. 1005 FederationDeniedError: If this destination is not on our 1006 federation whitelist 1007 RequestSendFailed: If there were problems connecting to the 1008 remote, due to e.g. DNS failures, connection timeouts etc. 
1009 """ 1010 request = MatrixFederationRequest( 1011 method="GET", destination=destination, path=path, query=args 1012 ) 1013 1014 start_ms = self.clock.time_msec() 1015 1016 response = await self._send_request_with_optional_trailing_slash( 1017 request, 1018 try_trailing_slash_on_400, 1019 backoff_on_404=False, 1020 ignore_backoff=ignore_backoff, 1021 retry_on_dns_fail=retry_on_dns_fail, 1022 timeout=timeout, 1023 ) 1024 1025 if timeout is not None: 1026 _sec_timeout = timeout / 1000 1027 else: 1028 _sec_timeout = self.default_timeout 1029 1030 body = await _handle_response( 1031 self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() 1032 ) 1033 1034 return body 1035 1036 async def delete_json( 1037 self, 1038 destination: str, 1039 path: str, 1040 long_retries: bool = False, 1041 timeout: Optional[int] = None, 1042 ignore_backoff: bool = False, 1043 args: Optional[QueryArgs] = None, 1044 ) -> Union[JsonDict, list]: 1045 """Send a DELETE request to the remote expecting some json response 1046 1047 Args: 1048 destination: The remote server to send the HTTP request to. 1049 path: The HTTP path. 1050 1051 long_retries: whether to use the long retry algorithm. See 1052 docs on _send_request for details. 1053 1054 timeout: number of milliseconds to wait for the response. 1055 self._default_timeout (60s) by default. 1056 1057 Note that we may make several attempts to send the request; this 1058 timeout applies to the time spent waiting for response headers for 1059 *each* attempt (including connection time) as well as the time spent 1060 reading the response body after a 200 response. 1061 1062 ignore_backoff: true to ignore the historical backoff data and 1063 try the request anyway. 1064 1065 args: query params 1066 Returns: 1067 Succeeds when we get a 2xx HTTP response. The 1068 result will be the decoded JSON body. 1069 1070 Raises: 1071 HttpResponseException: If we get an HTTP response code >= 300 1072 (except 429). 
1073 NotRetryingDestination: If we are not yet ready to retry this 1074 server. 1075 FederationDeniedError: If this destination is not on our 1076 federation whitelist 1077 RequestSendFailed: If there were problems connecting to the 1078 remote, due to e.g. DNS failures, connection timeouts etc. 1079 """ 1080 request = MatrixFederationRequest( 1081 method="DELETE", destination=destination, path=path, query=args 1082 ) 1083 1084 start_ms = self.clock.time_msec() 1085 1086 response = await self._send_request( 1087 request, 1088 long_retries=long_retries, 1089 timeout=timeout, 1090 ignore_backoff=ignore_backoff, 1091 ) 1092 1093 if timeout is not None: 1094 _sec_timeout = timeout / 1000 1095 else: 1096 _sec_timeout = self.default_timeout 1097 1098 body = await _handle_response( 1099 self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() 1100 ) 1101 return body 1102 1103 async def get_file( 1104 self, 1105 destination: str, 1106 path: str, 1107 output_stream, 1108 args: Optional[QueryArgs] = None, 1109 retry_on_dns_fail: bool = True, 1110 max_size: Optional[int] = None, 1111 ignore_backoff: bool = False, 1112 ) -> Tuple[int, Dict[bytes, List[bytes]]]: 1113 """GETs a file from a given homeserver 1114 Args: 1115 destination: The remote server to send the HTTP request to. 1116 path: The HTTP path to GET. 1117 output_stream: File to write the response body to. 1118 args: Optional dictionary used to create the query string. 1119 ignore_backoff: true to ignore the historical backoff data 1120 and try the request anyway. 1121 1122 Returns: 1123 Resolves with an (int,dict) tuple of 1124 the file length and a dict of the response headers. 1125 1126 Raises: 1127 HttpResponseException: If we get an HTTP response code >= 300 1128 (except 429). 1129 NotRetryingDestination: If we are not yet ready to retry this 1130 server. 
1131 FederationDeniedError: If this destination is not on our 1132 federation whitelist 1133 RequestSendFailed: If there were problems connecting to the 1134 remote, due to e.g. DNS failures, connection timeouts etc. 1135 """ 1136 request = MatrixFederationRequest( 1137 method="GET", destination=destination, path=path, query=args 1138 ) 1139 1140 response = await self._send_request( 1141 request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff 1142 ) 1143 1144 headers = dict(response.headers.getAllRawHeaders()) 1145 1146 try: 1147 d = read_body_with_max_size(response, output_stream, max_size) 1148 d.addTimeout(self.default_timeout, self.reactor) 1149 length = await make_deferred_yieldable(d) 1150 except BodyExceededMaxSize: 1151 msg = "Requested file is too large > %r bytes" % (max_size,) 1152 logger.warning( 1153 "{%s} [%s] %s", 1154 request.txn_id, 1155 request.destination, 1156 msg, 1157 ) 1158 raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE) 1159 except defer.TimeoutError as e: 1160 logger.warning( 1161 "{%s} [%s] Timed out reading response - %s %s", 1162 request.txn_id, 1163 request.destination, 1164 request.method, 1165 request.uri.decode("ascii"), 1166 ) 1167 raise RequestSendFailed(e, can_retry=True) from e 1168 except ResponseFailed as e: 1169 logger.warning( 1170 "{%s} [%s] Failed to read response - %s %s", 1171 request.txn_id, 1172 request.destination, 1173 request.method, 1174 request.uri.decode("ascii"), 1175 ) 1176 raise RequestSendFailed(e, can_retry=True) from e 1177 except Exception as e: 1178 logger.warning( 1179 "{%s} [%s] Error reading response: %s", 1180 request.txn_id, 1181 request.destination, 1182 e, 1183 ) 1184 raise 1185 logger.info( 1186 "{%s} [%s] Completed: %d %s [%d bytes] %s %s", 1187 request.txn_id, 1188 request.destination, 1189 response.code, 1190 response.phrase.decode("ascii", errors="replace"), 1191 length, 1192 request.method, 1193 request.uri.decode("ascii"), 1194 ) 1195 return length, 
headers 1196 1197 1198def _flatten_response_never_received(e): 1199 if hasattr(e, "reasons"): 1200 reasons = ", ".join( 1201 _flatten_response_never_received(f.value) for f in e.reasons 1202 ) 1203 1204 return "%s:[%s]" % (type(e).__name__, reasons) 1205 else: 1206 return repr(e) 1207 1208 1209def check_content_type_is(headers: Headers, expected_content_type: str) -> None: 1210 """ 1211 Check that a set of HTTP headers have a Content-Type header, and that it 1212 is the expected value.. 1213 1214 Args: 1215 headers: headers to check 1216 1217 Raises: 1218 RequestSendFailed: if the Content-Type header is missing or doesn't match 1219 1220 """ 1221 content_type_headers = headers.getRawHeaders(b"Content-Type") 1222 if content_type_headers is None: 1223 raise RequestSendFailed( 1224 RuntimeError("No Content-Type header received from remote server"), 1225 can_retry=False, 1226 ) 1227 1228 c_type = content_type_headers[0].decode("ascii") # only the first header 1229 val, options = cgi.parse_header(c_type) 1230 if val != expected_content_type: 1231 raise RequestSendFailed( 1232 RuntimeError( 1233 f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'", 1234 ), 1235 can_retry=False, 1236 ) 1237