1 //! Additions to the Futures 0.1 [`Stream`] trait.
2 //!
3 //! [`Stream`]: futures_01_crate::Stream
4 
5 use crate::{Error, ErrorCompat, IntoError};
6 use core::marker::PhantomData;
7 use futures_01_crate::{Async, Stream};
8 
9 /// Additions to [`Stream`].
10 pub trait StreamExt: Stream + Sized {
11     /// Extend a [`Stream`]'s error with additional context-sensitive
12     /// information.
13     ///
14     /// [`Stream`]: futures_01_crate::Stream]
15     ///
16     /// ```rust
17     /// # use futures_01_crate as futures;
18     /// use futures::Stream;
19     /// # use futures::stream;
20     /// use snafu::{futures01::StreamExt, Snafu};
21     ///
22     /// #[derive(Debug, Snafu)]
23     /// enum Error {
24     ///     Authenticating {
25     ///         user_name: String,
26     ///         user_id: i32,
27     ///         source: ApiError,
28     ///     },
29     /// }
30     ///
31     /// fn example() -> impl Stream<Item = i32, Error = Error> {
32     ///     stock_prices().context(Authenticating {
33     ///         user_name: "admin",
34     ///         user_id: 42,
35     ///     })
36     /// }
37     ///
38     /// # type ApiError = Box<dyn std::error::Error>;
39     /// fn stock_prices() -> impl Stream<Item = i32, Error = ApiError> {
40     ///     /* ... */
41     /// # stream::empty()
42     /// }
43     /// ```
44     ///
45     /// Note that the context selector will call [`Into::into`] on
46     /// each field, so the types are not required to exactly match.
context<C, E>(self, context: C) -> Context<Self, C, E> where C: IntoError<E, Source = Self::Error> + Clone, E: Error + ErrorCompat47     fn context<C, E>(self, context: C) -> Context<Self, C, E>
48     where
49         C: IntoError<E, Source = Self::Error> + Clone,
50         E: Error + ErrorCompat;
51 
52     /// Extend a [`Stream`]'s error with lazily-generated context-sensitive
53     /// information.
54     ///
55     /// [`Stream`]: futures_01_crate::Stream]
56     ///
57     /// ```rust
58     /// # use futures_01_crate as futures;
59     /// use futures::Stream;
60     /// # use futures::stream;
61     /// use snafu::{futures01::StreamExt, Snafu};
62     ///
63     /// #[derive(Debug, Snafu)]
64     /// enum Error {
65     ///     Authenticating {
66     ///         user_name: String,
67     ///         user_id: i32,
68     ///         source: ApiError,
69     ///     },
70     /// }
71     ///
72     /// fn example() -> impl Stream<Item = i32, Error = Error> {
73     ///     stock_prices().with_context(|| Authenticating {
74     ///         user_name: "admin".to_string(),
75     ///         user_id: 42,
76     ///     })
77     /// }
78     ///
79     /// # type ApiError = std::io::Error;
80     /// fn stock_prices() -> impl Stream<Item = i32, Error = ApiError> {
81     ///     /* ... */
82     /// # stream::empty()
83     /// }
84     /// ```
85     ///
86     /// Note that this *may not* be needed in many cases because the
87     /// context selector will call [`Into::into`] on each field.
with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E> where F: FnMut() -> C, C: IntoError<E, Source = Self::Error>, E: Error + ErrorCompat88     fn with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E>
89     where
90         F: FnMut() -> C,
91         C: IntoError<E, Source = Self::Error>,
92         E: Error + ErrorCompat;
93 }
94 
95 impl<St> StreamExt for St
96 where
97     St: Stream,
98 {
context<C, E>(self, context: C) -> Context<Self, C, E> where C: IntoError<E, Source = Self::Error> + Clone, E: Error + ErrorCompat,99     fn context<C, E>(self, context: C) -> Context<Self, C, E>
100     where
101         C: IntoError<E, Source = Self::Error> + Clone,
102         E: Error + ErrorCompat,
103     {
104         Context {
105             stream: self,
106             context,
107             _e: PhantomData,
108         }
109     }
110 
with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E> where F: FnMut() -> C, C: IntoError<E, Source = Self::Error>, E: Error + ErrorCompat,111     fn with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E>
112     where
113         F: FnMut() -> C,
114         C: IntoError<E, Source = Self::Error>,
115         E: Error + ErrorCompat,
116     {
117         WithContext {
118             stream: self,
119             context,
120             _e: PhantomData,
121         }
122     }
123 }
124 
125 /// Stream for the [`context`](StreamExt::context) combinator.
126 ///
127 /// See the [`StreamExt::context`] method for more details.
128 pub struct Context<St, C, E> {
129     stream: St,
130     context: C,
131     _e: PhantomData<E>,
132 }
133 
134 impl<St, C, E> Stream for Context<St, C, E>
135 where
136     St: Stream,
137     C: IntoError<E, Source = St::Error> + Clone,
138     E: Error + ErrorCompat,
139 {
140     type Item = St::Item;
141     type Error = E;
142 
poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>143     fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
144         self.stream
145             .poll()
146             .map_err(|error| self.context.clone().into_error(error))
147     }
148 }
149 
150 /// Stream for the [`with_context`](StreamExt::with_context) combinator.
151 ///
152 /// See the [`StreamExt::with_context`] method for more details.
153 pub struct WithContext<St, F, E> {
154     stream: St,
155     context: F,
156     _e: PhantomData<E>,
157 }
158 
159 impl<St, F, C, E> Stream for WithContext<St, F, E>
160 where
161     St: Stream,
162     F: FnMut() -> C,
163     C: IntoError<E, Source = St::Error>,
164     E: Error + ErrorCompat,
165 {
166     type Item = St::Item;
167     type Error = E;
168 
poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>169     fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
170         self.stream.poll().map_err(|error| {
171             let context = &mut self.context;
172             context().into_error(error)
173         })
174     }
175 }
176