1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::ready;
4 use futures_core::stream::Stream;
5 use futures_core::task::{Context, Poll};
6 use futures_sink::Sink;
7 
8 use crate::lock::BiLock;
9 
/// A `Stream` part of the split pair.
///
/// Wraps one of the two `BiLock` handles produced by `split`. Each poll first
/// acquires that lock and then forwards to the underlying `S` (see the
/// `Stream` impl below).
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct SplitStream<S>(BiLock<S>);
15 
// `SplitStream` stores only a `BiLock<S>` handle, and pinned access to `S` is
// obtained through the lock guard's `as_pin_mut` (see `poll_next`), so the
// wrapper is declared `Unpin` for every `S`.
// NOTE(review): this presumably relies on `BiLock` keeping `S` at a stable
// address — confirm against `BiLock`'s implementation.
impl<S> Unpin for SplitStream<S> {}
17 
18 impl<S: Unpin> SplitStream<S> {
19     /// Attempts to put the two "halves" of a split `Stream + Sink` back
20     /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
21     /// a matching pair originating from the same call to `StreamExt::split`.
reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>> where S: Sink<Item>,22     pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
23     where
24         S: Sink<Item>,
25     {
26         other.reunite(self)
27     }
28 }
29 
30 impl<S: Stream> Stream for SplitStream<S> {
31     type Item = S::Item;
32 
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>33     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
34         ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
35     }
36 }
37 
38 #[allow(bad_style)]
SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item>39 fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
40     SplitSink { lock, slot: None }
41 }
42 
/// A `Sink` part of the split pair.
///
/// An item passed to `start_send` is parked in a one-element buffer and handed
/// to the underlying sink the next time the shared lock is acquired (during
/// `poll_ready`, `poll_flush` or `poll_close`).
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct SplitSink<S, Item> {
    /// Lock handle guarding access to the underlying `S`.
    lock: BiLock<S>,
    /// Item accepted by `start_send` that has not been forwarded to `S` yet.
    slot: Option<Item>,
}
51 
// `SplitSink` is declared `Unpin` for every `S` and `Item`: the `Sink` impl
// below depends on this to turn `Pin<&mut Self>` into a plain `&mut Self`,
// and pinned access to `S` goes through the lock guard's `as_pin_mut`.
// NOTE(review): this presumably relies on `BiLock` keeping `S` at a stable
// address — confirm against `BiLock`'s implementation.
impl<S, Item> Unpin for SplitSink<S, Item> {}
53 
54 impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
55     /// Attempts to put the two "halves" of a split `Stream + Sink` back
56     /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
57     /// a matching pair originating from the same call to `StreamExt::split`.
reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>>58     pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
59         self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1)))
60     }
61 }
62 
63 impl<S: Sink<Item>, Item> SplitSink<S, Item> {
poll_flush_slot( mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>64     fn poll_flush_slot(
65         mut inner: Pin<&mut S>,
66         slot: &mut Option<Item>,
67         cx: &mut Context<'_>,
68     ) -> Poll<Result<(), S::Error>> {
69         if slot.is_some() {
70             ready!(inner.as_mut().poll_ready(cx))?;
71             Poll::Ready(inner.start_send(slot.take().unwrap()))
72         } else {
73             Poll::Ready(Ok(()))
74         }
75     }
76 
poll_lock_and_flush_slot( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>77     fn poll_lock_and_flush_slot(
78         mut self: Pin<&mut Self>,
79         cx: &mut Context<'_>,
80     ) -> Poll<Result<(), S::Error>> {
81         let this = &mut *self;
82         let mut inner = ready!(this.lock.poll_lock(cx));
83         Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
84     }
85 }
86 
87 impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
88     type Error = S::Error;
89 
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>90     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
91         loop {
92             if self.slot.is_none() {
93                 return Poll::Ready(Ok(()));
94             }
95             ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
96         }
97     }
98 
start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error>99     fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> {
100         self.slot = Some(item);
101         Ok(())
102     }
103 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>104     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
105         let this = &mut *self;
106         let mut inner = ready!(this.lock.poll_lock(cx));
107         ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
108         inner.as_pin_mut().poll_flush(cx)
109     }
110 
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>111     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
112         let this = &mut *self;
113         let mut inner = ready!(this.lock.poll_lock(cx));
114         ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
115         inner.as_pin_mut().poll_close(cx)
116     }
117 }
118 
split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>)119 pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>) {
120     let (a, b) = BiLock::new(s);
121     let read = SplitStream(a);
122     let write = SplitSink(b);
123     (write, read)
124 }
125 
/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
/// of the same `Stream + Sink`, and thus could not be `reunite`d.
///
/// Both halves are handed back to the caller: field `0` is the sink half and
/// field `1` is the stream half.
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
130 
131 impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result132     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133         f.debug_tuple("ReuniteError").field(&"...").finish()
134     }
135 }
136 
137 impl<T, Item> fmt::Display for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result138     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139         write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair")
140     }
141 }
142 
// Marker impl, only available with the `std` feature: it relies entirely on
// the default `Error` methods, with the message coming from the `Display`
// impl above.
// NOTE(review): the `T: Any` bound also forces `T: 'static`; presumably
// intentional — confirm before relaxing it.
#[cfg(feature = "std")]
impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}
145