1 use codec::Codec; 2 use frame::{self, Reason, StreamId}; 3 4 use bytes::Buf; 5 use futures::{Async, Poll}; 6 use std::io; 7 use tokio_io::AsyncWrite; 8 9 /// Manages our sending of GOAWAY frames. 10 #[derive(Debug)] 11 pub(super) struct GoAway { 12 /// Whether the connection should close now, or wait until idle. 13 close_now: bool, 14 /// Records if we've sent any GOAWAY before. 15 going_away: Option<GoingAway>, 16 /// Whether the user started the GOAWAY by calling `abrupt_shutdown`. 17 is_user_initiated: bool, 18 /// A GOAWAY frame that must be buffered in the Codec immediately. 19 pending: Option<frame::GoAway>, 20 } 21 22 /// Keeps a memory of any GOAWAY frames we've sent before. 23 /// 24 /// This looks very similar to a `frame::GoAway`, but is a separate type. Why? 25 /// Mostly for documentation purposes. This type is to record status. If it 26 /// were a `frame::GoAway`, it might appear like we eventually wanted to 27 /// serialize it. We **only** want to be able to look up these fields at a 28 /// later time. 29 /// 30 /// (Technically, `frame::GoAway` should gain an opaque_debug_data field as 31 /// well, and we wouldn't want to save that here to accidentally dump in logs, 32 /// or waste struct space.) 33 #[derive(Debug)] 34 struct GoingAway { 35 /// Stores the highest stream ID of a GOAWAY that has been sent. 36 /// 37 /// It's illegal to send a subsequent GOAWAY with a higher ID. 38 last_processed_id: StreamId, 39 40 /// Records the error code of any GOAWAY frame sent. 41 reason: Reason, 42 } 43 44 impl GoAway { new() -> Self45 pub fn new() -> Self { 46 GoAway { 47 close_now: false, 48 going_away: None, 49 is_user_initiated: false, 50 pending: None, 51 } 52 } 53 54 /// Enqueue a GOAWAY frame to be written. 55 /// 56 /// The connection is expected to continue to run until idle. go_away(&mut self, f: frame::GoAway)57 pub fn go_away(&mut self, f: frame::GoAway) { 58 if let Some(ref going_away) = self.going_away { 59 assert!( 60 f.last_stream_id() <= going_away.last_processed_id, 61 "GOAWAY stream IDs shouldn't be higher; \ 62 last_processed_id = {:?}, f.last_stream_id() = {:?}", 63 going_away.last_processed_id, 64 f.last_stream_id(), 65 ); 66 } 67 68 self.going_away = Some(GoingAway { 69 last_processed_id: f.last_stream_id(), 70 reason: f.reason(), 71 }); 72 self.pending = Some(f); 73 } 74 go_away_now(&mut self, f: frame::GoAway)75 pub fn go_away_now(&mut self, f: frame::GoAway) { 76 self.close_now = true; 77 if let Some(ref going_away) = self.going_away { 78 // Prevent sending the same GOAWAY twice. 79 if going_away.last_processed_id == f.last_stream_id() 80 && going_away.reason == f.reason() { 81 return; 82 } 83 } 84 self.go_away(f); 85 } 86 go_away_from_user(&mut self, f: frame::GoAway)87 pub fn go_away_from_user(&mut self, f: frame::GoAway) { 88 self.is_user_initiated = true; 89 self.go_away_now(f); 90 } 91 92 /// Return if a GOAWAY has ever been scheduled. is_going_away(&self) -> bool93 pub fn is_going_away(&self) -> bool { 94 self.going_away.is_some() 95 } 96 is_user_initiated(&self) -> bool97 pub fn is_user_initiated(&self) -> bool { 98 self.is_user_initiated 99 } 100 101 /// Return the last Reason we've sent. going_away_reason(&self) -> Option<Reason>102 pub fn going_away_reason(&self) -> Option<Reason> { 103 self.going_away 104 .as_ref() 105 .map(|g| g.reason) 106 } 107 108 /// Returns if the connection should close now, or wait until idle. should_close_now(&self) -> bool109 pub fn should_close_now(&self) -> bool { 110 self.pending.is_none() && self.close_now 111 } 112 113 /// Returns if the connection should be closed when idle. should_close_on_idle(&self) -> bool114 pub fn should_close_on_idle(&self) -> bool { 115 !self.close_now && self.going_away 116 .as_ref() 117 .map(|g| g.last_processed_id != StreamId::MAX) 118 .unwrap_or(false) 119 } 120 121 /// Try to write a pending GOAWAY frame to the buffer. 122 /// 123 /// If a frame is written, the `Reason` of the GOAWAY is returned. send_pending_go_away<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<Option<Reason>, io::Error> where T: AsyncWrite, B: Buf,124 pub fn send_pending_go_away<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<Option<Reason>, io::Error> 125 where 126 T: AsyncWrite, 127 B: Buf, 128 { 129 if let Some(frame) = self.pending.take() { 130 if !dst.poll_ready()?.is_ready() { 131 self.pending = Some(frame); 132 return Ok(Async::NotReady); 133 } 134 135 let reason = frame.reason(); 136 dst.buffer(frame.into()) 137 .ok() 138 .expect("invalid GOAWAY frame"); 139 140 return Ok(Async::Ready(Some(reason))); 141 } else if self.should_close_now() { 142 return Ok(Async::Ready(self.going_away_reason())); 143 } 144 145 Ok(Async::Ready(None)) 146 } 147 } 148