1 use std::io; 2 use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4}; 3 4 use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; 5 6 use super::UdpSocket; 7 8 use tokio_codec::{Decoder, Encoder}; 9 use bytes::{BytesMut, BufMut}; 10 11 /// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using 12 /// the `Encoder` and `Decoder` traits to encode and decode frames. 13 /// 14 /// Raw UDP sockets work with datagrams, but higher-level code usually wants to 15 /// batch these into meaningful chunks, called "frames". This method layers 16 /// framing on top of this socket by using the `Encoder` and `Decoder` traits to 17 /// handle encoding and decoding of messages frames. Note that the incoming and 18 /// outgoing frame types may be distinct. 19 /// 20 /// This function returns a *single* object that is both `Stream` and `Sink`; 21 /// grouping this into a single object is often useful for layering things which 22 /// require both read and write access to the underlying object. 23 /// 24 /// If you want to work more directly with the streams and sink, consider 25 /// calling `split` on the `UdpFramed` returned by this method, which will break 26 /// them into separate objects, allowing them to interact more easily. 27 #[must_use = "sinks do nothing unless polled"] 28 #[derive(Debug)] 29 pub struct UdpFramed<C> { 30 socket: UdpSocket, 31 codec: C, 32 rd: BytesMut, 33 wr: BytesMut, 34 out_addr: SocketAddr, 35 flushed: bool, 36 } 37 38 impl<C: Decoder> Stream for UdpFramed<C> { 39 type Item = (C::Item, SocketAddr); 40 type Error = C::Error; 41 poll(&mut self) -> Poll<Option<(Self::Item)>, Self::Error>42 fn poll(&mut self) -> Poll<Option<(Self::Item)>, Self::Error> { 43 self.rd.reserve(INITIAL_RD_CAPACITY); 44 45 let (n, addr) = unsafe { 46 // Read into the buffer without having to initialize the memory. 47 let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut())); 48 self.rd.advance_mut(n); 49 (n, addr) 50 }; 51 trace!("received {} bytes, decoding", n); 52 let frame_res = self.codec.decode(&mut self.rd); 53 self.rd.clear(); 54 let frame = frame_res?; 55 let result = frame.map(|frame| (frame, addr)); // frame -> (frame, addr) 56 trace!("frame decoded from buffer"); 57 Ok(Async::Ready(result)) 58 } 59 } 60 61 impl<C: Encoder> Sink for UdpFramed<C> { 62 type SinkItem = (C::Item, SocketAddr); 63 type SinkError = C::Error; 64 start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>65 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { 66 trace!("sending frame"); 67 68 if !self.flushed { 69 match try!(self.poll_complete()) { 70 Async::Ready(()) => {}, 71 Async::NotReady => return Ok(AsyncSink::NotReady(item)), 72 } 73 } 74 75 let (frame, out_addr) = item; 76 self.codec.encode(frame, &mut self.wr)?; 77 self.out_addr = out_addr; 78 self.flushed = false; 79 trace!("frame encoded; length={}", self.wr.len()); 80 81 Ok(AsyncSink::Ready) 82 } 83 poll_complete(&mut self) -> Poll<(), C::Error>84 fn poll_complete(&mut self) -> Poll<(), C::Error> { 85 if self.flushed { 86 return Ok(Async::Ready(())) 87 } 88 89 trace!("flushing frame; length={}", self.wr.len()); 90 let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr)); 91 trace!("written {}", n); 92 93 let wrote_all = n == self.wr.len(); 94 self.wr.clear(); 95 self.flushed = true; 96 97 if wrote_all { 98 Ok(Async::Ready(())) 99 } else { 100 Err(io::Error::new(io::ErrorKind::Other, 101 "failed to write entire datagram to socket").into()) 102 } 103 } 104 close(&mut self) -> Poll<(), C::Error>105 fn close(&mut self) -> Poll<(), C::Error> { 106 try_ready!(self.poll_complete()); 107 Ok(().into()) 108 } 109 } 110 111 const INITIAL_RD_CAPACITY: usize = 64 * 1024; 112 const INITIAL_WR_CAPACITY: usize = 8 * 1024; 113 114 impl<C> UdpFramed<C> { 115 /// Create a new `UdpFramed` backed by the given socket and codec. 116 /// 117 /// See struct level documentation for more details. new(socket: UdpSocket, codec: C) -> UdpFramed<C>118 pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> { 119 UdpFramed { 120 socket: socket, 121 codec: codec, 122 out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)), 123 rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY), 124 wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY), 125 flushed: true, 126 } 127 } 128 129 /// Returns a reference to the underlying I/O stream wrapped by `Framed`. 130 /// 131 /// # Note 132 /// 133 /// Care should be taken to not tamper with the underlying stream of data 134 /// coming in as it may corrupt the stream of frames otherwise being worked 135 /// with. get_ref(&self) -> &UdpSocket136 pub fn get_ref(&self) -> &UdpSocket { 137 &self.socket 138 } 139 140 /// Returns a mutable reference to the underlying I/O stream wrapped by 141 /// `Framed`. 142 /// 143 /// # Note 144 /// 145 /// Care should be taken to not tamper with the underlying stream of data 146 /// coming in as it may corrupt the stream of frames otherwise being worked 147 /// with. get_mut(&mut self) -> &mut UdpSocket148 pub fn get_mut(&mut self) -> &mut UdpSocket { 149 &mut self.socket 150 } 151 152 /// Consumes the `Framed`, returning its underlying I/O stream. into_inner(self) -> UdpSocket153 pub fn into_inner(self) -> UdpSocket { 154 self.socket 155 } 156 } 157