1 use crate::stream::Stream; 2 3 use std::borrow::Borrow; 4 use std::hash::Hash; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 8 /// Combine many streams into one, indexing each source stream with a unique 9 /// key. 10 /// 11 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source 12 /// streams into a single merged stream that yields values in the order that 13 /// they arrive from the source streams. However, `StreamMap` has a lot more 14 /// flexibility in usage patterns. 15 /// 16 /// `StreamMap` can: 17 /// 18 /// * Merge an arbitrary number of streams. 19 /// * Track which source stream the value was received from. 20 /// * Handle inserting and removing streams from the set of managed streams at 21 /// any point during iteration. 22 /// 23 /// All source streams held by `StreamMap` are indexed using a key. This key is 24 /// included with the value when a source stream yields a value. The key is also 25 /// used to remove the stream from the `StreamMap` before the stream has 26 /// completed streaming. 27 /// 28 /// # `Unpin` 29 /// 30 /// Because the `StreamMap` API moves streams during runtime, both streams and 31 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a 32 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to 33 /// pin the stream in the heap. 34 /// 35 /// # Implementation 36 /// 37 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this 38 /// internal implementation detail will persist in future versions, but it is 39 /// important to know the runtime implications. In general, `StreamMap` works 40 /// best with a "smallish" number of streams as all entries are scanned on 41 /// insert, remove, and polling. In cases where a large number of streams need 42 /// to be merged, it may be advisable to use tasks sending values on a shared 43 /// [`mpsc`] channel. 44 /// 45 /// [`StreamExt::merge`]: crate::stream::StreamExt::merge 46 /// [`mpsc`]: crate::sync::mpsc 47 /// [`pin!`]: macro@pin 48 /// [`Box::pin`]: std::boxed::Box::pin 49 /// 50 /// # Examples 51 /// 52 /// Merging two streams, then remove them after receiving the first value 53 /// 54 /// ``` 55 /// use tokio::stream::{StreamExt, StreamMap}; 56 /// use tokio::sync::mpsc; 57 /// 58 /// #[tokio::main] 59 /// async fn main() { 60 /// let (mut tx1, rx1) = mpsc::channel(10); 61 /// let (mut tx2, rx2) = mpsc::channel(10); 62 /// 63 /// tokio::spawn(async move { 64 /// tx1.send(1).await.unwrap(); 65 /// 66 /// // This value will never be received. The send may or may not return 67 /// // `Err` depending on if the remote end closed first or not. 68 /// let _ = tx1.send(2).await; 69 /// }); 70 /// 71 /// tokio::spawn(async move { 72 /// tx2.send(3).await.unwrap(); 73 /// let _ = tx2.send(4).await; 74 /// }); 75 /// 76 /// let mut map = StreamMap::new(); 77 /// 78 /// // Insert both streams 79 /// map.insert("one", rx1); 80 /// map.insert("two", rx2); 81 /// 82 /// // Read twice 83 /// for _ in 0..2 { 84 /// let (key, val) = map.next().await.unwrap(); 85 /// 86 /// if key == "one" { 87 /// assert_eq!(val, 1); 88 /// } else { 89 /// assert_eq!(val, 3); 90 /// } 91 /// 92 /// // Remove the stream to prevent reading the next value 93 /// map.remove(key); 94 /// } 95 /// } 96 /// ``` 97 /// 98 /// This example models a read-only client to a chat system with channels. The 99 /// client sends commands to join and leave channels. `StreamMap` is used to 100 /// manage active channel subscriptions. 101 /// 102 /// For simplicity, messages are displayed with `println!`, but they could be 103 /// sent to the client over a socket. 104 /// 105 /// ```no_run 106 /// use tokio::stream::{Stream, StreamExt, StreamMap}; 107 /// 108 /// enum Command { 109 /// Join(String), 110 /// Leave(String), 111 /// } 112 /// 113 /// fn commands() -> impl Stream<Item = Command> { 114 /// // Streams in user commands by parsing `stdin`. 115 /// # tokio::stream::pending() 116 /// } 117 /// 118 /// // Join a channel, returns a stream of messages received on the channel. 119 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { 120 /// // left as an exercise to the reader 121 /// # tokio::stream::pending() 122 /// } 123 /// 124 /// #[tokio::main] 125 /// async fn main() { 126 /// let mut channels = StreamMap::new(); 127 /// 128 /// // Input commands (join / leave channels). 129 /// let cmds = commands(); 130 /// tokio::pin!(cmds); 131 /// 132 /// loop { 133 /// tokio::select! { 134 /// Some(cmd) = cmds.next() => { 135 /// match cmd { 136 /// Command::Join(chan) => { 137 /// // Join the channel and add it to the `channels` 138 /// // stream map 139 /// let msgs = join(&chan); 140 /// channels.insert(chan, msgs); 141 /// } 142 /// Command::Leave(chan) => { 143 /// channels.remove(&chan); 144 /// } 145 /// } 146 /// } 147 /// Some((chan, msg)) = channels.next() => { 148 /// // Received a message, display it on stdout with the channel 149 /// // it originated from. 150 /// println!("{}: {}", chan, msg); 151 /// } 152 /// // Both the `commands` stream and the `channels` stream are 153 /// // complete. There is no more work to do, so leave the loop. 154 /// else => break, 155 /// } 156 /// } 157 /// } 158 /// ``` 159 #[derive(Debug, Default)] 160 pub struct StreamMap<K, V> { 161 /// Streams stored in the map 162 entries: Vec<(K, V)>, 163 } 164 165 impl<K, V> StreamMap<K, V> { 166 /// Creates an empty `StreamMap`. 167 /// 168 /// The stream map is initially created with a capacity of `0`, so it will 169 /// not allocate until it is first inserted into. 170 /// 171 /// # Examples 172 /// 173 /// ``` 174 /// use tokio::stream::{StreamMap, Pending}; 175 /// 176 /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); 177 /// ``` new() -> StreamMap<K, V>178 pub fn new() -> StreamMap<K, V> { 179 StreamMap { entries: vec![] } 180 } 181 182 /// Creates an empty `StreamMap` with the specified capacity. 183 /// 184 /// The stream map will be able to hold at least `capacity` elements without 185 /// reallocating. If `capacity` is 0, the stream map will not allocate. 186 /// 187 /// # Examples 188 /// 189 /// ``` 190 /// use tokio::stream::{StreamMap, Pending}; 191 /// 192 /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); 193 /// ``` with_capacity(capacity: usize) -> StreamMap<K, V>194 pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { 195 StreamMap { 196 entries: Vec::with_capacity(capacity), 197 } 198 } 199 200 /// Returns an iterator visiting all keys in arbitrary order. 201 /// 202 /// The iterator element type is &'a K. 203 /// 204 /// # Examples 205 /// 206 /// ``` 207 /// use tokio::stream::{StreamMap, pending}; 208 /// 209 /// let mut map = StreamMap::new(); 210 /// 211 /// map.insert("a", pending::<i32>()); 212 /// map.insert("b", pending()); 213 /// map.insert("c", pending()); 214 /// 215 /// for key in map.keys() { 216 /// println!("{}", key); 217 /// } 218 /// ``` keys(&self) -> impl Iterator<Item = &K>219 pub fn keys(&self) -> impl Iterator<Item = &K> { 220 self.entries.iter().map(|(k, _)| k) 221 } 222 223 /// An iterator visiting all values in arbitrary order. 224 /// 225 /// The iterator element type is &'a V. 226 /// 227 /// # Examples 228 /// 229 /// ``` 230 /// use tokio::stream::{StreamMap, pending}; 231 /// 232 /// let mut map = StreamMap::new(); 233 /// 234 /// map.insert("a", pending::<i32>()); 235 /// map.insert("b", pending()); 236 /// map.insert("c", pending()); 237 /// 238 /// for stream in map.values() { 239 /// println!("{:?}", stream); 240 /// } 241 /// ``` values(&self) -> impl Iterator<Item = &V>242 pub fn values(&self) -> impl Iterator<Item = &V> { 243 self.entries.iter().map(|(_, v)| v) 244 } 245 246 /// An iterator visiting all values mutably in arbitrary order. 247 /// 248 /// The iterator element type is &'a mut V. 249 /// 250 /// # Examples 251 /// 252 /// ``` 253 /// use tokio::stream::{StreamMap, pending}; 254 /// 255 /// let mut map = StreamMap::new(); 256 /// 257 /// map.insert("a", pending::<i32>()); 258 /// map.insert("b", pending()); 259 /// map.insert("c", pending()); 260 /// 261 /// for stream in map.values_mut() { 262 /// println!("{:?}", stream); 263 /// } 264 /// ``` values_mut(&mut self) -> impl Iterator<Item = &mut V>265 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { 266 self.entries.iter_mut().map(|(_, v)| v) 267 } 268 269 /// Returns the number of streams the map can hold without reallocating. 270 /// 271 /// This number is a lower bound; the `StreamMap` might be able to hold 272 /// more, but is guaranteed to be able to hold at least this many. 273 /// 274 /// # Examples 275 /// 276 /// ``` 277 /// use tokio::stream::{StreamMap, Pending}; 278 /// 279 /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); 280 /// assert!(map.capacity() >= 100); 281 /// ``` capacity(&self) -> usize282 pub fn capacity(&self) -> usize { 283 self.entries.capacity() 284 } 285 286 /// Returns the number of streams in the map. 287 /// 288 /// # Examples 289 /// 290 /// ``` 291 /// use tokio::stream::{StreamMap, pending}; 292 /// 293 /// let mut a = StreamMap::new(); 294 /// assert_eq!(a.len(), 0); 295 /// a.insert(1, pending::<i32>()); 296 /// assert_eq!(a.len(), 1); 297 /// ``` len(&self) -> usize298 pub fn len(&self) -> usize { 299 self.entries.len() 300 } 301 302 /// Returns `true` if the map contains no elements. 303 /// 304 /// # Examples 305 /// 306 /// ``` 307 /// use std::collections::HashMap; 308 /// 309 /// let mut a = HashMap::new(); 310 /// assert!(a.is_empty()); 311 /// a.insert(1, "a"); 312 /// assert!(!a.is_empty()); 313 /// ``` is_empty(&self) -> bool314 pub fn is_empty(&self) -> bool { 315 self.entries.is_empty() 316 } 317 318 /// Clears the map, removing all key-stream pairs. Keeps the allocated 319 /// memory for reuse. 320 /// 321 /// # Examples 322 /// 323 /// ``` 324 /// use tokio::stream::{StreamMap, pending}; 325 /// 326 /// let mut a = StreamMap::new(); 327 /// a.insert(1, pending::<i32>()); 328 /// a.clear(); 329 /// assert!(a.is_empty()); 330 /// ``` clear(&mut self)331 pub fn clear(&mut self) { 332 self.entries.clear(); 333 } 334 335 /// Insert a key-stream pair into the map. 336 /// 337 /// If the map did not have this key present, `None` is returned. 338 /// 339 /// If the map did have this key present, the new `stream` replaces the old 340 /// one and the old stream is returned. 341 /// 342 /// # Examples 343 /// 344 /// ``` 345 /// use tokio::stream::{StreamMap, pending}; 346 /// 347 /// let mut map = StreamMap::new(); 348 /// 349 /// assert!(map.insert(37, pending::<i32>()).is_none()); 350 /// assert!(!map.is_empty()); 351 /// 352 /// map.insert(37, pending()); 353 /// assert!(map.insert(37, pending()).is_some()); 354 /// ``` insert(&mut self, k: K, stream: V) -> Option<V> where K: Hash + Eq,355 pub fn insert(&mut self, k: K, stream: V) -> Option<V> 356 where 357 K: Hash + Eq, 358 { 359 let ret = self.remove(&k); 360 self.entries.push((k, stream)); 361 362 ret 363 } 364 365 /// Removes a key from the map, returning the stream at the key if the key was previously in the map. 366 /// 367 /// The key may be any borrowed form of the map's key type, but `Hash` and 368 /// `Eq` on the borrowed form must match those for the key type. 369 /// 370 /// # Examples 371 /// 372 /// ``` 373 /// use tokio::stream::{StreamMap, pending}; 374 /// 375 /// let mut map = StreamMap::new(); 376 /// map.insert(1, pending::<i32>()); 377 /// assert!(map.remove(&1).is_some()); 378 /// assert!(map.remove(&1).is_none()); 379 /// ``` remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq,380 pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> 381 where 382 K: Borrow<Q>, 383 Q: Hash + Eq, 384 { 385 for i in 0..self.entries.len() { 386 if self.entries[i].0.borrow() == k { 387 return Some(self.entries.swap_remove(i).1); 388 } 389 } 390 391 None 392 } 393 394 /// Returns `true` if the map contains a stream for the specified key. 395 /// 396 /// The key may be any borrowed form of the map's key type, but `Hash` and 397 /// `Eq` on the borrowed form must match those for the key type. 398 /// 399 /// # Examples 400 /// 401 /// ``` 402 /// use tokio::stream::{StreamMap, pending}; 403 /// 404 /// let mut map = StreamMap::new(); 405 /// map.insert(1, pending::<i32>()); 406 /// assert_eq!(map.contains_key(&1), true); 407 /// assert_eq!(map.contains_key(&2), false); 408 /// ``` contains_key<Q: ?Sized>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq,409 pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool 410 where 411 K: Borrow<Q>, 412 Q: Hash + Eq, 413 { 414 for i in 0..self.entries.len() { 415 if self.entries[i].0.borrow() == k { 416 return true; 417 } 418 } 419 420 false 421 } 422 } 423 424 impl<K, V> StreamMap<K, V> 425 where 426 K: Unpin, 427 V: Stream + Unpin, 428 { 429 /// Polls the next value, includes the vec entry index poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>430 fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { 431 use Poll::*; 432 433 let start = crate::util::thread_rng_n(self.entries.len() as u32) as usize; 434 let mut idx = start; 435 436 for _ in 0..self.entries.len() { 437 let (_, stream) = &mut self.entries[idx]; 438 439 match Pin::new(stream).poll_next(cx) { 440 Ready(Some(val)) => return Ready(Some((idx, val))), 441 Ready(None) => { 442 // Remove the entry 443 self.entries.swap_remove(idx); 444 445 // Check if this was the last entry, if so the cursor needs 446 // to wrap 447 if idx == self.entries.len() { 448 idx = 0; 449 } else if idx < start && start <= self.entries.len() { 450 // The stream being swapped into the current index has 451 // already been polled, so skip it. 452 idx = idx.wrapping_add(1) % self.entries.len(); 453 } 454 } 455 Pending => { 456 idx = idx.wrapping_add(1) % self.entries.len(); 457 } 458 } 459 } 460 461 // If the map is empty, then the stream is complete. 462 if self.entries.is_empty() { 463 Ready(None) 464 } else { 465 Pending 466 } 467 } 468 } 469 470 impl<K, V> Stream for StreamMap<K, V> 471 where 472 K: Clone + Unpin, 473 V: Stream + Unpin, 474 { 475 type Item = (K, V::Item); 476 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>477 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 478 if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { 479 let key = self.entries[idx].0.clone(); 480 Poll::Ready(Some((key, val))) 481 } else { 482 Poll::Ready(None) 483 } 484 } 485 size_hint(&self) -> (usize, Option<usize>)486 fn size_hint(&self) -> (usize, Option<usize>) { 487 let mut ret = (0, Some(0)); 488 489 for (_, stream) in &self.entries { 490 let hint = stream.size_hint(); 491 492 ret.0 += hint.0; 493 494 match (ret.1, hint.1) { 495 (Some(a), Some(b)) => ret.1 = Some(a + b), 496 (Some(_), None) => ret.1 = None, 497 _ => {} 498 } 499 } 500 501 ret 502 } 503 } 504