1 use std::io;
2 use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4};
3 
4 use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
5 
6 use super::UdpSocket;
7 
8 use tokio_codec::{Decoder, Encoder};
9 use bytes::{BytesMut, BufMut};
10 
11 /// A unified `Stream` and `Sink` interface to an underlying `UdpSocket`, using
12 /// the `Encoder` and `Decoder` traits to encode and decode frames.
13 ///
14 /// Raw UDP sockets work with datagrams, but higher-level code usually wants to
15 /// batch these into meaningful chunks, called "frames". This method layers
16 /// framing on top of this socket by using the `Encoder` and `Decoder` traits to
17 /// handle encoding and decoding of messages frames. Note that the incoming and
18 /// outgoing frame types may be distinct.
19 ///
20 /// This function returns a *single* object that is both `Stream` and `Sink`;
21 /// grouping this into a single object is often useful for layering things which
22 /// require both read and write access to the underlying object.
23 ///
24 /// If you want to work more directly with the streams and sink, consider
25 /// calling `split` on the `UdpFramed` returned by this method, which will break
26 /// them into separate objects, allowing them to interact more easily.
27 #[must_use = "sinks do nothing unless polled"]
28 #[derive(Debug)]
29 pub struct UdpFramed<C> {
30     socket: UdpSocket,
31     codec: C,
32     rd: BytesMut,
33     wr: BytesMut,
34     out_addr: SocketAddr,
35     flushed: bool,
36 }
37 
38 impl<C: Decoder> Stream for UdpFramed<C> {
39     type Item = (C::Item, SocketAddr);
40     type Error = C::Error;
41 
poll(&mut self) -> Poll<Option<(Self::Item)>, Self::Error>42     fn poll(&mut self) -> Poll<Option<(Self::Item)>, Self::Error> {
43         self.rd.reserve(INITIAL_RD_CAPACITY);
44 
45         let (n, addr) = unsafe {
46             // Read into the buffer without having to initialize the memory.
47             let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
48             self.rd.advance_mut(n);
49             (n, addr)
50         };
51         trace!("received {} bytes, decoding", n);
52         let frame_res = self.codec.decode(&mut self.rd);
53         self.rd.clear();
54         let frame = frame_res?;
55         let result = frame.map(|frame| (frame, addr)); // frame -> (frame, addr)
56         trace!("frame decoded from buffer");
57         Ok(Async::Ready(result))
58     }
59 }
60 
61 impl<C: Encoder> Sink for UdpFramed<C> {
62     type SinkItem = (C::Item, SocketAddr);
63     type SinkError = C::Error;
64 
start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError>65     fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
66         trace!("sending frame");
67 
68         if !self.flushed {
69             match try!(self.poll_complete()) {
70                 Async::Ready(()) => {},
71                 Async::NotReady => return Ok(AsyncSink::NotReady(item)),
72             }
73         }
74 
75         let (frame, out_addr) = item;
76         self.codec.encode(frame, &mut self.wr)?;
77         self.out_addr = out_addr;
78         self.flushed = false;
79         trace!("frame encoded; length={}", self.wr.len());
80 
81         Ok(AsyncSink::Ready)
82     }
83 
poll_complete(&mut self) -> Poll<(), C::Error>84     fn poll_complete(&mut self) -> Poll<(), C::Error> {
85         if self.flushed {
86             return Ok(Async::Ready(()))
87         }
88 
89         trace!("flushing frame; length={}", self.wr.len());
90         let n = try_ready!(self.socket.poll_send_to(&self.wr, &self.out_addr));
91         trace!("written {}", n);
92 
93         let wrote_all = n == self.wr.len();
94         self.wr.clear();
95         self.flushed = true;
96 
97         if wrote_all {
98             Ok(Async::Ready(()))
99         } else {
100             Err(io::Error::new(io::ErrorKind::Other,
101                                "failed to write entire datagram to socket").into())
102         }
103     }
104 
close(&mut self) -> Poll<(), C::Error>105     fn close(&mut self) -> Poll<(), C::Error> {
106         try_ready!(self.poll_complete());
107         Ok(().into())
108     }
109 }
110 
111 const INITIAL_RD_CAPACITY: usize = 64 * 1024;
112 const INITIAL_WR_CAPACITY: usize = 8 * 1024;
113 
114 impl<C> UdpFramed<C> {
115     /// Create a new `UdpFramed` backed by the given socket and codec.
116     ///
117     /// See struct level documentation for more details.
new(socket: UdpSocket, codec: C) -> UdpFramed<C>118     pub fn new(socket: UdpSocket, codec: C) -> UdpFramed<C> {
119         UdpFramed {
120             socket: socket,
121             codec: codec,
122             out_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0)),
123             rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
124             wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
125             flushed: true,
126         }
127     }
128 
129     /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
130     ///
131     /// # Note
132     ///
133     /// Care should be taken to not tamper with the underlying stream of data
134     /// coming in as it may corrupt the stream of frames otherwise being worked
135     /// with.
get_ref(&self) -> &UdpSocket136     pub fn get_ref(&self) -> &UdpSocket {
137         &self.socket
138     }
139 
140     /// Returns a mutable reference to the underlying I/O stream wrapped by
141     /// `Framed`.
142     ///
143     /// # Note
144     ///
145     /// Care should be taken to not tamper with the underlying stream of data
146     /// coming in as it may corrupt the stream of frames otherwise being worked
147     /// with.
get_mut(&mut self) -> &mut UdpSocket148     pub fn get_mut(&mut self) -> &mut UdpSocket {
149         &mut self.socket
150     }
151 
152     /// Consumes the `Framed`, returning its underlying I/O stream.
into_inner(self) -> UdpSocket153     pub fn into_inner(self) -> UdpSocket {
154         self.socket
155     }
156 }
157