1 use futures::future::{self, FutureExt, TryFutureExt}; 2 use futures_test::future::FutureTestExt; 3 use std::sync::mpsc; 4 5 #[test] basic_future_combinators()6fn basic_future_combinators() { 7 let (tx1, rx) = mpsc::channel(); 8 let tx2 = tx1.clone(); 9 let tx3 = tx1.clone(); 10 11 let fut = future::ready(1) 12 .then(move |x| { 13 tx1.send(x).unwrap(); // Send 1 14 tx1.send(2).unwrap(); // Send 2 15 future::ready(3) 16 }) 17 .map(move |x| { 18 tx2.send(x).unwrap(); // Send 3 19 tx2.send(4).unwrap(); // Send 4 20 5 21 }) 22 .map(move |x| { 23 tx3.send(x).unwrap(); // Send 5 24 }); 25 26 assert!(rx.try_recv().is_err()); // Not started yet 27 fut.run_in_background(); // Start it 28 for i in 1..=5 { 29 assert_eq!(rx.recv(), Ok(i)); 30 } // Check it 31 assert!(rx.recv().is_err()); // Should be done 32 } 33 34 #[test] basic_try_future_combinators()35fn basic_try_future_combinators() { 36 let (tx1, rx) = mpsc::channel(); 37 let tx2 = tx1.clone(); 38 let tx3 = tx1.clone(); 39 let tx4 = tx1.clone(); 40 let tx5 = tx1.clone(); 41 let tx6 = tx1.clone(); 42 let tx7 = tx1.clone(); 43 let tx8 = tx1.clone(); 44 let tx9 = tx1.clone(); 45 let tx10 = tx1.clone(); 46 47 let fut = future::ready(Ok(1)) 48 .and_then(move |x: i32| { 49 tx1.send(x).unwrap(); // Send 1 50 tx1.send(2).unwrap(); // Send 2 51 future::ready(Ok(3)) 52 }) 53 .or_else(move |x: i32| { 54 tx2.send(x).unwrap(); // Should not run 55 tx2.send(-1).unwrap(); 56 future::ready(Ok(-1)) 57 }) 58 .map_ok(move |x: i32| { 59 tx3.send(x).unwrap(); // Send 3 60 tx3.send(4).unwrap(); // Send 4 61 5 62 }) 63 .map_err(move |x: i32| { 64 tx4.send(x).unwrap(); // Should not run 65 tx4.send(-1).unwrap(); 66 -1 67 }) 68 .map(move |x: Result<i32, i32>| { 69 tx5.send(x.unwrap()).unwrap(); // Send 5 70 tx5.send(6).unwrap(); // Send 6 71 Err(7) // Now return errors! 72 }) 73 .and_then(move |x: i32| { 74 tx6.send(x).unwrap(); // Should not run 75 tx6.send(-1).unwrap(); 76 future::ready(Err(-1)) 77 }) 78 .or_else(move |x: i32| { 79 tx7.send(x).unwrap(); // Send 7 80 tx7.send(8).unwrap(); // Send 8 81 future::ready(Err(9)) 82 }) 83 .map_ok(move |x: i32| { 84 tx8.send(x).unwrap(); // Should not run 85 tx8.send(-1).unwrap(); 86 -1 87 }) 88 .map_err(move |x: i32| { 89 tx9.send(x).unwrap(); // Send 9 90 tx9.send(10).unwrap(); // Send 10 91 11 92 }) 93 .map(move |x: Result<i32, i32>| { 94 tx10.send(x.err().unwrap()).unwrap(); // Send 11 95 tx10.send(12).unwrap(); // Send 12 96 }); 97 98 assert!(rx.try_recv().is_err()); // Not started yet 99 fut.run_in_background(); // Start it 100 for i in 1..=12 { 101 assert_eq!(rx.recv(), Ok(i)); 102 } // Check it 103 assert!(rx.recv().is_err()); // Should be done 104 } 105