1 #define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN 2 3 #include <doctest.h> 4 #include <taskflow/taskflow.hpp> 5 6 // EmptyFuture 7 TEST_CASE("EmptyFuture" * doctest::timeout(300)) { 8 tf::Future<void> fu; 9 REQUIRE(fu.valid() == false); 10 REQUIRE(fu.cancel() == false); 11 } 12 13 // Future 14 TEST_CASE("Future" * doctest::timeout(300)) { 15 16 tf::Taskflow taskflow; 17 tf::Executor executor(4); 18 19 std::atomic<int> counter{0}; 20 21 for(int i=0; i<100; i++) { __anonfe2c4ddb0102()22 taskflow.emplace([&](){ 23 counter.fetch_add(1, std::memory_order_relaxed); 24 }); 25 } 26 27 auto fu = executor.run(taskflow); 28 29 fu.get(); 30 31 REQUIRE(counter == 100); 32 } 33 34 // Cancel 35 TEST_CASE("Cancel" * doctest::timeout(300)) { 36 37 tf::Taskflow taskflow; 38 tf::Executor executor(4); 39 40 std::atomic<int> counter{0}; 41 42 // artificially long (possible larger than 300 seconds) 43 for(int i=0; i<10000; i++) { __anonfe2c4ddb0202()44 taskflow.emplace([&](){ 45 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 46 counter.fetch_add(1, std::memory_order_relaxed); 47 }); 48 } 49 50 // a new round 51 counter = 0; 52 auto fu = executor.run(taskflow); 53 REQUIRE(fu.cancel() == true); 54 fu.get(); 55 REQUIRE(counter < 10000); 56 57 // a new round 58 counter = 0; 59 fu = executor.run_n(taskflow, 100); 60 REQUIRE(fu.cancel() == true); 61 fu.get(); 62 REQUIRE(counter < 10000); 63 } 64 65 // multiple cnacels 66 TEST_CASE("MultipleCancels" * doctest::timeout(300)) { 67 68 tf::Taskflow taskflow1, taskflow2, taskflow3, taskflow4; 69 tf::Executor executor(4); 70 71 std::atomic<int> counter{0}; 72 73 // artificially long (possible larger than 300 seconds) 74 for(int i=0; i<10000; i++) { __anonfe2c4ddb0302()75 taskflow1.emplace([&](){ 76 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 77 counter.fetch_add(1, std::memory_order_relaxed); 78 }); __anonfe2c4ddb0402()79 taskflow2.emplace([&](){ 80 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 81 counter.fetch_add(1, std::memory_order_relaxed); 82 }); __anonfe2c4ddb0502()83 taskflow3.emplace([&](){ 84 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 85 counter.fetch_add(1, std::memory_order_relaxed); 86 }); __anonfe2c4ddb0602()87 taskflow4.emplace([&](){ 88 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 89 counter.fetch_add(1, std::memory_order_relaxed); 90 }); 91 } 92 93 // a new round 94 counter = 0; 95 auto fu1 = executor.run(taskflow1); 96 auto fu2 = executor.run(taskflow2); 97 auto fu3 = executor.run(taskflow3); 98 auto fu4 = executor.run(taskflow4); 99 REQUIRE(fu1.cancel() == true); 100 REQUIRE(fu2.cancel() == true); 101 REQUIRE(fu3.cancel() == true); 102 REQUIRE(fu4.cancel() == true); 103 executor.wait_for_all(); 104 REQUIRE(counter < 10000); 105 REQUIRE(fu1.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready); 106 REQUIRE(fu2.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready); 107 REQUIRE(fu3.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready); 108 REQUIRE(fu4.wait_for(std::chrono::milliseconds(0)) == std::future_status::ready); 109 } 110 111 112 113 // cancel subflow 114 TEST_CASE("CancelSubflow" * doctest::timeout(300)) { 115 116 tf::Taskflow taskflow; 117 tf::Executor executor(4); 118 119 std::atomic<int> counter{0}; 120 121 // artificially long (possible larger than 300 seconds) 122 for(int i=0; i<100; i++) { __anonfe2c4ddb0702(tf::Subflow& sf)123 taskflow.emplace([&, i](tf::Subflow& sf){ 124 for(int j=0; j<100; j++) { 125 sf.emplace([&](){ 126 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 127 counter.fetch_add(1, std::memory_order_relaxed); 128 }); 129 } 130 if(i % 2) { 131 sf.join(); 132 } 133 else { 134 sf.detach(); 135 } 136 }); 137 } 138 139 // a new round 140 counter = 0; 141 auto fu = executor.run(taskflow); 142 REQUIRE(fu.cancel() == true); 143 fu.get(); 144 REQUIRE(counter < 10000); 145 146 // a new round 147 counter = 0; 148 auto fu1 = executor.run(taskflow); 149 auto fu2 = executor.run(taskflow); 150 auto fu3 = executor.run(taskflow); 151 REQUIRE(fu1.cancel() == true); 152 REQUIRE(fu2.cancel() == true); 153 REQUIRE(fu3.cancel() == true); 154 fu1.get(); 155 fu2.get(); 156 fu3.get(); 157 REQUIRE(counter < 10000); 158 } 159 160 // cancel asynchronous tasks in subflow 161 TEST_CASE("CancelSubflowAsyncTasks" * doctest::timeout(300)) { 162 163 tf::Taskflow taskflow; 164 tf::Executor executor(4); 165 166 std::atomic<int> counter{0}; 167 168 // artificially long (possible larger than 300 seconds) 169 for(int i=0; i<100; i++) { __anonfe2c4ddb0902(tf::Subflow& sf)170 taskflow.emplace([&](tf::Subflow& sf){ 171 for(int j=0; j<100; j++) { 172 auto a = sf.emplace([&](){ 173 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 174 counter.fetch_add(1, std::memory_order_relaxed); 175 }); 176 auto b = sf.emplace([&](){ 177 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 178 counter.fetch_add(1, std::memory_order_relaxed); 179 }); 180 a.precede(b); 181 sf.async([&](){ 182 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 183 counter.fetch_add(1, std::memory_order_relaxed); 184 }); 185 sf.silent_async([&](){ 186 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 187 counter.fetch_add(1, std::memory_order_relaxed); 188 }); 189 } 190 }); 191 } 192 193 // a new round 194 counter = 0; 195 auto fu = executor.run(taskflow); 196 REQUIRE(fu.cancel() == true); 197 fu.get(); 198 REQUIRE(counter < 10000); 199 } 200 201 // cancel infinite loop 202 TEST_CASE("CancelInfiniteLoop" * doctest::timeout(300)) { 203 204 tf::Taskflow taskflow; 205 tf::Executor executor(4); 206 207 for(int i=0; i<100; i++) { __anonfe2c4ddb0e02()208 auto a = taskflow.emplace([](){}); __anonfe2c4ddb0f02()209 auto b = taskflow.emplace([](){ return 0; }); 210 a.precede(b); 211 b.precede(b); 212 } 213 214 auto fu = executor.run(taskflow); 215 REQUIRE(fu.cancel() == true); 216 fu.get(); 217 } 218 219 // cancel from another 220 TEST_CASE("CancelFromAnother" * doctest::timeout(300)) { 221 222 tf::Taskflow taskflow, another; 223 tf::Executor executor(4); 224 225 // create a single inifnite loop __anonfe2c4ddb1002()226 auto a = taskflow.emplace([](){}); __anonfe2c4ddb1102()227 auto b = taskflow.emplace([](){ return 0; }); 228 a.precede(b); 229 b.precede(b); 230 231 auto fu = executor.run(taskflow); 232 233 REQUIRE(fu.wait_for( 234 std::chrono::milliseconds(100)) == std::future_status::timeout 235 ); 236 237 // create a task to cancel another flow __anonfe2c4ddb1202() 238 another.emplace([&]() { REQUIRE(fu.cancel() == true); }); 239 240 executor.run(another).wait(); 241 } 242 243 // cancel from async task 244 TEST_CASE("CancelFromAsync" * doctest::timeout(300)) { 245 246 tf::Taskflow taskflow; 247 tf::Executor executor(4); 248 249 // create a single inifnite loop __anonfe2c4ddb1302()250 auto a = taskflow.emplace([](){}); __anonfe2c4ddb1402()251 auto b = taskflow.emplace([&](){ return 0; }); 252 a.precede(b); 253 b.precede(b); 254 __anonfe2c4ddb1502()255 executor.async([&](){ 256 auto fu = executor.run_n(taskflow, 100); 257 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 258 REQUIRE(fu.cancel() == true); 259 }); 260 261 executor.wait_for_all(); 262 } 263 264 // cancel async tasks 265 TEST_CASE("CancelAsync") { 266 267 tf::Executor executor(2); 268 269 std::vector<tf::Future<void>> futures; 270 271 for(int i=0; i<10000; i++) { __anonfe2c4ddb1602()272 futures.push_back(executor.async([](){ 273 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 274 })); 275 } 276 277 size_t n_success = 0, n_failure = 0; 278 279 for(auto& fu : futures) { 280 if(fu.cancel() == true) n_success++; 281 else n_failure++; 282 } 283 284 executor.wait_for_all(); 285 286 REQUIRE(n_success > n_failure); 287 288 for(auto& fu : futures) { 289 REQUIRE(fu.valid()); 290 CHECK_NOTHROW(fu.get()); 291 } 292 } 293 294 // cancel subflow async tasks 295 TEST_CASE("CancelSubflowAsync") { 296 297 tf::Taskflow taskflow; 298 tf::Executor executor(2); 299 300 std::atomic<bool> futures_ready {false}; 301 std::vector<tf::Future<void>> futures; 302 __anonfe2c4ddb1702(tf::Subflow& sf)303 taskflow.emplace([&](tf::Subflow& sf){ 304 for(int i=0; i<10000; i++) { 305 futures.push_back(sf.async([](){ 306 std::this_thread::sleep_for(std::chrono::milliseconds(100)); 307 })); 308 } 309 futures_ready = true; 310 }); 311 312 executor.run(taskflow); 313 314 while(!futures_ready); 315 316 size_t n_success = 0, n_failure = 0; 317 318 for(auto& fu : futures) { 319 if(fu.cancel() == true) n_success++; 320 else n_failure++; 321 } 322 323 executor.wait_for_all(); 324 REQUIRE(n_success > n_failure); 325 326 for(auto& fu : futures) { 327 REQUIRE(fu.valid()); 328 CHECK_NOTHROW(fu.get()); 329 } 330 } 331 332