1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_PREVIEW_RESUMABLE_TASKS 1
18 #include "tbb/tbb_config.h"
19 
20 #include "tbb/task.h"
21 #include "tbb/task_group.h"
22 #include "tbb/task_scheduler_init.h"
23 #include "tbb/tick_count.h"
24 
25 #include "tbb/parallel_for.h"
26 
27 #include <vector>
28 #include <stack>
29 #include <functional>
30 #include <numeric>
31 #include <algorithm>
32 
33 /************************************************************************/
34 /* SETTINGS                                                             */
35 /************************************************************************/
36 
//! Base number of timed repetitions: BenchCoroutinesAllocation runs it as is,
//! BenchReusage runs ten times as many iterations.
static const int DEF_BENCH_RUNS = 1000;
38 
39 /************************************************************************/
40 /* HELPERS                                                              */
41 /************************************************************************/
42 
43 #include "harness_perf.h" // harness_perf::median
44 
45 template<typename T>
get_median(std::vector<T> & times)46 T get_median(std::vector<T>& times) {
47     return harness_perf::median(times.begin(), times.end());
48 }
49 
50 /************************************************************************/
51 /* SERIAL BENCHMARKS                                                    */
52 /************************************************************************/
53 
54 //! Allocate COROUTINES_NUM fibers in a row (suspend) in a recursive manner
55 //! and then swith back (resume) unwinding the ctx_stack.
BenchCoroutinesAllocation()56 void BenchCoroutinesAllocation() {
57     tbb::task_scheduler_init init(1);
58 
59     const int COROUTINES_NUM = 100;
60     std::stack<tbb::task::suspend_point> ctx_stack;
61     tbb::task_group tg;
62 
63     std::function<void(int)> recursive_f;
64     recursive_f = [=, &ctx_stack, &tg, &recursive_f](int i) {
65         if (i < COROUTINES_NUM) {
66             tg.run([&recursive_f, i]() {
67                 recursive_f(i + 1);
68             });
69             tbb::task::suspend([&ctx_stack](tbb::task::suspend_point ctx) {
70                 ctx_stack.push(ctx);
71             });
72         }
73         if (ctx_stack.size() != 0) {
74             tbb::task::suspend_point ctx = ctx_stack.top(); ctx_stack.pop();
75             tbb::task::resume(ctx);
76         }
77     };
78     tg.run([=, &recursive_f]() {
79         std::vector<double> times;
80         for (int i = 0; i < DEF_BENCH_RUNS; i++) {
81             tbb::tick_count tick = tbb::tick_count::now();
82             recursive_f(1);
83             double interval = (tbb::tick_count::now() - tick).seconds() * 1e6;
84             times.push_back(interval);
85         }
86         // COROUTINES_NUM suspend and resume operations in each run
87         double median = get_median(times) / double(COROUTINES_NUM);
88         printf("Test 1 (Coroutines alloc/dealloc): Median time (microseconds): %.4f\n", median);
89     });
90     tg.wait();
91 }
92 
93 //! Create a task, which suspends and resumes intself, thus reusing once created coroutine
BenchReusage()94 void BenchReusage() {
95     tbb::task_scheduler_init init(1);
96     tbb::task_group tg;
97 
98     std::vector<double> times;
99     tg.run([&times]() {
100         for (int i = 0; i < DEF_BENCH_RUNS * 10; i++) {
101             tbb::tick_count tick = tbb::tick_count::now();
102             tbb::task::suspend([](tbb::task::suspend_point ctx) {
103                 tbb::task::resume(ctx);
104             });
105             double diff = (tbb::tick_count::now() - tick).seconds() * 1e6;
106             times.push_back(diff);
107         }
108     });
109     tg.wait();
110     double median = get_median(times);
111     printf("Test 2 (Coroutine reusage): Median time (microseconds): %.4f\n", median);
112 }
113 
114 //! Create two tasks and switch between them (suspend current and resume previously suspended coroutine)
115 //! Measure an average time of the context switch
BenchContextSwitch()116 void BenchContextSwitch() {
117     tbb::task_scheduler_init init(1);
118     tbb::task_group tg;
119     const int N = 10000; // number of switches
120     const int tasks_num = 2;
121 
122     std::vector<double> times;
123     for (int i = 0; i < 100; ++i) {
124         int switch_counter = N;
125         tbb::task::suspend_point current_ctx = NULL;
126 
127         tbb::tick_count tick = tbb::tick_count::now();
128         for (int j = 0; j < tasks_num; ++j) {
129             tg.run([=, &switch_counter, &current_ctx]() {
130                 while (switch_counter-- > 0) {
131                     tbb::task::suspend([=, &switch_counter, &current_ctx](tbb::task::suspend_point ctx) {
132                         if (switch_counter == N - 1) {
133                             current_ctx = ctx;
134                         } else {
135                             tbb::task::suspend_point ctx_to_resume = current_ctx;
136                             current_ctx = ctx;
137                             tbb::task::resume(ctx_to_resume);
138                         }
139                     });
140                 }
141                 if (switch_counter == -1) {
142                     tbb::task::resume(current_ctx);
143                 }
144             });
145         }
146         tg.wait();
147         // To get an average context switch time divide the bench time by the number of context switches
148         double diff = ((tbb::tick_count::now() - tick).seconds() / double(N)) * 1e6;
149         times.push_back(diff);
150     }
151     printf("Test 3 (Context Switch): Median time (microseconds): %.4f\n", get_median(times));
152 }
153 
154 /************************************************************************/
155 /* PARALLEL BENCHMARKS                                                  */
156 /************************************************************************/
157 
158 //! Strong scaling benchmark with predefined number of iterations (N), each parallel_for task
159 //! suspends and resumes itself with a predefined busy-waiting iterations (work size).
160 //! Reports 3 numbers: serial, half of the machine, and full available concurrency
161 template <bool UseResumableTasks>
ScalabilityBenchmark(const size_t work_size)162 void ScalabilityBenchmark(const size_t work_size) {
163     const int N = 1000;
164     const int NUM_THREADS = tbb::task_scheduler_init::default_num_threads();
165     const int STEP_RATIO = 2;
166 
167     // Count 3 scalability metrics: the serial, half and full machine concurrency
168     for (int i = 0; i <= NUM_THREADS; i += (NUM_THREADS / STEP_RATIO)) {
169         const int concurrency = (i == 0) ? 1 : i; // just to make step loop nice looking
170         tbb::task_scheduler_init init(concurrency);
171         std::vector<double> times;
172         for (int j = 0; j < 100; j++) {
173             tbb::tick_count tick = tbb::tick_count::now();
174             tbb::parallel_for(0, N, [&work_size](const int /*j*/) {
175                 if (UseResumableTasks) {
176                     tbb::task::suspend([](tbb::task::suspend_point ctx) {
177                         tbb::task::resume(ctx);
178                     });
179                 }
180                 for (volatile size_t k = 0; k < work_size; ++k);
181             }, tbb::simple_partitioner());
182             double diff = (tbb::tick_count::now() - tick).seconds() * 1e3;
183             times.push_back(diff);
184         }
185         printf("Test 4 (Scalability): Work Size: %zu, With RT-feature: %s, Concurrency: %d, Time (milliseconds): %.4f\n",
186                 work_size, (UseResumableTasks ? "true" : "false"), concurrency, get_median(times));
187     }
188 }
189 
190 /************************************************************************/
191 /* NATIVE IMPLEMENTATION                                                */
192 /************************************************************************/
193 
194 // Dependencies section for co_context.h
195 
196 #if _WIN32
197 #include <windows.h> // GetSystemInfo
198 #else
199 #include <unistd.h> // sysconf(_SC_PAGESIZE)
200 #endif
201 
202 namespace tbb {
203 namespace internal {
//! Queries the operating system for the size of a regular memory page:
//! GetSystemInfo on Windows, sysconf(_SC_PAGESIZE) elsewhere.
inline size_t GetDefaultSystemPageSize() {
#if _WIN32
    SYSTEM_INFO system_info;
    GetSystemInfo(&system_info);
    const size_t page_bytes = system_info.dwPageSize;
#else
    const size_t page_bytes = sysconf(_SC_PAGESIZE);
#endif
    return page_bytes;
}
214 class governor {
215     //! Caches the size of OS regular memory page
216     static size_t DefaultPageSize;
217 public:
218     //! Staic accessor for OS regular memory page size
default_page_size()219     static size_t default_page_size () {
220         return DefaultPageSize ? DefaultPageSize : DefaultPageSize = GetDefaultSystemPageSize();
221     }
222 };
223 size_t governor::DefaultPageSize;
224 } // namespace internal
225 } // namespace tbb
226 
227 // No-op versions of __TBB_ASSERT/EX for co_context.h header
228 #define __TBB_ASSERT(predicate,comment) ((void)0)
229 #define __TBB_ASSERT_EX(predicate,comment) ((void)(1 && (predicate)))
230 
231 // TBB coroutines implementation
232 // Disable governor header to remove the dependency
233 #define _TBB_governor_H
234 #include "../tbb/co_context.h"
235 using namespace tbb::internal;
236 #undef _TBB_governor_H
237 
238 #define HARNESS_CUSTOM_MAIN 1
239 #include "../test/harness.h" // NativeParallelFor
240 
241 namespace tbb {
242 namespace internal {
243 // Our native coroutine function
244 #if _WIN32
co_local_wait_for_all(void * arg)245 /* [[noreturn]] */ inline void __stdcall co_local_wait_for_all(void* arg) {
246 #else
247 /* [[noreturn]] */ inline void co_local_wait_for_all(void* arg) {
248 #endif
249     coroutine_type next = *static_cast<coroutine_type*>(arg);
250     coroutine_type current; current_coroutine(current);
251     swap_coroutine(current, next);
252 }
253 } // namespace internal
254 } // namespace tbb
255 
256 // The same scalability benchmark as for TBB, but written with native OS fibers implementation
BenchNativeImpl(const size_t work_size)257 void BenchNativeImpl(const size_t work_size) {
258     const int N = 1000;
259     const int NUM_THREADS = tbb::task_scheduler_init::default_num_threads();
260     const int STEP_RATIO = 2;
261     const size_t STACK_SIZE = 4 * 1024 * 1024; // Just like default TBB worker thread stack size
262 
263     // Count 3 scalability metrics: the serial, half and full machine concurrency
264     for (int i = 0; i <= NUM_THREADS; i += (NUM_THREADS / STEP_RATIO)) {
265         const int concurrency = (i == 0) ? 1 : i; // just to make step loop nice looking
266         const int sub_range = N / concurrency;
267         std::vector<double> times;
268         for (int r = 0; r < 100; r++) {
269             tbb::tick_count tick = tbb::tick_count::now();
270             NativeParallelFor(concurrency, [=, &work_size, &sub_range](int /*idx*/) {
271                 // Each iteration of sub-range emulates a single TBB task
272                 for (int j = 0; j < sub_range; j++) {
273                     coroutine_type co_next;
274                     coroutine_type co_current; current_coroutine(co_current);
275                     create_coroutine(co_next, STACK_SIZE, &co_current);
276                     swap_coroutine(co_current, co_next);
277 
278                     // Busy-wait for a while emulating some work
279                     for (volatile size_t k = 0; k < work_size; ++k);
280                     destroy_coroutine(co_next);
281                 }
282             });
283             double diff = (tbb::tick_count::now() - tick).seconds() * 1e3;
284             times.push_back(diff);
285         }
286         printf("Test 5 (Native Implementation): Work size: %zu, Concurrency: %d, Time (milliseconds): %.4f\n",
287                 work_size, concurrency, get_median(times));
288     }
289 }
290 
291 /************************************************************************/
292 /* MAIN DRIVER                                                          */
293 /************************************************************************/
294 
main()295 int main() {
296     // Serial microbenchmarks
297     BenchCoroutinesAllocation();
298     BenchReusage();
299     BenchContextSwitch();
300 
301     // Scalability benchmarks
302     // Big work size + no resumable tasks feature (false)
303     ScalabilityBenchmark<false>(100000);
304     // Big work size + resumable tasks feature (true)
305     ScalabilityBenchmark<true>(100000);
306     // Small work size + no resumable tasks feature (false)
307     ScalabilityBenchmark<false>(1000);
308     // Small work size + resumable tasks feature (true)
309     ScalabilityBenchmark<true>(1000);
310     // No any work + just resumable tasks feature (true)
311     ScalabilityBenchmark<true>(0);
312 
313     // Native implementation
314     // Big work size
315     BenchNativeImpl(100000);
316     // Small work size
317     BenchNativeImpl(1000);
318     // Just coroutines/fibers switching
319     BenchNativeImpl(0);
320 
321     return 0;
322 }
323 
324