1 //  Copyright (c) 2015-2017 John Biddiscombe
2 //  Copyright (c) 2017      Thomas Heller
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 #include <plugins/parcelport/libfabric/header.hpp>
8 #include <plugins/parcelport/libfabric/libfabric_region_provider.hpp>
9 #include <plugins/parcelport/libfabric/parcelport_libfabric.hpp>
10 #include <plugins/parcelport/libfabric/sender.hpp>
11 #include <plugins/parcelport/rma_memory_pool.hpp>
12 //
13 #include <hpx/util/assert.hpp>
14 #include <hpx/util/atomic_count.hpp>
15 #include <hpx/util/high_resolution_timer.hpp>
16 #include <hpx/util/unique_function.hpp>
17 #include <hpx/util/yield_while.hpp>
18 //
19 #include <rdma/fi_endpoint.h>
20 //
21 #include <memory>
22 #include <cstddef>
23 #include <cstring>
24 
25 namespace hpx {
26 namespace parcelset {
27 namespace policies {
28 namespace libfabric
29 {
30     // --------------------------------------------------------------------
31     // The main message send routine
async_write_impl()32     void sender::async_write_impl()
33     {
34         buffer_.data_point_.time_ = util::high_resolution_clock::now();
35         HPX_ASSERT(message_region_ == nullptr);
36         HPX_ASSERT(completion_count_ == 0);
37         // increment counter of total messages sent
38         ++sends_posted_;
39 
40         // for each zerocopy chunk, we must create a memory region for the data
41         // do this before creating the header as the chunk details will be copied
42         // into the header space
43         int index = 0;
44         for (auto &c : buffer_.chunks_)
45         {
46             // Debug only, dump out the chunk info
47             LOG_DEBUG_MSG("write : chunk : size " << hexnumber(c.size_)
48                     << " type " << decnumber((uint64_t)c.type_)
49                     << " rkey " << hexpointer(c.rkey_)
50                     << " cpos " << hexpointer(c.data_.cpos_)
51                     << " index " << decnumber(c.data_.index_));
52             if (c.type_ == serialization::chunk_type_pointer)
53             {
54                 LOG_EXCLUSIVE(util::high_resolution_timer regtimer);
55 
56                 // create a new memory region from the user supplied pointer
57                 region_type *zero_copy_region =
58                     new region_type(domain_, c.data_.cpos_, c.size_);
59 
60                 rma_regions_.push_back(zero_copy_region);
61 
62                 // set the region remote access key in the chunk space
63                 c.rkey_  = zero_copy_region->get_remote_key();
64                     LOG_DEBUG_MSG("Time to register memory (ns) "
65                         << decnumber(regtimer.elapsed_nanoseconds()));
66                 LOG_DEBUG_MSG("Created zero-copy rdma Get region "
67                     << decnumber(index) << *zero_copy_region
68                     << "for rkey " << hexpointer(c.rkey_));
69 
70                 LOG_TRACE_MSG(
71                     CRC32_MEM(zero_copy_region->get_address(),
72                         zero_copy_region->get_message_length(),
73                         "zero_copy_region (pre-send) "));
74             }
75             ++index;
76         }
77 
78         // create the header using placement new in the pinned memory block
79         char *header_memory = (char*)(header_region_->get_address());
80 
81         LOG_DEBUG_MSG("Placement new for header");
82         header_ = new(header_memory) header_type(buffer_, this);
83         header_region_->set_message_length(header_->header_length());
84 
85         LOG_DEBUG_MSG("sender " << hexpointer(this)
86             << ", buffsize " << hexuint32(header_->message_size())
87             << ", header_length " << decnumber(header_->header_length())
88             << ", chunks zerocopy( " << decnumber(buffer_.num_chunks_.first) << ") "
89             << ", normal( " << decnumber(buffer_.num_chunks_.second) << ") "
90             << ", chunk_flag " << decnumber(header_->header_length())
91             << ", tag " << hexuint64(header_->tag())
92         );
93 
94         // reserve some space for zero copy information
95         rma_regions_.reserve(buffer_.num_chunks_.first);
96 
97         // Get the block of pinned memory where the message was encoded
98         // during serialization
99         message_region_ = buffer_.data_.m_region_;
100         message_region_->set_message_length(header_->message_size());
101 
102         HPX_ASSERT(header_->message_size() == buffer_.data_.size());
103         LOG_DEBUG_MSG("Found region allocated during encode_parcel : address "
104             << hexpointer(buffer_.data_.m_array_)
105             << " region "<< *message_region_);
106 
107         // The number of completions we need before cleaning up:
108         // 1 (header block send) + 1 (ack message if we have RMA chunks)
109         completion_count_ = 1;
110         region_list_[0] = {
111             header_region_->get_address(), header_region_->get_message_length() };
112         region_list_[1] = {
113             message_region_->get_address(), message_region_->get_message_length() };
114 
115         desc_[0] = header_region_->get_desc();
116         desc_[1] = message_region_->get_desc();
117         if (rma_regions_.size()>0 || !header_->message_piggy_back()) {
118             completion_count_ = 2;
119         }
120 
121         if (header_->chunk_data()) {
122             LOG_DEBUG_MSG("Sender " << hexpointer(this)
123                 << "Chunk info is piggybacked");
124         }
125         else {
126             LOG_DEBUG_MSG("Setting up header-chunk rma data with "
127                 << "zero-copy chunks " << decnumber(rma_regions_.size()));
128             auto &cb = header_->chunk_header_ptr()->chunk_rma;
129             chunk_region_  = memory_pool_->allocate_region(cb.size_);
130             cb.data_.pos_  = chunk_region_->get_address();
131             cb.rkey_       = chunk_region_->get_remote_key();
132             std::memcpy(cb.data_.pos_, buffer_.chunks_.data(), cb.size_);
133             LOG_DEBUG_MSG("Set up header-chunk rma data with "
134                 << "size " << decnumber(cb.size_)
135                 << "rkey " << hexpointer(cb.rkey_)
136                 << "addr " << hexpointer(cb.data_.cpos_));
137         }
138 
139         if (header_->message_piggy_back())
140         {
141             LOG_DEBUG_MSG("Sender " << hexpointer(this)
142                 << "Main message is piggybacked");
143 
144             LOG_TRACE_MSG(CRC32_MEM(header_region_->get_address(),
145                 header_region_->get_message_length(),
146                 "Header region (send piggyback)"));
147 
148             LOG_TRACE_MSG(CRC32_MEM(message_region_->get_address(),
149                 message_region_->get_message_length(),
150                 "Message region (send piggyback)"));
151 
152             // send 2 regions as one message, goes into one receive
153             hpx::util::yield_while([this]()
154                 {
155                     int ret = fi_sendv(this->endpoint_, this->region_list_,
156                         this->desc_, 2, this->dst_addr_, this);
157 
158                     if (ret == -FI_EAGAIN)
159                     {
160                         LOG_ERROR_MSG("reposting fi_sendv...\n");
161                         return true;
162                     }
163                     else if (ret)
164                     {
165                         throw fabric_error(ret, "fi_sendv");
166                     }
167 
168                     return false;
169                 }, "sender::async_write");
170         }
171         else
172         {
173             header_->set_message_rdma_info(
174                 message_region_->get_remote_key(), message_region_->get_address());
175 
176             LOG_DEBUG_MSG("Sender " << hexpointer(this)
177                 << "message region NOT piggybacked "
178                 << hexnumber(buffer_.data_.size())
179                 << *message_region_);
180 
181             LOG_TRACE_MSG(CRC32_MEM(header_region_->get_address(),
182                 header_region_->get_message_length(),
183                 "Header region (pre-send)"));
184 
185             LOG_TRACE_MSG(CRC32_MEM(message_region_->get_address(),
186                 message_region_->get_message_length(),
187                 "Message region (send for rdma fetch)"));
188 
189             // send just the header region - a single message
190             hpx::util::yield_while([this]()
191                 {
192                     int ret = fi_send(this->endpoint_,
193                         this->region_list_[0].iov_base,
194                         this->region_list_[0].iov_len,
195                         this->desc_[0], this->dst_addr_, this);
196 
197                     if (ret == -FI_EAGAIN)
198                     {
199                         LOG_ERROR_MSG("reposting fi_send...\n");
200                         return true;
201                     }
202                     else if (ret)
203                     {
204                         throw fabric_error(ret, "fi_sendv");
205                     }
206 
207                     return false;
208                 }, "sender::async_write");
209         }
210 
211         FUNC_END_DEBUG_MSG;
212     }
213 
214     // --------------------------------------------------------------------
handle_send_completion()215     void sender::handle_send_completion()
216     {
217         LOG_DEBUG_MSG("Sender " << hexpointer(this)
218             << "handle send_completion "
219             << "RMA regions " << decnumber(rma_regions_.size())
220             << "completion count " << decnumber(completion_count_));
221         cleanup();
222     }
223 
224     // --------------------------------------------------------------------
handle_message_completion_ack()225     void sender::handle_message_completion_ack()
226     {
227         LOG_DEBUG_MSG("Sender " << hexpointer(this)
228             << "handle handle_message_completion_ack ( "
229             << "RMA regions " << decnumber(rma_regions_.size())
230             << "completion count " << decnumber(completion_count_));
231         ++acks_received_;
232         cleanup();
233     }
234 
235     // --------------------------------------------------------------------
cleanup()236     void sender::cleanup()
237     {
238         LOG_DEBUG_MSG("Sender " << hexpointer(this)
239             << "decrementing completion_count from " << decnumber(completion_count_));
240 
241         // if we need to wait for more completion events, return without cleaning
242         if (--completion_count_ > 0)
243             return;
244 
245         // track deletions
246         ++sends_deleted_;
247 
248         error_code ec;
249         handler_(ec);
250         handler_.reset();
251 
252         // cleanup header and message region
253         memory_pool_->deallocate(message_region_);
254         message_region_ = nullptr;
255         header_         = nullptr;
256         // cleanup chunk region
257         if (chunk_region_) {
258             memory_pool_->deallocate(chunk_region_);
259             chunk_region_ = nullptr;
260         }
261 
262         for (auto& region: rma_regions_) {
263             memory_pool_->deallocate(region);
264         }
265         rma_regions_.clear();
266         buffer_.data_point_.time_ =
267             util::high_resolution_clock::now() - buffer_.data_point_.time_;
268         parcelport_->add_sent_data(buffer_.data_point_);
269         postprocess_handler_(this);
270     }
271 
272     // --------------------------------------------------------------------
handle_error(struct fi_cq_err_entry err)273     void sender::handle_error(struct fi_cq_err_entry err)
274     {
275         LOG_ERROR_MSG("resending message after error " << hexpointer(this));
276 
277         if (header_->message_piggy_back())
278         {
279             // send 2 regions as one message, goes into one receive
280             hpx::util::yield_while([this]()
281                 {
282                     int ret = fi_sendv(this->endpoint_, this->region_list_,
283                         this->desc_, 2, this->dst_addr_, this);
284 
285                     if (ret == -FI_EAGAIN)
286                     {
287                         LOG_ERROR_MSG("reposting fi_sendv...\n");
288                         return true;
289                     }
290                     else if (ret)
291                     {
292                         throw fabric_error(ret, "fi_sendv");
293                     }
294 
295                     return false;
296                 }, "libfabric::sender::handle_error");
297         }
298         else
299         {
300             header_->set_message_rdma_info(
301                 message_region_->get_remote_key(), message_region_->get_address());
302 
303             // send just the header region - a single message
304             hpx::util::yield_while([this]()
305                 {
306                     int ret = fi_send(this->endpoint_,
307                         this->region_list_[0].iov_base,
308                         this->region_list_[0].iov_len,
309                         this->desc_[0], this->dst_addr_, this);
310 
311                     if (ret == -FI_EAGAIN)
312                     {
313                         LOG_ERROR_MSG("reposting fi_send...\n");
314                         return true;
315                     }
316                     else if (ret)
317                     {
318                         throw fabric_error(ret, "fi_sendv");
319                     }
320 
321                     return false;
322                 }, "libfabric::sender::handle_error");
323         }
324     }
325 
326 }}}}
327