1# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import abc
15import cgi
16import codecs
17import logging
18import random
19import sys
20import typing
21import urllib.parse
22from http import HTTPStatus
23from io import BytesIO, StringIO
24from typing import (
25    TYPE_CHECKING,
26    Callable,
27    Dict,
28    Generic,
29    List,
30    Optional,
31    Tuple,
32    TypeVar,
33    Union,
34    overload,
35)
36
37import attr
38import treq
39from canonicaljson import encode_canonical_json
40from prometheus_client import Counter
41from signedjson.sign import sign_json
42from typing_extensions import Literal
43
44from twisted.internet import defer
45from twisted.internet.error import DNSLookupError
46from twisted.internet.interfaces import IReactorTime
47from twisted.internet.task import _EPSILON, Cooperator
48from twisted.web.client import ResponseFailed
49from twisted.web.http_headers import Headers
50from twisted.web.iweb import IBodyProducer, IResponse
51
52import synapse.metrics
53import synapse.util.retryutils
54from synapse.api.errors import (
55    Codes,
56    FederationDeniedError,
57    HttpResponseException,
58    RequestSendFailed,
59    SynapseError,
60)
61from synapse.http import QuieterFileBodyProducer
62from synapse.http.client import (
63    BlacklistingAgentWrapper,
64    BodyExceededMaxSize,
65    ByteWriteable,
66    encode_query_args,
67    read_body_with_max_size,
68)
69from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
70from synapse.logging import opentracing
71from synapse.logging.context import make_deferred_yieldable, run_in_background
72from synapse.logging.opentracing import set_tag, start_active_span, tags
73from synapse.types import JsonDict
74from synapse.util import json_decoder
75from synapse.util.async_helpers import timeout_deferred
76from synapse.util.metrics import Measure
77
78if TYPE_CHECKING:
79    from synapse.server import HomeServer
80
logger = logging.getLogger(__name__)

# Prometheus counters for federation traffic sent by this module.
# Outgoing requests are labelled by HTTP method; responses are labelled by the
# method of the originating request and the response status code.
outgoing_requests_counter = Counter(
    "synapse_http_matrixfederationclient_requests", "", ["method"]
)
incoming_responses_counter = Counter(
    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)

# a federation response can be rather large (eg a big state_ids is 50M or so), so we
# need a generous limit here.
# This is the default limit (in bytes) applied by `_handle_response` when the
# caller does not pass an explicit `max_response_size`.
MAX_RESPONSE_SIZE = 100 * 1024 * 1024

# Number of retries `_send_request` starts with, for the long and the regular
# retry algorithm respectively.
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3

# Bound used to wrap the `_next_id` counter below.
MAXINT = sys.maxsize


# Module-wide counter used to generate the `txn_id` of each
# `MatrixFederationRequest`; incremented every time a request object is built.
_next_id = 1


# Query-string arguments: each name maps to a single value or a list of values.
QueryArgs = Dict[str, Union[str, List[str]]]


# Type of the value produced by a `ByteParser`.
T = TypeVar("T")
106
107
class ByteParser(ByteWriteable, Generic[T], abc.ABC):
    """Abstract base for objects which consume a response body and parse it.

    On top of the `ByteWriteable` interface, implementations provide a
    `finish` method which hands back the parsed result once the whole body
    has been written.
    """

    # The content type the response is expected to have, e.g.
    # `application/json`. It is checked against the response headers before
    # the body is read; concrete parsers must override it.
    CONTENT_TYPE: str = abc.abstractproperty()  # type: ignore

    @abc.abstractmethod
    def finish(self) -> T:
        """Return the parsed result (or raise) once the response body has
        been fully streamed into the parser.
        """
124
125
@attr.s(slots=True, frozen=True)
class MatrixFederationRequest:
    """Immutable description of a single outgoing federation request.

    The `txn_id` and `uri` attributes are computed when the object is
    created; see `__attrs_post_init__`.
    """

    # HTTP method
    method = attr.ib(type=str)

    # HTTP path
    path = attr.ib(type=str)

    # The remote server to send the HTTP request to.
    destination = attr.ib(type=str)

    # JSON to send in the body.
    json = attr.ib(type=Optional[JsonDict], default=None)

    # A callback to generate the JSON. When set, `get_json` uses it in
    # preference to `json`.
    json_callback = attr.ib(type=Optional[Callable[[], JsonDict]], default=None)

    # Query arguments.
    query = attr.ib(type=Optional[dict], default=None)

    # Unique ID for this request (for logging). Whatever is passed in is
    # replaced by a freshly generated ID in `__attrs_post_init__`.
    txn_id = attr.ib(type=Optional[str], default=None)

    # The URI of this request (not an `__init__` argument).
    uri = attr.ib(type=bytes, init=False)

    def __attrs_post_init__(self) -> None:
        """Assign the request's `txn_id` and pre-compute its `uri`."""
        global _next_id
        generated_txn_id = "%s-O-%s" % (self.method, _next_id)
        _next_id = (_next_id + 1) % (MAXINT - 1)

        # The class is frozen, so attributes have to be set via `object`.
        object.__setattr__(self, "txn_id", generated_txn_id)

        netloc = self.destination.encode("ascii")
        encoded_path = self.path.encode("ascii")
        encoded_query = encode_query_args(self.query) if self.query else b""

        # Since the object never changes afterwards, the URI can be built
        # once, up front.
        uri_parts = (b"matrix", netloc, encoded_path, None, encoded_query, b"")
        object.__setattr__(self, "uri", urllib.parse.urlunparse(uri_parts))

    def get_json(self) -> Optional[JsonDict]:
        """Return the JSON body for this request.

        The result of `json_callback` is used if a callback was given,
        otherwise the static `json` attribute (which may be None).
        """
        callback = self.json_callback
        return callback() if callback else self.json
184
185
class JsonParser(ByteParser[Union[JsonDict, list]]):
    """A parser that buffers the response and tries to parse it as JSON.

    Bytes are decoded to text incrementally as they arrive and accumulated in
    an in-memory text buffer; the buffer is only parsed in `finish`.
    """

    CONTENT_TYPE = "application/json"

    def __init__(self) -> None:
        self._buffer = StringIO()
        self._binary_wrapper = BinaryIOWrapper(self._buffer)

    def write(self, data: bytes) -> int:
        """Decode `data` into the internal buffer.

        Returns:
            The number of bytes consumed.
        """
        return self._binary_wrapper.write(data)

    def finish(self) -> Union[JsonDict, list]:
        """Parse everything written so far as JSON.

        Raises:
            ValueError: if the body is not valid JSON, or if it ends part-way
                through a multi-byte character (`UnicodeDecodeError` is a
                subclass of `ValueError`).
        """
        # Tell the incremental decoder that the input is complete. Without
        # this, bytes of an unfinished multi-byte sequence at the very end of
        # the body would stay in the decoder's internal state and be silently
        # dropped, rather than being reported as a decoding error.
        tail = self._binary_wrapper.decoder.decode(b"", final=True)
        if tail:
            self._buffer.write(tail)

        return json_decoder.decode(self._buffer.getvalue())
200
201
async def _handle_response(
    reactor: IReactorTime,
    timeout_sec: float,
    request: MatrixFederationRequest,
    response: IResponse,
    start_ms: int,
    parser: ByteParser[T],
    max_response_size: Optional[int] = None,
) -> T:
    """
    Reads the body of a response with a timeout and sends it to a parser

    Args:
        reactor: twisted reactor, for the timeout
        timeout_sec: number of seconds to wait for response to complete
        request: the request that triggered the response
        response: response to the request
        start_ms: Timestamp when request was made
        parser: The parser for the response
        max_response_size: The maximum size to read from the response, if None
            uses the default.

    Returns:
        The parsed response

    Raises:
        RequestSendFailed: if the body exceeds the size limit or cannot be
            parsed (not retryable), or if reading the body times out or the
            response fails part-way through (retryable).
    """

    if max_response_size is None:
        max_response_size = MAX_RESPONSE_SIZE

    try:
        check_content_type_is(response.headers, parser.CONTENT_TYPE)

        d = read_body_with_max_size(response, parser, max_response_size)
        d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)

        length = await make_deferred_yieldable(d)

        value = parser.finish()
    except BodyExceededMaxSize as e:
        # The response was too big.
        logger.warning(
            "{%s} [%s] JSON response exceeded max size %i - %s %s",
            request.txn_id,
            request.destination,
            # Report the limit that was actually enforced, which may be a
            # caller-supplied value rather than the module default.
            max_response_size,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except ValueError as e:
        # The content was invalid.
        logger.warning(
            "{%s} [%s] Failed to parse response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=False) from e
    except defer.TimeoutError as e:
        # Reading the body took longer than `timeout_sec`.
        logger.warning(
            "{%s} [%s] Timed out reading response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except ResponseFailed as e:
        logger.warning(
            "{%s} [%s] Failed to read response - %s %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
        )
        raise RequestSendFailed(e, can_retry=True) from e
    except Exception as e:
        # Anything else is logged and propagated unchanged.
        logger.warning(
            "{%s} [%s] Error reading response %s %s: %s",
            request.txn_id,
            request.destination,
            request.method,
            request.uri.decode("ascii"),
            e,
        )
        raise

    # `start_ms` is in milliseconds; `reactor.seconds()` is in seconds.
    time_taken_secs = reactor.seconds() - start_ms / 1000

    logger.info(
        "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s",
        request.txn_id,
        request.destination,
        response.code,
        response.phrase.decode("ascii", errors="replace"),
        time_taken_secs,
        length,
        request.method,
        request.uri.decode("ascii"),
    )
    return value
304
305
class BinaryIOWrapper:
    """Adapter which lets bytes be written to a text stream.

    The bytes are decoded incrementally, so a multi-byte character which is
    split across two `write` calls is only passed on once it is complete.
    """

    def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
        """
        Args:
            file: the text stream which receives the decoded text
            encoding: name of the codec used to decode the incoming bytes
            errors: error handling scheme passed to the decoder
        """
        make_decoder = codecs.getincrementaldecoder(encoding)
        self.decoder = make_decoder(errors)
        self.file = file

    def write(self, b: Union[bytes, bytearray]) -> int:
        """Decode `b` and write the resulting text to the wrapped stream.

        Returns:
            The number of bytes consumed, i.e. `len(b)`.
        """
        text = self.decoder.decode(b)
        self.file.write(text)
        return len(b)
316
317
318class MatrixFederationHttpClient:
    """HTTP client used to talk to other homeservers over the federation
    protocol. Sends client certificates and signs requests.
321
322    Attributes:
323        agent (twisted.web.client.Agent): The twisted Agent used to send the
324            requests.
325    """
326
327    def __init__(self, hs: "HomeServer", tls_client_options_factory):
328        self.hs = hs
329        self.signing_key = hs.signing_key
330        self.server_name = hs.hostname
331
332        self.reactor = hs.get_reactor()
333
334        user_agent = hs.version_string
335        if hs.config.server.user_agent_suffix:
336            user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
337        user_agent = user_agent.encode("ascii")
338
339        federation_agent = MatrixFederationAgent(
340            self.reactor,
341            tls_client_options_factory,
342            user_agent,
343            hs.config.server.federation_ip_range_whitelist,
344            hs.config.server.federation_ip_range_blacklist,
345        )
346
347        # Use a BlacklistingAgentWrapper to prevent circumventing the IP
348        # blacklist via IP literals in server names
349        self.agent = BlacklistingAgentWrapper(
350            federation_agent,
351            ip_blacklist=hs.config.server.federation_ip_range_blacklist,
352        )
353
354        self.clock = hs.get_clock()
355        self._store = hs.get_datastore()
356        self.version_string_bytes = hs.version_string.encode("ascii")
357        self.default_timeout = 60
358
359        def schedule(x):
360            self.reactor.callLater(_EPSILON, x)
361
362        self._cooperator = Cooperator(scheduler=schedule)
363
364    async def _send_request_with_optional_trailing_slash(
365        self,
366        request: MatrixFederationRequest,
367        try_trailing_slash_on_400: bool = False,
368        **send_request_args,
369    ) -> IResponse:
370        """Wrapper for _send_request which can optionally retry the request
371        upon receiving a combination of a 400 HTTP response code and a
372        'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
373        due to #3622.
374
375        Args:
376            request: details of request to be sent
377            try_trailing_slash_on_400: Whether on receiving a 400
378                'M_UNRECOGNIZED' from the server to retry the request with a
379                trailing slash appended to the request path.
380            send_request_args: A dictionary of arguments to pass to `_send_request()`.
381
382        Raises:
383            HttpResponseException: If we get an HTTP response code >= 300
384                (except 429).
385
386        Returns:
387            Parsed JSON response body.
388        """
389        try:
390            response = await self._send_request(request, **send_request_args)
391        except HttpResponseException as e:
392            # Received an HTTP error > 300. Check if it meets the requirements
393            # to retry with a trailing slash
394            if not try_trailing_slash_on_400:
395                raise
396
397            if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
398                raise
399
400            # Retry with a trailing slash if we received a 400 with
401            # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
402            # trailing slash on Synapse <= v0.99.3.
403            logger.info("Retrying request with trailing slash")
404
405            # Request is frozen so we create a new instance
406            request = attr.evolve(request, path=request.path + "/")
407
408            response = await self._send_request(request, **send_request_args)
409
410        return response
411
    async def _send_request(
        self,
        request: MatrixFederationRequest,
        retry_on_dns_fail: bool = True,
        timeout: Optional[int] = None,
        long_retries: bool = False,
        ignore_backoff: bool = False,
        backoff_on_404: bool = False,
    ) -> IResponse:
        """
        Sends a request to the given server.

        Args:
            request: details of request to be sent

            retry_on_dns_fail: true if the request should be retried on DNS failures

            timeout: number of milliseconds to wait for the response headers
                (including connecting to the server), *for each attempt*.
                60s by default.

                Note that retries are only attempted when no timeout is given.

            long_retries: whether to use the long retry algorithm.

                The regular retry algorithm makes 4 attempts, with intervals
                [0.5s, 1s, 2s].

                The long retry algorithm makes 11 attempts, with intervals
                [4s, 16s, 60s, 60s, ...]

                Both algorithms add -20%/+40% jitter to the retry intervals.

                Note that the above intervals are *in addition* to the time spent
                waiting for the request to complete (up to `timeout` ms).

                NB: the long retry algorithm takes over 20 minutes to complete, with
                a default timeout of 60s!

            ignore_backoff: true to ignore the historical backoff data
                and try the request anyway.

            backoff_on_404: Back off if we get a 404

        Returns:
            Resolves with the HTTP response object on success.

        Raises:
            HttpResponseException: If we get an HTTP response code >= 300
                (except 429).
            NotRetryingDestination: If we are not yet ready to retry this
                server.
            FederationDeniedError: If this destination  is not on our
                federation whitelist
            RequestSendFailed: If there were problems connecting to the
                remote, due to e.g. DNS failures, connection timeouts etc.
        """
        # `timeout` is in milliseconds; a falsy value (None or 0) selects the
        # default, which is expressed in seconds.
        if timeout:
            _sec_timeout = timeout / 1000
        else:
            _sec_timeout = self.default_timeout

        # Refuse to talk to destinations outside the federation whitelist, if
        # one is configured.
        if (
            self.hs.config.federation.federation_domain_whitelist is not None
            and request.destination
            not in self.hs.config.federation.federation_domain_whitelist
        ):
            raise FederationDeniedError(request.destination)

        limiter = await synapse.util.retryutils.get_retry_limiter(
            request.destination,
            self.clock,
            self._store,
            backoff_on_404=backoff_on_404,
            ignore_backoff=ignore_backoff,
        )

        method_bytes = request.method.encode("ascii")
        destination_bytes = request.destination.encode("ascii")
        path_bytes = request.path.encode("ascii")
        if request.query:
            query_bytes = encode_query_args(request.query)
        else:
            query_bytes = b""

        scope = start_active_span(
            "outgoing-federation-request",
            tags={
                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
                tags.PEER_ADDRESS: request.destination,
                tags.HTTP_METHOD: request.method,
                tags.HTTP_URL: request.path,
            },
            finish_on_close=True,
        )

        # Inject the span into the headers
        headers_dict: Dict[bytes, List[bytes]] = {}
        opentracing.inject_header_dict(headers_dict, request.destination)

        headers_dict[b"User-Agent"] = [self.version_string_bytes]

        with limiter, scope:
            # XXX: Would be much nicer to retry only at the transaction-layer
            # (once we have reliable transactions in place)
            if long_retries:
                retries_left = MAX_LONG_RETRIES
            else:
                retries_left = MAX_SHORT_RETRIES

            url_bytes = request.uri
            url_str = url_bytes.decode("ascii")

            # Only the path and query string are signed, not the scheme or
            # the destination part of the URI.
            url_to_sign_bytes = urllib.parse.urlunparse(
                (b"", b"", path_bytes, None, query_bytes, b"")
            )

            # Each iteration is one attempt: we leave the loop via `break` on
            # a 2xx response, or via an exception once we give up.
            while True:
                try:
                    # The body is (re)generated and (re)signed on every attempt.
                    json = request.get_json()
                    # NB: a falsy body (None, or an empty dict) results in a
                    # request without a body or Content-Type header.
                    if json:
                        headers_dict[b"Content-Type"] = [b"application/json"]
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes, json
                        )
                        data = encode_canonical_json(json)
                        producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
                            BytesIO(data), cooperator=self._cooperator
                        )
                    else:
                        producer = None
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes
                        )

                    headers_dict[b"Authorization"] = auth_headers

                    logger.debug(
                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _sec_timeout,
                    )

                    outgoing_requests_counter.labels(request.method).inc()

                    try:
                        with Measure(self.clock, "outbound_request"):
                            # we don't want all the fancy cookie and redirect handling
                            # that treq.request gives: just use the raw Agent.

                            # To preserve the logging context, the timeout is treated
                            # in a similar way to `defer.gatherResults`:
                            # * Each logging context-preserving fork is wrapped in
                            #   `run_in_background`. In this case there is only one,
                            #   since the timeout fork is not logging-context aware.
                            # * The `Deferred` that joins the forks back together is
                            #   wrapped in `make_deferred_yieldable` to restore the
                            #   logging context regardless of the path taken.
                            request_deferred = run_in_background(
                                self.agent.request,
                                method_bytes,
                                url_bytes,
                                headers=Headers(headers_dict),
                                bodyProducer=producer,
                            )
                            request_deferred = timeout_deferred(
                                request_deferred,
                                timeout=_sec_timeout,
                                reactor=self.reactor,
                            )

                            response = await make_deferred_yieldable(request_deferred)
                    except DNSLookupError as e:
                        # DNS failures are only retried if the caller allows it.
                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                    except Exception as e:
                        # Any other failure to get response headers is retryable.
                        raise RequestSendFailed(e, can_retry=True) from e

                    incoming_responses_counter.labels(
                        request.method, response.code
                    ).inc()

                    set_tag(tags.HTTP_STATUS_CODE, response.code)
                    response_phrase = response.phrase.decode("ascii", errors="replace")

                    if 200 <= response.code < 300:
                        logger.debug(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        pass
                    else:
                        logger.info(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        # :'(
                        # Update transactions table?
                        # Read the error body (with a timeout) so that it can
                        # be attached to the exception we are about to raise.
                        d = treq.content(response)
                        d = timeout_deferred(
                            d, timeout=_sec_timeout, reactor=self.reactor
                        )

                        try:
                            body = await make_deferred_yieldable(d)
                        except Exception as e:
                            # Eh, we're already going to raise an exception so lets
                            # ignore if this fails.
                            logger.warning(
                                "{%s} [%s] Failed to get error response: %s %s: %s",
                                request.txn_id,
                                request.destination,
                                request.method,
                                url_str,
                                _flatten_response_never_received(e),
                            )
                            body = None

                        exc = HttpResponseException(
                            response.code, response_phrase, body
                        )

                        # Retry if the error is a 5xx or a 429 (Too Many
                        # Requests), otherwise just raise a standard
                        # `HttpResponseException`
                        if 500 <= response.code < 600 or response.code == 429:
                            raise RequestSendFailed(exc, can_retry=True) from exc
                        else:
                            raise exc

                    # Got a 2xx response: stop retrying.
                    break
                except RequestSendFailed as e:
                    logger.info(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e.inner_exception),
                    )

                    if not e.can_retry:
                        raise

                    # Only retry while there are attempts left, and only if
                    # the caller did not give an explicit timeout.
                    if retries_left and not timeout:
                        if long_retries:
                            # 4s, 16s, then capped at 60s, before jitter.
                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
                            delay = min(delay, 60)
                            delay *= random.uniform(0.8, 1.4)
                        else:
                            # 0.5s, 1s, 2s, before jitter.
                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
                            delay = min(delay, 2)
                            delay *= random.uniform(0.8, 1.4)

                        logger.debug(
                            "{%s} [%s] Waiting %ss before re-sending...",
                            request.txn_id,
                            request.destination,
                            delay,
                        )

                        await self.clock.sleep(delay)
                        retries_left -= 1
                    else:
                        raise

                except Exception as e:
                    # Non-retryable failures (including HttpResponseException)
                    # are logged and propagated to the caller.
                    logger.warning(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e),
                    )
                    raise
        return response
695
696    def build_auth_headers(
697        self,
698        destination: Optional[bytes],
699        method: bytes,
700        url_bytes: bytes,
701        content: Optional[JsonDict] = None,
702        destination_is: Optional[bytes] = None,
703    ) -> List[bytes]:
704        """
705        Builds the Authorization headers for a federation request
706        Args:
707            destination: The destination homeserver of the request.
708                May be None if the destination is an identity server, in which case
709                destination_is must be non-None.
710            method: The HTTP method of the request
711            url_bytes: The URI path of the request
712            content: The body of the request
713            destination_is: As 'destination', but if the destination is an
714                identity server
715
716        Returns:
717            A list of headers to be added as "Authorization:" headers
718        """
719        request: JsonDict = {
720            "method": method.decode("ascii"),
721            "uri": url_bytes.decode("ascii"),
722            "origin": self.server_name,
723        }
724
725        if destination is not None:
726            request["destination"] = destination.decode("ascii")
727
728        if destination_is is not None:
729            request["destination_is"] = destination_is.decode("ascii")
730
731        if content is not None:
732            request["content"] = content
733
734        request = sign_json(request, self.server_name, self.signing_key)
735
736        auth_headers = []
737
738        for key, sig in request["signatures"][self.server_name].items():
739            auth_headers.append(
740                (
741                    'X-Matrix origin=%s,key="%s",sig="%s"'
742                    % (self.server_name, key, sig)
743                ).encode("ascii")
744            )
745        return auth_headers
746
    # Typing overload: when no parser is passed, the response body is parsed
    # as JSON, so the result is a JSON dict or list.
    @overload
    async def put_json(
        self,
        destination: str,
        path: str,
        args: Optional[QueryArgs] = None,
        data: Optional[JsonDict] = None,
        json_data_callback: Optional[Callable[[], JsonDict]] = None,
        long_retries: bool = False,
        timeout: Optional[int] = None,
        ignore_backoff: bool = False,
        backoff_on_404: bool = False,
        try_trailing_slash_on_400: bool = False,
        parser: Literal[None] = None,
        max_response_size: Optional[int] = None,
    ) -> Union[JsonDict, list]:
        ...

    # Typing overload: when a parser is passed, the result has whatever type
    # that parser produces.
    @overload
    async def put_json(
        self,
        destination: str,
        path: str,
        args: Optional[QueryArgs] = None,
        data: Optional[JsonDict] = None,
        json_data_callback: Optional[Callable[[], JsonDict]] = None,
        long_retries: bool = False,
        timeout: Optional[int] = None,
        ignore_backoff: bool = False,
        backoff_on_404: bool = False,
        try_trailing_slash_on_400: bool = False,
        parser: Optional[ByteParser[T]] = None,
        max_response_size: Optional[int] = None,
    ) -> T:
        ...
782
783    async def put_json(
784        self,
785        destination: str,
786        path: str,
787        args: Optional[QueryArgs] = None,
788        data: Optional[JsonDict] = None,
789        json_data_callback: Optional[Callable[[], JsonDict]] = None,
790        long_retries: bool = False,
791        timeout: Optional[int] = None,
792        ignore_backoff: bool = False,
793        backoff_on_404: bool = False,
794        try_trailing_slash_on_400: bool = False,
795        parser: Optional[ByteParser] = None,
796        max_response_size: Optional[int] = None,
797    ):
798        """Sends the specified json data using PUT
799
800        Args:
801            destination: The remote server to send the HTTP request to.
802            path: The HTTP path.
803            args: query params
804            data: A dict containing the data that will be used as
805                the request body. This will be encoded as JSON.
806            json_data_callback: A callable returning the dict to
807                use as the request body.
808
809            long_retries: whether to use the long retry algorithm. See
810                docs on _send_request for details.
811
812            timeout: number of milliseconds to wait for the response.
813                self._default_timeout (60s) by default.
814
815                Note that we may make several attempts to send the request; this
816                timeout applies to the time spent waiting for response headers for
817                *each* attempt (including connection time) as well as the time spent
818                reading the response body after a 200 response.
819
820            ignore_backoff: true to ignore the historical backoff data
821                and try the request anyway.
822            backoff_on_404: True if we should count a 404 response as
823                a failure of the server (and should therefore back off future
824                requests).
825            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
826                response we should try appending a trailing slash to the end
827                of the request. Workaround for #3622 in Synapse <= v0.99.3. This
828                will be attempted before backing off if backing off has been
829                enabled.
830            parser: The parser to use to decode the response. Defaults to
831                parsing as JSON.
832            max_response_size: The maximum size to read from the response, if None
833                uses the default.
834
835        Returns:
836            Succeeds when we get a 2xx HTTP response. The
837            result will be the decoded JSON body.
838
839        Raises:
840            HttpResponseException: If we get an HTTP response code >= 300
841                (except 429).
842            NotRetryingDestination: If we are not yet ready to retry this
843                server.
844            FederationDeniedError: If this destination  is not on our
845                federation whitelist
846            RequestSendFailed: If there were problems connecting to the
847                remote, due to e.g. DNS failures, connection timeouts etc.
848        """
849        request = MatrixFederationRequest(
850            method="PUT",
851            destination=destination,
852            path=path,
853            query=args,
854            json_callback=json_data_callback,
855            json=data,
856        )
857
858        start_ms = self.clock.time_msec()
859
860        response = await self._send_request_with_optional_trailing_slash(
861            request,
862            try_trailing_slash_on_400,
863            backoff_on_404=backoff_on_404,
864            ignore_backoff=ignore_backoff,
865            long_retries=long_retries,
866            timeout=timeout,
867        )
868
869        if timeout is not None:
870            _sec_timeout = timeout / 1000
871        else:
872            _sec_timeout = self.default_timeout
873
874        if parser is None:
875            parser = JsonParser()
876
877        body = await _handle_response(
878            self.reactor,
879            _sec_timeout,
880            request,
881            response,
882            start_ms,
883            parser=parser,
884            max_response_size=max_response_size,
885        )
886
887        return body
888
889    async def post_json(
890        self,
891        destination: str,
892        path: str,
893        data: Optional[JsonDict] = None,
894        long_retries: bool = False,
895        timeout: Optional[int] = None,
896        ignore_backoff: bool = False,
897        args: Optional[QueryArgs] = None,
898    ) -> Union[JsonDict, list]:
899        """Sends the specified json data using POST
900
901        Args:
902            destination: The remote server to send the HTTP request to.
903
904            path: The HTTP path.
905
906            data: A dict containing the data that will be used as
907                the request body. This will be encoded as JSON.
908
909            long_retries: whether to use the long retry algorithm. See
910                docs on _send_request for details.
911
912            timeout: number of milliseconds to wait for the response.
913                self._default_timeout (60s) by default.
914
915                Note that we may make several attempts to send the request; this
916                timeout applies to the time spent waiting for response headers for
917                *each* attempt (including connection time) as well as the time spent
918                reading the response body after a 200 response.
919
920            ignore_backoff: true to ignore the historical backoff data and
921                try the request anyway.
922
923            args: query params
924        Returns:
925            dict|list: Succeeds when we get a 2xx HTTP response. The
926            result will be the decoded JSON body.
927
928        Raises:
929            HttpResponseException: If we get an HTTP response code >= 300
930                (except 429).
931            NotRetryingDestination: If we are not yet ready to retry this
932                server.
933            FederationDeniedError: If this destination  is not on our
934                federation whitelist
935            RequestSendFailed: If there were problems connecting to the
936                remote, due to e.g. DNS failures, connection timeouts etc.
937        """
938
939        request = MatrixFederationRequest(
940            method="POST", destination=destination, path=path, query=args, json=data
941        )
942
943        start_ms = self.clock.time_msec()
944
945        response = await self._send_request(
946            request,
947            long_retries=long_retries,
948            timeout=timeout,
949            ignore_backoff=ignore_backoff,
950        )
951
952        if timeout:
953            _sec_timeout = timeout / 1000
954        else:
955            _sec_timeout = self.default_timeout
956
957        body = await _handle_response(
958            self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
959        )
960        return body
961
962    async def get_json(
963        self,
964        destination: str,
965        path: str,
966        args: Optional[QueryArgs] = None,
967        retry_on_dns_fail: bool = True,
968        timeout: Optional[int] = None,
969        ignore_backoff: bool = False,
970        try_trailing_slash_on_400: bool = False,
971    ) -> Union[JsonDict, list]:
972        """GETs some json from the given host homeserver and path
973
974        Args:
975            destination: The remote server to send the HTTP request to.
976
977            path: The HTTP path.
978
979            args: A dictionary used to create query strings, defaults to
980                None.
981
982            timeout: number of milliseconds to wait for the response.
983                self._default_timeout (60s) by default.
984
985                Note that we may make several attempts to send the request; this
986                timeout applies to the time spent waiting for response headers for
987                *each* attempt (including connection time) as well as the time spent
988                reading the response body after a 200 response.
989
990            ignore_backoff: true to ignore the historical backoff data
991                and try the request anyway.
992
993            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
994                response we should try appending a trailing slash to the end of
995                the request. Workaround for #3622 in Synapse <= v0.99.3.
996        Returns:
997            Succeeds when we get a 2xx HTTP response. The
998            result will be the decoded JSON body.
999
1000        Raises:
1001            HttpResponseException: If we get an HTTP response code >= 300
1002                (except 429).
1003            NotRetryingDestination: If we are not yet ready to retry this
1004                server.
1005            FederationDeniedError: If this destination  is not on our
1006                federation whitelist
1007            RequestSendFailed: If there were problems connecting to the
1008                remote, due to e.g. DNS failures, connection timeouts etc.
1009        """
1010        request = MatrixFederationRequest(
1011            method="GET", destination=destination, path=path, query=args
1012        )
1013
1014        start_ms = self.clock.time_msec()
1015
1016        response = await self._send_request_with_optional_trailing_slash(
1017            request,
1018            try_trailing_slash_on_400,
1019            backoff_on_404=False,
1020            ignore_backoff=ignore_backoff,
1021            retry_on_dns_fail=retry_on_dns_fail,
1022            timeout=timeout,
1023        )
1024
1025        if timeout is not None:
1026            _sec_timeout = timeout / 1000
1027        else:
1028            _sec_timeout = self.default_timeout
1029
1030        body = await _handle_response(
1031            self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
1032        )
1033
1034        return body
1035
1036    async def delete_json(
1037        self,
1038        destination: str,
1039        path: str,
1040        long_retries: bool = False,
1041        timeout: Optional[int] = None,
1042        ignore_backoff: bool = False,
1043        args: Optional[QueryArgs] = None,
1044    ) -> Union[JsonDict, list]:
1045        """Send a DELETE request to the remote expecting some json response
1046
1047        Args:
1048            destination: The remote server to send the HTTP request to.
1049            path: The HTTP path.
1050
1051            long_retries: whether to use the long retry algorithm. See
1052                docs on _send_request for details.
1053
1054            timeout: number of milliseconds to wait for the response.
1055                self._default_timeout (60s) by default.
1056
1057                Note that we may make several attempts to send the request; this
1058                timeout applies to the time spent waiting for response headers for
1059                *each* attempt (including connection time) as well as the time spent
1060                reading the response body after a 200 response.
1061
1062            ignore_backoff: true to ignore the historical backoff data and
1063                try the request anyway.
1064
1065            args: query params
1066        Returns:
1067            Succeeds when we get a 2xx HTTP response. The
1068            result will be the decoded JSON body.
1069
1070        Raises:
1071            HttpResponseException: If we get an HTTP response code >= 300
1072                (except 429).
1073            NotRetryingDestination: If we are not yet ready to retry this
1074                server.
1075            FederationDeniedError: If this destination  is not on our
1076                federation whitelist
1077            RequestSendFailed: If there were problems connecting to the
1078                remote, due to e.g. DNS failures, connection timeouts etc.
1079        """
1080        request = MatrixFederationRequest(
1081            method="DELETE", destination=destination, path=path, query=args
1082        )
1083
1084        start_ms = self.clock.time_msec()
1085
1086        response = await self._send_request(
1087            request,
1088            long_retries=long_retries,
1089            timeout=timeout,
1090            ignore_backoff=ignore_backoff,
1091        )
1092
1093        if timeout is not None:
1094            _sec_timeout = timeout / 1000
1095        else:
1096            _sec_timeout = self.default_timeout
1097
1098        body = await _handle_response(
1099            self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
1100        )
1101        return body
1102
1103    async def get_file(
1104        self,
1105        destination: str,
1106        path: str,
1107        output_stream,
1108        args: Optional[QueryArgs] = None,
1109        retry_on_dns_fail: bool = True,
1110        max_size: Optional[int] = None,
1111        ignore_backoff: bool = False,
1112    ) -> Tuple[int, Dict[bytes, List[bytes]]]:
1113        """GETs a file from a given homeserver
1114        Args:
1115            destination: The remote server to send the HTTP request to.
1116            path: The HTTP path to GET.
1117            output_stream: File to write the response body to.
1118            args: Optional dictionary used to create the query string.
1119            ignore_backoff: true to ignore the historical backoff data
1120                and try the request anyway.
1121
1122        Returns:
1123            Resolves with an (int,dict) tuple of
1124            the file length and a dict of the response headers.
1125
1126        Raises:
1127            HttpResponseException: If we get an HTTP response code >= 300
1128                (except 429).
1129            NotRetryingDestination: If we are not yet ready to retry this
1130                server.
1131            FederationDeniedError: If this destination  is not on our
1132                federation whitelist
1133            RequestSendFailed: If there were problems connecting to the
1134                remote, due to e.g. DNS failures, connection timeouts etc.
1135        """
1136        request = MatrixFederationRequest(
1137            method="GET", destination=destination, path=path, query=args
1138        )
1139
1140        response = await self._send_request(
1141            request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
1142        )
1143
1144        headers = dict(response.headers.getAllRawHeaders())
1145
1146        try:
1147            d = read_body_with_max_size(response, output_stream, max_size)
1148            d.addTimeout(self.default_timeout, self.reactor)
1149            length = await make_deferred_yieldable(d)
1150        except BodyExceededMaxSize:
1151            msg = "Requested file is too large > %r bytes" % (max_size,)
1152            logger.warning(
1153                "{%s} [%s] %s",
1154                request.txn_id,
1155                request.destination,
1156                msg,
1157            )
1158            raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
1159        except defer.TimeoutError as e:
1160            logger.warning(
1161                "{%s} [%s] Timed out reading response - %s %s",
1162                request.txn_id,
1163                request.destination,
1164                request.method,
1165                request.uri.decode("ascii"),
1166            )
1167            raise RequestSendFailed(e, can_retry=True) from e
1168        except ResponseFailed as e:
1169            logger.warning(
1170                "{%s} [%s] Failed to read response - %s %s",
1171                request.txn_id,
1172                request.destination,
1173                request.method,
1174                request.uri.decode("ascii"),
1175            )
1176            raise RequestSendFailed(e, can_retry=True) from e
1177        except Exception as e:
1178            logger.warning(
1179                "{%s} [%s] Error reading response: %s",
1180                request.txn_id,
1181                request.destination,
1182                e,
1183            )
1184            raise
1185        logger.info(
1186            "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
1187            request.txn_id,
1188            request.destination,
1189            response.code,
1190            response.phrase.decode("ascii", errors="replace"),
1191            length,
1192            request.method,
1193            request.uri.decode("ascii"),
1194        )
1195        return length, headers
1196
1197
1198def _flatten_response_never_received(e):
1199    if hasattr(e, "reasons"):
1200        reasons = ", ".join(
1201            _flatten_response_never_received(f.value) for f in e.reasons
1202        )
1203
1204        return "%s:[%s]" % (type(e).__name__, reasons)
1205    else:
1206        return repr(e)
1207
1208
def check_content_type_is(headers: Headers, expected_content_type: str) -> None:
    """
    Check that a set of HTTP headers have a Content-Type header, and that it
    is the expected value.

    Only the first Content-Type header is examined, and any parameters on it
    (such as a charset) are ignored for the comparison.

    Args:
        headers: headers to check
        expected_content_type: the value the Content-Type header must have
            once its parameters have been stripped, e.g. "application/json"

    Raises:
        RequestSendFailed: if the Content-Type header is missing, is not valid
            ASCII, or doesn't match

    """
    content_type_headers = headers.getRawHeaders(b"Content-Type")
    if content_type_headers is None:
        raise RequestSendFailed(
            RuntimeError("No Content-Type header received from remote server"),
            can_retry=False,
        )

    raw_c_type = content_type_headers[0]  # only the first header
    try:
        c_type = raw_c_type.decode("ascii")
    except UnicodeDecodeError as e:
        # The header comes from the remote server, so it may contain arbitrary
        # bytes. Report that as a failed request rather than leaking a bare
        # UnicodeDecodeError to callers that only expect RequestSendFailed.
        raise RequestSendFailed(
            RuntimeError(
                f"Remote server sent Content-Type header of {raw_c_type!r}, which is not valid ASCII",
            ),
            can_retry=False,
        ) from e

    # parse_header splits off the parameters; only the main value is compared.
    val, _ = cgi.parse_header(c_type)
    if val != expected_content_type:
        raise RequestSendFailed(
            RuntimeError(
                f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'",
            ),
            can_retry=False,
        )
1237