1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // Before including parallel_pipeline.h, set up the variable to count heap allocated
18 // filter_node objects, and make it known for the header.
19 #include "common/test.h"
20 #include "common/utils.h"
21 #include "common/checktype.h"
22 
// Counter of heap-allocated filter_node objects.  It is published to the
// parallel_pipeline header through the __TBB_TEST_FILTER_NODE_COUNT hook, so both
// lines have to appear before that header is included.
int filter_node_count{0};
#define __TBB_TEST_FILTER_NODE_COUNT filter_node_count
25 #include "tbb/parallel_pipeline.h"
26 #include "tbb/global_control.h"
27 #include "tbb/spin_mutex.h"
28 #include "tbb/task_group.h"
29 
30 #include <atomic>
31 #include <string.h>
32 #include <memory> // std::unique_ptr
33 
34 //! \file test_parallel_pipeline.cpp
35 //! \brief Test for [algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specification
36 
// Token limit handed to every parallel_pipeline() call in this file.
constexpr unsigned n_tokens = 8;
// The middle filter can conceivably hold two buffers for every token in flight, so the
// pool provides two buffers per token.  Unlikely to be needed, but possible.
constexpr unsigned n_buffers = n_tokens * 2;
// Starting value of the input counter, i.e. the number of items fed through a pipeline run.
constexpr int max_counter = 16;
42 
43 static std::size_t concurrency = 0;
44 
45 static std::atomic<int> output_counter;
46 static std::atomic<int> input_counter;
47 static std::atomic<int> non_pointer_specialized_calls;
48 static std::atomic<int> pointer_specialized_calls;
49 static std::atomic<int> first_pointer_specialized_calls;
50 static std::atomic<int> second_pointer_specialized_calls;
51 
52 static int intbuffer[max_counter];  // store results for <int,int> parallel pipeline test
53 static bool check_intbuffer;
54 
55 static void* buffers[n_buffers];
56 static std::atomic_flag buf_in_use[n_buffers] = {ATOMIC_FLAG_INIT};
57 
fetchNextBuffer()58 void *fetchNextBuffer() {
59     for(size_t icnt = 0; icnt < n_buffers; ++icnt) {
60         if(!buf_in_use[icnt].test_and_set()) {
61             return buffers[icnt];
62         }
63     }
64     CHECK_MESSAGE(false, "Ran out of buffers, p:"<< concurrency);
65     return 0;
66 }
freeBuffer(void * buf)67 void freeBuffer(void *buf) {
68     for(size_t i=0; i < n_buffers;++i) {
69         if(buffers[i] == buf) {
70             buf_in_use[i].clear();
71             return;
72         }
73     }
74     CHECK_MESSAGE(false, "Tried to free a buffer not in our list, p:" << concurrency);
75 }
76 
// Scope guard for an object that lives in a pool buffer: when the guard is destroyed
// it runs the object's destructor and returns the storage through freeBuffer().
// A guard constructed from a null pointer does nothing.
template<typename T>
class free_on_scope_exit {
public:
    // p - object to destroy and release at scope exit; may be null.
    explicit free_on_scope_exit(T *p) : my_p(p) {}

    // The guard owns the release of its buffer; a copy would destroy the object and
    // free the buffer a second time, so copying is disabled.
    free_on_scope_exit(const free_on_scope_exit&) = delete;
    free_on_scope_exit& operator=(const free_on_scope_exit&) = delete;

    ~free_on_scope_exit() {
        if(!my_p) return;
        my_p->~T();
        freeBuffer(my_p);
    }
private:
    T *my_p;
};
85 
86 // methods for testing CheckType< >, that return okay values for other types.
// Fallback for payload types other than CheckType<>: such a value is never reported
// as "ready", so the middle-stage "Already ready" check always passes for it.
template<typename T>
bool middle_is_ready(T& /*p*/) {
    return false;
}
89 
90 template<typename U>
middle_is_ready(CheckType<U> & p)91 bool middle_is_ready(CheckType<U> &p) { return p.is_ready(); }
92 
// Fallback for payload types other than CheckType<>: always reported as "ready",
// so the output-stage "not yet ready" check always passes for it.
template<typename T>
bool output_is_ready(T& /*p*/) {
    return true;
}
95 
96 template<typename U>
output_is_ready(CheckType<U> & p)97 bool output_is_ready(CheckType<U> &p) { return p.is_ready(); }
98 
// Fallback for payload types other than CheckType<>: the id is reported as 0,
// which is what the middle-stage "bad id value" check expects.
template<typename T>
int middle_my_id(T& /*p*/) {
    return 0;
}
101 
102 template<typename U>
middle_my_id(CheckType<U> & p)103 int middle_my_id(CheckType<U> &p) { return p.id(); }
104 
// Fallback for payload types other than CheckType<>: the id is reported as 1
// (non-zero), which is what the output-stage "unset id value" check expects.
template<typename T>
int output_my_id(T& /*p*/) {
    return 1;
}
107 
108 template<typename U>
output_my_id(CheckType<U> & p)109 int output_my_id(CheckType<U> &p) { return p.id(); }
110 
// "Work" performed by the middle filters on a freshly created output value.
// For payload types other than CheckType<> it simply assigns 0.
template<typename T>
void my_function(T& p) {
    p = 0;
}
113 
114 template<typename U>
my_function(CheckType<U> & p)115 void my_function(CheckType<U> &p) { p.get_ready(); }
116 
117 // Filters must be copy-constructible, and be const-qualifiable.
118 template<typename U>
119 class input_filter : DestroyedTracker {
120 public:
operator ()(tbb::flow_control & control) const121     U operator()( tbb::flow_control& control ) const {
122         CHECK(is_alive());
123         if( --input_counter < 0 ) {
124             control.stop();
125         }
126         else  // only count successful reads
127             ++non_pointer_specialized_calls;
128         return U();  // default constructed
129     }
130 
131 };
132 
133 // specialization for pointer
134 template<typename U>
135 class input_filter<U*> : DestroyedTracker {
136 public:
operator ()(tbb::flow_control & control) const137     U* operator()(tbb::flow_control& control) const {
138         CHECK(is_alive());
139         int ival = --input_counter;
140         if(ival < 0) {
141             control.stop();
142             return nullptr;
143         }
144         ++pointer_specialized_calls;
145         if(ival == max_counter / 2) {
146             return nullptr;  // non-stop NULL
147         }
148         U* myReturn = new(fetchNextBuffer()) U();
149         if (myReturn) {  // may have been passed in a NULL
150             CHECK_MESSAGE(!middle_my_id(*myReturn), "bad id value, p:" << concurrency);
151             CHECK_MESSAGE(!middle_is_ready(*myReturn), "Already ready, p:" << concurrency);
152         }
153         return myReturn;
154     }
155 };
156 
157 template<>
158 class input_filter<void> : DestroyedTracker {
159 public:
operator ()(tbb::flow_control & control) const160     void operator()( tbb::flow_control& control ) const {
161         CHECK(is_alive());
162         if( --input_counter < 0 ) {
163             control.stop();
164         }
165         else
166             ++non_pointer_specialized_calls;
167     }
168 
169 };
170 
171 // specialization for int that passes back a sequence of integers
172 template<>
173 class input_filter<int> : DestroyedTracker {
174 public:
175     int
operator ()(tbb::flow_control & control) const176     operator()(tbb::flow_control& control ) const {
177         CHECK(is_alive());
178         int oldval = --input_counter;
179         if( oldval < 0 ) {
180             control.stop();
181         }
182         else
183             ++non_pointer_specialized_calls;
184         return oldval+1;
185     }
186 };
187 
188 template<typename T, typename U>
189 class middle_filter : DestroyedTracker {
190 public:
operator ()(T t) const191     U operator()(T t) const {
192         CHECK(is_alive());
193         CHECK_MESSAGE(!middle_my_id(t), "bad id value, p:" << concurrency);
194         CHECK_MESSAGE(!middle_is_ready(t), "Already ready, p:" << concurrency );
195         U out;
196         my_function(out);
197         ++non_pointer_specialized_calls;
198         return out;
199     }
200 };
201 
202 template<typename T, typename U>
203 class middle_filter<T*,U> : DestroyedTracker {
204 public:
operator ()(T * my_storage) const205     U operator()(T* my_storage) const {
206         free_on_scope_exit<T> my_ptr(my_storage);  // free_on_scope_exit marks the buffer available
207         CHECK(is_alive());
208         if(my_storage) {  // may have been passed in a NULL
209             CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
210             CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
211         }
212         ++first_pointer_specialized_calls;
213         U out;
214         my_function(out);
215         return out;
216     }
217 };
218 
219 template<typename T, typename U>
220 class middle_filter<T,U*> : DestroyedTracker {
221 public:
operator ()(T my_storage) const222     U* operator()(T my_storage) const {
223         CHECK(is_alive());
224         CHECK_MESSAGE(!middle_my_id(my_storage), "bad id value, p:" << concurrency);
225         CHECK_MESSAGE(!middle_is_ready(my_storage), "Already ready, p:" << concurrency );
226         // allocate new space from buffers
227         U* my_return = new(fetchNextBuffer()) U();
228         my_function(*my_return);
229         ++second_pointer_specialized_calls;
230         return my_return;
231     }
232 };
233 
234 template<typename T, typename U>
235 class middle_filter<T*,U*> : DestroyedTracker {
236 public:
operator ()(T * my_storage) const237     U* operator()(T* my_storage) const {
238         free_on_scope_exit<T> my_ptr(my_storage);  // free_on_scope_exit marks the buffer available
239         CHECK(is_alive());
240         if(my_storage) {
241             CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
242             CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
243         }
244         // may have been passed a NULL
245         ++pointer_specialized_calls;
246         if(!my_storage) return nullptr;
247         CHECK_MESSAGE(!middle_my_id(*my_storage), "bad id value, p:" << concurrency);
248         CHECK_MESSAGE(!middle_is_ready(*my_storage), "Already ready, p:" << concurrency );
249         U* my_return = new(fetchNextBuffer()) U();
250         my_function(*my_return);
251         return my_return;
252     }
253 };
254 
255 // specialization for int that squares the input and returns that.
256 template<>
257 class middle_filter<int,int> : DestroyedTracker {
258 public:
operator ()(int my_input) const259     int operator()(int my_input) const {
260         CHECK(is_alive());
261         ++non_pointer_specialized_calls;
262         return my_input*my_input;
263     }
264 };
265 
266 // ---------------------------------
267 template<typename T>
268 class output_filter : DestroyedTracker {
269 public:
operator ()(T c) const270     void operator()(T c) const {
271         CHECK(is_alive());
272         CHECK_MESSAGE(output_my_id(c), "unset id value, p:" << concurrency);
273         CHECK_MESSAGE(output_is_ready(c), "not yet ready, p:" << concurrency);
274         ++non_pointer_specialized_calls;
275         output_counter++;
276     }
277 };
278 
279 // specialization for int that puts the received value in an array
280 template<>
281 class output_filter<int> : DestroyedTracker {
282 public:
operator ()(int my_input) const283     void operator()(int my_input) const {
284         CHECK(is_alive());
285         ++non_pointer_specialized_calls;
286         int myindx = output_counter++;
287         intbuffer[myindx] = my_input;
288     }
289 };
290 
291 template<typename T>
292 class output_filter<T*> : DestroyedTracker {
293 public:
operator ()(T * c) const294     void operator()(T* c) const {
295         free_on_scope_exit<T> my_ptr(c);
296         CHECK(is_alive());
297         if(c) {
298             CHECK_MESSAGE(output_my_id(*c), "unset id value, p:" << concurrency);
299             CHECK_MESSAGE(output_is_ready(*c), "not yet ready, p:" << concurrency);
300         }
301         output_counter++;
302         ++pointer_specialized_calls;
303     }
304 };
305 
// Selects which set of call-count expectations checkCounters() applies after a run.
enum final_assert_type {
    no_pointer_counts,     // only the output count is checked
    assert_nonpointer,     // all three stages use the value specializations
    assert_firstpointer,   // the first payload type is a pointer
    assert_secondpointer,  // the second payload type is a pointer
    assert_allpointer      // both payload types are pointers
};
313 
resetCounters()314 void resetCounters() {
315     output_counter = 0;
316     input_counter = max_counter;
317     non_pointer_specialized_calls = 0;
318     pointer_specialized_calls = 0;
319     first_pointer_specialized_calls = 0;
320     second_pointer_specialized_calls = 0;
321     // we have to reset the buffer flags because our input filters return allocated space on end-of-input,
322     // (on eof a default-constructed object is returned) and they do not pass through the filter further.
323     for(size_t i = 0; i < n_buffers; ++i)
324         buf_in_use[i].clear();
325 }
326 
checkCounters(final_assert_type my_t)327 void checkCounters(final_assert_type my_t) {
328     CHECK_MESSAGE(output_counter == max_counter, "Ran out of buffers, p:" << concurrency);
329     switch(my_t) {
330         case assert_nonpointer:
331             CHECK_MESSAGE(pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0, "non-pointer filters specialized to pointer, p:" << concurrency);
332             CHECK_MESSAGE(non_pointer_specialized_calls == 3*max_counter, "bad count for non-pointer filters, p:" << concurrency);
333             if(check_intbuffer) {
334                 for(int i = 1; i <= max_counter; ++i) {
335                     int j = i*i;
336                     bool found_val = false;
337                     for(int k = 0; k < max_counter; ++k) {
338                         if(intbuffer[k] == j) {
339                             found_val = true;
340                             break;
341                         }
342                     }
343                     CHECK_MESSAGE(found_val, "Missing value in output array, p:" << concurrency );
344                 }
345             }
346             break;
347         case assert_firstpointer:
348             {
349                 bool check = pointer_specialized_calls == max_counter &&  // input filter extra invocation
350                     first_pointer_specialized_calls == max_counter &&
351                     non_pointer_specialized_calls == max_counter &&
352                     second_pointer_specialized_calls == 0;
353                 CHECK_MESSAGE(check, "incorrect specialization for firstpointer, p:" << concurrency);
354             }
355             break;
356         case assert_secondpointer:
357             {
358                 bool check = pointer_specialized_calls == max_counter &&
359                     first_pointer_specialized_calls == 0 &&
360                     non_pointer_specialized_calls == max_counter &&  // input filter
361                     second_pointer_specialized_calls == max_counter;
362                 CHECK_MESSAGE(check, "incorrect specialization for firstpointer, p:" << concurrency);
363             }
364             break;
365         case assert_allpointer:
366             CHECK_MESSAGE(non_pointer_specialized_calls+first_pointer_specialized_calls+second_pointer_specialized_calls == 0, "pointer filters specialized to non-pointer, p:" << concurrency);
367             CHECK_MESSAGE(pointer_specialized_calls == 3*max_counter, "bad count for pointer filters, p:" << concurrency);
368             break;
369         case no_pointer_counts:
370             break;
371     }
372 }
373 
374 static const tbb::filter_mode filter_table[] = { tbb::filter_mode::parallel, tbb::filter_mode::serial_in_order, tbb::filter_mode::serial_out_of_order};
375 const unsigned number_of_filter_types = sizeof(filter_table)/sizeof(filter_table[0]);
376 
377 using filter_chain = tbb::filter<void, void>;
378 using mode_array =tbb::filter_mode;
379 
380 // The filters are passed by value, which forces a temporary copy to be created.  This is
381 // to reproduce the bug where a filter_chain uses refs to filters, which after a call
382 // would be references to destructed temporaries.
383 template<typename type1, typename type2>
fill_chain(filter_chain & my_chain,mode_array * filter_type,input_filter<type1> i_filter,middle_filter<type1,type2> m_filter,output_filter<type2> o_filter)384 void fill_chain( filter_chain &my_chain, mode_array *filter_type, input_filter<type1> i_filter,
385          middle_filter<type1, type2> m_filter, output_filter<type2> o_filter ) {
386     my_chain = tbb::filter<void, type1>(filter_type[0], i_filter) &
387         tbb::filter<type1, type2>(filter_type[1], m_filter) &
388         tbb::filter<type2, void>(filter_type[2], o_filter);
389 }
390 
391 template<typename... Context>
run_function_spec(Context &...context)392 void run_function_spec(Context&... context) {
393     CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");
394     input_filter<void> i_filter;
395     // Test pipeline that contains only one filter
396     for( unsigned i = 0; i<number_of_filter_types; i++) {
397         tbb::filter<void, void> one_filter( filter_table[i], i_filter );
398         CHECK_MESSAGE(filter_node_count==1, "some filter nodes left after previous iteration?");
399         resetCounters();
400         tbb::parallel_pipeline( n_tokens, one_filter, context... );
401         // no need to check counters
402         std::atomic<int> counter;
403         counter = max_counter;
404         // Construct filter using lambda-syntax when parallel_pipeline() is being run;
405         tbb::parallel_pipeline( n_tokens,
406             tbb::filter<void, void>(filter_table[i], [&counter]( tbb::flow_control& control ) {
407                     if( counter-- == 0 )
408                         control.stop();
409                     }
410             ),
411             context...
412         );
413     }
414     CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");
415 }
416 
417 template<typename t1, typename t2, typename... Context>
run_filter_set(input_filter<t1> & i_filter,middle_filter<t1,t2> & m_filter,output_filter<t2> & o_filter,mode_array * filter_type,final_assert_type my_t,Context &...context)418 void run_filter_set(
419         input_filter<t1>& i_filter,
420         middle_filter<t1,t2>& m_filter,
421         output_filter<t2>& o_filter,
422         mode_array *filter_type,
423         final_assert_type my_t,
424         Context&... context) {
425     tbb::filter<void, t1> filter1( filter_type[0], i_filter );
426     tbb::filter<t1, t2> filter2( filter_type[1], m_filter );
427     tbb::filter<t2, void> filter3( filter_type[2], o_filter );
428 
429     CHECK_MESSAGE(filter_node_count==3, "some filter nodes left after previous iteration?");
430     resetCounters();
431     // Create filters sequence when parallel_pipeline() is being run
432     tbb::parallel_pipeline( n_tokens, filter1, filter2, filter3, context...  );
433     checkCounters(my_t);
434 
435     // Create filters sequence partially outside parallel_pipeline() and also when parallel_pipeline() is being run
436     tbb::filter<void, t2> filter12;
437     filter12 = filter1 & filter2;
438     for( int i = 0; i<3; i++)
439     {
440         filter12 &= tbb::filter<t2,t2>(filter_type[i], [](t2 x) -> t2 { return x;});
441     }
442     resetCounters();
443     tbb::parallel_pipeline( n_tokens, filter12, filter3, context...  );
444     checkCounters(my_t);
445 
446     tbb::filter<void, void> filter123 = filter12 & filter3;
447     // Run pipeline twice with the same filter sequence
448     for( unsigned i = 0; i<2; i++ ) {
449         resetCounters();
450         tbb::parallel_pipeline( n_tokens, filter123, context...  );
451         checkCounters(my_t);
452     }
453 
454     // Now copy-and-move-construct another filter instance, and use it to run pipeline
455     {
456         tbb::filter<void, void> copy123( filter123 );
457         resetCounters();
458         tbb::parallel_pipeline( n_tokens, copy123, context...  );
459         checkCounters(my_t);
460         tbb::filter<void, void> move123( std::move(filter123) );
461         resetCounters();
462         tbb::parallel_pipeline( n_tokens, move123, context...  );
463         checkCounters(my_t);
464     }
465 
466     // Construct filters and create the sequence when parallel_pipeline() is being run
467     resetCounters();
468     tbb::parallel_pipeline( n_tokens,
469                tbb::filter<void, t1>(filter_type[0], i_filter),
470                tbb::filter<t1, t2>(filter_type[1], m_filter),
471                tbb::filter<t2, void>(filter_type[2], o_filter),
472                context...  );
473     checkCounters(my_t);
474 
475     // Construct filters, make a copy, destroy the original filters, and run with the copy
476     int cnt = filter_node_count;
477     {
478         tbb::filter<void, void>* p123 = new tbb::filter<void,void> (
479                tbb::filter<void, t1>(filter_type[0], i_filter)&
480                tbb::filter<t1, t2>(filter_type[1], m_filter)&
481                tbb::filter<t2, void>(filter_type[2], o_filter) );
482         CHECK_MESSAGE(filter_node_count==cnt+5, "filter node accounting error?");
483         tbb::filter<void, void> copy123( *p123 );
484         delete p123;
485         CHECK_MESSAGE(filter_node_count==cnt+5, "filter nodes deleted prematurely?");
486         resetCounters();
487         tbb::parallel_pipeline( n_tokens, copy123, context...  );
488         checkCounters(my_t);
489     }
490 
491     // construct a filter with temporaries
492     {
493         tbb::filter<void, void> my_filter;
494         fill_chain<t1,t2>( my_filter, filter_type, i_filter, m_filter, o_filter );
495         resetCounters();
496         tbb::parallel_pipeline( n_tokens, my_filter, context...  );
497         checkCounters(my_t);
498     }
499     CHECK_MESSAGE(filter_node_count==cnt, "scope ended but filter nodes not deleted?");
500 }
501 
502 template <typename t1, typename t2, typename... Context>
run_lambdas_test(mode_array * filter_type,Context &...context)503 void run_lambdas_test( mode_array *filter_type, Context&... context ) {
504     std::atomic<int> counter;
505     counter = max_counter;
506     // Construct filters using lambda-syntax and create the sequence when parallel_pipeline() is being run;
507     resetCounters();  // only need the output_counter reset.
508     tbb::parallel_pipeline( n_tokens,
509         tbb::make_filter<void, t1>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
510                 if( --counter < 0 )
511                     control.stop();
512                 return t1(); }
513         ),
514         tbb::make_filter<t1, t2>(filter_type[1], []( t1 /*my_storage*/ ) -> t2 {
515                 return t2(); }
516         ),
517         tbb::make_filter<t2,void>(filter_type[2], [] ( t2 ) -> void {
518                 output_counter++; }
519         ),
520         context...
521     );
522     checkCounters(no_pointer_counts);  // don't have to worry about specializations
523     counter = max_counter;
524     // pointer filters
525     resetCounters();
526     tbb::parallel_pipeline( n_tokens,
527         tbb::filter<void,t1*>(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
528                 if( --counter < 0 ) {
529                     control.stop();
530                     return nullptr;
531                 }
532                 return new(fetchNextBuffer()) t1(); }
533         ),
534 		tbb::filter<t1*, t2*>(filter_type[1], []( t1* my_storage ) -> t2* {
535                 my_storage->~t1();
536                 return new(my_storage) t2(); }
537         ),
538 		tbb::filter<t2*, void>(filter_type[2], [] ( t2* my_storage ) -> void {
539                 my_storage->~t2();
540                 freeBuffer(my_storage);
541                 output_counter++; }
542         ),
543         context...
544     );
545     checkCounters(no_pointer_counts);
546     // first filter outputs pointer
547     counter = max_counter;
548     resetCounters();
549     tbb::parallel_pipeline( n_tokens,
550         tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1* {
551                 if( --counter < 0 ) {
552                     control.stop();
553                     return nullptr;
554                 }
555                 return new(fetchNextBuffer()) t1(); }
556         )&
557         tbb::make_filter(filter_type[1], []( t1* my_storage ) -> t2 {
558                 my_storage->~t1();
559                 freeBuffer(my_storage);
560                 return t2(); }
561         ),
562         tbb::make_filter(filter_type[2], [] ( t2 /*my_storage*/) -> void {
563                 output_counter++; }
564         ),
565         context...
566     );
567     checkCounters(no_pointer_counts);
568     // second filter outputs pointer
569     counter = max_counter;
570     resetCounters();
571     tbb::parallel_pipeline( n_tokens,
572         tbb::make_filter(filter_type[0], [&counter]( tbb::flow_control& control ) -> t1 {
573                 if( --counter < 0 ) {
574                     control.stop();
575                 }
576                 return t1(); }
577         ),
578         tbb::filter<t1, t2*>(filter_type[1], []( t1 /*my_storage*/ ) -> t2* {
579                 return new(fetchNextBuffer()) t2(); }
580         )&
581         tbb::make_filter<t2*, void>(filter_type[2], [] ( t2* my_storage) -> void {
582                 my_storage->~t2();
583                 freeBuffer(my_storage);
584                 output_counter++; }
585         ),
586         context...
587     );
588     checkCounters(no_pointer_counts);
589 }
590 
591 template<typename type1, typename type2>
run_function(const char * l1,const char * l2)592 void run_function(const char *l1, const char *l2) {
593     CHECK_MESSAGE(!filter_node_count, "invalid filter_node counter");
594 
595     check_intbuffer = (!strcmp(l1,"int") && !strcmp(l2,"int"));
596 
597     Checker<type1> check1;  // check constructions/destructions
598     Checker<type2> check2;  // for type1 or type2 === CheckType<T>
599 
600     const size_t number_of_filters = 3;
601 
602     input_filter<type1> i_filter;
603     input_filter<type1*> p_i_filter;
604 
605     middle_filter<type1, type2> m_filter;
606     middle_filter<type1*, type2> pr_m_filter;
607     middle_filter<type1, type2*> rp_m_filter;
608     middle_filter<type1*, type2*> pp_m_filter;
609 
610     output_filter<type2> o_filter;
611     output_filter<type2*> p_o_filter;
612 
613     // allocate the buffers for the filters
614     unsigned max_size = (sizeof(type1) > sizeof(type2) ) ? sizeof(type1) : sizeof(type2);
615     for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
616         buffers[i] = malloc(max_size);
617         buf_in_use[i].clear();
618     }
619 
620     unsigned limit = 1;
621     // Test pipeline that contains number_of_filters filters
622     for( unsigned i=0; i<number_of_filters; ++i)
623         limit *= number_of_filter_types;
624     // Iterate over possible filter sequences
625     for( unsigned numeral=0; numeral<limit; ++numeral ) {
626         unsigned temp = numeral;
627         tbb::filter_mode filter_type[number_of_filter_types];
628         for( unsigned i=0; i<number_of_filters; ++i, temp/=number_of_filter_types )
629             filter_type[i] = filter_table[temp%number_of_filter_types];
630 
631         tbb::task_group_context context;
632         run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer);
633         run_filter_set<type1,type2>(i_filter, m_filter, o_filter, filter_type, assert_nonpointer, context);
634         run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer);
635         run_filter_set<type1*,type2>(p_i_filter, pr_m_filter, o_filter, filter_type, assert_firstpointer, context);
636         run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer);
637         run_filter_set<type1,type2*>(i_filter, rp_m_filter, p_o_filter, filter_type, assert_secondpointer, context);
638         run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer);
639         run_filter_set<type1*,type2*>(p_i_filter, pp_m_filter, p_o_filter, filter_type, assert_allpointer, context);
640 
641         run_lambdas_test<type1,type2>(filter_type);
642         run_lambdas_test<type1,type2>(filter_type, context);
643     }
644     CHECK_MESSAGE(!filter_node_count, "filter_node objects leaked");
645 
646     for(unsigned i = 0; i < (unsigned)n_buffers; ++i) {
647         free(buffers[i]);
648     }
649 }
650 
651 //! Testing single filter pipeline
652 //! \brief \ref error_guessing
653 TEST_CASE("Pipeline testing for single filter") {
654     run_function_spec();
655     tbb::task_group_context context;
656     run_function_spec(context);
657 }
658 
659 #define RUN_TYPED_TEST_CASE(type1, type2) TEST_CASE("Pipeline testing with "#type1" and "#type2) { \
660                                                 for ( std::size_t concurrency_level : {1, 2, 4, 5, 7, 8} ) { \
661                                                     if ( concurrency_level > tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) ) \
662                                                         break; \
663                                                     concurrency = concurrency_level; \
664                                                     tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); \
665                                                     run_function<type1, type2>(#type1, #type2); \
666                                                 } \
667                                             }
668 // Run test several times with different types
669 RUN_TYPED_TEST_CASE(std::size_t, int)
670 RUN_TYPED_TEST_CASE(int, double)
671 RUN_TYPED_TEST_CASE(std::size_t, double)
672 RUN_TYPED_TEST_CASE(std::size_t, bool)
673 RUN_TYPED_TEST_CASE(int, int)
674 RUN_TYPED_TEST_CASE(CheckType<unsigned int>, std::size_t)
675 RUN_TYPED_TEST_CASE(CheckType<unsigned short>, std::size_t)
676 RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned int>)
677 RUN_TYPED_TEST_CASE(CheckType<unsigned int>, CheckType<unsigned short>)
678 RUN_TYPED_TEST_CASE(CheckType<unsigned short>, CheckType<unsigned short>)
679 RUN_TYPED_TEST_CASE(double, CheckType<unsigned short>)
680 RUN_TYPED_TEST_CASE(std::unique_ptr<int>, std::unique_ptr<int>) // move-only type
681 
682 #undef RUN_TYPED_TEST_CASE
683