/*
 * mpi_manager.cpp
 *
 * This file is part of NEST.
 *
 * Copyright (C) 2004 The NEST Initiative
 *
 * NEST is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * NEST is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with NEST. If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "mpi_manager.h"

// C++ includes:
#include <limits>
#include <numeric>

// Includes from libnestutil:
#include "compose.hpp"
#include "logging.h"
#include "stopwatch.h"

// Includes from nestkernel:
#include "kernel_manager.h"
#include "mpi_manager_impl.h"
#include "nest_types.h"

// Includes from sli:
#include "dictutils.h"

#ifdef HAVE_MPI

template <>
MPI_Datatype MPI_Type< int >::type = MPI_INT;
template <>
MPI_Datatype MPI_Type< double >::type = MPI_DOUBLE;
template <>
MPI_Datatype MPI_Type< long >::type = MPI_LONG;
template <>
MPI_Datatype MPI_Type< unsigned int >::type = MPI_INT;
template <>
MPI_Datatype MPI_Type< unsigned long >::type = MPI_UNSIGNED_LONG;
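
// MPI_Type< T > is a small traits struct that maps C++ element types to the
// corresponding MPI datatypes, so the templated communication routines below
// can simply pass MPI_Type< T >::type to MPI. A sketch of the intended use
// (illustrative call, not taken from this file):
//   MPI_Allgather( &send[ 0 ], n, MPI_Type< long >::type,
//     &recv[ 0 ], n, MPI_Type< long >::type, comm );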

#endif /* #ifdef HAVE_MPI */

nest::MPIManager::MPIManager()
  : num_processes_( 1 )
  , rank_( 0 )
  , send_buffer_size_( 0 )
  , recv_buffer_size_( 0 )
  , use_mpi_( false )
  , buffer_size_target_data_( 1 )
  , buffer_size_spike_data_( 1 )
  , max_buffer_size_target_data_( 16777216 )
  , max_buffer_size_spike_data_( 8388608 )
  , adaptive_target_buffers_( true )
  , adaptive_spike_buffers_( true )
  , growth_factor_buffer_spike_data_( 1.5 )
  , growth_factor_buffer_target_data_( 1.5 )
  , shrink_factor_buffer_spike_data_( 1.1 )
  , send_recv_count_spike_data_per_rank_( 0 )
  , send_recv_count_target_data_per_rank_( 0 )
#ifdef HAVE_MPI
  , comm_step_( std::vector< int >() )
  , COMM_OVERFLOW_ERROR( std::numeric_limits< unsigned int >::max() )
  , comm( 0 )
  , MPI_OFFGRID_SPIKE( 0 )
#endif
{
}

#ifndef HAVE_MPI

void
nest::MPIManager::init_mpi( int*, char*** )
{
  // if not HAVE_MPI, initialize the process entries for a single rank;
  // use 2 entries per process (we need at least two entries per process so
  // that the flag in the first entry can mark validity and the last entry
  // can signal the end of communication)
  kernel().mpi_manager.set_buffer_size_target_data( 2 );
  kernel().mpi_manager.set_buffer_size_spike_data( 2 );

  recv_counts_secondary_events_in_int_per_rank_.resize( 1, 0 );
  recv_displacements_secondary_events_in_int_per_rank_.resize( 1, 0 );
  send_counts_secondary_events_in_int_per_rank_.resize( 1, 0 );
  send_displacements_secondary_events_in_int_per_rank_.resize( 1, 0 );
}

#else /* HAVE_MPI */

void
nest::MPIManager::set_communicator( MPI_Comm global_comm )
{
  comm = global_comm;
  MPI_Comm_size( comm, &num_processes_ );
  MPI_Comm_rank( comm, &rank_ );
  recv_buffer_size_ = send_buffer_size_ * get_num_processes();

  // use at least 2 * number of processes entries (we need at least two
  // entries per process so that the flag in the first entry can mark
  // validity and the last entry can signal the end of communication)
  kernel().mpi_manager.set_buffer_size_target_data( 2 * kernel().mpi_manager.get_num_processes() );
  kernel().mpi_manager.set_buffer_size_spike_data( 2 * kernel().mpi_manager.get_num_processes() );
}

void
nest::MPIManager::init_mpi( int* argc, char** argv[] )
{
  int init;
  MPI_Initialized( &init );

  if ( init == 0 )
  {
#ifdef HAVE_MUSIC
    kernel().music_manager.init_music( argc, argv );
    // get a communicator from MUSIC
    set_communicator( static_cast< MPI_Comm >( kernel().music_manager.communicator() ) );
#else /* #ifdef HAVE_MUSIC */
    int provided_thread_level;
    MPI_Init_thread( argc, argv, MPI_THREAD_FUNNELED, &provided_thread_level );
    set_communicator( MPI_COMM_WORLD );
#endif /* #ifdef HAVE_MUSIC */
  }
  else
  {
#ifdef HAVE_MUSIC
    LOG( M_ERROR,
      "MPIManager::init_mpi()",
      "When compiled with MUSIC, NEST must be initialized before any other modules that call MPI_Init(). "
      "Calling MPI_Abort()." );
    comm = MPI_COMM_WORLD;
    mpi_abort( 1 );
#else
    set_communicator( MPI_COMM_WORLD );
#endif
  }

  recv_counts_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );
  recv_displacements_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );
  send_counts_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );
  send_displacements_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );

  // create the derived MPI datatype used to communicate off-grid spikes
  OffGridSpike::assert_datatype_compatibility_();
  MPI_Datatype source_types[ 2 ];
  int blockcounts[ 2 ];
  MPI_Aint offsets[ 2 ];
  MPI_Aint start_address, address;
  OffGridSpike ogs( 0, 0.0 );

  // OffGridSpike.node_id
  offsets[ 0 ] = 0;
  source_types[ 0 ] = MPI_DOUBLE;
  blockcounts[ 0 ] = 1;

  // OffGridSpike.offset
  MPI_Get_address( &( ogs.node_id_ ), &start_address );
  MPI_Get_address( &( ogs.offset_ ), &address );
  offsets[ 1 ] = address - start_address;
  source_types[ 1 ] = MPI_DOUBLE;
  blockcounts[ 1 ] = 1;
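
  // The two MPI_DOUBLE blocks above describe OffGridSpike as it is laid out in
  // memory (the node id stored as a double, followed by the spike-time offset),
  // so the committed MPI_OFFGRID_SPIKE type matches the struct element for
  // element. Conceptually it corresponds to (sketch only, not code used here):
  //   struct { double node_id; double offset; };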

  // generate and commit struct
  MPI_Type_create_struct( 2, blockcounts, offsets, source_types, &MPI_OFFGRID_SPIKE );
  MPI_Type_commit( &MPI_OFFGRID_SPIKE );

  use_mpi_ = true;
}

#endif /* #ifdef HAVE_MPI */

void
nest::MPIManager::initialize()
{
}

void
nest::MPIManager::finalize()
{
}

void
nest::MPIManager::set_status( const DictionaryDatum& dict )
{
  updateValue< bool >( dict, names::adaptive_target_buffers, adaptive_target_buffers_ );
  updateValue< bool >( dict, names::adaptive_spike_buffers, adaptive_spike_buffers_ );

  long new_buffer_size_target_data = buffer_size_target_data_;
  updateValue< long >( dict, names::buffer_size_target_data, new_buffer_size_target_data );
  if ( new_buffer_size_target_data != static_cast< long >( buffer_size_target_data_ )
    and new_buffer_size_target_data < static_cast< long >( max_buffer_size_target_data_ ) )
  {
    set_buffer_size_target_data( new_buffer_size_target_data );
  }

  long new_buffer_size_spike_data = buffer_size_spike_data_;
  updateValue< long >( dict, names::buffer_size_spike_data, new_buffer_size_spike_data );
  if ( new_buffer_size_spike_data != static_cast< long >( buffer_size_spike_data_ )
    and new_buffer_size_spike_data < static_cast< long >( max_buffer_size_spike_data_ ) )
  {
    set_buffer_size_spike_data( new_buffer_size_spike_data );
  }

  updateValue< double >( dict, names::growth_factor_buffer_spike_data, growth_factor_buffer_spike_data_ );
  updateValue< double >( dict, names::growth_factor_buffer_target_data, growth_factor_buffer_target_data_ );

  updateValue< long >( dict, names::max_buffer_size_target_data, max_buffer_size_target_data_ );
  updateValue< long >( dict, names::max_buffer_size_spike_data, max_buffer_size_spike_data_ );

  updateValue< double >( dict, names::shrink_factor_buffer_spike_data, shrink_factor_buffer_spike_data_ );
}
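
// These buffer and growth parameters are normally adjusted through the kernel
// status dictionary. A minimal sketch from the Python level, assuming the
// standard PyNEST kernel-status interface:
//   nest.SetKernelStatus( { 'adaptive_spike_buffers': False,
//     'buffer_size_spike_data': 4096 } )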

void
nest::MPIManager::get_status( DictionaryDatum& dict )
{
  def< long >( dict, names::num_processes, num_processes_ );
  def< bool >( dict, names::adaptive_spike_buffers, adaptive_spike_buffers_ );
  def< bool >( dict, names::adaptive_target_buffers, adaptive_target_buffers_ );
  def< size_t >( dict, names::buffer_size_target_data, buffer_size_target_data_ );
  def< size_t >( dict, names::buffer_size_spike_data, buffer_size_spike_data_ );
  def< size_t >( dict, names::send_buffer_size_secondary_events, get_send_buffer_size_secondary_events_in_int() );
  def< size_t >( dict, names::recv_buffer_size_secondary_events, get_recv_buffer_size_secondary_events_in_int() );
  def< size_t >( dict, names::max_buffer_size_spike_data, max_buffer_size_spike_data_ );
  def< size_t >( dict, names::max_buffer_size_target_data, max_buffer_size_target_data_ );
  def< double >( dict, names::growth_factor_buffer_spike_data, growth_factor_buffer_spike_data_ );
  def< double >( dict, names::growth_factor_buffer_target_data, growth_factor_buffer_target_data_ );
}

#ifdef HAVE_MPI

void
nest::MPIManager::mpi_finalize( int exitcode )
{
  MPI_Type_free( &MPI_OFFGRID_SPIKE );

  int finalized;
  MPI_Finalized( &finalized );

  int initialized;
  MPI_Initialized( &initialized );

  if ( finalized == 0 and initialized == 1 )
  {
    if ( exitcode == 0 )
    {
      kernel().music_manager.music_finalize(); // calls MPI_Finalize()
    }
    else
    {
      LOG( M_INFO, "MPIManager::finalize()", "Calling MPI_Abort() due to errors in the script." );
      mpi_abort( exitcode );
    }
  }
}

#else /* #ifdef HAVE_MPI */

void
nest::MPIManager::mpi_finalize( int )
{
}

#endif /* #ifdef HAVE_MPI */

#ifdef HAVE_MPI

void
nest::MPIManager::mpi_abort( int exitcode )
{
  MPI_Abort( comm, exitcode );
}


std::string
nest::MPIManager::get_processor_name()
{
  char name[ 1024 ];
  int len;
  MPI_Get_processor_name( name, &len );
  name[ len ] = '\0';
  return name;
}

void
nest::MPIManager::communicate( std::vector< long >& local_nodes, std::vector< long >& global_nodes )
{
  size_t np = get_num_processes();
  // Get size of buffers
  std::vector< int > num_nodes_per_rank( np );
  num_nodes_per_rank[ get_rank() ] = local_nodes.size();
  communicate( num_nodes_per_rank );

  size_t num_globals = std::accumulate( num_nodes_per_rank.begin(), num_nodes_per_rank.end(), 0 );
  global_nodes.resize( num_globals, 0L );

  // Set up displacements vector. Entry i specifies the displacement (relative
  // to recv_buffer) at which to place the incoming data from process i.
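  // Example: with per-rank counts { 2, 0, 3 } the displacements are { 0, 2, 2 }.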
  std::vector< int > displacements( np, 0 );
  for ( size_t i = 1; i < np; ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + num_nodes_per_rank.at( i - 1 );
  }

  MPI_Allgatherv( &local_nodes[ 0 ],
    local_nodes.size(),
    MPI_Type< long >::type,
    &global_nodes[ 0 ],
    &num_nodes_per_rank[ 0 ],
    &displacements[ 0 ],
    MPI_Type< long >::type,
    comm );
}

void
nest::MPIManager::communicate( std::vector< unsigned int >& send_buffer,
  std::vector< unsigned int >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  if ( get_num_processes() == 1 ) // purely thread-based
  {
    displacements[ 0 ] = 0;
    if ( static_cast< unsigned int >( recv_buffer_size_ ) < send_buffer.size() )
    {
      recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
      recv_buffer.resize( recv_buffer_size_ );
    }
    recv_buffer.swap( send_buffer );
  }
  else
  {
    communicate_Allgather( send_buffer, recv_buffer, displacements );
  }
}
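
// Two-phase exchange: every rank first contributes exactly send_buffer_size_
// entries via MPI_Allgather. A rank whose data does not fit marks its block
// with COMM_OVERFLOW_ERROR in the first entry and puts its true size in the
// second entry; if any rank signalled overflow, the data are re-gathered with
// MPI_Allgatherv using the announced per-rank counts, and the buffer sizes are
// enlarged for subsequent communication rounds.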

void
nest::MPIManager::communicate_Allgather( std::vector< unsigned int >& send_buffer,
  std::vector< unsigned int >& recv_buffer,
  std::vector< int >& displacements )
{
  std::vector< int > recv_counts( get_num_processes(), send_buffer_size_ );

  // attempt Allgather
  if ( send_buffer.size() == static_cast< unsigned int >( send_buffer_size_ ) )
  {
    MPI_Allgather(
      &send_buffer[ 0 ], send_buffer_size_, MPI_UNSIGNED, &recv_buffer[ 0 ], send_buffer_size_, MPI_UNSIGNED, comm );
  }
  else
  {
    // DEC cxx required 0U literal, HEP 2007-03-26
    std::vector< unsigned int > overflow_buffer( send_buffer_size_, 0U );
    overflow_buffer[ 0 ] = COMM_OVERFLOW_ERROR;
    overflow_buffer[ 1 ] = send_buffer.size();
    MPI_Allgather( &overflow_buffer[ 0 ],
      send_buffer_size_,
      MPI_UNSIGNED,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_UNSIGNED,
      comm );
  }
  // check for overflow condition
  int disp = 0;
  unsigned int max_recv_count = send_buffer_size_;
  bool overflow = false;
  for ( int pid = 0; pid < get_num_processes(); ++pid )
  {
    unsigned int block_disp = pid * send_buffer_size_;
    displacements[ pid ] = disp;
    if ( recv_buffer[ block_disp ] == COMM_OVERFLOW_ERROR )
    {
      overflow = true;
      recv_counts[ pid ] = recv_buffer[ block_disp + 1 ];
      if ( static_cast< unsigned int >( recv_counts[ pid ] ) > max_recv_count )
      {
        max_recv_count = recv_counts[ pid ];
      }
    }
    disp += recv_counts[ pid ];
  }

  // do Allgatherv if necessary
  if ( overflow )
  {
    recv_buffer.resize( disp, 0 );
    MPI_Allgatherv( &send_buffer[ 0 ],
      send_buffer.size(),
      MPI_UNSIGNED,
      &recv_buffer[ 0 ],
      &recv_counts[ 0 ],
      &displacements[ 0 ],
      MPI_UNSIGNED,
      comm );
    send_buffer_size_ = max_recv_count;
    recv_buffer_size_ = send_buffer_size_ * get_num_processes();
  }
}
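
// The templated overload below implements the same two-phase protocol for any
// element type T with an MPI_Type< T > specialization; it could, for example,
// be instantiated as communicate_Allgather< long >( send, recv, displs )
// (illustrative instantiation only, not necessarily used in NEST).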

template < typename T >
void
nest::MPIManager::communicate_Allgather( std::vector< T >& send_buffer,
  std::vector< T >& recv_buffer,
  std::vector< int >& displacements )
{
  std::vector< int > recv_counts( get_num_processes(), send_buffer_size_ );

  // attempt Allgather
  if ( send_buffer.size() == static_cast< unsigned int >( send_buffer_size_ ) )
  {
    MPI_Allgather( &send_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      comm );
  }
  else
  {
    // DEC cxx required 0U literal, HEP 2007-03-26
    std::vector< unsigned int > overflow_buffer( send_buffer_size_, 0U );
    overflow_buffer[ 0 ] = COMM_OVERFLOW_ERROR;
    overflow_buffer[ 1 ] = send_buffer.size();
    MPI_Allgather( &overflow_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      comm );
  }
  // check for overflow condition
  int disp = 0;
  unsigned int max_recv_count = send_buffer_size_;
  bool overflow = false;
  for ( int pid = 0; pid < get_num_processes(); ++pid )
  {
    unsigned int block_disp = pid * send_buffer_size_;
    displacements[ pid ] = disp;
    if ( recv_buffer[ block_disp ] == COMM_OVERFLOW_ERROR )
    {
      overflow = true;
      recv_counts[ pid ] = recv_buffer[ block_disp + 1 ];
      if ( static_cast< unsigned int >( recv_counts[ pid ] ) > max_recv_count )
      {
        max_recv_count = recv_counts[ pid ];
      }
    }
    disp += recv_counts[ pid ];
  }

  // do Allgatherv if necessary
  if ( overflow )
  {
    recv_buffer.resize( disp, 0 );
    MPI_Allgatherv( &send_buffer[ 0 ],
      send_buffer.size(),
      MPI_Type< T >::type,
      &recv_buffer[ 0 ],
      &recv_counts[ 0 ],
      &displacements[ 0 ],
      MPI_Type< T >::type,
      comm );
    send_buffer_size_ = max_recv_count;
    recv_buffer_size_ = send_buffer_size_ * get_num_processes();
  }
}

void
nest::MPIManager::communicate( std::vector< OffGridSpike >& send_buffer,
  std::vector< OffGridSpike >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  if ( get_num_processes() == 1 ) // purely thread-based
  {
    displacements[ 0 ] = 0;
    if ( static_cast< unsigned int >( recv_buffer_size_ ) < send_buffer.size() )
    {
      recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
      recv_buffer.resize( recv_buffer_size_ );
    }
    recv_buffer.swap( send_buffer );
  }
  else
  {
    communicate_Allgather( send_buffer, recv_buffer, displacements );
  }
}
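
// Off-grid spikes use the same two-phase overflow protocol as above; here the
// node-id field of the first OffGridSpike in a rank's block carries the
// COMM_OVERFLOW_ERROR flag and the second one carries the actual send size.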

void
nest::MPIManager::communicate_Allgather( std::vector< OffGridSpike >& send_buffer,
  std::vector< OffGridSpike >& recv_buffer,
  std::vector< int >& displacements )
{
  std::vector< int > recv_counts( get_num_processes(), send_buffer_size_ );
  // attempt Allgather
  if ( send_buffer.size() == static_cast< unsigned int >( send_buffer_size_ ) )
  {
    MPI_Allgather( &send_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      comm );
  }
  else
  {
    std::vector< OffGridSpike > overflow_buffer( send_buffer_size_ );
    overflow_buffer[ 0 ] = OffGridSpike( COMM_OVERFLOW_ERROR, 0.0 );
    overflow_buffer[ 1 ] = OffGridSpike( send_buffer.size(), 0.0 );
    MPI_Allgather( &overflow_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      comm );
  }

  // check for overflow condition
  int disp = 0;
  unsigned int max_recv_count = send_buffer_size_;
  bool overflow = false;
  for ( int pid = 0; pid < get_num_processes(); ++pid )
  {
    unsigned int block_disp = pid * send_buffer_size_;
    displacements[ pid ] = disp;
    if ( ( recv_buffer[ block_disp ] ).get_node_id() == COMM_OVERFLOW_ERROR )
    {
      overflow = true;
      recv_counts[ pid ] = ( recv_buffer[ block_disp + 1 ] ).get_node_id();
      if ( static_cast< unsigned int >( recv_counts[ pid ] ) > max_recv_count )
      {
        max_recv_count = recv_counts[ pid ];
      }
    }
    disp += recv_counts[ pid ];
  }

  // do Allgatherv if necessary
  if ( overflow )
  {
    recv_buffer.resize( disp );
    MPI_Allgatherv( &send_buffer[ 0 ],
      send_buffer.size(),
      MPI_OFFGRID_SPIKE,
      &recv_buffer[ 0 ],
      &recv_counts[ 0 ],
      &displacements[ 0 ],
      MPI_OFFGRID_SPIKE,
      comm );
    send_buffer_size_ = max_recv_count;
    recv_buffer_size_ = send_buffer_size_ * get_num_processes();
  }
}

void
nest::MPIManager::communicate( std::vector< double >& send_buffer,
  std::vector< double >& recv_buffer,
  std::vector< int >& displacements )
{
  // get size of buffers
  std::vector< int > n_nodes( get_num_processes() );
  n_nodes[ get_rank() ] = send_buffer.size();
  communicate( n_nodes );
  // Set up displacements vector.
  displacements.resize( get_num_processes(), 0 );
  for ( int i = 1; i < get_num_processes(); ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // Calculate total number of node data items to be gathered.
  size_t n_globals = std::accumulate( n_nodes.begin(), n_nodes.end(), 0 );

  if ( n_globals != 0 )
  {
    recv_buffer.resize( n_globals, 0.0 );
    communicate_Allgatherv( send_buffer, recv_buffer, displacements, n_nodes );
  }
  else
  {
    recv_buffer.clear();
  }
}

void
nest::MPIManager::communicate( std::vector< unsigned long >& send_buffer,
  std::vector< unsigned long >& recv_buffer,
  std::vector< int >& displacements )
{
  // get size of buffers
  std::vector< int > n_nodes( num_processes_ );
  n_nodes[ rank_ ] = send_buffer.size();
  communicate( n_nodes );
  // Set up displacements vector.
  displacements.resize( num_processes_, 0 );
  for ( int i = 1; i < num_processes_; ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // Calculate total number of node data items to be gathered.
  size_t n_globals = std::accumulate( n_nodes.begin(), n_nodes.end(), 0 );

  if ( n_globals != 0 )
  {
    recv_buffer.resize( n_globals, 0 );
    communicate_Allgatherv( send_buffer, recv_buffer, displacements, n_nodes );
  }
  else
  {
    recv_buffer.clear();
  }
}

void
nest::MPIManager::communicate( std::vector< int >& send_buffer,
  std::vector< int >& recv_buffer,
  std::vector< int >& displacements )
{
  // get size of buffers
  std::vector< int > n_nodes( num_processes_ );
  n_nodes[ rank_ ] = send_buffer.size();
  communicate( n_nodes );
  // Set up displacements vector.
  displacements.resize( num_processes_, 0 );
  for ( int i = 1; i < num_processes_; ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // Calculate total number of node data items to be gathered.
  size_t n_globals = std::accumulate( n_nodes.begin(), n_nodes.end(), 0 );

  if ( n_globals != 0 )
  {
    recv_buffer.resize( n_globals, 0 );
    communicate_Allgatherv( send_buffer, recv_buffer, displacements, n_nodes );
  }
  else
  {
    recv_buffer.clear();
  }
}

void
nest::MPIManager::communicate( double send_val, std::vector< double >& recv_buffer )
{
  recv_buffer.resize( get_num_processes() );
  MPI_Allgather( &send_val, 1, MPI_DOUBLE, &recv_buffer[ 0 ], 1, MPI_DOUBLE, comm );
}


/**
 * communicate functions for sending set-up information
 */
void
nest::MPIManager::communicate( std::vector< int >& buffer )
{
  communicate_Allgather( buffer );
}

void
nest::MPIManager::communicate( std::vector< long >& buffer )
{
  communicate_Allgather( buffer );
}

void
nest::MPIManager::communicate_Allgather( std::vector< int >& buffer )
{
  // avoid aliasing, see http://www.mpi-forum.org/docs/mpi-11-html/node10.html
  int my_val = buffer[ get_rank() ];
  MPI_Allgather( &my_val, 1, MPI_INT, &buffer[ 0 ], 1, MPI_INT, comm );
}
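
// Note: the local copy above works around the MPI-1 aliasing restriction. On
// MPI implementations that support MPI_IN_PLACE for MPI_Allgather, an
// equivalent call would be (sketch only, not the variant used here):
//   MPI_Allgather( MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, &buffer[ 0 ], 1, MPI_INT, comm );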

/*
 * Sum across all ranks
 */
void
nest::MPIManager::communicate_Allreduce_sum_in_place( double buffer )
{
  MPI_Allreduce( MPI_IN_PLACE, &buffer, 1, MPI_Type< double >::type, MPI_SUM, comm );
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< double >& buffer )
{
  MPI_Allreduce( MPI_IN_PLACE, &buffer[ 0 ], buffer.size(), MPI_Type< double >::type, MPI_SUM, comm );
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< int >& buffer )
{
  MPI_Allreduce( MPI_IN_PLACE, &buffer[ 0 ], buffer.size(), MPI_Type< int >::type, MPI_SUM, comm );
}

void
nest::MPIManager::communicate_Allreduce_sum( std::vector< double >& send_buffer, std::vector< double >& recv_buffer )
{
  assert( recv_buffer.size() == send_buffer.size() );
  MPI_Allreduce( &send_buffer[ 0 ], &recv_buffer[ 0 ], send_buffer.size(), MPI_Type< double >::type, MPI_SUM, comm );
}

double
nest::MPIManager::min_cross_ranks( double value )
{
  MPI_Allreduce( MPI_IN_PLACE, &value, 1, MPI_DOUBLE, MPI_MIN, comm );
  return value;
}

double
nest::MPIManager::max_cross_ranks( double value )
{
  MPI_Allreduce( MPI_IN_PLACE, &value, 1, MPI_DOUBLE, MPI_MAX, comm );
  return value;
}

void
nest::MPIManager::communicate_Allgather( std::vector< long >& buffer )
{
  // avoid aliasing, see http://www.mpi-forum.org/docs/mpi-11-html/node10.html
  long my_val = buffer[ get_rank() ];
  MPI_Allgather( &my_val, 1, MPI_LONG, &buffer[ 0 ], 1, MPI_LONG, comm );
}

void
nest::MPIManager::communicate_Alltoall_( void* send_buffer, void* recv_buffer, const unsigned int send_recv_count )
{
  MPI_Alltoall( send_buffer, send_recv_count, MPI_UNSIGNED, recv_buffer, send_recv_count, MPI_UNSIGNED, comm );
}

void
nest::MPIManager::communicate_Alltoallv_( void* send_buffer,
  const int* send_counts,
  const int* send_displacements,
  void* recv_buffer,
  const int* recv_counts,
  const int* recv_displacements )
{
  MPI_Alltoallv( send_buffer,
    send_counts,
    send_displacements,
    MPI_UNSIGNED,
    recv_buffer,
    recv_counts,
    recv_displacements,
    MPI_UNSIGNED,
    comm );
}

void
nest::MPIManager::communicate_recv_counts_secondary_events()
{

  communicate_Alltoall(
    recv_counts_secondary_events_in_int_per_rank_, send_counts_secondary_events_in_int_per_rank_, 1 );

  std::partial_sum( send_counts_secondary_events_in_int_per_rank_.begin(),
    send_counts_secondary_events_in_int_per_rank_.end() - 1,
    send_displacements_secondary_events_in_int_per_rank_.begin() + 1 );
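
  // Example: send counts per rank { 4, 2, 5 } yield send displacements
  // { 0, 4, 6 }; the last count is excluded from the partial sum because the
  // displacement of rank 0 is always 0.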
}

/**
 * Ensure all processes have reached the same stage by performing a barrier
 * across all ranks.
 */
void
nest::MPIManager::synchronize()
{
  MPI_Barrier( comm );
}


// any_true: takes a single bool, exchanges with all other processes,
// and returns "true" if one or more processes provide "true"
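// Note: a single MPI_Allreduce with the logical-or operation would produce the
// same result without gathering a value from every rank; a sketch of that
// alternative (not the implementation used here):
//   int any = 0;
//   MPI_Allreduce( &my_int, &any, 1, MPI_INT, MPI_LOR, comm );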
bool
nest::MPIManager::any_true( const bool my_bool )
{
  if ( get_num_processes() == 1 )
  {
    return my_bool;
  }

  // since there is no MPI_BOOL we first convert to int
  int my_int = my_bool;

  std::vector< int > all_int( get_num_processes() );
  MPI_Allgather( &my_int, 1, MPI_INT, &all_int[ 0 ], 1, MPI_INT, comm );
  // check if any MPI process sent a "true"
  for ( unsigned int i = 0; i < all_int.size(); ++i )
  {
    if ( all_int[ i ] != 0 )
    {
      return true;
    }
  }

  return false;
}

// average communication time for a packet size of num_bytes using Allgather
double
nest::MPIManager::time_communicate( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );
  if ( packet_length < 1 )
  {
    packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( packet_length );
  std::vector< unsigned int > test_recv_buffer( packet_length * get_num_processes() );
  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Allgather(
      &test_send_buffer[ 0 ], packet_length, MPI_UNSIGNED, &test_recv_buffer[ 0 ], packet_length, MPI_UNSIGNED, comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes using Allgatherv
double
nest::MPIManager::time_communicatev( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );
  if ( packet_length < 1 )
  {
    packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( packet_length );
  std::vector< unsigned int > test_recv_buffer( packet_length * get_num_processes() );
  std::vector< int > n_nodes( get_num_processes(), packet_length );
  std::vector< int > displacements( get_num_processes(), 0 );

  for ( int i = 1; i < get_num_processes(); ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    communicate_Allgatherv( test_send_buffer, test_recv_buffer, displacements, n_nodes );
  }

  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes
double
nest::MPIManager::time_communicate_offgrid( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( OffGridSpike );
  if ( packet_length < 1 )
  {
    packet_length = 1;
  }
  std::vector< OffGridSpike > test_send_buffer( packet_length );
  std::vector< OffGridSpike > test_recv_buffer( packet_length * get_num_processes() );
  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Allgather( &test_send_buffer[ 0 ],
      packet_length,
      MPI_OFFGRID_SPIKE,
      &test_recv_buffer[ 0 ],
      packet_length,
      MPI_OFFGRID_SPIKE,
      comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes using Alltoall
double
nest::MPIManager::time_communicate_alltoall( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );         // this size should be sent to each process
  unsigned int total_packet_length = packet_length * get_num_processes(); // total size of send and receive buffers
  if ( total_packet_length < 1 )
  {
    total_packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( total_packet_length );
  std::vector< unsigned int > test_recv_buffer( total_packet_length );
  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Alltoall(
      &test_send_buffer[ 0 ], packet_length, MPI_UNSIGNED, &test_recv_buffer[ 0 ], packet_length, MPI_UNSIGNED, comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes using Alltoallv
double
nest::MPIManager::time_communicate_alltoallv( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );         // this size should be sent to each process
  unsigned int total_packet_length = packet_length * get_num_processes(); // total size of send and receive buffers
  if ( total_packet_length < 1 )
  {
    total_packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( total_packet_length );
  std::vector< unsigned int > test_recv_buffer( total_packet_length );
  std::vector< int > n_nodes( get_num_processes(), packet_length );
  std::vector< int > displacements( get_num_processes(), 0 );

  for ( int i = 1; i < get_num_processes(); ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Alltoallv( &test_send_buffer[ 0 ],
      &n_nodes[ 0 ],
      &displacements[ 0 ],
      MPI_UNSIGNED,
      &test_recv_buffer[ 0 ],
      &n_nodes[ 0 ],
      &displacements[ 0 ],
      MPI_UNSIGNED,
      comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

#else /* #ifdef HAVE_MPI */

/**
 * communicate (on-grid) if compiled without MPI
 */
void
nest::MPIManager::communicate( std::vector< unsigned int >& send_buffer,
  std::vector< unsigned int >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  if ( static_cast< size_t >( recv_buffer_size_ ) < send_buffer.size() )
  {
    recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
    recv_buffer.resize( recv_buffer_size_ );
  }
  recv_buffer.swap( send_buffer );
}

/**
 * communicate (off-grid) if compiled without MPI
 */
void
nest::MPIManager::communicate( std::vector< OffGridSpike >& send_buffer,
  std::vector< OffGridSpike >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  if ( static_cast< size_t >( recv_buffer_size_ ) < send_buffer.size() )
  {
    recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
    recv_buffer.resize( recv_buffer_size_ );
  }
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( std::vector< double >& send_buffer,
  std::vector< double >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( std::vector< unsigned long >& send_buffer,
  std::vector< unsigned long >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( std::vector< int >& send_buffer,
  std::vector< int >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( double send_val, std::vector< double >& recv_buffer )
{
  recv_buffer.resize( 1 );
  recv_buffer[ 0 ] = send_val;
}

void
nest::MPIManager::communicate( std::vector< long >&, std::vector< long >& )
{
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( double )
{
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< double >& )
{
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< int >& )
{
}

void
nest::MPIManager::communicate_Allreduce_sum( std::vector< double >& send_buffer, std::vector< double >& recv_buffer )
{
  recv_buffer.swap( send_buffer );
}

double
nest::MPIManager::min_cross_ranks( double value )
{
  return value;
}

double
nest::MPIManager::max_cross_ranks( double value )
{
  return value;
}

void
nest::MPIManager::communicate_recv_counts_secondary_events()
{
  // since we only have one process, the send count is equal to the recv count
  send_counts_secondary_events_in_int_per_rank_ = recv_counts_secondary_events_in_int_per_rank_;

  // since we only have one process, the send displacement is zero
  assert( send_displacements_secondary_events_in_int_per_rank_.size() == 1 );
  send_displacements_secondary_events_in_int_per_rank_[ 0 ] = 0;
}

#endif /* #ifdef HAVE_MPI */