1 #![no_std] 2 //! Streams that produce elements with an associated ordering. 3 //! 4 //! Say you have a bunch of events that all have a timestamp, sequence number, or other ordering 5 //! attribute. If you get these events from multiple [`Stream`](core::stream::Stream)s, then you 6 //! should be able to produce a "composite" stream by joining each of the individual streams, so 7 //! long as each originating stream is ordered. 8 //! 9 //! However, if you actually implement this, you discover that you need to buffer at least one 10 //! element from each stream in order to avoid ordering inversions if the sources are independent 11 //! (including just running in different tasks). This presents a problem if one of the sources 12 //! rarely produces events: that slow source can stall all other streams in order to handle the 13 //! case where the slowness is due to an earlier element instead of just having no elements. 14 //! 15 //! The [`OrderedStream`] trait provides a way to solve this problem: if you can ask a stream if it 16 //! will ever have any events that should be delivered before a given event, then you can often 17 //! avoid blocking the composite stream when data is ready. 18 use core::pin::Pin; 19 use core::task::{Context, Poll}; 20 21 /// A stream that produces items that are ordered according to some token. 22 /// 23 /// The main advantage of this trait over the standard `Stream` trait is the ability to implement a 24 /// [`join`](join()) function that does not either block until both source streams produce an item 25 /// or contain a race condition when rejoining streams that originated from a common well-ordered 26 /// source. 27 pub trait OrderedStream { 28 /// The type ordered by this stream. 29 /// 30 /// Each stream must produce values that are in ascending order according to this function, 31 /// although there is no requirement that the values be strictly ascending. 32 type Ordering: Ord; 33 34 /// The unordered data carried by this stream 35 /// 36 /// This is split from the `Ordering` type to allow specifying a smaller or cheaper-to-generate 37 /// type as the ordering key. This is especially useful if you generate values to pass in to 38 /// `before`. 39 type Data; 40 41 /// Attempt to pull out the next value of this stream, registering the current task for wakeup 42 /// if needed, and returning `NoneBefore` if it is known that the stream will not produce any 43 /// more values ordered before the given point. 44 /// 45 /// # Return value 46 /// 47 /// There are several possible return values, each indicating a distinct stream state depending 48 /// on the value passed in `before`: 49 /// 50 /// - If `before` was `None`, `Poll::Pending` means that this stream's next value is not ready 51 /// yet. Implementations will ensure that the current task is notified when the next value may 52 /// be ready. 53 /// 54 /// - If `before` was `Some`, `Poll::Pending` means that this stream's next value is not ready 55 /// and that it is not yet known if the stream will produce a value ordered prior to the given 56 /// ordering value. Implementations will ensure that the current task is notified when either 57 /// the next value is ready or once it is known that no such value will be produced. 58 /// 59 /// - `Poll::Ready(PollResult::Item)` means that the stream has successfully produced 60 /// an item. The stream may produce further values on subsequent `poll_next_before` calls. 61 /// The returned ordering value must not be less than any prior ordering value returned by this 62 /// stream. The returned ordering value **may** be greater than the value passed to `before`. 63 /// 64 /// - `Poll::Ready(PollResult::Terminated)` means that the stream has terminated, and 65 /// `poll_next_before` should not be invoked again. 66 /// 67 /// - `Poll::Ready(PollResult::NoneBefore)` means that the stream will not produce 68 /// any further ordering tokens less than the given token. Subsequent `poll_next_before` calls 69 /// may still produce additional items, but their tokens will be greater than or equal to the 70 /// given token. It does not make sense to return this value if `before` was `None`. poll_next_before( self: Pin<&mut Self>, cx: &mut Context<'_>, before: Option<&Self::Ordering>, ) -> Poll<PollResult<Self::Ordering, Self::Data>>71 fn poll_next_before( 72 self: Pin<&mut Self>, 73 cx: &mut Context<'_>, 74 before: Option<&Self::Ordering>, 75 ) -> Poll<PollResult<Self::Ordering, Self::Data>>; 76 } 77 78 impl<P> OrderedStream for Pin<P> 79 where 80 P: core::ops::DerefMut + Unpin, 81 P::Target: OrderedStream, 82 { 83 type Data = <P::Target as OrderedStream>::Data; 84 type Ordering = <P::Target as OrderedStream>::Ordering; 85 poll_next_before( self: Pin<&mut Self>, cx: &mut Context<'_>, before: Option<&Self::Ordering>, ) -> Poll<PollResult<Self::Ordering, Self::Data>>86 fn poll_next_before( 87 self: Pin<&mut Self>, 88 cx: &mut Context<'_>, 89 before: Option<&Self::Ordering>, 90 ) -> Poll<PollResult<Self::Ordering, Self::Data>> { 91 self.get_mut().as_mut().poll_next_before(cx, before) 92 } 93 } 94 95 /// An [`OrderedStream`] that tracks if the underlying stream should be polled. 96 pub trait FusedOrderedStream: OrderedStream { 97 /// Returns `true` if the stream should no longer be polled. is_terminated(&self) -> bool98 fn is_terminated(&self) -> bool; 99 } 100 101 /// The result of a [`OrderedStream::poll_next_before`] operation. 102 #[derive(Debug)] 103 pub enum PollResult<Ordering, Data> { 104 /// An item with a corresponding ordering token. 105 Item { data: Data, ordering: Ordering }, 106 /// This stream will not return any items prior to the given point. 107 NoneBefore, 108 /// This stream is terminated and should not be polled again. 109 Terminated, 110 } 111 112 impl<D, T> PollResult<T, D> { 113 /// Extract the data from the result. into_data(self) -> Option<D>114 pub fn into_data(self) -> Option<D> { 115 match self { 116 Self::Item { data, .. } => Some(data), 117 _ => None, 118 } 119 } 120 121 /// Extract the item from the result. into_tuple(self) -> Option<(T, D)>122 pub fn into_tuple(self) -> Option<(T, D)> { 123 match self { 124 Self::Item { data, ordering } => Some((ordering, data)), 125 _ => None, 126 } 127 } 128 129 /// Apply a closure to the data. map_data<R>(self, f: impl FnOnce(D) -> R) -> PollResult<T, R>130 pub fn map_data<R>(self, f: impl FnOnce(D) -> R) -> PollResult<T, R> { 131 match self { 132 Self::Item { data, ordering } => PollResult::Item { 133 data: f(data), 134 ordering, 135 }, 136 Self::NoneBefore => PollResult::NoneBefore, 137 Self::Terminated => PollResult::Terminated, 138 } 139 } 140 } 141 142 impl<T, D, E> PollResult<T, Result<D, E>> { 143 /// Extract the error of a [`Result`] item. transpose_result(self) -> Result<PollResult<T, D>, E>144 pub fn transpose_result(self) -> Result<PollResult<T, D>, E> { 145 self.transpose_result_item().map_err(|(_, e)| e) 146 } 147 148 /// Extract the error and ordering from a [`Result`] item. transpose_result_item(self) -> Result<PollResult<T, D>, (T, E)>149 pub fn transpose_result_item(self) -> Result<PollResult<T, D>, (T, E)> { 150 match self { 151 Self::Item { 152 data: Ok(data), 153 ordering, 154 } => Ok(PollResult::Item { data, ordering }), 155 Self::Item { 156 data: Err(data), 157 ordering, 158 } => Err((ordering, data)), 159 Self::NoneBefore => Ok(PollResult::NoneBefore), 160 Self::Terminated => Ok(PollResult::Terminated), 161 } 162 } 163 } 164 165 /// A [`Future`](core::future::Future) that produces an item with an associated ordering. 166 /// 167 /// This is equivalent to an [`OrderedStream`] that always produces exactly one item. This trait 168 /// is not very useful on its own; see [`FromFuture`] to convert it to a stream. 169 /// 170 /// It is valid to implement both [`Future`](core::future::Future) and [`OrderedFuture`] on the 171 /// same type. In this case, unless otherwise documented by the implementing type, neither poll 172 /// function should be invoked after either returns an output value. 173 pub trait OrderedFuture { 174 /// See [`OrderedStream::Ordering`]. 175 type Ordering: Ord; 176 177 /// See [`OrderedStream::Data`]. 178 type Output; 179 180 /// Attempt to pull out the value of this future, registering the current task for wakeup if 181 /// needed, and returning `None` if it is known that the future will not produce a value 182 /// ordered before the given point. 183 /// 184 /// # Return value 185 /// 186 /// There are several possible return values, each indicating a distinct state depending on the 187 /// value passed in `before`: 188 /// 189 /// - If `before` was `None`, `Poll::Pending` means that this future's value is not ready yet. 190 /// Implementations will ensure that the current task is notified when the next value may be 191 /// ready. 192 /// 193 /// - If `before` was `Some`, `Poll::Pending` means that this future's value is not ready and 194 /// that it is not yet known if the value will be ordered prior to the given ordering value. 195 /// Implementations will ensure that the current task is notified when either the next value is 196 /// ready or once it is known that no such value will be produced. 197 /// 198 /// - `Poll::Ready(Some(Data))` means that the future has successfully terminated. The 199 /// returned ordering value **may** be greater than the value passed to `before`. The 200 /// `poll_before` function should not be invoked again. 201 /// 202 /// - `Poll::Ready(None)` means that this future will not produce an ordering token less than 203 /// the given token. It is an error to return `None` if `before` was `None`. poll_before( self: Pin<&mut Self>, cx: &mut Context<'_>, before: Option<&Self::Ordering>, ) -> Poll<Option<(Self::Ordering, Self::Output)>>204 fn poll_before( 205 self: Pin<&mut Self>, 206 cx: &mut Context<'_>, 207 before: Option<&Self::Ordering>, 208 ) -> Poll<Option<(Self::Ordering, Self::Output)>>; 209 } 210 211 mod adapters; 212 pub use adapters::*; 213 mod join; 214 pub use join::*; 215