/*
 * Copyright (C) 2016 by Argonne National Laboratory.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "rdma/bgq/fi_bgq.h"
#include <ofi_enosys.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#include <ofi.h>

#include "rdma/bgq/fi_bgq_spi.h"

#define FI_BGQ_DEFAULT_CQ_DEPTH (8192)
#define FI_BGQ_MAXIMUM_CQ_DEPTH (8192)

#define FI_BGQ_L2ATOMIC_ERR_FIFO_DATA_SIZE (512)

struct fi_cq_bgq_l2atomic_data {
	struct l2atomic_boundedcounter_data	entry_counter;
	struct l2atomic_boundedcounter_data	bounded_counter;
	struct l2atomic_fifo_data		err_fifo_data;
	uint64_t				err_packet[FI_BGQ_L2ATOMIC_ERR_FIFO_DATA_SIZE];
	struct l2atomic_fifo_data		std_fifo_data;
	uint64_t				std_packet[0];
} __attribute__((aligned(32)));
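
/*
 * Note: the trailing std_packet[] flexible array is sized at allocation
 * time as 'size' uint64_t entries (see the posix_memalign() call in
 * fi_bgq_cq_open), so a single 32-byte-aligned allocation backs both the
 * error and standard fifo payloads.
 */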

static int fi_bgq_close_cq(fid_t fid)
{
	int ret;
	struct fi_bgq_cq *bgq_cq =
		container_of(fid, struct fi_bgq_cq, cq_fid);

	ret = fi_bgq_fid_check(fid, FI_CLASS_CQ, "completion queue");
	if (ret)
		return ret;

	ret = fi_bgq_ref_dec(&bgq_cq->domain->ref_cnt, "domain");
	if (ret)
		return ret;

	ret = fi_bgq_ref_finalize(&bgq_cq->ref_cnt, "completion queue");
	if (ret)
		return ret;

	free(bgq_cq);

	return 0;
}
static int fi_bgq_bind_cq(struct fid *fid, struct fid *bfid,
		uint64_t flags)
{
	errno = FI_ENOSYS;
	return -errno;
}

static int fi_bgq_control_cq(fid_t fid, int command, void *arg)
{
	errno = FI_ENOSYS;
	return -errno;
}

static int fi_bgq_ops_open_cq(struct fid *fid, const char *name,
		uint64_t flags, void **ops, void *context)
{
	errno = FI_ENOSYS;
	return -errno;
}

static struct fi_ops fi_bgq_fi_ops = {
	.size		= sizeof(struct fi_ops),
	.close		= fi_bgq_close_cq,
	.bind		= fi_bgq_bind_cq,
	.control	= fi_bgq_control_cq,
	.ops_open	= fi_bgq_ops_open_cq
};

static ssize_t fi_bgq_cq_read(struct fid_cq *cq, void *buf, size_t count)
{
	int lock_required;
	int ret;
	struct fi_bgq_cq *bgq_cq = container_of(cq, struct fi_bgq_cq, cq_fid);

	/* FI_THREAD_ENDPOINT and FI_THREAD_DOMAIN guarantee that the
	 * application serializes access, so the CQ may be polled lock-free */
	switch (bgq_cq->domain->threading) {
	case FI_THREAD_ENDPOINT:
	case FI_THREAD_DOMAIN:
		lock_required = 0;
		break;
	default:
		lock_required = 1;
		break;
	}

	ret = fi_bgq_cq_read_generic(cq, buf, count, bgq_cq->format, lock_required);
	return ret;
}

static ssize_t
fi_bgq_cq_readfrom(struct fid_cq *cq, void *buf, size_t count, fi_addr_t *src_addr)
{
	int lock_required;
	int ret;
	struct fi_bgq_cq *bgq_cq = container_of(cq, struct fi_bgq_cq, cq_fid);

	switch (bgq_cq->domain->threading) {
	case FI_THREAD_ENDPOINT:
	case FI_THREAD_DOMAIN:
		lock_required = 0;
		break;
	default:
		lock_required = 1;
		break;
	}

	ret = fi_bgq_cq_readfrom_generic(cq, buf, count, src_addr, bgq_cq->format, lock_required);
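	/* this provider does not track the source address of a completed
	 * receive, so every reported entry is marked FI_ADDR_NOTAVAIL */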
	if (ret > 0) {
		unsigned n;
		for (n=0; n<ret; ++n) src_addr[n] = FI_ADDR_NOTAVAIL;
	}

	return ret;
}

static ssize_t
fi_bgq_cq_readerr(struct fid_cq *cq, struct fi_cq_err_entry *buf, uint64_t flags)
{
	struct fi_bgq_cq *bgq_cq = container_of(cq, struct fi_bgq_cq, cq_fid);

	if (FI_BGQ_FABRIC_DIRECT_PROGRESS == FI_PROGRESS_MANUAL) {

		struct fi_bgq_context_ext * ext = bgq_cq->err_head;
		if (NULL == ext) {
			errno = FI_EAGAIN;
			return -errno;
		}

		if (ext->bgq_context.byte_counter != 0) {
			/* perhaps an in-progress truncated rendezvous receive? */
			errno = FI_EAGAIN;
			return -errno;
		}

		assert(ext->bgq_context.flags & FI_BGQ_CQ_CONTEXT_EXT);	/* DEBUG */

		int lock_required = 0;
		switch (bgq_cq->domain->threading) {
		case FI_THREAD_ENDPOINT:
		case FI_THREAD_DOMAIN:
			lock_required = 0;
			break;
		default:
			lock_required = 1;
			break;
		}

		int ret;
		ret = fi_bgq_lock_if_required(&bgq_cq->lock, lock_required);
		if (ret) return ret;

		bgq_cq->err_head = (struct fi_bgq_context_ext *)ext->bgq_context.next;
		if (NULL == bgq_cq->err_head)
			bgq_cq->err_tail = NULL;

		*buf = ext->err_entry;
		free(ext);

		ret = fi_bgq_unlock_if_required(&bgq_cq->lock, lock_required);
		if (ret) return ret;

	} else {

		uint64_t value = 0;
		if (l2atomic_fifo_peek(&bgq_cq->err_consumer, &value) != 0) {
			errno = FI_EAGAIN;
			return -errno;
		}

		/* const uint64_t flags = value & 0xE000000000000000ull; -- currently not used */

		/* convert the fifo value into a context pointer; the producer
		 * stored the 8-byte-aligned pointer right-shifted by 3 bits
		 * (see fi_bgq_cq_enqueue_err), which frees the high-order bits
		 * of the fifo word to carry flags */
		struct fi_bgq_context_ext * ext = (struct fi_bgq_context_ext *) (value << 3);

		if (ext->bgq_context.byte_counter != 0) {
			/* perhaps an in-progress truncated rendezvous receive? */
			errno = FI_EAGAIN;
			return -errno;
		}

		assert(ext->bgq_context.flags & FI_BGQ_CQ_CONTEXT_EXT);	/* DEBUG */

		*buf = ext->err_entry;
		free(ext);

		l2atomic_fifo_advance(&bgq_cq->err_consumer);
	}

	return 1;
}
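
/*
 * For reference, a typical application-side sequence that exercises this
 * path (a sketch, not provider code): a CQ read that fails with -FI_EAVAIL
 * indicates an error completion is waiting and should be drained with
 * fi_cq_readerr():
 *
 *     struct fi_cq_tagged_entry entry;
 *     struct fi_cq_err_entry err = { 0 };
 *     ssize_t rc = fi_cq_read(cq, &entry, 1);
 *     if (rc == -FI_EAVAIL && fi_cq_readerr(cq, &err, 0) == 1)
 *         fprintf(stderr, "completion error: %d\n", err.err);
 */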

static ssize_t
fi_bgq_cq_sread(struct fid_cq *cq, void *buf, size_t len, const void *cond, int timeout)
{
	int lock_required;
	struct fi_bgq_cq *bgq_cq = container_of(cq, struct fi_bgq_cq, cq_fid);

	switch (bgq_cq->domain->threading) {
	case FI_THREAD_ENDPOINT:
	case FI_THREAD_DOMAIN:
		lock_required = 0;
		break;
	default:
		lock_required = 1;
		break;
	}

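	/* 'timeout' is in milliseconds; the BG/Q timebase increments at the
	 * 1.6 GHz core frequency, so 1600 * 1000 cycles elapse per millisecond */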
	uint64_t timeout_cycles = (timeout < 0) ?
		ULLONG_MAX :
		GetTimeBase() + (1600UL * 1000 * timeout);
	do {
		ssize_t count = fi_bgq_cq_read_generic(cq, buf, len, bgq_cq->format, lock_required);
		if (count) return count;

	} while (GetTimeBase() < timeout_cycles);
	errno = FI_EAGAIN;
	return -errno;
}

static ssize_t
fi_bgq_cq_sreadfrom(struct fid_cq *cq, void *buf, size_t len,
		   fi_addr_t *src_addr, const void *cond, int timeout)
{
	int lock_required;
	struct fi_bgq_cq *bgq_cq = container_of(cq, struct fi_bgq_cq, cq_fid);

	switch (bgq_cq->domain->threading) {
	case FI_THREAD_ENDPOINT:
	case FI_THREAD_DOMAIN:
		lock_required = 0;
		break;
	default:
		lock_required = 1;
		break;
	}

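	/* millisecond timeout converted to 1.6 GHz timebase cycles, exactly
	 * as in fi_bgq_cq_sread() above */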
	uint64_t timeout_cycles = (timeout < 0) ?
		ULLONG_MAX :
		GetTimeBase() + (1600UL * 1000 * timeout);
	do {
		ssize_t count = fi_bgq_cq_readfrom_generic(cq, buf, len, src_addr, bgq_cq->format, lock_required);
		if (count) return count;

	} while (GetTimeBase() < timeout_cycles);
	errno = FI_EAGAIN;
	return -errno;
}

static const char *
fi_bgq_cq_strerror(struct fid_cq *cq, int prov_errno, const void *err_data,
	       char *buf, size_t len)
{
	errno = FI_ENOSYS;
	return NULL;
}

int fi_bgq_bind_ep_cq(struct fi_bgq_ep *bgq_ep,
		struct fi_bgq_cq *bgq_cq, uint64_t flags)
{
	if (!(flags & (FI_SEND | FI_RECV)))
		goto err;

	if (flags & FI_SEND) {
		fi_bgq_ref_inc(&bgq_cq->ref_cnt, "completion queue");
		bgq_ep->send_cq = bgq_cq;
		bgq_ep->tx.send.local_completion_model = bgq_cq->local_completion_model;
	}
	if (flags & FI_RECV) {
		fi_bgq_ref_inc(&bgq_cq->ref_cnt, "completion queue");
		bgq_ep->recv_cq = bgq_cq;
	}
	bgq_cq->bflags = flags;

	if (FI_CLASS_RX_CTX == bgq_ep->ep_fid.fid.fclass ||
			FI_CLASS_EP == bgq_ep->ep_fid.fid.fclass) {
		bgq_cq->ep[(bgq_cq->ep_bind_count)++] = bgq_ep;
	}

	if (ofi_recv_allowed(bgq_ep->rx.caps) || ofi_rma_target_allowed(bgq_ep->rx.caps)) {
		bgq_cq->progress.ep[(bgq_cq->progress.ep_count)++] = bgq_ep;
	}

	return 0;
err:
	errno = FI_EINVAL;
	return -errno;
}
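
/*
 * For reference, this bind is reached through the standard libfabric call
 * (application-side sketch, not provider code), where 'cq' is a
 * struct fid_cq pointer:
 *
 *     fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV);
 *
 * which routes both transmit and receive completions from 'ep' to this CQ.
 */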

int fi_bgq_cq_enqueue_err (struct fi_bgq_cq * bgq_cq,
		struct fi_bgq_context_ext * ext,
		const int lock_required)
{
	if (FI_BGQ_FABRIC_DIRECT_PROGRESS == FI_PROGRESS_MANUAL) {

		int lock_required = 0;
		switch (bgq_cq->domain->threading) {
		case FI_THREAD_ENDPOINT:
		case FI_THREAD_DOMAIN:
			lock_required = 0;
			break;
		default:
			lock_required = 1;
			break;
		}

		int ret;
		ret = fi_bgq_lock_if_required(&bgq_cq->lock, lock_required);
		if (ret) return ret;

		/* append the extended context to the linked list of error entries */
		struct fi_bgq_context_ext * tail = bgq_cq->err_tail;
		if (tail) {
			assert(NULL != bgq_cq->err_head);

			tail->bgq_context.next = (union fi_bgq_context *)ext;
			bgq_cq->err_tail = ext;

		} else {
			assert(NULL == bgq_cq->err_head);

			bgq_cq->err_tail = ext;
			bgq_cq->err_head = ext;
		}
		ext->bgq_context.next = NULL;

		ret = fi_bgq_unlock_if_required(&bgq_cq->lock, lock_required);
		if (ret) return ret;

	} else {

		/* the context pointer is 8-byte aligned, so shift it right by
		 * 3 bits to fit the fifo word format (the consumer side in
		 * fi_bgq_cq_readerr shifts it back), then spin until a fifo
		 * slot is available */
		struct l2atomic_fifo_producer * err_producer = &bgq_cq->err_producer;
		uint64_t ext_rsh3b = (uint64_t)ext >> 3;
		while(0 != l2atomic_fifo_produce(err_producer, ext_rsh3b));
	}

	return 0;
}

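/*
 * Instantiate one specialized read/readfrom implementation per (format,
 * locking-mode) pair.  FORMAT and LOCK are compile-time constants inside
 * each expansion, so branches in the generic code path can be constant-
 * folded away; fi_bgq_cq_open() installs the matching ops table below.
 */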
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_UNSPEC, 0)
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_UNSPEC, 1)
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_CONTEXT, 0)
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_CONTEXT, 1)
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_MSG, 0)
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_MSG, 1)
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_DATA, 0)
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_DATA, 1)
/* "FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_TAGGED, 0)" is already declared via FABRIC_DIRECT */
FI_BGQ_CQ_SPECIALIZED_FUNC(FI_CQ_FORMAT_TAGGED, 1)

#define FI_BGQ_CQ_OPS_STRUCT_NAME(FORMAT, LOCK)					\
	fi_bgq_ops_cq_ ## FORMAT ## _ ## LOCK

#define FI_BGQ_CQ_OPS_STRUCT(FORMAT, LOCK)					\
static struct fi_ops_cq								\
	FI_BGQ_CQ_OPS_STRUCT_NAME(FORMAT, LOCK) = {				\
	.size      = sizeof(struct fi_ops_cq),					\
	.read      = FI_BGQ_CQ_SPECIALIZED_FUNC_NAME(cq_read, FORMAT, LOCK),	\
	.readfrom  = FI_BGQ_CQ_SPECIALIZED_FUNC_NAME(cq_readfrom, FORMAT, LOCK),\
	.readerr   = fi_bgq_cq_readerr,						\
	.sread     = fi_bgq_cq_sread,						\
	.sreadfrom = fi_bgq_cq_sreadfrom,					\
	.signal    = fi_no_cq_signal,						\
	.strerror  = fi_bgq_cq_strerror,					\
}

FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_UNSPEC, 0);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_UNSPEC, 1);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_CONTEXT, 0);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_CONTEXT, 1);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_MSG, 0);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_MSG, 1);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_DATA, 0);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_DATA, 1);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_TAGGED, 0);
FI_BGQ_CQ_OPS_STRUCT(FI_CQ_FORMAT_TAGGED, 1);

static struct fi_ops_cq fi_bgq_ops_cq_default = {
	.size		= sizeof(struct fi_ops_cq),
	.read		= fi_bgq_cq_read,
	.readfrom	= fi_bgq_cq_readfrom,
	.readerr	= fi_bgq_cq_readerr,
	.signal		= fi_no_cq_signal,
	.sread		= fi_bgq_cq_sread,
	.sreadfrom	= fi_bgq_cq_sreadfrom,
	.strerror	= fi_bgq_cq_strerror
};

int fi_bgq_cq_open(struct fid_domain *dom,
		struct fi_cq_attr *attr,
		struct fid_cq **cq, void *context)
{
	int ret;
	struct fi_bgq_cq *bgq_cq;
	int lock_required;

	if (!attr) {
		FI_LOG(fi_bgq_global.prov, FI_LOG_DEBUG, FI_LOG_CQ,
				"no attr supplied\n");
		errno = FI_EINVAL;
		return -errno;
	}
	ret = fi_bgq_fid_check(&dom->fid, FI_CLASS_DOMAIN, "domain");
	if (ret)
		return ret;

	bgq_cq = calloc(1, sizeof(*bgq_cq));
	if (!bgq_cq) {
		errno = FI_ENOMEM;
		goto err;
	}

	bgq_cq->cq_fid.fid.fclass = FI_CLASS_CQ;
	bgq_cq->cq_fid.fid.context = context;
	bgq_cq->cq_fid.fid.ops    = &fi_bgq_fi_ops;

	bgq_cq->size = attr->size ? attr->size : FI_BGQ_DEFAULT_CQ_DEPTH;

	bgq_cq->domain = (struct fi_bgq_domain *) dom;

	bgq_cq->format = attr->format ? attr->format : FI_CQ_FORMAT_CONTEXT;

	bgq_cq->pending_head = NULL;
	bgq_cq->pending_tail = NULL;
	bgq_cq->completed_head = NULL;
	bgq_cq->completed_tail = NULL;
	bgq_cq->err_head = NULL;
	bgq_cq->err_tail = NULL;

	switch (bgq_cq->domain->threading) {
	case FI_THREAD_ENDPOINT:
	case FI_THREAD_DOMAIN:
	case FI_THREAD_COMPLETION:
		lock_required = 0;
		break;
	case FI_THREAD_FID:
	case FI_THREAD_UNSPEC:
	case FI_THREAD_SAFE:
		lock_required = 1;
		break;
	default:
		errno = FI_EINVAL;
		goto err;
	}

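	/* install the ops table specialized for this (format, lock) pair;
	 * any other combination falls back to the runtime-dispatch default */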
	if (lock_required == 0 &&
			bgq_cq->format == FI_CQ_FORMAT_UNSPEC) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_UNSPEC, 0);
	} else if (lock_required == 0 &&
			bgq_cq->format == FI_CQ_FORMAT_CONTEXT) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_CONTEXT, 0);
	} else if (lock_required == 0 &&
			bgq_cq->format == FI_CQ_FORMAT_MSG) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_MSG, 0);
	} else if (lock_required == 0 &&
			bgq_cq->format == FI_CQ_FORMAT_DATA) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_DATA, 0);
	} else if (lock_required == 0 &&
			bgq_cq->format == FI_CQ_FORMAT_TAGGED) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_TAGGED, 0);
	} else if (lock_required == 1 &&
			bgq_cq->format == FI_CQ_FORMAT_UNSPEC) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_UNSPEC, 1);
	} else if (lock_required == 1 &&
			bgq_cq->format == FI_CQ_FORMAT_CONTEXT) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_CONTEXT, 1);
	} else if (lock_required == 1 &&
			bgq_cq->format == FI_CQ_FORMAT_MSG) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_MSG, 1);
	} else if (lock_required == 1 &&
			bgq_cq->format == FI_CQ_FORMAT_DATA) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_DATA, 1);
	} else if (lock_required == 1 &&
			bgq_cq->format == FI_CQ_FORMAT_TAGGED) {
		bgq_cq->cq_fid.ops =
			&FI_BGQ_CQ_OPS_STRUCT_NAME(FI_CQ_FORMAT_TAGGED, 1);
	} else {
		bgq_cq->cq_fid.ops =
			&fi_bgq_ops_cq_default;
	}

	/* initialize the 'local completion' direct-put descriptor model */
	{
		MUHWI_Descriptor_t * desc = &bgq_cq->local_completion_model;
		MUSPI_DescriptorZeroOut(desc);

		desc->Half_Word0.Prefetch_Only = MUHWI_DESCRIPTOR_PRE_FETCH_ONLY_NO;
		desc->Half_Word1.Interrupt = MUHWI_DESCRIPTOR_DO_NOT_INTERRUPT_ON_PACKET_ARRIVAL;
		desc->Pa_Payload = 0;				/* specified at injection time */
		desc->Message_Length = sizeof(uint64_t);
		desc->Torus_FIFO_Map =
			MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL0 |
			MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL1;
		desc->PacketHeader.NetworkHeader.pt2pt.Data_Packet_Type = MUHWI_PT2PT_DATA_PACKET_TYPE;
		desc->PacketHeader.NetworkHeader.pt2pt.Hints =
			MUHWI_PACKET_HINT_A_NONE |
			MUHWI_PACKET_HINT_B_NONE |
			MUHWI_PACKET_HINT_C_NONE |
			MUHWI_PACKET_HINT_D_NONE;
		desc->PacketHeader.NetworkHeader.pt2pt.Byte2.Byte2 = 0;

		desc->PacketHeader.NetworkHeader.pt2pt.Byte3.Byte3 =
			MUHWI_PACKET_VIRTUAL_CHANNEL_DETERMINISTIC;

		desc->PacketHeader.NetworkHeader.pt2pt.Destination.Destination.Destination = 0;	/* not used for local transfers */

		desc->PacketHeader.NetworkHeader.pt2pt.Byte8.Byte8 = MUHWI_PACKET_TYPE_PUT;
		desc->PacketHeader.NetworkHeader.pt2pt.Byte8.Size = 16;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Payload_Base_Address_Id = FI_BGQ_MU_BAT_ID_COUNTER;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Pacing = MUHWI_PACKET_DIRECT_PUT_IS_NOT_PACED;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Put_Offset_MSB = 0;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Put_Offset_LSB = 0;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Unused1 = 0;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Counter_Base_Address_Id = FI_BGQ_MU_BAT_ID_COUNTER;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Valid_Bytes_In_Payload = 0;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Unused2 = 0;
		desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Counter_Offset = 0;
	}

	/* allocate the 'std' and 'err' l2atomic fifos */
	{
		struct fi_cq_bgq_l2atomic_data * memptr = NULL;
		size_t bytes = sizeof(struct fi_cq_bgq_l2atomic_data) +
			sizeof(uint64_t) * bgq_cq->size;
		if (posix_memalign((void **)&memptr, 32, bytes)) {
			errno = FI_ENOMEM;
			goto err;
		}
		memset((void*)memptr, 0, bytes);
		bgq_cq->fifo_memptr = (void*)memptr;

		l2atomic_fifo_initialize(&bgq_cq->err_consumer,
			&bgq_cq->err_producer,
			&memptr->err_fifo_data, FI_BGQ_L2ATOMIC_ERR_FIFO_DATA_SIZE);
		l2atomic_fifo_initialize(&bgq_cq->std_consumer,
			&bgq_cq->std_producer,
			&memptr->std_fifo_data, bgq_cq->size);
	}

	bgq_cq->ep_bind_count = 0;
	bgq_cq->progress.ep_count = 0;
	unsigned i;
	for (i=0; i<64; ++i) {		/* TODO - check this array size */
		bgq_cq->ep[i] = NULL;
		bgq_cq->progress.ep[i] = NULL;
	}

	fi_bgq_ref_init(&bgq_cq->domain->fabric->node, &bgq_cq->ref_cnt, "completion queue");
	fi_bgq_ref_inc(&bgq_cq->domain->ref_cnt, "domain");

	*cq = &bgq_cq->cq_fid;

	return 0;
err:
	if (bgq_cq)
		free(bgq_cq);
	return -errno;
}