1 use crate::stream::Stream;
2 
3 use std::borrow::Borrow;
4 use std::hash::Hash;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7 
8 /// Combine many streams into one, indexing each source stream with a unique
9 /// key.
10 ///
11 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
12 /// streams into a single merged stream that yields values in the order that
13 /// they arrive from the source streams. However, `StreamMap` has a lot more
14 /// flexibility in usage patterns.
15 ///
16 /// `StreamMap` can:
17 ///
18 /// * Merge an arbitrary number of streams.
19 /// * Track which source stream the value was received from.
20 /// * Handle inserting and removing streams from the set of managed streams at
21 ///   any point during iteration.
22 ///
23 /// All source streams held by `StreamMap` are indexed using a key. This key is
24 /// included with the value when a source stream yields a value. The key is also
25 /// used to remove the stream from the `StreamMap` before the stream has
26 /// completed streaming.
27 ///
28 /// # `Unpin`
29 ///
30 /// Because the `StreamMap` API moves streams during runtime, both streams and
31 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
32 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
33 /// pin the stream in the heap.
34 ///
35 /// # Implementation
36 ///
37 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
38 /// internal implementation detail will persist in future versions, but it is
39 /// important to know the runtime implications. In general, `StreamMap` works
40 /// best with a "smallish" number of streams as all entries are scanned on
41 /// insert, remove, and polling. In cases where a large number of streams need
42 /// to be merged, it may be advisable to use tasks sending values on a shared
43 /// [`mpsc`] channel.
44 ///
45 /// [`StreamExt::merge`]: crate::stream::StreamExt::merge
46 /// [`mpsc`]: crate::sync::mpsc
47 /// [`pin!`]: macro@pin
48 /// [`Box::pin`]: std::boxed::Box::pin
49 ///
50 /// # Examples
51 ///
52 /// Merging two streams, then remove them after receiving the first value
53 ///
54 /// ```
55 /// use tokio::stream::{StreamExt, StreamMap};
56 /// use tokio::sync::mpsc;
57 ///
58 /// #[tokio::main]
59 /// async fn main() {
60 ///     let (mut tx1, rx1) = mpsc::channel(10);
61 ///     let (mut tx2, rx2) = mpsc::channel(10);
62 ///
63 ///     tokio::spawn(async move {
64 ///         tx1.send(1).await.unwrap();
65 ///
66 ///         // This value will never be received. The send may or may not return
67 ///         // `Err` depending on if the remote end closed first or not.
68 ///         let _ = tx1.send(2).await;
69 ///     });
70 ///
71 ///     tokio::spawn(async move {
72 ///         tx2.send(3).await.unwrap();
73 ///         let _ = tx2.send(4).await;
74 ///     });
75 ///
76 ///     let mut map = StreamMap::new();
77 ///
78 ///     // Insert both streams
79 ///     map.insert("one", rx1);
80 ///     map.insert("two", rx2);
81 ///
82 ///     // Read twice
83 ///     for _ in 0..2 {
84 ///         let (key, val) = map.next().await.unwrap();
85 ///
86 ///         if key == "one" {
87 ///             assert_eq!(val, 1);
88 ///         } else {
89 ///             assert_eq!(val, 3);
90 ///         }
91 ///
92 ///         // Remove the stream to prevent reading the next value
93 ///         map.remove(key);
94 ///     }
95 /// }
96 /// ```
97 ///
98 /// This example models a read-only client to a chat system with channels. The
99 /// client sends commands to join and leave channels. `StreamMap` is used to
100 /// manage active channel subscriptions.
101 ///
102 /// For simplicity, messages are displayed with `println!`, but they could be
103 /// sent to the client over a socket.
104 ///
105 /// ```no_run
106 /// use tokio::stream::{Stream, StreamExt, StreamMap};
107 ///
108 /// enum Command {
109 ///     Join(String),
110 ///     Leave(String),
111 /// }
112 ///
113 /// fn commands() -> impl Stream<Item = Command> {
114 ///     // Streams in user commands by parsing `stdin`.
115 /// # tokio::stream::pending()
116 /// }
117 ///
118 /// // Join a channel, returns a stream of messages received on the channel.
119 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
120 ///     // left as an exercise to the reader
121 /// # tokio::stream::pending()
122 /// }
123 ///
124 /// #[tokio::main]
125 /// async fn main() {
126 ///     let mut channels = StreamMap::new();
127 ///
128 ///     // Input commands (join / leave channels).
129 ///     let cmds = commands();
130 ///     tokio::pin!(cmds);
131 ///
132 ///     loop {
133 ///         tokio::select! {
134 ///             Some(cmd) = cmds.next() => {
135 ///                 match cmd {
136 ///                     Command::Join(chan) => {
137 ///                         // Join the channel and add it to the `channels`
138 ///                         // stream map
139 ///                         let msgs = join(&chan);
140 ///                         channels.insert(chan, msgs);
141 ///                     }
142 ///                     Command::Leave(chan) => {
143 ///                         channels.remove(&chan);
144 ///                     }
145 ///                 }
146 ///             }
147 ///             Some((chan, msg)) = channels.next() => {
148 ///                 // Received a message, display it on stdout with the channel
149 ///                 // it originated from.
150 ///                 println!("{}: {}", chan, msg);
151 ///             }
152 ///             // Both the `commands` stream and the `channels` stream are
153 ///             // complete. There is no more work to do, so leave the loop.
154 ///             else => break,
155 ///         }
156 ///     }
157 /// }
158 /// ```
159 #[derive(Debug, Default)]
160 pub struct StreamMap<K, V> {
161     /// Streams stored in the map
162     entries: Vec<(K, V)>,
163 }
164 
165 impl<K, V> StreamMap<K, V> {
166     /// Creates an empty `StreamMap`.
167     ///
168     /// The stream map is initially created with a capacity of `0`, so it will
169     /// not allocate until it is first inserted into.
170     ///
171     /// # Examples
172     ///
173     /// ```
174     /// use tokio::stream::{StreamMap, Pending};
175     ///
176     /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
177     /// ```
new() -> StreamMap<K, V>178     pub fn new() -> StreamMap<K, V> {
179         StreamMap { entries: vec![] }
180     }
181 
182     /// Creates an empty `StreamMap` with the specified capacity.
183     ///
184     /// The stream map will be able to hold at least `capacity` elements without
185     /// reallocating. If `capacity` is 0, the stream map will not allocate.
186     ///
187     /// # Examples
188     ///
189     /// ```
190     /// use tokio::stream::{StreamMap, Pending};
191     ///
192     /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
193     /// ```
with_capacity(capacity: usize) -> StreamMap<K, V>194     pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
195         StreamMap {
196             entries: Vec::with_capacity(capacity),
197         }
198     }
199 
200     /// Returns an iterator visiting all keys in arbitrary order.
201     ///
202     /// The iterator element type is &'a K.
203     ///
204     /// # Examples
205     ///
206     /// ```
207     /// use tokio::stream::{StreamMap, pending};
208     ///
209     /// let mut map = StreamMap::new();
210     ///
211     /// map.insert("a", pending::<i32>());
212     /// map.insert("b", pending());
213     /// map.insert("c", pending());
214     ///
215     /// for key in map.keys() {
216     ///     println!("{}", key);
217     /// }
218     /// ```
keys(&self) -> impl Iterator<Item = &K>219     pub fn keys(&self) -> impl Iterator<Item = &K> {
220         self.entries.iter().map(|(k, _)| k)
221     }
222 
223     /// An iterator visiting all values in arbitrary order.
224     ///
225     /// The iterator element type is &'a V.
226     ///
227     /// # Examples
228     ///
229     /// ```
230     /// use tokio::stream::{StreamMap, pending};
231     ///
232     /// let mut map = StreamMap::new();
233     ///
234     /// map.insert("a", pending::<i32>());
235     /// map.insert("b", pending());
236     /// map.insert("c", pending());
237     ///
238     /// for stream in map.values() {
239     ///     println!("{:?}", stream);
240     /// }
241     /// ```
values(&self) -> impl Iterator<Item = &V>242     pub fn values(&self) -> impl Iterator<Item = &V> {
243         self.entries.iter().map(|(_, v)| v)
244     }
245 
246     /// An iterator visiting all values mutably in arbitrary order.
247     ///
248     /// The iterator element type is &'a mut V.
249     ///
250     /// # Examples
251     ///
252     /// ```
253     /// use tokio::stream::{StreamMap, pending};
254     ///
255     /// let mut map = StreamMap::new();
256     ///
257     /// map.insert("a", pending::<i32>());
258     /// map.insert("b", pending());
259     /// map.insert("c", pending());
260     ///
261     /// for stream in map.values_mut() {
262     ///     println!("{:?}", stream);
263     /// }
264     /// ```
values_mut(&mut self) -> impl Iterator<Item = &mut V>265     pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
266         self.entries.iter_mut().map(|(_, v)| v)
267     }
268 
269     /// Returns the number of streams the map can hold without reallocating.
270     ///
271     /// This number is a lower bound; the `StreamMap` might be able to hold
272     /// more, but is guaranteed to be able to hold at least this many.
273     ///
274     /// # Examples
275     ///
276     /// ```
277     /// use tokio::stream::{StreamMap, Pending};
278     ///
279     /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
280     /// assert!(map.capacity() >= 100);
281     /// ```
capacity(&self) -> usize282     pub fn capacity(&self) -> usize {
283         self.entries.capacity()
284     }
285 
286     /// Returns the number of streams in the map.
287     ///
288     /// # Examples
289     ///
290     /// ```
291     /// use tokio::stream::{StreamMap, pending};
292     ///
293     /// let mut a = StreamMap::new();
294     /// assert_eq!(a.len(), 0);
295     /// a.insert(1, pending::<i32>());
296     /// assert_eq!(a.len(), 1);
297     /// ```
len(&self) -> usize298     pub fn len(&self) -> usize {
299         self.entries.len()
300     }
301 
302     /// Returns `true` if the map contains no elements.
303     ///
304     /// # Examples
305     ///
306     /// ```
307     /// use std::collections::HashMap;
308     ///
309     /// let mut a = HashMap::new();
310     /// assert!(a.is_empty());
311     /// a.insert(1, "a");
312     /// assert!(!a.is_empty());
313     /// ```
is_empty(&self) -> bool314     pub fn is_empty(&self) -> bool {
315         self.entries.is_empty()
316     }
317 
318     /// Clears the map, removing all key-stream pairs. Keeps the allocated
319     /// memory for reuse.
320     ///
321     /// # Examples
322     ///
323     /// ```
324     /// use tokio::stream::{StreamMap, pending};
325     ///
326     /// let mut a = StreamMap::new();
327     /// a.insert(1, pending::<i32>());
328     /// a.clear();
329     /// assert!(a.is_empty());
330     /// ```
clear(&mut self)331     pub fn clear(&mut self) {
332         self.entries.clear();
333     }
334 
335     /// Insert a key-stream pair into the map.
336     ///
337     /// If the map did not have this key present, `None` is returned.
338     ///
339     /// If the map did have this key present, the new `stream` replaces the old
340     /// one and the old stream is returned.
341     ///
342     /// # Examples
343     ///
344     /// ```
345     /// use tokio::stream::{StreamMap, pending};
346     ///
347     /// let mut map = StreamMap::new();
348     ///
349     /// assert!(map.insert(37, pending::<i32>()).is_none());
350     /// assert!(!map.is_empty());
351     ///
352     /// map.insert(37, pending());
353     /// assert!(map.insert(37, pending()).is_some());
354     /// ```
insert(&mut self, k: K, stream: V) -> Option<V> where K: Hash + Eq,355     pub fn insert(&mut self, k: K, stream: V) -> Option<V>
356     where
357         K: Hash + Eq,
358     {
359         let ret = self.remove(&k);
360         self.entries.push((k, stream));
361 
362         ret
363     }
364 
365     /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
366     ///
367     /// The key may be any borrowed form of the map's key type, but `Hash` and
368     /// `Eq` on the borrowed form must match those for the key type.
369     ///
370     /// # Examples
371     ///
372     /// ```
373     /// use tokio::stream::{StreamMap, pending};
374     ///
375     /// let mut map = StreamMap::new();
376     /// map.insert(1, pending::<i32>());
377     /// assert!(map.remove(&1).is_some());
378     /// assert!(map.remove(&1).is_none());
379     /// ```
remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq,380     pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
381     where
382         K: Borrow<Q>,
383         Q: Hash + Eq,
384     {
385         for i in 0..self.entries.len() {
386             if self.entries[i].0.borrow() == k {
387                 return Some(self.entries.swap_remove(i).1);
388             }
389         }
390 
391         None
392     }
393 
394     /// Returns `true` if the map contains a stream for the specified key.
395     ///
396     /// The key may be any borrowed form of the map's key type, but `Hash` and
397     /// `Eq` on the borrowed form must match those for the key type.
398     ///
399     /// # Examples
400     ///
401     /// ```
402     /// use tokio::stream::{StreamMap, pending};
403     ///
404     /// let mut map = StreamMap::new();
405     /// map.insert(1, pending::<i32>());
406     /// assert_eq!(map.contains_key(&1), true);
407     /// assert_eq!(map.contains_key(&2), false);
408     /// ```
contains_key<Q: ?Sized>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq,409     pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
410     where
411         K: Borrow<Q>,
412         Q: Hash + Eq,
413     {
414         for i in 0..self.entries.len() {
415             if self.entries[i].0.borrow() == k {
416                 return true;
417             }
418         }
419 
420         false
421     }
422 }
423 
424 impl<K, V> StreamMap<K, V>
425 where
426     K: Unpin,
427     V: Stream + Unpin,
428 {
429     /// Polls the next value, includes the vec entry index
poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>430     fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
431         use Poll::*;
432 
433         let start = crate::util::thread_rng_n(self.entries.len() as u32) as usize;
434         let mut idx = start;
435 
436         for _ in 0..self.entries.len() {
437             let (_, stream) = &mut self.entries[idx];
438 
439             match Pin::new(stream).poll_next(cx) {
440                 Ready(Some(val)) => return Ready(Some((idx, val))),
441                 Ready(None) => {
442                     // Remove the entry
443                     self.entries.swap_remove(idx);
444 
445                     // Check if this was the last entry, if so the cursor needs
446                     // to wrap
447                     if idx == self.entries.len() {
448                         idx = 0;
449                     } else if idx < start && start <= self.entries.len() {
450                         // The stream being swapped into the current index has
451                         // already been polled, so skip it.
452                         idx = idx.wrapping_add(1) % self.entries.len();
453                     }
454                 }
455                 Pending => {
456                     idx = idx.wrapping_add(1) % self.entries.len();
457                 }
458             }
459         }
460 
461         // If the map is empty, then the stream is complete.
462         if self.entries.is_empty() {
463             Ready(None)
464         } else {
465             Pending
466         }
467     }
468 }
469 
470 impl<K, V> Stream for StreamMap<K, V>
471 where
472     K: Clone + Unpin,
473     V: Stream + Unpin,
474 {
475     type Item = (K, V::Item);
476 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>477     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
478         if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
479             let key = self.entries[idx].0.clone();
480             Poll::Ready(Some((key, val)))
481         } else {
482             Poll::Ready(None)
483         }
484     }
485 
size_hint(&self) -> (usize, Option<usize>)486     fn size_hint(&self) -> (usize, Option<usize>) {
487         let mut ret = (0, Some(0));
488 
489         for (_, stream) in &self.entries {
490             let hint = stream.size_hint();
491 
492             ret.0 += hint.0;
493 
494             match (ret.1, hint.1) {
495                 (Some(a), Some(b)) => ret.1 = Some(a + b),
496                 (Some(_), None) => ret.1 = None,
497                 _ => {}
498             }
499         }
500 
501         ret
502     }
503 }
504