/*
 * Copyright (C) 2016 by Argonne National Laboratory.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above
 * copyright notice, this list of conditions and the following
 * disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following
 * disclaimer in the documentation and/or other materials
 * provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#ifndef _FI_PROV_BGQ_RX_H_
#define _FI_PROV_BGQ_RX_H_

#define FI_BGQ_UEPKT_BLOCKSIZE (1024)
#define PROCESS_RFIFO_MAX 64
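
/* FI_BGQ_UEPKT_BLOCKSIZE is the number of 'struct fi_bgq_mu_packet'
 * elements allocated at a time for the unexpected packet free list;
 * PROCESS_RFIFO_MAX caps the number of reception fifo packets drained
 * in a single poll_rfifo() pass. */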

// #define FI_BGQ_TRACE 1

/* forward declaration - see: prov/bgq/src/fi_bgq_atomic.c */
void fi_bgq_rx_atomic_dispatch (void * buf, void * addr, size_t nbytes,
		enum fi_datatype dt, enum fi_op op);

static inline
void dump_uepkt_queue (struct rx_operation * rx) {

	fprintf(stderr, "%s:%s():%d rx=%p, head=%p, tail=%p\n", __FILE__, __func__, __LINE__, rx, rx->ue.head, rx->ue.tail);
	struct fi_bgq_mu_packet * pkt = rx->ue.head;
	while (pkt) {
		fprintf(stderr, "%s:%s():%d --> %p\n", __FILE__, __func__, __LINE__, pkt);
		pkt = pkt->next;
	}
}

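/* Complete an incoming atomic operation described by 'pkt'. The
 * operation is applied to local memory via fi_bgq_rx_atomic_dispatch();
 * for fetch-style atomics (and FI_ATOMIC_READ) a direct-put descriptor
 * is also injected to return the original target data to the origin,
 * and a counter update descriptor is injected when the origin
 * requested one (do_cntr). */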
static inline
void complete_atomic_operation (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt) {

	const uint32_t dt = pkt->hdr.atomic.dt;
	const uint32_t op = pkt->hdr.atomic.op;

	const uint64_t is_fetch = pkt->hdr.atomic.is_fetch;
	const uint64_t do_cntr = pkt->hdr.atomic.do_cntr;
	const uint64_t cntr_bat_id = pkt->hdr.atomic.cntr_bat_id;

	const uint16_t nbytes = pkt->hdr.atomic.nbytes_minus_1 + 1;

	void * addr;
	if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
		addr = (void*) pkt->hdr.atomic.offset;
	} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {
		const uint16_t key = pkt->hdr.atomic.key;
		const uint64_t offset = pkt->hdr.atomic.offset;
		const uintptr_t base = (uintptr_t)fi_bgq_domain_bat_read_vaddr(bgq_ep->rx.poll.bat, key);
		addr = (void*)(base + offset);
	} else {
		assert(0);
	}

	const uint32_t origin = pkt->hdr.atomic.origin_raw & FI_BGQ_MUHWI_DESTINATION_MASK;

	if (is_fetch || (op == FI_ATOMIC_READ)) {

		const uint64_t dst_paddr = pkt->payload.atomic_fetch.metadata.dst_paddr;
		const uint64_t cq_paddr = pkt->payload.atomic_fetch.metadata.cq_paddr;
		const uint64_t fifo_map = pkt->payload.atomic_fetch.metadata.fifo_map;

		MUHWI_Descriptor_t * desc =
			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);

		qpx_memcpy64((void*)desc, (const void *)&bgq_ep->rx.poll.atomic_dput_model);

		desc->PacketHeader.NetworkHeader.pt2pt.Destination.Destination.Destination = origin;
		desc->Torus_FIFO_Map = fifo_map;

		/* locate the payload lookaside slot */
		uint64_t payload_paddr = 0;
		void * payload_vaddr =
			fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
				desc, &payload_paddr);
		desc->Pa_Payload = payload_paddr;

		/* copy the target data into the injection lookaside buffer */
		memcpy(payload_vaddr, (const void*) addr, nbytes);
		desc->Message_Length = nbytes;

		MUSPI_SetRecPayloadBaseAddressInfo(desc, FI_BGQ_MU_BAT_ID_GLOBAL, dst_paddr);
		if (cq_paddr != 0) {	/* unlikely */
			desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Counter_Offset =
				MUSPI_GetAtomicAddress(cq_paddr, MUHWI_ATOMIC_OPCODE_STORE_ADD);
		}

		fi_bgq_rx_atomic_dispatch((void*)&pkt->payload.atomic_fetch.data[0], addr, nbytes, dt, op);

		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);

	} else {

		fi_bgq_rx_atomic_dispatch(&pkt->payload.byte[0], addr, nbytes, dt, op);

		/*
		 * cq completions (unlikely) are accomplished via a fence
		 * operation for non-fetch atomic operations
		 */
	}

	if (do_cntr) {	/* likely -- TODO: change to *always* do a counter update?? */

		const uint64_t is_local = pkt->hdr.atomic.is_local;

		MUHWI_Descriptor_t * desc =
			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);

		qpx_memcpy64((void*)desc, (const void*)&bgq_ep->rx.poll.atomic_cntr_update_model[is_local]);

		desc->PacketHeader.NetworkHeader.pt2pt.Destination.Destination.Destination = origin;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Payload_Base_Address_Id = cntr_bat_id;

		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
	}
}


/* The 'set_desc_payload_paddr' function sets an mu descriptor payload
 * address in one of two ways, based on the mr mode.
 *
 * For FI_MR_SCALABLE it assumes that base+offset is a virtual address,
 * which must then be converted into a physical address.
 *
 * For FI_MR_BASIC it will set offset-key as the physical address.
 */
static inline
void set_desc_payload_paddr (union fi_bgq_mu_descriptor * fi_mu_desc, struct fi_bgq_bat_entry * bat) {

	const uint8_t rma_update_type = fi_mu_desc->rma.update_type;

	if (rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_DST) {
		const uint64_t key_msb = fi_mu_desc->rma.key_msb;
		const uint64_t key_lsb = fi_mu_desc->rma.key_lsb;
		const uint64_t key = (key_msb << 48) | key_lsb;
		uint64_t paddr = 0;
		const uint64_t offset = fi_mu_desc->rma.dput.put_offset;

		if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
			paddr = offset - key;
		} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {

			const uintptr_t base = (uintptr_t) fi_bgq_domain_bat_read_vaddr(bat, key);
			fi_bgq_cnk_vaddr2paddr((const void *)(base+offset), 1, &paddr);
		} else {
			assert(0);
		}

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"set_desc_payload_paddr rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_DST paddr is 0x%016lx\n",paddr);
#endif
		MUSPI_SetRecPayloadBaseAddressInfo((MUHWI_Descriptor_t *)fi_mu_desc,
			FI_BGQ_MU_BAT_ID_GLOBAL, paddr);

	} else if (rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_SRC) {
		const uint64_t key_msb = fi_mu_desc->rma.key_msb;
		const uint64_t key_lsb = fi_mu_desc->rma.key_lsb;
		const uint64_t key = (key_msb << 48) | key_lsb;
		const uint64_t offset = fi_mu_desc->rma.Pa_Payload;
		if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
			fi_mu_desc->rma.Pa_Payload = offset - key;
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"set_desc_payload_paddr rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_SRC for FI_MR_BASIC fi_mu_desc->rma.Pa_Payload set to paddr 0x%016lx\n",(offset-key));
			fflush(stderr);
#endif
		} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {

			const uintptr_t base = (uintptr_t) fi_bgq_domain_bat_read_vaddr(bat, key);
			fi_bgq_cnk_vaddr2paddr((const void *)(base+offset), 1, &fi_mu_desc->rma.Pa_Payload);
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"set_desc_payload_paddr rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_SRC for FI_MR_SCALABLE fi_mu_desc->rma.Pa_Payload set to paddr 0x%016lx\n",fi_mu_desc->rma.Pa_Payload);
			fflush(stderr);
#endif
		} else {
			assert(0);
		}
	} else {
		/* no update requested */
	}
}

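/* Complete an incoming rma operation. Immediate data carried in the
 * packet payload (direct-put emulation) is copied into the target
 * buffer first, then each descriptor carried in the payload is
 * re-injected after its payload physical address is resolved via
 * set_desc_payload_paddr(). */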
static inline
void complete_rma_operation (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt) {

	struct fi_bgq_bat_entry * bat = bgq_ep->rx.poll.bat;
	const uint64_t nbytes = pkt->hdr.rma.nbytes;
	const uint64_t ndesc = pkt->hdr.rma.ndesc;
	MUHWI_Descriptor_t * payload = (MUHWI_Descriptor_t *) &pkt->payload;

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"complete_rma_operation starting - nbytes is %lu ndesc is %lu\n",nbytes,ndesc);
	fflush(stderr);
#endif
	if (nbytes > 0) {	/* only for direct-put emulation */
		const uint64_t payload_offset = ndesc << BGQ_MU_DESCRIPTOR_SIZE_IN_POWER_OF_2;
		if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
			uintptr_t vaddr = (uintptr_t) pkt->hdr.rma.offset;
			memcpy((void*)vaddr, (void *)((uintptr_t)payload + payload_offset), nbytes);
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"direct-put emulation memcpy vaddr is 0x%016lx nbytes is %lu\n",vaddr,nbytes);
#endif

		} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {

			uintptr_t vaddr = (uintptr_t)fi_bgq_domain_bat_read_vaddr(bat, pkt->hdr.rma.key);
			vaddr += pkt->hdr.rma.offset;
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"direct-put emulation memcpy vaddr is 0x%016lx nbytes is %lu\n",vaddr,nbytes);
#endif

			memcpy((void*)vaddr, (void *)((uintptr_t)payload + payload_offset), nbytes);
		} else {
			assert(0);
		}
	}

	unsigned i;
	for (i = 0; i < ndesc; ++i) {

		/*
		 * busy-wait until a fifo slot is available ..
		 */
		MUHWI_Descriptor_t * desc =
			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);

		qpx_memcpy64((void*)desc, (const void*)&payload[i]);

		if (desc->PacketHeader.NetworkHeader.pt2pt.Byte8.Packet_Type == 2) {	/* rget descriptor */

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"complete_rma_operation - processing rget desc %u\n",i);
			fflush(stderr);
#endif
			/* locate the payload lookaside slot */
			uint64_t payload_paddr = 0;
			void * payload_vaddr =
				fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
					desc, &payload_paddr);
			desc->Pa_Payload = payload_paddr;

			/* copy the rget payload descriptors into the injection lookaside buffer */
			union fi_bgq_mu_descriptor * rget_payload = (union fi_bgq_mu_descriptor *) payload_vaddr;
			qpx_memcpy64((void*)rget_payload, (const void*)&payload[i+1]);

			const uint64_t rget_ndesc = desc->Message_Length >> BGQ_MU_DESCRIPTOR_SIZE_IN_POWER_OF_2;
			i += rget_ndesc;

			unsigned j;
			for (j = 0; j < rget_ndesc; ++j) {
				set_desc_payload_paddr(&rget_payload[j], bat);
			}

		} else {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"complete_rma_operation - processing fifo desc %u\n",i);
			fflush(stderr);
#endif
			set_desc_payload_paddr((union fi_bgq_mu_descriptor *)desc, bat);

		}
		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
	}
#ifdef FI_BGQ_TRACE
	fprintf(stderr,"complete_rma_operation complete\n");
	fflush(stderr);
#endif
}

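/* Inject an 'ack' direct-put descriptor for an eager packet that
 * requested a completion; the descriptor targets the origin's
 * completion counter whose physical address is carried (shifted right
 * by 3 bits) in the packet header. */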
static inline
void inject_eager_completion (struct fi_bgq_ep * bgq_ep,
		struct fi_bgq_mu_packet * pkt) {

	const uint64_t is_local = pkt->hdr.completion.is_local;
	const uint64_t cntr_paddr = ((uint64_t)pkt->hdr.completion.cntr_paddr_rsh3b) << 3;

	MUHWI_Descriptor_t * desc =
		fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);

	qpx_memcpy64((void*)desc, (const void*)&bgq_ep->rx.poll.ack_model[is_local]);

	MUSPI_SetRecPayloadBaseAddressInfo(desc, FI_BGQ_MU_BAT_ID_GLOBAL, cntr_paddr);
	desc->PacketHeader.NetworkHeader.pt2pt.Destination = pkt->hdr.completion.origin;

	MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);

	return;
}


/**
 * \brief Complete a receive operation that has matched the packet header with
 * 		the match information
 *
 * \param[in]		bgq_ep	Endpoint associated with the receive
 * \param[in]		pkt	MU packet that matched
 * \param[in,out]	context	Completion entry
 */
static inline
void complete_receive_operation (struct fi_bgq_ep * bgq_ep,
		struct fi_bgq_mu_packet * pkt,
		const uint64_t origin_tag,
		union fi_bgq_context * context,
		const unsigned is_context_ext,
		const unsigned is_multi_receive,
		const unsigned is_manual_progress) {

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"complete_receive_operation starting\n");
#endif
	const uint64_t recv_len = context->len;
	void * recv_buf = context->buf;
	const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

	const uint64_t immediate_data = pkt->hdr.pt2pt.immediate_data;

	if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
#ifdef FI_BGQ_TRACE
		fprintf(stderr,"complete_receive_operation - packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER\n");
#endif

		const uint64_t send_len = pkt->hdr.pt2pt.send.message_length;

		if (is_multi_receive) {		/* branch should compile out */
			if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);

			union fi_bgq_context * original_multi_recv_context = context;
			context = (union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
			assert((((uintptr_t)context) & 0x07) == 0);

			context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
			context->buf = recv_buf;
			context->len = send_len;
			context->data = immediate_data;
			context->tag = 0;	/* tag is not valid for multi-receives */
			context->multi_recv_context = original_multi_recv_context;
			context->byte_counter = 0;

			/* the next 'fi_bgq_context' must be 8-byte aligned */
			uint64_t bytes_consumed = ((send_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
			original_multi_recv_context->len -= bytes_consumed;
			original_multi_recv_context->buf = (void*)((uintptr_t)(original_multi_recv_context->buf) + bytes_consumed);
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"complete_receive_operation - is_multi_receive - enqueue cq for child context %p of parent context %p\n",context,original_multi_recv_context);
#endif

			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else if (send_len <= recv_len) {
			if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"EAGER complete_receive_operation send_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_completed\n",send_len,recv_len);
#endif

			context->buf = NULL;
			context->len = send_len;
			context->data = immediate_data;
			context->tag = origin_tag;
			context->byte_counter = 0;

			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else {	/* truncation - unlikely */
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"EAGER complete_receive_operation truncation - send_len %lu > recv_len %lu posting error\n",send_len,recv_len);
#endif

			struct fi_bgq_context_ext * ext;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
				ext->err_entry.op_context = ext->msg.op_context;
			} else {
				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
				ext->err_entry.op_context = context;
			}

			ext->err_entry.flags = context->flags;
			ext->err_entry.len = recv_len;
			ext->err_entry.buf = recv_buf;
			ext->err_entry.data = immediate_data;
			ext->err_entry.tag = origin_tag;
			ext->err_entry.olen = send_len - recv_len;
			ext->err_entry.err = FI_ETRUNC;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			ext->bgq_context.byte_counter = 0;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext, 0);
		}

		return;

	} else {	/* rendezvous packet */

		uint64_t niov = pkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
		assert(niov <= (7-is_multi_receive));
		uint64_t xfer_len = pkt->payload.rendezvous.mu_iov[0].message_length;
		{
			uint64_t i;
			for (i=1; i<niov; ++i) xfer_len += pkt->payload.rendezvous.mu_iov[i].message_length;
		}

		uint64_t byte_counter_vaddr = 0;

		if (is_multi_receive) {		/* branch should compile out */

			/* This code functionality is unverified - exit with an error
			 * message for now; when we have an mpich test case for this
			 * we will verify it.
			 */

			fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
			fflush(stderr);
			exit(1);

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"rendezvous multirecv\n");
#endif

			union fi_bgq_context * multi_recv_context =
				(union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
			assert((((uintptr_t)multi_recv_context) & 0x07) == 0);

			multi_recv_context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
			multi_recv_context->buf = recv_buf;
			multi_recv_context->len = xfer_len;
			multi_recv_context->data = immediate_data;
			multi_recv_context->tag = 0;	/* tag is not valid for multi-receives */
			multi_recv_context->multi_recv_context = context;
			multi_recv_context->byte_counter = xfer_len;

			/* the next 'fi_bgq_context' must be 8-byte aligned */
			uint64_t bytes_consumed = ((xfer_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
			context->len -= bytes_consumed;
			context->buf = (void*)((uintptr_t)(context->buf) + bytes_consumed);

			byte_counter_vaddr = (uint64_t)&multi_recv_context->byte_counter;

			/* the original multi-receive context actually uses an
			 * operation counter - not a byte counter - but nevertheless
			 * the same field in the context structure is used */
			context->byte_counter += 1;

			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else if (xfer_len <= recv_len) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"rendezvous complete_receive_operation xfer_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_pending\n",xfer_len,recv_len);
#endif
			context->len = xfer_len;
			context->data = immediate_data;
			context->tag = origin_tag;
			context->byte_counter = xfer_len;

			byte_counter_vaddr = (uint64_t)&context->byte_counter;

			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else {
#ifdef FI_BGQ_TRACE
			fprintf(stderr,"rendezvous truncation xfer_len %lu > recv_len %lu posting error\n",xfer_len,recv_len);
#endif

			/* truncation */
			struct fi_bgq_context_ext * ext;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
				ext->err_entry.op_context = ext->msg.op_context;
			} else {
				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
				ext->err_entry.op_context = context;
			}

			ext->err_entry.flags = context->flags;
			ext->err_entry.len = recv_len;
			ext->err_entry.buf = recv_buf;
			ext->err_entry.data = immediate_data;
			ext->err_entry.tag = origin_tag;
			ext->err_entry.olen = xfer_len - recv_len;
			ext->err_entry.err = FI_ETRUNC;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			ext->bgq_context.byte_counter = 0;

			byte_counter_vaddr = (uint64_t)&ext->bgq_context.byte_counter;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext, 0);

			xfer_len = 0;
			niov = 0;
		}

		/* determine the physical address of the byte counter memory */
		uint64_t byte_counter_paddr = 0;
		{
			Kernel_MemoryRegion_t mr;
			Kernel_CreateMemoryRegion(&mr, (void*)byte_counter_vaddr, sizeof(uint64_t));
			byte_counter_paddr = (uint64_t)mr.BasePa + (byte_counter_vaddr - (uint64_t)mr.BaseVa);
		}

		/* determine the physical address of the destination buffer */
		uint64_t dst_paddr = 0;
		{
			Kernel_MemoryRegion_t mr;
			Kernel_CreateMemoryRegion(&mr, (void*)recv_buf, recv_len);
			dst_paddr = (uint64_t)mr.BasePa + ((uint64_t)recv_buf - (uint64_t)mr.BaseVa);
		}

		const uint64_t fifo_map = fi_bgq_mu_packet_get_fifo_map(pkt);
		const uint64_t is_local = (fifo_map & (MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL0 | MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL1)) != 0;

		/*
		 * inject a "remote get" descriptor - the payload is composed
		 * of two descriptors:
		 *
		 * the first is a "direct put" descriptor that will rdma
		 * transfer the source data from the origin and will
		 * decrement a reception counter on the target as it
		 * completes
		 *
		 * the second is a "direct put" descriptor that will clear
		 * the byte counter for the send completion entry on the
		 * origin
		 */

		/* busy-wait until a fifo slot is available .. */
		MUHWI_Descriptor_t * rget_desc =
			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);

		assert(rget_desc);
		assert((((uintptr_t)rget_desc)&0x1F) == 0);

		/* locate the payload lookaside slot */
		uint64_t payload_paddr = 0;
		MUHWI_Descriptor_t * payload =
			(MUHWI_Descriptor_t *)fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
				rget_desc, &payload_paddr);

		/* initialize the remote-get descriptor in the injection fifo */
		qpx_memcpy64((void*)rget_desc, (const void*)&bgq_ep->rx.poll.rzv.rget_model[is_local]);

		rget_desc->Pa_Payload = payload_paddr;
		rget_desc->PacketHeader.messageUnitHeader.Packet_Types.Remote_Get.Rget_Inj_FIFO_Id =
			pkt->hdr.pt2pt.rendezvous.rget_inj_fifo_id;	/* TODO - different rget inj fifos for tag vs msg operations? */

		rget_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(pkt->hdr.pt2pt.uid.fi);

		/* initialize the direct-put ("data transfer") descriptor(s) in the rget payload */
		unsigned i;
		for (i=0; i<niov; ++i) {
			MUHWI_Descriptor_t * xfer_desc = payload++;

			qpx_memcpy64((void*)xfer_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_model[is_local]);

			xfer_desc->Pa_Payload = pkt->payload.rendezvous.mu_iov[i].src_paddr;
			const uint64_t message_length = pkt->payload.rendezvous.mu_iov[i].message_length;
			xfer_desc->Message_Length = message_length;
			MUSPI_SetRecPayloadBaseAddressInfo(xfer_desc, FI_BGQ_MU_BAT_ID_GLOBAL, dst_paddr);
			dst_paddr += message_length;
			xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Counter_Offset =
				MUSPI_GetAtomicAddress(byte_counter_paddr, MUHWI_ATOMIC_OPCODE_STORE_ADD);
			xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Counter_Base_Address_Id =
				FI_BGQ_MU_BAT_ID_GLOBAL;

			rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);

			if (is_multi_receive) {		/* branch should compile out */
				xfer_desc->Torus_FIFO_Map = fifo_map;
			}
		}

		/* initialize the direct-put ("origin completion") descriptor in the rget payload */
		{
			MUHWI_Descriptor_t * dput_desc = payload;
			qpx_memcpy64((void*)dput_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_completion_model);

			const uint64_t counter_paddr = ((uint64_t) pkt->payload.rendezvous.cntr_paddr_rsh3b) << 3;
			dput_desc->Pa_Payload =
				MUSPI_GetAtomicAddress(counter_paddr,
					MUHWI_ATOMIC_OPCODE_LOAD_CLEAR);
		}

		/* initialize the memory-fifo ("rendezvous ack") descriptor in the rget payload for multi-receives */
		if (is_multi_receive) {		/* branch should compile out */
			MUHWI_Descriptor_t * ack_desc = ++payload;
			qpx_memcpy64((void*)ack_desc, (const void*)&bgq_ep->rx.poll.rzv.multi_recv_ack_model);

			ack_desc->Torus_FIFO_Map = fifo_map;
			rget_desc->Torus_FIFO_Map = fifo_map;
			rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);

			union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &ack_desc->PacketHeader;
			hdr->ack.context = (uintptr_t) context;
		}

		/*
		 * inject the descriptor
		 */
		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
	}
	return;
}

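/* Test whether a packet matches a posted receive context: the origin
 * tag must equal the target tag in every bit position not masked off
 * by 'ignore', and the origin uid must equal the target uid unless the
 * receive was posted with FI_ADDR_UNSPEC (wildcard source). */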
static inline
unsigned is_match(struct fi_bgq_mu_packet *pkt, union fi_bgq_context * context, const unsigned poll_msg)
{
	const uint64_t origin_tag = pkt->hdr.pt2pt.ofi_tag;
	const fi_bgq_uid_t origin_uid = pkt->hdr.pt2pt.uid.fi;
	const fi_bgq_uid_t target_uid = fi_bgq_addr_uid(context->src_addr);
	const uint64_t ignore = context->ignore;
	const uint64_t target_tag = context->tag;
	const uint64_t target_tag_and_not_ignore = target_tag & ~ignore;
	const uint64_t origin_tag_and_not_ignore = origin_tag & ~ignore;

#ifdef FI_BGQ_TRACE
	fprintf(stderr, "%s:%s():%d context %p origin_uid=0x%08x target_uid=0x%08x origin_tag=0x%016lx target_tag=0x%016lx ignore=0x%016lx any_source is %u returning %u\n", __FILE__, __func__, __LINE__, context, origin_uid, target_uid, origin_tag, target_tag, ignore, (context->src_addr == FI_ADDR_UNSPEC), ((origin_tag_and_not_ignore == target_tag_and_not_ignore) && ((context->src_addr == FI_ADDR_UNSPEC) || (origin_uid == target_uid))));
	fflush(stderr);
#endif

	return ((origin_tag_and_not_ignore == target_tag_and_not_ignore) && ((context->src_addr == FI_ADDR_UNSPEC) || (origin_uid == target_uid)));
}

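/* Dispatch a single reception fifo packet. For msg endpoints
 * (poll_msg) ack, rma, and atomic packets are handled directly; eager
 * and rendezvous packets are matched against the posted receive match
 * queue ('mq'), and packets with no matching receive are copied onto
 * the unexpected packet queue ('ue'). */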
static inline
void process_rfifo_packet_optimized (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt, const unsigned poll_msg, const unsigned is_manual_progress)
{
	const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_rfifo_packet_optimized - poll_msg is %u mq addr is %p\n",poll_msg,&(bgq_ep->rx.poll.rfifo[poll_msg].mq));
	fflush(stderr);
#endif
	if (poll_msg) {
		if (packet_type == FI_BGQ_MU_PACKET_TYPE_ACK) {		/* branch should compile out */

			union fi_bgq_context * context = (union fi_bgq_context *) pkt->hdr.ack.context;
			context->byte_counter -= 1;
			/* TODO - msync? */
			return;
		}

		if (packet_type == FI_BGQ_MU_PACKET_TYPE_RMA) {
			complete_rma_operation(bgq_ep, pkt);
			return;
		}

		if (packet_type == FI_BGQ_MU_PACKET_TYPE_ATOMIC) {
			complete_atomic_operation(bgq_ep, pkt);
			return;
		}
	}

	if ((packet_type & (FI_BGQ_MU_PACKET_TYPE_ACK|FI_BGQ_MU_PACKET_TYPE_EAGER)) ==
			(FI_BGQ_MU_PACKET_TYPE_ACK|FI_BGQ_MU_PACKET_TYPE_EAGER)) {	/* unlikely? */
		inject_eager_completion(bgq_ep, pkt);
		return;
	}

	assert(packet_type & (FI_BGQ_MU_PACKET_TYPE_EAGER | FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS));

	/* search the match queue */
	union fi_bgq_context * head = bgq_ep->rx.poll.rfifo[poll_msg].mq.head;
	union fi_bgq_context * context = head;
	union fi_bgq_context * prev = NULL;
#ifdef FI_BGQ_TRACE
	fprintf(stderr,"searching mq - head is %p\n",bgq_ep->rx.poll.rfifo[poll_msg].mq.head);
#endif
	while (context) {

		const uint64_t rx_op_flags = context->flags;
#ifdef FI_BGQ_TRACE
		fprintf(stderr,"is_match calling with context %p prev is %p next is %p\n",context,prev,context->next);
		fflush(stderr);
#endif
		if (is_match(pkt, context, poll_msg)) {

			if (!poll_msg || ((rx_op_flags & FI_MULTI_RECV) == 0)) {	/* branch should compile out for tagged receives */

				union fi_bgq_context * next = context->next;

				/* remove the context from the match queue */
				if (prev) prev->next = next;
				else bgq_ep->rx.poll.rfifo[poll_msg].mq.head = next;

				if (!next) bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;

				const uint64_t is_context_ext = rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT;

				/* branch will compile out */
				if (poll_msg)
					complete_receive_operation(bgq_ep, pkt,
						0, context, is_context_ext, 0, is_manual_progress);
				else
					complete_receive_operation(bgq_ep, pkt,
						pkt->hdr.pt2pt.ofi_tag, context, is_context_ext, 0, is_manual_progress);

				return;

			} else {	/* FI_MULTI_RECV - unlikely */

				/* verify that there is enough space available in
				 * the multi-receive buffer for the incoming data */
				const uint64_t recv_len = context->len;
				uint64_t send_len = 0;

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
					send_len = pkt->hdr.pt2pt.send.message_length;
				} else /* FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS */ {

					/* This code functionality is unverified - exit with an error
					 * message for now; when we have an mpich test case for this
					 * we will verify it.
					 */

					fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
					fflush(stderr);
					exit(1);

					const uint64_t niov = pkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
					send_len = pkt->payload.rendezvous.mu_iov[0].message_length;
					uint64_t i;
					for (i=1; i<niov; ++i) send_len += pkt->payload.rendezvous.mu_iov[i].message_length;
				}

				if (send_len > recv_len) {

					/* To keep ordering need to complete this multirecv context now and remove
					 * from match queue and the next multirecv context should have enough room.
					 */

					union fi_bgq_context * next = context->next;

					/* remove the context from the match queue */
					if (prev) prev->next = next;
					else bgq_ep->rx.poll.rfifo[poll_msg].mq.head = next;

					if (!next) bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;

					context->byte_counter = 0;
					fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);

				} else {

					complete_receive_operation(bgq_ep, pkt,
						0, context, 0, 1, is_manual_progress);

					if (context->len < bgq_ep->rx.poll.min_multi_recv) {
						/* after processing this message there is not
						 * enough space available in the multi-receive
						 * buffer to receive the next message; post a
						 * 'FI_MULTI_RECV' event to the completion
						 * queue and return. */

						union fi_bgq_context * next = context->next;

						/* remove the context from the match queue */
						if (prev) prev->next = next;
						else bgq_ep->rx.poll.rfifo[poll_msg].mq.head = next;

						if (!next) bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;

						/* post a completion event for the multi-receive */
						context->byte_counter = 0;
						fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

					}
				}
				return;
			}

		} else {
			prev = context;
			context = context->next;
		}
	}

	/* did not find a match .. add this packet to the unexpected queue */

#ifdef FI_BGQ_TRACE
	fprintf(stderr, "process_rfifo_packet_optimized - did not find a match .. add this packet to the unexpected queue\n");
	fflush(stderr);
#endif
	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.free == NULL) {	/* unlikely */
		struct fi_bgq_mu_packet * block = NULL;
		int rc __attribute__ ((unused));
		rc = posix_memalign((void **)&block,
			32, sizeof(struct fi_bgq_mu_packet)*FI_BGQ_UEPKT_BLOCKSIZE);
		assert(rc==0);
		unsigned i;
		for (i=0; i<(FI_BGQ_UEPKT_BLOCKSIZE-1); ++i) block[i].next = &block[i+1];
		block[FI_BGQ_UEPKT_BLOCKSIZE-1].next = NULL;
		bgq_ep->rx.poll.rfifo[poll_msg].ue.free = block;
	}

	/* pop the free list */
	struct fi_bgq_mu_packet * uepkt = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
	bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt->next;

	/* copy the packet and append to the ue queue */
	size_t bytes_to_copy = (pkt->hdr.muhwi.NetworkHeader.pt2pt.Byte8.Size + 1) * 32;
	memcpy((void*)uepkt, (const void *)pkt, bytes_to_copy);
	uepkt->next = NULL;
	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.head == NULL) {
		bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt;
	} else {
		bgq_ep->rx.poll.rfifo[poll_msg].ue.tail->next = uepkt;
	}
	bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = uepkt;

	return;
}

static inline
void process_rfifo_packet (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt, const unsigned poll_msg, const unsigned is_manual_progress)
{
	process_rfifo_packet_optimized(bgq_ep, pkt, poll_msg, is_manual_progress);
}

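/* Poll the mu reception fifo. Hardware produces packets at the 'tail'
 * while software consumes them at the 'head'; when the head offset is
 * greater than the tail offset the fifo has wrapped, and a packet that
 * straddles the wrap point is reassembled into an aligned temporary
 * buffer before it is processed. */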
static inline
int poll_rfifo (struct fi_bgq_ep * bgq_ep, const unsigned is_manual_progress) {

	/*
	 * The mu reception fifo is consumed by software at the 'head' and
	 * produced by hardware at the 'tail'.
	 */
	MUSPI_Fifo_t * fifo_ptr = &bgq_ep->rx.poll.muspi_recfifo->_fifo;
	assert(fifo_ptr);
	volatile uint64_t pa_tail = MUSPI_getHwTail(fifo_ptr);
	const uintptr_t pa_start = MUSPI_getStartPa(fifo_ptr);
	const uintptr_t offset_tail = pa_tail - pa_start;

	const uintptr_t va_head = (uintptr_t) MUSPI_getHeadVa(fifo_ptr);
	const uintptr_t va_start = (uintptr_t) MUSPI_getStartVa(fifo_ptr);
	const uintptr_t offset_head = va_head - va_start;

	MUHWI_PacketHeader_t * hdr = (MUHWI_PacketHeader_t *) va_head;

	if (offset_head < offset_tail) {	/* likely */

		muspi_dcbt(va_head, 0);
		_bgq_msync();

		const uintptr_t stop = va_head + offset_tail - offset_head;
		int process_rfifo_iter = 0;
		while (((uintptr_t)hdr < stop) && (process_rfifo_iter < PROCESS_RFIFO_MAX)) {

			process_rfifo_iter++;
			struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
			const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

			if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
				process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
			} else {
				process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
			}

			hdr += hdr->NetworkHeader.pt2pt.Byte8.Size + 1;
			muspi_dcbt(hdr, 0);
		}

		MUSPI_setHeadVa(fifo_ptr, (void*)hdr);
		MUSPI_setHwHead(fifo_ptr, (uintptr_t)hdr-va_start);

	} else if (offset_head > offset_tail) {	/* unlikely ? */

		/* check if the head packet wraps */
		const uintptr_t va_end = (uintptr_t) fifo_ptr->va_end;
		if ((va_head + 544) < va_end) {	/* likely */

			/* head packet does not wrap */
			muspi_dcbt(va_head, 0);
			_bgq_msync();

			const uintptr_t stop = va_end - 544;
			int process_rfifo_iter = 0;
			while (((uintptr_t)hdr < stop) && (process_rfifo_iter < PROCESS_RFIFO_MAX)) {

				process_rfifo_iter++;
				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
				} else {
					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
				}

				hdr += hdr->NetworkHeader.pt2pt.Byte8.Size + 1;
				muspi_dcbt(hdr, 0);
			}

			MUSPI_setHeadVa(fifo_ptr, (void*)hdr);
			MUSPI_setHwHead(fifo_ptr, (uintptr_t)hdr-va_start);

		} else {	/* unlikely */

			/* head packet may wrap */
			muspi_dcbt(va_head, 0);
			_bgq_msync();

			uint32_t packet_bytes = ((uint32_t)hdr->NetworkHeader.pt2pt.Byte8.Size + 1) << 5;
			const uintptr_t bytes_before_wrap = va_end - va_head;
			if (packet_bytes < bytes_before_wrap) {
				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
				} else {
					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
				}

				const uintptr_t new_offset_head = offset_head + packet_bytes;
				MUSPI_setHeadVa(fifo_ptr, (void*)(va_start + new_offset_head));
				MUSPI_setHwHead(fifo_ptr, new_offset_head);

			} else if (packet_bytes == bytes_before_wrap) {
				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
				} else {
					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
				}

				MUSPI_setHeadVa(fifo_ptr, (void*)(va_start));
				MUSPI_setHwHead(fifo_ptr, 0);

			} else {
				uint8_t tmp_pkt[544] __attribute__((__aligned__(32)));

				memcpy((void*)&tmp_pkt[0], (void*)va_head, bytes_before_wrap);
				const uintptr_t bytes_after_wrap = packet_bytes - bytes_before_wrap;
				memcpy((void*)&tmp_pkt[bytes_before_wrap], (void*)va_start, bytes_after_wrap);

				hdr = (MUHWI_PacketHeader_t *)&tmp_pkt[0];
				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
				} else {
					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
				}

				MUSPI_setHeadVa(fifo_ptr, (void*)(va_start + bytes_after_wrap));
				MUSPI_setHwHead(fifo_ptr, bytes_after_wrap);
			}
		}
	}

	return 0;
}


/* rx_op_flags is only checked for FI_PEEK | FI_CLAIM | FI_MULTI_RECV
 * rx_op_flags is only used if FI_PEEK | FI_CLAIM | cancel_context
 * is_context_ext is only used if FI_PEEK | cancel_context | iovec
 *
 * The "normal" data movement functions, such as fi_[t]recv(), can safely
 * specify '0' for cancel_context, rx_op_flags, and is_context_ext, in
 * order to reduce code path.
 */
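/* Process one match element from the match fifo: handle a pending
 * cancel, otherwise search the unexpected packet queue and complete
 * the receive on a match (honoring FI_PEEK, FI_CLAIM, and
 * FI_MULTI_RECV semantics), or append the context to the match queue
 * for matching against future reception fifo packets. */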
static inline
int process_mfifo_context (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg,
		const uint64_t cancel_context, union fi_bgq_context * context,
		const uint64_t rx_op_flags, const uint64_t is_context_ext,
		const unsigned is_manual_progress) {
#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context starting - context->tag is %lu\n",context->tag);
	if (rx_op_flags & FI_PEEK)
		fprintf(stderr,"just peeking\n");
	fflush(stderr);
#endif
	if (cancel_context) {	/* branch should compile out */
		const uint64_t compare_context = is_context_ext ?
			(uint64_t)(((struct fi_bgq_context_ext *)context)->msg.op_context) :
			(uint64_t)context;

		if (compare_context == cancel_context) {

			struct fi_bgq_context_ext * ext;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
			} else {
				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
			}

			ext->bgq_context.byte_counter = 0;
			ext->err_entry.op_context = (void *)cancel_context;
			ext->err_entry.flags = rx_op_flags;
			ext->err_entry.len = 0;
			ext->err_entry.buf = 0;
			ext->err_entry.data = 0;
			ext->err_entry.tag = context->tag;
			ext->err_entry.olen = 0;
			ext->err_entry.err = FI_ECANCELED;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext, 0);

			return FI_ECANCELED;
		}
	}

	if ((rx_op_flags & (FI_PEEK | FI_CLAIM | FI_MULTI_RECV)) == 0) {	/* likely */

		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = head;

		unsigned found_match = 0;
		while (uepkt != NULL) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context - searching unexpected queue\n");
			fflush(stderr);
#endif
			if (is_match(uepkt, context, poll_msg)) {
#ifdef FI_BGQ_TRACE
				fprintf(stderr,"process_mfifo_context - found match on unexpected queue\n");
				fflush(stderr);
#endif

				/* branch will compile out */
				if (poll_msg)
					complete_receive_operation(bgq_ep, uepkt,
						0, context, 0, 0, is_manual_progress);
				else
					complete_receive_operation(bgq_ep, uepkt,
						uepkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);

				/* remove the uepkt from the ue queue */
				if (head == tail) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
					bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
				} else if (prev == NULL) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
				} else if (tail == uepkt) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
					prev->next = NULL;
				} else {
					prev->next = uepkt->next;
				}

				/* ... and prepend the uehdr to the ue free list. */
				uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
				bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;

				/* found a match; break from the loop */
				uepkt = NULL;
				found_match = 1;

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!found_match) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context - nothing found on unexpected queue adding to match queue for poll_msg %u context->tag is %lu context is %p mq addr is %p\n",poll_msg,context->tag,context,&(bgq_ep->rx.poll.rfifo[poll_msg].mq));
			fflush(stderr);
#endif
			/*
			 * no unexpected headers were matched; add this match
			 * information to the appropriate match queue
			 */

			union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;

			context->next = NULL;
			if (tail == NULL) {
				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
			} else {
				tail->next = context;
			}
			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
		}

	} else if (rx_op_flags & FI_PEEK) {	/* unlikely */

		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = head;

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"process_mfifo_context - rx_op_flags & FI_PEEK searching unexpected queue\n");
		if (uepkt == NULL)
			fprintf(stderr,"uepkt == NULL\n");
		else
			fprintf(stderr,"uepkt != NULL\n");

		fflush(stderr);
#endif
		unsigned found_match = 0;
		while (uepkt != NULL) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context uepkt != NULL - rx_op_flags & FI_PEEK searching unexpected queue\n");
			fflush(stderr);
#endif
			if (is_match(uepkt, context, poll_msg)) {

				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
				if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {
					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
					uint64_t len = 0;
					unsigned i;
					for (i=0; i<niov; ++i) len += uepkt->payload.rendezvous.mu_iov[i].message_length;
					context->len = len;
				} else {	/* "eager" or "eager with completion" packet type */
					context->len = uepkt->hdr.pt2pt.send.message_length;
				}
				context->tag = poll_msg ? 0 : uepkt->hdr.pt2pt.ofi_tag;
				context->byte_counter = 0;

				if (rx_op_flags & FI_CLAIM) {	/* both FI_PEEK and FI_CLAIM were specified */
					assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);

					context->claim = uepkt;

					/* remove the uepkt from the ue queue */
					if (head == tail) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
					} else if (prev == NULL) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
					} else if (tail == uepkt) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
						prev->next = NULL;
					} else {
						prev->next = uepkt->next;
					}
				}
				/* transfer immediate data from pkt to context for matching FI_PEEK */
				context->data = uepkt->hdr.pt2pt.immediate_data;

				/* post a completion event for the receive */
				fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

				found_match = 1;
				uepkt = NULL;

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!found_match) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"didn't find a match for this FI_PEEK\n");
			fflush(stderr);
#endif
			/* did not find a match for this "peek" */

			struct fi_bgq_context_ext * ext;
			uint64_t mfifo_value;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
				mfifo_value = (uint64_t)context >> 3;
			} else {
				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
				ext->bgq_context.flags = rx_op_flags | FI_BGQ_CQ_CONTEXT_EXT;

				mfifo_value = (uint64_t)ext >> 3;
			}

			ext->err_entry.op_context = context;
			ext->err_entry.flags = rx_op_flags;
			ext->err_entry.len = 0;
			ext->err_entry.buf = 0;
			ext->err_entry.data = 0;
			ext->err_entry.tag = 0;
			ext->err_entry.olen = 0;
			ext->err_entry.err = FI_ENOMSG;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;
			ext->bgq_context.byte_counter = 0;

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context - no match found on unexpected queue posting error\n");
			fflush(stderr);
#endif
			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext, 0);

		}

	} else if (rx_op_flags & FI_CLAIM) {	/* unlikely */
		assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);
#ifdef FI_BGQ_TRACE
		fprintf(stderr,"process_mfifo_context - rx_op_flags & FI_CLAIM complete receive operation\n");
#endif

		/* only FI_CLAIM was specified
		 *
		 * this occurs after a previous FI_PEEK + FI_CLAIM
		 * operation has removed an unexpected packet from
		 * the queue and saved a pointer to it in the context
		 *
		 * complete the receive for this "claimed" message ... */
		struct fi_bgq_mu_packet * claimed_pkt = context->claim;
		if (poll_msg)
			complete_receive_operation(bgq_ep, claimed_pkt,
				0, context, 0, 0, is_manual_progress);
		else
			complete_receive_operation(bgq_ep, claimed_pkt,
				claimed_pkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);

		/* ... and prepend the uehdr to the ue free list. */
		claimed_pkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
		bgq_ep->rx.poll.rfifo[poll_msg].ue.free = claimed_pkt;

	} else if (poll_msg && (rx_op_flags & FI_MULTI_RECV)) {	/* unlikely - branch should compile out for tagged receives */
		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = head;

		unsigned full_multirecv_buffer = 0;
		while (uepkt != NULL) {

			if (is_match(uepkt, context, poll_msg)) {

				/* verify that there is enough space available in
				 * the multi-receive buffer for the incoming data */
				const uint64_t recv_len = context->len;
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
				uint64_t send_len = 0;

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
					send_len = uepkt->hdr.pt2pt.send.message_length;
				} else if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {

					/* This code functionality is unverified - exit with an error
					 * message for now; when we have an mpich test case for this
					 * we will verify it.
					 */

					fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
					fflush(stderr);
					exit(1);

					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
					send_len = uepkt->payload.rendezvous.mu_iov[0].message_length;
					uint64_t i;
					for (i=1; i<niov; ++i) send_len += uepkt->payload.rendezvous.mu_iov[i].message_length;
				}

				if (send_len > recv_len) {
					/* There is not enough room for the next multirecv
					 * subcontext. To preserve the ordering, just break off
					 * here with whatever matches are in the buffer and
					 * hopefully the next multirecv has space.
					 */

					uepkt = NULL;
					full_multirecv_buffer = 1;
					context->byte_counter = 0;
					fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);

				} else {
					complete_receive_operation(bgq_ep, uepkt,
						0, context, 0, 1, is_manual_progress);

					/* remove the uepkt from the ue queue */
					if (head == tail) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
					} else if (prev == NULL) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
					} else if (tail == uepkt) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
						prev->next = NULL;
					} else {
						prev->next = uepkt->next;
					}

					struct fi_bgq_mu_packet *matched_uepkt_next = uepkt->next;

					/* ... and prepend the uehdr to the ue free list. */
					uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
					bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;

					if (context->len < bgq_ep->rx.poll.min_multi_recv) {
						/* after processing this message there is not
						 * enough space available in the multi-receive
						 * buffer to receive the next message; break
						 * from the loop and post a 'FI_MULTI_RECV'
						 * event to the completion queue. */
						uepkt = NULL;
						full_multirecv_buffer = 1;

						/* post a completion event for the multi-receive */
						context->byte_counter = 0;
						fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
					} else {
						uepkt = matched_uepkt_next;
					}

				}

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!full_multirecv_buffer) {

			/* The multirecv context has room in its buffer.
			 * Post to match queue for further filling.
			 */

			union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;

			context->next = NULL;
			if (tail == NULL) {
				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
			} else {
				tail->next = context;
			}
			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
		}
	}

	return 0;
}

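/* Drain match elements from the L2 atomic match fifo. Each consumed
 * value is a 'union fi_bgq_context' pointer shifted right by 3 bits
 * (contexts are at least 8-byte aligned); at most 15 elements are
 * consumed per call. */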
static inline
int poll_mfifo (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg, const uint64_t cancel_context, const unsigned is_manual_progress) {

#ifdef DEBUG
	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.head == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].ue.tail == NULL);
	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.tail == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].ue.head == NULL);
	if (bgq_ep->rx.poll.rfifo[poll_msg].mq.head == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].mq.tail == NULL);
	if (bgq_ep->rx.poll.rfifo[poll_msg].mq.tail == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].mq.head == NULL);
#endif

	/*
	 * attempt to match each new match element from the match fifo with any
	 * unexpected headers and complete the receives; if no match is found,
	 * append the match element to the match queue which will be searched
	 * for a match as each rfifo packet is processed
	 */
	uint64_t mfifo_value;
	struct l2atomic_fifo_consumer * consumer = &bgq_ep->rx.poll.rfifo[poll_msg].match;
	unsigned loop_count = 0;
	while (++loop_count < 16 && l2atomic_fifo_consume(consumer, &mfifo_value) == 0) {

		union fi_bgq_context * context = (union fi_bgq_context *)(mfifo_value << 3);
		const uint64_t rx_op_flags = context->flags;
		const uint64_t is_context_ext = rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT;

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"poll_mfifo calling process_mfifo_context\n");
#endif
		const int rc = process_mfifo_context(bgq_ep, poll_msg, cancel_context,
			context, rx_op_flags, is_context_ext, is_manual_progress);

		/* propagate a successful cancel so that poll_cfifo() can stop searching */
		if (rc == FI_ECANCELED) return FI_ECANCELED;
	}

	return 0;
}

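/* Search the match queue for a posted receive whose operation context
 * matches 'cancel_context'; if found, remove it from the queue and
 * report FI_ECANCELED through the endpoint's receive completion
 * queue. */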
static inline
int cancel_match_queue (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg, const uint64_t cancel_context) {

	/* search the match queue */
	union fi_bgq_context * head = bgq_ep->rx.poll.rfifo[poll_msg].mq.head;
	union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;
	union fi_bgq_context * context = head;
	union fi_bgq_context * prev = NULL;
	while (context) {

		const uint64_t is_context_ext = context->flags & FI_BGQ_CQ_CONTEXT_EXT;
		const uint64_t compare_context = is_context_ext ?
			(uint64_t)(((struct fi_bgq_context_ext *)context)->msg.op_context) :
			(uint64_t)context;

		if (compare_context == cancel_context) {

			/* remove the context from the match queue */
			if (context == head)
				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context->next;
			else
				prev->next = context->next;

			if (context == tail)
				bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;

			struct fi_bgq_context_ext * ext;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
			} else {
				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
			}

			ext->bgq_context.byte_counter = 0;
			ext->err_entry.op_context = (void *)cancel_context;
			ext->err_entry.flags = context->flags;
			ext->err_entry.len = 0;
			ext->err_entry.buf = 0;
			ext->err_entry.data = 0;
			ext->err_entry.tag = context->tag;
			ext->err_entry.olen = 0;
			ext->err_entry.err = FI_ECANCELED;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext, 0);

			return FI_ECANCELED;
		} else {
			prev = context;
		}
		context = context->next;
	}

	return 0;
}

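/* Consume one entry from the control fifo. Control entries currently
 * carry only cancel requests: the consumed value is the (right-shifted)
 * context to cancel, which is searched for on the match queue and then
 * in the match fifo of each receive type enabled by the endpoint's rx
 * capabilities. */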
static inline
void poll_cfifo (struct fi_bgq_ep * bgq_ep, const unsigned is_manual_progress) {	/* TODO - make non-inline */

	struct l2atomic_fifo_consumer * consumer = &bgq_ep->rx.poll.control;
	uint64_t value = 0;
	if (l2atomic_fifo_consume(consumer, &value) == 0) {

		const unsigned poll_fi_msg = bgq_ep->rx.caps & FI_MSG;
		const unsigned poll_fi_tag = bgq_ep->rx.caps & FI_TAGGED;

		/* const uint64_t flags = value & 0xE000000000000000ull; -- currently not used */
		const uint64_t cancel_context = value << 3;

		if (poll_fi_msg && poll_fi_tag) {
			if (FI_ECANCELED != cancel_match_queue(bgq_ep, 0, cancel_context)) {
				if (FI_ECANCELED != poll_mfifo(bgq_ep, 0, cancel_context, is_manual_progress)) {

					if (FI_ECANCELED != cancel_match_queue(bgq_ep, 1, cancel_context)) {
						if (FI_ECANCELED != poll_mfifo(bgq_ep, 1, cancel_context, is_manual_progress)) {
							/* did not find a match */
						}
					}
				}
			}
		} else if (poll_fi_msg) {
			if (FI_ECANCELED != cancel_match_queue(bgq_ep, 1, cancel_context)) {
				if (FI_ECANCELED != poll_mfifo(bgq_ep, 1, cancel_context, is_manual_progress)) {
					/* did not find a match */
				}
			}
		} else if (poll_fi_tag) {
			if (FI_ECANCELED != cancel_match_queue(bgq_ep, 0, cancel_context)) {
				if (FI_ECANCELED != poll_mfifo(bgq_ep, 0, cancel_context, is_manual_progress)) {
					/* did not find a match */
				}
			}
		}
	}
}

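/* Asynchronous progress loop: while async progress is enabled for the
 * endpoint, poll the match and reception fifos for the enabled receive
 * types, checking the control fifo for cancel requests once every 64
 * iterations. */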
static inline
void poll_rx (struct fi_bgq_ep * bgq_ep,
		const unsigned poll_fi_msg,
		const unsigned poll_fi_tag) {

	volatile uint64_t * async_is_enabled = &bgq_ep->async.enabled;
	while (L2_AtomicLoad(async_is_enabled)) {
		unsigned loop_count = 64;
		do {
			if (poll_fi_msg) {
				poll_mfifo(bgq_ep, 1, 0, 0);
				poll_rfifo(bgq_ep, 0);
			}
			if (poll_fi_tag) {
				poll_mfifo(bgq_ep, 0, 0, 0);
				poll_rfifo(bgq_ep, 0);
			}
		} while (--loop_count);

		poll_cfifo(bgq_ep, 0);
	}
}

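/* Thread start routine for asynchronous progress; 'arg' is the
 * endpoint. The 'active' flag is held at 1 for the duration of the
 * polling loop, presumably so that teardown code can detect when the
 * progress thread has stopped. */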
static inline
void * poll_fn (void *arg) {
	//fprintf(stderr, "%s:%s():%d .... arg = %p\n", __FILE__, __func__, __LINE__, arg);
	struct fi_bgq_ep * bgq_ep = (struct fi_bgq_ep *) arg;

	volatile uint64_t * async_is_active = &bgq_ep->async.active;
	L2_AtomicStore(async_is_active, 1);

	uint64_t rx_caps = bgq_ep->rx.caps & (FI_MSG | FI_TAGGED);

	if (rx_caps == (FI_MSG | FI_TAGGED)) {
		poll_rx(bgq_ep, 1, 1);
	} else if (rx_caps == FI_MSG) {
		poll_rx(bgq_ep, 1, 0);
	} else if (rx_caps == FI_TAGGED) {
		poll_rx(bgq_ep, 0, 1);
	}

	L2_AtomicStore(async_is_active, 0);

	return NULL;
}



#endif /* _FI_PROV_BGQ_RX_H_ */