1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "sink")]
6 use futures_sink::Sink;
7 use pin_utils::{unsafe_pinned, unsafe_unpinned};
8 
/// Stream returned by the [`inspect`](super::StreamExt::inspect) combinator.
///
/// Pairs an inner stream with a callback. The `Stream` implementation for
/// this type shows every yielded item to the callback by shared reference and
/// then forwards the item unchanged.
#[must_use = "streams do nothing unless polled"]
pub struct Inspect<St, F> {
    // The wrapped stream that items are pulled from.
    stream: St,
    // Callback run on a reference to each item before it is forwarded.
    f: F,
}
15 
16 impl<St: Unpin, F> Unpin for Inspect<St, F> {}
17 
18 impl<St, F> fmt::Debug for Inspect<St, F>
19 where
20     St: fmt::Debug,
21 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result22     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23         f.debug_struct("Inspect")
24             .field("stream", &self.stream)
25             .finish()
26     }
27 }
28 
29 impl<St, F> Inspect<St, F>
30     where St: Stream,
31           F: FnMut(&St::Item),
32 {
33     unsafe_pinned!(stream: St);
34     unsafe_unpinned!(f: F);
35 
new(stream: St, f: F) -> Inspect<St, F>36     pub(super) fn new(stream: St, f: F) -> Inspect<St, F> {
37         Inspect { stream, f }
38     }
39 
40     /// Acquires a reference to the underlying stream that this combinator is
41     /// pulling from.
get_ref(&self) -> &St42     pub fn get_ref(&self) -> &St {
43         &self.stream
44     }
45 
46     /// Acquires a mutable reference to the underlying stream that this
47     /// combinator is pulling from.
48     ///
49     /// Note that care must be taken to avoid tampering with the state of the
50     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut St51     pub fn get_mut(&mut self) -> &mut St {
52         &mut self.stream
53     }
54 
55     /// Acquires a pinned mutable reference to the underlying stream that this
56     /// combinator is pulling from.
57     ///
58     /// Note that care must be taken to avoid tampering with the state of the
59     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>60     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
61         self.stream()
62     }
63 
64     /// Consumes this combinator, returning the underlying stream.
65     ///
66     /// Note that this may discard intermediate state of this combinator, so
67     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> St68     pub fn into_inner(self) -> St {
69         self.stream
70     }
71 }
72 
73 impl<St, F> FusedStream for Inspect<St, F>
74     where St: FusedStream,
75           F: FnMut(&St::Item),
76 {
is_terminated(&self) -> bool77     fn is_terminated(&self) -> bool {
78         self.stream.is_terminated()
79     }
80 }
81 
// Shared helper, also used by `TryStreamExt::{inspect_ok, inspect_err}`.
//
// Shows `x` to the callback by shared reference, then returns `x` unchanged.
#[inline]
pub(crate) fn inspect<T, F: FnMut(&T)>(x: T, mut f: F) -> T {
    let value = x;
    f(&value);
    value
}
88 
89 impl<St, F> Stream for Inspect<St, F>
90     where St: Stream,
91           F: FnMut(&St::Item),
92 {
93     type Item = St::Item;
94 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>95     fn poll_next(
96         mut self: Pin<&mut Self>,
97         cx: &mut Context<'_>,
98     ) -> Poll<Option<St::Item>> {
99         self.as_mut()
100             .stream()
101             .poll_next(cx)
102             .map(|opt| opt.map(|e| inspect(e, self.as_mut().f())))
103     }
104 
size_hint(&self) -> (usize, Option<usize>)105     fn size_hint(&self) -> (usize, Option<usize>) {
106         self.stream.size_hint()
107     }
108 }
109 
// When the wrapped stream is also a sink, expose that sink through the
// combinator: the error type is the inner sink's, and the methods are
// generated by `delegate_sink!` for the `stream` field.
#[cfg(feature = "sink")]
impl<S, F, Item> Sink<Item> for Inspect<S, F>
where
    S: Stream + Sink<Item>,
    F: FnMut(&S::Item),
{
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
120