1 /********************************************************************/ 2 /* File: taskmanager.cpp */ 3 /* Author: M. Hochsterger, J. Schoeberl */ 4 /* Date: 10. Mar. 2015 */ 5 /********************************************************************/ 6 7 #include <thread> 8 #include <atomic> 9 #include <mutex> 10 #include <chrono> 11 12 #include "concurrentqueue.h" 13 #include "mpi_wrapper.hpp" 14 #include "paje_trace.hpp" 15 #include "profiler.hpp" 16 #include "taskmanager.hpp" 17 18 #ifdef USE_MKL 19 #include <mkl.h> 20 #endif 21 22 23 24 namespace ngcore 25 { 26 using std::mutex; 27 using std::lock_guard; 28 using std::memory_order_release; 29 using std::memory_order_relaxed; 30 using std::make_tuple; 31 32 TaskManager * task_manager = nullptr; 33 bool TaskManager :: use_paje_trace = false; 34 int TaskManager :: max_threads = getenv("NGS_NUM_THREADS") ? atoi(getenv("NGS_NUM_THREADS")) : std::thread::hardware_concurrency(); 35 int TaskManager :: num_threads = 1; 36 37 38 thread_local int TaskManager :: thread_id = 0; 39 40 const function<void(TaskInfo&)> * TaskManager::func; 41 const function<void()> * TaskManager::startup_function = nullptr; 42 const function<void()> * TaskManager::cleanup_function = nullptr; 43 44 atomic<int> TaskManager::ntasks; 45 Exception * TaskManager::ex; 46 47 atomic<int> TaskManager::jobnr; 48 49 atomic<int> TaskManager::complete[8]; // max nodes 50 atomic<int> TaskManager::done; 51 atomic<int> TaskManager::active_workers; 52 atomic<int> TaskManager::workers_on_node[8]; // max nodes 53 54 55 int TaskManager::sleep_usecs = 1000; 56 bool TaskManager::sleep = false; 57 58 TaskManager::NodeData *TaskManager::nodedata[8]; 59 int TaskManager::num_nodes; 60 61 static mutex copyex_mutex; 62 EnterTaskManager()63 int EnterTaskManager () 64 { 65 if (task_manager) 66 { 67 // no task manager started 68 return 0; 69 } 70 71 task_manager = new TaskManager(); 72 73 GetLogger("TaskManager")->info("task-based parallelization (C++11 threads) using {} threads", task_manager->GetNumThreads()); 74 75 #ifdef USE_NUMA 76 numa_run_on_node (0); 77 #endif 78 79 #ifndef WIN32 80 // master has maximal priority ! 81 int policy; 82 struct sched_param param; 83 pthread_getschedparam(pthread_self(), &policy, ¶m); 84 param.sched_priority = sched_get_priority_max(policy); 85 pthread_setschedparam(pthread_self(), policy, ¶m); 86 #endif // WIN32 87 88 89 task_manager->StartWorkers(); 90 91 ParallelFor (Range(100), [&] (int i) { ; }); // startup 92 return task_manager->GetNumThreads(); 93 } 94 95 ExitTaskManager(int num_threads)96 void ExitTaskManager (int num_threads) 97 { 98 if(num_threads > 0) 99 { 100 task_manager->StopWorkers(); 101 delete task_manager; 102 task_manager = nullptr; 103 } 104 } 105 RunWithTaskManager(function<void ()> alg)106 void RunWithTaskManager (function<void()> alg) 107 { 108 int num_threads = EnterTaskManager(); 109 alg(); 110 ExitTaskManager(num_threads); 111 } 112 113 114 115 SetNumThreads(int amax_threads)116 void TaskManager :: SetNumThreads(int amax_threads) 117 { 118 if(task_manager && task_manager->active_workers>0) 119 { 120 std::cerr << "Warning: can't change number of threads while TaskManager active!" << std::endl; 121 return; 122 } 123 max_threads = amax_threads; 124 } 125 126 TaskManager()127 TaskManager :: TaskManager() 128 { 129 num_threads = GetMaxThreads(); 130 // if (MyMPI_GetNTasks() > 1) num_threads = 1; 131 132 #ifdef USE_NUMA 133 numa_available(); 134 num_nodes = numa_max_node() + 1; 135 if (num_nodes > num_threads) num_nodes = num_threads; 136 137 for (int j = 0; j < num_nodes; j++) 138 { 139 void * mem = numa_alloc_onnode (sizeof(NodeData), j); 140 nodedata[j] = new (mem) NodeData; 141 complete[j] = -1; 142 workers_on_node[j] = 0; 143 } 144 #else 145 num_nodes = 1; 146 nodedata[0] = new NodeData; 147 complete[0] = -1; 148 workers_on_node[0] = 0; 149 #endif 150 151 jobnr = 0; 152 done = 0; 153 sleep = false; 154 sleep_usecs = 1000; 155 active_workers = 0; 156 157 static int cnt = 0; 158 if (use_paje_trace) 159 trace = new PajeTrace(num_threads, "ng" + ToString(cnt++)); 160 } 161 162 ~TaskManager()163 TaskManager :: ~TaskManager () 164 { 165 if (use_paje_trace) 166 { 167 delete trace; 168 trace = nullptr; 169 } 170 num_threads = 1; 171 } 172 173 #ifdef WIN32 GetThreadId()174 int TaskManager :: GetThreadId() 175 { 176 return thread_id; 177 } 178 #endif 179 StartWorkers()180 void TaskManager :: StartWorkers() 181 { 182 done = false; 183 184 for (int i = 1; i < num_threads; i++) 185 { 186 std::thread([this,i]() { this->Loop(i); }).detach(); 187 } 188 thread_id = 0; 189 190 size_t alloc_size = num_threads*NgProfiler::SIZE; 191 NgProfiler::thread_times = new size_t[alloc_size]; 192 for (size_t i = 0; i < alloc_size; i++) 193 NgProfiler::thread_times[i] = 0; 194 NgProfiler::thread_flops = new size_t[alloc_size]; 195 for (size_t i = 0; i < alloc_size; i++) 196 NgProfiler::thread_flops[i] = 0; 197 198 while (active_workers < num_threads-1) 199 ; 200 } 201 202 static size_t calibrate_init_tsc = GetTimeCounter(); 203 typedef std::chrono::system_clock TClock; 204 static TClock::time_point calibrate_init_clock = TClock::now(); 205 StopWorkers()206 void TaskManager :: StopWorkers() 207 { 208 done = true; 209 double delta_tsc = GetTimeCounter()-calibrate_init_tsc; 210 double delta_sec = std::chrono::duration<double>(TClock::now()-calibrate_init_clock).count(); 211 double frequ = (delta_sec != 0) ? delta_tsc/delta_sec : 2.7e9; 212 213 // cout << "cpu frequ = " << frequ << endl; 214 // collect timings 215 for (size_t i = 0; i < num_threads; i++) 216 for (size_t j = NgProfiler::SIZE; j-- > 0; ) 217 { 218 if (!NgProfiler::timers[j].usedcounter) break; 219 NgProfiler::timers[j].tottime += 1.0/frequ * NgProfiler::thread_times[i*NgProfiler::SIZE+j]; 220 NgProfiler::timers[j].flops += NgProfiler::thread_flops[i*NgProfiler::SIZE+j]; 221 } 222 delete [] NgProfiler::thread_times; 223 NgProfiler::thread_times = NgProfiler::dummy_thread_times.data(); 224 delete [] NgProfiler::thread_flops; 225 NgProfiler::thread_flops = NgProfiler::dummy_thread_flops.data(); 226 227 while (active_workers) 228 ; 229 } 230 231 /////////////////////// NEW: nested tasks using concurrent queue 232 233 struct TNestedTask 234 { 235 const function<void(TaskInfo&)> * func; 236 int mynr; 237 int total; 238 int producing_thread; 239 atomic<int> * endcnt; 240 TNestedTaskngcore::TNestedTask241 TNestedTask () { ; } TNestedTaskngcore::TNestedTask242 TNestedTask (const function<void(TaskInfo&)> & _func, 243 int _mynr, int _total, 244 atomic<int> & _endcnt, int prod_tid) 245 : func(&_func), mynr(_mynr), total(_total), endcnt(&_endcnt), producing_thread(prod_tid) 246 { 247 ; 248 } 249 }; 250 251 typedef moodycamel::ConcurrentQueue<TNestedTask> TQueue; 252 typedef moodycamel::ProducerToken TPToken; 253 typedef moodycamel::ConsumerToken TCToken; 254 255 static TQueue taskqueue; 256 AddTask(const function<void (TaskInfo &)> & afunc,atomic<int> & endcnt)257 void AddTask (const function<void(TaskInfo&)> & afunc, 258 atomic<int> & endcnt) 259 260 { 261 TPToken ptoken(taskqueue); 262 263 int num = endcnt; 264 auto tid = TaskManager::GetThreadId(); 265 for (int i = 0; i < num; i++) 266 taskqueue.enqueue (ptoken, { afunc, i, num, endcnt, tid }); 267 } 268 ProcessTask()269 bool TaskManager :: ProcessTask() 270 { 271 // static Timer t("process task"); 272 TNestedTask task; 273 TCToken ctoken(taskqueue); 274 275 if (taskqueue.try_dequeue(ctoken, task)) 276 { 277 TaskInfo ti; 278 ti.task_nr = task.mynr; 279 ti.ntasks = task.total; 280 ti.thread_nr = TaskManager::GetThreadId(); 281 ti.nthreads = TaskManager::GetNumThreads(); 282 /* 283 { 284 lock_guard<mutex> guard(m); 285 cout << "process nested, nr = " << ti.task_nr << "/" << ti.ntasks << endl; 286 } 287 */ 288 // if(trace && task.producing_thread != ti.thread_nr) 289 // trace->StartTask (ti.thread_nr, t, PajeTrace::Task::ID_TIMER, task.producing_thread); 290 291 (*task.func)(ti); 292 --*task.endcnt; 293 294 // if(trace && task.producing_thread != ti.thread_nr) 295 // trace->StopTask (ti.thread_nr, t); 296 return true; 297 } 298 return false; 299 } 300 301 CreateJob(const function<void (TaskInfo &)> & afunc,int antasks)302 void TaskManager :: CreateJob (const function<void(TaskInfo&)> & afunc, 303 int antasks) 304 { 305 if (num_threads == 1 || !task_manager) // || func) 306 { 307 if (startup_function) (*startup_function)(); 308 309 TaskInfo ti; 310 ti.ntasks = antasks; 311 ti.thread_nr = 0; ti.nthreads = 1; 312 // ti.node_nr = 0; ti.nnodes = 1; 313 for (ti.task_nr = 0; ti.task_nr < antasks; ti.task_nr++) 314 afunc(ti); 315 316 if (cleanup_function) (*cleanup_function)(); 317 return; 318 } 319 320 321 if (func) 322 { // we are already parallel, use nested tasks 323 // startup for inner function not supported ... 324 // if (startup_function) (*startup_function)(); 325 326 if (antasks == 1) 327 { 328 TaskInfo ti; 329 ti.task_nr = 0; 330 ti.ntasks = 1; 331 ti.thread_nr = 0; ti.nthreads = 1; 332 afunc(ti); 333 return; 334 } 335 336 atomic<int> endcnt(antasks); 337 AddTask (afunc, endcnt); 338 while (endcnt > 0) 339 { 340 ProcessTask(); 341 } 342 343 // if (cleanup_function) (*cleanup_function)(); 344 return; 345 } 346 347 if (antasks == 1) 348 { 349 if (trace) 350 trace->StartJob(jobnr, afunc.target_type()); 351 jobnr++; 352 if (startup_function) (*startup_function)(); 353 TaskInfo ti; 354 ti.task_nr = 0; 355 ti.ntasks = 1; 356 ti.thread_nr = 0; ti.nthreads = 1; 357 { 358 RegionTracer t(ti.thread_nr, jobnr, RegionTracer::ID_JOB, ti.task_nr); 359 afunc(ti); 360 } 361 if (cleanup_function) (*cleanup_function)(); 362 if (trace) 363 trace->StopJob(); 364 return; 365 } 366 367 if (trace) 368 trace->StartJob(jobnr, afunc.target_type()); 369 370 func = &afunc; 371 372 ntasks.store (antasks); // , memory_order_relaxed); 373 ex = nullptr; 374 375 376 nodedata[0]->start_cnt.store (0, memory_order_relaxed); 377 378 jobnr++; 379 380 for (int j = 0; j < num_nodes; j++) 381 nodedata[j]->participate |= 1; 382 383 if (startup_function) (*startup_function)(); 384 385 int thd = 0; 386 int thds = GetNumThreads(); 387 int mynode = num_nodes * thd/thds; 388 389 IntRange mytasks = Range(int(ntasks)).Split (mynode, num_nodes); 390 NodeData & mynode_data = *(nodedata[mynode]); 391 392 TaskInfo ti; 393 ti.nthreads = thds; 394 ti.thread_nr = thd; 395 // ti.nnodes = num_nodes; 396 // ti.node_nr = mynode; 397 398 try 399 { 400 while (1) 401 { 402 int mytask = mynode_data.start_cnt++; 403 if (mytask >= mytasks.Size()) break; 404 405 ti.task_nr = mytasks.First()+mytask; 406 ti.ntasks = ntasks; 407 408 { 409 RegionTracer t(ti.thread_nr, jobnr, RegionTracer::ID_JOB, ti.task_nr); 410 (*func)(ti); 411 } 412 } 413 414 } 415 catch (Exception e) 416 { 417 { 418 lock_guard<mutex> guard(copyex_mutex); 419 delete ex; 420 ex = new Exception (e); 421 mynode_data.start_cnt = mytasks.Size(); 422 } 423 } 424 425 if (cleanup_function) (*cleanup_function)(); 426 427 for (int j = 0; j < num_nodes; j++) 428 if (workers_on_node[j]) 429 { 430 while (complete[j] != jobnr) 431 { 432 #ifdef NETGEN_ARCH_AMD64 433 _mm_pause(); 434 #endif // NETGEN_ARCH_AMD64 435 } 436 } 437 438 func = nullptr; 439 if (ex) 440 throw Exception (*ex); 441 442 if (trace) 443 trace->StopJob(); 444 } 445 Loop(int thd)446 void TaskManager :: Loop(int thd) 447 { 448 /* 449 static Timer tADD("add entry counter"); 450 static Timer tCASready1("spin-CAS ready tick1"); 451 static Timer tCASready2("spin-CAS ready tick2"); 452 static Timer tCASyield("spin-CAS yield"); 453 static Timer tCAS1("spin-CAS wait"); 454 static Timer texit("exit zone"); 455 static Timer tdec("decrement"); 456 */ 457 thread_id = thd; 458 459 int thds = GetNumThreads(); 460 461 int mynode = num_nodes * thd/thds; 462 463 NodeData & mynode_data = *(nodedata[mynode]); 464 465 466 467 TaskInfo ti; 468 ti.nthreads = thds; 469 ti.thread_nr = thd; 470 // ti.nnodes = num_nodes; 471 // ti.node_nr = mynode; 472 473 474 #ifdef USE_NUMA 475 numa_run_on_node (mynode); 476 #endif 477 active_workers++; 478 workers_on_node[mynode]++; 479 int jobdone = 0; 480 481 482 #ifdef USE_MKL 483 auto mkl_max = mkl_get_max_threads(); 484 mkl_set_num_threads_local(1); 485 #endif 486 487 488 while (!done) 489 { 490 if (complete[mynode] > jobdone) 491 jobdone = complete[mynode]; 492 493 if (jobnr == jobdone) 494 { 495 // RegionTracer t(ti.thread_nr, tCASyield, ti.task_nr); 496 while (ProcessTask()); // do the nested tasks 497 498 if(sleep) 499 std::this_thread::sleep_for(std::chrono::microseconds(sleep_usecs)); 500 else 501 { 502 #ifdef WIN32 503 std::this_thread::yield(); 504 #else // WIN32 505 sched_yield(); 506 #endif // WIN32 507 } 508 continue; 509 } 510 511 { 512 // RegionTracer t(ti.thread_nr, tADD, ti.task_nr); 513 514 // non-atomic fast check ... 515 if ( (mynode_data.participate & 1) == 0) continue; 516 517 int oldval = mynode_data.participate += 2; 518 if ( (oldval & 1) == 0) 519 { // job not active, going out again 520 mynode_data.participate -= 2; 521 continue; 522 } 523 } 524 525 if (startup_function) (*startup_function)(); 526 527 IntRange mytasks = Range(int(ntasks)).Split (mynode, num_nodes); 528 529 try 530 { 531 532 while (1) 533 { 534 if (mynode_data.start_cnt >= mytasks.Size()) break; 535 int mytask = mynode_data.start_cnt.fetch_add(1, memory_order_relaxed); 536 if (mytask >= mytasks.Size()) break; 537 538 ti.task_nr = mytasks.First()+mytask; 539 ti.ntasks = ntasks; 540 541 { 542 RegionTracer t(ti.thread_nr, jobnr, RegionTracer::ID_JOB, ti.task_nr); 543 (*func)(ti); 544 } 545 } 546 547 } 548 catch (Exception e) 549 { 550 { 551 // cout << "got exception in TM" << endl; 552 lock_guard<mutex> guard(copyex_mutex); 553 delete ex; 554 ex = new Exception (e); 555 mynode_data.start_cnt = mytasks.Size(); 556 } 557 } 558 559 #ifndef __MIC__ 560 atomic_thread_fence (memory_order_release); 561 #endif // __MIC__ 562 563 if (cleanup_function) (*cleanup_function)(); 564 565 jobdone = jobnr; 566 567 mynode_data.participate-=2; 568 569 { 570 int oldpart = 1; 571 if (mynode_data.participate.compare_exchange_strong (oldpart, 0)) 572 { 573 if (jobdone < jobnr.load()) 574 { // reopen gate 575 mynode_data.participate |= 1; 576 } 577 else 578 { 579 if (mynode != 0) 580 mynode_data.start_cnt = 0; 581 complete[mynode] = jobnr.load(); 582 } 583 } 584 } 585 } 586 587 588 #ifdef USE_MKL 589 mkl_set_num_threads_local(mkl_max); 590 #endif 591 592 workers_on_node[mynode]--; 593 active_workers--; 594 } 595 596 Timing()597 std::list<std::tuple<std::string,double>> TaskManager :: Timing () 598 { 599 /* 600 list<tuple<string,double>>timings; 601 double time = 602 RunTiming 603 ( [&] () 604 { 605 ParallelJob ( [] (TaskInfo ti) { ; } , 606 TasksPerThread(1) ); 607 }); 608 timings.push_back (make_tuple("parallel job with 1 task per thread", time*1e9)); 609 610 time = 611 RunTiming 612 ( [&] () 613 { 614 ParallelJob ( [] (TaskInfo ti) { ; } , 615 TasksPerThread(10) ); 616 }); 617 timings.push_back (make_tuple("parallel job with 10 tasks per thread", time*1e9)); 618 619 time = 620 RunTiming 621 ( [&] () 622 { 623 ParallelJob ( [] (TaskInfo ti) { ; } , 624 TasksPerThread(100) ); 625 }); 626 timings.push_back (make_tuple("parallel job with 100 tasks per thread", time*1e9)); 627 628 return timings; 629 */ 630 631 632 633 // this is the old function moved from the py-interface: 634 std::list<std::tuple<std::string,double>>timings; 635 double starttime, time; 636 double maxtime = 0.5; 637 size_t steps; 638 639 starttime = WallTime(); 640 steps = 0; 641 do 642 { 643 for (size_t i = 0; i < 1000; i++) 644 ParallelJob ( [] (TaskInfo ti) { ; }, 645 TasksPerThread(1)); 646 steps += 1000; 647 time = WallTime()-starttime; 648 } 649 while (time < maxtime); 650 timings.push_back(make_tuple("ParallelJob 1 task/thread", time/steps*1e9)); 651 652 653 starttime = WallTime(); 654 steps = 0; 655 do 656 { 657 for (size_t i = 0; i < 1000; i++) 658 ParallelJob ( [] (TaskInfo ti) { ; }, 659 TasksPerThread(100)); 660 steps += 1000; 661 time = WallTime()-starttime; 662 } 663 while (time < maxtime); 664 timings.push_back(make_tuple("ParallelJob 100 task/thread", time/steps*1e9)); 665 666 667 starttime = WallTime(); 668 steps = 0; 669 do 670 { 671 for (int k = 0; k < 10000; k++) 672 { 673 SharedLoop2 sl(1000); 674 steps += 1; 675 } 676 time = WallTime()-starttime; 677 } 678 while (time < maxtime); 679 timings.push_back(make_tuple("SharedLoop init", time/steps*1e9)); 680 681 starttime = WallTime(); 682 steps = 0; 683 do 684 { 685 for (int k = 0; k < 1000; k++) 686 { 687 SharedLoop sl(5); 688 ParallelJob ( [&sl] (TaskInfo ti) 689 { 690 for (auto i : sl) 691 (void)i; // silence warning 692 } ); 693 } 694 steps += 1000; 695 time = WallTime()-starttime; 696 } 697 while (time < maxtime); 698 timings.push_back(make_tuple("short SharedLoop", time/steps*1e9)); 699 700 701 starttime = WallTime(); 702 steps = 0; 703 do 704 { 705 for (int k = 0; k < 1000; k++) 706 { 707 SharedLoop sl1(5), sl2(5), sl3(5), sl4(5), sl5(5); 708 ParallelJob ( [&sl1, &sl2, &sl3, &sl4, &sl5] (TaskInfo ti) 709 { 710 for (auto i : sl1) 711 (void)i; // silence warning 712 for (auto i : sl2) 713 (void)i; // silence warning 714 for (auto i : sl3) 715 (void)i; // silence warning 716 for (auto i : sl4) 717 (void)i; // silence warning 718 for (auto i : sl5) 719 (void)i; // silence warning 720 } ); 721 } 722 steps += 1000; 723 time = WallTime()-starttime; 724 } 725 while (time < maxtime); 726 timings.push_back(make_tuple("5 short SharedLoops", time/steps*1e9)); 727 728 729 starttime = WallTime(); 730 steps = 0; 731 SharedLoop2 sl2(5); 732 do 733 { 734 for (int k = 0; k < 1000; k++) 735 { 736 sl2.Reset(5); 737 ParallelJob ( [&sl2] (TaskInfo ti) 738 { 739 for (auto i : sl2) 740 (void)i; // silence warning 741 } ); 742 } 743 steps += 1000; 744 time = WallTime()-starttime; 745 } 746 while (time < maxtime); 747 timings.push_back(make_tuple("short SharedLoop2", time/steps*1e9)); 748 749 { 750 starttime = WallTime(); 751 steps = 0; 752 SharedLoop2 sl1(5), sl2(5), sl3(5), sl4(5), sl5(5); 753 do 754 { 755 for (int k = 0; k < 1000; k++) 756 { 757 sl1.Reset(5); 758 sl2.Reset(5); 759 sl3.Reset(5); 760 sl4.Reset(5); 761 sl5.Reset(5); 762 ParallelJob ( [&sl1,&sl2,&sl3,&sl4,&sl5] (TaskInfo ti) 763 { 764 for (auto i : sl1) 765 (void)i; // silence warning 766 for (auto i : sl2) 767 (void)i; // silence warning 768 for (auto i : sl3) 769 (void)i; // silence warning 770 for (auto i : sl4) 771 (void)i; // silence warning 772 for (auto i : sl5) 773 (void)i; // silence warning 774 } ); 775 } 776 steps += 1000; 777 time = WallTime()-starttime; 778 } 779 while (time < maxtime); 780 timings.push_back(make_tuple("5 short SharedLoop2", time/steps*1e9)); 781 } 782 783 784 starttime = WallTime(); 785 steps = 0; 786 { 787 SharedLoop2 sl(1000); 788 do 789 { 790 for (int k = 0; k < 1000; k++) 791 { 792 sl.Reset(1000); 793 ParallelJob ( [&sl] (TaskInfo ti) 794 { 795 for (auto i : sl) 796 (void)i; // silence warning 797 } ); 798 steps += 1000; 799 } 800 time = WallTime()-starttime; 801 } 802 while (time < maxtime); 803 timings.push_back(make_tuple("SharedLoop2 1000, time per iteration", time/steps*1e9)); 804 } 805 806 { 807 starttime = WallTime(); 808 steps = 0; 809 SharedLoop2 sl(1000000); 810 do 811 { 812 sl.Reset(1000000); 813 ParallelJob ( [&sl] (TaskInfo ti) 814 { 815 for (auto i : sl) 816 (void)i; // silence warning 817 } ); 818 steps += 1000000; 819 time = WallTime()-starttime; 820 } 821 while (time < maxtime); 822 timings.push_back(make_tuple("SharedLoop2 1000000, time per iteration", time/steps*1e9)); 823 } 824 825 return timings; 826 } 827 828 } 829