1 /*
2  * Copyright (c) 2013-2017 Cisco Systems, Inc.  All rights reserved
3  * $COPYRIGHT$
4  *
5  * Additional copyrights may follow
6  *
7  * $HEADER$
8  */
9 
10 #ifndef BTL_USNIC_RECV_H
11 #define BTL_USNIC_RECV_H
12 
13 #include "btl_usnic.h"
14 #include "btl_usnic_util.h"
15 #include "btl_usnic_frag.h"
16 #include "btl_usnic_proc.h"
17 
18 
/*
 * Out-of-line receive handler; only declared in this header.
 *
 * Within this header it is called by opal_btl_usnic_recv_fast() and
 * opal_btl_usnic_recv() for every segment that does not qualify for
 * their inlined short-FRAG handling: the sender could not be looked
 * up, the sending endpoint is exiting, the payload type is not FRAG,
 * or the header carries a non-NULL put_addr.
 */
void opal_btl_usnic_recv_call(opal_btl_usnic_module_t *module,
                              opal_btl_usnic_recv_segment_t *rseg,
                              opal_btl_usnic_channel_t *channel);
22 
23 static inline int
opal_btl_usnic_post_recv_list(opal_btl_usnic_channel_t * channel)24 opal_btl_usnic_post_recv_list(opal_btl_usnic_channel_t *channel)
25 {
26     struct iovec iov;
27     struct fi_msg msg;
28     uint64_t flag;
29     opal_btl_usnic_recv_segment_t *rseg;
30     int rc;
31 
32     msg.msg_iov = &iov;
33     msg.iov_count = 1;
34     for (rseg = channel->repost_recv_head; NULL != rseg; rseg = rseg->rs_next) {
35         msg.context = rseg;
36         iov.iov_base = rseg->rs_protocol_header;
37         iov.iov_len = rseg->rs_len;
38 
39         ++channel->rx_post_cnt;
40         if (OPAL_UNLIKELY((channel->rx_post_cnt & 15) == 0)) {
41             flag = 0;
42         } else {
43             flag = FI_MORE;
44         }
45 
46         rc = fi_recvmsg(channel->ep, &msg, flag);
47         if (0 != rc) {
48             return rc;
49         }
50     }
51     channel->repost_recv_head = NULL;
52 
53     return 0;
54 }
55 
56 /*
57  * Given an incoming segment, lookup the endpoint that sent it
58  */
59 static inline opal_btl_usnic_endpoint_t *
lookup_sender(opal_btl_usnic_module_t * module,opal_btl_usnic_segment_t * seg)60 lookup_sender(opal_btl_usnic_module_t *module, opal_btl_usnic_segment_t *seg)
61 {
62     int ret;
63     opal_btl_usnic_endpoint_t *sender;
64 
65     /* Use the hashed ORTE process name in the BTL header to uniquely
66        identify the sending process (using the MAC/hardware address
67        only identifies the sending server -- not the sending ORTE
68        process). */
69     /* JMS We've experimented with using a handshake before sending
70        any data so that instead of looking up a hash on the
71        btl_header->sender, echo back the ptr to the sender's
72        ompi_proc.  There was limited speedup with this scheme; more
73        investigation is required. */
74     ret = opal_hash_table_get_value_uint64(&module->senders,
75                                            seg->us_btl_header->sender,
76                                            (void**) &sender);
77     if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
78         return sender;
79     }
80 
81     /* The sender wasn't in the hash table, so do a slow lookup and
82        put the result in the hash table */
83     sender = opal_btl_usnic_proc_lookup_endpoint(module,
84                                                  seg->us_btl_header->sender);
85     if (NULL != sender) {
86         opal_hash_table_set_value_uint64(&module->senders,
87                                          seg->us_btl_header->sender, sender);
88         return sender;
89     }
90 
91     /* Whoa -- not found at all! */
92     return NULL;
93 }
94 
95 /*
96  * Packet has been fully processed, update the receive window
97  * to indicate that it and possible following contiguous sequence
98  * numbers have been received.
99  */
100 static inline void
opal_btl_usnic_update_window(opal_btl_usnic_endpoint_t * endpoint,uint32_t window_index)101 opal_btl_usnic_update_window(
102     opal_btl_usnic_endpoint_t *endpoint,
103     uint32_t window_index)
104 {
105     uint32_t i;
106 
107     /* Enable ACK reply if not enabled */
108 #if MSGDEBUG1
109     opal_output(0, "ep: %p, ack_needed = %s\n", (void*)endpoint, endpoint->endpoint_ack_needed?"true":"false");
110 #endif
111     if (!endpoint->endpoint_ack_needed) {
112         opal_btl_usnic_add_to_endpoints_needing_ack(endpoint);
113     }
114 
115     /* A hueristic: set to send this ACK after we have checked our
116        incoming DATA_CHANNEL component.act_iteration_delay times
117        (i.e., so we can piggyback an ACK on an outgoing send) */
118     if (0 == endpoint->endpoint_acktime) {
119         endpoint->endpoint_acktime =
120             get_ticks() + mca_btl_usnic_component.ack_iteration_delay;
121     }
122 
123     /* Save this incoming segment in the received segmentss array on the
124        endpoint. */
125     /* JMS Another optimization: make rcvd_segs be a bitmask (i.e.,
126        more cache friendly) */
127     endpoint->endpoint_rcvd_segs[window_index] = true;
128 
129     /* See if the leftmost segment in the receiver window is
130        occupied.  If so, advance the window.  Repeat until we hit
131        an unoccupied position in the window. */
132     i = endpoint->endpoint_rfstart;
133     while (endpoint->endpoint_rcvd_segs[i]) {
134         endpoint->endpoint_rcvd_segs[i] = false;
135         endpoint->endpoint_next_contig_seq_to_recv++;
136         i = WINDOW_SIZE_MOD(i + 1);
137 
138 #if MSGDEBUG1
139         opal_output(0, "Advance window to %d; next seq to send %" UDSEQ, i,
140                     endpoint->endpoint_next_contig_seq_to_recv);
141 #endif
142     }
143     endpoint->endpoint_rfstart = i;
144 }
145 
146 static inline int
opal_btl_usnic_check_rx_seq(opal_btl_usnic_endpoint_t * endpoint,opal_btl_usnic_recv_segment_t * seg,uint32_t * window_index)147 opal_btl_usnic_check_rx_seq(
148     opal_btl_usnic_endpoint_t *endpoint,
149     opal_btl_usnic_recv_segment_t *seg,
150     uint32_t *window_index)
151 {
152     uint32_t i;
153     opal_btl_usnic_seq_t seq;
154     int delta;
155 
156     /*
157      * Handle piggy-backed ACK if present
158      */
159     if (seg->rs_base.us_btl_header->ack_present) {
160 #if MSGDEBUG1
161         opal_output(0, "Handle piggy-packed ACK seq %"UDSEQ"\n", seg->rs_base.us_btl_header->ack_seq);
162 #endif
163         OPAL_THREAD_LOCK(&btl_usnic_lock);
164         opal_btl_usnic_handle_ack(endpoint,
165                 seg->rs_base.us_btl_header->ack_seq);
166         OPAL_THREAD_UNLOCK(&btl_usnic_lock);
167     }
168 
169     /* Do we have room in the endpoint's receiver window?
170 
171        Receiver window:
172 
173                    |-------- WINDOW_SIZE ----------|
174                   +---------------------------------+
175                   |         highest_seq_rcvd        |
176                   |     somewhere in this range     |
177                   +^--------------------------------+
178                    |
179                    +-- next_contig_seq_to_recv: the window left edge;
180                        will always be less than highest_seq_rcvd
181 
182        The good condition is
183 
184          next_contig_seq_to_recv <= seq < next_contig_seq_to_recv + WINDOW_SIZE
185 
186        And the bad condition is
187 
188          seq < next_contig_seq_to_recv
189            or
190          seq >= next_contig_seg_to_recv + WINDOW_SIZE
191     */
192     seq = seg->rs_base.us_btl_header->pkt_seq;
193     delta = SEQ_DIFF(seq, endpoint->endpoint_next_contig_seq_to_recv);
194     if (delta < 0 || delta >= WINDOW_SIZE) {
195 #if MSGDEBUG1
196             opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ " outside of window (%" UDSEQ " - %" UDSEQ "), %p, module %p -- DROPPED\n",
197                         (void*)endpoint, seg->rs_base.us_btl_header->pkt_seq,
198                         endpoint->endpoint_next_contig_seq_to_recv,
199                         (endpoint->endpoint_next_contig_seq_to_recv +
200                          WINDOW_SIZE - 1),
201                         (void*) seg,
202                         (void*) endpoint->endpoint_module);
203 #endif
204 
205         /* Stats */
206         if (delta < 0) {
207             ++endpoint->endpoint_module->stats.num_oow_low_recvs;
208         } else {
209             ++endpoint->endpoint_module->stats.num_oow_high_recvs;
210         }
211         goto dup_needs_ack;
212     }
213 
214     /* Ok, this segment is within the receiver window.  Have we
215        already received it?  It's possible that the sender has
216        re-sent a segment that we've already received (but not yet
217        ACKed).
218 
219        We have saved all un-ACKed segment in an array on the
220        endpoint that is the same legnth as the receiver's window
221        (i.e., WINDOW_SIZE).  We can use the incoming segment sequence
222        number to find its position in the array.  It's a little
223        tricky because the left edge of the receiver window keeps
224        moving, so we use a starting reference point in the array
225        that is updated when we sent ACKs (and therefore move the
226        left edge of the receiver's window).
227 
228        So this segment's index into the endpoint array is:
229 
230            rel_posn_in_recv_win = seq - next_contig_seq_to_recv
231            array_posn = (rel_posn_in_recv_win + rfstart) % WINDOW_SIZE
232 
233        rfstart is then updated when we send ACKs:
234 
235            rfstart = (rfstart + num_acks_sent) % WINDOW_SIZE
236     */
237     i = SEQ_DIFF(seq, endpoint->endpoint_next_contig_seq_to_recv);
238     i = WINDOW_SIZE_MOD(i + endpoint->endpoint_rfstart);
239     if (endpoint->endpoint_rcvd_segs[i]) {
240 #if MSGDEBUG1
241         opal_output(0, "<-- Received FRAG/CHUNK ep %p, seq %" UDSEQ ", seg %p: duplicate -- DROPPED\n",
242             (void*) endpoint, seg->rs_base.us_btl_header->pkt_seq, (void*) seg);
243 #endif
244         /* highest_seq_rcvd is for debug stats only; it's not used
245            in any window calculations */
246         assert(SEQ_LE(seq, endpoint->endpoint_highest_seq_rcvd));
247         /* next_contig_seq_to_recv-1 is the ack number we'll
248            send */
249         assert (SEQ_GT(seq, endpoint->endpoint_next_contig_seq_to_recv - 1));
250 
251         /* Stats */
252         ++endpoint->endpoint_module->stats.num_dup_recvs;
253         goto dup_needs_ack;
254     }
255 
256     /* Stats: is this the highest sequence number we've received? */
257     if (SEQ_GT(seq, endpoint->endpoint_highest_seq_rcvd)) {
258         endpoint->endpoint_highest_seq_rcvd = seq;
259     }
260 
261     *window_index = i;
262     return 0;
263 
264 dup_needs_ack:
265     if (!endpoint->endpoint_ack_needed) {
266         opal_btl_usnic_add_to_endpoints_needing_ack(endpoint);
267     }
268     return -1;
269 }
270 
271 /*
272  * We have received a segment, take action based on the
273  * packet type in the BTL header.
274  * Try to be fast here - defer as much bookkeeping until later as
275  * possible.
276  * See README.txt for a discussion of receive fastpath
277  */
278 static inline void
opal_btl_usnic_recv_fast(opal_btl_usnic_module_t * module,opal_btl_usnic_recv_segment_t * seg,opal_btl_usnic_channel_t * channel)279 opal_btl_usnic_recv_fast(opal_btl_usnic_module_t *module,
280                          opal_btl_usnic_recv_segment_t *seg,
281                          opal_btl_usnic_channel_t *channel)
282 {
283     opal_btl_usnic_segment_t *bseg;
284     mca_btl_active_message_callback_t* reg;
285     opal_btl_usnic_seq_t seq;
286     opal_btl_usnic_endpoint_t *endpoint;
287     int delta;
288     int i;
289 
290     /* Make the whole payload Valgrind defined */
291     opal_memchecker_base_mem_defined(seg->rs_protocol_header, seg->rs_len);
292 
293     bseg = &seg->rs_base;
294 
295     /* Find out who sent this segment */
296     endpoint = lookup_sender(module, bseg);
297     seg->rs_endpoint = endpoint;
298 
299 #if 0
300 opal_output(0, "fast recv %d bytes:\n", bseg->us_btl_header->payload_len + sizeof(opal_btl_usnic_btl_header_t));
301 opal_btl_usnic_dump_hex(15, USNIC_OUT, bseg->us_btl_header, bseg->us_btl_header->payload_len + sizeof(opal_btl_usnic_btl_header_t));
302 #endif
303     /* If this is a short incoming message (i.e., the message is
304        wholly contained in this one message -- it is not chunked
305        across multiple messages), and it's not a PUT from the sender,
306        then just handle it here. */
307     if (endpoint != NULL && !endpoint->endpoint_exiting &&
308             (OPAL_BTL_USNIC_PAYLOAD_TYPE_FRAG ==
309                 bseg->us_btl_header->payload_type) &&
310             seg->rs_base.us_btl_header->put_addr == NULL) {
311 
312         seq = seg->rs_base.us_btl_header->pkt_seq;
313         delta = SEQ_DIFF(seq, endpoint->endpoint_next_contig_seq_to_recv);
314         if (delta < 0 || delta >= WINDOW_SIZE) {
315             goto drop;
316         }
317 
318         i = seq - endpoint->endpoint_next_contig_seq_to_recv;
319         i = WINDOW_SIZE_MOD(i + endpoint->endpoint_rfstart);
320         if (endpoint->endpoint_rcvd_segs[i]) {
321             goto drop;
322         }
323 
324         /* Pass this segment up to the PML.
325          * Be sure to get the payload length from the BTL header because
326          * the L2 layer may artificially inflate (or otherwise change)
327          * the frame length to meet minimum sizes, add protocol information,
328          * etc.
329          */
330         reg = mca_btl_base_active_message_trigger + bseg->us_btl_header->tag;
331         seg->rs_segment.seg_len = bseg->us_btl_header->payload_len;
332         reg->cbfunc(&module->super, bseg->us_btl_header->tag,
333                     &seg->rs_desc, reg->cbdata);
334 
335 drop:
336         channel->chan_deferred_recv = seg;
337     }
338 
339     /* Otherwise, handle all the other cases the "normal" way */
340     else {
341         opal_btl_usnic_recv_call(module, seg, channel);
342     }
343 }
344 
345 /*
346  */
347 static inline int
opal_btl_usnic_recv_frag_bookkeeping(opal_btl_usnic_module_t * module,opal_btl_usnic_recv_segment_t * seg,opal_btl_usnic_channel_t * channel)348 opal_btl_usnic_recv_frag_bookkeeping(
349     opal_btl_usnic_module_t* module,
350     opal_btl_usnic_recv_segment_t *seg,
351     opal_btl_usnic_channel_t *channel)
352 {
353     opal_btl_usnic_endpoint_t* endpoint;
354     uint32_t window_index;
355     int rc;
356 
357     endpoint = seg->rs_endpoint;
358 
359     /* Valgrind help */
360     opal_memchecker_base_mem_defined(
361                 (void*)(seg->rs_protocol_header), seg->rs_len);
362 
363     ++module->stats.num_total_recvs;
364 
365     /* Do late processing of incoming sequence # */
366     rc = opal_btl_usnic_check_rx_seq(endpoint, seg, &window_index);
367     if (OPAL_UNLIKELY(rc != 0)) {
368         goto repost;
369     }
370 
371     ++module->stats.num_frag_recvs;
372 
373     opal_btl_usnic_update_window(endpoint, window_index);
374 
375 repost:
376     /* if endpoint exiting, and all ACKs received, release the endpoint */
377     if (endpoint->endpoint_exiting && ENDPOINT_DRAINED(endpoint)) {
378         OBJ_RELEASE(endpoint);
379     }
380 
381     ++module->stats.num_recv_reposts;
382 
383     /* Add recv to linked list for reposting */
384     seg->rs_next = channel->repost_recv_head;
385     channel->repost_recv_head = seg;
386 
387     return rc;
388 }
389 
390 /*
391  * We have received a segment, take action based on the
392  * packet type in the BTL header
393  */
394 static inline void
opal_btl_usnic_recv(opal_btl_usnic_module_t * module,opal_btl_usnic_recv_segment_t * seg,opal_btl_usnic_channel_t * channel)395 opal_btl_usnic_recv(opal_btl_usnic_module_t *module,
396                     opal_btl_usnic_recv_segment_t *seg,
397                     opal_btl_usnic_channel_t *channel)
398 {
399     opal_btl_usnic_segment_t *bseg;
400     mca_btl_active_message_callback_t* reg;
401     opal_btl_usnic_endpoint_t *endpoint;
402     int rc;
403 
404     /* Make the whole payload Valgrind defined */
405     opal_memchecker_base_mem_defined(seg->rs_protocol_header, seg->rs_len);
406 
407     bseg = &seg->rs_base;
408 
409     /* Find out who sent this segment */
410     endpoint = lookup_sender(module, bseg);
411     seg->rs_endpoint = endpoint;
412 
413     /* If this is a short incoming message (i.e., the message is
414        wholly contained in this one message -- it is not chunked
415        across multiple messages), and it's not a PUT from the sender,
416        then just handle it here. */
417     if (endpoint != NULL && !endpoint->endpoint_exiting &&
418             (OPAL_BTL_USNIC_PAYLOAD_TYPE_FRAG ==
419                 bseg->us_btl_header->payload_type) &&
420             seg->rs_base.us_btl_header->put_addr == NULL) {
421 
422         MSGDEBUG1_OUT("<-- Received FRAG (fastpath) ep %p, seq %" UDSEQ ", len=%" PRIu16 "\n",
423                       (void*) endpoint, bseg->us_btl_header->pkt_seq,
424                       bseg->us_btl_header->payload_len);
425 
426         /* do the receive bookkeeping */
427         rc = opal_btl_usnic_recv_frag_bookkeeping(module, seg, channel);
428         if (rc != 0) {
429             return;
430         }
431 
432         /* Pass this segment up to the PML.
433          * Be sure to get the payload length from the BTL header because
434          * the L2 layer may artificially inflate (or otherwise change)
435          * the frame length to meet minimum sizes, add protocol information,
436          * etc.
437          */
438         reg = mca_btl_base_active_message_trigger + bseg->us_btl_header->tag;
439         seg->rs_segment.seg_len = bseg->us_btl_header->payload_len;
440         reg->cbfunc(&module->super, bseg->us_btl_header->tag,
441                     &seg->rs_desc, reg->cbdata);
442 
443     }
444 
445     /* Otherwise, handle all the other cases the "normal" way */
446     else {
447         opal_btl_usnic_recv_call(module, seg, channel);
448     }
449 }
450 
451 #endif /* BTL_USNIC_RECV_H */
452