1 //! Additions to the Futures 0.1 [`Stream`] trait. 2 //! 3 //! [`Stream`]: futures_01_crate::Stream 4 5 use crate::{Error, ErrorCompat, IntoError}; 6 use core::marker::PhantomData; 7 use futures_01_crate::{Async, Stream}; 8 9 /// Additions to [`Stream`]. 10 pub trait StreamExt: Stream + Sized { 11 /// Extend a [`Stream`]'s error with additional context-sensitive 12 /// information. 13 /// 14 /// [`Stream`]: futures_01_crate::Stream] 15 /// 16 /// ```rust 17 /// # use futures_01_crate as futures; 18 /// use futures::Stream; 19 /// # use futures::stream; 20 /// use snafu::{futures01::StreamExt, Snafu}; 21 /// 22 /// #[derive(Debug, Snafu)] 23 /// enum Error { 24 /// Authenticating { 25 /// user_name: String, 26 /// user_id: i32, 27 /// source: ApiError, 28 /// }, 29 /// } 30 /// 31 /// fn example() -> impl Stream<Item = i32, Error = Error> { 32 /// stock_prices().context(Authenticating { 33 /// user_name: "admin", 34 /// user_id: 42, 35 /// }) 36 /// } 37 /// 38 /// # type ApiError = Box<dyn std::error::Error>; 39 /// fn stock_prices() -> impl Stream<Item = i32, Error = ApiError> { 40 /// /* ... */ 41 /// # stream::empty() 42 /// } 43 /// ``` 44 /// 45 /// Note that the context selector will call [`Into::into`] on 46 /// each field, so the types are not required to exactly match. context<C, E>(self, context: C) -> Context<Self, C, E> where C: IntoError<E, Source = Self::Error> + Clone, E: Error + ErrorCompat47 fn context<C, E>(self, context: C) -> Context<Self, C, E> 48 where 49 C: IntoError<E, Source = Self::Error> + Clone, 50 E: Error + ErrorCompat; 51 52 /// Extend a [`Stream`]'s error with lazily-generated context-sensitive 53 /// information. 54 /// 55 /// [`Stream`]: futures_01_crate::Stream] 56 /// 57 /// ```rust 58 /// # use futures_01_crate as futures; 59 /// use futures::Stream; 60 /// # use futures::stream; 61 /// use snafu::{futures01::StreamExt, Snafu}; 62 /// 63 /// #[derive(Debug, Snafu)] 64 /// enum Error { 65 /// Authenticating { 66 /// user_name: String, 67 /// user_id: i32, 68 /// source: ApiError, 69 /// }, 70 /// } 71 /// 72 /// fn example() -> impl Stream<Item = i32, Error = Error> { 73 /// stock_prices().with_context(|| Authenticating { 74 /// user_name: "admin".to_string(), 75 /// user_id: 42, 76 /// }) 77 /// } 78 /// 79 /// # type ApiError = std::io::Error; 80 /// fn stock_prices() -> impl Stream<Item = i32, Error = ApiError> { 81 /// /* ... */ 82 /// # stream::empty() 83 /// } 84 /// ``` 85 /// 86 /// Note that this *may not* be needed in many cases because the 87 /// context selector will call [`Into::into`] on each field. with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E> where F: FnMut() -> C, C: IntoError<E, Source = Self::Error>, E: Error + ErrorCompat88 fn with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E> 89 where 90 F: FnMut() -> C, 91 C: IntoError<E, Source = Self::Error>, 92 E: Error + ErrorCompat; 93 } 94 95 impl<St> StreamExt for St 96 where 97 St: Stream, 98 { context<C, E>(self, context: C) -> Context<Self, C, E> where C: IntoError<E, Source = Self::Error> + Clone, E: Error + ErrorCompat,99 fn context<C, E>(self, context: C) -> Context<Self, C, E> 100 where 101 C: IntoError<E, Source = Self::Error> + Clone, 102 E: Error + ErrorCompat, 103 { 104 Context { 105 stream: self, 106 context, 107 _e: PhantomData, 108 } 109 } 110 with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E> where F: FnMut() -> C, C: IntoError<E, Source = Self::Error>, E: Error + ErrorCompat,111 fn with_context<F, C, E>(self, context: F) -> WithContext<Self, F, E> 112 where 113 F: FnMut() -> C, 114 C: IntoError<E, Source = Self::Error>, 115 E: Error + ErrorCompat, 116 { 117 WithContext { 118 stream: self, 119 context, 120 _e: PhantomData, 121 } 122 } 123 } 124 125 /// Stream for the [`context`](StreamExt::context) combinator. 126 /// 127 /// See the [`StreamExt::context`] method for more details. 128 pub struct Context<St, C, E> { 129 stream: St, 130 context: C, 131 _e: PhantomData<E>, 132 } 133 134 impl<St, C, E> Stream for Context<St, C, E> 135 where 136 St: Stream, 137 C: IntoError<E, Source = St::Error> + Clone, 138 E: Error + ErrorCompat, 139 { 140 type Item = St::Item; 141 type Error = E; 142 poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>143 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { 144 self.stream 145 .poll() 146 .map_err(|error| self.context.clone().into_error(error)) 147 } 148 } 149 150 /// Stream for the [`with_context`](StreamExt::with_context) combinator. 151 /// 152 /// See the [`StreamExt::with_context`] method for more details. 153 pub struct WithContext<St, F, E> { 154 stream: St, 155 context: F, 156 _e: PhantomData<E>, 157 } 158 159 impl<St, F, C, E> Stream for WithContext<St, F, E> 160 where 161 St: Stream, 162 F: FnMut() -> C, 163 C: IntoError<E, Source = St::Error>, 164 E: Error + ErrorCompat, 165 { 166 type Item = St::Item; 167 type Error = E; 168 poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error>169 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { 170 self.stream.poll().map_err(|error| { 171 let context = &mut self.context; 172 context().into_error(error) 173 }) 174 } 175 } 176