1 #define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
2 
#include <atomic>
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>
#include <vector>

#include <doctest.h>
#include <taskflow/taskflow.hpp>
5 
6 // EmptyFuture
7 TEST_CASE("EmptyFuture" * doctest::timeout(300)) {
8   tf::Future<void> fu;
9   REQUIRE(fu.valid() == false);
10   REQUIRE(fu.cancel() == false);
11 }
12 
13 // Future
14 TEST_CASE("Future" * doctest::timeout(300)) {
15 
16   tf::Taskflow taskflow;
17   tf::Executor executor(4);
18 
19   std::atomic<int> counter{0};
20 
21   for(int i=0; i<100; i++) {
__anonfe2c4ddb0102()22     taskflow.emplace([&](){
23       counter.fetch_add(1, std::memory_order_relaxed);
24     });
25   }
26 
27   auto fu = executor.run(taskflow);
28 
29   fu.get();
30 
31   REQUIRE(counter == 100);
32 }
33 
34 // Cancel
35 TEST_CASE("Cancel" * doctest::timeout(300)) {
36 
37   tf::Taskflow taskflow;
38   tf::Executor executor(4);
39 
40   std::atomic<int> counter{0};
41 
42   // artificially long (possible larger than 300 seconds)
43   for(int i=0; i<10000; i++) {
__anonfe2c4ddb0202()44     taskflow.emplace([&](){
45       std::this_thread::sleep_for(std::chrono::milliseconds(100));
46       counter.fetch_add(1, std::memory_order_relaxed);
47     });
48   }
49 
50   // a new round
51   counter = 0;
52   auto fu = executor.run(taskflow);
53   REQUIRE(fu.cancel() == true);
54   fu.get();
55   REQUIRE(counter < 10000);
56 
57   // a new round
58   counter = 0;
59   fu = executor.run_n(taskflow, 100);
60   REQUIRE(fu.cancel() == true);
61   fu.get();
62   REQUIRE(counter < 10000);
63 }
64 
// multiple cancels
66 TEST_CASE("MultipleCancels" * doctest::timeout(300)) {
67 
68   tf::Taskflow taskflow1, taskflow2, taskflow3, taskflow4;
69   tf::Executor executor(4);
70 
71   std::atomic<int> counter{0};
72 
73   // artificially long (possible larger than 300 seconds)
74   for(int i=0; i<10000; i++) {
__anonfe2c4ddb0302()75     taskflow1.emplace([&](){
76       std::this_thread::sleep_for(std::chrono::milliseconds(100));
77       counter.fetch_add(1, std::memory_order_relaxed);
78     });
__anonfe2c4ddb0402()79     taskflow2.emplace([&](){
80       std::this_thread::sleep_for(std::chrono::milliseconds(100));
81       counter.fetch_add(1, std::memory_order_relaxed);
82     });
__anonfe2c4ddb0502()83     taskflow3.emplace([&](){
84       std::this_thread::sleep_for(std::chrono::milliseconds(100));
85       counter.fetch_add(1, std::memory_order_relaxed);
86     });
__anonfe2c4ddb0602()87     taskflow4.emplace([&](){
88       std::this_thread::sleep_for(std::chrono::milliseconds(100));
89       counter.fetch_add(1, std::memory_order_relaxed);
90     });
91   }
92 
93   // a new round
94   counter = 0;
95   auto fu1 = executor.run(taskflow1);
96   auto fu2 = executor.run(taskflow2);
97   auto fu3 = executor.run(taskflow3);
98   auto fu4 = executor.run(taskflow4);
99   REQUIRE(fu1.cancel() == true);
100   REQUIRE(fu2.cancel() == true);
101   REQUIRE(fu3.cancel() == true);
102   REQUIRE(fu4.cancel() == true);
103   executor.wait_for_all();
104   REQUIRE(counter < 10000);
105   REQUIRE(fu1.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready);
106   REQUIRE(fu2.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready);
107   REQUIRE(fu3.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready);
108   REQUIRE(fu4.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready);
109 }
110 
111 
112 
113 // cancel subflow
114 TEST_CASE("CancelSubflow" * doctest::timeout(300)) {
115 
116   tf::Taskflow taskflow;
117   tf::Executor executor(4);
118 
119   std::atomic<int> counter{0};
120 
121   // artificially long (possible larger than 300 seconds)
122   for(int i=0; i<100; i++) {
__anonfe2c4ddb0702(tf::Subflow& sf)123     taskflow.emplace([&, i](tf::Subflow& sf){
124       for(int j=0; j<100; j++) {
125         sf.emplace([&](){
126           std::this_thread::sleep_for(std::chrono::milliseconds(100));
127           counter.fetch_add(1, std::memory_order_relaxed);
128         });
129       }
130       if(i % 2) {
131         sf.join();
132       }
133       else {
134         sf.detach();
135       }
136     });
137   }
138 
139   // a new round
140   counter = 0;
141   auto fu = executor.run(taskflow);
142   REQUIRE(fu.cancel() == true);
143   fu.get();
144   REQUIRE(counter < 10000);
145 
146   // a new round
147   counter = 0;
148   auto fu1 = executor.run(taskflow);
149   auto fu2 = executor.run(taskflow);
150   auto fu3 = executor.run(taskflow);
151   REQUIRE(fu1.cancel() == true);
152   REQUIRE(fu2.cancel() == true);
153   REQUIRE(fu3.cancel() == true);
154   fu1.get();
155   fu2.get();
156   fu3.get();
157   REQUIRE(counter < 10000);
158 }
159 
160 // cancel asynchronous tasks in subflow
161 TEST_CASE("CancelSubflowAsyncTasks" * doctest::timeout(300)) {
162 
163   tf::Taskflow taskflow;
164   tf::Executor executor(4);
165 
166   std::atomic<int> counter{0};
167 
168   // artificially long (possible larger than 300 seconds)
169   for(int i=0; i<100; i++) {
__anonfe2c4ddb0902(tf::Subflow& sf)170     taskflow.emplace([&](tf::Subflow& sf){
171       for(int j=0; j<100; j++) {
172         auto a = sf.emplace([&](){
173           std::this_thread::sleep_for(std::chrono::milliseconds(100));
174           counter.fetch_add(1, std::memory_order_relaxed);
175         });
176         auto b = sf.emplace([&](){
177           std::this_thread::sleep_for(std::chrono::milliseconds(100));
178           counter.fetch_add(1, std::memory_order_relaxed);
179         });
180         a.precede(b);
181         sf.async([&](){
182           std::this_thread::sleep_for(std::chrono::milliseconds(100));
183           counter.fetch_add(1, std::memory_order_relaxed);
184         });
185         sf.silent_async([&](){
186           std::this_thread::sleep_for(std::chrono::milliseconds(100));
187           counter.fetch_add(1, std::memory_order_relaxed);
188         });
189       }
190     });
191   }
192 
193   // a new round
194   counter = 0;
195   auto fu = executor.run(taskflow);
196   REQUIRE(fu.cancel() == true);
197   fu.get();
198   REQUIRE(counter < 10000);
199 }
200 
201 // cancel infinite loop
202 TEST_CASE("CancelInfiniteLoop" * doctest::timeout(300)) {
203 
204   tf::Taskflow taskflow;
205   tf::Executor executor(4);
206 
207   for(int i=0; i<100; i++) {
__anonfe2c4ddb0e02()208     auto a = taskflow.emplace([](){});
__anonfe2c4ddb0f02()209     auto b = taskflow.emplace([](){ return 0; });
210     a.precede(b);
211     b.precede(b);
212   }
213 
214   auto fu = executor.run(taskflow);
215   REQUIRE(fu.cancel() == true);
216   fu.get();
217 }
218 
219 // cancel from another
220 TEST_CASE("CancelFromAnother" * doctest::timeout(300)) {
221 
222   tf::Taskflow taskflow, another;
223   tf::Executor executor(4);
224 
225   // create a single inifnite loop
__anonfe2c4ddb1002()226   auto a = taskflow.emplace([](){});
__anonfe2c4ddb1102()227   auto b = taskflow.emplace([](){ return 0; });
228   a.precede(b);
229   b.precede(b);
230 
231   auto fu = executor.run(taskflow);
232 
233   REQUIRE(fu.wait_for(
234     std::chrono::milliseconds(100)) == std::future_status::timeout
235   );
236 
237   // create a task to cancel another flow
__anonfe2c4ddb1202() 238   another.emplace([&]() { REQUIRE(fu.cancel() == true); });
239 
240   executor.run(another).wait();
241 }
242 
243 // cancel from async task
244 TEST_CASE("CancelFromAsync" * doctest::timeout(300)) {
245 
246   tf::Taskflow taskflow;
247   tf::Executor executor(4);
248 
249   // create a single inifnite loop
__anonfe2c4ddb1302()250   auto a = taskflow.emplace([](){});
__anonfe2c4ddb1402()251   auto b = taskflow.emplace([&](){ return 0; });
252   a.precede(b);
253   b.precede(b);
254 
__anonfe2c4ddb1502()255   executor.async([&](){
256     auto fu = executor.run_n(taskflow, 100);
257     std::this_thread::sleep_for(std::chrono::milliseconds(100));
258     REQUIRE(fu.cancel() == true);
259   });
260 
261   executor.wait_for_all();
262 }
263 
264 // cancel async tasks
265 TEST_CASE("CancelAsync") {
266 
267   tf::Executor executor(2);
268 
269   std::vector<tf::Future<void>> futures;
270 
271   for(int i=0; i<10000; i++) {
__anonfe2c4ddb1602()272     futures.push_back(executor.async([](){
273       std::this_thread::sleep_for(std::chrono::milliseconds(100));
274     }));
275   }
276 
277   size_t n_success = 0, n_failure = 0;
278 
279   for(auto& fu : futures) {
280     if(fu.cancel() == true) n_success++;
281     else n_failure++;
282   }
283 
284   executor.wait_for_all();
285 
286   REQUIRE(n_success > n_failure);
287 
288   for(auto& fu : futures) {
289     REQUIRE(fu.valid());
290     CHECK_NOTHROW(fu.get());
291   }
292 }
293 
294 // cancel subflow async tasks
295 TEST_CASE("CancelSubflowAsync") {
296 
297   tf::Taskflow taskflow;
298   tf::Executor executor(2);
299 
300   std::atomic<bool> futures_ready {false};
301   std::vector<tf::Future<void>> futures;
302 
__anonfe2c4ddb1702(tf::Subflow& sf)303   taskflow.emplace([&](tf::Subflow& sf){
304     for(int i=0; i<10000; i++) {
305       futures.push_back(sf.async([](){
306         std::this_thread::sleep_for(std::chrono::milliseconds(100));
307       }));
308     }
309     futures_ready = true;
310   });
311 
312   executor.run(taskflow);
313 
314   while(!futures_ready);
315 
316   size_t n_success = 0, n_failure = 0;
317 
318   for(auto& fu : futures) {
319     if(fu.cancel() == true) n_success++;
320     else n_failure++;
321   }
322 
323   executor.wait_for_all();
324   REQUIRE(n_success > n_failure);
325 
326   for(auto& fu : futures) {
327     REQUIRE(fu.valid());
328     CHECK_NOTHROW(fu.get());
329   }
330 }
331 
332