/*
 *  mpi_manager.cpp
 *
 *  This file is part of NEST.
 *
 *  Copyright (C) 2004 The NEST Initiative
 *
 *  NEST is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  NEST is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with NEST.  If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "mpi_manager.h"

// C++ includes:
#include <limits>
#include <numeric>

// Includes from libnestutil:
#include "compose.hpp"
#include "logging.h"
#include "stopwatch.h"

// Includes from nestkernel:
#include "kernel_manager.h"
#include "mpi_manager_impl.h"
#include "nest_types.h"

// Includes from sli:
#include "dictutils.h"

#ifdef HAVE_MPI

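// Map C++ element types to the corresponding MPI datatypes. These
// specializations are used by the templated communication helpers
// (e.g. communicate_Allgather< T >) to select the MPI datatype that
// matches the buffer's element type.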
template <>
MPI_Datatype MPI_Type< int >::type = MPI_INT;
template <>
MPI_Datatype MPI_Type< double >::type = MPI_DOUBLE;
template <>
MPI_Datatype MPI_Type< long >::type = MPI_LONG;
template <>
MPI_Datatype MPI_Type< unsigned int >::type = MPI_INT;
template <>
MPI_Datatype MPI_Type< unsigned long >::type = MPI_UNSIGNED_LONG;

#endif /* #ifdef HAVE_MPI */

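// Construct with single-process defaults; the actual number of processes,
// the rank and the buffer sizes are determined later in init_mpi() and
// set_communicator().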
nest::MPIManager::MPIManager()
  : num_processes_( 1 )
  , rank_( 0 )
  , send_buffer_size_( 0 )
  , recv_buffer_size_( 0 )
  , use_mpi_( false )
  , buffer_size_target_data_( 1 )
  , buffer_size_spike_data_( 1 )
  , max_buffer_size_target_data_( 16777216 )
  , max_buffer_size_spike_data_( 8388608 )
  , adaptive_target_buffers_( true )
  , adaptive_spike_buffers_( true )
  , growth_factor_buffer_spike_data_( 1.5 )
  , growth_factor_buffer_target_data_( 1.5 )
  , shrink_factor_buffer_spike_data_( 1.1 )
  , send_recv_count_spike_data_per_rank_( 0 )
  , send_recv_count_target_data_per_rank_( 0 )
#ifdef HAVE_MPI
  , comm_step_( std::vector< int >() )
  , COMM_OVERFLOW_ERROR( std::numeric_limits< unsigned int >::max() )
  , comm( 0 )
  , MPI_OFFGRID_SPIKE( 0 )
#endif
{
}

#ifndef HAVE_MPI

void
nest::MPIManager::init_mpi( int*, char*** )
{
  // Without MPI, initialize buffers for a single rank. We still need at
  // least two entries per process: the flag of the first entry marks
  // validity and the last entry signals the end of communication.
  kernel().mpi_manager.set_buffer_size_target_data( 2 );
  kernel().mpi_manager.set_buffer_size_spike_data( 2 );

  recv_counts_secondary_events_in_int_per_rank_.resize( 1, 0 );
  recv_displacements_secondary_events_in_int_per_rank_.resize( 1, 0 );
  send_counts_secondary_events_in_int_per_rank_.resize( 1, 0 );
  send_displacements_secondary_events_in_int_per_rank_.resize( 1, 0 );
}

#else /* HAVE_MPI */

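// Adopt the given communicator, query the number of processes and the rank
// of this process, and derive the receive buffer size as well as the
// per-process target and spike buffer sizes from it.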
void
nest::MPIManager::set_communicator( MPI_Comm global_comm )
{
  comm = global_comm;
  MPI_Comm_size( comm, &num_processes_ );
  MPI_Comm_rank( comm, &rank_ );
  recv_buffer_size_ = send_buffer_size_ * get_num_processes();

  // Use at least two entries per process: the flag of the first entry marks
  // validity and the last entry signals the end of communication.
  kernel().mpi_manager.set_buffer_size_target_data( 2 * kernel().mpi_manager.get_num_processes() );
  kernel().mpi_manager.set_buffer_size_spike_data( 2 * kernel().mpi_manager.get_num_processes() );
}

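// Initialize MPI unless it has already been initialized elsewhere. When NEST
// is built with MUSIC, MUSIC performs the MPI initialization and provides the
// communicator; otherwise MPI_Init_thread() is called with
// MPI_THREAD_FUNNELED and MPI_COMM_WORLD is adopted. Afterwards the derived
// MPI datatype for exchanging OffGridSpike entries is created and committed.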
void
nest::MPIManager::init_mpi( int* argc, char** argv[] )
{
  int init;
  MPI_Initialized( &init );

  if ( init == 0 )
  {
#ifdef HAVE_MUSIC
    kernel().music_manager.init_music( argc, argv );
    // get a communicator from MUSIC
    set_communicator( static_cast< MPI_Comm >( kernel().music_manager.communicator() ) );
#else  /* #ifdef HAVE_MUSIC */
    int provided_thread_level;
    MPI_Init_thread( argc, argv, MPI_THREAD_FUNNELED, &provided_thread_level );
    set_communicator( MPI_COMM_WORLD );
#endif /* #ifdef HAVE_MUSIC */
  }
  else
  {
#ifdef HAVE_MUSIC
    LOG( M_ERROR,
      "MPIManager::init_mpi()",
      "When compiled with MUSIC, NEST must be initialized before any other modules that call MPI_Init(). "
      "Calling MPI_Abort()." );
    comm = MPI_COMM_WORLD;
    mpi_abort( 1 );
#else
    set_communicator( MPI_COMM_WORLD );
#endif
  }

  recv_counts_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );
  recv_displacements_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );
  send_counts_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );
  send_displacements_secondary_events_in_int_per_rank_.resize( get_num_processes(), 0 );

  // create the derived off-grid-spike datatype for MPI communication
  OffGridSpike::assert_datatype_compatibility_();
  MPI_Datatype source_types[ 2 ];
  int blockcounts[ 2 ];
  MPI_Aint offsets[ 2 ];
  MPI_Aint start_address, address;
  OffGridSpike ogs( 0, 0.0 );

  // OffGridSpike.node_id
  offsets[ 0 ] = 0;
  source_types[ 0 ] = MPI_DOUBLE;
  blockcounts[ 0 ] = 1;

  // OffGridSpike.offset
  MPI_Get_address( &( ogs.node_id_ ), &start_address );
  MPI_Get_address( &( ogs.offset_ ), &address );
  offsets[ 1 ] = address - start_address;
  source_types[ 1 ] = MPI_DOUBLE;
  blockcounts[ 1 ] = 1;

  // generate and commit struct
  MPI_Type_create_struct( 2, blockcounts, offsets, source_types, &MPI_OFFGRID_SPIKE );
  MPI_Type_commit( &MPI_OFFGRID_SPIKE );

  use_mpi_ = true;
}

#endif /* #ifdef HAVE_MPI */

void
nest::MPIManager::initialize()
{
}

void
nest::MPIManager::finalize()
{
}

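// Read buffer-related settings from the status dictionary. The target and
// spike data buffer sizes are only updated if the requested value differs
// from the current one and stays below the corresponding maximum.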
void
nest::MPIManager::set_status( const DictionaryDatum& dict )
{
  updateValue< bool >( dict, names::adaptive_target_buffers, adaptive_target_buffers_ );
  updateValue< bool >( dict, names::adaptive_spike_buffers, adaptive_spike_buffers_ );

  long new_buffer_size_target_data = buffer_size_target_data_;
  updateValue< long >( dict, names::buffer_size_target_data, new_buffer_size_target_data );
  if ( new_buffer_size_target_data != static_cast< long >( buffer_size_target_data_ )
    and new_buffer_size_target_data < static_cast< long >( max_buffer_size_target_data_ ) )
  {
    set_buffer_size_target_data( new_buffer_size_target_data );
  }

  long new_buffer_size_spike_data = buffer_size_spike_data_;
  updateValue< long >( dict, names::buffer_size_spike_data, new_buffer_size_spike_data );
  if ( new_buffer_size_spike_data != static_cast< long >( buffer_size_spike_data_ )
    and new_buffer_size_spike_data < static_cast< long >( max_buffer_size_spike_data_ ) )
  {
    set_buffer_size_spike_data( new_buffer_size_spike_data );
  }

  updateValue< double >( dict, names::growth_factor_buffer_spike_data, growth_factor_buffer_spike_data_ );
  updateValue< double >( dict, names::growth_factor_buffer_target_data, growth_factor_buffer_target_data_ );

  updateValue< long >( dict, names::max_buffer_size_target_data, max_buffer_size_target_data_ );
  updateValue< long >( dict, names::max_buffer_size_spike_data, max_buffer_size_spike_data_ );

  updateValue< double >( dict, names::shrink_factor_buffer_spike_data, shrink_factor_buffer_spike_data_ );
}

void
nest::MPIManager::get_status( DictionaryDatum& dict )
{
  def< long >( dict, names::num_processes, num_processes_ );
  def< bool >( dict, names::adaptive_spike_buffers, adaptive_spike_buffers_ );
  def< bool >( dict, names::adaptive_target_buffers, adaptive_target_buffers_ );
  def< size_t >( dict, names::buffer_size_target_data, buffer_size_target_data_ );
  def< size_t >( dict, names::buffer_size_spike_data, buffer_size_spike_data_ );
  def< size_t >( dict, names::send_buffer_size_secondary_events, get_send_buffer_size_secondary_events_in_int() );
  def< size_t >( dict, names::recv_buffer_size_secondary_events, get_recv_buffer_size_secondary_events_in_int() );
  def< size_t >( dict, names::max_buffer_size_spike_data, max_buffer_size_spike_data_ );
  def< size_t >( dict, names::max_buffer_size_target_data, max_buffer_size_target_data_ );
  def< double >( dict, names::growth_factor_buffer_spike_data, growth_factor_buffer_spike_data_ );
  def< double >( dict, names::growth_factor_buffer_target_data, growth_factor_buffer_target_data_ );
}

#ifdef HAVE_MPI

void
nest::MPIManager::mpi_finalize( int exitcode )
{
  MPI_Type_free( &MPI_OFFGRID_SPIKE );

  int finalized;
  MPI_Finalized( &finalized );

  int initialized;
  MPI_Initialized( &initialized );

  if ( finalized == 0 and initialized == 1 )
  {
    if ( exitcode == 0 )
    {
      kernel().music_manager.music_finalize(); // calls MPI_Finalize()
    }
    else
    {
      LOG( M_INFO, "MPIManager::mpi_finalize()", "Calling MPI_Abort() due to errors in the script." );
      mpi_abort( exitcode );
    }
  }
}

#else /* #ifdef HAVE_MPI */

void
nest::MPIManager::mpi_finalize( int )
{
}

#endif /* #ifdef HAVE_MPI */

#ifdef HAVE_MPI

void
nest::MPIManager::mpi_abort( int exitcode )
{
  MPI_Abort( comm, exitcode );
}


std::string
nest::MPIManager::get_processor_name()
{
  char name[ 1024 ];
  int len;
  MPI_Get_processor_name( name, &len );
  name[ len ] = '\0';
  return name;
}

void
nest::MPIManager::communicate( std::vector< long >& local_nodes, std::vector< long >& global_nodes )
{
  size_t np = get_num_processes();
  // Get size of buffers
  std::vector< int > num_nodes_per_rank( np );
  num_nodes_per_rank[ get_rank() ] = local_nodes.size();
  communicate( num_nodes_per_rank );

  size_t num_globals = std::accumulate( num_nodes_per_rank.begin(), num_nodes_per_rank.end(), 0 );
  global_nodes.resize( num_globals, 0L );

  // Set up displacements vector. Entry i specifies the displacement (relative
  // to recv_buffer) at which to place the incoming data from process i.
  std::vector< int > displacements( np, 0 );
  for ( size_t i = 1; i < np; ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + num_nodes_per_rank.at( i - 1 );
  }

  MPI_Allgatherv( &local_nodes[ 0 ],
    local_nodes.size(),
    MPI_Type< long >::type,
    &global_nodes[ 0 ],
    &num_nodes_per_rank[ 0 ],
    &displacements[ 0 ],
    MPI_Type< long >::type,
    comm );
}

void
nest::MPIManager::communicate( std::vector< unsigned int >& send_buffer,
  std::vector< unsigned int >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  if ( get_num_processes() == 1 ) // purely thread-based
  {
    displacements[ 0 ] = 0;
    if ( static_cast< unsigned int >( recv_buffer_size_ ) < send_buffer.size() )
    {
      recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
      recv_buffer.resize( recv_buffer_size_ );
    }
    recv_buffer.swap( send_buffer );
  }
  else
  {
    communicate_Allgather( send_buffer, recv_buffer, displacements );
  }
}

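// Overflow-aware Allgather: each rank contributes a fixed-size block of
// send_buffer_size_ entries. A rank whose send buffer has outgrown this block
// instead sends a block whose first entry is the COMM_OVERFLOW_ERROR marker
// and whose second entry is the required size. If any marker is detected, the
// data are re-gathered with MPI_Allgatherv using the per-rank counts, and the
// send/receive buffer sizes are enlarged for subsequent communication rounds.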
void
nest::MPIManager::communicate_Allgather( std::vector< unsigned int >& send_buffer,
  std::vector< unsigned int >& recv_buffer,
  std::vector< int >& displacements )
{
  std::vector< int > recv_counts( get_num_processes(), send_buffer_size_ );

  // attempt Allgather
  if ( send_buffer.size() == static_cast< unsigned int >( send_buffer_size_ ) )
  {
    MPI_Allgather(
      &send_buffer[ 0 ], send_buffer_size_, MPI_UNSIGNED, &recv_buffer[ 0 ], send_buffer_size_, MPI_UNSIGNED, comm );
  }
  else
  {
    // DEC cxx required 0U literal, HEP 2007-03-26
    std::vector< unsigned int > overflow_buffer( send_buffer_size_, 0U );
    overflow_buffer[ 0 ] = COMM_OVERFLOW_ERROR;
    overflow_buffer[ 1 ] = send_buffer.size();
    MPI_Allgather( &overflow_buffer[ 0 ],
      send_buffer_size_,
      MPI_UNSIGNED,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_UNSIGNED,
      comm );
  }
  // check for overflow condition
  int disp = 0;
  unsigned int max_recv_count = send_buffer_size_;
  bool overflow = false;
  for ( int pid = 0; pid < get_num_processes(); ++pid )
  {
    unsigned int block_disp = pid * send_buffer_size_;
    displacements[ pid ] = disp;
    if ( recv_buffer[ block_disp ] == COMM_OVERFLOW_ERROR )
    {
      overflow = true;
      recv_counts[ pid ] = recv_buffer[ block_disp + 1 ];
      if ( static_cast< unsigned int >( recv_counts[ pid ] ) > max_recv_count )
      {
        max_recv_count = recv_counts[ pid ];
      }
    }
    disp += recv_counts[ pid ];
  }

  // do Allgatherv if necessary
  if ( overflow )
  {
    recv_buffer.resize( disp, 0 );
    MPI_Allgatherv( &send_buffer[ 0 ],
      send_buffer.size(),
      MPI_UNSIGNED,
      &recv_buffer[ 0 ],
      &recv_counts[ 0 ],
      &displacements[ 0 ],
      MPI_UNSIGNED,
      comm );
    send_buffer_size_ = max_recv_count;
    recv_buffer_size_ = send_buffer_size_ * get_num_processes();
  }
}

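// Generic variant of the overflow-aware Allgather above. Note that the
// overflow block is still assembled in an unsigned int buffer but transferred
// with the MPI datatype of T, so the protocol implicitly assumes element
// types of compatible size that can represent the marker values.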
template < typename T >
void
nest::MPIManager::communicate_Allgather( std::vector< T >& send_buffer,
  std::vector< T >& recv_buffer,
  std::vector< int >& displacements )
{
  std::vector< int > recv_counts( get_num_processes(), send_buffer_size_ );

  // attempt Allgather
  if ( send_buffer.size() == static_cast< unsigned int >( send_buffer_size_ ) )
  {
    MPI_Allgather( &send_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      comm );
  }
  else
  {
    // DEC cxx required 0U literal, HEP 2007-03-26
    std::vector< unsigned int > overflow_buffer( send_buffer_size_, 0U );
    overflow_buffer[ 0 ] = COMM_OVERFLOW_ERROR;
    overflow_buffer[ 1 ] = send_buffer.size();
    MPI_Allgather( &overflow_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_Type< T >::type,
      comm );
  }
  // check for overflow condition
  int disp = 0;
  unsigned int max_recv_count = send_buffer_size_;
  bool overflow = false;
  for ( int pid = 0; pid < get_num_processes(); ++pid )
  {
    unsigned int block_disp = pid * send_buffer_size_;
    displacements[ pid ] = disp;
    if ( recv_buffer[ block_disp ] == COMM_OVERFLOW_ERROR )
    {
      overflow = true;
      recv_counts[ pid ] = recv_buffer[ block_disp + 1 ];
      if ( static_cast< unsigned int >( recv_counts[ pid ] ) > max_recv_count )
      {
        max_recv_count = recv_counts[ pid ];
      }
    }
    disp += recv_counts[ pid ];
  }

  // do Allgatherv if necessary
  if ( overflow )
  {
    recv_buffer.resize( disp, 0 );
    MPI_Allgatherv( &send_buffer[ 0 ],
      send_buffer.size(),
      MPI_Type< T >::type,
      &recv_buffer[ 0 ],
      &recv_counts[ 0 ],
      &displacements[ 0 ],
      MPI_Type< T >::type,
      comm );
    send_buffer_size_ = max_recv_count;
    recv_buffer_size_ = send_buffer_size_ * get_num_processes();
  }
}

void
nest::MPIManager::communicate( std::vector< OffGridSpike >& send_buffer,
  std::vector< OffGridSpike >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  if ( get_num_processes() == 1 ) // purely thread-based
  {
    displacements[ 0 ] = 0;
    if ( static_cast< unsigned int >( recv_buffer_size_ ) < send_buffer.size() )
    {
      recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
      recv_buffer.resize( recv_buffer_size_ );
    }
    recv_buffer.swap( send_buffer );
  }
  else
  {
    communicate_Allgather( send_buffer, recv_buffer, displacements );
  }
}

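// Off-grid variant of the overflow-aware Allgather. The overflow marker and
// the required size are carried in the node-ID field of the first two
// OffGridSpike entries of the block.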
void
nest::MPIManager::communicate_Allgather( std::vector< OffGridSpike >& send_buffer,
  std::vector< OffGridSpike >& recv_buffer,
  std::vector< int >& displacements )
{
  std::vector< int > recv_counts( get_num_processes(), send_buffer_size_ );
  // attempt Allgather
  if ( send_buffer.size() == static_cast< unsigned int >( send_buffer_size_ ) )
  {
    MPI_Allgather( &send_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      comm );
  }
  else
  {
    std::vector< OffGridSpike > overflow_buffer( send_buffer_size_ );
    overflow_buffer[ 0 ] = OffGridSpike( COMM_OVERFLOW_ERROR, 0.0 );
    overflow_buffer[ 1 ] = OffGridSpike( send_buffer.size(), 0.0 );
    MPI_Allgather( &overflow_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      &recv_buffer[ 0 ],
      send_buffer_size_,
      MPI_OFFGRID_SPIKE,
      comm );
  }

  // check for overflow condition
  int disp = 0;
  unsigned int max_recv_count = send_buffer_size_;
  bool overflow = false;
  for ( int pid = 0; pid < get_num_processes(); ++pid )
  {
    unsigned int block_disp = pid * send_buffer_size_;
    displacements[ pid ] = disp;
    if ( ( recv_buffer[ block_disp ] ).get_node_id() == COMM_OVERFLOW_ERROR )
    {
      overflow = true;
      recv_counts[ pid ] = ( recv_buffer[ block_disp + 1 ] ).get_node_id();
      if ( static_cast< unsigned int >( recv_counts[ pid ] ) > max_recv_count )
      {
        max_recv_count = recv_counts[ pid ];
      }
    }
    disp += recv_counts[ pid ];
  }

  // do Allgatherv if necessary
  if ( overflow )
  {
    recv_buffer.resize( disp );
    MPI_Allgatherv( &send_buffer[ 0 ],
      send_buffer.size(),
      MPI_OFFGRID_SPIKE,
      &recv_buffer[ 0 ],
      &recv_counts[ 0 ],
      &displacements[ 0 ],
      MPI_OFFGRID_SPIKE,
      comm );
    send_buffer_size_ = max_recv_count;
    recv_buffer_size_ = send_buffer_size_ * get_num_processes();
  }
}

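// Gather variable-length per-rank buffers into recv_buffer via Allgatherv and
// record the offset of each rank's block in displacements. A minimal usage
// sketch (illustrative only; get_local_values() is a hypothetical helper):
//
//   std::vector< double > send_values = get_local_values();
//   std::vector< double > all_values;
//   std::vector< int > displacements;
//   kernel().mpi_manager.communicate( send_values, all_values, displacements );
//   // all_values now concatenates every rank's send_values;
//   // displacements[ i ] is the offset of rank i's block.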
void
nest::MPIManager::communicate( std::vector< double >& send_buffer,
  std::vector< double >& recv_buffer,
  std::vector< int >& displacements )
{
  // get size of buffers
  std::vector< int > n_nodes( get_num_processes() );
  n_nodes[ get_rank() ] = send_buffer.size();
  communicate( n_nodes );
  // Set up displacements vector.
  displacements.resize( get_num_processes(), 0 );
  for ( int i = 1; i < get_num_processes(); ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // Calculate total number of node data items to be gathered.
  size_t n_globals = std::accumulate( n_nodes.begin(), n_nodes.end(), 0 );

  if ( n_globals != 0 )
  {
    recv_buffer.resize( n_globals, 0.0 );
    communicate_Allgatherv( send_buffer, recv_buffer, displacements, n_nodes );
  }
  else
  {
    recv_buffer.clear();
  }
}

void
nest::MPIManager::communicate( std::vector< unsigned long >& send_buffer,
  std::vector< unsigned long >& recv_buffer,
  std::vector< int >& displacements )
{
  // get size of buffers
  std::vector< int > n_nodes( num_processes_ );
  n_nodes[ rank_ ] = send_buffer.size();
  communicate( n_nodes );
  // Set up displacements vector.
  displacements.resize( num_processes_, 0 );
  for ( int i = 1; i < num_processes_; ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // Calculate total number of node data items to be gathered.
  size_t n_globals = std::accumulate( n_nodes.begin(), n_nodes.end(), 0 );

  if ( n_globals != 0 )
  {
    recv_buffer.resize( n_globals, 0 );
    communicate_Allgatherv( send_buffer, recv_buffer, displacements, n_nodes );
  }
  else
  {
    recv_buffer.clear();
  }
}

void
nest::MPIManager::communicate( std::vector< int >& send_buffer,
  std::vector< int >& recv_buffer,
  std::vector< int >& displacements )
{
  // get size of buffers
  std::vector< int > n_nodes( num_processes_ );
  n_nodes[ rank_ ] = send_buffer.size();
  communicate( n_nodes );
  // Set up displacements vector.
  displacements.resize( num_processes_, 0 );
  for ( int i = 1; i < num_processes_; ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // Calculate total number of node data items to be gathered.
  size_t n_globals = std::accumulate( n_nodes.begin(), n_nodes.end(), 0 );

  if ( n_globals != 0 )
  {
    recv_buffer.resize( n_globals, 0 );
    communicate_Allgatherv( send_buffer, recv_buffer, displacements, n_nodes );
  }
  else
  {
    recv_buffer.clear();
  }
}

void
nest::MPIManager::communicate( double send_val, std::vector< double >& recv_buffer )
{
  recv_buffer.resize( get_num_processes() );
  MPI_Allgather( &send_val, 1, MPI_DOUBLE, &recv_buffer[ 0 ], 1, MPI_DOUBLE, comm );
}


/**
 * Communicate function for sending set-up information.
 */
void
nest::MPIManager::communicate( std::vector< int >& buffer )
{
  communicate_Allgather( buffer );
}

void
nest::MPIManager::communicate( std::vector< long >& buffer )
{
  communicate_Allgather( buffer );
}

void
nest::MPIManager::communicate_Allgather( std::vector< int >& buffer )
{
  // avoid aliasing, see http://www.mpi-forum.org/docs/mpi-11-html/node10.html
  int my_val = buffer[ get_rank() ];
  MPI_Allgather( &my_val, 1, MPI_INT, &buffer[ 0 ], 1, MPI_INT, comm );
}

/*
 * Sum across all ranks
 */
void
nest::MPIManager::communicate_Allreduce_sum_in_place( double buffer )
{
  MPI_Allreduce( MPI_IN_PLACE, &buffer, 1, MPI_Type< double >::type, MPI_SUM, comm );
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< double >& buffer )
{
  MPI_Allreduce( MPI_IN_PLACE, &buffer[ 0 ], buffer.size(), MPI_Type< double >::type, MPI_SUM, comm );
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< int >& buffer )
{
  MPI_Allreduce( MPI_IN_PLACE, &buffer[ 0 ], buffer.size(), MPI_Type< int >::type, MPI_SUM, comm );
}

void
nest::MPIManager::communicate_Allreduce_sum( std::vector< double >& send_buffer, std::vector< double >& recv_buffer )
{
  assert( recv_buffer.size() == send_buffer.size() );
  MPI_Allreduce( &send_buffer[ 0 ], &recv_buffer[ 0 ], send_buffer.size(), MPI_Type< double >::type, MPI_SUM, comm );
}

double
nest::MPIManager::min_cross_ranks( double value )
{
  MPI_Allreduce( MPI_IN_PLACE, &value, 1, MPI_DOUBLE, MPI_MIN, comm );
  return value;
}

double
nest::MPIManager::max_cross_ranks( double value )
{
  MPI_Allreduce( MPI_IN_PLACE, &value, 1, MPI_DOUBLE, MPI_MAX, comm );
  return value;
}

void
nest::MPIManager::communicate_Allgather( std::vector< long >& buffer )
{
  // avoid aliasing, see http://www.mpi-forum.org/docs/mpi-11-html/node10.html
  long my_val = buffer[ get_rank() ];
  MPI_Allgather( &my_val, 1, MPI_LONG, &buffer[ 0 ], 1, MPI_LONG, comm );
}

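// Raw wrappers around MPI_Alltoall and MPI_Alltoallv that exchange payloads
// as unsigned int; the typed communicate_Alltoall() helper used below is
// presumably implemented on top of these wrappers.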
void
nest::MPIManager::communicate_Alltoall_( void* send_buffer, void* recv_buffer, const unsigned int send_recv_count )
{
  MPI_Alltoall( send_buffer, send_recv_count, MPI_UNSIGNED, recv_buffer, send_recv_count, MPI_UNSIGNED, comm );
}

void
nest::MPIManager::communicate_Alltoallv_( void* send_buffer,
  const int* send_counts,
  const int* send_displacements,
  void* recv_buffer,
  const int* recv_counts,
  const int* recv_displacements )
{
  MPI_Alltoallv( send_buffer,
    send_counts,
    send_displacements,
    MPI_UNSIGNED,
    recv_buffer,
    recv_counts,
    recv_displacements,
    MPI_UNSIGNED,
    comm );
}

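// Exchange the per-rank receive counts for secondary events via Alltoall so
// that every rank learns how much it has to send to each peer, then derive
// the send displacements as the prefix sum of the send counts.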
void
nest::MPIManager::communicate_recv_counts_secondary_events()
{

  communicate_Alltoall(
    recv_counts_secondary_events_in_int_per_rank_, send_counts_secondary_events_in_int_per_rank_, 1 );

  std::partial_sum( send_counts_secondary_events_in_int_per_rank_.begin(),
    send_counts_secondary_events_in_int_per_rank_.end() - 1,
    send_displacements_secondary_events_in_int_per_rank_.begin() + 1 );
}

/**
 * Ensure all processes have reached the same stage by waiting at an MPI
 * barrier.
 */
void
nest::MPIManager::synchronize()
{
  MPI_Barrier( comm );
}


// any_true: takes a single bool, exchanges it with all other processes,
// and returns true if one or more processes provide true
bool
nest::MPIManager::any_true( const bool my_bool )
{
  if ( get_num_processes() == 1 )
  {
    return my_bool;
  }

  // since there is no MPI_BOOL we first convert to int
  int my_int = my_bool;

  std::vector< int > all_int( get_num_processes() );
  MPI_Allgather( &my_int, 1, MPI_INT, &all_int[ 0 ], 1, MPI_INT, comm );
  // check if any MPI process sent a "true"
  for ( unsigned int i = 0; i < all_int.size(); ++i )
  {
    if ( all_int[ i ] != 0 )
    {
      return true;
    }
  }

  return false;
}

// average communication time for a packet size of num_bytes using Allgather
double
nest::MPIManager::time_communicate( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );
  if ( packet_length < 1 )
  {
    packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( packet_length );
  std::vector< unsigned int > test_recv_buffer( packet_length * get_num_processes() );
  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Allgather(
      &test_send_buffer[ 0 ], packet_length, MPI_UNSIGNED, &test_recv_buffer[ 0 ], packet_length, MPI_UNSIGNED, comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes using Allgatherv
double
nest::MPIManager::time_communicatev( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );
  if ( packet_length < 1 )
  {
    packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( packet_length );
  std::vector< unsigned int > test_recv_buffer( packet_length * get_num_processes() );
  std::vector< int > n_nodes( get_num_processes(), packet_length );
  std::vector< int > displacements( get_num_processes(), 0 );

  for ( int i = 1; i < get_num_processes(); ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    communicate_Allgatherv( test_send_buffer, test_recv_buffer, displacements, n_nodes );
  }

  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes using Allgather
// with the off-grid spike datatype
double
nest::MPIManager::time_communicate_offgrid( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( OffGridSpike );
  if ( packet_length < 1 )
  {
    packet_length = 1;
  }
  std::vector< OffGridSpike > test_send_buffer( packet_length );
  std::vector< OffGridSpike > test_recv_buffer( packet_length * get_num_processes() );
  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Allgather( &test_send_buffer[ 0 ],
      packet_length,
      MPI_OFFGRID_SPIKE,
      &test_recv_buffer[ 0 ],
      packet_length,
      MPI_OFFGRID_SPIKE,
      comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes using Alltoall
double
nest::MPIManager::time_communicate_alltoall( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );        // this size should be sent to each process
  unsigned int total_packet_length = packet_length * get_num_processes(); // total size of send and receive buffers
  if ( total_packet_length < 1 )
  {
    total_packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( total_packet_length );
  std::vector< unsigned int > test_recv_buffer( total_packet_length );
  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Alltoall(
      &test_send_buffer[ 0 ], packet_length, MPI_UNSIGNED, &test_recv_buffer[ 0 ], packet_length, MPI_UNSIGNED, comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

// average communication time for a packet size of num_bytes using Alltoallv
double
nest::MPIManager::time_communicate_alltoallv( int num_bytes, int samples )
{
  if ( get_num_processes() == 1 )
  {
    return 0.0;
  }
  unsigned int packet_length = num_bytes / sizeof( unsigned int );        // this size should be sent to each process
  unsigned int total_packet_length = packet_length * get_num_processes(); // total size of send and receive buffers
  if ( total_packet_length < 1 )
  {
    total_packet_length = 1;
  }
  std::vector< unsigned int > test_send_buffer( total_packet_length );
  std::vector< unsigned int > test_recv_buffer( total_packet_length );
  std::vector< int > n_nodes( get_num_processes(), packet_length );
  std::vector< int > displacements( get_num_processes(), 0 );

  for ( int i = 1; i < get_num_processes(); ++i )
  {
    displacements.at( i ) = displacements.at( i - 1 ) + n_nodes.at( i - 1 );
  }

  // start time measurement here
  Stopwatch foo;
  foo.start();
  for ( int i = 0; i < samples; ++i )
  {
    MPI_Alltoallv( &test_send_buffer[ 0 ],
      &n_nodes[ 0 ],
      &displacements[ 0 ],
      MPI_UNSIGNED,
      &test_recv_buffer[ 0 ],
      &n_nodes[ 0 ],
      &displacements[ 0 ],
      MPI_UNSIGNED,
      comm );
  }
  // finish time measurement here
  foo.stop();
  return foo.elapsed() / samples;
}

#else /* #ifdef HAVE_MPI */

/**
 * communicate (on-grid) if compiled without MPI
 */
void
nest::MPIManager::communicate( std::vector< unsigned int >& send_buffer,
  std::vector< unsigned int >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  if ( static_cast< size_t >( recv_buffer_size_ ) < send_buffer.size() )
  {
    recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
    recv_buffer.resize( recv_buffer_size_ );
  }
  recv_buffer.swap( send_buffer );
}

/**
 * communicate (off-grid) if compiled without MPI
 */
void
nest::MPIManager::communicate( std::vector< OffGridSpike >& send_buffer,
  std::vector< OffGridSpike >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  if ( static_cast< size_t >( recv_buffer_size_ ) < send_buffer.size() )
  {
    recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
    recv_buffer.resize( recv_buffer_size_ );
  }
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( std::vector< double >& send_buffer,
  std::vector< double >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( std::vector< unsigned long >& send_buffer,
  std::vector< unsigned long >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( std::vector< int >& send_buffer,
  std::vector< int >& recv_buffer,
  std::vector< int >& displacements )
{
  displacements.resize( num_processes_, 0 );
  displacements[ 0 ] = 0;
  recv_buffer.swap( send_buffer );
}

void
nest::MPIManager::communicate( double send_val, std::vector< double >& recv_buffer )
{
  recv_buffer.resize( 1 );
  recv_buffer[ 0 ] = send_val;
}

void
nest::MPIManager::communicate( std::vector< long >&, std::vector< long >& )
{
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( double )
{
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< double >& )
{
}

void
nest::MPIManager::communicate_Allreduce_sum_in_place( std::vector< int >& )
{
}

void
nest::MPIManager::communicate_Allreduce_sum( std::vector< double >& send_buffer, std::vector< double >& recv_buffer )
{
  recv_buffer.swap( send_buffer );
}

double
nest::MPIManager::min_cross_ranks( double value )
{
  return value;
}

double
nest::MPIManager::max_cross_ranks( double value )
{
  return value;
}

void
nest::MPIManager::communicate_recv_counts_secondary_events()
{
  // since we only have one process, the send count is equal to the recv count
  send_counts_secondary_events_in_int_per_rank_ = recv_counts_secondary_events_in_int_per_rank_;

  // since we only have one process, the send displacement is zero
  assert( send_displacements_secondary_events_in_int_per_rank_.size() == 1 );
  send_displacements_secondary_events_in_int_per_rank_[ 0 ] = 0;
}

#endif /* #ifdef HAVE_MPI */