1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_PREVIEW_FLOW_GRAPH_NODES 1
18 #define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1
19 
20 #include "tbb/tbb_config.h"
21 
22 // The old versions of MSVC 2013 fail to compile the test with fatal error
23 #define __TBB_MSVC_TEST_COMPILATION_BROKEN (_MSC_VER && _MSC_FULL_VER <= 180021005 && !__INTEL_COMPILER)
24 
25 #if __TBB_PREVIEW_OPENCL_NODE && !__TBB_MSVC_TEST_COMPILATION_BROKEN
26 #if _MSC_VER
27 #pragma warning (disable: 4503) // Suppress "decorated name length exceeded, name was truncated" warning
28 #endif
29 #include <iterator>
30 #include "tbb/task_scheduler_init.h"
31 #include <vector>
32 #include <iostream>
33 
34 #include "tbb/flow_graph_opencl_node.h"
35 using namespace tbb::flow;
36 
37 #include "harness_assert.h"
38 
39 #if ( __INTEL_COMPILER && __INTEL_COMPILER <= 1500 ) || __clang__
40 // In some corner cases the compiler fails to perform automatic type deduction for function pointer.
41 // Workaround is to replace a function pointer with a function call.
42 #define BROKEN_FUNCTION_POINTER_DEDUCTION(...) __VA_ARGS__()
43 #else
44 #define BROKEN_FUNCTION_POINTER_DEDUCTION(...) __VA_ARGS__
45 #endif
46 
47 #if _MSC_VER <= 1800 && !__INTEL_COMPILER
48 // In some corner cases the compiler fails to perform automatic std::initializer_list deduction for curly brackets.
49 // Workaround is to perform implicit conversion.
// Identity helper: returns the std::initializer_list it receives unchanged, so that
// a braced list passed as the argument is converted to std::initializer_list<T>.
template <typename T>
std::initializer_list<T> make_initializer_list( std::initializer_list<T> list ) {
    return list;
}
52 #define BROKEN_INITIALIZER_LIST_DEDUCTION(...) make_initializer_list(__VA_ARGS__)
53 #else
54 #define BROKEN_INITIALIZER_LIST_DEDUCTION(...) __VA_ARGS__
55 #endif
56 
57 #include "harness.h"
58 
59 #include <mutex>
60 std::once_flag tbbRootFlag;
61 char *tbbRoot = NULL;
PathToFile(const std::string & fileName)62 std::string PathToFile(const std::string& fileName) {
63     std::call_once(tbbRootFlag, [] { tbbRoot = Harness::GetEnv("tbb_root"); });
64     std::string prefix = tbbRoot ? tbbRoot : "../..";
65     return prefix + "/src/test/" + fileName;
66 }
67 
68 // Global test variables and types
69 typedef tbb::flow::opencl_range OCLRange;
70 struct test_default_device_filter {
operator ()test_default_device_filter71     opencl_device_list operator()(const opencl_device_list &devices) {
72         opencl_device_list dl;
73         dl.add(*devices.begin());
74         return dl;
75     }
76 };
77 typedef opencl_factory<test_default_device_filter> DefaultFactoryType;
78 
79 struct test_default_device_selector {
80 public:
81     template <typename DeviceFilter>
operator ()test_default_device_selector82     tbb::flow::opencl_device operator()(tbb::flow::opencl_factory<DeviceFilter>& f) {
83         // This is the device filter result
84         const tbb::flow::opencl_device_list &devices = f.devices();
85 
86         // Get total number of available platforms:
87         cl_uint num_of_platforms = 0;
88         clGetPlatformIDs(0, 0, &num_of_platforms);
89         cl_platform_id* platforms = new cl_platform_id[num_of_platforms];
90 
91         // Get IDs for all platforms:
92         clGetPlatformIDs(num_of_platforms, platforms, 0);
93 
94         // By default device filter selects the first platform
95         cl_uint selected_platform_index = 0;
96         cl_platform_id platform = platforms[selected_platform_index];
97 
98         // Count the number of platform devices and compare with selector list
99         cl_uint device_count;
100         clGetDeviceIDs(platform, CL_DEVICE_TYPE_ALL, 0, NULL, &device_count);
101         // It should be the same
102         ASSERT(device_count == devices.size(), "Default device filter returned not all devices from the platform");
103 
104         // Retrieve device ids from the platform
105         cl_device_id* queuered_devices = (cl_device_id*) malloc(sizeof(cl_device_id) * device_count);
106         clGetDeviceIDs(platform, CL_DEVICE_TYPE_ALL, device_count, queuered_devices, NULL);
107 
108         // Compare retrieved device ids with defaults
109         for (unsigned int i = 0; i < device_count; i++) {
110             cl_device_id searched_id = queuered_devices[i];
111 
112             tbb::flow::opencl_device_list::const_iterator it = std::find_if(devices.cbegin(), devices.cend(),
113                 [&searched_id](const tbb::flow::opencl_device &d) {
114                     return d.device_id() == searched_id;
115             });
116 
117             ASSERT(it != devices.cend(), "Devices parsed from the first platform and filtered devices are not the same");
118         }
119 
120         return *(f.devices().begin());
121     }
122 };
123 
TestArgumentPassing()124 void TestArgumentPassing() {
125     REMARK( "TestArgumentPassing: " );
126 
127     graph g;
128     test_default_device_selector test_device_selector;
129     opencl_node <tuple<opencl_buffer<int>, opencl_buffer<int>, OCLRange>> k( g,
130         opencl_program<>( PathToFile( "test_opencl_node.cl" ) ).get_kernel( "TestArgumentPassing" ), test_device_selector );
131     split_node <tuple<opencl_buffer<int>, opencl_buffer<int>, OCLRange>> s( g );
132 
133     make_edge( output_port<0>( s ), input_port<0>( k ) );
134     make_edge( output_port<1>( s ), input_port<1>( k ) );
135     make_edge( output_port<2>( s ), input_port<2>( k ) );
136 
137     const int N = 1 * 1024 * 1024;
138     opencl_buffer<int> b1( N ), b2( N );
139 
140     const int err_size = 128;
141     opencl_buffer<char> err( err_size );
142 
143     OCLRange l;
144 
145     *err.data() = 0; ASSERT( err.data() != std::string( "Done" ), NULL );
146     std::fill( b1.begin(), b1.end(), 1 );
147     k.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }), BROKEN_INITIALIZER_LIST_DEDUCTION({ 16 }) } );
148     k.set_args( port_ref<0, 1>(), /* stride_x */ 1, /* stride_y */ 0, /* stride_z */ 0, /* dim */ 1, err, err_size );
149     s.try_put( std::tie( b1, b2, l ) );
150     g.wait_for_all();
151     ASSERT( err.data() == std::string( "Done" ), "Validation has failed" );
152     ASSERT( std::all_of( b2.begin(), b2.end(), []( int c ) { return c == 1; } ), "Validation has failed" );
153 
154     // By default, the first device is used.
155     opencl_device d = *interface11::opencl_info::available_devices().begin();
156     std::array<size_t, 3> maxSizes = d.max_work_item_sizes();
157 
158     *err.data() = 0; ASSERT( err.data() != std::string( "Done" ), NULL );
159     std::fill( b1.begin(), b1.end(), 2 );
160     int stride_x = 1;
161     k.set_args( port_ref<0>(), BROKEN_FUNCTION_POINTER_DEDUCTION( port_ref<1, 1> ), stride_x, /* stride_y */ 1024, /* stride_z */ 0, /* dim */ 2, err, err_size );
162     k.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ 1024, 1024 }),
163                    BROKEN_INITIALIZER_LIST_DEDUCTION({ 16, min( (int)maxSizes[1], 16 ) }) } );
164     s.try_put( std::tie( b1, b2, l ) );
165     g.wait_for_all();
166     ASSERT( err.data() == std::string( "Done" ), "Validation has failed" );
167     ASSERT( std::all_of( b2.begin(), b2.end(), []( int c ) { return c == 2; } ), "Validation has failed" );
168 
169     *err.data() = 0; ASSERT( err.data() != std::string( "Done" ), NULL );
170     std::fill( b1.begin(), b1.end(), 3 );
171     stride_x = 2; // Nothing should be changed
172     s.try_put( std::tie( b1, b2, l ) );
173     g.wait_for_all();
174     ASSERT( err.data() == std::string( "Done" ), "Validation has failed" );
175     ASSERT( std::all_of( b2.begin(), b2.end(), []( int c ) { return c == 3; } ), "Validation has failed" );
176 
177     *err.data() = 0; ASSERT( err.data() != std::string( "Done" ), NULL );
178     std::fill( b1.begin(), b1.end(), 4 );
179     int stride_z = 64 * 64;
180     ASSERT( stride_z * 64 < N, NULL );
181     k.set_args( port_ref<0>(), BROKEN_FUNCTION_POINTER_DEDUCTION( port_ref<1> ), /* stride_x */ 1, /* stride_y */ 64, /* stride_z */ stride_z, /* dim */ 3, err, err_size );
182     k.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ 64, 64, 64 }),
183                    BROKEN_INITIALIZER_LIST_DEDUCTION({ 4, min( (int)maxSizes[1], 4 ), min( (int)maxSizes[2], 4 ) }) } );
184     s.try_put( std::make_tuple( b1, b2, OCLRange() ) );
185     g.wait_for_all();
186     ASSERT( err.data() == std::string( "Done" ), "Validation has failed" );
187     ASSERT( std::all_of( b2.begin(), b2.begin() + stride_z * 64, []( int c ) { return c == 4; } ), "Validation has failed" );
188     ASSERT( std::all_of( b2.begin() + stride_z * 64, b2.end(), []( int c ) { return c == 3; } ), "Validation has failed" );
189 
190     *err.data() = 0; ASSERT( err.data() != std::string( "Done" ), NULL );
191     std::fill( b1.begin(), b1.end(), 5 );
192     ASSERT( 2 * 64 * 64 < N, NULL );
193     k.set_args( port_ref<0, 1>(), /* stride_x */ 2, /* stride_y */ 2 * 64, /* stride_z */ 2 * 64 * 64, /* dim */ 3, err, err_size );
194     k.set_range( BROKEN_FUNCTION_POINTER_DEDUCTION( port_ref<2> ) );
195     l = { BROKEN_INITIALIZER_LIST_DEDUCTION({ 64, 64, 64 }),
196           BROKEN_INITIALIZER_LIST_DEDUCTION({ 4, min( (int)maxSizes[1], 4 ), min( (int)maxSizes[2], 4 ) }) };
197     s.try_put( std::make_tuple( b1, b2, l ) );
198     g.wait_for_all();
199     ASSERT( err.data() == std::string( "Done" ), "Validation has failed" );
200     auto it = b2.begin();
201     for ( size_t i = 0; i < 64 * 64 * 64; ++i ) ASSERT( it[i] == (i % 2 ? 4 : 5), "Validation has failed" );
202     for ( size_t i = 64 * 64 * 64; i < 2 * 64 * 64 * 64; ++i ) ASSERT( it[i] == (i % 2 ? 3 : 5), "Validation has failed" );
203     ASSERT( std::all_of( b2.begin() + 2 * stride_z * 64, b2.end(), []( int c ) { return c == 3; } ), "Validation has failed" );
204 
205     *err.data() = 0; ASSERT( err.data() != std::string( "Done" ), NULL );
206     std::fill( b1.begin(), b1.end(), 6 );
207     k.set_args( port_ref<0, 1>(), /* stride_x */ 1, /* stride_y */ 1024, /* stride_z */ 0, /* dim */ 2, err, err_size );
208     k.set_range( std::deque<int>( { 1024, 1024 } ) );
209     s.try_put( std::make_tuple( b1, b2, l ) );
210     g.wait_for_all();
211     ASSERT( err.data() == std::string( "Done" ), "Validation has failed" );
212     ASSERT( std::all_of( b2.begin(), b2.end(), []( int c ) { return c == 6; } ), "Validation has failed" );
213     REMARK( "done\n" );
214 }
215 
SimpleDependencyTest()216 void SimpleDependencyTest() {
217     REMARK( "SimpleDependencyTest: " );
218 
219     const int N = 1 * 1024 * 1024;
220     opencl_buffer<float> b1( N ), b2( N ), b3( N );
221     std::vector<float> v1( N ), v2( N ), v3( N );
222 
223     auto i1 = b1.access<write_only>();
224     auto i2 = b2.access<write_only>();
225 
226     for ( int i = 0; i < N; ++i ) {
227         i1[i] = v1[i] = float( i );
228         i2[i] = v2[i] = float( 2 * i );
229     }
230 
231     graph g;
232     opencl_program<> p( PathToFile("test_opencl_node.cl") ) ;
233     opencl_node< tuple<opencl_buffer<float>, opencl_buffer<float>> > k1( g, p.get_kernel( "Sum" ) );
234     k1.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }), BROKEN_INITIALIZER_LIST_DEDUCTION({ 16 }) } );
235 
236     opencl_node < tuple<opencl_buffer<float>, opencl_buffer<float>> > k2( g, p.get_kernel( "Sqr" ) );
237     k2.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }), BROKEN_INITIALIZER_LIST_DEDUCTION({ 16 }) } );
238 
239     make_edge( output_port<1>( k1 ), input_port<0>( k2 ) );
240 
241     split_node< tuple<opencl_buffer<float>, opencl_buffer<float>, opencl_buffer<float>> > s( g );
242 
243     make_edge( output_port<0>( s ), input_port<0>( k1 ) );
244     make_edge( output_port<1>( s ), input_port<1>( k1 ) );
245     make_edge( output_port<2>( s ), input_port<1>( k2 ) );
246 
247     s.try_put( std::tie( b1, b2, b3 ) );
248 
249     g.wait_for_all();
250 
251     // validation
252     for ( int i = 0; i < N; ++i ) {
253         v2[i] += v1[i];
254         v3[i] = v2[i] * v2[i];
255     }
256 
257     auto o2 = b2.access<read_only>();
258     auto o3 = b3.access<read_only>();
259 
260     ASSERT( memcmp( &o2[0], &v2[0], N*sizeof( float ) ) == 0, "Validation has failed" );
261     ASSERT( memcmp( &o3[0], &v3[0], N*sizeof( float ) ) == 0, "Validation has failed" );
262     REMARK( "done\n" );
263 }
264 
265 class device_selector {
266     enum state {
267         DEFAULT_INITIALIZED,
268         COPY_INITIALIZED,
269         DELETED
270     };
271     state my_state;
272 public:
device_selector()273     device_selector() : my_state( DEFAULT_INITIALIZED ) {}
device_selector(const device_selector &)274     device_selector( const device_selector& ) : my_state( COPY_INITIALIZED ) {}
device_selector(device_selector &&)275     device_selector( device_selector&& ) : my_state( COPY_INITIALIZED ) {}
~device_selector()276     ~device_selector() { my_state = DELETED; }
277 
278     template <typename D>
operator ()(opencl_factory<D> & f)279     opencl_device operator()( opencl_factory<D> &f ) {
280         ASSERT( my_state == COPY_INITIALIZED, NULL );
281         ASSERT( ! f.devices().empty(), NULL );
282         return *( f.devices().begin() );
283     }
284 };
285 
BroadcastTest()286 void BroadcastTest() {
287     REMARK( "BroadcastTest: " );
288 
289     graph g;
290 
291     const int N = 1 * 1024;
292     opencl_buffer<cl_int> b( N );
293 
294     const int numNodes = 4 * tbb::task_scheduler_init::default_num_threads();
295     typedef opencl_node <tuple<opencl_buffer<cl_int>, opencl_buffer<cl_int>>> NodeType;
296     std::vector<NodeType> nodes( numNodes, NodeType( g,
297         opencl_program<>( PathToFile("test_opencl_node.cl") ).get_kernel( "BroadcastTest" ),
298         device_selector() ) );
299 
300     for ( std::vector<NodeType>::iterator it = nodes.begin(); it != nodes.end(); ++it )
301         it->set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }), BROKEN_INITIALIZER_LIST_DEDUCTION({ 16 }) } );
302 
303     broadcast_node<opencl_buffer<cl_int>> bc( g );
304     for ( auto &x : nodes ) make_edge( bc, x );
305 
306     std::vector<opencl_buffer<cl_int>> res;
307     for ( int i = 0; i < numNodes; ++i ) res.emplace_back( N );
308 
309     for ( cl_int r = 1; r < 100; ++r ) {
310         std::fill( b.begin(), b.end(), r );
311         bc.try_put( b );
312         for ( int i = 0; i < numNodes; ++i ) input_port<1>( nodes[i] ).try_put( res[i] );
313         g.wait_for_all();
314 
315         ASSERT( std::all_of( res.begin(), res.end(), [r]( const opencl_buffer<cl_int> &buf ) {
316             return std::all_of( buf.begin(), buf.end(), [r]( cl_int c ) { return c == r; } );
317         } ), "Validation has failed" );
318     }
319     REMARK( "done\n" );
320 }
321 
DiamondDependencyTest()322 void DiamondDependencyTest() {
323     REMARK( "DiamondDependencyTest: " );
324 
325     const int N = 1 * 1024 * 1024;
326     opencl_buffer<cl_short> b( N );
327     opencl_buffer<cl_int> b1( N ), b2( N );
328 
329     graph g;
330     device_selector d;
331     opencl_program<> p( PathToFile("test_opencl_node.cl") );
332     opencl_node <tuple<opencl_buffer<cl_short>, cl_short>> k0( g, p.get_kernel( "DiamondDependencyTestFill" ), d );
333     k0.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
334     opencl_node <tuple<opencl_buffer<cl_short>, opencl_buffer<cl_int>>> k1( g, p.get_kernel( "DiamondDependencyTestSquare" ) );
335     k1.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
336     opencl_node <tuple<opencl_buffer<cl_short>, opencl_buffer<cl_int>>> k2( g, p.get_kernel( "DiamondDependencyTestCube" ) );
337     k2.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
338     opencl_node <tuple<opencl_buffer<cl_short>, opencl_buffer<cl_int>, opencl_buffer<cl_int>>> k3( g, p.get_kernel( "DiamondDependencyTestDivision" ) );
339     k3.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
340 
341     make_edge( output_port<0>( k0 ), input_port<0>( k1 ) );
342     make_edge( output_port<0>( k0 ), input_port<0>( k2 ) );
343     make_edge( output_port<0>( k0 ), input_port<0>( k3 ) );
344 
345     make_edge( output_port<1>( k1 ), input_port<1>( k3 ) );
346     make_edge( output_port<1>( k2 ), input_port<2>( k3 ) );
347 
348     split_node< tuple<opencl_buffer<cl_short>, cl_short, opencl_buffer<cl_int>, opencl_buffer<cl_int>> > s( g );
349 
350     make_edge( output_port<0>( s ), input_port<0>( k0 ) );
351     make_edge( output_port<1>( s ), input_port<1>( k0 ) );
352     make_edge( output_port<2>( s ), input_port<1>( k1 ) );
353     make_edge( output_port<3>( s ), input_port<1>( k2 ) );
354 
355     for ( cl_short i = 1; i < 10; ++i ) {
356         s.try_put( std::tie( b, i, b1, b2 ) );
357         g.wait_for_all();
358         ASSERT( std::all_of( b.begin(), b.end(), [i]( cl_short c ) {return c == i*i; } ), "Validation has failed" );
359     }
360     REMARK( "done\n" );
361 }
362 
LoopTest()363 void LoopTest() {
364     REMARK( "LoopTest: " );
365 
366     const int N = 1 * 1024;
367     opencl_buffer<cl_long> b1( N ), b2( N );
368 
369     std::fill( b1.begin(), b1.end(), 0 );
370     std::fill( b2.begin(), b2.end(), 1 );
371 
372     graph g;
373     opencl_node <tuple<opencl_buffer<cl_long>, opencl_buffer<cl_long>>> k( g,
374         opencl_program<>( PathToFile("test_opencl_node.cl") ).get_kernel( "LoopTestIter" ) );
375     k.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
376 
377     make_edge( output_port<1>( k ), input_port<1>( k ) );
378 
379     const cl_long numIters = 1000;
380     cl_long iter = 0;
381     typedef multifunction_node < opencl_buffer<cl_long>, tuple < opencl_buffer<cl_long>, opencl_buffer<cl_long> > > multinode;
382     multinode mf( g, serial, [&iter, numIters]( const opencl_buffer<cl_long> &b, multinode::output_ports_type& op ) {
383         if ( ++iter < numIters ) get<1>( op ).try_put( b );
384         else get<0>( op ).try_put( b );
385     } );
386 
387     make_edge( output_port<1>( mf ), input_port<0>( k ) );
388     make_edge( output_port<0>( k ), mf );
389 
390     function_node<opencl_buffer<cl_long>> f( g, unlimited, [numIters]( const opencl_buffer<cl_long> &b ) {
391         ASSERT( std::all_of( b.begin(), b.end(), [numIters]( cl_long c ) { return c == numIters*(numIters + 1) / 2; } ), "Validation has failed" );
392     } );
393 
394     make_edge( output_port<0>( mf ), f );
395 
396     split_node< tuple<opencl_buffer<cl_long>, opencl_buffer<cl_long> > > s( g );
397 
398     make_edge( output_port<0>( s ), input_port<0>( k ) );
399     make_edge( output_port<1>( s ), input_port<1>( k ) );
400 
401     s.try_put( std::tie( b1, b2 ) );
402     g.wait_for_all();
403     REMARK( "done\n" );
404 }
405 
406 #include "harness_barrier.h"
407 
408 template <typename Factory>
409 struct ConcurrencyTestBodyData {
410     typedef opencl_node< tuple<opencl_buffer<cl_char, Factory>, opencl_subbuffer<cl_short, Factory>>, queueing, Factory > NodeType;
411     typedef std::vector< NodeType* > VectorType;
412 
413     Harness::SpinBarrier barrier;
414     VectorType nodes;
415     function_node< opencl_subbuffer<cl_short, Factory> > validationNode;
416     tbb::atomic<int> numChecks;
417 
ConcurrencyTestBodyDataConcurrencyTestBodyData418     ConcurrencyTestBodyData( graph &g, int numThreads ) : barrier( numThreads ), nodes(numThreads),
419         validationNode( g, unlimited, [numThreads, this]( const opencl_subbuffer<cl_short, Factory> &b ) {
420             ASSERT( std::all_of( b.begin(), b.end(), [numThreads]( cl_short c ) { return c == numThreads; } ), "Validation has failed" );
421             --numChecks;
422         } )
423     {
424         numChecks = 100;
425         // The test creates subbers in pairs so numChecks should be even.
426         ASSERT( numChecks % 2 == 0, NULL );
427     }
428 
~ConcurrencyTestBodyDataConcurrencyTestBodyData429     ~ConcurrencyTestBodyData() {
430         ASSERT( numChecks == 0, NULL );
431         for ( NodeType *n : nodes ) delete n;
432     }
433 };
434 
435 template <typename Factory>
436 class ConcurrencyTestBody : NoAssign {
437     graph &g;
438     std::shared_ptr<ConcurrencyTestBodyData<Factory>> data;
439     Factory &f;
440     const std::vector<opencl_device> &filteredDevices;
441 
442     class RoundRobinDeviceSelector : NoAssign {
443     public:
RoundRobinDeviceSelector(size_t cnt_,int num_checks_,const std::vector<opencl_device> & filteredDevices_)444         RoundRobinDeviceSelector( size_t cnt_, int num_checks_, const std::vector<opencl_device> &filteredDevices_ )
445             : cnt( cnt_ ), num_checks( num_checks_ ), filteredDevices( filteredDevices_ ) {
446         }
RoundRobinDeviceSelector(const RoundRobinDeviceSelector & src)447         RoundRobinDeviceSelector( const RoundRobinDeviceSelector &src )
448             : cnt( src.cnt ), num_checks( src.num_checks ), filteredDevices( src.filteredDevices ) {
449             ASSERT( src.num_checks, "The source has already been copied" );
450             src.num_checks = 0;
451         }
~RoundRobinDeviceSelector()452         ~RoundRobinDeviceSelector() {
453             ASSERT( !num_checks, "Device Selector has not been called required number of times" );
454         }
operator ()(Factory & a_factory)455         opencl_device operator()( Factory &a_factory ) {
456             const opencl_device_list& devices = a_factory.devices();
457             ASSERT( filteredDevices.size() == devices.size(), "Incorrect list of devices" );
458             std::vector<opencl_device>::const_iterator it = filteredDevices.cbegin();
459             for ( auto d = devices.begin(); d != devices.end(); ++d ) ASSERT( (*d) == *it++, "Incorrect list of devices" );
460             --num_checks;
461             return *(devices.begin() + cnt++ % devices.size());
462         }
463     private:
464         size_t cnt;
465         mutable int num_checks;
466         const std::vector<opencl_device> &filteredDevices;
467     };
468 
469 public:
ConcurrencyTestBody(graph & g_,int numThreads,Factory & f_,const std::vector<opencl_device> & filteredDevices_)470     ConcurrencyTestBody( graph &g_, int numThreads, Factory &f_, const std::vector<opencl_device> &filteredDevices_ )
471         : g( g_ )
472         , data( std::make_shared<ConcurrencyTestBodyData<Factory>>( g, numThreads ) )
473         , f( f_ )
474         , filteredDevices( filteredDevices_ ) {
475     }
operator ()(int idx) const476     void operator()( int idx ) const {
477         data->barrier.wait();
478 
479         const int N = 1 * 1024;
480         const int numChecks = data->numChecks;
481 
482         typedef typename ConcurrencyTestBodyData<Factory>::NodeType NodeType;
483         NodeType *n1 = new NodeType( g,
484             opencl_program<Factory>( f, PathToFile( "test_opencl_node.cl" ) ).get_kernel( "ConcurrencyTestIter" ),
485             RoundRobinDeviceSelector( idx, numChecks, filteredDevices ), f );
486         // n2 is used to test the copy constructor
487         NodeType *n2 = new NodeType( *n1 );
488         delete n1;
489         data->nodes[idx] = n2;
490         n2->set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
491 
492         data->barrier.wait();
493 
494         for ( size_t i = 0; i < data->nodes.size() - 1; ++i ) {
495             make_edge( output_port<0>( *data->nodes[i] ), input_port<0>( *data->nodes[i + 1] ) );
496             make_edge( output_port<1>( *data->nodes[i] ), input_port<1>( *data->nodes[i + 1] ) );
497         }
498         make_edge( output_port<1>( *data->nodes.back() ), data->validationNode );
499         for ( size_t i = 0; i < data->nodes.size() - 1; ++i ) {
500             remove_edge( output_port<0>( *data->nodes[i] ), input_port<0>( *data->nodes[i + 1] ) );
501             if ( i != (size_t)idx )
502                 remove_edge( output_port<1>( *data->nodes[i] ), input_port<1>( *data->nodes[i + 1] ) );
503         }
504         if ( (size_t)idx != data->nodes.size() - 1 )
505             remove_edge( output_port<1>( *data->nodes.back() ), data->validationNode );
506 
507         data->barrier.wait();
508         if ( idx == 0 ) {
509             // The first node needs two buffers.
510            Harness::FastRandom rnd(42);
511             cl_uint alignment = 0;
512             for (  auto d = filteredDevices.begin(); d != filteredDevices.end(); ++d ) {
513                 cl_uint deviceAlignment;
514                 (*d).info( CL_DEVICE_MEM_BASE_ADDR_ALIGN, deviceAlignment );
515                 alignment = max( alignment, deviceAlignment );
516             }
517             alignment /= CHAR_BIT;
518             cl_uint alignmentMask = ~(alignment-1);
519             for ( int i = 0; i < numChecks; i += 2 ) {
520                 for ( int j = 0; j < 2; ++j ) {
521                     opencl_buffer<cl_char, Factory> b1( f, N );
522                     std::fill( b1.begin(), b1.end(), cl_char(1) );
523                     input_port<0>( *n2 ).try_put( b1 );
524                 }
525 
526                 // The subbers are created in pairs from one big buffer
527                 opencl_buffer<cl_short, Factory> b( f, 4*N );
528                 size_t id0 = (rnd.get() % N) & alignmentMask;
529                 opencl_subbuffer<cl_short, Factory> sb1( b, id0, N );
530                 std::fill( sb1.begin(), sb1.end(), cl_short(0) );
531                 input_port<1>( *n2 ).try_put( sb1 );
532 
533                 size_t id1 = (rnd.get() % N) & alignmentMask;
534                 opencl_subbuffer<cl_short, Factory> sb2 = b.subbuffer( 2*N + id1, N );
535                 std::fill( sb2.begin(), sb2.end(), cl_short(0) );
536                 input_port<1>( *n2 ).try_put( sb2 );
537             }
538         } else {
539             // Other nodes need only one buffer each because input_port<1> is connected with
540             // output_port<1> of the previous node.
541             for ( int i = 0; i < numChecks; ++i ) {
542                 opencl_buffer<cl_char, Factory> b( f, N );
543                 std::fill( b.begin(), b.end(), cl_char(1) );
544                 input_port<0>( *n2 ).try_put( b );
545             }
546         }
547 
548         g.wait_for_all();
549 
550         // n2 will be deleted in destructor of ConcurrencyTestBodyData
551     }
552 };
553 
554 const int concurrencyTestNumRepeats = 5;
555 
556 template <typename Factory = DefaultFactoryType>
ConcurrencyTest(const std::vector<opencl_device> & filteredDevices)557 void ConcurrencyTest( const std::vector<opencl_device> &filteredDevices ) {
558     const int numThreads = min( tbb::task_scheduler_init::default_num_threads(), 8 );
559     for ( int i = 0; i < concurrencyTestNumRepeats; ++i ) {
560         tbb::task_group_context ctx( tbb::task_group_context::isolated, tbb::task_group_context::default_traits | tbb::task_group_context::concurrent_wait );
561         graph g( ctx );
562         opencl_device_list dl;
563         Factory f;
564         ConcurrencyTestBody<Factory> body( g, numThreads, f, filteredDevices );
565         NativeParallelFor( numThreads, body );
566     }
567 }
568 
569 #include <unordered_map>
570 
571 enum FilterPolicy {
572     MAX_DEVICES,
573     ONE_DEVICE
574 };
575 
576 template <FilterPolicy Policy>
577 struct DeviceFilter {
DeviceFilterDeviceFilter578     DeviceFilter() {
579         filteredDevices.clear();
580     }
operator ()DeviceFilter581     opencl_device_list operator()( opencl_device_list device_list ) {
582         ASSERT( filteredDevices.size() == 0, NULL );
583         switch ( Policy ) {
584         case MAX_DEVICES:
585         {
586             std::unordered_map<std::string, std::vector<opencl_device>> platforms;
587             for (auto d = device_list.begin(); d != device_list.end(); ++d) {
588                 platforms[(*d).platform_name()].push_back(*d);
589             }
590 
591             // Select a platform with maximum number of devices.
592             filteredDevices = std::max_element( platforms.begin(), platforms.end(),
593                 []( const std::pair<std::string, std::vector<opencl_device>>& p1, const std::pair<std::string, std::vector<opencl_device>>& p2 ) {
594                 return p1.second.size() < p2.second.size();
595             } )->second;
596 
597             if ( !numRuns ) {
598                 REMARK( "  Chosen devices from the same platform (%s):\n", filteredDevices[0].platform_name().c_str() );
599                 for ( auto d = filteredDevices.begin(); d != filteredDevices.end(); d++ ) {
600                     REMARK( "    %s\n", (*d).name().c_str() );
601                 }
602             }
603 
604             if ( filteredDevices.size() < 2 )
605                 REPORT_ONCE( "Known issue: the system does not have several devices in one platform\n" );
606             break;
607         }
608         case ONE_DEVICE:
609         {
610             ASSERT( deviceNum < device_list.size(), NULL );
611             opencl_device_list::iterator it = device_list.begin();
612             std::advance( it, deviceNum );
613             filteredDevices.push_back( *it );
614             break;
615         }
616         default:
617             ASSERT( false, NULL );
618         }
619         opencl_device_list dl;
620         for ( auto d = filteredDevices.begin(); d != filteredDevices.end(); ++d ) dl.add( *d );
621 
622         ++numRuns;
623 
624         return dl;
625     }
626     static opencl_device_list::size_type deviceNum;
627     static int numRuns;
628     static std::vector<opencl_device> filteredDevices;
629 };
630 
631 template <FilterPolicy Policy>
632 opencl_device_list::size_type DeviceFilter<Policy>::deviceNum;
633 template <FilterPolicy Policy>
634 int DeviceFilter<Policy>::numRuns;
635 template <FilterPolicy Policy>
636 std::vector<opencl_device> DeviceFilter<Policy>::filteredDevices;
637 
CustomFactoryTest()638 void CustomFactoryTest() {
639     REMARK( "CustomFactoryTest:\n" );
640     REMARK( "  Multi device test:\n" );
641     DeviceFilter<MAX_DEVICES>::numRuns = 0;
642     typedef tbb::flow::opencl_factory <DeviceFilter<MAX_DEVICES>> custom_factory;
643     ConcurrencyTest<custom_factory>( DeviceFilter<MAX_DEVICES>::filteredDevices );
644     ASSERT( DeviceFilter<MAX_DEVICES>::numRuns == concurrencyTestNumRepeats, NULL );
645 
646     REMARK( "  One device tests:\n" );
647     graph g;
648     opencl_device_list all_devices = interface11::opencl_info::available_devices();
649     for ( int i = 0; i < (int)all_devices.size(); ++i ) {
650         opencl_device_list::const_iterator it = all_devices.begin();
651         std::advance( it, i );
652         REMARK( "    %s: ", it->name().c_str() );
653         DeviceFilter<ONE_DEVICE>::numRuns = 0;
654         DeviceFilter<ONE_DEVICE>::deviceNum = i;
655         typedef tbb::flow::opencl_factory <DeviceFilter<ONE_DEVICE>> one_device_factory;
656         ConcurrencyTest<one_device_factory>( DeviceFilter<ONE_DEVICE>::filteredDevices );
657         ASSERT( DeviceFilter<ONE_DEVICE>::numRuns == concurrencyTestNumRepeats, NULL );
658         ASSERT( DeviceFilter<ONE_DEVICE>::filteredDevices[0] == *it, NULL );
659         REMARK( "done\n" );
660     }
661     REMARK( "CustomFactoryTest: done\n" );
662 }
663 
DefaultConcurrencyTest()664 void DefaultConcurrencyTest() {
665     REMARK( "DefaultConcurrencyTest: " );
666     // By default, the first device is selected.
667     ConcurrencyTest( { *interface11::opencl_info::available_devices().begin() } );
668     REMARK( "done\n" );
669 }
670 
671 
SpirKernelTest()672 void SpirKernelTest() {
673     REMARK( "SpirKernelTest:\n" );
674 
675     const opencl_device_list devices = interface11::opencl_info::available_devices();
676 
677     for( auto d = devices.begin(); d != devices.end(); d++ ) {
678         if( !(*d).extension_available( "cl_khr_spir" ) ) {
679             REMARK( "  Extension 'cl_khr_spir' is not available on the device '%s'\n", (*d).name().c_str() );
680             continue;
681         }
682 
683         graph g;
684         DefaultFactoryType factory;
685 
686         bool init = factory.init( { *d } );
687         ASSERT( init, "It should be the first initialization" );
688 
689         std::string path_to_file = PathToFile(std::string("test_opencl_kernel_") +
690                                               std::to_string((*d).address_bits()) + std::string(".spir") );
691         REMARK("  Using SPIR file '%s' on device '%s'\n", path_to_file.c_str(), (*d).name().c_str());
692         const int N = 1 * 1024 * 1024;
693         opencl_buffer<float, DefaultFactoryType> b1( factory, N ), b2( factory, N );
694         std::vector<float> v1( N ), v2( N );
695 
696         auto i1 = b1.access<write_only>();
697         auto i2 = b2.access<write_only>();
698 
699         for ( int i = 0; i < N; ++i ) {
700             i1[i] = v1[i] = float( i );
701             i2[i] = v2[i] = float( 2 * i );
702         }
703 
704         typedef opencl_node< tuple<opencl_buffer<float, DefaultFactoryType>, opencl_buffer<float, DefaultFactoryType> >, queueing, DefaultFactoryType > OpenCLNodeType;
705 
706         OpenCLNodeType k1( g, opencl_program<DefaultFactoryType>( factory, opencl_program_type::SPIR, path_to_file ).get_kernel( "custom_summer" ), factory );
707         k1.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
708 
709         input_port<0>(k1).try_put( b1 );
710         input_port<1>(k1).try_put( b2 );
711 
712         g.wait_for_all();
713 
714         // validation
715         for ( int i = 0; i < N; ++i ) {
716             v2[i] += v1[i];
717         }
718 
719         ASSERT( memcmp( &b2[0], &v2[0], N*sizeof( float ) ) == 0, "Validation has failed" );
720     }
721     REMARK( "done\n" );
722 }
723 
PrecompiledKernelTest()724 void PrecompiledKernelTest() {
725     REMARK( "PrecompiledKernelTest:\n" );
726 
727     graph g;
728     DefaultFactoryType factory;
729 
730     const opencl_device_list devices = interface11::opencl_info::available_devices();
731     opencl_device_list::const_iterator it = std::find_if(
732         devices.cbegin(), devices.cend(),
733         []( const opencl_device &d ) {
734             std::string vendor_name = d.vendor();
735             return std::string::npos != vendor_name.find( "Intel" ) && CL_DEVICE_TYPE_GPU == d.type();
736         } );
737 
738     if ( it == devices.cend() ) {
739         REPORT( "Known issue: there is no device in the system that supports the precompiled GPU kernel.\n" );
740         return;
741     }
742     bool init = factory.init( { *it } );
743     ASSERT( init, "It should be the first initialization" );
744     REMARK( "  Device name '%s', %s:", it->name().c_str(), it->version().c_str() );
745 
746     const int N = 1 * 1024 * 1024;
747     opencl_buffer<float, DefaultFactoryType> b1( factory, N ), b2( factory, N );
748     std::vector<float> v1( N ), v2( N );
749 
750     auto i1 = b1.access<write_only>();
751     auto i2 = b2.access<write_only>();
752 
753     for ( int i = 0; i < N; ++i ) {
754         i1[i] = v1[i] = float( i );
755         i2[i] = v2[i] = float( 2 * i );
756     }
757 
758     std::string path_to_file = PathToFile(std::string("test_opencl_precompiled_kernel_gpu_") + std::to_string((*it).address_bits()) + std::string(".ir"));
759 
760     opencl_program<DefaultFactoryType> p( factory, opencl_program_type::PRECOMPILED, path_to_file);
761     opencl_node < tuple<opencl_buffer<float, DefaultFactoryType>, opencl_buffer<float, DefaultFactoryType> >, queueing, DefaultFactoryType > k1(g, p.get_kernel("custom_subtractor"), factory);
762     k1.set_range({ BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) });
763 
764     input_port<0>(k1).try_put( b1 );
765     input_port<1>(k1).try_put( b2 );
766 
767     g.wait_for_all();
768 
769     // validation
770     for ( int i = 0; i < N; ++i ) {
771         v2[i] -= v1[i];
772     }
773 
774     ASSERT( memcmp( &b2[0], &v2[0], N*sizeof( float ) ) == 0, "Validation has failed" );
775     REMARK( " done\n" );
776 }
777 
/*
    Dataflow graph built by KeyMatchingTest<Key, JP>():

    /---function_node--\   /--function_node--\                       /---function_node--\
    |                  |   |                 |   /--opencl_node--\   |                  |
    O Buffer generator O---O  Buffer filler  O---O               O---O Result validator O
    |                  |   |                 |   |               |   |                  |
    \------------------/   \-----------------/   |               |   \------------------/
                                                 |   Multiplier  |
    /---function_node--\   /--function_node--\   |               |
    |                  |   |                 |   |               |
    O Buffer generator O---O  Buffer filler  O---O               O
    |                  |   |                 |   \---------------/
    \------------------/   \-----------------/
    */
791 
792 template <typename Key>
793 struct BufferWithKey : public opencl_buffer<int> {
794     typedef typename std::decay<Key>::type KeyType;
795     KeyType my_key;
796     int my_idx;
797 
798     // TODO: investigate why default ctor is required
BufferWithKeyBufferWithKey799     BufferWithKey() {}
BufferWithKeyBufferWithKey800     BufferWithKey( size_t N, int idx ) : opencl_buffer<int>( N ), my_idx( idx ) {}
keyBufferWithKey801     const KeyType& key() const { return my_key; }
802 };
803 
// Maps an integer index to a key of type Key.
// Only the explicit specializations below are defined; any other Key fails to link.
template <typename Key>
Key KeyGenerator( int i );

// int keys: the index itself is the key.
template <>
int KeyGenerator<int>( int i ) {
    return i;
}

// std::string keys: the decimal text of the index is the key.
template <>
std::string KeyGenerator<std::string>( int i ) {
    return std::to_string( i );
}
812 
813 template <typename Key>
GenerateRandomBuffer(BufferWithKey<Key> b)814 BufferWithKey<Key> GenerateRandomBuffer( BufferWithKey<Key> b ) {
815     b.my_key = KeyGenerator<typename std::decay<Key>::type>( b.my_idx );
816     Harness::FastRandom r( b.my_idx );
817     std::generate( b.begin(), b.end(), [&r]() { return r.get(); } );
818     return b;
819 }
820 
821 template <typename Key, typename JP>
KeyMatchingTest()822 bool KeyMatchingTest() {
823     const int N = 1000;
824     const int numMessages = 100;
825 
826     graph g;
827     broadcast_node<int> b( g );
828 
829     // Use opencl_async_msg's to have non-blocking map to host
830     function_node<int, opencl_async_msg<BufferWithKey<Key>>>
831         bufGenerator1( g, unlimited, [N]( int i ) { return opencl_async_msg<BufferWithKey<Key>>( BufferWithKey<Key >(N, i) ); } ),
832         bufGenerator2 = bufGenerator1;
833 
834     function_node<BufferWithKey<Key>, BufferWithKey<Key>>
835         bufFiller1( g, unlimited, []( const BufferWithKey<Key> &b ) { return GenerateRandomBuffer<Key>( b ); } ),
836         bufFiller2 = bufFiller1;
837 
838     opencl_node< tuple< BufferWithKey<Key>, BufferWithKey<Key> >, JP > k( g,
839         opencl_program<>( PathToFile( "test_opencl_node.cl" ) ).get_kernel( "Mul" ) );
840     k.set_range( { BROKEN_INITIALIZER_LIST_DEDUCTION({ N }) } );
841 
842     bool success = true;
843     function_node<BufferWithKey<Key>> checker( g, unlimited, [&success, N]( BufferWithKey<Key> b ) {
844         Harness::FastRandom r( b.my_idx );
845         std::for_each( b.begin(), b.end(), [&success, &r]( int bv ) {
846             const int rv = r.get();
847             if ( bv != rv*rv ) {
848                 success = false;
849                 return;
850             }
851         } );
852     } );
853 
854     make_edge( bufGenerator1, bufFiller1 );
855     make_edge( bufGenerator2, bufFiller2 );
856     make_edge( bufFiller1, input_port<0>( k ) );
857     make_edge( bufFiller2, input_port<1>( k ) );
858     make_edge( output_port<0>( k ), checker );
859 
860     for ( int i = 0; i < numMessages; ++i ) {
861         bufGenerator1.try_put( i );
862         bufGenerator2.try_put( numMessages - i - 1 );
863     }
864 
865     g.wait_for_all();
866 
867     return success;
868 }
869 
KeyMatchingTest()870 void KeyMatchingTest() {
871     REMARK( "KeyMatchingTest:\n" );
872     REMARK( "  Queueing negative test: " );
873     bool res = !KeyMatchingTest<int, queueing>(); // The test should fail with the queueing policy, so the negative result is expected.
874     ASSERT( res, "Queueing negative test has failed" );
875     REMARK( "done\n  key_matching<int> test: " );
876     res = KeyMatchingTest<int, key_matching<int>>();
877     ASSERT( res, "key_matching<int> test has failed" );
878     REMARK( "done\n  key_matching<string&> test: " );
879     res = KeyMatchingTest<std::string&, key_matching<std::string&>>();
880     ASSERT( res, "key_matching<string&> test has failed" );
881     REMARK( "done\n" );
882     REMARK( "KeyMatchingTest: done\n" );
883 }
884 
TestMain()885 int TestMain() {
886 
887 
888     TestArgumentPassing();
889 
890     SimpleDependencyTest();
891     BroadcastTest();
892     DiamondDependencyTest();
893     LoopTest();
894 
895     DefaultConcurrencyTest();
896     CustomFactoryTest();
897 
898     SpirKernelTest();
899 #if !__APPLE__
900     // Consider better support for precompiled programs on Apple
901     PrecompiledKernelTest();
902 #endif
903 
904     KeyMatchingTest();
905 
906     return Harness::Done;
907 }
908 #else
909 #define HARNESS_SKIP_TEST 1
910 #include "harness.h"
911 #endif
912