1 /*
2  * Copyright (C) 2016 by Argonne National Laboratory.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #ifndef _FI_PROV_BGQ_RX_H_
33 #define _FI_PROV_BGQ_RX_H_
34 
35 #define FI_BGQ_UEPKT_BLOCKSIZE (1024)
36 #define PROCESS_RFIFO_MAX 64
37 
38 // #define FI_BGQ_TRACE 1
39 
40 /* forward declaration - see: prov/bgq/src/fi_bgq_atomic.c */
41 void fi_bgq_rx_atomic_dispatch (void * buf, void * addr, size_t nbytes,
42 	enum fi_datatype dt, enum fi_op op);
43 
44 static inline
void dump_uepkt_queue (struct rx_operation * rx) {
46 
47 	fprintf(stderr, "%s:%s():%d rx=%p, head=%p, tail=%p\n", __FILE__, __func__, __LINE__, rx, rx->ue.head, rx->ue.tail);
48 	struct fi_bgq_mu_packet * pkt = rx->ue.head;
49 	while (pkt) {
50 		fprintf(stderr, "%s:%s():%d --> %p\n", __FILE__, __func__, __LINE__, pkt);
51 		pkt = pkt->next;
52 	}
53 }
54 
55 static inline
void complete_atomic_operation (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt) {
57 
58 	const uint32_t dt = pkt->hdr.atomic.dt;
59 	const uint32_t op = pkt->hdr.atomic.op;
60 
61 	const uint64_t is_fetch = pkt->hdr.atomic.is_fetch;
62 	const uint64_t do_cntr = pkt->hdr.atomic.do_cntr;
63 	const uint64_t cntr_bat_id = pkt->hdr.atomic.cntr_bat_id;
64 
65 	const uint16_t nbytes = pkt->hdr.atomic.nbytes_minus_1 + 1;
66 
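	/* resolve the target address: for FI_MR_BASIC the packet carries the
	 * target address directly in 'offset'; for FI_MR_SCALABLE the key is
	 * looked up in the base address table and 'offset' is relative to it */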
67 	void * addr;
68 	if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
69 		addr = (void*) pkt->hdr.atomic.offset;
70 	} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {
71 		const uint16_t key = pkt->hdr.atomic.key;
72 		const uint64_t offset = pkt->hdr.atomic.offset;
73 		const uintptr_t base = (uintptr_t)fi_bgq_domain_bat_read_vaddr(bgq_ep->rx.poll.bat, key);
74 		addr = (void*)(base + offset);
75 	}
76 	else {
77 		assert(0);
78 	}
79 
80 	const uint32_t origin = pkt->hdr.atomic.origin_raw & FI_BGQ_MUHWI_DESTINATION_MASK;
81 
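	/* fetch atomics (and atomic reads) must return the original target data
	 * to the origin, so a direct-put descriptor carrying the pre-atomic value
	 * is injected back to the origin before the atomic is applied locally */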
82 	if (is_fetch || (op == FI_ATOMIC_READ)) {
83 
84 		const uint64_t dst_paddr = pkt->payload.atomic_fetch.metadata.dst_paddr;
85 		const uint64_t cq_paddr = pkt->payload.atomic_fetch.metadata.cq_paddr;
86 		const uint64_t fifo_map = pkt->payload.atomic_fetch.metadata.fifo_map;
87 
88 		MUHWI_Descriptor_t * desc =
89 			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);
90 
91 		qpx_memcpy64((void*)desc, (const void *)&bgq_ep->rx.poll.atomic_dput_model);
92 
93 		desc->PacketHeader.NetworkHeader.pt2pt.Destination.Destination.Destination = origin;
94 		desc->Torus_FIFO_Map = fifo_map;
95 
96 		/* locate the payload lookaside slot */
97 		uint64_t payload_paddr = 0;
98 		void * payload_vaddr =
99 			fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
100 				desc, &payload_paddr);
101 		desc->Pa_Payload = payload_paddr;
102 
103 		/* copy the target data into the injection lookaside buffer */
104 		memcpy(payload_vaddr, (const void*) addr, nbytes);
105 		desc->Message_Length = nbytes;
106 
107 		MUSPI_SetRecPayloadBaseAddressInfo(desc, FI_BGQ_MU_BAT_ID_GLOBAL, dst_paddr);
108 		if (cq_paddr != 0) {	/* unlikely */
109 			desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Counter_Offset =
110 				MUSPI_GetAtomicAddress(cq_paddr, MUHWI_ATOMIC_OPCODE_STORE_ADD);
111 		}
112 
113 		fi_bgq_rx_atomic_dispatch((void*)&pkt->payload.atomic_fetch.data[0], addr, nbytes, dt, op);
114 
115 		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
116 
117 	} else {
118 
119 		fi_bgq_rx_atomic_dispatch(&pkt->payload.byte[0], addr, nbytes, dt, op);
120 
121 		/*
122 		 * cq completions (unlikely) are accomplished via a fence
123 		 * operation for non-fetch atomic operations
124 		 */
125 	}
126 
127 	if (do_cntr) {	/* likely -- TODO: change to *always* do a counter update?? */
128 
129 		const uint64_t is_local = pkt->hdr.atomic.is_local;
130 
131 		MUHWI_Descriptor_t * desc =
132 			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);
133 
134 		qpx_memcpy64((void*)desc, (const void*)&bgq_ep->rx.poll.atomic_cntr_update_model[is_local]);
135 
136 		desc->PacketHeader.NetworkHeader.pt2pt.Destination.Destination.Destination = origin;
137 		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Payload_Base_Address_Id = cntr_bat_id;
138 
139 		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
140 	}
141 }
142 
143 
/* The 'set_desc_payload_paddr' function sets an mu descriptor payload address
 * in one of two ways, based on the mr mode.
 *
 * For FI_MR_SCALABLE it assumes that base+offset is a virtual address, which
 * must then be converted into a physical address.
 *
 * For FI_MR_BASIC it uses offset-key directly as the physical address.
 */
151 static inline
void set_desc_payload_paddr (union fi_bgq_mu_descriptor * fi_mu_desc, struct fi_bgq_bat_entry * bat) {
153 
154 
155 	const uint8_t rma_update_type = fi_mu_desc->rma.update_type;
156 
157 	if (rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_DST) {
158 		const uint64_t key_msb = fi_mu_desc->rma.key_msb;
159 		const uint64_t key_lsb = fi_mu_desc->rma.key_lsb;
160 		const uint64_t key = (key_msb << 48) | key_lsb;
161 		uint64_t paddr = 0;
162 		const uint64_t offset = fi_mu_desc->rma.dput.put_offset;
163 
164 		if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
165 			paddr = offset-key;
166 		} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {
167 
168 			const uintptr_t base = (uintptr_t) fi_bgq_domain_bat_read_vaddr(bat, key);
169 			fi_bgq_cnk_vaddr2paddr((const void *)(base+offset), 1, &paddr);
170 		}
171 		else {
172 			assert(0);
173 		}
174 
175 #ifdef FI_BGQ_TRACE
176 fprintf(stderr,"set_desc_payload_paddr rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_DST paddr is 0x%016lx\n",paddr);
177 #endif
178 		MUSPI_SetRecPayloadBaseAddressInfo((MUHWI_Descriptor_t *)fi_mu_desc,
179 			FI_BGQ_MU_BAT_ID_GLOBAL, paddr);
180 
181 	} else if (rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_SRC) {
182 		const uint64_t key_msb = fi_mu_desc->rma.key_msb;
183 		const uint64_t key_lsb = fi_mu_desc->rma.key_lsb;
184 		const uint64_t key = (key_msb << 48) | key_lsb;
185 		const uint64_t offset = fi_mu_desc->rma.Pa_Payload;
186 		if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
187 			fi_mu_desc->rma.Pa_Payload = offset-key;
188 #ifdef FI_BGQ_TRACE
189 fprintf(stderr,"set_desc_payload_paddr rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_SRC for FI_MR_BASIC fi_mu_desc->rma.Pa_Payload set to paddr 0x%016lx\n",(offset-key));
190 fflush(stderr);
191 #endif
192 		} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {
193 
194 			const uintptr_t base = (uintptr_t) fi_bgq_domain_bat_read_vaddr(bat, key);
195 			fi_bgq_cnk_vaddr2paddr((const void *)(base+offset), 1, &fi_mu_desc->rma.Pa_Payload);
196 #ifdef FI_BGQ_TRACE
197 fprintf(stderr,"set_desc_payload_paddr rma_update_type == FI_BGQ_MU_DESCRIPTOR_UPDATE_BAT_TYPE_SRC for FI_MR_SCALABLE fi_mu_desc->rma.Pa_Payload set to paddr 0x%016lx\n",fi_mu_desc->rma.Pa_Payload);
198 fflush(stderr);
199 #endif
200 		}
201 		else {
202 			assert(0);
203 		}
204 	} else {
205 		/* no update requested */
206 	}
207 }
208 
209 static inline
void complete_rma_operation (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt) {
211 
212 	struct fi_bgq_bat_entry * bat = bgq_ep->rx.poll.bat;
213 	const uint64_t nbytes = pkt->hdr.rma.nbytes;
214 	const uint64_t ndesc = pkt->hdr.rma.ndesc;
215 	MUHWI_Descriptor_t * payload = (MUHWI_Descriptor_t *) &pkt->payload;
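	/* the rma packet payload holds 'ndesc' MU descriptors to be re-injected on
	 * this node, optionally followed by 'nbytes' of immediate data that is
	 * copied straight into the target buffer (direct-put emulation) */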
216 
217 #ifdef FI_BGQ_TRACE
218 fprintf(stderr,"complete_rma_operation starting - nbytes is %lu ndesc is %lu\n",nbytes,ndesc);
219 fflush(stderr);
220 #endif
221 	if (nbytes > 0) {	/* only for direct-put emulation */
222 		const uint64_t payload_offset = ndesc << BGQ_MU_DESCRIPTOR_SIZE_IN_POWER_OF_2;
223 		if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_BASIC) {
224 			uintptr_t vaddr = (uintptr_t) pkt->hdr.rma.offset;
225 			memcpy((void*)vaddr, (void *)((uintptr_t)payload + payload_offset), nbytes);
226 #ifdef FI_BGQ_TRACE
227 fprintf(stderr,"direct-put emulation memcpy vaddr is 0x%016lx nbytes is %lu\n",vaddr,nbytes);
228 #endif
229 
230 		} else if (FI_BGQ_FABRIC_DIRECT_MR == FI_MR_SCALABLE) {
231 
232 			uintptr_t vaddr = (uintptr_t)fi_bgq_domain_bat_read_vaddr(bat, pkt->hdr.rma.key);
233 			vaddr += pkt->hdr.rma.offset;
234 #ifdef FI_BGQ_TRACE
235 fprintf(stderr,"direct-put emulation memcpy vaddr is 0x%016lx nbytes is %lu\n",vaddr,nbytes);
236 #endif
237 
238 			memcpy((void*)vaddr, (void *)((uintptr_t)payload + payload_offset), nbytes);
239 		}
240 		else {
241 			assert(0);
242 		}
243 	}
244 
245 	unsigned i;
246 	for (i = 0; i < ndesc; ++i) {
247 
248 		/*
249 		 * busy-wait until a fifo slot is available ..
250 		 */
251 		MUHWI_Descriptor_t * desc =
252 			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);
253 
254 		qpx_memcpy64((void*)desc, (const void*)&payload[i]);
255 
256 		if (desc->PacketHeader.NetworkHeader.pt2pt.Byte8.Packet_Type == 2) {	/* rget descriptor */
257 
258 #ifdef FI_BGQ_TRACE
fprintf(stderr,"complete_rma_operation - processing rget desc %d\n",i);
260 fflush(stderr);
261 #endif
262 			/* locate the payload lookaside slot */
263 			uint64_t payload_paddr = 0;
264 			void * payload_vaddr =
265 				fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
266 					desc, &payload_paddr);
267 			desc->Pa_Payload = payload_paddr;
268 
269 			/* copy the rget payload descriptors into the injection lookaside buffer */
270 			union fi_bgq_mu_descriptor * rget_payload = (union fi_bgq_mu_descriptor *) payload_vaddr;
271 			qpx_memcpy64((void*)rget_payload, (const void*)&payload[i+1]);
272 
273 			const uint64_t rget_ndesc = desc->Message_Length >> BGQ_MU_DESCRIPTOR_SIZE_IN_POWER_OF_2;
274 			i += rget_ndesc;
275 
276 			unsigned j;
277 			for (j = 0; j < rget_ndesc; ++j) {
278 				set_desc_payload_paddr(&rget_payload[j], bat);
279 			}
280 
281 		} else {
282 
283 #ifdef FI_BGQ_TRACE
284 fprintf(stderr,"complete_rma_operation - processing fifo desc %d\n",i);
285 fflush(stderr);
286 #endif
287 			set_desc_payload_paddr((union fi_bgq_mu_descriptor *)desc, bat);
288 
289 		}
290 		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
291 	}
292 #ifdef FI_BGQ_TRACE
293 fprintf(stderr,"complete_rma_operation complete\n");
294 fflush(stderr);
295 #endif
296 }
297 
298 
299 static inline
void inject_eager_completion (struct fi_bgq_ep * bgq_ep,
301 		struct fi_bgq_mu_packet * pkt) {
302 
303 	const uint64_t is_local = pkt->hdr.completion.is_local;
304 	const uint64_t cntr_paddr = ((uint64_t)pkt->hdr.completion.cntr_paddr_rsh3b) << 3;
305 
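	/* inject a direct-put "ack" descriptor back to the origin; its reception
	 * payload address is the origin's send completion counter, whose physical
	 * address is carried in the header right-shifted by 3 bits (the counter is
	 * assumed to be 8-byte aligned) */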
306 	MUHWI_Descriptor_t * desc =
307 		fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);
308 
309 	qpx_memcpy64((void*)desc, (const void*)&bgq_ep->rx.poll.ack_model[is_local]);
310 
311 	MUSPI_SetRecPayloadBaseAddressInfo(desc, FI_BGQ_MU_BAT_ID_GLOBAL, cntr_paddr);
312 	desc->PacketHeader.NetworkHeader.pt2pt.Destination = pkt->hdr.completion.origin;
313 
314 	MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
315 
316 	return;
317 }
318 
319 
/**
 * \brief Complete a receive operation for a packet that has matched the
 * 		posted receive information
 *
 * \param[in]		bgq_ep	Endpoint associated with the receive
 * \param[in]		pkt	MU packet that matched
 * \param[in,out]	context	Receive context to complete
 */
328 static inline
void complete_receive_operation (struct fi_bgq_ep * bgq_ep,
330 		struct fi_bgq_mu_packet * pkt,
331 		const uint64_t origin_tag,
332 		union fi_bgq_context * context,
333 		const unsigned is_context_ext,
334 		const unsigned is_multi_receive,
335 		const unsigned is_manual_progress) {
336 
337 #ifdef FI_BGQ_TRACE
338 	fprintf(stderr,"complete_receive_operation starting\n");
339 #endif
340 	const uint64_t recv_len = context->len;
341 	void * recv_buf = context->buf;
342 	const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
343 
344 	const uint64_t immediate_data = pkt->hdr.pt2pt.immediate_data;
345 
346 	if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
347 #ifdef FI_BGQ_TRACE
348         fprintf(stderr,"complete_receive_operation - packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER\n");
349 #endif
350 
351 		const uint64_t send_len = pkt->hdr.pt2pt.send.message_length;
352 
353 		if (is_multi_receive) {		/* branch should compile out */
354 			if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);
355 
356 			union fi_bgq_context * original_multi_recv_context = context;
357 			context = (union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
358 			assert((((uintptr_t)context) & 0x07) == 0);
359 
360 			context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
361 			context->buf = recv_buf;
362 			context->len = send_len;
363 			context->data = immediate_data;
364 			context->tag = 0;	/* tag is not valid for multi-receives */
365 			context->multi_recv_context = original_multi_recv_context;
366 			context->byte_counter = 0;
367 
368 			/* the next 'fi_bgq_context' must be 8-byte aligned */
369 			uint64_t bytes_consumed = ((send_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
370 			original_multi_recv_context->len -= bytes_consumed;
371 			original_multi_recv_context->buf = (void*)((uintptr_t)(original_multi_recv_context->buf) + bytes_consumed);
372 #ifdef FI_BGQ_TRACE
373         fprintf(stderr,"complete_receive_operation - is_multi_receive - enqueue cq for child context %p of parent context %p\n",context,original_multi_recv_context);
374 #endif
375 
376 
377 			/* post a completion event for the individual receive */
378 			fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
379 
380 		} else if (send_len <= recv_len) {
381 			if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);
382 #ifdef FI_BGQ_TRACE
383         fprintf(stderr,"EAGER complete_receive_operation send_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_completed\n",send_len,recv_len);
384 #endif
385 
386 			context->buf = NULL;
387 			context->len = send_len;
388 			context->data = immediate_data;
389 			context->tag = origin_tag;
390 			context->byte_counter = 0;
391 
392 			/* post a completion event for the individual receive */
393 			fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
394 
395 		} else {	/* truncation - unlikely */
396 #ifdef FI_BGQ_TRACE
397         fprintf(stderr,"EAGER complete_receive_operation truncation - send_len %lu > recv_len %lu posting error\n",send_len,recv_len);
398 
399 #endif
400 
401 			struct fi_bgq_context_ext * ext;
402 			if (is_context_ext) {
403 				ext = (struct fi_bgq_context_ext *)context;
404 				ext->err_entry.op_context = ext->msg.op_context;
405 			} else {
406 				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
407 				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
408 				ext->err_entry.op_context = context;
409 			}
410 
411 			ext->err_entry.flags = context->flags;
412 			ext->err_entry.len = recv_len;
413 			ext->err_entry.buf = recv_buf;
414 			ext->err_entry.data = immediate_data;
415 			ext->err_entry.tag = origin_tag;
416 			ext->err_entry.olen = send_len - recv_len;
417 			ext->err_entry.err = FI_ETRUNC;
418 			ext->err_entry.prov_errno = 0;
419 			ext->err_entry.err_data = NULL;
420 
421 			ext->bgq_context.byte_counter = 0;
422 
423 			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);
424 		}
425 
426 		return;
427 
428 	} else {			/* rendezvous packet */
429 
430 		uint64_t niov = pkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
431 		assert(niov <= (7-is_multi_receive));
432 		uint64_t xfer_len = pkt->payload.rendezvous.mu_iov[0].message_length;
433 		{
434 			uint64_t i;
435 			for (i=1; i<niov; ++i) xfer_len += pkt->payload.rendezvous.mu_iov[i].message_length;
436 		}
437 
438 		uint64_t byte_counter_vaddr = 0;
439 
440 		if (is_multi_receive) {		/* branch should compile out */
441 
			/* This code path's functionality is unverified - exit with an error
			 * message for now; when we have an MPICH test case for this we will
			 * verify it.
			 */
445 
446 			fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
447 			fflush(stderr);
448 			exit(1);
449 
450 
451 #ifdef FI_BGQ_TRACE
452         fprintf(stderr,"rendezvous multirecv\n");
453 #endif
454 
455 			union fi_bgq_context * multi_recv_context =
456 				(union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
457 			assert((((uintptr_t)multi_recv_context) & 0x07) == 0);
458 
459 			multi_recv_context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
460 			multi_recv_context->buf = recv_buf;
461 			multi_recv_context->len = xfer_len;
462 			multi_recv_context->data = immediate_data;
463 			multi_recv_context->tag = 0;	/* tag is not valid for multi-receives */
464 			multi_recv_context->multi_recv_context = context;
465 			multi_recv_context->byte_counter = xfer_len;
466 
467 			/* the next 'fi_bgq_context' must be 8-byte aligned */
468 			uint64_t bytes_consumed = ((xfer_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
469 			context->len -= bytes_consumed;
470 			context->buf = (void*)((uintptr_t)(context->buf) + bytes_consumed);
471 
472 			byte_counter_vaddr = (uint64_t)&multi_recv_context->byte_counter;
473 
474 			/* the original multi-receive context actually uses an
475 			 * operation counter - not a byte counter - but nevertheless
476 			 * the same field in the context structure is used */
477 			context->byte_counter += 1;
478 
479 			/* post a completion event for the individual receive */
480 			fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
481 
482 		} else if (xfer_len <= recv_len) {
483 
484 #ifdef FI_BGQ_TRACE
485         fprintf(stderr,"rendezvous complete_receive_operation xfer_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_pending\n",xfer_len,recv_len);
486 #endif
487 			context->len = xfer_len;
488 			context->data = immediate_data;
489 			context->tag = origin_tag;
490 			context->byte_counter = xfer_len;
491 
492 			byte_counter_vaddr = (uint64_t)&context->byte_counter;
493 
494 			/* post a completion event for the individual receive */
495 			fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
496 
497 		} else {
498 #ifdef FI_BGQ_TRACE
499         fprintf(stderr,"rendezvous truncation xfer_len %lu > recv_len %lu posting error\n",xfer_len,recv_len);
500 #endif
501 
502 			/* truncation */
503 			struct fi_bgq_context_ext * ext;
504 			if (is_context_ext) {
505 				ext = (struct fi_bgq_context_ext *)context;
506 				ext->err_entry.op_context = ext->msg.op_context;
507 			} else {
508 				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
509 				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
510 				ext->err_entry.op_context = context;
511 			}
512 
513 			ext->err_entry.flags = context->flags;
514 			ext->err_entry.len = recv_len;
515 			ext->err_entry.buf = recv_buf;
516 			ext->err_entry.data = immediate_data;
517 			ext->err_entry.tag = origin_tag;
518 			ext->err_entry.olen = xfer_len - recv_len;
519 			ext->err_entry.err = FI_ETRUNC;
520 			ext->err_entry.prov_errno = 0;
521 			ext->err_entry.err_data = NULL;
522 
523 			ext->bgq_context.byte_counter = 0;
524 
525 			byte_counter_vaddr = (uint64_t)&ext->bgq_context.byte_counter;
526 
527 			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);
528 
529 			xfer_len = 0;
530 			niov = 0;
531 		}
532 
533 		/* determine the physical address of the byte counter memory */
534 		uint64_t byte_counter_paddr = 0;
535 		{
536 			Kernel_MemoryRegion_t mr;
537 			Kernel_CreateMemoryRegion(&mr, (void*)byte_counter_vaddr, sizeof(uint64_t));
538 			byte_counter_paddr = (uint64_t)mr.BasePa + (byte_counter_vaddr - (uint64_t)mr.BaseVa);
539 		}
540 
541 		/* determine the physical address of the destination buffer */
542 		uint64_t dst_paddr = 0;
543 		{
544 			Kernel_MemoryRegion_t mr;
545 			Kernel_CreateMemoryRegion(&mr, (void*)recv_buf, recv_len);
546 			dst_paddr = (uint64_t)mr.BasePa + ((uint64_t)recv_buf - (uint64_t)mr.BaseVa);
547 		}
548 
549 		const uint64_t fifo_map = fi_bgq_mu_packet_get_fifo_map(pkt);
550 		const uint64_t is_local = (fifo_map & (MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL0 | MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL1)) != 0;
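		/* a fifo map that selects one of the loopback fifos indicates an
		 * intra-node transfer and selects the 'local' descriptor models */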
551 
552 		/*
553 		 * inject a "remote get" descriptor - the payload is composed
554 		 * of two descriptors:
555 		 *
556 		 *   the first is a "direct put" descriptor that will rdma
557 		 *   transfer the source data from the origin and will
558 		 *   decrement a reception counter on the target as it
559 		 *   completes
560 		 *
561 		 *   the second is a "direct put" descriptor that will clear
562 		 *   the byte counter for the send completion entry on the
563 		 *   origin
564 		 */
565 
566 		/* busy-wait until a fifo slot is available .. */
567 		MUHWI_Descriptor_t * rget_desc =
568 			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);
569 
570 		assert(rget_desc);
571 		assert((((uintptr_t)rget_desc)&0x1F) == 0);
572 
573 		/* locate the payload lookaside slot */
574 		uint64_t payload_paddr = 0;
575 		MUHWI_Descriptor_t * payload =
576 			(MUHWI_Descriptor_t *)fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
577 				rget_desc, &payload_paddr);
578 
579 		/* initialize the remote-get descriptor in the injection fifo */
580 		qpx_memcpy64((void*)rget_desc, (const void*)&bgq_ep->rx.poll.rzv.rget_model[is_local]);
581 
582 		rget_desc->Pa_Payload = payload_paddr;
583 		rget_desc->PacketHeader.messageUnitHeader.Packet_Types.Remote_Get.Rget_Inj_FIFO_Id =
584 			pkt->hdr.pt2pt.rendezvous.rget_inj_fifo_id;	/* TODO - different rget inj fifos for tag vs msg operations? */
585 
586 		rget_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(pkt->hdr.pt2pt.uid.fi);
587 
588 		/* initialize the direct-put ("data transfer") descriptor(s) in the rget payload */
589 		unsigned i;
590 		for (i=0; i<niov; ++i) {
591 			MUHWI_Descriptor_t * xfer_desc = payload++;
592 
593 			qpx_memcpy64((void*)xfer_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_model[is_local]);
594 
595 			xfer_desc->Pa_Payload = pkt->payload.rendezvous.mu_iov[i].src_paddr;
596 			const uint64_t message_length = pkt->payload.rendezvous.mu_iov[i].message_length;
597 			xfer_desc->Message_Length = message_length;
598 			MUSPI_SetRecPayloadBaseAddressInfo(xfer_desc, FI_BGQ_MU_BAT_ID_GLOBAL, dst_paddr);
599 			dst_paddr += message_length;
600 			xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Counter_Offset =
601 				MUSPI_GetAtomicAddress(byte_counter_paddr, MUHWI_ATOMIC_OPCODE_STORE_ADD);
602 			xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Counter_Base_Address_Id =
603 				FI_BGQ_MU_BAT_ID_GLOBAL;
604 
605 			rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);
606 
607 			if (is_multi_receive) {		/* branch should compile out */
608 				xfer_desc->Torus_FIFO_Map = fifo_map;
609 			}
610 		}
611 
612 		/* initialize the direct-put ("origin completion") descriptor in the rget payload */
613 		{
614 			MUHWI_Descriptor_t * dput_desc = payload;
615 			qpx_memcpy64((void*)dput_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_completion_model);
616 
617 			const uint64_t counter_paddr = ((uint64_t) pkt->payload.rendezvous.cntr_paddr_rsh3b) << 3;
618 			dput_desc->Pa_Payload =
619 				MUSPI_GetAtomicAddress(counter_paddr,
620 					MUHWI_ATOMIC_OPCODE_LOAD_CLEAR);
621 		}
622 
623 		/* initialize the memory-fifo ("rendezvous ack") descriptor in the rget payload for multi-receives */
624 		if (is_multi_receive) {			/* branch should compile out */
625 			MUHWI_Descriptor_t * ack_desc = ++payload;
626 			qpx_memcpy64((void*)ack_desc, (const void*)&bgq_ep->rx.poll.rzv.multi_recv_ack_model);
627 
628 			ack_desc->Torus_FIFO_Map = fifo_map;
629 			rget_desc->Torus_FIFO_Map = fifo_map;
630 			rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);
631 
632 			union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &ack_desc->PacketHeader;
633 			hdr->ack.context = (uintptr_t) context;
634 		}
635 
636 		/*
637 		 * inject the descriptor
638 		 */
639 		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
640 	}
641 	return;
642 }
643 
644 static inline
unsigned is_match(struct fi_bgq_mu_packet *pkt, union fi_bgq_context * context, const unsigned poll_msg)
646 {
647 	const uint64_t origin_tag = pkt->hdr.pt2pt.ofi_tag;
648 	const fi_bgq_uid_t origin_uid = pkt->hdr.pt2pt.uid.fi;
649 	const fi_bgq_uid_t target_uid = fi_bgq_addr_uid(context->src_addr);
650 	const uint64_t ignore = context->ignore;
651 	const uint64_t target_tag = context->tag;
652 	const uint64_t target_tag_and_not_ignore = target_tag & ~ignore;
653 	const uint64_t origin_tag_and_not_ignore = origin_tag & ~ignore;
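	/* a tag matches when every bit not masked by 'ignore' is equal; the source
	 * matches when the receive was posted with FI_ADDR_UNSPEC or the origin uid
	 * equals the target uid */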
654 
655 #ifdef FI_BGQ_TRACE
656 	fprintf(stderr, "%s:%s():%d context %p origin_uid=0x%08x target_uid=0x%08x origin_tag=0x%016lx target_tag=0x%016lx ignore=0x%016lx any_source is %u returning %u\n", __FILE__, __func__, __LINE__, context,origin_uid, target_uid, origin_tag, target_tag, ignore, (context->src_addr == FI_ADDR_UNSPEC),((origin_tag_and_not_ignore == target_tag_and_not_ignore) && ((context->src_addr == FI_ADDR_UNSPEC) || (origin_uid == target_uid))));
657 	fflush(stderr);
658 #endif
659 
660 	return ((origin_tag_and_not_ignore == target_tag_and_not_ignore) && ((context->src_addr == FI_ADDR_UNSPEC) || (origin_uid == target_uid)));
661 }
662 
663 static inline
void process_rfifo_packet_optimized (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt, const unsigned poll_msg, const unsigned is_manual_progress)
665 {
666 	const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
667 
668 #ifdef FI_BGQ_TRACE
669 	fprintf(stderr,"process_rfifo_packet_optimized - poll_msg is %u mq addr is %p\n",poll_msg,&(bgq_ep->rx.poll.rfifo[poll_msg].mq));
670 	fflush(stderr);
671 #endif
672 	if (poll_msg) {
673 		if (packet_type == FI_BGQ_MU_PACKET_TYPE_ACK) {	/* branch should compile out */
674 
675 			union fi_bgq_context * context = (union fi_bgq_context *) pkt->hdr.ack.context;
676 			context->byte_counter -= 1;
677 			/* TODO - msync? */
678 			return;
679 		}
680 
681 		if (packet_type == FI_BGQ_MU_PACKET_TYPE_RMA) {
682 			complete_rma_operation(bgq_ep, pkt);
683 			return;
684 		}
685 
686 		if (packet_type == FI_BGQ_MU_PACKET_TYPE_ATOMIC) {
687 			complete_atomic_operation(bgq_ep, pkt);
688 			return;
689 		}
690 	}
691 
692 	if ((packet_type & (FI_BGQ_MU_PACKET_TYPE_ACK|FI_BGQ_MU_PACKET_TYPE_EAGER)) ==
693 			(FI_BGQ_MU_PACKET_TYPE_ACK|FI_BGQ_MU_PACKET_TYPE_EAGER)) {	/* unlikely? */
694 		inject_eager_completion(bgq_ep, pkt);
695 		return;
696 	}
697 
698 	assert(packet_type & (FI_BGQ_MU_PACKET_TYPE_EAGER | FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS));
699 
700 	/* search the match queue */
701 	union fi_bgq_context * head = bgq_ep->rx.poll.rfifo[poll_msg].mq.head;
702 	union fi_bgq_context * context = head;
703 	union fi_bgq_context * prev = NULL;
704 #ifdef FI_BGQ_TRACE
705 	fprintf(stderr,"searching mq - head is %p\n",bgq_ep->rx.poll.rfifo[poll_msg].mq.head);
706 #endif
707 	while (context) {
708 
709 		const uint64_t rx_op_flags = context->flags;
710 #ifdef FI_BGQ_TRACE
		fprintf(stderr,"is_match calling with context %p prev is %p next is %p\n",context,prev,context->next);
712 		fflush(stderr);
713 #endif
714 		if (is_match(pkt, context, poll_msg)) {
715 
			if (!poll_msg || ((rx_op_flags & FI_MULTI_RECV) == 0)) {	/* branch should compile out for tagged receives */
717 
718 				union fi_bgq_context * next = context->next;
719 
720 				/* remove the context from the match queue */
721 				if (prev) prev->next = next;
722 				else bgq_ep->rx.poll.rfifo[poll_msg].mq.head = next;
723 
724 				if (!next) bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;
725 
726 				const uint64_t is_context_ext = rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT;
727 
728 				/* branch will compile out */
729 				if (poll_msg)
730 					complete_receive_operation(bgq_ep, pkt,
731 						0, context, is_context_ext, 0, is_manual_progress);
732 				else
733 					complete_receive_operation(bgq_ep, pkt,
734 						pkt->hdr.pt2pt.ofi_tag, context, is_context_ext, 0, is_manual_progress);
735 
736 				return;
737 
738 			} else {	/* FI_MULTI_RECV - unlikely */
739 
740 				/* verify that there is enough space available in
741 				 * the multi-receive buffer for the incoming data */
742 				const uint64_t recv_len = context->len;
743 				uint64_t send_len = 0;
744 
745 				if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
746 					send_len = pkt->hdr.pt2pt.send.message_length;
747 				} else /* FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS */ {
748 
					/* This code path's functionality is unverified - exit with an
					 * error message for now; when we have an MPICH test case for
					 * this we will verify it.
					 */
752 
753 					fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
754 					fflush(stderr);
755 					exit(1);
756 
757 					const uint64_t niov = pkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
758 					send_len = pkt->payload.rendezvous.mu_iov[0].message_length;
759 					uint64_t i;
760 					for (i=1; i<niov; ++i) send_len += pkt->payload.rendezvous.mu_iov[i].message_length;
761 				}
762 
763 				if (send_len > recv_len) {
764 
					/* To preserve ordering, complete this multi-receive context now
					 * and remove it from the match queue; the next multi-receive
					 * context should have enough room.
					 */
768 
769 					union fi_bgq_context * next = context->next;
770 
					/* remove the context from the match queue */
					if (prev) prev->next = next;
					else bgq_ep->rx.poll.rfifo[poll_msg].mq.head = next;

					if (!next) bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;
776 
777 					context->byte_counter = 0;
778 					fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);
779 
780 				} else {
781 
782 					complete_receive_operation(bgq_ep, pkt,
783 						0, context, 0, 1, is_manual_progress);
784 
785 					if (context->len < bgq_ep->rx.poll.min_multi_recv) {
786 						/* after processing this message there is not
787 						 * enough space available in the multi-receive
788 						 * buffer to receive the next message; post a
789 						 * 'FI_MULTI_RECV' event to the completion
790 						 * queue and return. */
791 
792 						union fi_bgq_context * next = context->next;
793 
794 						/* remove the context from the match queue */
795 						if (prev) prev->next = next;
796 						else bgq_ep->rx.poll.rfifo[poll_msg].mq.head = next;
797 
798 						if (!next) bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;
799 
800 						/* post a completion event for the multi-receive */
801 						context->byte_counter = 0;
802 						fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
803 
804 					}
805 				}
806 				return;
807 			}
808 
809 		} else {
810 			prev = context;
811 			context = context->next;
812 		}
813 	}
814 
815 	/* did not find a match .. add this packet to the unexpected queue */
816 
817 #ifdef FI_BGQ_TRACE
818 	fprintf(stderr, "process_rfifo_packet_optimized - did not find a match .. add this packet to the unexpected queue \n");
819 	fflush(stderr);
820 #endif
821 	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.free == NULL) { /* unlikely */
822 		struct fi_bgq_mu_packet * block = NULL;
823 		int rc __attribute__ ((unused));
824 		rc = posix_memalign((void **)&block,
825 			32, sizeof(struct fi_bgq_mu_packet)*FI_BGQ_UEPKT_BLOCKSIZE);
826 		assert(rc==0);
827 		unsigned i;
828 		for (i=0; i<(FI_BGQ_UEPKT_BLOCKSIZE-1); ++i) block[i].next = &block[i+1];
829 		block[FI_BGQ_UEPKT_BLOCKSIZE-1].next = NULL;
830 		bgq_ep->rx.poll.rfifo[poll_msg].ue.free = block;
831 	}
832 
833 	/* pop the free list */
834 	struct fi_bgq_mu_packet * uepkt = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
835 	bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt->next;
836 
837 	/* copy the packet and append to the ue queue */
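	/* Byte8.Size encodes the total packet size in 32-byte units, minus one */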
838 	size_t bytes_to_copy = (pkt->hdr.muhwi.NetworkHeader.pt2pt.Byte8.Size + 1) * 32;
839 	memcpy((void*)uepkt, (const void *)pkt, bytes_to_copy);
840 	uepkt->next = NULL;
841 	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.head == NULL) {
842 		bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt;
843 	} else {
844 		bgq_ep->rx.poll.rfifo[poll_msg].ue.tail->next = uepkt;
845 	}
846 	bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = uepkt;
847 
848 	return;
849 }
850 
851 static inline
void process_rfifo_packet (struct fi_bgq_ep * bgq_ep, struct fi_bgq_mu_packet * pkt, const unsigned poll_msg, const unsigned is_manual_progress)
853 {
854 	process_rfifo_packet_optimized(bgq_ep, pkt, poll_msg, is_manual_progress);
855 }
856 
857 static inline
int poll_rfifo (struct fi_bgq_ep * bgq_ep, const unsigned is_manual_progress) {
859 
860 	/*
861 	 * The mu reception fifo is consumed by software at the 'head' and
862 	 * produced by hardware at the 'tail'.
863 	 */
864 	MUSPI_Fifo_t * fifo_ptr = &bgq_ep->rx.poll.muspi_recfifo->_fifo;
865 	assert(fifo_ptr);
866 	volatile uint64_t pa_tail = MUSPI_getHwTail(fifo_ptr);
867 	const uintptr_t pa_start = MUSPI_getStartPa(fifo_ptr);
868 	const uintptr_t offset_tail = pa_tail - pa_start;
869 
870 	const uintptr_t va_head = (uintptr_t) MUSPI_getHeadVa(fifo_ptr);
871 	const uintptr_t va_start = (uintptr_t) MUSPI_getStartVa(fifo_ptr);
872 	const uintptr_t offset_head = va_head - va_start;
873 
874 	MUHWI_PacketHeader_t * hdr = (MUHWI_PacketHeader_t *) va_head;
875 
876 	if (offset_head < offset_tail) {			/* likely */
877 
878 		muspi_dcbt(va_head, 0);
879 		_bgq_msync();
880 
881 		const uintptr_t stop = va_head + offset_tail - offset_head;
882 		int process_rfifo_iter = 0;
883 		while (((uintptr_t)hdr < stop) && (process_rfifo_iter < PROCESS_RFIFO_MAX)) {
884 
885 			process_rfifo_iter++;
886 			struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
887 			const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
888 
889 			if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
890 				process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
891 			} else {
892 				process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
893 			}
894 
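			/* 'hdr' points at 32-byte packet headers and Byte8.Size is the
			 * packet size in 32-byte units minus one, so this advances past
			 * the entire packet (header plus payload) */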
895 			hdr += hdr->NetworkHeader.pt2pt.Byte8.Size + 1;
896 			muspi_dcbt(hdr, 0);
897 		}
898 
899 		MUSPI_setHeadVa(fifo_ptr, (void*)hdr);
900 		MUSPI_setHwHead(fifo_ptr, (uintptr_t)hdr-va_start);
901 
902 
903 	} else if (offset_head > offset_tail) {			/* unlikely ? */
904 
905 		/* check if the head packet wraps */
906 		const uintptr_t va_end = (uintptr_t) fifo_ptr->va_end;
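		/* 544 bytes is the largest possible MU packet (32-byte header plus a
		 * 512-byte payload), so if at least that much space remains before the
		 * end of the fifo the head packet cannot wrap */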
907 		if ((va_head + 544) < va_end) {			/* likely */
908 
909 			/* head packet does not wrap */
910 			muspi_dcbt(va_head, 0);
911 			_bgq_msync();
912 
913 			const uintptr_t stop = va_end - 544;
914 			int process_rfifo_iter = 0;
915 			while  (((uintptr_t)hdr < stop) && (process_rfifo_iter < PROCESS_RFIFO_MAX)) {
916 
917 				process_rfifo_iter++;
918 				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
919 				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
920 
921 				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
922 					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
923 				} else {
924 					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
925 				}
926 
927 				hdr += hdr->NetworkHeader.pt2pt.Byte8.Size + 1;
928 				muspi_dcbt(hdr, 0);
929 			}
930 
931 			MUSPI_setHeadVa(fifo_ptr, (void*)hdr);
932 			MUSPI_setHwHead(fifo_ptr, (uintptr_t)hdr-va_start);
933 
934 		} else {					/* unlikely */
935 
936 			/* head packet may wrap */
937 			muspi_dcbt(va_head, 0);
938 			_bgq_msync();
939 
940 			uint32_t packet_bytes = ((uint32_t)hdr->NetworkHeader.pt2pt.Byte8.Size + 1) << 5;
941 			const uintptr_t bytes_before_wrap = va_end - va_head;
942 			if (packet_bytes < bytes_before_wrap) {
943 				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
944 				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
945 
946 				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
947 					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
948 				} else {
949 					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
950 				}
951 
952 				const uintptr_t new_offset_head = offset_head + packet_bytes;
953 				MUSPI_setHeadVa(fifo_ptr, (void*)(va_start + new_offset_head));
954 				MUSPI_setHwHead(fifo_ptr, new_offset_head);
955 
956 			} else if (packet_bytes == bytes_before_wrap) {
957 				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
958 				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
959 
960 				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
961 					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
962 				} else {
963 					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
964 				}
965 
966 				MUSPI_setHeadVa(fifo_ptr, (void*)(va_start));
967 				MUSPI_setHwHead(fifo_ptr, 0);
968 
969 			} else {
970 				uint8_t tmp_pkt[544] __attribute__((__aligned__(32)));
971 
972 				memcpy((void*)&tmp_pkt[0], (void*)va_head, bytes_before_wrap);
973 				const uintptr_t bytes_after_wrap = packet_bytes - bytes_before_wrap;
974 				memcpy((void*)&tmp_pkt[bytes_before_wrap], (void*)va_start, bytes_after_wrap);
975 
976 				hdr = (MUHWI_PacketHeader_t *)&tmp_pkt[0];
977 				struct fi_bgq_mu_packet *pkt = (struct fi_bgq_mu_packet *) hdr;
978 				const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
979 
980 				if (packet_type & FI_BGQ_MU_PACKET_TYPE_TAG) {	/* likely */
981 					process_rfifo_packet(bgq_ep, pkt, 0, is_manual_progress);
982 				} else {
983 					process_rfifo_packet(bgq_ep, pkt, 1, is_manual_progress);
984 				}
985 
986 				MUSPI_setHeadVa(fifo_ptr, (void*)(va_start + bytes_after_wrap));
987 				MUSPI_setHwHead(fifo_ptr, bytes_after_wrap);
988 			}
989 		}
990 	}
991 
992 
993 	return 0;
994 }
995 
996 
997 /* rx_op_flags is only checked for FI_PEEK | FI_CLAIM | FI_MULTI_RECV
998  * rx_op_flags is only used if FI_PEEK | FI_CLAIM | cancel_context
999  * is_context_ext is only used if FI_PEEK | cancel_context | iovec
1000  *
1001  * The "normal" data movement functions, such as fi_[t]recv(), can safely
1002  * specify '0' for cancel_context, rx_op_flags, and is_context_ext, in
1003  * order to reduce code path.
1004  */
1005 static inline
int process_mfifo_context (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg,
1007 		const uint64_t cancel_context, union fi_bgq_context * context,
1008 		const uint64_t rx_op_flags, const uint64_t is_context_ext,
1009 		const unsigned is_manual_progress) {
1010 #ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context starting - context->tag is %lu\n",context->tag);
1012 	if (rx_op_flags & FI_PEEK)
1013 		fprintf(stderr,"just peeking\n");
1014 	fflush(stderr);
1015 #endif
1016 	if (cancel_context) {	/* branch should compile out */
1017 		const uint64_t compare_context = is_context_ext ?
1018 			(uint64_t)(((struct fi_bgq_context_ext *)context)->msg.op_context) :
1019 			(uint64_t)context;
1020 
1021 		if (compare_context == cancel_context) {
1022 
1023 			struct fi_bgq_context_ext * ext;
1024 			if (is_context_ext) {
1025 				ext = (struct fi_bgq_context_ext *)context;
1026 			} else {
1027 				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
1028 				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
1029 			}
1030 
1031 			ext->bgq_context.byte_counter = 0;
1032 			ext->err_entry.op_context = (void *)cancel_context;
1033 			ext->err_entry.flags = rx_op_flags;
1034 			ext->err_entry.len = 0;
1035 			ext->err_entry.buf = 0;
1036 			ext->err_entry.data = 0;
1037 			ext->err_entry.tag = context->tag;
1038 			ext->err_entry.olen = 0;
1039 			ext->err_entry.err = FI_ECANCELED;
1040 			ext->err_entry.prov_errno = 0;
1041 			ext->err_entry.err_data = NULL;
1042 
1043 			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);
1044 
1045 			return FI_ECANCELED;
1046 		}
1047 	}
1048 
1049 	if ((rx_op_flags & (FI_PEEK | FI_CLAIM | FI_MULTI_RECV)) == 0) {	/* likely */
1050 
1051 		/* search the unexpected packet queue */
1052 		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
1053 		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
1054 		struct fi_bgq_mu_packet * prev = NULL;
1055 		struct fi_bgq_mu_packet * uepkt = head;
1056 
1057 		unsigned found_match = 0;
1058 		while (uepkt != NULL) {
1059 
1060 #ifdef FI_BGQ_TRACE
1061 	fprintf(stderr,"process_mfifo_context - searching unexpected queue\n");
1062 	fflush(stderr);
1063 #endif
1064 			if (is_match(uepkt, context, poll_msg)) {
1065 #ifdef FI_BGQ_TRACE
1066 	fprintf(stderr,"process_mfifo_context - found match on unexpected queue\n");
1067 	fflush(stderr);
1068 #endif
1069 
1070 				/* branch will compile out */
1071 				if (poll_msg)
1072 					complete_receive_operation(bgq_ep, uepkt,
1073 						0, context, 0, 0, is_manual_progress);
1074 				else
1075 					complete_receive_operation(bgq_ep, uepkt,
1076 						uepkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);
1077 
1078 				/* remove the uepkt from the ue queue */
1079 				if (head == tail) {
1080 					bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
1081 					bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
1082 				} else if (prev == NULL) {
1083 					bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
1084 				} else if (tail == uepkt) {
1085 					bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
1086 					prev->next = NULL;
1087 				} else {
1088 					prev->next = uepkt->next;
1089 				}
1090 
1091 				/* ... and prepend the uehdr to the ue free list. */
1092 				uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
1093 				bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;
1094 
1095 				/* found a match; break from the loop */
1096 				uepkt = NULL;
1097 				found_match = 1;
1098 
1099 			} else {
1100 
1101 				/* a match was not found; advance to the next ue header */
1102 				prev = uepkt;
1103 				uepkt = uepkt->next;
1104 			}
1105 		}
1106 
1107 		if (!found_match) {
1108 
1109 #ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context - nothing found on unexpected queue adding to match queue for poll_msg %u context->tag is %lu context is %p mq addr is %p\n",poll_msg,context->tag,context,&(bgq_ep->rx.poll.rfifo[poll_msg].mq));
1111 	fflush(stderr);
1112 #endif
1113 			/*
1114 			 * no unexpected headers were matched; add this match
1115 			 * information to the appropriate match queue
1116 			 */
1117 
1118 			union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;
1119 
1120 			context->next = NULL;
1121 			if (tail == NULL) {
1122 				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
1123 			} else {
1124 				tail->next = context;
1125 			}
1126 			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
1127 		}
1128 
1129 	} else if (rx_op_flags & FI_PEEK) {	/* unlikely */
1130 
1131 		/* search the unexpected packet queue */
1132 		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
1133 		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
1134 		struct fi_bgq_mu_packet * prev = NULL;
1135 		struct fi_bgq_mu_packet * uepkt = head;
1136 
1137 #ifdef FI_BGQ_TRACE
1138 	fprintf(stderr,"process_mfifo_context - rx_op_flags & FI_PEEK searching unexpected queue\n");
1139 	if (uepkt == NULL)
1140 		fprintf(stderr,"uepkt == NULL\n");
1141 	else
1142 		fprintf(stderr,"uepkt != NULL\n");
1143 
1144 	fflush(stderr);
1145 #endif
1146 		unsigned found_match = 0;
1147 		while (uepkt != NULL) {
1148 
1149 #ifdef FI_BGQ_TRACE
1150 	fprintf(stderr,"process_mfifo_context uepkt != NULL - rx_op_flags & FI_PEEK searching unexpected queue\n");
1151 	fflush(stderr);
1152 #endif
1153 			if (is_match(uepkt, context, poll_msg)) {
1154 
1155 				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
1156 				if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {
1157 					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
1158 					uint64_t len = 0;
1159 					unsigned i;
1160 					for (i=0; i<niov; ++i) len += uepkt->payload.rendezvous.mu_iov[i].message_length;
1161 					context->len = len;
1162 				} else {	/* "eager" or "eager with completion" packet type */
1163 					context->len = uepkt->hdr.pt2pt.send.message_length;
1164 				}
1165 				context->tag = poll_msg ? 0 : uepkt->hdr.pt2pt.ofi_tag;
1166 				context->byte_counter = 0;
1167 
1168 				if (rx_op_flags & FI_CLAIM) { /* both FI_PEEK and FI_CLAIM were specified */
1169 					assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);
1170 
1171 					context->claim = uepkt;
1172 
1173 					/* remove the uepkt from the ue queue */
1174 					if (head == tail) {
1175 						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
1176 						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
1177 					} else if (prev == NULL) {
1178 						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
1179 					} else if (tail == uepkt) {
1180 						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
1181 						prev->next = NULL;
1182 					} else {
1183 						prev->next = uepkt->next;
1184 					}
1185 				}
				/* transfer immediate data from pkt to context for matching FI_PEEK */
1187 				context->data = uepkt->hdr.pt2pt.immediate_data;
1188 
1189 				/* post a completion event for the receive */
1190 				fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
1191 
1192 				found_match = 1;
1193 				uepkt = NULL;
1194 
1195 			} else {
1196 
1197 				/* a match was not found; advance to the next ue header */
1198 				prev = uepkt;
1199 				uepkt = uepkt->next;
1200 			}
1201 		}
1202 
1203 		if (!found_match) {
1204 
1205 #ifdef FI_BGQ_TRACE
1206 	fprintf(stderr,"didn't find a match for this FI_PEEK\n");
1207 	fflush(stderr);
1208 #endif
1209 			/* did not find a match for this "peek" */
1210 
1211 
1212 			struct fi_bgq_context_ext * ext;
1213 			uint64_t mfifo_value;
1214 			if (is_context_ext) {
1215 				ext = (struct fi_bgq_context_ext *)context;
1216 				mfifo_value = (uint64_t)context >> 3;
1217 			} else {
1218 				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
1219 				ext->bgq_context.flags = rx_op_flags | FI_BGQ_CQ_CONTEXT_EXT;
1220 
1221 				mfifo_value = (uint64_t)ext >> 3;
1222 			}
1223 
1224 			ext->err_entry.op_context = context;
1225 			ext->err_entry.flags = rx_op_flags;
1226 			ext->err_entry.len = 0;
1227 			ext->err_entry.buf = 0;
1228 			ext->err_entry.data = 0;
1229 			ext->err_entry.tag = 0;
1230 			ext->err_entry.olen = 0;
1231 			ext->err_entry.err = FI_ENOMSG;
1232 			ext->err_entry.prov_errno = 0;
1233 			ext->err_entry.err_data = NULL;
1234 			ext->bgq_context.byte_counter = 0;
1235 
1236 #ifdef FI_BGQ_TRACE
1237 	fprintf(stderr,"process_mfifo_context -  no match found on unexpected queue posting error\n");
1238 	fflush(stderr);
1239 #endif
1240 			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);
1241 
1242 		}
1243 
1244 	} else if (rx_op_flags & FI_CLAIM) {	/* unlikely */
1245 		assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);
1246 #ifdef FI_BGQ_TRACE
1247 	fprintf(stderr,"process_mfifo_context -  rx_op_flags & FI_CLAIM complete receive operation\n");
1248 #endif
1249 
1250 		/* only FI_CLAIM was specified
1251 		 *
1252 		 * this occurs after a previous FI_PEEK + FI_CLAIM
1253 		 * operation has removed an unexpected packet from
1254 		 * the queue and saved a pointer to it in the context
1255 		 *
1256 		 * complete the receive for this "claimed" message ... */
1257 		struct fi_bgq_mu_packet * claimed_pkt = context->claim;
1258 		if (poll_msg)
1259 			complete_receive_operation(bgq_ep, claimed_pkt,
1260 				0, context, 0, 0, is_manual_progress);
1261 		else
1262 			complete_receive_operation(bgq_ep, claimed_pkt,
1263 				claimed_pkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);
1264 
1265 		/* ... and prepend the uehdr to the ue free list. */
1266 		claimed_pkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
1267 		bgq_ep->rx.poll.rfifo[poll_msg].ue.free = claimed_pkt;
1268 
1269 	} else if (poll_msg && (rx_op_flags & FI_MULTI_RECV)) {		/* unlikely - branch should compile out for tagged receives */
1270 		/* search the unexpected packet queue */
1271 		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
1272 		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
1273 		struct fi_bgq_mu_packet * prev = NULL;
1274 		struct fi_bgq_mu_packet * uepkt = head;
1275 
1276 		unsigned full_multirecv_buffer = 0;
1277 		while (uepkt != NULL) {
1278 
1279 			if (is_match(uepkt, context, poll_msg)) {
1280 
1281 				/* verify that there is enough space available in
1282 				 * the multi-receive buffer for the incoming data */
1283 				const uint64_t recv_len = context->len;
1284 				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
1285 				uint64_t send_len = 0;
1286 
1287 				if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
1288 					send_len = uepkt->hdr.pt2pt.send.message_length;
1289 				} else if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {
1290 
					/* This code path's functionality is unverified - exit with an
					 * error message for now; when we have an MPICH test case for
					 * this we will verify it.
					 */
1294 
1295 					fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
1296 					fflush(stderr);
1297 					exit(1);
1298 
1299 					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
1300 					send_len = uepkt->payload.rendezvous.mu_iov[0].message_length;
1301 					uint64_t i;
1302 					for (i=1; i<niov; ++i) send_len += uepkt->payload.rendezvous.mu_iov[i].message_length;
1303 				}
1304 
1305 				if (send_len > recv_len) {
					/* There is not enough room in this multi-receive buffer for the
					 * next matched message. To preserve ordering, break off here with
					 * whatever matches are already in the buffer and hope that the
					 * next multi-receive buffer has space.
					 */
1311 
1312 					uepkt = NULL;
1313 					full_multirecv_buffer = 1;
1314 					context->byte_counter = 0;
1315 					fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);
1316 
1317 				} else {
1318 					complete_receive_operation(bgq_ep, uepkt,
1319 						0, context, 0, 1, is_manual_progress);
1320 
1321 					/* remove the uepkt from the ue queue */
1322 					if (head == tail) {
1323 						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
1324 						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
1325 					} else if (prev == NULL) {
1326 						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
1327 					} else if (tail == uepkt) {
1328 						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
1329 						prev->next = NULL;
1330 					} else {
1331 						prev->next = uepkt->next;
1332 					}
1333 
1334 					struct fi_bgq_mu_packet *matched_uepkt_next = uepkt->next;
1335 
1336 					/* ... and prepend the uehdr to the ue free list. */
1337 					uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
1338 					bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;
1339 
1340 					if (context->len < bgq_ep->rx.poll.min_multi_recv) {
1341 						/* after processing this message there is not
1342 						 * enough space available in the multi-receive
1343 						 * buffer to receive the next message; break
1344 						 * from the loop and post a 'FI_MULTI_RECV'
1345 						 * event to the completion queue. */
1346 						uepkt = NULL;
1347 						full_multirecv_buffer = 1;
1348 
1349 						/* post a completion event for the multi-receive */
1350 						context->byte_counter = 0;
1351 						fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
1352 					}
1353 					else {
1354 						uepkt = matched_uepkt_next;
1355 					}
1356 
1357 				}
1358 
1359 			} else {
1360 
1361 				/* a match was not found; advance to the next ue header */
1362 				prev = uepkt;
1363 				uepkt = uepkt->next;
1364 			}
1365 		}
1366 
1367 		if (!full_multirecv_buffer) {
1368 
1369 			/* The multirecv context has room in its buffer.
1370 			 * Post to match queue for further filling.
1371 			 */
1372 
1373 			union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;
1374 
1375 			context->next = NULL;
1376 			if (tail == NULL) {
1377 				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
1378 			} else {
1379 				tail->next = context;
1380 			}
1381 			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
1382 		}
1383 	}
1384 
1385 	return 0;
1386 }
1387 
1388 
1389 static inline
int poll_mfifo (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg, const uint64_t cancel_context, const unsigned is_manual_progress) {
1391 
1392 #ifdef DEBUG
1393 	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.head == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].ue.tail == NULL);
1394 	if (bgq_ep->rx.poll.rfifo[poll_msg].ue.tail == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].ue.head == NULL);
1395 	if (bgq_ep->rx.poll.rfifo[poll_msg].mq.head == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].mq.tail == NULL);
1396 	if (bgq_ep->rx.poll.rfifo[poll_msg].mq.tail == NULL) assert(bgq_ep->rx.poll.rfifo[poll_msg].mq.head == NULL);
1397 #endif
1398 
1399 	/*
1400 	 * attempt to match each new match element from the match fifo with any
	 * unexpected headers and complete the receives; if no match is found,
1402 	 * append the match element to the match queue which will be searched
1403 	 * for a match as each rfifo packet is processed
1404 	 */
1405 	uint64_t mfifo_value;
1406 	struct l2atomic_fifo_consumer * consumer = &bgq_ep->rx.poll.rfifo[poll_msg].match;
1407 	unsigned loop_count = 0;
1408 	while (++loop_count < 16 && l2atomic_fifo_consume(consumer, &mfifo_value) == 0) {
1409 
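		/* fifo elements carry the context pointer right-shifted by 3 bits;
		 * contexts are at least 8-byte aligned so no information is lost */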
1410 		union fi_bgq_context * context = (union fi_bgq_context *)(mfifo_value << 3);
1411 		const uint64_t rx_op_flags = context->flags;
1412 		const uint64_t is_context_ext = rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT;
1413 
1414 #ifdef FI_BGQ_TRACE
1415 	fprintf(stderr,"poll_mfifo calling process_mfifo_context\n");
1416 #endif
1417 		process_mfifo_context(bgq_ep, poll_msg, cancel_context,
1418 			context, rx_op_flags, is_context_ext, is_manual_progress);
1419 
1420 	}
1421 
1422 	return 0;
1423 }
1424 
1425 
1426 static inline
int cancel_match_queue (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg, const uint64_t cancel_context) {
1428 
1429 	/* search the match queue */
1430 	union fi_bgq_context * head = bgq_ep->rx.poll.rfifo[poll_msg].mq.head;
1431 	union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;
1432 	union fi_bgq_context * context = head;
1433 	union fi_bgq_context * prev = NULL;
1434 	while (context) {
1435 
1436 		const uint64_t is_context_ext = context->flags & FI_BGQ_CQ_CONTEXT_EXT;
1437 		const uint64_t compare_context = is_context_ext ?
1438 			(uint64_t)(((struct fi_bgq_context_ext *)context)->msg.op_context) :
1439 			(uint64_t)context;
1440 
1441 		if (compare_context == cancel_context) {
1442 
1443 			/* remove the context from the match queue */
1444 			if (context == head)
1445 				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context->next;
1446 			else
1447 				prev->next = context->next;
1448 
1449 			if (context == tail)
1450 				bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = prev;
1451 
1452 			struct fi_bgq_context_ext * ext;
1453 			if (is_context_ext) {
1454 				ext = (struct fi_bgq_context_ext *)context;
1455 			} else {
1456 				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
1457 				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
1458 			}
1459 
1460 			ext->bgq_context.byte_counter = 0;
1461 			ext->err_entry.op_context = (void *)cancel_context;
1462 			ext->err_entry.flags = context->flags;
1463 			ext->err_entry.len = 0;
1464 			ext->err_entry.buf = 0;
1465 			ext->err_entry.data = 0;
1466 			ext->err_entry.tag = context->tag;
1467 			ext->err_entry.olen = 0;
1468 			ext->err_entry.err = FI_ECANCELED;
1469 			ext->err_entry.prov_errno = 0;
1470 			ext->err_entry.err_data = NULL;
1471 
1472 			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);
1473 
1474 			return FI_ECANCELED;
1475 		}
1476 		else
1477 			prev = context;
1478 		context = context->next;
1479 	}
1480 
1481 	return 0;
1482 }
1483 
1484 static inline
void poll_cfifo (struct fi_bgq_ep * bgq_ep, const unsigned is_manual_progress) {	/* TODO - make this non-inline */
1486 
1487 	struct l2atomic_fifo_consumer * consumer = &bgq_ep->rx.poll.control;
1488 	uint64_t value = 0;
1489 	if (l2atomic_fifo_consume(consumer, &value) == 0) {
1490 
1491 		const unsigned poll_fi_msg = bgq_ep->rx.caps & FI_MSG;
1492 		const unsigned poll_fi_tag = bgq_ep->rx.caps & FI_TAGGED;
1493 
1494 		/* const uint64_t flags = value & 0xE000000000000000ull; -- currently not used */
1495 		const uint64_t cancel_context = value << 3;
1496 
1497 		if (poll_fi_msg && poll_fi_tag) {
1498 			if (FI_ECANCELED != cancel_match_queue(bgq_ep, 0, cancel_context)) {
1499 				if (FI_ECANCELED != poll_mfifo(bgq_ep, 0, cancel_context, is_manual_progress)) {
1500 
1501 					if (FI_ECANCELED != cancel_match_queue(bgq_ep, 1, cancel_context)) {
1502 						if (FI_ECANCELED != poll_mfifo(bgq_ep, 1, cancel_context, is_manual_progress)) {
1503 							/* did not find a match */
1504 						}
1505 					}
1506 				}
1507 			}
1508 		} else if (poll_fi_msg) {
1509 			if (FI_ECANCELED != cancel_match_queue(bgq_ep, 1, cancel_context)) {
1510 				if (FI_ECANCELED != poll_mfifo(bgq_ep, 1, cancel_context, is_manual_progress)) {
1511 					/* did not find a match */
1512 				}
1513 			}
1514 		} else if (poll_fi_tag) {
1515 			if (FI_ECANCELED != cancel_match_queue(bgq_ep, 0, cancel_context)) {
1516 				if (FI_ECANCELED != poll_mfifo(bgq_ep, 0, cancel_context, is_manual_progress)) {
1517 					/* did not find a match */
1518 				}
1519 			}
1520 		}
1521 	}
1522 }
1523 
1524 static inline
void poll_rx (struct fi_bgq_ep * bgq_ep,
1526 		const unsigned poll_fi_msg,
1527 		const unsigned poll_fi_tag) {
1528 
1529 	volatile uint64_t * async_is_enabled = &bgq_ep->async.enabled;
1530 	while (L2_AtomicLoad(async_is_enabled)) {
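		/* poll the receive data paths in a tight loop and check the control
		 * fifo only once per 64 iterations */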
1531 		unsigned loop_count = 64;
1532 		do {
1533 			if (poll_fi_msg) {
1534 				poll_mfifo(bgq_ep, 1, 0, 0);
1535 				poll_rfifo(bgq_ep, 0);
1536 			}
1537 			if (poll_fi_tag) {
1538 				poll_mfifo(bgq_ep, 0, 0, 0);
1539 				poll_rfifo(bgq_ep, 0);
1540 			}
1541 		} while (--loop_count);
1542 
1543 		poll_cfifo(bgq_ep, 0);
1544 	}
1545 }
1546 
1547 static inline
void * poll_fn (void *arg) {
1549 //fprintf(stderr, "%s:%s():%d .... arg = %p\n", __FILE__, __func__, __LINE__, arg);
1550 	struct fi_bgq_ep * bgq_ep = (struct fi_bgq_ep *) arg;
1551 
1552 	volatile uint64_t * async_is_active = &bgq_ep->async.active;
1553 	L2_AtomicStore(async_is_active, 1);
1554 
1555 	uint64_t rx_caps = bgq_ep->rx.caps & (FI_MSG | FI_TAGGED);
1556 
1557 	if (rx_caps == (FI_MSG | FI_TAGGED)) {
1558 		poll_rx(bgq_ep, 1, 1);
1559 	} else if (rx_caps == FI_MSG) {
1560 		poll_rx(bgq_ep, 1, 0);
1561 	} else if (rx_caps == FI_TAGGED) {
1562 		poll_rx(bgq_ep, 0, 1);
1563 	}
1564 
1565 	L2_AtomicStore(async_is_active, 0);
1566 
1567 	return NULL;
1568 }
1569 
1570 
1571 
1572 #endif /* _FI_PROV_BGQ_RX_H_ */
1573