1 use core::fmt::{Debug, Formatter, Result as FmtResult};
2 use core::pin::Pin;
3 use futures_core::task::{Context, Poll};
4 use futures_sink::Sink;
5 use pin_project::{pin_project, project};
6 
7 /// Sink that clones incoming items and forwards them to two sinks at the same time.
8 ///
9 /// Backpressure from any downstream sink propagates up, which means that this sink
10 /// can only process items as fast as its _slowest_ downstream sink.
11 #[pin_project]
12 #[must_use = "sinks do nothing unless polled"]
13 pub struct Fanout<Si1, Si2> {
14     #[pin]
15     sink1: Si1,
16     #[pin]
17     sink2: Si2
18 }
19 
20 impl<Si1, Si2> Fanout<Si1, Si2> {
new(sink1: Si1, sink2: Si2) -> Fanout<Si1, Si2>21     pub(super) fn new(sink1: Si1, sink2: Si2) -> Fanout<Si1, Si2> {
22         Fanout { sink1, sink2 }
23     }
24 
25     /// Get a shared reference to the inner sinks.
get_ref(&self) -> (&Si1, &Si2)26     pub fn get_ref(&self) -> (&Si1, &Si2) {
27         (&self.sink1, &self.sink2)
28     }
29 
30     /// Get a mutable reference to the inner sinks.
get_mut(&mut self) -> (&mut Si1, &mut Si2)31     pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
32         (&mut self.sink1, &mut self.sink2)
33     }
34 
35     /// Get a pinned mutable reference to the inner sinks.
36     #[project]
get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>)37     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
38         #[project]
39         let Fanout { sink1, sink2 } = self.project();
40         (sink1, sink2)
41     }
42 
43     /// Consumes this combinator, returning the underlying sinks.
44     ///
45     /// Note that this may discard intermediate state of this combinator,
46     /// so care should be taken to avoid losing resources when this is called.
into_inner(self) -> (Si1, Si2)47     pub fn into_inner(self) -> (Si1, Si2) {
48         (self.sink1, self.sink2)
49     }
50 }
51 
52 impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
fmt(&self, f: &mut Formatter<'_>) -> FmtResult53     fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
54         f.debug_struct("Fanout")
55             .field("sink1", &self.sink1)
56             .field("sink2", &self.sink2)
57             .finish()
58     }
59 }
60 
61 impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
62     where Si1: Sink<Item>,
63           Item: Clone,
64           Si2: Sink<Item, Error=Si1::Error>
65 {
66     type Error = Si1::Error;
67 
68     #[project]
poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>69     fn poll_ready(
70         self: Pin<&mut Self>,
71         cx: &mut Context<'_>,
72     ) -> Poll<Result<(), Self::Error>> {
73         #[project]
74         let Fanout { sink1, sink2 } = self.project();
75 
76         let sink1_ready = sink1.poll_ready(cx)?.is_ready();
77         let sink2_ready = sink2.poll_ready(cx)?.is_ready();
78         let ready = sink1_ready && sink2_ready;
79         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
80     }
81 
82     #[project]
start_send( self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::Error>83     fn start_send(
84         self: Pin<&mut Self>,
85         item: Item,
86     ) -> Result<(), Self::Error> {
87         #[project]
88         let Fanout { sink1, sink2 } = self.project();
89 
90         sink1.start_send(item.clone())?;
91         sink2.start_send(item)?;
92         Ok(())
93     }
94 
95     #[project]
poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>96     fn poll_flush(
97         self: Pin<&mut Self>,
98         cx: &mut Context<'_>,
99     ) -> Poll<Result<(), Self::Error>> {
100         #[project]
101         let Fanout { sink1, sink2 } = self.project();
102 
103         let sink1_ready = sink1.poll_flush(cx)?.is_ready();
104         let sink2_ready = sink2.poll_flush(cx)?.is_ready();
105         let ready = sink1_ready && sink2_ready;
106         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
107     }
108 
109     #[project]
poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>110     fn poll_close(
111         self: Pin<&mut Self>,
112         cx: &mut Context<'_>,
113     ) -> Poll<Result<(), Self::Error>> {
114         #[project]
115         let Fanout { sink1, sink2 } = self.project();
116 
117         let sink1_ready = sink1.poll_close(cx)?.is_ready();
118         let sink2_ready = sink2.poll_close(cx)?.is_ready();
119         let ready = sink1_ready && sink2_ready;
120         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
121     }
122 }
123