1 /*
2 Copyright (c) 2005-2021 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 // Before including parallel_pipeline.h, set up the variable to count heap allocated
18 // filter_node objects, and make it known for the header.
19 #include "common/test.h"
20 #include "common/utils.h"
21 #include "common/checktype.h"
22
23 int filter_node_count = 0;
24 #define __TBB_TEST_FILTER_NODE_COUNT filter_node_count
25 #include "tbb/parallel_pipeline.h"
26 #include "tbb/global_control.h"
27 #include "tbb/spin_mutex.h"
28 #include "tbb/task_group.h"
29
30 #include <atomic>
31 #include <string.h>
32 #include <memory> // std::unique_ptr
33
34 //! \file test_parallel_pipeline.cpp
35 //! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification
36
// ---- Pipeline sizing ------------------------------------------------------

// Maximum number of tokens in flight, passed to every parallel_pipeline() call.
constexpr unsigned n_tokens = 8;
// The middle filter may hold two buffers per in-flight token at once (it claims
// its output buffer before its input buffer is released), so the pool is sized
// at two buffers per token. Unlikely to be needed, but possible.
constexpr unsigned n_buffers = n_tokens * 2;
// Number of items each input filter produces before stopping the pipeline.
constexpr int max_counter = 16;

// Concurrency level of the current run; only used in diagnostic messages.
static std::size_t concurrency = 0;

// ---- Counters shared by the filters (updated concurrently) ----------------

static std::atomic<int> output_counter;                    // items seen by the output filter
static std::atomic<int> input_counter;                     // items the input filter may still produce
static std::atomic<int> non_pointer_specialized_calls;     // calls into non-pointer filter variants
static std::atomic<int> pointer_specialized_calls;         // calls into pointer filter variants
static std::atomic<int> first_pointer_specialized_calls;   // calls into middle_filter<T*,U>
static std::atomic<int> second_pointer_specialized_calls;  // calls into middle_filter<T,U*>

// ---- Result storage for the <int,int> pipeline ----------------------------

static int intbuffer[max_counter]; // store results for <int,int> parallel pipeline test
static bool check_intbuffer;       // true when intbuffer contents should be validated

// ---- Buffer pool used by the pointer-passing filters ----------------------

static void* buffers[n_buffers];
static std::atomic_flag buf_in_use[n_buffers] = {ATOMIC_FLAG_INIT};
57
fetchNextBuffer()58 void *fetchNextBuffer() {
59 for(size_t icnt = 0; icnt < n_buffers; ++icnt) {
60 if(!buf_in_use[icnt].test_and_set()) {
61 return buffers[icnt];
62 }
63 }
64 CHECK_MESSAGE(false, "Ran out of buffers, p:"<< concurrency);
65 return 0;
66 }
freeBuffer(void * buf)67 void freeBuffer(void *buf) {
68 for(size_t i=0; i < n_buffers;++i) {
69 if(buffers[i] == buf) {
70 buf_in_use[i].clear();
71 return;
72 }
73 }
74 CHECK_MESSAGE(false, "Tried to free a buffer not in our list, p:" << concurrency);
75 }
76
// Scope guard for an object that was placement-constructed in a pool buffer.
// On destruction it runs the object's destructor and hands the buffer back via
// freeBuffer(). A null pointer is accepted and ignored.
//
// The guard is non-copyable: a copy would destroy the object and release the
// buffer a second time.
template<typename T>
class free_on_scope_exit {
public:
    explicit free_on_scope_exit(T *p) : my_p(p) {}
    free_on_scope_exit(const free_on_scope_exit&) = delete;
    free_on_scope_exit& operator=(const free_on_scope_exit&) = delete;

    ~free_on_scope_exit() {
        if(!my_p) return;
        my_p->~T();         // the object was created with placement new
        freeBuffer(my_p);   // mark the underlying buffer available again
    }
private:
    T *my_p;
};
85
86 // methods for testing CheckType< >, that return okay values for other types.
// Generic fallback used for every type that is not a CheckType<>: such values
// are never reported as "ready" when they reach the middle filter.
template<typename T>
bool middle_is_ready(T& /*unused*/) {
    return false;
}
89
90 template<typename U>
middle_is_ready(CheckType<U> & p)91 bool middle_is_ready(CheckType<U> &p) { return p.is_ready(); }
92
// Generic fallback used for every type that is not a CheckType<>: such values
// are always considered "ready" when they reach the output filter.
template<typename T>
bool output_is_ready(T& /*unused*/) {
    return true;
}
95
96 template<typename U>
output_is_ready(CheckType<U> & p)97 bool output_is_ready(CheckType<U> &p) { return p.is_ready(); }
98
// Generic fallback used for every type that is not a CheckType<>: the id seen
// by the middle filter is always 0 (the value the middle-filter checks expect).
template<typename T>
int middle_my_id(T& /*unused*/) {
    return 0;
}
101
102 template<typename U>
middle_my_id(CheckType<U> & p)103 int middle_my_id(CheckType<U> &p) { return p.id(); }
104
// Generic fallback used for every type that is not a CheckType<>: the id seen
// by the output filter is always 1 (non-zero, as the output-filter checks expect).
template<typename T>
int output_my_id(T& /*unused*/) {
    return 1;
}
107
108 template<typename U>
output_my_id(CheckType<U> & p)109 int output_my_id(CheckType<U> &p) { return p.id(); }
110
// Generic "work" done by a middle filter on its freshly created output value:
// for every type that is not a CheckType<> the value is simply assigned 0.
template<typename T>
void my_function(T& p) {
    p = 0;
}
113
114 template<typename U>
my_function(CheckType<U> & p)115 void my_function(CheckType<U> &p) { p.get_ready(); }
116
117 // Filters must be copy-constructible, and be const-qualifiable.
118 template<typename U>
119 class input_filter : DestroyedTracker {
120 public:
operator ()(tbb::flow_control & control) const121 U operator()( tbb::flow_control& control ) const {
122 CHECK(is_alive());
123 if( --input_counter < 0 ) {
124 control.stop();
125 }
126 else // only count successful reads
127 ++non_pointer_specialized_calls;
128 return U(); // default constructed
129 }
130
131 };
132
133 // specialization for pointer
134 template<typename U>
135 class input_filter<U*> : DestroyedTracker {
136 public:
operator ()(tbb::flow_control & control) const137 U* operator()(tbb::flow_control& control) const {
138 CHECK(is_alive());
139 int ival = --input_counter;
140 if(ival < 0) {
141 control.stop();
142 return nullptr;
143 }
144 ++pointer_specialized_calls;
145 if(ival == max_counter / 2) {
146 return nullptr; // non-stop NULL
147 }
148 U* myReturn = new(fetchNextBuffer()) U();
149 if (myReturn) { // may have been passed in a NULL
150 CHECK_MESSAGE(!middle_my_id(*myReturn), "bad id value, p:" << concurrency);
151 CHECK_MESSAGE(!middle_is_ready(*myReturn), "Already ready, p:" << concurrency);
152 }
153 return myReturn;
154 }
155 };
156
157 template<>
158 class input_filter<void> : DestroyedTracker {
159 public:
operator ()(tbb::flow_control & control) const160 void operator()( tbb::flow_control& control ) const {
161 CHECK(is_alive());
162 if( --input_counter < 0 ) {
163 control.stop();
164 }
165 else
166 ++non_pointer_specialized_calls;
167 }
168
169 };
170
171 // specialization for int that passes back a sequence of integers
172 template<>
173 class input_filter<int> : DestroyedTracker {
174 public:
175 int
operator ()(tbb::flow_control & control) const176 operator()(tbb::flow_control& control ) const {
177 CHECK(is_alive());
178 int oldval = --input_counter;
179 if( oldval < 0 ) {
180 control.stop();
181 }
182 else
183 ++non_pointer_specialized_calls;
184 return oldval+1;
185 }
186 };
187
188 template<typename T, typename U>
189 class middle_filter : DestroyedTracker {
190 public:
operator ()(T t) const191 U operator()(T t) const {
192 CHECK(is_alive());
193 CHECK_MESSAGE(!middle_my_id(t), "bad id value, p:" << concurrency);
194 CHECK_MESSAGE(!middle_is_ready(t), "Already ready, p:" << concurrency );
195 U out;
196 my_function(out);
197 ++non_pointer_specialized_calls;
198 return out;
199 }
200 };
201
202 template<typename T, typename U>
203 class middle_filter<T*,U> : DestroyedTracker {
204 public:
operator ()(T * my_storage) const205 U operator()(T* my_storage) const {
206 free_on_scope_exit<T> my_ptr(my_storage); // free_on_scope_exit marks the buffer available
207 CHECK(is_alive());
208 if(my_storage) { // may have been passed in a NULL
209 CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
210 CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
211 }
212 ++first_pointer_specialized_calls;
213 U out;
214 my_function(out);
215 return out;
216 }
217 };
218
219 template<typename T, typename U>
220 class middle_filter<T,U*> : DestroyedTracker {
221 public:
operator ()(T my_storage) const222 U* operator()(T my_storage) const {
223 CHECK(is_alive());
224 CHECK_MESSAGE(!middle_my_id(my_storage), "bad id value, p:" << concurrency);
225 CHECK_MESSAGE(!middle_is_ready(my_storage), "Already ready, p:" << concurrency );
226 // allocate new space from buffers
227 U* my_return = new(fetchNextBuffer()) U();
228 my_function(*my_return);
229 ++second_pointer_specialized_calls;
230 return my_return;
231 }
232 };
233
234 template<typename T, typename U>
235 class middle_filter<T*,U*> : DestroyedTracker {
236 public:
operator ()(T * my_storage) const237 U* operator()(T* my_storage) const {
238 free_on_scope_exit<T> my_ptr(my_storage); // free_on_scope_exit marks the buffer available
239 CHECK(is_alive());
240 if(my_storage) {
241 CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
242 CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
243 }
244 // may have been passed a NULL
245 ++pointer_specialized_calls;
246 if(!my_storage) return nullptr;
247 CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
248 CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
249 U* my_return = new(fetchNextBuffer()) U();
250 my_function(*my_return);
251 return my_return;
252 }
253 };
254
255 // specialization for int that squares the input and returns that.
256 template<>
257 class middle_filter<int,int> : DestroyedTracker {
258 public:
operator ()(int my_input) const259 int operator()(int my_input) const {
260 CHECK(is_alive());
261 ++non_pointer_specialized_calls;
262 return my_input*my_input;
263 }
264 };
265
266 // ---------------------------------
267 template<typename T>
268 class output_filter : DestroyedTracker {
269 public:
operator ()(T c) const270 void operator()(T c) const {
271 CHECK(is_alive());
272 CHECK_MESSAGE(output_my_id(c), "unset id value, p:" << concurrency);
273 CHECK_MESSAGE(output_is_ready(c), "not yet ready, p:" << concurrency);
274 ++non_pointer_specialized_calls;
275 output_counter++;
276 }
277 };
278
279 // specialization for int that puts the received value in an array
280 template<>
281 class output_filter<int> : DestroyedTracker {
282 public:
operator ()(int my_input) const283 void operator()(int my_input) const {
284 CHECK(is_alive());
285 ++non_pointer_specialized_calls;
286 int myindx = output_counter++;
287 intbuffer[myindx] = my_input;
288 }
289 };
290
291 template<typename T>
292 class output_filter<T*> : DestroyedTracker {
293 public:
operator ()(T * c) const294 void operator()(T* c) const {
295 free_on_scope_exit<T> my_ptr(c);
296 CHECK(is_alive());
297 if(c) {
298 CHECK_MESSAGE(output_my_id(*c), "unset id value, p:" << concurrency);
299 CHECK_MESSAGE(output_is_ready(*c), "not yet ready, p:" << concurrency);
300 }
301 output_counter++;
302 ++pointer_specialized_calls;
303 }
304 };
305
// Selects which set of specialization counters checkCounters() validates after
// a pipeline run.
enum final_assert_type {
    no_pointer_counts,      // only the output count is checked
    assert_nonpointer,      // all three stages use the non-pointer filters
    assert_firstpointer,    // item between input and middle filter is a pointer
    assert_secondpointer,   // item between middle and output filter is a pointer
    assert_allpointer       // all three stages use the pointer filters
};
313
resetCounters()314 void resetCounters() {
315 output_counter = 0;
316 input_counter = max_counter;
317 non_pointer_specialized_calls = 0;
318 pointer_specialized_calls = 0;
319 first_pointer_specialized_calls = 0;
320 second_pointer_specialized_calls = 0;
321 // we have to reset the buffer flags because our input filters return allocated space on end-of-input,
322 // (on eof a default-constructed object is returned) and they do not pass through the filter further.
323 for(size_t i = 0; i < n_buffers; ++i)
324 buf_in_use[i].clear();
325 }
326
checkCounters(final_assert_type my_t)327 void checkCounters(final_assert_type my_t) {
328 CHECK_MESSAGE(output_counter == max_counter, "Ran out of buffers, p:" << concurrency);
329 switch(my_t) {
330 case assert_nonpointer:
331 CHECK_MESSAGE(pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0, "non-pointer filters specialized to pointer, p:" << concurrency);
332 CHECK_MESSAGE(non_pointer_specialized_calls == 3*max_counter, "bad count for non-pointer filters, p:" << concurrency);
333 if(check_intbuffer) {
334 for(int i = 1; i <= max_counter; ++i) {
335 int j = i*i;
336 bool found_val = false;
337 for(int k = 0; k < max_counter; ++k) {
338 if(intbuffer[k] == j) {
339 found_val = true;
340 break;
341 }
342 }
343 CHECK_MESSAGE(found_val, "Missing value in output array, p:" << concurrency );
344 }
345 }
346 break;
347 case assert_firstpointer:
348 {
349 bool check = pointer_specialized_calls == max_counter && // input filter extra invocation
350 first_pointer_specialized_calls == max_counter &&
351 non_pointer_specialized_calls == max_counter &&
352 second_pointer_specialized_calls == 0;
353 CHECK_MESSAGE(check, "incorrect specialization for firstpointer, p:" << concurrency);
354 }
355 break;
356 case assert_secondpointer:
357 {
358 bool check = pointer_specialized_calls == max_counter &&
359 first_pointer_specialized_calls == 0 &&
360 non_pointer_specialized_calls == max_counter && // input filter
361 second_pointer_specialized_calls == max_counter;
362 CHECK_MESSAGE(check, "incorrect specialization for firstpointer, p:" << concurrency);
363 }
364 break;
365 case assert_allpointer:
366 CHECK_MESSAGE(non_pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0, "pointer filters specialized to non-pointer, p:" << concurrency);
367 CHECK_MESSAGE(pointer_specialized_calls == 3*max_counter, "bad count for pointer filters, p:" << concurrency);
368 break;
369 case no_pointer_counts:
370 break;
371 }
372 }
373
374 static const tbb::filter_mode filter_table[] = { tbb::filter_mode::parallel, tbb::filter_mode::serial_in_order, tbb::filter_mode::serial_out_of_order};
375 const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
376
377 using filter_chain = tbb::filter<void, void>;
378 using mode_array =tbb::filter_mode;
379
380 // The filters are passed by value, which forces a temporary copy to be created. This is
381 // to reproduce the bug where a filter_chain uses refs to filters, which after a call
382 // would be references to destructed temporaries.
383 template<typename type1, typename type2>
fill_chain(filter_chain & my_chain,mode_array * filter_type,input_filter<type1> i_filter,middle_filter<type1,type2> m_filter,output_filter<type2> o_filter)384 void fill_chain( filter_chain &my_chain, mode_array *filter_type, input_filter<type1> i_filter,
385 middle_filter<type1, type2> m_filter, output_filter<type2> o_filter ) {
386 my_chain = tbb::filter<void, type1>(filter_type[0], i_filter) &
387 tbb::filter<type1, type2>(filter_type[1], m_filter) &
388 tbb::filter<type2, void>(filter_type[2], o_filter);
389 }
390
391 template<typename... Context>
run_function_spec(Context &...context)392 void run_function_spec(Context&... context) {
393 CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");
394 input_filter<void> i_filter;
395 // Test pipeline that contains only one filter
396 for( unsigned i = 0; i<number_of_filter_types; i++) {
397 tbb::filter<void, void> one_filter( filter_table[i], i_filter );
398 CHECK_MESSAGE(filter_node_count==1, "some filter nodes left after previous iteration?");
399 resetCounters();
400 tbb::parallel_pipeline( n_tokens, one_filter, context... );
401 // no need to check counters
402 std::atomic<int> counter;
403 counter = max_counter;
404 // Construct filter using lambda-syntax when parallel_pipeline() is being run;
405 tbb::parallel_pipeline( n_tokens,
406 tbb::filter<void, void>(filter_table[i], [&counter]( tbb::flow_control& control ) {
407 if( counter-- == 0 )
408 control.stop();
409 }
410 ),
411 context...
412 );
413 }
414 CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");
415 }
416
417 template<typename t1, typename t2, typename... Context>
run_filter_set(input_filter<t1> & i_filter,middle_filter<t1,t2> & m_filter,output_filter<t2> & o_filter,mode_array * filter_type,final_assert_type my_t,Context &...context)418 void run_filter_set(
419 input_filter<t1>& i_filter,
420 middle_filter<t1,t2>& m_filter,
421 output_filter<t2>& o_filter,
422 mode_array *filter_type,
423 final_assert_type my_t,
424 Context&... context) {
425 tbb::filter<void, t1> filter1( filter_type[0], i_filter );
426 tbb::filter<t1, t2> filter2( filter_type[1], m_filter );
427 tbb::filter<t2, void> filter3( filter_type[2], o_filter );
428
429 CHECK_MESSAGE(filter_node_count==3, "some filter nodes left after previous iteration?");
430 resetCounters();
431 // Create filters sequence when parallel_pipeline() is being run
432 tbb::parallel_pipeline( n_tokens, filter1, filter2, filter3, context... );
433 checkCounters(my_t);
434
435 // Create filters sequence partially outside parallel_pipeline() and also when parallel_pipeline() is being run
436 tbb::filter<void, t2> filter12;
437 filter12 = filter1 & filter2;
438 for( int i = 0; i<3; i++)
439 {
440 filter12 &= tbb::filter<t2,t2>(filter_type[i], [](t2 x) -> t2 { return x;});
441 }
442 resetCounters();
443 tbb::parallel_pipeline( n_tokens, filter12, filter3, context... );
444 checkCounters(my_t);
445
446 tbb::filter<void, void> filter123 = filter12 & filter3;
447 // Run pipeline twice with the same filter sequence
448 for( unsigned i = 0; i<2; i++ ) {
449 resetCounters();
450 tbb::parallel_pipeline( n_tokens, filter123, context... );
451 checkCounters(my_t);
452 }
453
454 // Now copy-and-move-construct another filter instance, and use it to run pipeline
455 {
456 tbb::filter<void, void> copy123( filter123 );
457 resetCounters();
458 tbb::parallel_pipeline( n_tokens, copy123, context... );
459 checkCounters(my_t);
460 tbb::filter<void, void> move123( std::move(filter123) );
461 resetCounters();
462 tbb::parallel_pipeline( n_tokens, move123, context... );
463 checkCounters(my_t);
464 }
465
466 // Construct filters and create the sequence when parallel_pipeline() is being run
467 resetCounters();
468 tbb::parallel_pipeline( n_tokens,
469 tbb::filter<void, t1>(filter_type[0], i_filter),
470 tbb::filter<t1, t2>(filter_type[1], m_filter),
471 tbb::filter<t2, void>(filter_type[2], o_filter),
472 context... );
473 checkCounters(my_t);
474
475 // Construct filters, make a copy, destroy the original filters, and run with the copy
476 int cnt = filter_node_count;
477 {
478 tbb::filter<void, void>* p123 = new tbb::filter<void,void> (
479 tbb::filter<void, t1>(filter_type[0], i_filter)&
480 tbb::filter<t1, t2>(filter_type[1], m_filter)&
481 tbb::filter<t2, void>(filter_type[2], o_filter) );
482 CHECK_MESSAGE(filter_node_count==cnt+5, "filter node accounting error?");
483 tbb::filter<void, void> copy123( *p123 );
484 delete p123;
485 CHECK_MESSAGE(filter_node_count==cnt+5, "filter nodes deleted prematurely?");
486 resetCounters();
487 tbb::parallel_pipeline( n_tokens, copy123, context... );
488 checkCounters(my_t);
489 }
490
491 // construct a filter with temporaries
492 {
493 tbb::filter<void, void> my_filter;
494 fill_chain<t1,t2>( my_filter, filter_type, i_filter, m_filter, o_filter );
495 resetCounters();
496 tbb::parallel_pipeline( n_tokens, my_filter, context... );
497 checkCounters(my_t);
498 }
499 CHECK_MESSAGE(filter_node_count==cnt, "scope ended but filter nodes not deleted?");
500 }
501
502 template <typename t1, typename t2, typename... Context>
run_lambdas_test(mode_array * filter_type,Context &...context)503 void run_lambdas_test( mode_array *filter_type, Context&... context ) {
504 std::atomic<int> counter;
505 counter = max_counter;
506 // Construct filters using lambda-syntax and create the sequence when parallel_pipeline() is being run;
507 resetCounters(); // only need the output_counter reset.
508 tbb::parallel_pipeline( n_tokens,
509 tbb::make_filter<void, t1>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
510 if( --counter < 0 )
511 control.stop();
512 return t1(); }
513 ),
514 tbb::make_filter<t1, t2>(filter_type[1], []( t1 /*my_storage*/ ) -> t2 {
515 return t2(); }
516 ),
517 tbb::make_filter<t2,void>(filter_type[2], [] ( t2 ) -> void {
518 output_counter++; }
519 ),
520 context...
521 );
522 checkCounters(no_pointer_counts); // don't have to worry about specializations
523 counter = max_counter;
524 // pointer filters
525 resetCounters();
526 tbb::parallel_pipeline( n_tokens,
527 tbb::filter<void,t1*>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
528 if( --counter < 0 ) {
529 control.stop();
530 return nullptr;
531 }
532 return new(fetchNextBuffer()) t1(); }
533 ),
534 tbb::filter<t1*, t2*>(filter_type[1], []( t1* my_storage ) -> t2* {
535 my_storage->~t1();
536 return new(my_storage) t2(); }
537 ),
538 tbb::filter<t2*, void>(filter_type[2], [] ( t2* my_storage ) -> void {
539 my_storage->~t2();
540 freeBuffer(my_storage);
541 output_counter++; }
542 ),
543 context...
544 );
545 checkCounters(no_pointer_counts);
546 // first filter outputs pointer
547 counter = max_counter;
548 resetCounters();
549 tbb::parallel_pipeline( n_tokens,
550 tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
551 if( --counter < 0 ) {
552 control.stop();
553 return nullptr;
554 }
555 return new(fetchNextBuffer()) t1(); }
556 )&
557 tbb::make_filter(filter_type[1], []( t1* my_storage ) -> t2 {
558 my_storage->~t1();
559 freeBuffer(my_storage);
560 return t2(); }
561 ),
562 tbb::make_filter(filter_type[2], [] ( t2 /*my_storage*/) -> void {
563 output_counter++; }
564 ),
565 context...
566 );
567 checkCounters(no_pointer_counts);
568 // second filter outputs pointer
569 counter = max_counter;
570 resetCounters();
571 tbb::parallel_pipeline( n_tokens,
572 tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
573 if( --counter < 0 ) {
574 control.stop();
575 }
576 return t1(); }
577 ),
578 tbb::filter<t1, t2*>(filter_type[1], []( t1 /*my_storage*/ ) -> t2* {
579 return new(fetchNextBuffer()) t2(); }
580 )&
581 tbb::make_filter<t2*, void>(filter_type[2], [] ( t2* my_storage) -> void {
582 my_storage->~t2();
583 freeBuffer(my_storage);
584 output_counter++; }
585 ),
586 context...
587 );
588 checkCounters(no_pointer_counts);
589 }
590
591 template<typename type1, typename type2>
run_function(const char * l1,const char * l2)592 void run_function(const char *l1, const char *l2) {
593 CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");
594
595 check_intbuffer = (!strcmp(l1,"int") && !strcmp(l2,"int"));
596
597 Checker<type1> check1; // check constructions/destructions
598 Checker<type2> check2; // for type1 or type2 === CheckType<T>
599
600 const size_t number_of_filters = 3;
601
602 input_filter<type1> i_filter;
603 input_filter<type1*> p_i_filter;
604
605 middle_filter<type1, type2> m_filter;
606 middle_filter<type1*, type2> pr_m_filter;
607 middle_filter<type1, type2*> rp_m_filter;
608 middle_filter<type1*, type2*> pp_m_filter;
609
610 output_filter<type2> o_filter;
611 output_filter<type2*> p_o_filter;
612
613 // allocate the buffers for the filters
614 unsigned max_size = (sizeof(type1) > sizeof(type2) ) ? sizeof(type1) : sizeof(type2);
615 for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
616 buffers[i] = malloc(max_size);
617 buf_in_use[i].clear();
618 }
619
620 unsigned limit = 1;
621 // Test pipeline that contains number_of_filters filters
622 for( unsigned i=0; i<number_of_filters; ++i)
623 limit *= number_of_filter_types;
624 // Iterate over possible filter sequences
625 for( unsigned numeral=0; numeral<limit; ++numeral ) {
626 unsigned temp = numeral;
627 tbb::filter_mode filter_type[number_of_filter_types];
628 for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types )
629 filter_type[i] = filter_table[temp%number_of_filter_types];
630
631 tbb::task_group_context context;
632 run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer);
633 run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer, context);
634 run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer);
635 run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer, context);
636 run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer);
637 run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer, context);
638 run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer);
639 run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer, context);
640
641 run_lambdas_test<type1,type2>(filter_type);
642 run_lambdas_test<type1,type2>(filter_type, context);
643 }
644 CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");
645
646 for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
647 free(buffers[i]);
648 }
649 }
650
651 //! Testing single filter pipeline
652 //! \brief \ref error_guessing
653 TEST_CASE("Pipeline testing for single filter") {
654 run_function_spec();
655 tbb::task_group_context context;
656 run_function_spec(context);
657 }
658
659 #define RUN_TYPED_TEST_CASE(type1, type2) TEST_CASE("Pipeline testing with "#type1" and "#type2) { \
660 for ( std::size_t concurrency_level : {1, 2, 4, 5, 7, 8} ) { \
661 if ( concurrency_level > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) ) \
662 break; \
663 concurrency = concurrency_level; \
664 tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); \
665 run_function<type1, type2>(#type1, #type2); \
666 } \
667 }
668 // Run test several times with different types
669 RUN_TYPED_TEST_CASE(std::size_t, int)
670 RUN_TYPED_TEST_CASE(int, double)
671 RUN_TYPED_TEST_CASE(std::size_t, double)
672 RUN_TYPED_TEST_CASE(std::size_t, bool)
673 RUN_TYPED_TEST_CASE(int, int)
674 RUN_TYPED_TEST_CASE(CheckType<unsigned int>, std::size_t)
675 RUN_TYPED_TEST_CASE(CheckType<unsigned short>, std::size_t)
676 RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned int>)
677 RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned short>)
678 RUN_TYPED_TEST_CASE(CheckType<unsigned short>, CheckType<unsigned short>)
679 RUN_TYPED_TEST_CASE(double, CheckType<unsigned short>)
680 RUN_TYPED_TEST_CASE(std::unique_ptr<int>, std::unique_ptr<int>) // move-only type
681
682 #undef RUN_TYPED_TEST_CASE
683