1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "sink")]
6 use futures_sink::Sink;
7 use pin_utils::{unsafe_pinned, unsafe_unpinned};
8
/// Stream for the [`inspect`](super::StreamExt::inspect) method.
///
/// Wraps an inner stream together with a closure. Items produced by the
/// inner stream are handed to the closure by reference and then yielded
/// unchanged.
#[must_use = "streams do nothing unless polled"]
pub struct Inspect<St, F> {
    /// The wrapped stream that items are pulled from.
    stream: St,
    /// The closure invoked with a reference to each item.
    f: F,
}
15
16 impl<St: Unpin, F> Unpin for Inspect<St, F> {}
17
18 impl<St, F> fmt::Debug for Inspect<St, F>
19 where
20 St: fmt::Debug,
21 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23 f.debug_struct("Inspect")
24 .field("stream", &self.stream)
25 .finish()
26 }
27 }
28
29 impl<St, F> Inspect<St, F>
30 where St: Stream,
31 F: FnMut(&St::Item),
32 {
33 unsafe_pinned!(stream: St);
34 unsafe_unpinned!(f: F);
35
new(stream: St, f: F) -> Inspect<St, F>36 pub(super) fn new(stream: St, f: F) -> Inspect<St, F> {
37 Inspect { stream, f }
38 }
39
40 /// Acquires a reference to the underlying stream that this combinator is
41 /// pulling from.
get_ref(&self) -> &St42 pub fn get_ref(&self) -> &St {
43 &self.stream
44 }
45
46 /// Acquires a mutable reference to the underlying stream that this
47 /// combinator is pulling from.
48 ///
49 /// Note that care must be taken to avoid tampering with the state of the
50 /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut St51 pub fn get_mut(&mut self) -> &mut St {
52 &mut self.stream
53 }
54
55 /// Acquires a pinned mutable reference to the underlying stream that this
56 /// combinator is pulling from.
57 ///
58 /// Note that care must be taken to avoid tampering with the state of the
59 /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>60 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
61 self.stream()
62 }
63
64 /// Consumes this combinator, returning the underlying stream.
65 ///
66 /// Note that this may discard intermediate state of this combinator, so
67 /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> St68 pub fn into_inner(self) -> St {
69 self.stream
70 }
71 }
72
73 impl<St, F> FusedStream for Inspect<St, F>
74 where St: FusedStream,
75 F: FnMut(&St::Item),
76 {
is_terminated(&self) -> bool77 fn is_terminated(&self) -> bool {
78 self.stream.is_terminated()
79 }
80 }
81
// used by `TryStreamExt::{inspect_ok, inspect_err}`
/// Calls `f` with a shared reference to `x`, then returns `x` unchanged.
#[inline]
pub(crate) fn inspect<T, F: FnMut(&T)>(x: T, mut f: F) -> T {
    f(&x);
    x
}
88
89 impl<St, F> Stream for Inspect<St, F>
90 where St: Stream,
91 F: FnMut(&St::Item),
92 {
93 type Item = St::Item;
94
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<St::Item>>95 fn poll_next(
96 mut self: Pin<&mut Self>,
97 cx: &mut Context<'_>,
98 ) -> Poll<Option<St::Item>> {
99 self.as_mut()
100 .stream()
101 .poll_next(cx)
102 .map(|opt| opt.map(|e| inspect(e, self.as_mut().f())))
103 }
104
size_hint(&self) -> (usize, Option<usize>)105 fn size_hint(&self) -> (usize, Option<usize>) {
106 self.stream.size_hint()
107 }
108 }
109
// Forwarding impl of Sink from the underlying stream
//
// NOTE(review): `delegate_sink!` is defined outside this file; presumably it
// expands to the `Sink` methods forwarded to the `stream` field — confirm
// against the macro definition.
#[cfg(feature = "sink")]
impl<S, F, Item> Sink<Item> for Inspect<S, F>
where
    S: Stream + Sink<Item>,
    F: FnMut(&S::Item),
{
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
120