1 /*
2  * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2011 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2006      Sandia National Laboratories. All rights
13  *                         reserved.
14  * Copyright (c) 2008-2017 Cisco Systems, Inc.  All rights reserved
15  * Copyright (c) 2012      Los Alamos National Security, LLC.  All rights
16  *                         reserved.
17  * $COPYRIGHT$
18  *
19  * Additional copyrights may follow
20  *
21  * $HEADER$
22  */
23 
24 #include "opal_config.h"
25 
26 #include <unistd.h>
27 
28 #include "opal_stdint.h"
29 #include "opal/mca/memchecker/base/base.h"
30 #include "opal/constants.h"
31 
32 #if BTL_IN_OPAL
33 #include "opal/mca/btl/btl.h"
34 #include "opal/mca/btl/base/base.h"
35 #else
36 #include "ompi/mca/btl/btl.h"
37 #include "ompi/mca/btl/base/base.h"
38 #endif
39 
40 #include "btl_usnic.h"
41 #include "btl_usnic_frag.h"
42 #include "btl_usnic_endpoint.h"
43 #include "btl_usnic_module.h"
44 #include "btl_usnic_proc.h"
45 #include "btl_usnic_ack.h"
46 #include "btl_usnic_recv.h"
47 #include "btl_usnic_util.h"
48 
49 
50 /*
51  * We have received a segment, take action based on the
52  * packet type in the BTL header
53  */
opal_btl_usnic_recv_call(opal_btl_usnic_module_t * module,opal_btl_usnic_recv_segment_t * seg,opal_btl_usnic_channel_t * channel)54 void opal_btl_usnic_recv_call(opal_btl_usnic_module_t *module,
55                               opal_btl_usnic_recv_segment_t *seg,
56                               opal_btl_usnic_channel_t *channel)
57 {
58     opal_btl_usnic_segment_t *bseg;
59     mca_btl_active_message_callback_t* reg;
60     opal_btl_usnic_endpoint_t *endpoint;
61     opal_btl_usnic_btl_chunk_header_t *chunk_hdr;
62     opal_btl_usnic_btl_header_t *hdr;
63     uint32_t window_index;
64     int rc;
65 #if MSGDEBUG1
66     char local_ip[IPV4STRADDRLEN];
67     char remote_ip[IPV4STRADDRLEN];
68 #endif
69 
70     bseg = &seg->rs_base;
71 
72     ++module->stats.num_total_recvs;
73 
74     /* Valgrind help */
75     opal_memchecker_base_mem_defined((void*)(seg->rs_protocol_header),
76                                      seg->rs_len);
77 
78     /* Find out who sent this segment */
79     endpoint = seg->rs_endpoint;
80     if (FAKE_RECV_DROP || OPAL_UNLIKELY(NULL == endpoint)) {
81         /* No idea who this was from, so drop it */
82 #if MSGDEBUG1
83         opal_output(0, "=== Unknown sender; dropped: seq %" UDSEQ,
84                     bseg->us_btl_header->pkt_seq);
85 #endif
86         ++module->stats.num_unk_recvs;
87         goto repost_no_endpoint;
88     }
89 
90 #if MSGDEBUG1
91     struct opal_btl_usnic_modex_t *modex;
92 
93     modex = &module->local_modex;
94     opal_btl_usnic_snprintf_ipv4_addr(local_ip, sizeof(local_ip),
95                                       modex->ipv4_addr,
96                                       modex->netmask);
97     modex = &endpoint->endpoint_remote_modex;
98     opal_btl_usnic_snprintf_ipv4_addr(remote_ip, sizeof(remote_ip),
99                                       modex->ipv4_addr,
100                                       modex->netmask);
101 #endif
102 
103     /***********************************************************************/
104     /* Segment is an incoming frag */
105     if (OPAL_BTL_USNIC_PAYLOAD_TYPE_FRAG == bseg->us_btl_header->payload_type) {
106 
107         /* do the receive bookkeeping */
108         rc = opal_btl_usnic_recv_frag_bookkeeping(module, seg, channel);
109         if (rc != 0) {
110             return;
111         }
112 
113         hdr = seg->rs_base.us_btl_header;
114 
115 #if MSGDEBUG1
116         opal_output(0, "<-- Received FRAG ep %p, seq %" UDSEQ ", len=%d\n",
117                     (void*) endpoint, hdr->pkt_seq, hdr->payload_len);
118 #if 0
119 
120         opal_output(0, "<-- Received FRAG ep %p, seq %" UDSEQ " from %s to %s: GOOD! (rel seq %d, lowest seq %" UDSEQ ", highest seq: %" UDSEQ ", rwstart %d) seg %p, module %p\n",
121                     (void*) endpoint,
122                     seg->rs_base.us_btl_header->pkt_seq,
123                     remote_ip, local_ip,
124                     window_index,
125                     endpoint->endpoint_next_contig_seq_to_recv,
126                     endpoint->endpoint_highest_seq_rcvd,
127                     endpoint->endpoint_rfstart,
128                     (void*) seg, (void*) module);
129         if (hdr->put_addr != NULL) {
130             opal_output(0, "  put_addr = %p\n",
131                     seg->rs_base.us_btl_header->put_addr);
132         }
133 #endif
134 #endif
135 
136         /* If this it not a PUT, Pass this segment up to the PML.
137          * Be sure to get the payload length from the BTL header because
138          * the L2 layer may artificially inflate (or otherwise change)
139          * the frame length to meet minimum sizes, add protocol information,
140          * etc.
141          */
142         if (hdr->put_addr == NULL) {
143             reg = mca_btl_base_active_message_trigger + hdr->tag;
144             seg->rs_segment.seg_len = hdr->payload_len;
145 #if MSGDEBUG2
146                 opal_output(0, "small recv complete, pass up %u bytes, tag=%d\n",
147                         (unsigned)bseg->us_btl_header->payload_len,
148                         (int)bseg->us_btl_header->tag);
149 #endif
150             reg->cbfunc(&module->super, hdr->tag, &seg->rs_desc, reg->cbdata);
151 
152         /*
153          * If this is a PUT, need to copy it to user buffer
154          */
155         } else {
156 #if MSGDEBUG1
157             opal_output(0, "Copy %d PUT bytes to %p\n",
158                 seg->rs_base.us_btl_header->payload_len,
159                 (void*)seg->rs_base.us_btl_header->put_addr);
160 #endif
161             memcpy(seg->rs_base.us_btl_header->put_addr,
162                     seg->rs_base.us_payload.raw,
163                     seg->rs_base.us_btl_header->payload_len);
164         }
165 
166         /* do not jump to repost, already done by bookkeeping */
167         return;
168     }
169 
170     /***********************************************************************/
171     /* Segment is an incoming chunk */
172     if (OPAL_BTL_USNIC_PAYLOAD_TYPE_CHUNK == bseg->us_btl_header->payload_type) {
173         int frag_index;
174         opal_btl_usnic_rx_frag_info_t *fip;
175 
176         /* Is incoming sequence # ok? */
177         if (OPAL_UNLIKELY(opal_btl_usnic_check_rx_seq(endpoint, seg,
178                         &window_index) != 0)) {
179             goto repost;
180         }
181 
182 #if MSGDEBUG1
183         opal_output(0, "<-- Received CHUNK fid %d ep %p, seq %" UDSEQ " from %s to %s: GOOD! (rel seq %d, lowest seq %" UDSEQ ", highest seq: %" UDSEQ ", rwstart %d) seg %p, module %p\n",
184                     seg->rs_base.us_btl_chunk_header->ch_frag_id,
185                     (void*) endpoint,
186                     seg->rs_base.us_btl_chunk_header->ch_hdr.pkt_seq,
187                     remote_ip, local_ip,
188                     window_index,
189                     endpoint->endpoint_next_contig_seq_to_recv,
190                     endpoint->endpoint_highest_seq_rcvd,
191                     endpoint->endpoint_rfstart,
192                     (void*) seg, (void*) module);
193 #endif
194 
195         /* start a new fragment if not one in progress
196          * alloc memory, etc.  when last byte arrives, dealloc the
197          * frag_id and pass data to PML
198          */
199         chunk_hdr = seg->rs_base.us_btl_chunk_header;
200         frag_index = chunk_hdr->ch_frag_id % MAX_ACTIVE_FRAGS;
201         fip = &(endpoint->endpoint_rx_frag_info[frag_index]);
202 
203         /* frag_id == 0 means this slot it empty, grab it! */
204         if (0 == fip->rfi_frag_id) {
205             fip->rfi_frag_id = chunk_hdr->ch_frag_id;
206             fip->rfi_frag_size = chunk_hdr->ch_frag_size;
207             if (chunk_hdr->ch_hdr.put_addr == NULL) {
208                 int pool;
209 
210                 fip->rfi_data = NULL;
211 
212                 /* See which data pool this should come from,
213                  * or if it should be malloc()ed
214                  */
215                 pool = usnic_fls(chunk_hdr->ch_frag_size-1);
216                 if (pool >= module->first_pool &&
217                         pool <= module->last_pool) {
218                     opal_free_list_item_t* item;
219                     opal_btl_usnic_rx_buf_t *rx_buf;
220                     USNIC_COMPAT_FREE_LIST_GET(&module->module_recv_buffers[pool], item);
221                     rx_buf = (opal_btl_usnic_rx_buf_t *)item;
222                     if (OPAL_LIKELY(NULL != rx_buf)) {
223                         fip->rfi_fl_elt = item;
224                         fip->rfi_data = rx_buf->buf;
225                         fip->rfi_data_pool = pool;
226                         fip->rfi_data_in_pool = true;
227                     }
228                 }
229                 if (fip->rfi_data == NULL) {
230                     fip->rfi_data = malloc(chunk_hdr->ch_frag_size);
231                     fip->rfi_data_in_pool = false;
232                 }
233                 if (fip->rfi_data == NULL) {
234                     abort();
235                 }
236 #if MSGDEBUG1
237                 opal_output(0, "Start large recv to %p, size=%"PRIu32"\n",
238                     (void *)fip->rfi_data, chunk_hdr->ch_frag_size);
239 #endif
240             } else {
241 #if MSGDEBUG1
242                 opal_output(0, "Start PUT to %p\n",
243                         (void *)chunk_hdr->ch_hdr.put_addr);
244 #endif
245                 fip->rfi_data = chunk_hdr->ch_hdr.put_addr;
246             }
247             fip->rfi_bytes_left = chunk_hdr->ch_frag_size;
248             fip->rfi_frag_id = chunk_hdr->ch_frag_id;
249 
250         /* frag_id is not 0 - it must match, drop if not */
251         } else if (fip->rfi_frag_id != chunk_hdr->ch_frag_id) {
252             ++module->stats.num_badfrag_recvs;
253             goto repost;
254         }
255 #if MSGDEBUG1
256         opal_output(0, "put_addr=%p, copy_addr=%p, off=%d\n",
257                 chunk_hdr->ch_hdr.put_addr,
258                 fip->rfi_data+chunk_hdr->ch_frag_offset,
259                 chunk_hdr->ch_frag_offset);
260 #endif
261 
262         /* Stats */
263         ++module->stats.num_chunk_recvs;
264 
265         /* validate offset and len to be within fragment */
266         assert(chunk_hdr->ch_frag_offset + chunk_hdr->ch_hdr.payload_len <=
267                 fip->rfi_frag_size);
268         assert(fip->rfi_frag_size == chunk_hdr->ch_frag_size);
269 
270         /* copy the data into place */
271         memcpy(fip->rfi_data + chunk_hdr->ch_frag_offset, (char *)(chunk_hdr+1),
272                 chunk_hdr->ch_hdr.payload_len);
273 
274         /* update sliding window */
275         opal_btl_usnic_update_window(endpoint, window_index);
276 
277         fip->rfi_bytes_left -= chunk_hdr->ch_hdr.payload_len;
278         if (0 == fip->rfi_bytes_left) {
279             mca_btl_base_descriptor_t desc;
280             mca_btl_base_segment_t segment;
281 
282             segment.seg_addr.pval = fip->rfi_data;
283             segment.seg_len = fip->rfi_frag_size;
284             desc.USNIC_RECV_LOCAL = &segment;
285             desc.USNIC_RECV_LOCAL_COUNT = 1;
286 
287             /* only up to PML if this was not a put */
288             if (chunk_hdr->ch_hdr.put_addr == NULL) {
289 
290                 /* Pass this segment up to the PML */
291 #if MSGDEBUG2
292                 opal_output(0, "large recv complete, pass up %p, %u bytes, tag=%d\n",
293                         desc.USNIC_RECV_LOCAL->seg_addr.pval,
294                         (unsigned)desc.USNIC_RECV_LOCAL->seg_len,
295                         (int)chunk_hdr->ch_hdr.tag);
296 #endif
297                 reg = mca_btl_base_active_message_trigger +
298                     chunk_hdr->ch_hdr.tag;
299 
300                 /* mca_pml_ob1_recv_frag_callback_frag() */
301                 reg->cbfunc(&module->super, chunk_hdr->ch_hdr.tag,
302                         &desc, reg->cbdata);
303 
304                 /* free temp buffer for non-put */
305                 if (fip->rfi_data_in_pool) {
306                     USNIC_COMPAT_FREE_LIST_RETURN(&module->module_recv_buffers[fip->rfi_data_pool],
307                                                   fip->rfi_fl_elt);
308                 } else {
309                     free(fip->rfi_data);
310                 }
311 
312 #if MSGDEBUG1
313             } else {
314                 opal_output(0, "PUT recv complete, no callback\n");
315 #endif
316             }
317 
318             /* release the fragment ID */
319             fip->rfi_frag_id = 0;
320 
321             /* force immediate ACK */
322             endpoint->endpoint_acktime = 0;
323         }
324         goto repost;
325     }
326 
327     /***********************************************************************/
328     /* Frag is an incoming ACK */
329     else if (OPAL_LIKELY(OPAL_BTL_USNIC_PAYLOAD_TYPE_ACK ==
330                          bseg->us_btl_header->payload_type)) {
331         opal_btl_usnic_seq_t ack_seq;
332 
333         /* sequence being ACKed */
334         ack_seq = bseg->us_btl_header->ack_seq;
335 
336         /* Stats */
337         ++module->stats.num_ack_recvs;
338 
339 #if MSGDEBUG1
340         opal_output(0, "    Received ACK for sequence number %" UDSEQ " from %s to %s\n",
341                     bseg->us_btl_header->ack_seq, remote_ip, local_ip);
342 #endif
343         OPAL_THREAD_LOCK(&btl_usnic_lock);
344         opal_btl_usnic_handle_ack(endpoint, ack_seq);
345         OPAL_THREAD_UNLOCK(&btl_usnic_lock);
346         goto repost;
347     }
348 
349     /***********************************************************************/
350     /* Have no idea what the frag is; drop it */
351     else {
352         ++module->stats.num_unk_recvs;
353         if (module->stats.num_unk_recvs < 10) {
354             opal_output_verbose(15, USNIC_OUT, "unrecognized payload type %d", bseg->us_btl_header->payload_type);
355             opal_output_verbose(15, USNIC_OUT, "base = %p, proto = %p, hdr = %p", bseg->us_list.ptr, seg->rs_protocol_header, (void*) bseg->us_btl_header);
356             opal_btl_usnic_dump_hex(15, USNIC_OUT, bseg->us_list.ptr, 96+sizeof(*bseg->us_btl_header));
357         }
358         goto repost;
359     }
360 
361     /***********************************************************************/
362  repost:
363 
364     /* if endpoint exiting, and all ACKs received, release the endpoint */
365     if (endpoint->endpoint_exiting && ENDPOINT_DRAINED(endpoint)) {
366         OBJ_RELEASE(endpoint);
367     }
368  repost_no_endpoint:
369     ++module->stats.num_recv_reposts;
370 
371     /* Add recv to linked list for reposting */
372     seg->rs_next = channel->repost_recv_head;
373     channel->repost_recv_head = seg;
374 }
375