1 // Copyright (c) 2015-2017 John Biddiscombe 2 // Copyright (c) 2017 Thomas Heller 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 7 #include <plugins/parcelport/libfabric/header.hpp> 8 #include <plugins/parcelport/libfabric/libfabric_region_provider.hpp> 9 #include <plugins/parcelport/libfabric/parcelport_libfabric.hpp> 10 #include <plugins/parcelport/libfabric/sender.hpp> 11 #include <plugins/parcelport/rma_memory_pool.hpp> 12 // 13 #include <hpx/util/assert.hpp> 14 #include <hpx/util/atomic_count.hpp> 15 #include <hpx/util/high_resolution_timer.hpp> 16 #include <hpx/util/unique_function.hpp> 17 #include <hpx/util/yield_while.hpp> 18 // 19 #include <rdma/fi_endpoint.h> 20 // 21 #include <memory> 22 #include <cstddef> 23 #include <cstring> 24 25 namespace hpx { 26 namespace parcelset { 27 namespace policies { 28 namespace libfabric 29 { 30 // -------------------------------------------------------------------- 31 // The main message send routine async_write_impl()32 void sender::async_write_impl() 33 { 34 buffer_.data_point_.time_ = util::high_resolution_clock::now(); 35 HPX_ASSERT(message_region_ == nullptr); 36 HPX_ASSERT(completion_count_ == 0); 37 // increment counter of total messages sent 38 ++sends_posted_; 39 40 // for each zerocopy chunk, we must create a memory region for the data 41 // do this before creating the header as the chunk details will be copied 42 // into the header space 43 int index = 0; 44 for (auto &c : buffer_.chunks_) 45 { 46 // Debug only, dump out the chunk info 47 LOG_DEBUG_MSG("write : chunk : size " << hexnumber(c.size_) 48 << " type " << decnumber((uint64_t)c.type_) 49 << " rkey " << hexpointer(c.rkey_) 50 << " cpos " << hexpointer(c.data_.cpos_) 51 << " index " << decnumber(c.data_.index_)); 52 if (c.type_ == serialization::chunk_type_pointer) 53 { 54 LOG_EXCLUSIVE(util::high_resolution_timer regtimer); 55 56 // create a new memory region from the user supplied pointer 57 region_type *zero_copy_region = 58 new region_type(domain_, c.data_.cpos_, c.size_); 59 60 rma_regions_.push_back(zero_copy_region); 61 62 // set the region remote access key in the chunk space 63 c.rkey_ = zero_copy_region->get_remote_key(); 64 LOG_DEBUG_MSG("Time to register memory (ns) " 65 << decnumber(regtimer.elapsed_nanoseconds())); 66 LOG_DEBUG_MSG("Created zero-copy rdma Get region " 67 << decnumber(index) << *zero_copy_region 68 << "for rkey " << hexpointer(c.rkey_)); 69 70 LOG_TRACE_MSG( 71 CRC32_MEM(zero_copy_region->get_address(), 72 zero_copy_region->get_message_length(), 73 "zero_copy_region (pre-send) ")); 74 } 75 ++index; 76 } 77 78 // create the header using placement new in the pinned memory block 79 char *header_memory = (char*)(header_region_->get_address()); 80 81 LOG_DEBUG_MSG("Placement new for header"); 82 header_ = new(header_memory) header_type(buffer_, this); 83 header_region_->set_message_length(header_->header_length()); 84 85 LOG_DEBUG_MSG("sender " << hexpointer(this) 86 << ", buffsize " << hexuint32(header_->message_size()) 87 << ", header_length " << decnumber(header_->header_length()) 88 << ", chunks zerocopy( " << decnumber(buffer_.num_chunks_.first) << ") " 89 << ", normal( " << decnumber(buffer_.num_chunks_.second) << ") " 90 << ", chunk_flag " << decnumber(header_->header_length()) 91 << ", tag " << hexuint64(header_->tag()) 92 ); 93 94 // reserve some space for zero copy information 95 rma_regions_.reserve(buffer_.num_chunks_.first); 96 97 // Get the block of pinned memory where the message was encoded 98 // during serialization 99 message_region_ = buffer_.data_.m_region_; 100 message_region_->set_message_length(header_->message_size()); 101 102 HPX_ASSERT(header_->message_size() == buffer_.data_.size()); 103 LOG_DEBUG_MSG("Found region allocated during encode_parcel : address " 104 << hexpointer(buffer_.data_.m_array_) 105 << " region "<< *message_region_); 106 107 // The number of completions we need before cleaning up: 108 // 1 (header block send) + 1 (ack message if we have RMA chunks) 109 completion_count_ = 1; 110 region_list_[0] = { 111 header_region_->get_address(), header_region_->get_message_length() }; 112 region_list_[1] = { 113 message_region_->get_address(), message_region_->get_message_length() }; 114 115 desc_[0] = header_region_->get_desc(); 116 desc_[1] = message_region_->get_desc(); 117 if (rma_regions_.size()>0 || !header_->message_piggy_back()) { 118 completion_count_ = 2; 119 } 120 121 if (header_->chunk_data()) { 122 LOG_DEBUG_MSG("Sender " << hexpointer(this) 123 << "Chunk info is piggybacked"); 124 } 125 else { 126 LOG_DEBUG_MSG("Setting up header-chunk rma data with " 127 << "zero-copy chunks " << decnumber(rma_regions_.size())); 128 auto &cb = header_->chunk_header_ptr()->chunk_rma; 129 chunk_region_ = memory_pool_->allocate_region(cb.size_); 130 cb.data_.pos_ = chunk_region_->get_address(); 131 cb.rkey_ = chunk_region_->get_remote_key(); 132 std::memcpy(cb.data_.pos_, buffer_.chunks_.data(), cb.size_); 133 LOG_DEBUG_MSG("Set up header-chunk rma data with " 134 << "size " << decnumber(cb.size_) 135 << "rkey " << hexpointer(cb.rkey_) 136 << "addr " << hexpointer(cb.data_.cpos_)); 137 } 138 139 if (header_->message_piggy_back()) 140 { 141 LOG_DEBUG_MSG("Sender " << hexpointer(this) 142 << "Main message is piggybacked"); 143 144 LOG_TRACE_MSG(CRC32_MEM(header_region_->get_address(), 145 header_region_->get_message_length(), 146 "Header region (send piggyback)")); 147 148 LOG_TRACE_MSG(CRC32_MEM(message_region_->get_address(), 149 message_region_->get_message_length(), 150 "Message region (send piggyback)")); 151 152 // send 2 regions as one message, goes into one receive 153 hpx::util::yield_while([this]() 154 { 155 int ret = fi_sendv(this->endpoint_, this->region_list_, 156 this->desc_, 2, this->dst_addr_, this); 157 158 if (ret == -FI_EAGAIN) 159 { 160 LOG_ERROR_MSG("reposting fi_sendv...\n"); 161 return true; 162 } 163 else if (ret) 164 { 165 throw fabric_error(ret, "fi_sendv"); 166 } 167 168 return false; 169 }, "sender::async_write"); 170 } 171 else 172 { 173 header_->set_message_rdma_info( 174 message_region_->get_remote_key(), message_region_->get_address()); 175 176 LOG_DEBUG_MSG("Sender " << hexpointer(this) 177 << "message region NOT piggybacked " 178 << hexnumber(buffer_.data_.size()) 179 << *message_region_); 180 181 LOG_TRACE_MSG(CRC32_MEM(header_region_->get_address(), 182 header_region_->get_message_length(), 183 "Header region (pre-send)")); 184 185 LOG_TRACE_MSG(CRC32_MEM(message_region_->get_address(), 186 message_region_->get_message_length(), 187 "Message region (send for rdma fetch)")); 188 189 // send just the header region - a single message 190 hpx::util::yield_while([this]() 191 { 192 int ret = fi_send(this->endpoint_, 193 this->region_list_[0].iov_base, 194 this->region_list_[0].iov_len, 195 this->desc_[0], this->dst_addr_, this); 196 197 if (ret == -FI_EAGAIN) 198 { 199 LOG_ERROR_MSG("reposting fi_send...\n"); 200 return true; 201 } 202 else if (ret) 203 { 204 throw fabric_error(ret, "fi_sendv"); 205 } 206 207 return false; 208 }, "sender::async_write"); 209 } 210 211 FUNC_END_DEBUG_MSG; 212 } 213 214 // -------------------------------------------------------------------- handle_send_completion()215 void sender::handle_send_completion() 216 { 217 LOG_DEBUG_MSG("Sender " << hexpointer(this) 218 << "handle send_completion " 219 << "RMA regions " << decnumber(rma_regions_.size()) 220 << "completion count " << decnumber(completion_count_)); 221 cleanup(); 222 } 223 224 // -------------------------------------------------------------------- handle_message_completion_ack()225 void sender::handle_message_completion_ack() 226 { 227 LOG_DEBUG_MSG("Sender " << hexpointer(this) 228 << "handle handle_message_completion_ack ( " 229 << "RMA regions " << decnumber(rma_regions_.size()) 230 << "completion count " << decnumber(completion_count_)); 231 ++acks_received_; 232 cleanup(); 233 } 234 235 // -------------------------------------------------------------------- cleanup()236 void sender::cleanup() 237 { 238 LOG_DEBUG_MSG("Sender " << hexpointer(this) 239 << "decrementing completion_count from " << decnumber(completion_count_)); 240 241 // if we need to wait for more completion events, return without cleaning 242 if (--completion_count_ > 0) 243 return; 244 245 // track deletions 246 ++sends_deleted_; 247 248 error_code ec; 249 handler_(ec); 250 handler_.reset(); 251 252 // cleanup header and message region 253 memory_pool_->deallocate(message_region_); 254 message_region_ = nullptr; 255 header_ = nullptr; 256 // cleanup chunk region 257 if (chunk_region_) { 258 memory_pool_->deallocate(chunk_region_); 259 chunk_region_ = nullptr; 260 } 261 262 for (auto& region: rma_regions_) { 263 memory_pool_->deallocate(region); 264 } 265 rma_regions_.clear(); 266 buffer_.data_point_.time_ = 267 util::high_resolution_clock::now() - buffer_.data_point_.time_; 268 parcelport_->add_sent_data(buffer_.data_point_); 269 postprocess_handler_(this); 270 } 271 272 // -------------------------------------------------------------------- handle_error(struct fi_cq_err_entry err)273 void sender::handle_error(struct fi_cq_err_entry err) 274 { 275 LOG_ERROR_MSG("resending message after error " << hexpointer(this)); 276 277 if (header_->message_piggy_back()) 278 { 279 // send 2 regions as one message, goes into one receive 280 hpx::util::yield_while([this]() 281 { 282 int ret = fi_sendv(this->endpoint_, this->region_list_, 283 this->desc_, 2, this->dst_addr_, this); 284 285 if (ret == -FI_EAGAIN) 286 { 287 LOG_ERROR_MSG("reposting fi_sendv...\n"); 288 return true; 289 } 290 else if (ret) 291 { 292 throw fabric_error(ret, "fi_sendv"); 293 } 294 295 return false; 296 }, "libfabric::sender::handle_error"); 297 } 298 else 299 { 300 header_->set_message_rdma_info( 301 message_region_->get_remote_key(), message_region_->get_address()); 302 303 // send just the header region - a single message 304 hpx::util::yield_while([this]() 305 { 306 int ret = fi_send(this->endpoint_, 307 this->region_list_[0].iov_base, 308 this->region_list_[0].iov_len, 309 this->desc_[0], this->dst_addr_, this); 310 311 if (ret == -FI_EAGAIN) 312 { 313 LOG_ERROR_MSG("reposting fi_send...\n"); 314 return true; 315 } 316 else if (ret) 317 { 318 throw fabric_error(ret, "fi_sendv"); 319 } 320 321 return false; 322 }, "libfabric::sender::handle_error"); 323 } 324 } 325 326 }}}} 327