/*
* source_table.cpp
*
* This file is part of NEST.
*
* Copyright (C) 2004 The NEST Initiative
*
* NEST is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* NEST is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with NEST. If not, see <http://www.gnu.org/licenses/>.
*
*/
// C++ includes:
#include
// Includes from nestkernel:
#include "connection_manager.h"
#include "connection_manager_impl.h"
#include "kernel_manager.h"
#include "mpi_manager_impl.h"
#include "source_table.h"
#include "vp_manager_impl.h"
/**
 * Construct an empty source table. No member is set up explicitly here;
 * the per-thread containers are sized in initialize().
 */
nest::SourceTable::SourceTable() = default;
/**
 * Destroy the source table. Nothing is released explicitly here; members
 * clean up through their own destructors.
 */
nest::SourceTable::~SourceTable() = default;
/**
 * Set up all per-thread data structures of the source table.
 *
 * Every per-thread container is sized to the number of threads reported by
 * the VP manager. Afterwards, inside a parallel region, each thread empties
 * its own slots and sizes its source vectors via resize_sources().
 */
void
nest::SourceTable::initialize()
{
  // The size of Source is a compile-time constant, so verify it at compile
  // time; unlike assert(), this check is also active when NDEBUG is defined.
  static_assert( sizeof( Source ) == 8, "Source is expected to occupy exactly 8 bytes" );

  const thread num_threads = kernel().vp_manager.get_num_threads();

  sources_.resize( num_threads );
  is_cleared_.initialize( num_threads, false );
  saved_entry_point_.initialize( num_threads, false );
  current_positions_.resize( num_threads );
  saved_positions_.resize( num_threads );
  compressible_sources_.resize( num_threads );
  compressed_spike_data_map_.resize( num_threads );

#pragma omp parallel
  {
    // each thread only touches the slots indexed by its own thread id
    const thread tid = kernel().vp_manager.get_thread_id();

    // drop any existing per-synapse-type vectors before re-creating them
    sources_[ tid ].resize( 0 );
    resize_sources( tid );
    compressible_sources_[ tid ].resize( 0 );
    compressed_spike_data_map_[ tid ].resize( 0 );
  } // of omp parallel
}
void
nest::SourceTable::finalize()
{
for ( thread tid = 0; tid < static_cast< thread >( sources_.size() ); ++tid )
{
if ( is_cleared_[ tid ].is_false() )
{
clear( tid );
compressible_sources_[ tid ].clear();
compressed_spike_data_map_[ tid ].clear();
}
}
sources_.clear();
current_positions_.clear();
saved_positions_.clear();
compressible_sources_.clear();
compressed_spike_data_map_.clear();
}
/**
 * Report whether the cleared flag is set for all threads.
 *
 * @return result of is_cleared_.all_true()
 */
bool
nest::SourceTable::is_cleared() const
{
  const bool all_threads_cleared = is_cleared_.all_true();
  return all_threads_cleared;
}
/**
 * Give mutable access to the sources stored for one thread.
 *
 * @param tid index of the thread; not range-checked
 * @return reference to the per-synapse-type source vectors of thread tid
 */
std::vector< BlockVector< nest::Source > >&
nest::SourceTable::get_thread_local_sources( const thread tid )
{
  std::vector< BlockVector< nest::Source > >& thread_sources = sources_[ tid ];
  return thread_sources;
}
/**
 * Determine the largest saved position among all threads.
 *
 * Positions are compared with SourceTablePosition's operator<. The search
 * starts from the position ( -1, -1, -1 ), which is returned unchanged if
 * no saved position compares greater than it.
 *
 * @return the maximal entry of saved_positions_ over all threads
 */
nest::SourceTablePosition
nest::SourceTable::find_maximal_position() const
{
  SourceTablePosition largest_so_far( -1, -1, -1 );

  const thread num_threads = kernel().vp_manager.get_num_threads();
  for ( thread tid = 0; tid < num_threads; ++tid )
  {
    const SourceTablePosition& candidate = saved_positions_[ tid ];
    if ( not( largest_so_far < candidate ) )
    {
      continue;
    }
    largest_so_far = candidate;
  }

  return largest_so_far;
}
void
nest::SourceTable::clean( const thread tid )
{
// Find maximal position in source table among threads to make sure
// unprocessed entries are not removed. Given this maximal position,
// we can safely delete all larger entries since they will not be
// touched any more.
const SourceTablePosition max_position = find_maximal_position();
// If this thread corresponds to max_position's thread, we can only
// delete part of the sources table, with indices larger than those
// in max_position; if this thread is larger than max_positions's
// thread, we can delete all sources; otherwise we do nothing.
if ( max_position.tid == tid )
{
for ( synindex syn_id = max_position.syn_id; syn_id < sources_[ tid ].size(); ++syn_id )
{
BlockVector< Source >& sources = sources_[ tid ][ syn_id ];
if ( max_position.syn_id == syn_id )
{
// we need to add 2 to max_position.lcid since
// max_position.lcid + 1 can contain a valid entry which we
// do not want to delete.
if ( max_position.lcid + 2 < static_cast< long >( sources.size() ) )
{
sources.erase( sources.begin() + max_position.lcid + 2, sources.end() );
}
}
else
{
assert( max_position.syn_id < syn_id );
sources.clear();
}
}
}
else if ( max_position.tid < tid )
{
sources_[ tid ].clear();
}
else
{
// do nothing
assert( tid < max_position.tid );
}
}
/**
 * Look up the node ID of the source stored at a given position.
 *
 * @param tid thread index into the source table
 * @param syn_id synapse type index
 * @param lcid local connection index
 * @return node ID of the addressed source; indices are not range-checked
 * @throws KernelException if the connection manager reports that the
 *         source table is not kept
 */
nest::index
nest::SourceTable::get_node_id( const thread tid, const synindex syn_id, const index lcid ) const
{
  if ( kernel().connection_manager.get_keep_source_table() )
  {
    const Source& source = sources_[ tid ][ syn_id ][ lcid ];
    return source.get_node_id();
  }

  throw KernelException( "Cannot use SourceTable::get_node_id when get_keep_source_table is false" );
}
/**
 * Erase the trailing run of disabled sources of one synapse type.
 *
 * @param tid thread index into the source table
 * @param syn_id synapse type index
 * @return index of the first erased (disabled) source, or invalid_index if
 *         the synapse type does not exist on this thread, has no sources,
 *         or has no disabled source at its end
 */
nest::index
nest::SourceTable::remove_disabled_sources( const thread tid, const synindex syn_id )
{
  if ( sources_[ tid ].size() <= syn_id )
  {
    return invalid_index;
  }

  BlockVector< Source >& syn_sources = sources_[ tid ][ syn_id ];
  const index num_sources = syn_sources.size();
  if ( num_sources == 0 )
  {
    return invalid_index;
  }

  // walk backwards from the end; first_disabled always points one past the
  // last element that is known not to belong to the trailing disabled run,
  // so it never needs to become negative
  index first_disabled = num_sources;
  while ( first_disabled > 0 and syn_sources[ first_disabled - 1 ].is_disabled() )
  {
    --first_disabled;
  }

  syn_sources.erase( syn_sources.begin() + static_cast< long >( first_disabled ), syn_sources.end() );

  // nothing was disabled at the end of the container
  if ( first_disabled == num_sources )
  {
    return invalid_index;
  }
  return first_disabled;
}
void
nest::SourceTable::compute_buffer_pos_for_unique_secondary_sources( const thread tid,
std::map< index, size_t >& buffer_pos_of_source_node_id_syn_id )
{
// set of unique sources & synapse types, required to determine
// secondary events MPI buffer positions
// initialized and deleted by thread 0 in this method
static std::set< std::pair< index, size_t > >* unique_secondary_source_node_id_syn_id;
#pragma omp single
{
unique_secondary_source_node_id_syn_id = new std::set< std::pair< index, size_t > >();
}
// collect all unique pairs of source node ID and synapse-type id
// corresponding to continuous-data connections on this MPI rank;
// using a set makes sure secondary events are not duplicated for
// targets on the same process, but different threads
for ( size_t syn_id = 0; syn_id < sources_[ tid ].size(); ++syn_id )
{
if ( not kernel().model_manager.get_synapse_prototype( syn_id, tid ).is_primary() )
{
for ( BlockVector< Source >::const_iterator source_cit = sources_[ tid ][ syn_id ].begin();
source_cit != sources_[ tid ][ syn_id ].end();
++source_cit )
{
#pragma omp critical
{
( *unique_secondary_source_node_id_syn_id ).insert( std::make_pair( source_cit->get_node_id(), syn_id ) );
}
}
}
}
#pragma omp barrier
#pragma omp single
{
// compute receive buffer positions for all unique pairs of source
// node ID and synapse-type id on this MPI rank
std::vector< int > recv_counts_secondary_events_in_int_per_rank( kernel().mpi_manager.get_num_processes(), 0 );
for (
std::set< std::pair< index, size_t > >::const_iterator cit = ( *unique_secondary_source_node_id_syn_id ).begin();
cit != ( *unique_secondary_source_node_id_syn_id ).end();
++cit )
{
const thread source_rank = kernel().mpi_manager.get_process_id_of_node_id( cit->first );
const size_t event_size = kernel().model_manager.get_secondary_event_prototype( cit->second, tid ).size();
buffer_pos_of_source_node_id_syn_id.insert(
std::make_pair( pack_source_node_id_and_syn_id( cit->first, cit->second ),
recv_counts_secondary_events_in_int_per_rank[ source_rank ] ) );
recv_counts_secondary_events_in_int_per_rank[ source_rank ] += event_size;
}
// each chunk needs to contain one additional int that can be used
// to communicate whether waveform relaxation has converged
for ( auto& recv_count : recv_counts_secondary_events_in_int_per_rank )
{
++recv_count;
}
kernel().mpi_manager.set_recv_counts_secondary_events_in_int_per_rank(
recv_counts_secondary_events_in_int_per_rank );
delete unique_secondary_source_node_id_syn_id;
} // of omp single
}
/**
 * Resize the source container of thread tid so that it holds one entry per
 * synapse prototype known to the model manager.
 *
 * @param tid thread index into the source table
 */
void
nest::SourceTable::resize_sources( const thread tid )
{
  const auto num_synapse_types = kernel().model_manager.get_num_synapse_prototypes();
  sources_[ tid ].resize( num_synapse_types );
}
/**
 * Decide whether a source entry still needs to be handled by the caller.
 *
 * @param rank_start first rank (inclusive) the caller is responsible for
 * @param rank_end last rank (exclusive) the caller is responsible for
 * @param source entry to inspect
 * @return true if the source is neither processed nor disabled and the
 *         rank of its node lies in [rank_start, rank_end)
 */
bool
nest::SourceTable::source_should_be_processed_( const thread rank_start,
  const thread rank_end,
  const Source& source ) const
{
  const thread source_rank = kernel().mpi_manager.get_process_id_of_node_id( source.get_node_id() );

  const bool still_pending = not source.is_processed() and not source.is_disabled();
  // is the caller responsible for this part of the MPI buffer?
  const bool rank_in_range = rank_start <= source_rank and source_rank < rank_end;

  return still_pending and rank_in_range;
}
/**
 * Check whether the entry following current_position exists and refers to
 * the same source node as current_source.
 *
 * @param current_position valid position in the source table
 * @param current_source source stored at current_position
 * @return true if the next lcid is in range and has the same node ID
 */
bool
nest::SourceTable::next_entry_has_same_source_( const SourceTablePosition& current_position,
  const Source& current_source ) const
{
  assert( not current_position.is_invalid() );

  const auto& syn_sources = sources_[ current_position.tid ][ current_position.syn_id ];
  const size_t following_lcid = current_position.lcid + 1;

  if ( following_lcid >= syn_sources.size() )
  {
    return false; // current entry is the last one
  }
  return syn_sources[ following_lcid ].get_node_id() == current_source.get_node_id();
}
bool
nest::SourceTable::previous_entry_has_same_source_( const SourceTablePosition& current_position,
const Source& current_source ) const
{
assert( not current_position.is_invalid() );
const auto& local_sources = sources_[ current_position.tid ][ current_position.syn_id ];
const long previous_lcid = current_position.lcid - 1; // needs to be a signed type such that negative
// values can signal invalid indices
return ( previous_lcid >= 0 and not local_sources[ previous_lcid ].is_processed()
and local_sources[ previous_lcid ].get_node_id() == current_source.get_node_id() );
}
/**
 * Fill next_target_data with the information describing the entry at
 * current_position.
 *
 * @param current_position position of the entry in the source table
 * @param current_source source stored at current_position
 * @param source_rank rank of the source node; only used for secondary
 *        connections
 * @param next_target_data object to be filled
 * @return false if compressed spikes are in use and this thread's
 *         compressed_spike_data_map_ has no entry for the source (another
 *         thread is responsible for communicating it); true otherwise
 */
bool
nest::SourceTable::populate_target_data_fields_( const SourceTablePosition& current_position,
  const Source& current_source,
  const thread source_rank,
  TargetData& next_target_data ) const
{
  const auto source_node_id = current_source.get_node_id();

  // fields common to primary and secondary connections
  next_target_data.set_source_lid( kernel().vp_manager.node_id_to_lid( source_node_id ) );
  const auto source_vp = kernel().vp_manager.node_id_to_vp( source_node_id );
  next_target_data.set_source_tid( kernel().vp_manager.vp_to_thread( source_vp ) );
  next_target_data.reset_marker();

  if ( not current_source.is_primary() )
  {
    // secondary connection, e.g., gap junctions
    next_target_data.set_is_primary( false );

    // the source rank writes to a position relative to the start of its
    // own chunk, hence subtract the displacement of that chunk from the
    // absolute position in the receive buffer
    const size_t relative_recv_buffer_pos = kernel().connection_manager.get_secondary_recv_buffer_position(
                                              current_position.tid, current_position.syn_id, current_position.lcid )
      - kernel().mpi_manager.get_recv_displacement_secondary_events_in_int( source_rank );

    SecondaryTargetDataFields& secondary_fields = next_target_data.secondary_data;
    secondary_fields.set_recv_buffer_pos( relative_recv_buffer_pos );
    secondary_fields.set_syn_id( current_position.syn_id );
    return true;
  }

  // primary connection, i.e., chemical synapses
  next_target_data.set_is_primary( true );
  TargetDataFields& target_fields = next_target_data.target_data;
  target_fields.set_syn_id( current_position.syn_id );

  if ( not kernel().connection_manager.use_compressed_spikes() )
  {
    // we store the thread index of the source table, not our own tid!
    target_fields.set_tid( current_position.tid );
    target_fields.set_lcid( current_position.lcid );
    return true;
  }

  // WARNING: the tid field is set to zero only to give it a defined value;
  // it is _not_ used anywhere when using compressed spikes
  target_fields.set_tid( 0 );

  const auto& source_to_index =
    compressed_spike_data_map_.at( current_position.tid ).at( current_position.syn_id );
  const auto map_entry = source_to_index.find( source_node_id );
  if ( map_entry == source_to_index.end() )
  {
    // another thread is responsible for communicating this compressed source
    return false;
  }

  // WARNING: no matter how tempting, do not try to remove this entry from
  // compressed_spike_data_map_; if the MPI buffer is already full, the
  // entry has to be communicated in the next MPI communication round,
  // which is not possible once it has been removed
  target_fields.set_lcid( map_entry->second );
  return true;
}
bool
nest::SourceTable::get_next_target_data( const thread tid,
const thread rank_start,
const thread rank_end,
thread& source_rank,
TargetData& next_target_data )
{
SourceTablePosition& current_position = current_positions_[ tid ];
if ( current_position.is_invalid() )
{
return false; // nothing to do here
}
// we stay in this loop either until we can return a valid
// TargetData object or we have reached the end of the sources table
while ( true )
{
current_position.seek_to_next_valid_index( sources_ );
if ( current_position.is_invalid() )
{
return false; // reached the end of the sources table
}
// the current position contains an entry, so we retrieve it
Source& current_source = sources_[ current_position.tid ][ current_position.syn_id ][ current_position.lcid ];
if ( not source_should_be_processed_( rank_start, rank_end, current_source ) )
{
current_position.decrease();
continue;
}
// we need to set a marker stating whether the entry following this
// entry, if existent, has the same source
kernel().connection_manager.set_source_has_more_targets( current_position.tid,
current_position.syn_id,
current_position.lcid,
next_entry_has_same_source_( current_position, current_source ) );
// no need to communicate this entry if the previous entry has the same source
if ( previous_entry_has_same_source_( current_position, current_source ) )
{
current_source.set_processed( true ); // no need to look at this entry again
current_position.decrease();
continue;
}
// reaching this means we found an entry that should be
// communicated via MPI, so we prepare to return the relevant data
// set the source rank
source_rank = kernel().mpi_manager.get_process_id_of_node_id( current_source.get_node_id() );
if ( not populate_target_data_fields_( current_position, current_source, source_rank, next_target_data ) )
{
current_position.decrease();
continue;
}
// we are about to return a valid entry, so mark it as processed
current_source.set_processed( true );
current_position.decrease();
return true; // found a valid entry
}
}
void
nest::SourceTable::resize_compressible_sources()
{
for ( thread tid = 0; tid < static_cast< thread >( compressible_sources_.size() ); ++tid )
{
compressible_sources_[ tid ].clear();
compressible_sources_[ tid ].resize(
kernel().model_manager.get_num_synapse_prototypes(), std::map< index, SpikeData >() );
}
}
void
nest::SourceTable::collect_compressible_sources( const thread tid )
{
for ( synindex syn_id = 0; syn_id < sources_[ tid ].size(); ++syn_id )
{
index lcid = 0;
auto& syn_sources = sources_[ tid ][ syn_id ];
while ( lcid < syn_sources.size() )
{
const index old_source_node_id = syn_sources[ lcid ].get_node_id();
const std::pair< index, SpikeData > source_node_id_to_spike_data =
std::make_pair( old_source_node_id, SpikeData( tid, syn_id, lcid, 0 ) );
compressible_sources_[ tid ][ syn_id ].insert( source_node_id_to_spike_data );
// find next source with different node_id (assumes sorted sources)
++lcid;
while ( ( lcid < syn_sources.size() ) and ( syn_sources[ lcid ].get_node_id() == old_source_node_id ) )
{
++lcid;
}
}
}
}
void
nest::SourceTable::fill_compressed_spike_data(
std::vector< std::vector< std::vector< SpikeData > > >& compressed_spike_data )
{
compressed_spike_data.clear();
compressed_spike_data.resize( kernel().model_manager.get_num_synapse_prototypes() );
for ( thread tid = 0; tid < static_cast< thread >( compressible_sources_.size() ); ++tid )
{
compressed_spike_data_map_[ tid ].clear();
compressed_spike_data_map_[ tid ].resize(
kernel().model_manager.get_num_synapse_prototypes(), std::map< index, size_t >() );
}
// pseudo-random thread selector to balance memory usage across
// threads of compressed_spike_data_map_
size_t thread_idx = 0;
// for each local thread and each synapse type we will populate this
// vector with spike data containing information about all process
// local targets
std::vector< SpikeData > spike_data;
for ( thread tid = 0; tid < static_cast< thread >( compressible_sources_.size() ); ++tid )
{
for ( synindex syn_id = 0; syn_id < compressible_sources_[ tid ].size(); ++syn_id )
{
for ( auto it = compressible_sources_[ tid ][ syn_id ].begin();
it != compressible_sources_[ tid ][ syn_id ].end(); )
{
spike_data.clear();
// add target position on this thread
spike_data.push_back( it->second );
// add target positions on all other threads
for ( thread other_tid = tid + 1; other_tid < static_cast< thread >( compressible_sources_.size() );
++other_tid )
{
auto other_it = compressible_sources_[ other_tid ][ syn_id ].find( it->first );
if ( other_it != compressible_sources_[ other_tid ][ syn_id ].end() )
{
spike_data.push_back( other_it->second );
compressible_sources_[ other_tid ][ syn_id ].erase( other_it );
}
}
// WARNING: store source-node-id -> process-global-synapse
// association in compressed_spike_data_map on a
// pseudo-randomly selected thread which houses targets for
// this source; this tries to balance memory usage of this
// data structure across threads
const thread responsible_tid = spike_data[ thread_idx % spike_data.size() ].get_tid();
++thread_idx;
compressed_spike_data_map_[ responsible_tid ][ syn_id ].insert(
std::make_pair( it->first, compressed_spike_data[ syn_id ].size() ) );
compressed_spike_data[ syn_id ].push_back( spike_data );
it = compressible_sources_[ tid ][ syn_id ].erase( it );
}
compressible_sources_[ tid ][ syn_id ].clear();
}
}
}