#![no_std]
//! Streams that produce elements with an associated ordering.
//!
//! Say you have a bunch of events that all have a timestamp, sequence number, or other ordering
//! attribute.  If you get these events from multiple [`Stream`](core::stream::Stream)s, then you
//! should be able to produce a "composite" stream by joining each of the individual streams, so
//! long as each originating stream is ordered.
//!
//! However, if you actually implement this, you discover that you need to buffer at least one
//! element from each stream in order to avoid ordering inversions if the sources are independent
//! (including just running in different tasks).  This presents a problem if one of the sources
//! rarely produces events: that slow source can stall all other streams in order to handle the
//! case where the slowness is due to an earlier element instead of just having no elements.
//!
//! The [`OrderedStream`] trait provides a way to solve this problem: if you can ask a stream if it
//! will ever have any events that should be delivered before a given event, then you can often
//! avoid blocking the composite stream when data is ready.
use core::pin::Pin;
use core::task::{Context, Poll};
20 
21 /// A stream that produces items that are ordered according to some token.
22 ///
23 /// The main advantage of this trait over the standard `Stream` trait is the ability to implement a
24 /// [`join`](join()) function that does not either block until both source streams produce an item
25 /// or contain a race condition when rejoining streams that originated from a common well-ordered
26 /// source.
27 pub trait OrderedStream {
28     /// The type ordered by this stream.
29     ///
30     /// Each stream must produce values that are in ascending order according to this function,
31     /// although there is no requirement that the values be strictly ascending.
32     type Ordering: Ord;
33 
34     /// The unordered data carried by this stream
35     ///
36     /// This is split from the `Ordering` type to allow specifying a smaller or cheaper-to-generate
37     /// type as the ordering key.  This is especially useful if you generate values to pass in to
38     /// `before`.
39     type Data;
40 
41     /// Attempt to pull out the next value of this stream, registering the current task for wakeup
42     /// if needed, and returning `NoneBefore` if it is known that the stream will not produce any
43     /// more values ordered before the given point.
44     ///
45     /// # Return value
46     ///
47     /// There are several possible return values, each indicating a distinct stream state depending
48     /// on the value passed in `before`:
49     ///
50     /// - If `before` was `None`, `Poll::Pending` means that this stream's next value is not ready
51     /// yet. Implementations will ensure that the current task is notified when the next value may
52     /// be ready.
53     ///
54     /// - If `before` was `Some`, `Poll::Pending` means that this stream's next value is not ready
55     /// and that it is not yet known if the stream will produce a value ordered prior to the given
56     /// ordering value.  Implementations will ensure that the current task is notified when either
57     /// the next value is ready or once it is known that no such value will be produced.
58     ///
59     /// - `Poll::Ready(PollResult::Item)` means that the stream has successfully produced
60     /// an item.  The stream may produce further values on subsequent `poll_next_before` calls.
61     /// The returned ordering value must not be less than any prior ordering value returned by this
62     /// stream.  The returned ordering value **may** be greater than the value passed to `before`.
63     ///
64     /// - `Poll::Ready(PollResult::Terminated)` means that the stream has terminated, and
65     /// `poll_next_before` should not be invoked again.
66     ///
67     /// - `Poll::Ready(PollResult::NoneBefore)` means that the stream will not produce
68     /// any further ordering tokens less than the given token.  Subsequent `poll_next_before` calls
69     /// may still produce additional items, but their tokens will be greater than or equal to the
70     /// given token.  It does not make sense to return this value if `before` was `None`.
poll_next_before( self: Pin<&mut Self>, cx: &mut Context<'_>, before: Option<&Self::Ordering>, ) -> Poll<PollResult<Self::Ordering, Self::Data>>71     fn poll_next_before(
72         self: Pin<&mut Self>,
73         cx: &mut Context<'_>,
74         before: Option<&Self::Ordering>,
75     ) -> Poll<PollResult<Self::Ordering, Self::Data>>;
76 }
77 
78 impl<P> OrderedStream for Pin<P>
79 where
80     P: core::ops::DerefMut + Unpin,
81     P::Target: OrderedStream,
82 {
83     type Data = <P::Target as OrderedStream>::Data;
84     type Ordering = <P::Target as OrderedStream>::Ordering;
85 
poll_next_before( self: Pin<&mut Self>, cx: &mut Context<'_>, before: Option<&Self::Ordering>, ) -> Poll<PollResult<Self::Ordering, Self::Data>>86     fn poll_next_before(
87         self: Pin<&mut Self>,
88         cx: &mut Context<'_>,
89         before: Option<&Self::Ordering>,
90     ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
91         self.get_mut().as_mut().poll_next_before(cx, before)
92     }
93 }
94 
95 /// An [`OrderedStream`] that tracks if the underlying stream should be polled.
96 pub trait FusedOrderedStream: OrderedStream {
97     /// Returns `true` if the stream should no longer be polled.
is_terminated(&self) -> bool98     fn is_terminated(&self) -> bool;
99 }
100 
/// The result of a [`OrderedStream::poll_next_before`] operation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PollResult<Ordering, Data> {
    /// An item with a corresponding ordering token.
    Item {
        /// The payload carried by this item.
        data: Data,
        /// The ordering token associated with `data`.
        ordering: Ordering,
    },
    /// This stream will not return any items prior to the given point.
    NoneBefore,
    /// This stream is terminated and should not be polled again.
    Terminated,
}

impl<D, T> PollResult<T, D> {
    /// Extract the data from the result, dropping the ordering token.
    ///
    /// Returns `None` for [`PollResult::NoneBefore`] and [`PollResult::Terminated`].
    pub fn into_data(self) -> Option<D> {
        match self {
            Self::Item { data, .. } => Some(data),
            // Listed explicitly so that adding a variant forces this method to be revisited.
            Self::NoneBefore | Self::Terminated => None,
        }
    }

    /// Extract the item from the result as an `(ordering, data)` pair.
    ///
    /// Returns `None` for [`PollResult::NoneBefore`] and [`PollResult::Terminated`].
    pub fn into_tuple(self) -> Option<(T, D)> {
        match self {
            Self::Item { data, ordering } => Some((ordering, data)),
            Self::NoneBefore | Self::Terminated => None,
        }
    }

    /// Apply a closure to the data.
    ///
    /// The ordering token of an [`PollResult::Item`] is kept as-is; the other variants carry no
    /// data, so `f` is not called for them and they are passed through unchanged.
    pub fn map_data<R>(self, f: impl FnOnce(D) -> R) -> PollResult<T, R> {
        match self {
            Self::Item { data, ordering } => PollResult::Item {
                data: f(data),
                ordering,
            },
            Self::NoneBefore => PollResult::NoneBefore,
            Self::Terminated => PollResult::Terminated,
        }
    }
}

impl<T, D, E> PollResult<T, Result<D, E>> {
    /// Extract the error of a [`Result`] item.
    ///
    /// Behaves like [`transpose_result_item`](Self::transpose_result_item), except that the
    /// ordering token of a failed item is discarded.
    pub fn transpose_result(self) -> Result<PollResult<T, D>, E> {
        self.transpose_result_item().map_err(|(_, error)| error)
    }

    /// Extract the error and ordering from a [`Result`] item.
    ///
    /// An item holding `Ok(data)` becomes `Ok(PollResult::Item { .. })`, an item holding
    /// `Err(error)` becomes `Err((ordering, error))`, and the data-less variants are returned
    /// unchanged inside `Ok`.
    pub fn transpose_result_item(self) -> Result<PollResult<T, D>, (T, E)> {
        match self {
            Self::Item {
                data: Ok(data),
                ordering,
            } => Ok(PollResult::Item { data, ordering }),
            Self::Item {
                data: Err(error),
                ordering,
            } => Err((ordering, error)),
            Self::NoneBefore => Ok(PollResult::NoneBefore),
            Self::Terminated => Ok(PollResult::Terminated),
        }
    }
}
164 
/// A [`Future`](core::future::Future) that produces an item with an associated ordering.
///
/// This is equivalent to an [`OrderedStream`] that always produces exactly one item.  This trait
/// is not very useful on its own; see [`FromFuture`] to convert it to a stream.
///
/// It is valid to implement both [`Future`](core::future::Future) and [`OrderedFuture`] on the
/// same type.  In this case, unless otherwise documented by the implementing type, neither poll
/// function should be invoked after either returns an output value.
pub trait OrderedFuture {
    /// See [`OrderedStream::Ordering`].
    type Ordering: Ord;

    /// The value produced by this future; see [`OrderedStream::Data`].
    type Output;

    /// Attempt to pull out the value of this future, registering the current task for wakeup if
    /// needed, and returning `None` if it is known that the future will not produce a value
    /// ordered before the given point.
    ///
    /// # Return value
    ///
    /// There are several possible return values, each indicating a distinct state depending on the
    /// value passed in `before`:
    ///
    /// - If `before` was `None`, `Poll::Pending` means that this future's value is not ready yet.
    ///   Implementations will ensure that the current task is notified when the next value may be
    ///   ready.
    ///
    /// - If `before` was `Some`, `Poll::Pending` means that this future's value is not ready and
    ///   that it is not yet known if the value will be ordered prior to the given ordering value.
    ///   Implementations will ensure that the current task is notified when either the next value
    ///   is ready or once it is known that no such value will be produced.
    ///
    /// - `Poll::Ready(Some((ordering, output)))` means that the future has successfully
    ///   terminated.  The returned ordering value **may** be greater than the value passed to
    ///   `before`.  The `poll_before` function should not be invoked again.
    ///
    /// - `Poll::Ready(None)` means that this future will not produce an ordering token less than
    ///   the given token.  It is an error to return `None` if `before` was `None`.
    fn poll_before(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        before: Option<&Self::Ordering>,
    ) -> Poll<Option<(Self::Ordering, Self::Output)>>;
}
210 
211 mod adapters;
212 pub use adapters::*;
213 mod join;
214 pub use join::*;
215