1 use crate::sync::broadcast; 2 use crate::sync::broadcast::error::RecvError::{Closed, Lagged}; 3 4 use loom::future::block_on; 5 use loom::sync::Arc; 6 use loom::thread; 7 use tokio_test::{assert_err, assert_ok}; 8 9 #[test] 10 fn broadcast_send() { 11 loom::model(|| { 12 let (tx1, mut rx) = broadcast::channel(2); 13 let tx1 = Arc::new(tx1); 14 let tx2 = tx1.clone(); 15 16 let th1 = thread::spawn(move || { 17 block_on(async { 18 assert_ok!(tx1.send("one")); 19 assert_ok!(tx1.send("two")); 20 assert_ok!(tx1.send("three")); 21 }); 22 }); 23 24 let th2 = thread::spawn(move || { 25 block_on(async { 26 assert_ok!(tx2.send("eins")); 27 assert_ok!(tx2.send("zwei")); 28 assert_ok!(tx2.send("drei")); 29 }); 30 }); 31 32 block_on(async { 33 let mut num = 0; 34 loop { 35 match rx.recv().await { 36 Ok(_) => num += 1, 37 Err(Closed) => break, 38 Err(Lagged(n)) => num += n as usize, 39 } 40 } 41 assert_eq!(num, 6); 42 }); 43 44 assert_ok!(th1.join()); 45 assert_ok!(th2.join()); 46 }); 47 } 48 49 // An `Arc` is used as the value in order to detect memory leaks. 
// Two receivers on separate threads must each observe "hello", then "world",
// then `Closed` after the only sender has been dropped by the main thread.
#[test]
fn broadcast_two() {
    loom::model(|| {
        let (sender, mut first_rx) = broadcast::channel::<Arc<&'static str>>(16);
        let mut second_rx = sender.subscribe();

        let first_reader = thread::spawn(move || {
            block_on(async {
                // Both received values stay bound until the end of this block.
                let greeting = assert_ok!(first_rx.recv().await);
                assert_eq!(*greeting, "hello");

                let subject = assert_ok!(first_rx.recv().await);
                assert_eq!(*subject, "world");

                let end = assert_err!(first_rx.recv().await);
                if !matches!(end, Closed) {
                    panic!();
                }
            });
        });

        let second_reader = thread::spawn(move || {
            block_on(async {
                // Both received values stay bound until the end of this block.
                let greeting = assert_ok!(second_rx.recv().await);
                assert_eq!(*greeting, "hello");

                let subject = assert_ok!(second_rx.recv().await);
                assert_eq!(*subject, "world");

                let end = assert_err!(second_rx.recv().await);
                if !matches!(end, Closed) {
                    panic!();
                }
            });
        });

        for &text in &["hello", "world"] {
            assert_ok!(sender.send(Arc::new(text)));
        }
        drop(sender);

        assert_ok!(first_reader.join());
        assert_ok!(second_reader.join());
    });
}

// Three values are sent into a channel of capacity 2 that has two receivers.
// Each receiver, on its own thread, adds one per `Ok` and `n` per `Lagged(n)`
// until it sees `Closed`, and must end with a tally of exactly 3.
#[test]
fn broadcast_wrap() {
    loom::model(|| {
        let (sender, mut first_rx) = broadcast::channel(2);
        let mut second_rx = sender.subscribe();

        let first_reader = thread::spawn(move || {
            block_on(async {
                let mut tally = 0;

                loop {
                    tally += match first_rx.recv().await {
                        Ok(_) => 1,
                        Err(Lagged(skipped)) => skipped as usize,
                        Err(Closed) => break,
                    };
                }

                assert_eq!(tally, 3);
            });
        });

        let second_reader = thread::spawn(move || {
            block_on(async {
                let mut tally = 0;

                loop {
                    tally += match second_rx.recv().await {
                        Ok(_) => 1,
                        Err(Lagged(skipped)) => skipped as usize,
                        Err(Closed) => break,
                    };
                }

                assert_eq!(tally, 3);
            });
        });

        for &word in &["one", "two", "three"] {
            assert_ok!(sender.send(word));
        }

        drop(sender);

        assert_ok!(first_reader.join());
        assert_ok!(second_reader.join());
    });
}

// One receiver reads "one", "two", "three" and then `Closed` on its own
// thread, while a second receiver is dropped on another thread without ever
// being polled. The main thread performs the three sends and drops the sender.
#[test]
fn drop_rx() {
    loom::model(|| {
        let (sender, mut active_rx) = broadcast::channel(16);
        let discarded_rx = sender.subscribe();

        let reader = thread::spawn(move || {
            block_on(async {
                for &expected in &["one", "two", "three"] {
                    let got = assert_ok!(active_rx.recv().await);
                    assert_eq!(got, expected);
                }

                let end = assert_err!(active_rx.recv().await);
                if !matches!(end, Closed) {
                    panic!();
                }
            });
        });

        let dropper = thread::spawn(move || drop(discarded_rx));

        for &word in &["one", "two", "three"] {
            assert_ok!(sender.send(word));
        }
        drop(sender);

        assert_ok!(reader.join());
        assert_ok!(dropper.join());
    });
}

// The channel has capacity 1. An extra receiver is kept bound but never
// polled, and a cloned sender pushes 100 values from its own thread. The
// original sender pushes one value before that thread is spawned and one
// right after. A further thread keeps receiving until the first error.
// Send results are deliberately ignored; the only assertions are that both
// threads join successfully.
#[test]
fn drop_multiple_rx_with_overflow() {
    loom::model(move || {
        // It is essential to have multiple senders and receivers in this test case.
        let (sender, mut polled_rx) = broadcast::channel(1);
        let _idle_rx = sender.subscribe();

        let _ = sender.send(());
        let cloned_sender = sender.clone();
        let flooder = thread::spawn(move || {
            block_on(async {
                (0..100).for_each(|_| {
                    let _ = cloned_sender.send(());
                });
            });
        });
        let _ = sender.send(());

        let drainer = thread::spawn(move || {
            block_on(async { while polled_rx.recv().await.is_ok() {} });
        });

        assert_ok!(flooder.join());
        assert_ok!(drainer.join());
    });
}