1 use super::{SendError, Sender, TrySendError, UnboundedSender};
2 use futures_core::task::{Context, Poll};
3 use futures_sink::Sink;
4 use std::pin::Pin;
5 
6 impl<T> Sink<T> for Sender<T> {
7     type Error = SendError;
8 
poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>9     fn poll_ready(
10         mut self: Pin<&mut Self>,
11         cx: &mut Context<'_>,
12     ) -> Poll<Result<(), Self::Error>> {
13         (*self).poll_ready(cx)
14     }
15 
start_send( mut self: Pin<&mut Self>, msg: T, ) -> Result<(), Self::Error>16     fn start_send(
17         mut self: Pin<&mut Self>,
18         msg: T,
19     ) -> Result<(), Self::Error> {
20         (*self).start_send(msg)
21     }
22 
poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>23     fn poll_flush(
24         mut self: Pin<&mut Self>,
25         cx: &mut Context<'_>,
26     ) -> Poll<Result<(), Self::Error>> {
27         match (*self).poll_ready(cx) {
28             Poll::Ready(Err(ref e)) if e.is_disconnected() => {
29                 // If the receiver disconnected, we consider the sink to be flushed.
30                 Poll::Ready(Ok(()))
31             }
32             x => x,
33         }
34     }
35 
poll_close( mut self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>36     fn poll_close(
37         mut self: Pin<&mut Self>,
38         _: &mut Context<'_>,
39     ) -> Poll<Result<(), Self::Error>> {
40         self.disconnect();
41         Poll::Ready(Ok(()))
42     }
43 }
44 
45 impl<T> Sink<T> for UnboundedSender<T> {
46     type Error = SendError;
47 
poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>48     fn poll_ready(
49         self: Pin<&mut Self>,
50         cx: &mut Context<'_>,
51     ) -> Poll<Result<(), Self::Error>> {
52         UnboundedSender::poll_ready(&*self, cx)
53     }
54 
start_send( mut self: Pin<&mut Self>, msg: T, ) -> Result<(), Self::Error>55     fn start_send(
56         mut self: Pin<&mut Self>,
57         msg: T,
58     ) -> Result<(), Self::Error> {
59         UnboundedSender::start_send(&mut *self, msg)
60     }
61 
poll_flush( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>62     fn poll_flush(
63         self: Pin<&mut Self>,
64         _: &mut Context<'_>,
65     ) -> Poll<Result<(), Self::Error>> {
66         Poll::Ready(Ok(()))
67     }
68 
poll_close( mut self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>69     fn poll_close(
70         mut self: Pin<&mut Self>,
71         _: &mut Context<'_>,
72     ) -> Poll<Result<(), Self::Error>> {
73         self.disconnect();
74         Poll::Ready(Ok(()))
75     }
76 }
77 
78 impl<T> Sink<T> for &UnboundedSender<T> {
79     type Error = SendError;
80 
poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>81     fn poll_ready(
82         self: Pin<&mut Self>,
83         cx: &mut Context<'_>,
84     ) -> Poll<Result<(), Self::Error>> {
85         UnboundedSender::poll_ready(*self, cx)
86     }
87 
start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error>88     fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
89         self.unbounded_send(msg)
90             .map_err(TrySendError::into_send_error)
91     }
92 
poll_flush( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>93     fn poll_flush(
94         self: Pin<&mut Self>,
95         _: &mut Context<'_>,
96     ) -> Poll<Result<(), Self::Error>> {
97         Poll::Ready(Ok(()))
98     }
99 
poll_close( self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>100     fn poll_close(
101         self: Pin<&mut Self>,
102         _: &mut Context<'_>,
103     ) -> Poll<Result<(), Self::Error>> {
104         self.close_channel();
105         Poll::Ready(Ok(()))
106     }
107 }
108