1 /*
2 Copyright (c) 2005-2020 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #define TBB_PREVIEW_RESUMABLE_TASKS 1
18 #include "tbb/tbb_config.h"
19
20 #include "tbb/task.h"
21 #include "tbb/task_group.h"
22 #include "tbb/task_scheduler_init.h"
23 #include "tbb/tick_count.h"
24
25 #include "tbb/parallel_for.h"
26
27 #include <vector>
28 #include <stack>
29 #include <functional>
30 #include <numeric>
31 #include <algorithm>
32
33 /************************************************************************/
34 /* SETTINGS */
35 /************************************************************************/
36
37 const int DEF_BENCH_RUNS = 1000;
38
39 /************************************************************************/
40 /* HELPERS */
41 /************************************************************************/
42
43 #include "harness_perf.h" // harness_perf::median
44
45 template<typename T>
get_median(std::vector<T> & times)46 T get_median(std::vector<T>& times) {
47 return harness_perf::median(times.begin(), times.end());
48 }
49
50 /************************************************************************/
51 /* SERIAL BENCHMARKS */
52 /************************************************************************/
53
54 //! Allocate COROUTINES_NUM fibers in a row (suspend) in a recursive manner
//! and then switch back (resume) unwinding the ctx_stack.
BenchCoroutinesAllocation()56 void BenchCoroutinesAllocation() {
57 tbb::task_scheduler_init init(1);
58
59 const int COROUTINES_NUM = 100;
60 std::stack<tbb::task::suspend_point> ctx_stack;
61 tbb::task_group tg;
62
63 std::function<void(int)> recursive_f;
64 recursive_f = [=, &ctx_stack, &tg, &recursive_f](int i) {
65 if (i < COROUTINES_NUM) {
66 tg.run([&recursive_f, i]() {
67 recursive_f(i + 1);
68 });
69 tbb::task::suspend([&ctx_stack](tbb::task::suspend_point ctx) {
70 ctx_stack.push(ctx);
71 });
72 }
73 if (ctx_stack.size() != 0) {
74 tbb::task::suspend_point ctx = ctx_stack.top(); ctx_stack.pop();
75 tbb::task::resume(ctx);
76 }
77 };
78 tg.run([=, &recursive_f]() {
79 std::vector<double> times;
80 for (int i = 0; i < DEF_BENCH_RUNS; i++) {
81 tbb::tick_count tick = tbb::tick_count::now();
82 recursive_f(1);
83 double interval = (tbb::tick_count::now() - tick).seconds() * 1e6;
84 times.push_back(interval);
85 }
86 // COROUTINES_NUM suspend and resume operations in each run
87 double median = get_median(times) / double(COROUTINES_NUM);
88 printf("Test 1 (Coroutines alloc/dealloc): Median time (microseconds): %.4f\n", median);
89 });
90 tg.wait();
91 }
92
//! Create a task, which suspends and resumes itself, thus reusing once created coroutine
BenchReusage()94 void BenchReusage() {
95 tbb::task_scheduler_init init(1);
96 tbb::task_group tg;
97
98 std::vector<double> times;
99 tg.run([×]() {
100 for (int i = 0; i < DEF_BENCH_RUNS * 10; i++) {
101 tbb::tick_count tick = tbb::tick_count::now();
102 tbb::task::suspend([](tbb::task::suspend_point ctx) {
103 tbb::task::resume(ctx);
104 });
105 double diff = (tbb::tick_count::now() - tick).seconds() * 1e6;
106 times.push_back(diff);
107 }
108 });
109 tg.wait();
110 double median = get_median(times);
111 printf("Test 2 (Coroutine reusage): Median time (microseconds): %.4f\n", median);
112 }
113
114 //! Create two tasks and switch between them (suspend current and resume previously suspended coroutine)
115 //! Measure an average time of the context switch
BenchContextSwitch()116 void BenchContextSwitch() {
117 tbb::task_scheduler_init init(1);
118 tbb::task_group tg;
119 const int N = 10000; // number of switches
120 const int tasks_num = 2;
121
122 std::vector<double> times;
123 for (int i = 0; i < 100; ++i) {
124 int switch_counter = N;
125 tbb::task::suspend_point current_ctx = NULL;
126
127 tbb::tick_count tick = tbb::tick_count::now();
128 for (int j = 0; j < tasks_num; ++j) {
129 tg.run([=, &switch_counter, ¤t_ctx]() {
130 while (switch_counter-- > 0) {
131 tbb::task::suspend([=, &switch_counter, ¤t_ctx](tbb::task::suspend_point ctx) {
132 if (switch_counter == N - 1) {
133 current_ctx = ctx;
134 } else {
135 tbb::task::suspend_point ctx_to_resume = current_ctx;
136 current_ctx = ctx;
137 tbb::task::resume(ctx_to_resume);
138 }
139 });
140 }
141 if (switch_counter == -1) {
142 tbb::task::resume(current_ctx);
143 }
144 });
145 }
146 tg.wait();
147 // To get an average context switch time divide the bench time by the number of context switches
148 double diff = ((tbb::tick_count::now() - tick).seconds() / double(N)) * 1e6;
149 times.push_back(diff);
150 }
151 printf("Test 3 (Context Switch): Median time (microseconds): %.4f\n", get_median(times));
152 }
153
154 /************************************************************************/
155 /* PARALLEL BENCHMARKS */
156 /************************************************************************/
157
158 //! Strong scaling benchmark with predefined number of iterations (N), each parallel_for task
159 //! suspends and resumes itself with a predefined busy-waiting iterations (work size).
160 //! Reports 3 numbers: serial, half of the machine, and full available concurrency
161 template <bool UseResumableTasks>
ScalabilityBenchmark(const size_t work_size)162 void ScalabilityBenchmark(const size_t work_size) {
163 const int N = 1000;
164 const int NUM_THREADS = tbb::task_scheduler_init::default_num_threads();
165 const int STEP_RATIO = 2;
166
167 // Count 3 scalability metrics: the serial, half and full machine concurrency
168 for (int i = 0; i <= NUM_THREADS; i += (NUM_THREADS / STEP_RATIO)) {
169 const int concurrency = (i == 0) ? 1 : i; // just to make step loop nice looking
170 tbb::task_scheduler_init init(concurrency);
171 std::vector<double> times;
172 for (int j = 0; j < 100; j++) {
173 tbb::tick_count tick = tbb::tick_count::now();
174 tbb::parallel_for(0, N, [&work_size](const int /*j*/) {
175 if (UseResumableTasks) {
176 tbb::task::suspend([](tbb::task::suspend_point ctx) {
177 tbb::task::resume(ctx);
178 });
179 }
180 for (volatile size_t k = 0; k < work_size; ++k);
181 }, tbb::simple_partitioner());
182 double diff = (tbb::tick_count::now() - tick).seconds() * 1e3;
183 times.push_back(diff);
184 }
185 printf("Test 4 (Scalability): Work Size: %zu, With RT-feature: %s, Concurrency: %d, Time (milliseconds): %.4f\n",
186 work_size, (UseResumableTasks ? "true" : "false"), concurrency, get_median(times));
187 }
188 }
189
190 /************************************************************************/
191 /* NATIVE IMPLEMENTATION */
192 /************************************************************************/
193
194 // Dependencies section for co_context.h
195
196 #if _WIN32
197 #include <windows.h> // GetSystemInfo
198 #else
199 #include <unistd.h> // sysconf(_SC_PAGESIZE)
200 #endif
201
namespace tbb {
namespace internal {
//! Queries the OS for the size of a regular memory page (system dependent impl).
inline size_t GetDefaultSystemPageSize() {
#if _WIN32
    SYSTEM_INFO si;
    GetSystemInfo(&si);
    return si.dwPageSize;
#else
    return sysconf(_SC_PAGESIZE);
#endif
}
//! Minimal stand-in for TBB's internal governor, providing only the
//! page-size accessor that the coroutine implementation needs.
class governor {
    //! Caches the size of OS regular memory page
    static size_t DefaultPageSize;
public:
    //! Static accessor for OS regular memory page size
    static size_t default_page_size () {
        // Lazily query the OS once; subsequent calls hit the cache.
        return DefaultPageSize ? DefaultPageSize : DefaultPageSize = GetDefaultSystemPageSize();
    }
};
size_t governor::DefaultPageSize;
} // namespace internal
} // namespace tbb
226
227 // No-op versions of __TBB_ASSERT/EX for co_context.h header
228 #define __TBB_ASSERT(predicate,comment) ((void)0)
229 #define __TBB_ASSERT_EX(predicate,comment) ((void)(1 && (predicate)))
230
231 // TBB coroutines implementation
232 // Disable governor header to remove the dependency
233 #define _TBB_governor_H
234 #include "../tbb/co_context.h"
235 using namespace tbb::internal;
236 #undef _TBB_governor_H
237
238 #define HARNESS_CUSTOM_MAIN 1
239 #include "../test/harness.h" // NativeParallelFor
240
241 namespace tbb {
242 namespace internal {
243 // Our native coroutine function
244 #if _WIN32
co_local_wait_for_all(void * arg)245 /* [[noreturn]] */ inline void __stdcall co_local_wait_for_all(void* arg) {
246 #else
247 /* [[noreturn]] */ inline void co_local_wait_for_all(void* arg) {
248 #endif
249 coroutine_type next = *static_cast<coroutine_type*>(arg);
250 coroutine_type current; current_coroutine(current);
251 swap_coroutine(current, next);
252 }
253 } // namespace internal
254 } // namespace tbb
255
256 // The same scalability benchmark as for TBB, but written with native OS fibers implementation
BenchNativeImpl(const size_t work_size)257 void BenchNativeImpl(const size_t work_size) {
258 const int N = 1000;
259 const int NUM_THREADS = tbb::task_scheduler_init::default_num_threads();
260 const int STEP_RATIO = 2;
261 const size_t STACK_SIZE = 4 * 1024 * 1024; // Just like default TBB worker thread stack size
262
263 // Count 3 scalability metrics: the serial, half and full machine concurrency
264 for (int i = 0; i <= NUM_THREADS; i += (NUM_THREADS / STEP_RATIO)) {
265 const int concurrency = (i == 0) ? 1 : i; // just to make step loop nice looking
266 const int sub_range = N / concurrency;
267 std::vector<double> times;
268 for (int r = 0; r < 100; r++) {
269 tbb::tick_count tick = tbb::tick_count::now();
270 NativeParallelFor(concurrency, [=, &work_size, &sub_range](int /*idx*/) {
271 // Each iteration of sub-range emulates a single TBB task
272 for (int j = 0; j < sub_range; j++) {
273 coroutine_type co_next;
274 coroutine_type co_current; current_coroutine(co_current);
275 create_coroutine(co_next, STACK_SIZE, &co_current);
276 swap_coroutine(co_current, co_next);
277
278 // Busy-wait for a while emulating some work
279 for (volatile size_t k = 0; k < work_size; ++k);
280 destroy_coroutine(co_next);
281 }
282 });
283 double diff = (tbb::tick_count::now() - tick).seconds() * 1e3;
284 times.push_back(diff);
285 }
286 printf("Test 5 (Native Implementation): Work size: %zu, Concurrency: %d, Time (milliseconds): %.4f\n",
287 work_size, concurrency, get_median(times));
288 }
289 }
290
291 /************************************************************************/
292 /* MAIN DRIVER */
293 /************************************************************************/
294
main()295 int main() {
296 // Serial microbenchmarks
297 BenchCoroutinesAllocation();
298 BenchReusage();
299 BenchContextSwitch();
300
301 // Scalability benchmarks
302 // Big work size + no resumable tasks feature (false)
303 ScalabilityBenchmark<false>(100000);
304 // Big work size + resumable tasks feature (true)
305 ScalabilityBenchmark<true>(100000);
306 // Small work size + no resumable tasks feature (false)
307 ScalabilityBenchmark<false>(1000);
308 // Small work size + resumable tasks feature (true)
309 ScalabilityBenchmark<true>(1000);
310 // No any work + just resumable tasks feature (true)
311 ScalabilityBenchmark<true>(0);
312
313 // Native implementation
314 // Big work size
315 BenchNativeImpl(100000);
316 // Small work size
317 BenchNativeImpl(1000);
318 // Just coroutines/fibers switching
319 BenchNativeImpl(0);
320
321 return 0;
322 }
323
324