1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::ready;
4 use futures_core::stream::Stream;
5 use futures_core::task::{Context, Poll};
6 use futures_sink::Sink;
7
8 use crate::lock::BiLock;
9
10 /// A `Stream` part of the split pair
11 #[derive(Debug)]
12 #[must_use = "streams do nothing unless polled"]
13 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
14 pub struct SplitStream<S>(BiLock<S>);
15
16 impl<S> Unpin for SplitStream<S> {}
17
18 impl<S: Unpin> SplitStream<S> {
19 /// Attempts to put the two "halves" of a split `Stream + Sink` back
20 /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
21 /// a matching pair originating from the same call to `StreamExt::split`.
reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>> where S: Sink<Item>,22 pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>>
23 where
24 S: Sink<Item>,
25 {
26 other.reunite(self)
27 }
28 }
29
30 impl<S: Stream> Stream for SplitStream<S> {
31 type Item = S::Item;
32
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>>33 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
34 ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
35 }
36 }
37
38 #[allow(bad_style)]
SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item>39 fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
40 SplitSink { lock, slot: None }
41 }
42
43 /// A `Sink` part of the split pair
44 #[derive(Debug)]
45 #[must_use = "sinks do nothing unless polled"]
46 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
47 pub struct SplitSink<S, Item> {
48 lock: BiLock<S>,
49 slot: Option<Item>,
50 }
51
52 impl<S, Item> Unpin for SplitSink<S, Item> {}
53
54 impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
55 /// Attempts to put the two "halves" of a split `Stream + Sink` back
56 /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
57 /// a matching pair originating from the same call to `StreamExt::split`.
reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>>58 pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> {
59 self.lock.reunite(other.0).map_err(|err| ReuniteError(SplitSink(err.0), SplitStream(err.1)))
60 }
61 }
62
63 impl<S: Sink<Item>, Item> SplitSink<S, Item> {
poll_flush_slot( mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>64 fn poll_flush_slot(
65 mut inner: Pin<&mut S>,
66 slot: &mut Option<Item>,
67 cx: &mut Context<'_>,
68 ) -> Poll<Result<(), S::Error>> {
69 if slot.is_some() {
70 ready!(inner.as_mut().poll_ready(cx))?;
71 Poll::Ready(inner.start_send(slot.take().unwrap()))
72 } else {
73 Poll::Ready(Ok(()))
74 }
75 }
76
poll_lock_and_flush_slot( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Result<(), S::Error>>77 fn poll_lock_and_flush_slot(
78 mut self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 ) -> Poll<Result<(), S::Error>> {
81 let this = &mut *self;
82 let mut inner = ready!(this.lock.poll_lock(cx));
83 Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
84 }
85 }
86
87 impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
88 type Error = S::Error;
89
poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>90 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
91 loop {
92 if self.slot.is_none() {
93 return Poll::Ready(Ok(()));
94 }
95 ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
96 }
97 }
98
start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error>99 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> {
100 self.slot = Some(item);
101 Ok(())
102 }
103
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>104 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
105 let this = &mut *self;
106 let mut inner = ready!(this.lock.poll_lock(cx));
107 ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
108 inner.as_pin_mut().poll_flush(cx)
109 }
110
poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>>111 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
112 let this = &mut *self;
113 let mut inner = ready!(this.lock.poll_lock(cx));
114 ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
115 inner.as_pin_mut().poll_close(cx)
116 }
117 }
118
split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>)119 pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>) {
120 let (a, b) = BiLock::new(s);
121 let read = SplitStream(a);
122 let write = SplitSink(b);
123 (write, read)
124 }
125
126 /// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
127 /// of a `Stream + Split`, and thus could not be `reunite`d.
128 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
129 pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
130
131 impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 f.debug_tuple("ReuniteError").field(&"...").finish()
134 }
135 }
136
137 impl<T, Item> fmt::Display for ReuniteError<T, Item> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result138 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139 write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair")
140 }
141 }
142
143 #[cfg(feature = "std")]
144 impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}
145