/*
 * Copyright (c) 2013-2015 Intel Corporation, Inc.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "config.h"

#include <stdint.h>
#include <ofi_mem.h>

#include "fi_verbs.h"
static void vrb_cq_read_context_entry(struct ibv_wc *wc, void *buf)
{
	struct fi_cq_entry *entry = buf;

	entry->op_context = (void *) (uintptr_t) wc->wr_id;
}

static void vrb_cq_read_msg_entry(struct ibv_wc *wc, void *buf)
{
	struct fi_cq_msg_entry *entry = buf;

	entry->op_context = (void *) (uintptr_t) wc->wr_id;

	switch (wc->opcode) {
	case IBV_WC_SEND:
		entry->flags = (FI_SEND | FI_MSG);
		break;
	case IBV_WC_RDMA_WRITE:
		entry->flags = (FI_RMA | FI_WRITE);
		break;
	case IBV_WC_RDMA_READ:
		entry->flags = (FI_RMA | FI_READ);
		break;
	case IBV_WC_COMP_SWAP:
		entry->flags = FI_ATOMIC;
		break;
	case IBV_WC_FETCH_ADD:
		entry->flags = FI_ATOMIC;
		break;
	case IBV_WC_RECV:
		entry->len = wc->byte_len;
		entry->flags = (FI_RECV | FI_MSG);
		break;
	case IBV_WC_RECV_RDMA_WITH_IMM:
		entry->len = wc->byte_len;
		entry->flags = (FI_RMA | FI_REMOTE_WRITE);
		break;
	default:
		break;
	}
}

static void vrb_cq_read_data_entry(struct ibv_wc *wc, void *buf)
{
	struct fi_cq_data_entry *entry = buf;

	/* fi_cq_data_entry can be cast to fi_cq_msg_entry */
	vrb_cq_read_msg_entry(wc, buf);
	if ((wc->wc_flags & IBV_WC_WITH_IMM) &&
	    (wc->opcode & IBV_WC_RECV)) {
		entry->data = ntohl(wc->imm_data);
		entry->flags |= FI_REMOTE_CQ_DATA;
	}
}
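
/*
 * Layout note (illustrative sketch): the cast in vrb_cq_read_data_entry()
 * relies on fi_cq_data_entry sharing a common initial sequence with
 * fi_cq_msg_entry (op_context, flags, len). A compile-time statement of
 * that assumption could look like:
 *
 *   #include <stddef.h>
 *   #include <assert.h>
 *   static_assert(offsetof(struct fi_cq_data_entry, len) ==
 *                 offsetof(struct fi_cq_msg_entry, len),
 *                 "fi_cq_data_entry must start with fi_cq_msg_entry fields");
 */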

static ssize_t
vrb_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry,
		  uint64_t flags)
{
	struct vrb_cq *cq;
	struct vrb_wc_entry *wce;
	struct slist_entry *slist_entry;
	uint32_t api_version;

	cq = container_of(cq_fid, struct vrb_cq, util_cq.cq_fid);

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	if (slist_empty(&cq->saved_wc_list))
		goto err;

	wce = container_of(cq->saved_wc_list.head, struct vrb_wc_entry, entry);
	if (!wce->wc.status)
		goto err;

	api_version = cq->util_cq.domain->fabric->fabric_fid.api_version;

	slist_entry = slist_remove_head(&cq->saved_wc_list);
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);

	wce = container_of(slist_entry, struct vrb_wc_entry, entry);

	entry->op_context = (void *)(uintptr_t)wce->wc.wr_id;
	entry->prov_errno = wce->wc.status;
	if (wce->wc.status == IBV_WC_WR_FLUSH_ERR)
		entry->err = FI_ECANCELED;
	else
		entry->err = EIO;

	/* fi_cq_err_entry can be cast to fi_cq_data_entry */
	vrb_cq_read_data_entry(&wce->wc, (void *) entry);

	if ((FI_VERSION_GE(api_version, FI_VERSION(1, 5))) &&
		entry->err_data && entry->err_data_size) {
		entry->err_data_size = MIN(entry->err_data_size,
			sizeof(wce->wc.vendor_err));
		memcpy(entry->err_data, &wce->wc.vendor_err, entry->err_data_size);
	} else {
		memcpy(&entry->err_data, &wce->wc.vendor_err,
			sizeof(wce->wc.vendor_err));
	}

	ofi_buf_free(wce);
	return 1;
err:
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
	return -FI_EAGAIN;
}

static inline int
vrb_poll_events(struct vrb_cq *_cq, int timeout)
{
	int ret, rc;
	void *context;
	struct pollfd fds[2];
	char data;

	fds[0].fd = _cq->channel->fd;
	fds[1].fd = _cq->signal_fd[0];

	fds[0].events = fds[1].events = POLLIN;

	rc = poll(fds, 2, timeout);
	if (rc == 0)
		return -FI_EAGAIN;
	else if (rc < 0)
		return -errno;

	if (fds[0].revents & POLLIN) {
		ret = ibv_get_cq_event(_cq->channel, &_cq->cq, &context);
		if (ret)
			return ret;

		ofi_atomic_inc32(&_cq->nevents);
		rc--;
	}
	if (fds[1].revents & POLLIN) {
		do {
			ret = read(fds[1].fd, &data, 1);
		} while (ret > 0);
		ret = -FI_EAGAIN;
		rc--;
	}
	if (rc) {
		VERBS_WARN(FI_LOG_CQ, "Unknown poll error: check revents\n");
		return -FI_EOTHER;
	}

	return ret;
}

static ssize_t
vrb_cq_sread(struct fid_cq *cq, void *buf, size_t count, const void *cond,
		int timeout)
{
	ssize_t ret = 0, cur;
	ssize_t threshold;
	struct vrb_cq *_cq;
	uint8_t *p;

	p = buf;
	_cq = container_of(cq, struct vrb_cq, util_cq.cq_fid);

	if (!_cq->channel)
		return -FI_ENOSYS;

	threshold = (_cq->wait_cond == FI_CQ_COND_THRESHOLD) ?
		MIN((ssize_t) cond, count) : 1;

	for (cur = 0; cur < threshold; ) {
		if (vrb_cq_trywait(_cq) == FI_SUCCESS) {
			ret = vrb_poll_events(_cq, timeout);
			if (ret)
				break;
		}

		ret = _cq->util_cq.cq_fid.ops->read(&_cq->util_cq.cq_fid, p, count - cur);
		if (ret > 0) {
			p += ret * _cq->entry_size;
			cur += ret;
			if (cur >= threshold)
				break;
		} else if (ret != -FI_EAGAIN) {
			break;
		}
	}

	return cur ? cur : ret;
}
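
/*
 * Usage sketch (hypothetical application code): block for up to 100 ms
 * waiting for one completion; on -FI_EAVAIL, fetch the failure details
 * with fi_cq_readerr().
 *
 *   struct fi_cq_data_entry comp;
 *   struct fi_cq_err_entry err = { 0 };
 *   ssize_t n = fi_cq_sread(cq_fid, &comp, 1, NULL, 100);
 *   if (n == -FI_EAVAIL)
 *           n = fi_cq_readerr(cq_fid, &err, 0);
 */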

/* Must be called with CQ lock held. */
int vrb_poll_cq(struct vrb_cq *cq, struct ibv_wc *wc)
{
	struct vrb_context *ctx;
	int ret;

	do {
		ret = ibv_poll_cq(cq->cq, 1, wc);
		if (ret <= 0)
			break;

		ctx = (struct vrb_context *) (uintptr_t) wc->wr_id;
		wc->wr_id = (uintptr_t) ctx->user_ctx;
		if (ctx->flags & FI_TRANSMIT) {
			cq->credits++;
			ctx->ep->tx_credits++;
		}

		if (wc->status) {
			if (ctx->flags & FI_RECV)
				wc->opcode |= IBV_WC_RECV;
			else
				wc->opcode &= ~IBV_WC_RECV;
		}
		if (ctx->srx) {
			fastlock_acquire(&ctx->srx->ctx_lock);
			ofi_buf_free(ctx);
			fastlock_release(&ctx->srx->ctx_lock);
		} else {
			ofi_buf_free(ctx);
		}

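	/* Operations posted without FI_COMPLETION carry VERBS_NO_COMP_FLAG as
	 * their user context; keep polling past such entries so callers only
	 * see completions that were requested. */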
	} while (wc->wr_id == VERBS_NO_COMP_FLAG);

	return ret;
}

/* Must be called with CQ lock held. */
int vrb_save_wc(struct vrb_cq *cq, struct ibv_wc *wc)
{
	struct vrb_wc_entry *wce;

	wce = ofi_buf_alloc(cq->wce_pool);
	if (!wce) {
		FI_WARN(&vrb_prov, FI_LOG_CQ,
			"Unable to save completion, completion lost!\n");
		return -FI_ENOMEM;
	}

	wce->wc = *wc;
	slist_insert_tail(&wce->entry, &cq->saved_wc_list);
	return FI_SUCCESS;
}

static void vrb_flush_cq(struct vrb_cq *cq)
{
	struct ibv_wc wc;
	ssize_t ret;

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	while (1) {
		ret = vrb_poll_cq(cq, &wc);
		if (ret <= 0)
			break;

		vrb_save_wc(cq, &wc);
	}

	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
}

void vrb_cleanup_cq(struct vrb_ep *ep)
{
	if (ep->util_ep.rx_cq) {
		vrb_flush_cq(container_of(ep->util_ep.rx_cq,
					  struct vrb_cq, util_cq));
	}
	if (ep->util_ep.tx_cq) {
		vrb_flush_cq(container_of(ep->util_ep.tx_cq,
					  struct vrb_cq, util_cq));
	}
}

static ssize_t vrb_cq_read(struct fid_cq *cq_fid, void *buf, size_t count)
{
	struct vrb_cq *cq;
	struct vrb_wc_entry *wce;
	struct slist_entry *entry;
	struct ibv_wc wc;
	ssize_t ret = 0, i;

	cq = container_of(cq_fid, struct vrb_cq, util_cq.cq_fid);

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);

	for (i = 0; i < count; i++) {
		if (!slist_empty(&cq->saved_wc_list)) {
			wce = container_of(cq->saved_wc_list.head,
					   struct vrb_wc_entry, entry);
			if (wce->wc.status) {
				ret = -FI_EAVAIL;
				break;
			}
			entry = slist_remove_head(&cq->saved_wc_list);
			wce = container_of(entry, struct vrb_wc_entry, entry);
			cq->read_entry(&wce->wc, (char *) buf + i * cq->entry_size);
			ofi_buf_free(wce);
			continue;
		}

		ret = vrb_poll_cq(cq, &wc);
		if (ret <= 0)
			break;

		if (wc.status) {
			wce = ofi_buf_alloc(cq->wce_pool);
			if (!wce) {
				cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
				return -FI_ENOMEM;
			}
			memset(wce, 0, sizeof(*wce));
			memcpy(&wce->wc, &wc, sizeof wc);
			slist_insert_tail(&wce->entry, &cq->saved_wc_list);
			ret = -FI_EAVAIL;
			break;
		}

		cq->read_entry(&wc, (char *) buf + i * cq->entry_size);
	}

	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
	return i ? i : (ret < 0 ? ret : -FI_EAGAIN);
}

static const char *
vrb_cq_strerror(struct fid_cq *eq, int prov_errno, const void *err_data,
		   char *buf, size_t len)
{
	if (buf && len) {
		/* strncpy does not NUL-terminate on truncation */
		strncpy(buf, ibv_wc_status_str(prov_errno), len);
		buf[len - 1] = '\0';
	}
	return ibv_wc_status_str(prov_errno);
}

int vrb_cq_signal(struct fid_cq *cq)
{
	struct vrb_cq *_cq;
	char data = '0';

	_cq = container_of(cq, struct vrb_cq, util_cq.cq_fid);

	if (write(_cq->signal_fd[1], &data, 1) != 1) {
		VERBS_WARN(FI_LOG_CQ, "Error signalling CQ\n");
		return -errno;
	}

	return 0;
}
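
/*
 * Usage sketch (hypothetical application code): the byte written to
 * signal_fd[1] above makes signal_fd[0] readable, which wakes a thread
 * blocked in the poll() inside vrb_poll_events(); the woken reader drains
 * signal_fd[0] and fi_cq_sread() returns -FI_EAGAIN.
 *
 *   fi_cq_signal(cq_fid);    // unblock a concurrent fi_cq_sread()
 */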

int vrb_cq_trywait(struct vrb_cq *cq)
{
	struct ibv_wc wc;
	void *context;
	int ret = -FI_EAGAIN, rc;

	if (!cq->channel) {
		VERBS_WARN(FI_LOG_CQ, "No wait object associated with CQ\n");
		return -FI_EINVAL;
	}

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	if (!slist_empty(&cq->saved_wc_list))
		goto out;

	rc = vrb_poll_cq(cq, &wc);
	if (rc) {
		if (rc > 0)
			vrb_save_wc(cq, &wc);
		goto out;
	}

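	/* Drain any channel events that are already signaled; they are
	 * acknowledged in bulk via ibv_ack_cq_events() when the CQ is
	 * closed (see vrb_cq_close). */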
	while (!ibv_get_cq_event(cq->channel, &cq->cq, &context))
		ofi_atomic_inc32(&cq->nevents);

	rc = ibv_req_notify_cq(cq->cq, 0);
	if (rc) {
		VERBS_WARN(FI_LOG_CQ, "ibv_req_notify_cq error: %d\n", rc);
		ret = -errno;
		goto out;
	}

	/* Read again to fetch any completions that we might have missed
	 * while rearming */
	rc = vrb_poll_cq(cq, &wc);
	if (rc) {
		if (rc > 0)
			vrb_save_wc(cq, &wc);
		goto out;
	}

	ret = FI_SUCCESS;
out:
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
	return ret;
}

static struct fi_ops_cq vrb_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = vrb_cq_read,
	.readfrom = fi_no_cq_readfrom,
	.readerr = vrb_cq_readerr,
	.sread = vrb_cq_sread,
	.sreadfrom = fi_no_cq_sreadfrom,
	.signal = vrb_cq_signal,
	.strerror = vrb_cq_strerror
};

static int vrb_cq_control(fid_t fid, int command, void *arg)
{
	struct fi_wait_pollfd *pollfd;
	struct vrb_cq *cq;
	int ret;

	cq = container_of(fid, struct vrb_cq, util_cq.cq_fid);
	switch (command) {
	case FI_GETWAIT:
		if (!cq->channel) {
			ret = -FI_ENODATA;
			break;
		}

		if (cq->wait_obj == FI_WAIT_FD) {
			*(int *) arg = cq->channel->fd;
			return 0;
		}

		pollfd = arg;
		if (pollfd->nfds >= 1) {
			pollfd->fd[0].fd = cq->channel->fd;
			pollfd->fd[0].events = POLLIN;
			ret = 0;
		} else {
			ret = -FI_ETOOSMALL;
		}
		pollfd->nfds = 1;
		break;
	case FI_GETWAITOBJ:
		*(enum fi_wait_obj *) arg = cq->wait_obj;
		ret = 0;
		break;
	default:
		ret = -FI_ENOSYS;
		break;
	}

	return ret;
}
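
/*
 * Usage sketch (hypothetical application code): with FI_WAIT_FD, an
 * application retrieves the completion channel's file descriptor through
 * FI_GETWAIT and waits for it to become readable before reading the CQ.
 *
 *   int fd;
 *   if (!fi_control(&cq_fid->fid, FI_GETWAIT, &fd)) {
 *           struct pollfd pfd = { .fd = fd, .events = POLLIN };
 *           poll(&pfd, 1, -1);   // wait for a completion event
 *   }
 */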

static int vrb_cq_close(fid_t fid)
{
	struct vrb_wc_entry *wce;
	struct slist_entry *entry;
	int ret;
	struct vrb_cq *cq =
		container_of(fid, struct vrb_cq, util_cq.cq_fid);
	struct vrb_srq_ep *srq_ep;
	struct dlist_entry *srq_ep_temp;

	if (ofi_atomic_get32(&cq->nevents))
		ibv_ack_cq_events(cq->cq, ofi_atomic_get32(&cq->nevents));

	/* Since an RX CQ and SRX context can be destroyed in any order,
	 * and the XRC SRQ references the RX CQ, we must destroy any
	 * XRC SRQ using this CQ before destroying the CQ. */
	fastlock_acquire(&cq->xrc.srq_list_lock);
	dlist_foreach_container_safe(&cq->xrc.srq_list, struct vrb_srq_ep,
				     srq_ep, xrc.srq_entry, srq_ep_temp) {
		ret = vrb_xrc_close_srq(srq_ep);
		if (ret) {
			fastlock_release(&cq->xrc.srq_list_lock);
			return -ret;
		}
	}
	fastlock_release(&cq->xrc.srq_list_lock);

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	while (!slist_empty(&cq->saved_wc_list)) {
		entry = slist_remove_head(&cq->saved_wc_list);
		wce = container_of(entry, struct vrb_wc_entry, entry);
		ofi_buf_free(wce);
	}
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);

	ofi_bufpool_destroy(cq->wce_pool);
	ofi_bufpool_destroy(cq->ctx_pool);

	if (cq->cq) {
		ret = ibv_destroy_cq(cq->cq);
		if (ret)
			return -ret;
	}

	if (cq->signal_fd[0])
		ofi_close_socket(cq->signal_fd[0]);
	if (cq->signal_fd[1])
		ofi_close_socket(cq->signal_fd[1]);

	ofi_cq_cleanup(&cq->util_cq);

	if (cq->channel)
		ibv_destroy_comp_channel(cq->channel);

	fastlock_destroy(&cq->xrc.srq_list_lock);
	free(cq);
	return 0;
}

static struct fi_ops vrb_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = vrb_cq_close,
	.bind = fi_no_bind,
	.control = vrb_cq_control,
	.ops_open = fi_no_ops_open,
};

static void vrb_util_cq_progress_noop(struct util_cq *cq)
{
	/* This routine shouldn't be called */
	assert(0);
}

int vrb_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
		   struct fid_cq **cq_fid, void *context)
{
	struct vrb_cq *cq;
	struct vrb_domain *domain =
		container_of(domain_fid, struct vrb_domain,
			     util_domain.domain_fid);
	size_t size;
	int ret;
	struct fi_cq_attr tmp_attr = *attr;

	cq = calloc(1, sizeof(*cq));
	if (!cq)
		return -FI_ENOMEM;

	/* verbs uses its own implementation of wait objects for CQ */
	tmp_attr.wait_obj = FI_WAIT_NONE;
	ret = ofi_cq_init(&vrb_prov, domain_fid, &tmp_attr, &cq->util_cq,
			  vrb_util_cq_progress_noop, context);
	if (ret)
		goto err1;

	switch (attr->wait_obj) {
	case FI_WAIT_UNSPEC:
		cq->wait_obj = FI_WAIT_FD;
		break;
	case FI_WAIT_FD:
	case FI_WAIT_POLLFD:
	case FI_WAIT_NONE:
		cq->wait_obj = attr->wait_obj;
		break;
	default:
		ret = -FI_ENOSYS;
		/* Neither the channel nor the signal fds exist yet */
		goto err2;
	}

	if (cq->wait_obj != FI_WAIT_NONE) {
		cq->channel = ibv_create_comp_channel(domain->verbs);
		if (!cq->channel) {
			ret = -errno;
			VERBS_WARN(FI_LOG_CQ,
				   "Unable to create completion channel\n");
			goto err2;
		}

		ret = fi_fd_nonblock(cq->channel->fd);
		if (ret)
			goto err3;

		if (socketpair(AF_UNIX, SOCK_STREAM, 0, cq->signal_fd)) {
			ret = -errno;
			goto err3;
		}

		ret = fi_fd_nonblock(cq->signal_fd[0]);
		if (ret)
			goto err4;
	}

	size = attr->size ? attr->size : VERBS_DEF_CQ_SIZE;

	/*
	 * Verbs may return an error if the CQ size exceeds
	 * ibv_device_attr->max_cqe. OFI does not expose the CQ size limit to
	 * applications because it is better to fix the issue in the provider
	 * than to make every application deal with it. The fix is to open
	 * multiple verbs CQs and load balance the MSG EP to CQ binding among
	 * them to avoid any CQ overflow.
	 * Something like:
	 * num_qp_per_cq = ibv_device_attr->max_cqe / (qp_send_wr + qp_recv_wr)
	 */
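	/*
	 * Worked example (illustrative numbers, not device-specific): with
	 * ibv_device_attr->max_cqe = 4194304 and endpoints sized at
	 * qp_send_wr + qp_recv_wr = 1024, a single verbs CQ could back
	 * num_qp_per_cq = 4194304 / 1024 = 4096 QPs before overflow becomes
	 * possible.
	 */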
	cq->cq = ibv_create_cq(domain->verbs, size, cq, cq->channel,
			       attr->signaling_vector);
	if (!cq->cq) {
		ret = -errno;
		VERBS_WARN(FI_LOG_CQ, "Unable to create verbs CQ\n");
		goto err4;
	}

	if (cq->channel) {
		ret = ibv_req_notify_cq(cq->cq, 0);
		if (ret) {
			VERBS_WARN(FI_LOG_CQ,
				   "ibv_req_notify_cq failed\n");
			goto err5;
		}
	}

	ret = ofi_bufpool_create(&cq->wce_pool, sizeof(struct vrb_wc_entry),
				16, 0, VERBS_WCE_CNT, 0);
	if (ret) {
		VERBS_WARN(FI_LOG_CQ, "Failed to create wce_pool\n");
		goto err5;
	}

	cq->flags |= attr->flags;
	cq->wait_cond = attr->wait_cond;
	/* verbs uses its own ops for CQ */
	cq->util_cq.cq_fid.fid.ops = &vrb_cq_fi_ops;
	cq->util_cq.cq_fid.ops = &vrb_cq_ops;

	switch (attr->format) {
	case FI_CQ_FORMAT_UNSPEC:
	case FI_CQ_FORMAT_CONTEXT:
		cq->read_entry = vrb_cq_read_context_entry;
		cq->entry_size = sizeof(struct fi_cq_entry);
		break;
	case FI_CQ_FORMAT_MSG:
		cq->read_entry = vrb_cq_read_msg_entry;
		cq->entry_size = sizeof(struct fi_cq_msg_entry);
		break;
	case FI_CQ_FORMAT_DATA:
		cq->read_entry = vrb_cq_read_data_entry;
		cq->entry_size = sizeof(struct fi_cq_data_entry);
		break;
	case FI_CQ_FORMAT_TAGGED:
	default:
		ret = -FI_ENOSYS;
		goto err6;
	}

	ret = ofi_bufpool_create(&cq->ctx_pool, sizeof(struct fi_context),
				 16, 0, size, OFI_BUFPOOL_NO_TRACK);
	if (ret)
		goto err6;

	slist_init(&cq->saved_wc_list);
	dlist_init(&cq->xrc.srq_list);
	fastlock_init(&cq->xrc.srq_list_lock);

	ofi_atomic_initialize32(&cq->nevents, 0);

	cq->credits = size;

	*cq_fid = &cq->util_cq.cq_fid;
	return 0;
err6:
	ofi_bufpool_destroy(cq->wce_pool);
err5:
	ibv_destroy_cq(cq->cq);
err4:
	/* With FI_WAIT_NONE the signal fds were never opened; do not
	 * close fd 0 by mistake. */
	if (cq->signal_fd[0])
		ofi_close_socket(cq->signal_fd[0]);
	if (cq->signal_fd[1])
		ofi_close_socket(cq->signal_fd[1]);
err3:
	if (cq->channel)
		ibv_destroy_comp_channel(cq->channel);
err2:
	ofi_cq_cleanup(&cq->util_cq);
err1:
	free(cq);
	return ret;
}