1 #![warn(rust_2018_idioms)]
2 
3 use std::sync::Arc;
4 use std::task::Poll;
5 
6 use futures::future::FutureExt;
7 use futures::stream;
8 use futures::stream::StreamExt;
9 
10 use tokio::sync::{Barrier, RwLock};
11 use tokio_test::task::spawn;
12 use tokio_test::{assert_pending, assert_ready};
13 
14 #[test]
into_inner()15 fn into_inner() {
16     let rwlock = RwLock::new(42);
17     assert_eq!(rwlock.into_inner(), 42);
18 }
19 
20 // multiple reads should be Ready
21 #[test]
read_shared()22 fn read_shared() {
23     let rwlock = RwLock::new(100);
24 
25     let mut t1 = spawn(rwlock.read());
26     let _g1 = assert_ready!(t1.poll());
27     let mut t2 = spawn(rwlock.read());
28     assert_ready!(t2.poll());
29 }
30 
31 // When there is an active shared owner, exclusive access should not be possible
32 #[test]
write_shared_pending()33 fn write_shared_pending() {
34     let rwlock = RwLock::new(100);
35     let mut t1 = spawn(rwlock.read());
36 
37     let _g1 = assert_ready!(t1.poll());
38     let mut t2 = spawn(rwlock.write());
39     assert_pending!(t2.poll());
40 }
41 
42 // When there is an active exclusive owner, subsequent exclusive access should not be possible
43 #[test]
read_exclusive_pending()44 fn read_exclusive_pending() {
45     let rwlock = RwLock::new(100);
46     let mut t1 = spawn(rwlock.write());
47 
48     let _g1 = assert_ready!(t1.poll());
49     let mut t2 = spawn(rwlock.read());
50     assert_pending!(t2.poll());
51 }
52 
53 // If the max shared access is reached and subsequent shared access is pending
54 // should be made available when one of the shared accesses is dropped
55 #[test]
exhaust_reading()56 fn exhaust_reading() {
57     let rwlock = RwLock::with_max_readers(100, 1024);
58     let mut reads = Vec::new();
59     loop {
60         let mut t = spawn(rwlock.read());
61         match t.poll() {
62             Poll::Ready(guard) => reads.push(guard),
63             Poll::Pending => break,
64         }
65     }
66 
67     let mut t1 = spawn(rwlock.read());
68     assert_pending!(t1.poll());
69     let g2 = reads.pop().unwrap();
70     drop(g2);
71     assert!(t1.is_woken());
72     assert_ready!(t1.poll());
73 }
74 
75 // When there is an active exclusive owner, subsequent exclusive access should not be possible
76 #[test]
write_exclusive_pending()77 fn write_exclusive_pending() {
78     let rwlock = RwLock::new(100);
79     let mut t1 = spawn(rwlock.write());
80 
81     let _g1 = assert_ready!(t1.poll());
82     let mut t2 = spawn(rwlock.write());
83     assert_pending!(t2.poll());
84 }
85 
86 // When there is an active shared owner, exclusive access should be possible after shared is dropped
87 #[test]
write_shared_drop()88 fn write_shared_drop() {
89     let rwlock = RwLock::new(100);
90     let mut t1 = spawn(rwlock.read());
91 
92     let g1 = assert_ready!(t1.poll());
93     let mut t2 = spawn(rwlock.write());
94     assert_pending!(t2.poll());
95     drop(g1);
96     assert!(t2.is_woken());
97     assert_ready!(t2.poll());
98 }
99 
100 // when there is an active shared owner, and exclusive access is triggered,
101 // subsequent shared access should not be possible as write gathers all the available semaphore permits
102 #[test]
write_read_shared_pending()103 fn write_read_shared_pending() {
104     let rwlock = RwLock::new(100);
105     let mut t1 = spawn(rwlock.read());
106     let _g1 = assert_ready!(t1.poll());
107 
108     let mut t2 = spawn(rwlock.read());
109     assert_ready!(t2.poll());
110 
111     let mut t3 = spawn(rwlock.write());
112     assert_pending!(t3.poll());
113 
114     let mut t4 = spawn(rwlock.read());
115     assert_pending!(t4.poll());
116 }
117 
118 // when there is an active shared owner, and exclusive access is triggered,
119 // reading should be possible after pending exclusive access is dropped
120 #[test]
write_read_shared_drop_pending()121 fn write_read_shared_drop_pending() {
122     let rwlock = RwLock::new(100);
123     let mut t1 = spawn(rwlock.read());
124     let _g1 = assert_ready!(t1.poll());
125 
126     let mut t2 = spawn(rwlock.write());
127     assert_pending!(t2.poll());
128 
129     let mut t3 = spawn(rwlock.read());
130     assert_pending!(t3.poll());
131     drop(t2);
132 
133     assert!(t3.is_woken());
134     assert_ready!(t3.poll());
135 }
136 
137 // Acquire an RwLock nonexclusively by a single task
138 #[tokio::test]
read_uncontested()139 async fn read_uncontested() {
140     let rwlock = RwLock::new(100);
141     let result = *rwlock.read().await;
142 
143     assert_eq!(result, 100);
144 }
145 
146 // Acquire an uncontested RwLock in exclusive mode
147 #[tokio::test]
write_uncontested()148 async fn write_uncontested() {
149     let rwlock = RwLock::new(100);
150     let mut result = rwlock.write().await;
151     *result += 50;
152     assert_eq!(*result, 150);
153 }
154 
155 // RwLocks should be acquired in the order that their Futures are waited upon.
156 #[tokio::test]
write_order()157 async fn write_order() {
158     let rwlock = RwLock::<Vec<u32>>::new(vec![]);
159     let fut2 = rwlock.write().map(|mut guard| guard.push(2));
160     let fut1 = rwlock.write().map(|mut guard| guard.push(1));
161     fut1.await;
162     fut2.await;
163 
164     let g = rwlock.read().await;
165     assert_eq!(*g, vec![1, 2]);
166 }
167 
168 // A single RwLock is contested by tasks in multiple threads
169 #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
multithreaded()170 async fn multithreaded() {
171     let barrier = Arc::new(Barrier::new(5));
172     let rwlock = Arc::new(RwLock::<u32>::new(0));
173     let rwclone1 = rwlock.clone();
174     let rwclone2 = rwlock.clone();
175     let rwclone3 = rwlock.clone();
176     let rwclone4 = rwlock.clone();
177 
178     let b1 = barrier.clone();
179     tokio::spawn(async move {
180         stream::iter(0..1000)
181             .for_each(move |_| {
182                 let rwlock = rwclone1.clone();
183                 async move {
184                     let mut guard = rwlock.write().await;
185                     *guard += 2;
186                 }
187             })
188             .await;
189         b1.wait().await;
190     });
191 
192     let b2 = barrier.clone();
193     tokio::spawn(async move {
194         stream::iter(0..1000)
195             .for_each(move |_| {
196                 let rwlock = rwclone2.clone();
197                 async move {
198                     let mut guard = rwlock.write().await;
199                     *guard += 3;
200                 }
201             })
202             .await;
203         b2.wait().await;
204     });
205 
206     let b3 = barrier.clone();
207     tokio::spawn(async move {
208         stream::iter(0..1000)
209             .for_each(move |_| {
210                 let rwlock = rwclone3.clone();
211                 async move {
212                     let mut guard = rwlock.write().await;
213                     *guard += 5;
214                 }
215             })
216             .await;
217         b3.wait().await;
218     });
219 
220     let b4 = barrier.clone();
221     tokio::spawn(async move {
222         stream::iter(0..1000)
223             .for_each(move |_| {
224                 let rwlock = rwclone4.clone();
225                 async move {
226                     let mut guard = rwlock.write().await;
227                     *guard += 7;
228                 }
229             })
230             .await;
231         b4.wait().await;
232     });
233 
234     barrier.wait().await;
235     let g = rwlock.read().await;
236     assert_eq!(*g, 17_000);
237 }
238 
239 #[tokio::test]
try_write()240 async fn try_write() {
241     let lock = RwLock::new(0);
242     let read_guard = lock.read().await;
243     assert!(lock.try_write().is_err());
244     drop(read_guard);
245     assert!(lock.try_write().is_ok());
246 }
247 
248 #[test]
try_read_try_write()249 fn try_read_try_write() {
250     let lock: RwLock<usize> = RwLock::new(15);
251 
252     {
253         let rg1 = lock.try_read().unwrap();
254         assert_eq!(*rg1, 15);
255 
256         assert!(lock.try_write().is_err());
257 
258         let rg2 = lock.try_read().unwrap();
259         assert_eq!(*rg2, 15)
260     }
261 
262     {
263         let mut wg = lock.try_write().unwrap();
264         *wg = 1515;
265 
266         assert!(lock.try_read().is_err())
267     }
268 
269     assert_eq!(*lock.try_read().unwrap(), 1515);
270 }
271