1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use std::sync::Arc;
5 use std::thread::sleep;
6 use tokio::time::Duration;
7
8 use tokio::runtime::Builder;
9
10 struct PanicOnDrop;
11
12 impl Drop for PanicOnDrop {
drop(&mut self)13 fn drop(&mut self) {
14 panic!("Well what did you expect would happen...");
15 }
16 }
17
18 /// Checks that a suspended task can be aborted without panicking as reported in
19 /// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
20 #[test]
test_abort_without_panic_3157()21 fn test_abort_without_panic_3157() {
22 let rt = Builder::new_multi_thread()
23 .enable_time()
24 .worker_threads(1)
25 .build()
26 .unwrap();
27
28 rt.block_on(async move {
29 let handle = tokio::spawn(async move {
30 println!("task started");
31 tokio::time::sleep(Duration::new(100, 0)).await
32 });
33
34 // wait for task to sleep.
35 tokio::time::sleep(Duration::from_millis(10)).await;
36
37 handle.abort();
38 let _ = handle.await;
39 });
40 }
41
42 /// Checks that a suspended task can be aborted inside of a current_thread
43 /// executor without panicking as reported in issue #3662:
44 /// <https://github.com/tokio-rs/tokio/issues/3662>.
45 #[test]
test_abort_without_panic_3662()46 fn test_abort_without_panic_3662() {
47 use std::sync::atomic::{AtomicBool, Ordering};
48 use std::sync::Arc;
49
50 struct DropCheck(Arc<AtomicBool>);
51
52 impl Drop for DropCheck {
53 fn drop(&mut self) {
54 self.0.store(true, Ordering::SeqCst);
55 }
56 }
57
58 let rt = Builder::new_current_thread().build().unwrap();
59
60 rt.block_on(async move {
61 let drop_flag = Arc::new(AtomicBool::new(false));
62 let drop_check = DropCheck(drop_flag.clone());
63
64 let j = tokio::spawn(async move {
65 // NB: just grab the drop check here so that it becomes part of the
66 // task.
67 let _drop_check = drop_check;
68 futures::future::pending::<()>().await;
69 });
70
71 let drop_flag2 = drop_flag.clone();
72
73 let task = std::thread::spawn(move || {
74 // This runs in a separate thread so it doesn't have immediate
75 // thread-local access to the executor. It does however transition
76 // the underlying task to be completed, which will cause it to be
77 // dropped (but not in this thread).
78 assert!(!drop_flag2.load(Ordering::SeqCst));
79 j.abort();
80 j
81 })
82 .join()
83 .unwrap();
84
85 let result = task.await;
86 assert!(drop_flag.load(Ordering::SeqCst));
87 assert!(result.unwrap_err().is_cancelled());
88
89 // Note: We do the following to trigger a deferred task cleanup.
90 //
91 // The relevant piece of code you want to look at is in:
92 // `Inner::block_on` of `basic_scheduler.rs`.
93 //
94 // We cause the cleanup to happen by having a poll return Pending once
95 // so that the scheduler can go into the "auxiliary tasks" mode, at
96 // which point the task is removed from the scheduler.
97 let i = tokio::spawn(async move {
98 tokio::task::yield_now().await;
99 });
100
101 i.await.unwrap();
102 });
103 }
104
105 /// Checks that a suspended LocalSet task can be aborted from a remote thread
106 /// without panicking and without running the tasks destructor on the wrong thread.
107 /// <https://github.com/tokio-rs/tokio/issues/3929>
108 #[test]
remote_abort_local_set_3929()109 fn remote_abort_local_set_3929() {
110 struct DropCheck {
111 created_on: std::thread::ThreadId,
112 not_send: std::marker::PhantomData<*const ()>,
113 }
114
115 impl DropCheck {
116 fn new() -> Self {
117 Self {
118 created_on: std::thread::current().id(),
119 not_send: std::marker::PhantomData,
120 }
121 }
122 }
123 impl Drop for DropCheck {
124 fn drop(&mut self) {
125 if std::thread::current().id() != self.created_on {
126 panic!("non-Send value dropped in another thread!");
127 }
128 }
129 }
130
131 let rt = Builder::new_current_thread().build().unwrap();
132 let local = tokio::task::LocalSet::new();
133
134 let check = DropCheck::new();
135 let jh = local.spawn_local(async move {
136 futures::future::pending::<()>().await;
137 drop(check);
138 });
139
140 let jh2 = std::thread::spawn(move || {
141 sleep(Duration::from_millis(10));
142 jh.abort();
143 });
144
145 rt.block_on(local);
146 jh2.join().unwrap();
147 }
148
149 /// Checks that a suspended task can be aborted even if the `JoinHandle` is immediately dropped.
150 /// issue #3964: <https://github.com/tokio-rs/tokio/issues/3964>.
151 #[test]
test_abort_wakes_task_3964()152 fn test_abort_wakes_task_3964() {
153 let rt = Builder::new_current_thread().enable_time().build().unwrap();
154
155 rt.block_on(async move {
156 let notify_dropped = Arc::new(());
157 let weak_notify_dropped = Arc::downgrade(¬ify_dropped);
158
159 let handle = tokio::spawn(async move {
160 // Make sure the Arc is moved into the task
161 let _notify_dropped = notify_dropped;
162 println!("task started");
163 tokio::time::sleep(Duration::new(100, 0)).await
164 });
165
166 // wait for task to sleep.
167 tokio::time::sleep(Duration::from_millis(10)).await;
168
169 handle.abort();
170 drop(handle);
171
172 // wait for task to abort.
173 tokio::time::sleep(Duration::from_millis(10)).await;
174
175 // Check that the Arc has been dropped.
176 assert!(weak_notify_dropped.upgrade().is_none());
177 });
178 }
179
180 /// Checks that aborting a task whose destructor panics does not allow the
181 /// panic to escape the task.
182 #[test]
test_abort_task_that_panics_on_drop_contained()183 fn test_abort_task_that_panics_on_drop_contained() {
184 let rt = Builder::new_current_thread().enable_time().build().unwrap();
185
186 rt.block_on(async move {
187 let handle = tokio::spawn(async move {
188 // Make sure the Arc is moved into the task
189 let _panic_dropped = PanicOnDrop;
190 println!("task started");
191 tokio::time::sleep(Duration::new(100, 0)).await
192 });
193
194 // wait for task to sleep.
195 tokio::time::sleep(Duration::from_millis(10)).await;
196
197 handle.abort();
198 drop(handle);
199
200 // wait for task to abort.
201 tokio::time::sleep(Duration::from_millis(10)).await;
202 });
203 }
204
205 /// Checks that aborting a task whose destructor panics has the expected result.
206 #[test]
test_abort_task_that_panics_on_drop_returned()207 fn test_abort_task_that_panics_on_drop_returned() {
208 let rt = Builder::new_current_thread().enable_time().build().unwrap();
209
210 rt.block_on(async move {
211 let handle = tokio::spawn(async move {
212 // Make sure the Arc is moved into the task
213 let _panic_dropped = PanicOnDrop;
214 println!("task started");
215 tokio::time::sleep(Duration::new(100, 0)).await
216 });
217
218 // wait for task to sleep.
219 tokio::time::sleep(Duration::from_millis(10)).await;
220
221 handle.abort();
222 assert!(handle.await.unwrap_err().is_panic());
223 });
224 }
225