# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 New Vector Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
from types import TracebackType
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type

import attr
from prometheus_client import Counter

from synapse.api.errors import (
    FederationDeniedError,
    HttpResponseException,
    RequestSendFailed,
)
from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter

if TYPE_CHECKING:
    import synapse.server

# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100

logger = logging.getLogger(__name__)


sent_edus_counter = Counter(
    "synapse_federation_client_sent_edus", "Total number of EDUs successfully sent"
)

sent_edus_by_type = Counter(
    "synapse_federation_client_sent_edus_by_type",
    "Number of EDUs successfully sent, by event type",
    ["type"],
)


class PerDestinationQueue:
    """
    Manages the per-destination transmission queues.

    Args:
        hs: the homeserver instance
        transaction_manager: the TransactionManager used to send out transactions
        destination: the server_name of the destination that we are managing
            transmission for.
    """

    def __init__(
        self,
        hs: "synapse.server.HomeServer",
        transaction_manager: "synapse.federation.sender.TransactionManager",
        destination: str,
    ):
        self._server_name = hs.hostname
        self._clock = hs.get_clock()
        self._store = hs.get_datastore()
        self._transaction_manager = transaction_manager
        self._instance_name = hs.get_instance_name()
        self._federation_shard_config = hs.config.worker.federation_shard_config
        self._state = hs.get_state_handler()

        self._should_send_on_this_instance = True
        if not self._federation_shard_config.should_handle(
            self._instance_name, destination
        ):
            # We don't raise an exception here to avoid taking out any other
            # processing. We have a guard in `attempt_new_transaction` that
            # ensures we don't start sending stuff.
            logger.error(
                "Created a per-destination queue for %s on the wrong worker",
                destination,
            )
            self._should_send_on_this_instance = False

        self._destination = destination
        self.transmission_loop_running = False

        # Flag to signal to any running transmission loop that there is new data
        # queued up to be sent.
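        # It is reset at the start of each pass of the transmission loop.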
        self._new_data_to_send = False

        # True whilst we are sending events that the remote homeserver missed
        # because it was unreachable. We start in this state so we can perform
        # catch-up at startup.
        # New events will only be sent once this is finished, at which point
        # _catching_up is flipped to False.
        self._catching_up: bool = True

        # The stream_ordering of the most recent PDU that was discarded due to
        # being in catch-up mode.
        self._catchup_last_skipped: int = 0

        # Cache of the last successfully-transmitted stream ordering for this
        # destination (we are the only updater so this is safe)
        self._last_successful_stream_ordering: Optional[int] = None

        # a queue of pending PDUs
        self._pending_pdus: List[EventBase] = []

        # XXX this is never actually used: see
        # https://github.com/matrix-org/synapse/issues/7549
        self._pending_edus: List[Edu] = []

        # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
        # based on their key (e.g. typing events by room_id)
        # Map of (edu_type, key) -> Edu
        self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {}

        # Map of user_id -> UserPresenceState of pending presence to be sent to this
        # destination
        self._pending_presence: Dict[str, UserPresenceState] = {}

        # room_id -> receipt_type -> user_id -> receipt_dict
        self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
        self._rrs_pending_flush = False

        # stream_id of last successfully sent to-device message.
        # NB: may be a long or an int.
        self._last_device_stream_id = 0

        # stream_id of last successfully sent device list update.
        self._last_device_list_stream_id = 0

    def __str__(self) -> str:
        return "PerDestinationQueue[%s]" % self._destination

    def pending_pdu_count(self) -> int:
        return len(self._pending_pdus)

    def pending_edu_count(self) -> int:
        return (
            len(self._pending_edus)
            + len(self._pending_presence)
            + len(self._pending_edus_keyed)
        )

    def send_pdu(self, pdu: EventBase) -> None:
        """Add a PDU to the queue, and start the transmission loop if necessary

        Args:
            pdu: pdu to send
        """
        if not self._catching_up or self._last_successful_stream_ordering is None:
            # only enqueue the PDU if we are not catching up (False) or do not
            # yet know if we have anything to catch up (None)
            self._pending_pdus.append(pdu)
        else:
            assert pdu.internal_metadata.stream_ordering
            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering

        self.attempt_new_transaction()

    def send_presence(
        self, states: Iterable[UserPresenceState], start_loop: bool = True
    ) -> None:
        """Add presence updates to the queue.

        Args:
            states: Presence updates to send
            start_loop: Whether to start the transmission loop if not already
                running.
        """
        self._pending_presence.update({state.user_id: state for state in states})
        self._new_data_to_send = True

        if start_loop:
            self.attempt_new_transaction()

    def queue_read_receipt(self, receipt: ReadReceipt) -> None:
        """Add a RR to the list to be sent.
        Doesn't start the transmission loop yet (see flush_read_receipts_for_room).

        Args:
            receipt: receipt to be queued
        """
        self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
            receipt.receipt_type, {}
        )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}

    def flush_read_receipts_for_room(self, room_id: str) -> None:
        # if we don't have any read-receipts for this room, it may be that we've already
        # sent them out, so we don't need to flush.
        if room_id not in self._pending_rrs:
            return
        self._rrs_pending_flush = True
        self.attempt_new_transaction()

    def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
        self._pending_edus_keyed[(edu.edu_type, key)] = edu
        self.attempt_new_transaction()

    def send_edu(self, edu: Edu) -> None:
        self._pending_edus.append(edu)
        self.attempt_new_transaction()

    def attempt_new_transaction(self) -> None:
        """Try to start a new transaction to this destination

        If there is already a transaction in progress to this destination,
        returns immediately. Otherwise kicks off the process of sending a
        transaction in the background.
        """

        # Mark that we (may) have new things to send, so that any running
        # transmission loop will recheck whether there is stuff to send.
        self._new_data_to_send = True

        if self.transmission_loop_running:
            # XXX: this can get stuck on by a never-ending request, at which
            # point pending_pdus just keeps growing. We need application-layer
            # timeouts of some flavour for these requests.
            logger.debug("TX [%s] Transaction already in progress", self._destination)
            return

        if not self._should_send_on_this_instance:
            # We don't raise an exception here to avoid taking out any other
            # processing.
            logger.error(
                "Trying to start a transaction to %s on the wrong worker",
                self._destination,
            )
            return

        logger.debug("TX [%s] Starting transaction loop", self._destination)

        run_as_background_process(
            "federation_transaction_transmission_loop",
            self._transaction_transmission_loop,
        )

    async def _transaction_transmission_loop(self) -> None:
        pending_pdus: List[EventBase] = []
        try:
            self.transmission_loop_running = True

            # This will throw if we wouldn't retry. We do this here so we fail
            # quickly, but we will later check this again in the http client,
            # which is why we throw the result away.
            await get_retry_limiter(self._destination, self._clock, self._store)

            if self._catching_up:
                # we potentially need to catch-up first
                await self._catch_up_transmission_loop()
                if self._catching_up:
                    # not caught up yet
                    return

            pending_pdus = []
            while True:
                self._new_data_to_send = False

                async with _TransactionQueueManager(self) as (
                    pending_pdus,
                    pending_edus,
                ):
                    if not pending_pdus and not pending_edus:
                        logger.debug("TX [%s] Nothing to send", self._destination)

                        # If we've been told about new things to send while
                        # checking for things to send, we try looking again.
                        # Otherwise new PDUs or EDUs might arrive in the
                        # meantime, but not get sent because we hold the
                        # `transmission_loop_running` flag.
                        if self._new_data_to_send:
                            continue
                        else:
                            return

                    if pending_pdus:
                        logger.debug(
                            "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                            self._destination,
                            len(pending_pdus),
                        )

                    await self._transaction_manager.send_new_transaction(
                        self._destination, pending_pdus, pending_edus
                    )

                    sent_transactions_counter.inc()
                    sent_edus_counter.inc(len(pending_edus))
                    for edu in pending_edus:
                        sent_edus_by_type.labels(edu.edu_type).inc()

        except NotRetryingDestination as e:
            logger.debug(
                "TX [%s] not ready for retry yet (next retry at %s) - "
                "dropping transaction for now",
                self._destination,
                datetime.datetime.fromtimestamp(
                    (e.retry_last_ts + e.retry_interval) / 1000.0
                ),
            )

            if e.retry_interval > 60 * 60 * 1000:
                # we won't retry for another hour!
                # (this suggests a significant outage)
                # We drop pending EDUs because otherwise they will
                # rack up indefinitely.
                # (Dropping PDUs is already performed by `_start_catching_up`.)
                # Note that:
                # - the EDUs that are being dropped here are those that we can
                #   afford to drop (specifically, only typing notifications,
                #   read receipts and presence updates are being dropped here)
                # - Other EDUs such as to_device messages are queued with a
                #   different mechanism
                # - this is all volatile state that would be lost if the
                #   federation sender restarted anyway

                # dropping read receipts is a bit sad but should be solved
                # through another mechanism, because this is all volatile!
                self._pending_edus = []
                self._pending_edus_keyed = {}
                self._pending_presence = {}
                self._pending_rrs = {}

                self._start_catching_up()
        except FederationDeniedError as e:
            logger.info(e)
        except HttpResponseException as e:
            logger.warning(
                "TX [%s] Received %d response to transaction: %s",
                self._destination,
                e.code,
                e,
            )

        except RequestSendFailed as e:
            logger.warning(
                "TX [%s] Failed to send transaction: %s", self._destination, e
            )

            for p in pending_pdus:
                logger.info(
                    "Failed to send event %s to %s", p.event_id, self._destination
                )
        except Exception:
            logger.exception("TX [%s] Failed to send transaction", self._destination)
            for p in pending_pdus:
                logger.info(
                    "Failed to send event %s to %s", p.event_id, self._destination
                )
        finally:
            # We want to be *very* sure we clear this after we stop processing
            self.transmission_loop_running = False

    async def _catch_up_transmission_loop(self) -> None:
        first_catch_up_check = self._last_successful_stream_ordering is None

        if first_catch_up_check:
            # first catchup so get last_successful_stream_ordering from database
            self._last_successful_stream_ordering = (
                await self._store.get_destination_last_successful_stream_ordering(
                    self._destination
                )
            )

        if self._last_successful_stream_ordering is None:
            # if it's still None, then this means we don't have the information
            # in our database: we haven't successfully sent a PDU to this server
            # (at least since the introduction of the feature tracking
            # last_successful_stream_ordering).
            # Sadly, this means we can't do anything here as we don't know what
            # needs catching up — so catching up is futile; let's stop.
            self._catching_up = False
            return

        # get at most 50 catchup room/PDUs
        while True:
            event_ids = await self._store.get_catch_up_room_event_ids(
                self._destination,
                self._last_successful_stream_ordering,
            )

            if not event_ids:
                # No more events to catch up on, but we can't ignore the chance
                # of a race condition, so we check that no new events have been
                # skipped due to us being in catch-up mode

                if self._catchup_last_skipped > self._last_successful_stream_ordering:
                    # another event has been skipped because we were in catch-up mode
                    continue

                # we are done catching up!
                self._catching_up = False
                break

            if first_catch_up_check:
                # as this is our check for needing catch-up, we may have PDUs in
                # the queue from before we *knew* we had to do catch-up, so
                # clear those out now.
                self._start_catching_up()

            # fetch the relevant events from the event store
            # - redacted behaviour of REDACT is fine, since we only send metadata
            #   of redacted events to the destination.
            # - don't need to worry about rejected events as we do not actively
            #   forward received events over federation.
            catchup_pdus = await self._store.get_events_as_list(event_ids)
            if not catchup_pdus:
                raise AssertionError(
                    "No events retrieved when we asked for %r. "
                    "This should not happen." % event_ids
                )

            # We send transactions with events from one room only, as it's likely
            # that the remote will have to do additional processing, which may
            # take some time. It's better to give it small amounts of work
            # rather than risk the request timing out and repeatedly being
            # retried, and not making any progress.
            #
            # Note: `catchup_pdus` will have exactly one PDU per room.
            for pdu in catchup_pdus:
                # The PDU from the DB will be the last PDU in the room from
                # *this server* that wasn't sent to the remote. However, other
                # servers may have sent lots of events since then, and we want
                # to try and tell the remote only about the *latest* events in
                # the room. This is so that it doesn't get inundated by events
                # from various parts of the DAG, which all need to be processed.
                #
                # Note: this does mean that in large rooms a server coming back
                # online will get sent the same events from all the different
                # servers, but the remote will correctly deduplicate them and
                # handle it only once.

                # Step 1, fetch the current extremities
                extrems = await self._store.get_prev_events_for_room(pdu.room_id)

                if pdu.event_id in extrems:
                    # If the event is in the extremities, then great! We can just
                    # use that without having to do further checks.
                    room_catchup_pdus = [pdu]
                else:
                    # If not, fetch the extremities and figure out which we can
                    # send.
                    extrem_events = await self._store.get_events_as_list(extrems)

                    new_pdus = []
                    for p in extrem_events:
                        # We pulled this from the DB, so it'll be non-null
                        assert p.internal_metadata.stream_ordering

                        # Filter out events that happened before the remote went
                        # offline
                        if (
                            p.internal_metadata.stream_ordering
                            < self._last_successful_stream_ordering
                        ):
                            continue

                        # Filter out events where the server is not in the room,
                        # e.g. it may have left/been kicked.
                        # *Ideally* we'd pull out the kick and send that, but it's
                        # a rare edge case, so we don't bother for now (the server
                        # that sent the kick should send it out if it's online).
                        hosts = await self._state.get_hosts_in_room_at_events(
                            p.room_id, [p.event_id]
                        )
                        if self._destination not in hosts:
                            continue

                        new_pdus.append(p)

                    # If we've filtered out all the extremities, fall back to
                    # sending the original event. This should ensure that the
                    # server gets at least some of the missed events (especially
                    # if the other sending servers are up).
                    if new_pdus:
                        room_catchup_pdus = new_pdus
                    else:
                        room_catchup_pdus = [pdu]

                logger.info(
                    "Catching up rooms to %s: %r", self._destination, pdu.room_id
                )

                await self._transaction_manager.send_new_transaction(
                    self._destination, room_catchup_pdus, []
                )

                sent_transactions_counter.inc()

                # We pulled this from the DB, so it'll be non-null
                assert pdu.internal_metadata.stream_ordering

                # Note that we mark the last successful stream ordering as that
                # from the *original* PDU, rather than the PDU(s) we actually
                # send. This is because we use it to mark our position in the
                # queue of missed PDUs to process.
                self._last_successful_stream_ordering = (
                    pdu.internal_metadata.stream_ordering
                )

                await self._store.set_destination_last_successful_stream_ordering(
                    self._destination, self._last_successful_stream_ordering
                )

    def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
        if not self._pending_rrs:
            return
        if not force_flush and not self._rrs_pending_flush:
            # not yet time for this lot
            return

        edu = Edu(
            origin=self._server_name,
            destination=self._destination,
            edu_type="m.receipt",
            content=self._pending_rrs,
        )
        self._pending_rrs = {}
        self._rrs_pending_flush = False
        yield edu

    def _pop_pending_edus(self, limit: int) -> List[Edu]:
        pending_edus = self._pending_edus
        pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
        return pending_edus

    async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:
        last_device_list = self._last_device_list_stream_id

        # Retrieve list of new device updates to send to the destination
        now_stream_id, results = await self._store.get_device_updates_by_remote(
            self._destination, last_device_list, limit=limit
        )
        edus = [
            Edu(
                origin=self._server_name,
                destination=self._destination,
                edu_type=edu_type,
                content=content,
            )
            for (edu_type, content) in results
        ]

        assert len(edus) <= limit, "get_device_updates_by_remote returned too many EDUs"

        return edus, now_stream_id

    async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
        last_device_stream_id = self._last_device_stream_id
        to_device_stream_id = self._store.get_to_device_stream_token()
        contents, stream_id = await self._store.get_new_device_msgs_for_remote(
            self._destination, last_device_stream_id, to_device_stream_id, limit
        )
        for content in contents:
            message_id = content.get("message_id")
            if not message_id:
                continue

            set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

        edus = [
            Edu(
                origin=self._server_name,
                destination=self._destination,
                edu_type="m.direct_to_device",
                content=content,
            )
            for content in contents
        ]

        if edus:
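            # Log these sends via the dedicated issue9533_logger so they can be
            # filtered separately from the rest of the federation sender's logs.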
            issue9533_logger.debug(
                "Sending %i to-device messages to %s, up to stream id %i",
                len(edus),
                self._destination,
                stream_id,
            )

        return edus, stream_id

    def _start_catching_up(self) -> None:
        """
        Marks this destination as being in catch-up mode.

        This throws away the PDU queue.
        """
        self._catching_up = True
        self._pending_pdus = []


@attr.s(slots=True)
class _TransactionQueueManager:
    """A helper async context manager for pulling stuff off the queues and
    tracking what was last successfully sent, etc.
    """

    queue = attr.ib(type=PerDestinationQueue)

    _device_stream_id = attr.ib(type=Optional[int], default=None)
    _device_list_id = attr.ib(type=Optional[int], default=None)
    _last_stream_ordering = attr.ib(type=Optional[int], default=None)
    _pdus = attr.ib(type=List[EventBase], factory=list)

    async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
        # First we calculate the EDUs we want to send, if any.

        # We start by fetching device related EDUs, i.e. device updates and to
        # device messages. We have to keep 2 free slots for presence and rr_edus.
        limit = MAX_EDUS_PER_TRANSACTION - 2

        device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
            limit
        )

        if device_update_edus:
            self._device_list_id = dev_list_id
        else:
            self.queue._last_device_list_stream_id = dev_list_id

        limit -= len(device_update_edus)

        (
            to_device_edus,
            device_stream_id,
        ) = await self.queue._get_to_device_message_edus(limit)

        if to_device_edus:
            self._device_stream_id = device_stream_id
        else:
            self.queue._last_device_stream_id = device_stream_id

        pending_edus = device_update_edus + to_device_edus

        # Now add the read receipt EDU.
        pending_edus.extend(self.queue._get_rr_edus(force_flush=False))

        # And presence EDU.
        if self.queue._pending_presence:
            pending_edus.append(
                Edu(
                    origin=self.queue._server_name,
                    destination=self.queue._destination,
                    edu_type="m.presence",
                    content={
                        "push": [
                            format_user_presence_state(
                                presence, self.queue._clock.time_msec()
                            )
                            for presence in self.queue._pending_presence.values()
                        ]
                    },
                )
            )
            self.queue._pending_presence = {}

        # Finally add any other types of EDUs if there is room.
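        # We take as many plain (unkeyed) EDUs as will fit, then fill any
        # remaining slots with keyed EDUs (of which only the latest EDU per
        # key has been kept).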
        pending_edus.extend(
            self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
        )
        while (
            len(pending_edus) < MAX_EDUS_PER_TRANSACTION
            and self.queue._pending_edus_keyed
        ):
            _, val = self.queue._pending_edus_keyed.popitem()
            pending_edus.append(val)

        # Now we look for any PDUs to send, by getting up to 50 PDUs from the
        # queue
        self._pdus = self.queue._pending_pdus[:50]

        if not self._pdus and not pending_edus:
            return [], []

        # if we've decided to send a transaction anyway, and we have room, we
        # may as well send any pending RRs
        if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
            pending_edus.extend(self.queue._get_rr_edus(force_flush=True))

        if self._pdus:
            self._last_stream_ordering = self._pdus[
                -1
            ].internal_metadata.stream_ordering
            assert self._last_stream_ordering

        return self._pdus, pending_edus

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        if exc_type is not None:
            # Failed to send transaction, so we bail out.
            return

        # Successfully sent the transaction, so we remove the pending PDUs from
        # the queue
        if self._pdus:
            self.queue._pending_pdus = self.queue._pending_pdus[len(self._pdus) :]

        # We succeeded in sending the transaction, so record where we have sent
        # up to in the various streams

        if self._device_stream_id:
            await self.queue._store.delete_device_msgs_for_remote(
                self.queue._destination, self._device_stream_id
            )
            self.queue._last_device_stream_id = self._device_stream_id

        # also mark the device updates as sent
        if self._device_list_id:
            logger.info(
                "Marking as sent %r %r", self.queue._destination, self._device_list_id
            )
            await self.queue._store.mark_as_sent_devices_by_remote(
                self.queue._destination, self._device_list_id
            )
            self.queue._last_device_list_stream_id = self._device_list_id

        if self._last_stream_ordering:
            # we sent some PDUs and it was successful, so update our
            # last_successful_stream_ordering in the destinations table.
            await self.queue._store.set_destination_last_successful_stream_ordering(
                self.queue._destination, self._last_stream_ordering
            )