1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3 
4 use std::sync::Arc;
5 use std::thread::sleep;
6 use tokio::time::Duration;
7 
8 use tokio::runtime::Builder;
9 
/// Marker value whose destructor always panics.
///
/// Tests move it into a spawned task so that tearing the task down (for
/// example when it is aborted) runs a panicking `Drop` implementation.
struct PanicOnDrop;

impl Drop for PanicOnDrop {
    /// Unconditionally panics; there is no way to drop this value quietly.
    fn drop(&mut self) {
        panic!("Well what did you expect would happen...");
    }
}
17 
/// Checks that a suspended task can be aborted without panicking as reported in
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
///
/// The spawned task suspends on a 100 second timer and is aborted roughly
/// 10 milliseconds later, so it cannot have finished on its own: joining it
/// must yield a cancellation error.
#[test]
fn test_abort_without_panic_3157() {
    let rt = Builder::new_multi_thread()
        .enable_time()
        .worker_threads(1)
        .build()
        .unwrap();

    rt.block_on(async move {
        let handle = tokio::spawn(async move {
            println!("task started");
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // wait for task to sleep.
        tokio::time::sleep(Duration::from_millis(10)).await;

        handle.abort();

        // Do not discard the join result: an aborted task that reported a
        // panic or a normal completion would otherwise go unnoticed.
        assert!(handle.await.unwrap_err().is_cancelled());
    });
}
41 
/// Regression test for issue #3662
/// (<https://github.com/tokio-rs/tokio/issues/3662>): aborting a suspended
/// task that lives on a current_thread executor must not panic, even when the
/// abort is issued from a different thread.
#[test]
fn test_abort_without_panic_3662() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Sets the shared flag to `true` when it is dropped.
    struct DropCheck(Arc<AtomicBool>);

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    let runtime = Builder::new_current_thread().build().unwrap();

    runtime.block_on(async move {
        let dropped = Arc::new(AtomicBool::new(false));
        let guard = DropCheck(Arc::clone(&dropped));

        let suspended = tokio::spawn(async move {
            // NB: binding the guard here makes it part of the task's state,
            // so it is only dropped together with the task.
            let _guard = guard;
            futures::future::pending::<()>().await;
        });

        let aborter = {
            let dropped = Arc::clone(&dropped);
            std::thread::spawn(move || {
                // This closure runs on a separate thread, so it has no
                // immediate thread-local access to the executor. Aborting
                // still transitions the underlying task to completed, which
                // causes it to be dropped (but not in this thread).
                assert!(!dropped.load(Ordering::SeqCst));
                suspended.abort();
                suspended
            })
        };
        let aborted = aborter.join().unwrap();

        let outcome = aborted.await;
        assert!(dropped.load(Ordering::SeqCst));
        assert!(outcome.unwrap_err().is_cancelled());

        // Note: the remainder exists to trigger a deferred task cleanup.
        //
        // The relevant piece of code to look at is `Inner::block_on` in
        // `basic_scheduler.rs`.
        //
        // Having a poll return Pending once lets the scheduler go into its
        // "auxiliary tasks" mode, at which point the task is removed from
        // the scheduler.
        let yielder = tokio::spawn(async move {
            tokio::task::yield_now().await;
        });

        yielder.await.unwrap();
    });
}
104 
/// Regression test for <https://github.com/tokio-rs/tokio/issues/3929>:
/// a suspended `LocalSet` task can be aborted from a remote thread without
/// panicking and without running the task's destructor on the wrong thread.
#[test]
fn remote_abort_local_set_3929() {
    use std::marker::PhantomData;
    use std::thread::{self, ThreadId};

    /// Records the thread it was created on and panics if it is dropped on
    /// any other thread.
    struct DropCheck {
        created_on: ThreadId,
        // The raw-pointer marker makes the type `!Send`.
        not_send: PhantomData<*const ()>,
    }

    impl DropCheck {
        fn new() -> Self {
            let created_on = thread::current().id();
            Self {
                created_on,
                not_send: PhantomData,
            }
        }
    }

    impl Drop for DropCheck {
        fn drop(&mut self) {
            let dropped_on = thread::current().id();
            if dropped_on != self.created_on {
                panic!("non-Send value dropped in another thread!");
            }
        }
    }

    let runtime = Builder::new_current_thread().build().unwrap();
    let local_set = tokio::task::LocalSet::new();

    let guard = DropCheck::new();
    let local_task = local_set.spawn_local(async move {
        // Never resolves, so the guard stays inside the suspended task.
        futures::future::pending::<()>().await;
        drop(guard);
    });

    // Abort from another thread after a short delay.
    let aborter = thread::spawn(move || {
        sleep(Duration::from_millis(10));
        local_task.abort();
    });

    runtime.block_on(local_set);
    aborter.join().unwrap();
}
148 
/// Regression test for issue #3964
/// (<https://github.com/tokio-rs/tokio/issues/3964>): a suspended task must
/// still be aborted when its `JoinHandle` is dropped right after `abort()`.
#[test]
fn test_abort_wakes_task_3964() {
    let runtime = Builder::new_current_thread().enable_time().build().unwrap();

    runtime.block_on(async move {
        // The task owns the only strong reference; the weak reference lets
        // the test observe when the task's state has been dropped.
        let token = Arc::new(());
        let observer = Arc::downgrade(&token);

        let task = tokio::spawn(async move {
            // Bind the Arc inside the task so it is moved into it.
            let _token = token;
            println!("task started");
            tokio::time::sleep(Duration::from_secs(100)).await
        });

        // Give the task time to start sleeping.
        tokio::time::sleep(Duration::from_millis(10)).await;

        task.abort();
        drop(task);

        // Give the abort time to take effect.
        tokio::time::sleep(Duration::from_millis(10)).await;

        // The strong reference held by the task must be gone by now.
        let still_alive = observer.upgrade();
        assert!(still_alive.is_none());
    });
}
179 
/// Checks that the panic raised by a task's destructor stays inside the task
/// when that task is aborted and its `JoinHandle` is dropped.
#[test]
fn test_abort_task_that_panics_on_drop_contained() {
    let runtime = Builder::new_current_thread().enable_time().build().unwrap();

    runtime.block_on(async move {
        let task = tokio::spawn(async move {
            // Bind the PanicOnDrop value inside the task so it is moved
            // into it.
            let _panics_when_dropped = PanicOnDrop;
            println!("task started");
            tokio::time::sleep(Duration::from_secs(100)).await
        });

        // Give the task time to start sleeping.
        tokio::time::sleep(Duration::from_millis(10)).await;

        task.abort();
        drop(task);

        // Give the abort time to take effect.
        tokio::time::sleep(Duration::from_millis(10)).await;
    });
}
204 
/// Checks that awaiting an aborted task whose destructor panics yields a
/// join error that reports a panic.
#[test]
fn test_abort_task_that_panics_on_drop_returned() {
    let runtime = Builder::new_current_thread().enable_time().build().unwrap();

    runtime.block_on(async move {
        let task = tokio::spawn(async move {
            // Bind the PanicOnDrop value inside the task so it is moved
            // into it.
            let _panics_when_dropped = PanicOnDrop;
            println!("task started");
            tokio::time::sleep(Duration::from_secs(100)).await
        });

        // Give the task time to start sleeping.
        tokio::time::sleep(Duration::from_millis(10)).await;

        task.abort();

        let join_error = task.await.unwrap_err();
        assert!(join_error.is_panic());
    });
}
225