1 use core::fmt::{Debug, Formatter, Result as FmtResult};
2 use core::pin::Pin;
3 use futures_core::task::{Context, Poll};
4 use futures_sink::Sink;
5 use pin_project_lite::pin_project;
6 
pin_project! {
    /// Sink that clones incoming items and forwards them to two sinks at the same time.
    ///
    /// Every item handed to this sink is cloned once: the clone is sent to the first
    /// sink and the original value is sent to the second sink.
    ///
    /// Backpressure from any downstream sink propagates up, which means that this sink
    /// can only process items as fast as its _slowest_ downstream sink.
    ///
    /// Both inner sinks must report the same error type; the first error returned by
    /// either of them is surfaced as this sink's error.
    #[must_use = "sinks do nothing unless polled"]
    pub struct Fanout<Si1, Si2> {
        // First downstream sink; it receives the cloned copy of each item.
        #[pin]
        sink1: Si1,
        // Second downstream sink; it receives the original item by value.
        #[pin]
        sink2: Si2
    }
}
20 
21 impl<Si1, Si2> Fanout<Si1, Si2> {
new(sink1: Si1, sink2: Si2) -> Self22     pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
23         Self { sink1, sink2 }
24     }
25 
26     /// Get a shared reference to the inner sinks.
get_ref(&self) -> (&Si1, &Si2)27     pub fn get_ref(&self) -> (&Si1, &Si2) {
28         (&self.sink1, &self.sink2)
29     }
30 
31     /// Get a mutable reference to the inner sinks.
get_mut(&mut self) -> (&mut Si1, &mut Si2)32     pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
33         (&mut self.sink1, &mut self.sink2)
34     }
35 
36     /// Get a pinned mutable reference to the inner sinks.
get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>)37     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
38         let this = self.project();
39         (this.sink1, this.sink2)
40     }
41 
42     /// Consumes this combinator, returning the underlying sinks.
43     ///
44     /// Note that this may discard intermediate state of this combinator,
45     /// so care should be taken to avoid losing resources when this is called.
into_inner(self) -> (Si1, Si2)46     pub fn into_inner(self) -> (Si1, Si2) {
47         (self.sink1, self.sink2)
48     }
49 }
50 
51 impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
fmt(&self, f: &mut Formatter<'_>) -> FmtResult52     fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
53         f.debug_struct("Fanout").field("sink1", &self.sink1).field("sink2", &self.sink2).finish()
54     }
55 }
56 
57 impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
58 where
59     Si1: Sink<Item>,
60     Item: Clone,
61     Si2: Sink<Item, Error = Si1::Error>,
62 {
63     type Error = Si1::Error;
64 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>65     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
66         let this = self.project();
67 
68         let sink1_ready = this.sink1.poll_ready(cx)?.is_ready();
69         let sink2_ready = this.sink2.poll_ready(cx)?.is_ready();
70         let ready = sink1_ready && sink2_ready;
71         if ready {
72             Poll::Ready(Ok(()))
73         } else {
74             Poll::Pending
75         }
76     }
77 
start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>78     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
79         let this = self.project();
80 
81         this.sink1.start_send(item.clone())?;
82         this.sink2.start_send(item)?;
83         Ok(())
84     }
85 
poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>86     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87         let this = self.project();
88 
89         let sink1_ready = this.sink1.poll_flush(cx)?.is_ready();
90         let sink2_ready = this.sink2.poll_flush(cx)?.is_ready();
91         let ready = sink1_ready && sink2_ready;
92         if ready {
93             Poll::Ready(Ok(()))
94         } else {
95             Poll::Pending
96         }
97     }
98 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>99     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
100         let this = self.project();
101 
102         let sink1_ready = this.sink1.poll_close(cx)?.is_ready();
103         let sink2_ready = this.sink2.poll_close(cx)?.is_ready();
104         let ready = sink1_ready && sink2_ready;
105         if ready {
106             Poll::Ready(Ok(()))
107         } else {
108             Poll::Pending
109         }
110     }
111 }
112