14import abc
15import cgi
16import codecs
17import logging
18import random
19import sys
20import typing
21import urllib.parse
22from http import HTTPStatus
23from io import BytesIO, StringIO
24from typing import (
26    Callable,
27    Dict,
28    Generic,
29    List,
30    Optional,
31    Tuple,
32    TypeVar,
33    Union,
34    overload,
37import attr
38import treq
39from canonicaljson import encode_canonical_json
40from prometheus_client import Counter
41from signedjson.sign import sign_json
42from typing_extensions import Literal
44from twisted.internet import defer
45from twisted.internet.error import DNSLookupError
46from twisted.internet.interfaces import IReactorTime
47from twisted.internet.task import _EPSILON, Cooperator
48from twisted.web.client import ResponseFailed
49from twisted.web.http_headers import Headers
50from twisted.web.iweb import IBodyProducer, IResponse
52import synapse.metrics
53import synapse.util.retryutils
54from synapse.api.errors import (
55    Codes,
56    FederationDeniedError,
57    HttpResponseException,
58    RequestSendFailed,
59    SynapseError,
61from synapse.http import QuieterFileBodyProducer
62from synapse.http.client import (
63    BlacklistingAgentWrapper,
64    BodyExceededMaxSize,
65    ByteWriteable,
66    encode_query_args,
67    read_body_with_max_size,
69from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
70from synapse.logging import opentracing
71from synapse.logging.context import make_deferred_yieldable, run_in_background
72from synapse.logging.opentracing import set_tag, start_active_span, tags
73from synapse.types import JsonDict
74from synapse.util import json_decoder
75from synapse.util.async_helpers import timeout_deferred
76from synapse.util.metrics import Measure
79    from synapse.server import HomeServer
81logger = logging.getLogger(__name__)
83outgoing_requests_counter = Counter(
84    "synapse_http_matrixfederationclient_requests", "", ["method"]
86incoming_responses_counter = Counter(
87    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
# a federation response can be rather large (eg a big state_ids is 50M or so), so we
# need a generous limit here.
MAX_RESPONSE_SIZE = 100 * 1024 * 1024

# Modulus bound used when wrapping the `_next_id` request counter.
MAXINT = sys.maxsize

# Counter used to build the `txn_id` of each `MatrixFederationRequest`.
_next_id = 1

# Query arguments: each key maps to either a single value or a list of values.
QueryArgs = Dict[str, Union[str, List[str]]]

# The type of the value produced by a `ByteParser`.
T = TypeVar("T")
class ByteParser(ByteWriteable, Generic[T], abc.ABC):
    """Abstract base class for response parsers.

    This is a `ByteWriteable` which additionally offers a `finish` method
    returning the parsed data (of type `T`).
    """

    # The expected content type of the response, e.g. `application/json`. If
    # the content type doesn't match we fail the request.
    CONTENT_TYPE: str = abc.abstractproperty()  # type: ignore

    @abc.abstractmethod
    def finish(self) -> T:
        """Produce the final result once the response has finished streaming.

        Returns:
            The parsed data (implementations may raise an error instead).
        """
@attr.s(slots=True, frozen=True)
class MatrixFederationRequest:
    """Immutable description of a single outgoing federation request.

    The `uri` is derived from `destination`, `path` and `query` when the
    instance is created. A `txn_id` is generated automatically unless one is
    supplied by the caller.
    """

    method = attr.ib(type=str)
    """HTTP method
    """

    path = attr.ib(type=str)
    """HTTP path
    """

    destination = attr.ib(type=str)
    """The remote server to send the HTTP request to.
    """

    json = attr.ib(default=None, type=Optional[JsonDict])
    """JSON to send in the body.
    """

    json_callback = attr.ib(default=None, type=Optional[Callable[[], JsonDict]])
    """A callback to generate the JSON.
    """

    query = attr.ib(default=None, type=Optional[dict])
    """Query arguments.
    """

    txn_id = attr.ib(default=None, type=Optional[str])
    """Unique ID for this request (for logging). Generated if not supplied.
    """

    uri = attr.ib(init=False, type=bytes)
    """The URI of this request
    """

    def __attrs_post_init__(self) -> None:
        """Fill in the derived attributes (`txn_id` if missing, and `uri`)."""
        global _next_id

        # Only allocate a new ID when the caller did not provide one, so that
        # an explicitly passed `txn_id` (including the one carried over by
        # `attr.evolve`) is respected rather than silently replaced.
        if not self.txn_id:
            txn_id = "%s-O-%s" % (self.method, _next_id)
            _next_id = (_next_id + 1) % (MAXINT - 1)

            # The class is frozen, so we have to go via object.__setattr__.
            object.__setattr__(self, "txn_id", txn_id)

        destination_bytes = self.destination.encode("ascii")
        path_bytes = self.path.encode("ascii")
        if self.query:
            query_bytes = encode_query_args(self.query)
        else:
            query_bytes = b""

        # The object is frozen so we can pre-compute this.
        uri = urllib.parse.urlunparse(
            (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
        )
        object.__setattr__(self, "uri", uri)

    def get_json(self) -> Optional[JsonDict]:
        """Get the JSON body for this request.

        Returns:
            The result of calling `json_callback` if one is set, otherwise the
            static `json` attribute (which may be None).
        """
        if self.json_callback:
            return self.json_callback()
        return self.json
class JsonParser(ByteParser[Union[JsonDict, list]]):
    """Accumulates the whole response as text, then decodes it as JSON."""

    CONTENT_TYPE = "application/json"

    def __init__(self):
        # Bytes are decoded to text on the way in, via the wrapper, and
        # collected in the text buffer.
        text_buffer = StringIO()
        self._buffer = text_buffer
        self._binary_wrapper = BinaryIOWrapper(text_buffer)

    def write(self, data: bytes) -> int:
        """Feed a chunk of the response body into the buffer.

        Returns:
            The value returned by the underlying wrapper's `write`.
        """
        written = self._binary_wrapper.write(data)
        return written

    def finish(self) -> Union[JsonDict, list]:
        """Decode everything buffered so far as JSON and return the result."""
        buffered_text = self._buffer.getvalue()
        return json_decoder.decode(buffered_text)
async def _handle_response(
    reactor: IReactorTime,
    timeout_sec: float,
    request: MatrixFederationRequest,
    response: IResponse,
    start_ms: int,
    parser: ByteParser[T],
    max_response_size: Optional[int] = None,
) -> T:
    """
    Reads the body of a response with a timeout and sends it to a parser

    Args:
        reactor: twisted reactor, for the timeout
        timeout_sec: number of seconds to wait for response to complete
        request: the request that triggered the response
        response: response to the request
        start_ms: Timestamp when request was made
        parser: The parser for the response
        max_response_size: The maximum size to read from the response, if None
            uses the default.

    Returns:
        The parsed response

    Raises:
        RequestSendFailed: with `can_retry=False` if the body exceeded the
            maximum size or the parser raised a ValueError; with
            `can_retry=True` if reading the body timed out or the response
            failed part-way through.
        Exception: any other error is logged and re-raised unchanged.
    """

    if max_response_size is None:
        max_response_size = MAX_RESPONSE_SIZE

    try:
        check_content_type_is(response.headers, parser.CONTENT_TYPE)

        d = read_body_with_max_size(response, parser, max_response_size)
        d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)

        length = await make_deferred_yieldable(d)

        value = parser.finish()
    except BodyExceededMaxSize as e:
        # The response was too big. Log the limit that was actually applied,
        # which may differ from the module-wide default.
        logger.warning(
            "{%s} [%s] JSON response exceeded max size %i - %s %s",
            request.txn_id,
            request.destination,
            max_response_size,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except ValueError as e:
        # The content was invalid.
        logger.warning(
            "{%s} [%s] Failed to parse response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except defer.TimeoutError as e:
        logger.warning(
            "{%s} [%s] Timed out reading response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except ResponseFailed as e:
        logger.warning(
            "{%s} [%s] Failed to read response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except Exception as e:
        logger.warning(
            "{%s} [%s] Error reading response %s %s: %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
            e,
        )
        raise

    # `start_ms` is in milliseconds whereas the reactor clock is in seconds.
    time_taken_secs = reactor.seconds() - start_ms / 1000

    logger.info(
        "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s",
        request.txn_id,
        request.destination,
        response.code,
        response.phrase.decode("ascii", errors="replace"),
        time_taken_secs,
        length,
        request.method,
        request.uri.decode("ascii"),
    )
    return value
class BinaryIOWrapper:
    """Lets callers write bytes to a text stream, decoding them incrementally."""

    def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
        # An incremental decoder copes with multi-byte characters which are
        # split across separate calls to `write`.
        make_decoder = codecs.getincrementaldecoder(encoding)
        self.decoder = make_decoder(errors)
        self.file = file

    def write(self, b: Union[bytes, bytearray]) -> int:
        """Decode `b` and write the resulting text to the wrapped file.

        Returns:
            The number of bytes consumed, which is always `len(b)`.
        """
        decoded = self.decoder.decode(b)
        self.file.write(decoded)
        return len(b)
318class MatrixFederationHttpClient:
319    """HTTP client used to talk to other homeservers over the federation
320    protocol. Send client certificates and signs requests.
322    Attributes:
323        agent (twisted.web.client.Agent): The twisted Agent used to send the
324            requests.
325    """
327    def __init__(self, hs: "HomeServer", tls_client_options_factory):
328        self.hs = hs
329        self.signing_key = hs.signing_key
330        self.server_name = hs.hostname
332        self.reactor = hs.get_reactor()
334        user_agent = hs.version_string
335        if hs.config.server.user_agent_suffix:
336            user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
337        user_agent = user_agent.encode("ascii")
339        federation_agent = MatrixFederationAgent(
340            self.reactor,
341            tls_client_options_factory,
342            user_agent,
343            hs.config.server.federation_ip_range_whitelist,
344            hs.config.server.federation_ip_range_blacklist,
345        )
347        # Use a BlacklistingAgentWrapper to prevent circumventing the IP
348        # blacklist via IP literals in server names
349        self.agent = BlacklistingAgentWrapper(
350            federation_agent,
351            ip_blacklist=hs.config.server.federation_ip_range_blacklist,
352        )
354        self.clock = hs.get_clock()
355        self._store = hs.get_datastore()
356        self.version_string_bytes = hs.version_string.encode("ascii")
357        self.default_timeout = 60
359        def schedule(x):
360            self.reactor.callLater(_EPSILON, x)
362        self._cooperator = Cooperator(scheduler=schedule)
364    async def _send_request_with_optional_trailing_slash(
365        self,
366        request: MatrixFederationRequest,
367        try_trailing_slash_on_400: bool = False,
368        **send_request_args,
369    ) -> IResponse:
370        """Wrapper for _send_request which can optionally retry the request
371        upon receiving a combination of a 400 HTTP response code and a
372        'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
373        due to #3622.
375        Args:
376            request: details of request to be sent
377            try_trailing_slash_on_400: Whether on receiving a 400
378                'M_UNRECOGNIZED' from the server to retry the request with a
379                trailing slash appended to the request path.
380            send_request_args: A dictionary of arguments to pass to `_send_request()`.
382        Raises:
383            HttpResponseException: If we get an HTTP response code >= 300
384                (except 429).
386        Returns:
387            Parsed JSON response body.
388        """
389        try:
390            response = await self._send_request(request, **send_request_args)
391        except HttpResponseException as e:
392            # Received an HTTP error > 300. Check if it meets the requirements
393            # to retry with a trailing slash
394            if not try_trailing_slash_on_400:
395                raise
397            if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
398                raise
400            # Retry with a trailing slash if we received a 400 with
401            # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
402            # trailing slash on Synapse <= v0.99.3.
403            logger.info("Retrying request with trailing slash")
405            # Request is frozen so we create a new instance
406            request = attr.evolve(request, path=request.path + "/")
408            response = await self._send_request(request, **send_request_args)
410        return response
    async def _send_request(
        self,
        request: MatrixFederationRequest,
        retry_on_dns_fail: bool = True,
        timeout: Optional[int] = None,
        long_retries: bool = False,
        ignore_backoff: bool = False,
        backoff_on_404: bool = False,
    ) -> IResponse:
        """
        Sends a request to the given server.

        Args:
            request: details of request to be sent

            retry_on_dns_fail: true if the request should be retried on DNS failures

            timeout: number of milliseconds to wait for the response headers
                (including connecting to the server), *for each attempt*.
                60s by default.

                Note that passing a (non-zero) timeout also disables retries:
                a retryable failure is raised straight away.

            long_retries: whether to use the long retry algorithm.

                The regular retry algorithm makes 4 attempts, with intervals
                [0.5s, 1s, 2s].

                The long retry algorithm makes 11 attempts, with intervals
                [4s, 16s, 60s, 60s, ...]

                Both algorithms add -20%/+40% jitter to the retry intervals.

                Note that the above intervals are *in addition* to the time spent
                waiting for the request to complete (up to `timeout` ms).

                NB: the long retry algorithm takes over 20 minutes to complete, with
                a default timeout of 60s!

            ignore_backoff: true to ignore the historical backoff data
                and try the request anyway.

            backoff_on_404: Back off if we get a 404

        Returns:
            Resolves with the HTTP response object on success.

        Raises:
            HttpResponseException: If we get an HTTP response code >= 300
                (except 429).
            NotRetryingDestination: If we are not yet ready to retry this
                server.
            FederationDeniedError: If this destination  is not on our
                federation whitelist
            RequestSendFailed: If there were problems connecting to the
                remote, due to e.g. DNS failures, connection timeouts etc.
        """
        # `timeout` is in milliseconds; a falsy value (None or 0) selects the
        # default, which is in seconds.
        if timeout:
            _sec_timeout = timeout / 1000
        else:
            _sec_timeout = self.default_timeout

        # Refuse to talk to destinations outside the whitelist, if one is set.
        if (
            self.hs.config.federation.federation_domain_whitelist is not None
            and request.destination
            not in self.hs.config.federation.federation_domain_whitelist
        ):
            raise FederationDeniedError(request.destination)

        limiter = await synapse.util.retryutils.get_retry_limiter(
            request.destination,
            self.clock,
            self._store,
            backoff_on_404=backoff_on_404,
            ignore_backoff=ignore_backoff,
        )

        method_bytes = request.method.encode("ascii")
        destination_bytes = request.destination.encode("ascii")
        path_bytes = request.path.encode("ascii")
        if request.query:
            query_bytes = encode_query_args(request.query)
        else:
            query_bytes = b""

        scope = start_active_span(
            "outgoing-federation-request",
            tags={
                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
                tags.PEER_ADDRESS: request.destination,
                tags.HTTP_METHOD: request.method,
                tags.HTTP_URL: request.path,
            },
            finish_on_close=True,
        )

        # Inject the span into the headers
        headers_dict: Dict[bytes, List[bytes]] = {}
        opentracing.inject_header_dict(headers_dict, request.destination)

        headers_dict[b"User-Agent"] = [self.version_string_bytes]

        with limiter, scope:
            # XXX: Would be much nicer to retry only at the transaction-layer
            # (once we have reliable transactions in place)
            if long_retries:
                retries_left = MAX_LONG_RETRIES
            else:
                retries_left = MAX_SHORT_RETRIES

            url_bytes = request.uri
            url_str = url_bytes.decode("ascii")

            # Only the path and query are signed, not the scheme or host.
            url_to_sign_bytes = urllib.parse.urlunparse(
                (b"", b"", path_bytes, None, query_bytes, b"")
            )

            # Each iteration is one attempt; we leave the loop either via the
            # `break` after a 2xx response or by raising.
            while True:
                try:
                    # The body is regenerated (and re-signed) for every attempt.
                    json = request.get_json()
                    # NB: an empty JSON object is falsy and so is treated the
                    # same as having no body at all.
                    if json:
                        headers_dict[b"Content-Type"] = [b"application/json"]
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes, json
                        )
                        data = encode_canonical_json(json)
                        producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
                            BytesIO(data), cooperator=self._cooperator
                        )
                    else:
                        producer = None
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes
                        )

                    headers_dict[b"Authorization"] = auth_headers

                    logger.debug(
                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _sec_timeout,
                    )

                    outgoing_requests_counter.labels(request.method).inc()

                    try:
                        with Measure(self.clock, "outbound_request"):
                            # we don't want all the fancy cookie and redirect handling
                            # that treq.request gives: just use the raw Agent.

                            # To preserve the logging context, the timeout is treated
                            # in a similar way to `defer.gatherResults`:
                            # * Each logging context-preserving fork is wrapped in
                            #   `run_in_background`. In this case there is only one,
                            #   since the timeout fork is not logging-context aware.
                            # * The `Deferred` that joins the forks back together is
                            #   wrapped in `make_deferred_yieldable` to restore the
                            #   logging context regardless of the path taken.
                            request_deferred = run_in_background(
                                self.agent.request,
                                method_bytes,
                                url_bytes,
                                headers=Headers(headers_dict),
                                bodyProducer=producer,
                            )
                            request_deferred = timeout_deferred(
                                request_deferred,
                                timeout=_sec_timeout,
                                reactor=self.reactor,
                            )

                            response = await make_deferred_yieldable(request_deferred)
                    except DNSLookupError as e:
                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                    except Exception as e:
                        # Any other failure to get response headers is
                        # considered retryable.
                        raise RequestSendFailed(e, can_retry=True) from e

                    incoming_responses_counter.labels(
                        request.method, response.code
                    ).inc()

                    set_tag(tags.HTTP_STATUS_CODE, response.code)
                    response_phrase = response.phrase.decode("ascii", errors="replace")

                    if 200 <= response.code < 300:
                        logger.debug(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        pass
                    else:
                        logger.info(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        # :'(
                        # Update transactions table?
                        # Read the error body (with a timeout) so that it can
                        # be attached to the exception.
                        d = treq.content(response)
                        d = timeout_deferred(
                            d, timeout=_sec_timeout, reactor=self.reactor
                        )

                        try:
                            body = await make_deferred_yieldable(d)
                        except Exception as e:
                            # Eh, we're already going to raise an exception so lets
                            # ignore if this fails.
                            logger.warning(
                                "{%s} [%s] Failed to get error response: %s %s: %s",
                                request.txn_id,
                                request.destination,
                                request.method,
                                url_str,
                                _flatten_response_never_received(e),
                            )
                            body = None

                        exc = HttpResponseException(
                            response.code, response_phrase, body
                        )

                        # Retry if the error is a 5xx or a 429 (Too Many
                        # Requests), otherwise just raise a standard
                        # `HttpResponseException`
                        if 500 <= response.code < 600 or response.code == 429:
                            raise RequestSendFailed(exc, can_retry=True) from exc
                        else:
                            raise exc

                    break
                except RequestSendFailed as e:
                    logger.info(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e.inner_exception),
                    )

                    if not e.can_retry:
                        raise

                    # We only retry while attempts remain and the caller did
                    # not specify an explicit timeout.
                    if retries_left and not timeout:
                        if long_retries:
                            # Exponential back-off (base 4), capped at 60s,
                            # with -20%/+40% jitter.
                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
                            delay = min(delay, 60)
                            delay *= random.uniform(0.8, 1.4)
                        else:
                            # Exponential back-off (0.5s, doubling), capped at
                            # 2s, with -20%/+40% jitter.
                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
                            delay = min(delay, 2)
                            delay *= random.uniform(0.8, 1.4)

                        logger.debug(
                            "{%s} [%s] Waiting %ss before re-sending...",
                            request.txn_id,
                            request.destination,
                            delay,
                        )

                        await self.clock.sleep(delay)
                        retries_left -= 1
                    else:
                        raise

                except Exception as e:
                    # Anything which isn't a RequestSendFailed (including a
                    # non-retryable HttpResponseException) is logged and
                    # propagated without retrying.
                    logger.warning(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e),
                    )
                    raise
        return response
696    def build_auth_headers(
697        self,
698        destination: Optional[bytes],
699        method: bytes,
700        url_bytes: bytes,
701        content: Optional[JsonDict] = None,
702        destination_is: Optional[bytes] = None,
703    ) -> List[bytes]:
704        """
705        Builds the Authorization headers for a federation request
706        Args:
707            destination: The destination homeserver of the request.
708                May be None if the destination is an identity server, in which case
709                destination_is must be non-None.
710            method: The HTTP method of the request
711            url_bytes: The URI path of the request
712            content: The body of the request
713            destination_is: As 'destination', but if the destination is an
714                identity server
716        Returns:
717            A list of headers to be added as "Authorization:" headers
718        """
719        request: JsonDict = {
720            "method": method.decode("ascii"),
721            "uri": url_bytes.decode("ascii"),
722            "origin": self.server_name,
723        }
725        if destination is not None:
726            request["destination"] = destination.decode("ascii")
728        if destination_is is not None:
729            request["destination_is"] = destination_is.decode("ascii")
731        if content is not None:
732            request["content"] = content
734        request = sign_json(request, self.server_name, self.signing_key)
736        auth_headers = []
738        for key, sig in request["signatures"][self.server_name].items():
739            auth_headers.append(
740                (
741                    'X-Matrix origin=%s,key="%s",sig="%s"'
742                    % (self.server_name, key, sig)
743                ).encode("ascii")
744            )
745        return auth_headers
747    @overload
748    async def put_json(
749        self,
750        destination: str,
751        path: str,
752        args: Optional[QueryArgs] = None,
753        data: Optional[JsonDict] = None,
754        json_data_callback: Optional[Callable[[], JsonDict]] = None,
755        long_retries: bool = False,
756        timeout: Optional[int] = None,
757        ignore_backoff: bool = False,
758        backoff_on_404: bool = False,
759        try_trailing_slash_on_400: bool = False,
760        parser: Literal[None] = None,
761        max_response_size: Optional[int] = None,
762    ) -> Union[JsonDict, list]:
763        ...
765    @overload
766    async def put_json(
767        self,
768        destination: str,
769        path: str,
770        args: Optional[QueryArgs] = None,
771        data: Optional[JsonDict] = None,
772        json_data_callback: Optional[Callable[[], JsonDict]] = None,
773        long_retries: bool = False,
774        timeout: Optional[int] = None,
775        ignore_backoff: bool = False,
776        backoff_on_404: bool = False,
777        try_trailing_slash_on_400: bool = False,
778        parser: Optional[ByteParser[T]] = None,
779        max_response_size: Optional[int] = None,
780    ) -> T:
781        ...
783    async def put_json(
784        self,
785        destination: str,
786        path: str,
787        args: Optional[QueryArgs] = None,
788        data: Optional[JsonDict] = None,
789        json_data_callback: Optional[Callable[[], JsonDict]] = None,
790        long_retries: bool = False,
791        timeout: Optional[int] = None,
792        ignore_backoff: bool = False,
793        backoff_on_404: bool = False,
794        try_trailing_slash_on_400: bool = False,
795        parser: Optional[ByteParser] = None,
796        max_response_size: Optional[int] = None,
797    ):
798        """Sends the specified json data using PUT
800        Args:
801            destination: The remote server to send the HTTP request to.
802            path: The HTTP path.
803            args: query params
804            data: A dict containing the data that will be used as
805                the request body. This will be encoded as JSON.
806            json_data_callback: A callable returning the dict to
807                use as the request body.
809            long_retries: whether to use the long retry algorithm. See
810                docs on _send_request for details.
812            timeout: number of milliseconds to wait for the response.
813                self._default_timeout (60s) by default.
815                Note that we may make several attempts to send the request; this
816                timeout applies to the time spent waiting for response headers for
817                *each* attempt (including connection time) as well as the time spent
818                reading the response body after a 200 response.
820            ignore_backoff: true to ignore the historical backoff data
821                and try the request anyway.
822            backoff_on_404: True if we should count a 404 response as
823                a failure of the server (and should therefore back off future
824                requests).
825            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
826                response we should try appending a trailing slash to the end
827                of the request. Workaround for #3622 in Synapse <= v0.99.3. This
828                will be attempted before backing off if backing off has been
829                enabled.
830            parser: The parser to use to decode the response. Defaults to
831                parsing as JSON.
832            max_response_size: The maximum size to read from the response, if None
833                uses the default.
835        Returns:
836            Succeeds when we get a 2xx HTTP response. The
837            result will be the decoded JSON body.
839        Raises:
840            HttpResponseException: If we get an HTTP response code >= 300
841                (except 429).
842            NotRetryingDestination: If we are not yet ready to retry this
843                server.
844            FederationDeniedError: If this destination  is not on our
845                federation whitelist
846            RequestSendFailed: If there were problems connecting to the
847                remote, due to e.g. DNS failures, connection timeouts etc.
848        """
849        request = MatrixFederationRequest(
850            method="PUT",
851            destination=destination,
852            path=path,
853            query=args,
854            json_callback=json_data_callback,
855            json=data,
856        )
858        start_ms = self.clock.time_msec()
860        response = await self._send_request_with_optional_trailing_slash(
861            request,
862            try_trailing_slash_on_400,
863            backoff_on_404=backoff_on_404,
864            ignore_backoff=ignore_backoff,
865            long_retries=long_retries,
866            timeout=timeout,
867        )
869        if timeout is not None:
870            _sec_timeout = timeout / 1000
871        else:
872            _sec_timeout = self.default_timeout
874        if parser is None:
875            parser = JsonParser()
877        body = await _handle_response(
878            self.reactor,
879            _sec_timeout,
880            request,
881            response,
882            start_ms,
883            parser=parser,
884            max_response_size=max_response_size,
885        )
887        return body
889    async def post_json(
890        self,
891        destination: str,
892        path: str,
893        data: Optional[JsonDict] = None,
894        long_retries: bool = False,
895        timeout: Optional[int] = None,
896        ignore_backoff: bool = False,
897        args: Optional[QueryArgs] = None,
898    ) -> Union[JsonDict, list]:
899        """Sends the specified json data using POST
901        Args:
902            destination: The remote server to send the HTTP request to.
904            path: The HTTP path.
906            data: A dict containing the data that will be used as
907                the request body. This will be encoded as JSON.
909            long_retries: whether to use the long retry algorithm. See
910                docs on _send_request for details.
912            timeout: number of milliseconds to wait for the response.
913                self._default_timeout (60s) by default.
915                Note that we may make several attempts to send the request; this
916                timeout applies to the time spent waiting for response headers for
917                *each* attempt (including connection time) as well as the time spent
918                reading the response body after a 200 response.
920            ignore_backoff: true to ignore the historical backoff data and
921                try the request anyway.
923            args: query params
924        Returns:
925            dict|list: Succeeds when we get a 2xx HTTP response. The
926            result will be the decoded JSON body.
928        Raises:
929            HttpResponseException: If we get an HTTP response code >= 300
930                (except 429).
931            NotRetryingDestination: If we are not yet ready to retry this
932                server.
933            FederationDeniedError: If this destination  is not on our
934                federation whitelist
935            RequestSendFailed: If there were problems connecting to the
936                remote, due to e.g. DNS failures, connection timeouts etc.
937        """
939        request = MatrixFederationRequest(
940            method="POST", destination=destination, path=path, query=args, json=data
941        )
943        start_ms = self.clock.time_msec()
945        response = await self._send_request(
946            request,
947            long_retries=long_retries,
948            timeout=timeout,
949            ignore_backoff=ignore_backoff,
950        )
952        if timeout:
953            _sec_timeout = timeout / 1000
954        else:
955            _sec_timeout = self.default_timeout
957        body = await _handle_response(
958            self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
959        )
960        return body
962    async def get_json(
963        self,
964        destination: str,
965        path: str,
966        args: Optional[QueryArgs] = None,
967        retry_on_dns_fail: bool = True,
968        timeout: Optional[int] = None,
969        ignore_backoff: bool = False,
970        try_trailing_slash_on_400: bool = False,
971    ) -> Union[JsonDict, list]:
972        """GETs some json from the given host homeserver and path
974        Args:
975            destination: The remote server to send the HTTP request to.
977            path: The HTTP path.
979            args: A dictionary used to create query strings, defaults to
980                None.
982            timeout: number of milliseconds to wait for the response.
983                self._default_timeout (60s) by default.
985                Note that we may make several attempts to send the request; this
986                timeout applies to the time spent waiting for response headers for
987                *each* attempt (including connection time) as well as the time spent
988                reading the response body after a 200 response.
990            ignore_backoff: true to ignore the historical backoff data
991                and try the request anyway.
993            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
994                response we should try appending a trailing slash to the end of
995                the request. Workaround for #3622 in Synapse <= v0.99.3.
996        Returns:
997            Succeeds when we get a 2xx HTTP response. The
998            result will be the decoded JSON body.
1000        Raises:
1001            HttpResponseException: If we get an HTTP response code >= 300
1002                (except 429).
1003            NotRetryingDestination: If we are not yet ready to retry this
1004                server.
1005            FederationDeniedError: If this destination  is not on our
1006                federation whitelist
1007            RequestSendFailed: If there were problems connecting to the
1008                remote, due to e.g. DNS failures, connection timeouts etc.
1009        """
1010        request = MatrixFederationRequest(
1011            method="GET", destination=destination, path=path, query=args
1012        )
1014        start_ms = self.clock.time_msec()
1016        response = await self._send_request_with_optional_trailing_slash(
1017            request,
1018            try_trailing_slash_on_400,
1019            backoff_on_404=False,
1020            ignore_backoff=ignore_backoff,
1021            retry_on_dns_fail=retry_on_dns_fail,
1022            timeout=timeout,
1023        )
1025        if timeout is not None:
1026            _sec_timeout = timeout / 1000
1027        else:
1028            _sec_timeout = self.default_timeout
1030        body = await _handle_response(
1031            self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
1032        )
1034        return body
1036    async def delete_json(
1037        self,
1038        destination: str,
1039        path: str,
1040        long_retries: bool = False,
1041        timeout: Optional[int] = None,
1042        ignore_backoff: bool = False,
1043        args: Optional[QueryArgs] = None,
1044    ) -> Union[JsonDict, list]:
1045        """Send a DELETE request to the remote expecting some json response
1047        Args:
1048            destination: The remote server to send the HTTP request to.
1049            path: The HTTP path.
1051            long_retries: whether to use the long retry algorithm. See
1052                docs on _send_request for details.
1054            timeout: number of milliseconds to wait for the response.
1055                self._default_timeout (60s) by default.
1057                Note that we may make several attempts to send the request; this
1058                timeout applies to the time spent waiting for response headers for
1059                *each* attempt (including connection time) as well as the time spent
1060                reading the response body after a 200 response.
1062            ignore_backoff: true to ignore the historical backoff data and
1063                try the request anyway.
1065            args: query params
1066        Returns:
1067            Succeeds when we get a 2xx HTTP response. The
1068            result will be the decoded JSON body.
1070        Raises:
1071            HttpResponseException: If we get an HTTP response code >= 300
1072                (except 429).
1073            NotRetryingDestination: If we are not yet ready to retry this
1074                server.
1075            FederationDeniedError: If this destination  is not on our
1076                federation whitelist
1077            RequestSendFailed: If there were problems connecting to the
1078                remote, due to e.g. DNS failures, connection timeouts etc.
1079        """
1080        request = MatrixFederationRequest(
1081            method="DELETE", destination=destination, path=path, query=args
1082        )
1084        start_ms = self.clock.time_msec()
1086        response = await self._send_request(
1087            request,
1088            long_retries=long_retries,
1089            timeout=timeout,
1090            ignore_backoff=ignore_backoff,
1091        )
1093        if timeout is not None:
1094            _sec_timeout = timeout / 1000
1095        else:
1096            _sec_timeout = self.default_timeout
1098        body = await _handle_response(
1099            self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
1100        )
1101        return body
1103    async def get_file(
1104        self,
1105        destination: str,
1106        path: str,
1107        output_stream,
1108        args: Optional[QueryArgs] = None,
1109        retry_on_dns_fail: bool = True,
1110        max_size: Optional[int] = None,
1111        ignore_backoff: bool = False,
1112    ) -> Tuple[int, Dict[bytes, List[bytes]]]:
1113        """GETs a file from a given homeserver
1114        Args:
1115            destination: The remote server to send the HTTP request to.
1116            path: The HTTP path to GET.
1117            output_stream: File to write the response body to.
1118            args: Optional dictionary used to create the query string.
1119            ignore_backoff: true to ignore the historical backoff data
1120                and try the request anyway.
1122        Returns:
1123            Resolves with an (int,dict) tuple of
1124            the file length and a dict of the response headers.
1126        Raises:
1127            HttpResponseException: If we get an HTTP response code >= 300
1128                (except 429).
1129            NotRetryingDestination: If we are not yet ready to retry this
1130                server.
1131            FederationDeniedError: If this destination  is not on our
1132                federation whitelist
1133            RequestSendFailed: If there were problems connecting to the
1134                remote, due to e.g. DNS failures, connection timeouts etc.
1135        """
1136        request = MatrixFederationRequest(
1137            method="GET", destination=destination, path=path, query=args
1138        )
1140        response = await self._send_request(
1141            request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
1142        )
1144        headers = dict(response.headers.getAllRawHeaders())
1146        try:
1147            d = read_body_with_max_size(response, output_stream, max_size)
1148            d.addTimeout(self.default_timeout, self.reactor)
1149            length = await make_deferred_yieldable(d)
1150        except BodyExceededMaxSize:
1151            msg = "Requested file is too large > %r bytes" % (max_size,)
1152            logger.warning(
1153                "{%s} [%s] %s",
1154                request.txn_id,
1155                request.destination,
1156                msg,
1157            )
1158            raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
1159        except defer.TimeoutError as e:
1160            logger.warning(
1161                "{%s} [%s] Timed out reading response - %s %s",
1162                request.txn_id,
1163                request.destination,
1164                request.method,
1165                request.uri.decode("ascii"),
1166            )
1167            raise RequestSendFailed(e, can_retry=True) from e
1168        except ResponseFailed as e:
1169            logger.warning(
1170                "{%s} [%s] Failed to read response - %s %s",
1171                request.txn_id,
1172                request.destination,
1173                request.method,
1174                request.uri.decode("ascii"),
1175            )
1176            raise RequestSendFailed(e, can_retry=True) from e
1177        except Exception as e:
1178            logger.warning(
1179                "{%s} [%s] Error reading response: %s",
1180                request.txn_id,
1181                request.destination,
1182                e,
1183            )
1184            raise
1185        logger.info(
1186            "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
1187            request.txn_id,
1188            request.destination,
1189            response.code,
1190            response.phrase.decode("ascii", errors="replace"),
1191            length,
1192            request.method,
1193            request.uri.decode("ascii"),
1194        )
1195        return length, headers
1198def _flatten_response_never_received(e):
1199    if hasattr(e, "reasons"):
1200        reasons = ", ".join(
1201            _flatten_response_never_received(f.value) for f in e.reasons
1202        )
1204        return "%s:[%s]" % (type(e).__name__, reasons)
1205    else:
1206        return repr(e)
def check_content_type_is(headers: Headers, expected_content_type: str) -> None:
    """
    Check that a set of HTTP headers have a Content-Type header, and that it
    is the expected value.

    Only the first Content-Type header is examined. Any parameters on it
    (e.g. `; charset=utf-8`) are split off and ignored; the remaining value
    must equal `expected_content_type` exactly.

    Args:
        headers: headers to check
        expected_content_type: the value the Content-Type header must have,
            without parameters

    Raises:
        RequestSendFailed: if the Content-Type header is missing, is not valid
            ASCII, or doesn't match

    """
    content_type_headers = headers.getRawHeaders(b"Content-Type")
    # An empty list is treated like a missing header rather than letting the
    # index below raise IndexError.
    if not content_type_headers:
        raise RequestSendFailed(
            RuntimeError("No Content-Type header received from remote server"),
            can_retry=False,
        )

    raw_c_type = content_type_headers[0]  # only the first header
    try:
        c_type = raw_c_type.decode("ascii")
    except UnicodeDecodeError as e:
        # The header comes from the remote server, so it may contain arbitrary
        # bytes. Report that as the documented RequestSendFailed instead of
        # leaking a UnicodeDecodeError to the caller.
        raise RequestSendFailed(
            RuntimeError(
                f"Remote server sent non-ASCII Content-Type header {raw_c_type!r}, not '{expected_content_type}'",
            ),
            can_retry=False,
        ) from e

    # NOTE: the `cgi` module is deprecated (PEP 594); it is kept here because
    # it is what the rest of this file imports.
    val, _ = cgi.parse_header(c_type)
    if val != expected_content_type:
        raise RequestSendFailed(
            RuntimeError(
                f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'",
            ),
            can_retry=False,
        )