1 use core::fmt::{Debug, Formatter, Result as FmtResult}; 2 use core::pin::Pin; 3 use futures_core::task::{Context, Poll}; 4 use futures_sink::Sink; 5 use pin_project::{pin_project, project}; 6 7 /// Sink that clones incoming items and forwards them to two sinks at the same time. 8 /// 9 /// Backpressure from any downstream sink propagates up, which means that this sink 10 /// can only process items as fast as its _slowest_ downstream sink. 11 #[pin_project] 12 #[must_use = "sinks do nothing unless polled"] 13 pub struct Fanout<Si1, Si2> { 14 #[pin] 15 sink1: Si1, 16 #[pin] 17 sink2: Si2 18 } 19 20 impl<Si1, Si2> Fanout<Si1, Si2> { new(sink1: Si1, sink2: Si2) -> Fanout<Si1, Si2>21 pub(super) fn new(sink1: Si1, sink2: Si2) -> Fanout<Si1, Si2> { 22 Fanout { sink1, sink2 } 23 } 24 25 /// Get a shared reference to the inner sinks. get_ref(&self) -> (&Si1, &Si2)26 pub fn get_ref(&self) -> (&Si1, &Si2) { 27 (&self.sink1, &self.sink2) 28 } 29 30 /// Get a mutable reference to the inner sinks. get_mut(&mut self) -> (&mut Si1, &mut Si2)31 pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) { 32 (&mut self.sink1, &mut self.sink2) 33 } 34 35 /// Get a pinned mutable reference to the inner sinks. 36 #[project] get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>)37 pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) { 38 #[project] 39 let Fanout { sink1, sink2 } = self.project(); 40 (sink1, sink2) 41 } 42 43 /// Consumes this combinator, returning the underlying sinks. 44 /// 45 /// Note that this may discard intermediate state of this combinator, 46 /// so care should be taken to avoid losing resources when this is called. into_inner(self) -> (Si1, Si2)47 pub fn into_inner(self) -> (Si1, Si2) { 48 (self.sink1, self.sink2) 49 } 50 } 51 52 impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> { fmt(&self, f: &mut Formatter<'_>) -> FmtResult53 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { 54 f.debug_struct("Fanout") 55 .field("sink1", &self.sink1) 56 .field("sink2", &self.sink2) 57 .finish() 58 } 59 } 60 61 impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2> 62 where Si1: Sink<Item>, 63 Item: Clone, 64 Si2: Sink<Item, Error=Si1::Error> 65 { 66 type Error = Si1::Error; 67 68 #[project] poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>69 fn poll_ready( 70 self: Pin<&mut Self>, 71 cx: &mut Context<'_>, 72 ) -> Poll<Result<(), Self::Error>> { 73 #[project] 74 let Fanout { sink1, sink2 } = self.project(); 75 76 let sink1_ready = sink1.poll_ready(cx)?.is_ready(); 77 let sink2_ready = sink2.poll_ready(cx)?.is_ready(); 78 let ready = sink1_ready && sink2_ready; 79 if ready { Poll::Ready(Ok(())) } else { Poll::Pending } 80 } 81 82 #[project] start_send( self: Pin<&mut Self>, item: Item, ) -> Result<(), Self::Error>83 fn start_send( 84 self: Pin<&mut Self>, 85 item: Item, 86 ) -> Result<(), Self::Error> { 87 #[project] 88 let Fanout { sink1, sink2 } = self.project(); 89 90 sink1.start_send(item.clone())?; 91 sink2.start_send(item)?; 92 Ok(()) 93 } 94 95 #[project] poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>96 fn poll_flush( 97 self: Pin<&mut Self>, 98 cx: &mut Context<'_>, 99 ) -> Poll<Result<(), Self::Error>> { 100 #[project] 101 let Fanout { sink1, sink2 } = self.project(); 102 103 let sink1_ready = sink1.poll_flush(cx)?.is_ready(); 104 let sink2_ready = sink2.poll_flush(cx)?.is_ready(); 105 let ready = sink1_ready && sink2_ready; 106 if ready { Poll::Ready(Ok(())) } else { Poll::Pending } 107 } 108 109 #[project] poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>>110 fn poll_close( 111 self: Pin<&mut Self>, 112 cx: &mut Context<'_>, 113 ) -> Poll<Result<(), Self::Error>> { 114 #[project] 115 let Fanout { sink1, sink2 } = self.project(); 116 117 let sink1_ready = sink1.poll_close(cx)?.is_ready(); 118 let sink2_ready = sink2.poll_close(cx)?.is_ready(); 119 let ready = sink1_ready && sink2_ready; 120 if ready { Poll::Ready(Ok(())) } else { Poll::Pending } 121 } 122 } 123