/*
 * Copyright (c) 2013-2015 Intel Corporation, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials
 *   provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "config.h"

#include <stdint.h>
#include <ofi_mem.h>

#include "fi_verbs.h"

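/*
 * Per-format completion translators.  Each converts a raw ibv_wc into the
 * libfabric CQ entry format selected at CQ open time (context, msg, or data).
 */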
static void vrb_cq_read_context_entry(struct ibv_wc *wc, void *buf)
{
	struct fi_cq_entry *entry = buf;

	entry->op_context = (void *) (uintptr_t) wc->wr_id;
}

static void vrb_cq_read_msg_entry(struct ibv_wc *wc, void *buf)
{
	struct fi_cq_msg_entry *entry = buf;

	entry->op_context = (void *) (uintptr_t) wc->wr_id;

	switch (wc->opcode) {
	case IBV_WC_SEND:
		entry->flags = (FI_SEND | FI_MSG);
		break;
	case IBV_WC_RDMA_WRITE:
		entry->flags = (FI_RMA | FI_WRITE);
		break;
	case IBV_WC_RDMA_READ:
		entry->flags = (FI_RMA | FI_READ);
		break;
	case IBV_WC_COMP_SWAP:
		entry->flags = FI_ATOMIC;
		break;
	case IBV_WC_FETCH_ADD:
		entry->flags = FI_ATOMIC;
		break;
	case IBV_WC_RECV:
		entry->len = wc->byte_len;
		entry->flags = (FI_RECV | FI_MSG);
		break;
	case IBV_WC_RECV_RDMA_WITH_IMM:
		entry->len = wc->byte_len;
		entry->flags = (FI_RMA | FI_REMOTE_WRITE);
		break;
	default:
		break;
	}
}

static void vrb_cq_read_data_entry(struct ibv_wc *wc, void *buf)
{
	struct fi_cq_data_entry *entry = buf;

	/* fi_cq_data_entry is layout-compatible with fi_cq_msg_entry,
	 * so fill the common fields with the msg handler first. */
	vrb_cq_read_msg_entry(wc, buf);
	if ((wc->wc_flags & IBV_WC_WITH_IMM) &&
	    (wc->opcode & IBV_WC_RECV)) {
		entry->data = ntohl(wc->imm_data);
		entry->flags |= FI_REMOTE_CQ_DATA;
	}
}

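/*
 * Report the error completion at the head of the saved completion list, if
 * any, as an fi_cq_err_entry.  Successful completions at the head are left
 * for the normal read path.
 */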
static ssize_t
vrb_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry,
	       uint64_t flags)
{
	struct vrb_cq *cq;
	struct vrb_wc_entry *wce;
	struct slist_entry *slist_entry;
	uint32_t api_version;

	cq = container_of(cq_fid, struct vrb_cq, util_cq.cq_fid);

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	if (slist_empty(&cq->saved_wc_list))
		goto err;

	wce = container_of(cq->saved_wc_list.head, struct vrb_wc_entry, entry);
	if (!wce->wc.status)
		goto err;

	api_version = cq->util_cq.domain->fabric->fabric_fid.api_version;

	slist_entry = slist_remove_head(&cq->saved_wc_list);
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);

	wce = container_of(slist_entry, struct vrb_wc_entry, entry);

	entry->op_context = (void *)(uintptr_t)wce->wc.wr_id;
	entry->prov_errno = wce->wc.status;
	if (wce->wc.status == IBV_WC_WR_FLUSH_ERR)
		entry->err = FI_ECANCELED;
	else
		entry->err = FI_EIO;

	/* fi_cq_err_entry is layout-compatible with fi_cq_data_entry */
	vrb_cq_read_data_entry(&wce->wc, (void *) entry);

	if ((FI_VERSION_GE(api_version, FI_VERSION(1, 5))) &&
	    entry->err_data && entry->err_data_size) {
		entry->err_data_size = MIN(entry->err_data_size,
					   sizeof(wce->wc.vendor_err));
		memcpy(entry->err_data, &wce->wc.vendor_err, entry->err_data_size);
	} else {
		memcpy(&entry->err_data, &wce->wc.vendor_err,
		       sizeof(wce->wc.vendor_err));
	}

	ofi_buf_free(wce);
	return 1;
err:
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
	return -FI_EAGAIN;
}

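/*
 * Block for up to 'timeout' ms on the completion channel fd and the internal
 * signal fd.  Completion-channel events are counted in cq->nevents and
 * acknowledged later via ibv_ack_cq_events(); a write to the signal fd just
 * wakes the caller.
 */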
static inline int
vrb_poll_events(struct vrb_cq *_cq, int timeout)
{
	int ret, rc;
	void *context;
	struct pollfd fds[2];
	char data;

	fds[0].fd = _cq->channel->fd;
	fds[1].fd = _cq->signal_fd[0];

	fds[0].events = fds[1].events = POLLIN;

	rc = poll(fds, 2, timeout);
	if (rc == 0)
		return -FI_EAGAIN;
	else if (rc < 0)
		return -errno;

	if (fds[0].revents & POLLIN) {
		ret = ibv_get_cq_event(_cq->channel, &_cq->cq, &context);
		if (ret)
			return ret;

		ofi_atomic_inc32(&_cq->nevents);
		rc--;
	}
	if (fds[1].revents & POLLIN) {
		do {
			ret = read(fds[1].fd, &data, 1);
		} while (ret > 0);
		ret = -FI_EAGAIN;
		rc--;
	}
	if (rc) {
		VERBS_WARN(FI_LOG_CQ, "Unknown poll error: check revents\n");
		return -FI_EOTHER;
	}

	return ret;
}

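/*
 * Blocking CQ read: wait on the completion channel, then read up to 'count'
 * entries, returning once the FI_CQ_COND_THRESHOLD threshold (or a single
 * entry) has been met.
 */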
static ssize_t
vrb_cq_sread(struct fid_cq *cq, void *buf, size_t count, const void *cond,
	     int timeout)
{
	ssize_t ret = 0, cur;
	ssize_t threshold;
	struct vrb_cq *_cq;
	uint8_t *p;

	p = buf;
	_cq = container_of(cq, struct vrb_cq, util_cq.cq_fid);

	if (!_cq->channel)
		return -FI_ENOSYS;

	threshold = (_cq->wait_cond == FI_CQ_COND_THRESHOLD) ?
		    MIN((ssize_t) cond, (ssize_t) count) : 1;

	for (cur = 0; cur < threshold; ) {
		if (vrb_cq_trywait(_cq) == FI_SUCCESS) {
			ret = vrb_poll_events(_cq, timeout);
			if (ret)
				break;
		}

		ret = _cq->util_cq.cq_fid.ops->read(&_cq->util_cq.cq_fid, p, count - cur);
		if (ret > 0) {
			p += ret * _cq->entry_size;
			cur += ret;
			if (cur >= threshold)
				break;
		} else if (ret != -FI_EAGAIN) {
			break;
		}
	}

	return cur ? cur : ret;
}

/* Must be called with CQ lock held. */
int vrb_poll_cq(struct vrb_cq *cq, struct ibv_wc *wc)
{
	struct vrb_context *ctx;
	int ret;

	do {
		ret = ibv_poll_cq(cq->cq, 1, wc);
		if (ret <= 0)
			break;

		ctx = (struct vrb_context *) (uintptr_t) wc->wr_id;
		wc->wr_id = (uintptr_t) ctx->user_ctx;
		if (ctx->flags & FI_TRANSMIT) {
			cq->credits++;
			ctx->ep->tx_credits++;
		}

		if (wc->status) {
			if (ctx->flags & FI_RECV)
				wc->opcode |= IBV_WC_RECV;
			else
				wc->opcode &= ~IBV_WC_RECV;
		}
		if (ctx->srx) {
			fastlock_acquire(&ctx->srx->ctx_lock);
			ofi_buf_free(ctx);
			fastlock_release(&ctx->srx->ctx_lock);
		} else {
			ofi_buf_free(ctx);
		}

	} while (wc->wr_id == VERBS_NO_COMP_FLAG);

	return ret;
}

/* Must be called with CQ lock held. */
int vrb_save_wc(struct vrb_cq *cq, struct ibv_wc *wc)
{
	struct vrb_wc_entry *wce;

	wce = ofi_buf_alloc(cq->wce_pool);
	if (!wce) {
		FI_WARN(&vrb_prov, FI_LOG_CQ,
			"Unable to save completion, completion lost!\n");
		return -FI_ENOMEM;
	}

	wce->wc = *wc;
	slist_insert_tail(&wce->entry, &cq->saved_wc_list);
	return FI_SUCCESS;
}

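/*
 * Drain the verbs CQ into the saved completion list so completions are not
 * lost while an endpoint is being torn down.
 */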
static void vrb_flush_cq(struct vrb_cq *cq)
{
	struct ibv_wc wc;
	ssize_t ret;

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	while (1) {
		ret = vrb_poll_cq(cq, &wc);
		if (ret <= 0)
			break;

		vrb_save_wc(cq, &wc);
	}

	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
}

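/* Flush any completions still pending on an endpoint's RX and TX CQs. */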
void vrb_cleanup_cq(struct vrb_ep *ep)
{
	if (ep->util_ep.rx_cq) {
		vrb_flush_cq(container_of(ep->util_ep.rx_cq,
					  struct vrb_cq, util_cq));
	}
	if (ep->util_ep.tx_cq) {
		vrb_flush_cq(container_of(ep->util_ep.tx_cq,
					  struct vrb_cq, util_cq));
	}
}

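/*
 * Non-blocking CQ read: return saved completions first, then poll the verbs
 * CQ directly.  Error completions are parked on the saved list and reported
 * via -FI_EAVAIL so the caller can retrieve them with fi_cq_readerr().
 */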
static ssize_t vrb_cq_read(struct fid_cq *cq_fid, void *buf, size_t count)
{
	struct vrb_cq *cq;
	struct vrb_wc_entry *wce;
	struct slist_entry *entry;
	struct ibv_wc wc;
	ssize_t ret = 0, i;

	cq = container_of(cq_fid, struct vrb_cq, util_cq.cq_fid);

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);

	for (i = 0; i < count; i++) {
		if (!slist_empty(&cq->saved_wc_list)) {
			wce = container_of(cq->saved_wc_list.head,
					   struct vrb_wc_entry, entry);
			if (wce->wc.status) {
				ret = -FI_EAVAIL;
				break;
			}
			entry = slist_remove_head(&cq->saved_wc_list);
			wce = container_of(entry, struct vrb_wc_entry, entry);
			cq->read_entry(&wce->wc, (char *) buf + i * cq->entry_size);
			ofi_buf_free(wce);
			continue;
		}

		ret = vrb_poll_cq(cq, &wc);
		if (ret <= 0)
			break;

		if (wc.status) {
			wce = ofi_buf_alloc(cq->wce_pool);
			if (!wce) {
				cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
				return -FI_ENOMEM;
			}
			memset(wce, 0, sizeof(*wce));
			memcpy(&wce->wc, &wc, sizeof wc);
			slist_insert_tail(&wce->entry, &cq->saved_wc_list);
			ret = -FI_EAVAIL;
			break;
		}

		cq->read_entry(&wc, (char *)buf + i * cq->entry_size);
	}

	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
	return i ? i : (ret < 0 ? ret : -FI_EAGAIN);
}

static const char *
vrb_cq_strerror(struct fid_cq *eq, int prov_errno, const void *err_data,
		char *buf, size_t len)
{
	if (buf && len) {
		strncpy(buf, ibv_wc_status_str(prov_errno), len);
		buf[len - 1] = '\0';
	}
	return ibv_wc_status_str(prov_errno);
}

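/* Wake a thread blocked in vrb_cq_sread() by writing to the signal fd. */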
int vrb_cq_signal(struct fid_cq *cq)
{
	struct vrb_cq *_cq;
	char data = '0';

	_cq = container_of(cq, struct vrb_cq, util_cq.cq_fid);

	if (write(_cq->signal_fd[1], &data, 1) != 1) {
		VERBS_WARN(FI_LOG_CQ, "Error signalling CQ\n");
		return -errno;
	}

	return 0;
}

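/*
 * Check whether it is safe to block on the CQ's wait object: arm the CQ with
 * ibv_req_notify_cq() and re-poll to close the race with completions that
 * arrived before the CQ was re-armed.  Returns FI_SUCCESS if the caller may
 * block, -FI_EAGAIN if completions are already available.
 */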
int vrb_cq_trywait(struct vrb_cq *cq)
{
	struct ibv_wc wc;
	void *context;
	int ret = -FI_EAGAIN, rc;

	if (!cq->channel) {
		VERBS_WARN(FI_LOG_CQ, "No wait object associated with CQ\n");
		return -FI_EINVAL;
	}

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	if (!slist_empty(&cq->saved_wc_list))
		goto out;

	rc = vrb_poll_cq(cq, &wc);
	if (rc) {
		if (rc > 0)
			vrb_save_wc(cq, &wc);
		goto out;
	}

	while (!ibv_get_cq_event(cq->channel, &cq->cq, &context))
		ofi_atomic_inc32(&cq->nevents);

	rc = ibv_req_notify_cq(cq->cq, 0);
	if (rc) {
		VERBS_WARN(FI_LOG_CQ, "ibv_req_notify_cq error: %d\n", rc);
		ret = -errno;
		goto out;
	}

	/* Read again to fetch any completions that we might have missed
	 * while rearming */
	rc = vrb_poll_cq(cq, &wc);
	if (rc) {
		if (rc > 0)
			vrb_save_wc(cq, &wc);
		goto out;
	}

	ret = FI_SUCCESS;
out:
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);
	return ret;
}

static struct fi_ops_cq vrb_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = vrb_cq_read,
	.readfrom = fi_no_cq_readfrom,
	.readerr = vrb_cq_readerr,
	.sread = vrb_cq_sread,
	.sreadfrom = fi_no_cq_sreadfrom,
	.signal = vrb_cq_signal,
	.strerror = vrb_cq_strerror
};

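/* fi_control() handler for the CQ: exposes the wait object via FI_GETWAIT
 * and FI_GETWAITOBJ. */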
static int vrb_cq_control(fid_t fid, int command, void *arg)
{
	struct fi_wait_pollfd *pollfd;
	struct vrb_cq *cq;
	int ret;

	cq = container_of(fid, struct vrb_cq, util_cq.cq_fid);
	switch (command) {
	case FI_GETWAIT:
		if (!cq->channel) {
			ret = -FI_ENODATA;
			break;
		}

		if (cq->wait_obj == FI_WAIT_FD) {
			*(int *) arg = cq->channel->fd;
			return 0;
		}

		pollfd = arg;
		if (pollfd->nfds >= 1) {
			pollfd->fd[0].fd = cq->channel->fd;
			pollfd->fd[0].events = POLLIN;
			ret = 0;
		} else {
			ret = -FI_ETOOSMALL;
		}
		pollfd->nfds = 1;
		break;
	case FI_GETWAITOBJ:
		*(enum fi_wait_obj *) arg = cq->wait_obj;
		ret = 0;
		break;
	default:
		ret = -FI_ENOSYS;
		break;
	}

	return ret;
}

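/*
 * Destroy the CQ: tear down any XRC SRQs still referencing it, free saved
 * completions and buffer pools, then release the verbs CQ, signal fds, and
 * completion channel.
 */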
static int vrb_cq_close(fid_t fid)
{
	struct vrb_wc_entry *wce;
	struct slist_entry *entry;
	int ret;
	struct vrb_cq *cq =
		container_of(fid, struct vrb_cq, util_cq.cq_fid);
	struct vrb_srq_ep *srq_ep;
	struct dlist_entry *srq_ep_temp;

	if (ofi_atomic_get32(&cq->nevents))
		ibv_ack_cq_events(cq->cq, ofi_atomic_get32(&cq->nevents));

	/* Since an RX CQ and SRX context can be destroyed in any order,
	 * and the XRC SRQ references the RX CQ, we must destroy any
	 * XRC SRQ using this CQ before destroying the CQ. */
	fastlock_acquire(&cq->xrc.srq_list_lock);
	dlist_foreach_container_safe(&cq->xrc.srq_list, struct vrb_srq_ep,
				     srq_ep, xrc.srq_entry, srq_ep_temp) {
		ret = vrb_xrc_close_srq(srq_ep);
		if (ret) {
			fastlock_release(&cq->xrc.srq_list_lock);
			return -ret;
		}
	}
	fastlock_release(&cq->xrc.srq_list_lock);

	cq->util_cq.cq_fastlock_acquire(&cq->util_cq.cq_lock);
	while (!slist_empty(&cq->saved_wc_list)) {
		entry = slist_remove_head(&cq->saved_wc_list);
		wce = container_of(entry, struct vrb_wc_entry, entry);
		ofi_buf_free(wce);
	}
	cq->util_cq.cq_fastlock_release(&cq->util_cq.cq_lock);

	ofi_bufpool_destroy(cq->wce_pool);
	ofi_bufpool_destroy(cq->ctx_pool);

	if (cq->cq) {
		ret = ibv_destroy_cq(cq->cq);
		if (ret)
			return -ret;
	}

	if (cq->signal_fd[0]) {
		ofi_close_socket(cq->signal_fd[0]);
	}
	if (cq->signal_fd[1]) {
		ofi_close_socket(cq->signal_fd[1]);
	}

	ofi_cq_cleanup(&cq->util_cq);

	if (cq->channel)
		ibv_destroy_comp_channel(cq->channel);

	fastlock_destroy(&cq->xrc.srq_list_lock);
	free(cq);
	return 0;
}

static struct fi_ops vrb_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = vrb_cq_close,
	.bind = fi_no_bind,
	.control = vrb_cq_control,
	.ops_open = fi_no_ops_open,
};

static void vrb_util_cq_progress_noop(struct util_cq *cq)
{
	/* This routine shouldn't be called */
	assert(0);
}

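/*
 * Create a CQ.  A verbs completion channel plus an internal socketpair back
 * the CQ's wait object unless the caller asked for FI_WAIT_NONE.
 */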
int vrb_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
		struct fid_cq **cq_fid, void *context)
{
	struct vrb_cq *cq;
	struct vrb_domain *domain =
		container_of(domain_fid, struct vrb_domain,
			     util_domain.domain_fid);
	size_t size;
	int ret;
	struct fi_cq_attr tmp_attr = *attr;

	cq = calloc(1, sizeof(*cq));
	if (!cq)
		return -FI_ENOMEM;

	/* verbs uses its own implementation of wait objects for CQ */
	tmp_attr.wait_obj = FI_WAIT_NONE;
	ret = ofi_cq_init(&vrb_prov, domain_fid, &tmp_attr, &cq->util_cq,
			  vrb_util_cq_progress_noop, context);
	if (ret)
		goto err1;

	switch (attr->wait_obj) {
	case FI_WAIT_UNSPEC:
		cq->wait_obj = FI_WAIT_FD;
		break;
	case FI_WAIT_FD:
	case FI_WAIT_POLLFD:
	case FI_WAIT_NONE:
		cq->wait_obj = attr->wait_obj;
		break;
	default:
		ret = -FI_ENOSYS;
		goto err4;
	}

	if (cq->wait_obj != FI_WAIT_NONE) {
		cq->channel = ibv_create_comp_channel(domain->verbs);
		if (!cq->channel) {
			ret = -errno;
			VERBS_WARN(FI_LOG_CQ,
				   "Unable to create completion channel\n");
			goto err2;
		}

		ret = fi_fd_nonblock(cq->channel->fd);
		if (ret)
			goto err3;

		if (socketpair(AF_UNIX, SOCK_STREAM, 0, cq->signal_fd)) {
			ret = -errno;
			goto err3;
		}

		ret = fi_fd_nonblock(cq->signal_fd[0]);
		if (ret)
			goto err4;
	}

	size = attr->size ? attr->size : VERBS_DEF_CQ_SIZE;

	/*
	 * Verbs may throw an error if CQ size exceeds ibv_device_attr->max_cqe.
	 * OFI doesn't expose CQ size to the apps because it's better to fix the
	 * issue in the provider than the app dealing with it. The fix is to
	 * open multiple verbs CQs and load balance "MSG EP to CQ binding" among
	 * them to avoid any CQ overflow.
	 * Something like:
	 * num_qp_per_cq = ibv_device_attr->max_cqe / (qp_send_wr + qp_recv_wr)
	 */
	cq->cq = ibv_create_cq(domain->verbs, size, cq, cq->channel,
			       attr->signaling_vector);
	if (!cq->cq) {
		ret = -errno;
		VERBS_WARN(FI_LOG_CQ, "Unable to create verbs CQ\n");
		goto err4;
	}

	if (cq->channel) {
		ret = ibv_req_notify_cq(cq->cq, 0);
		if (ret) {
			VERBS_WARN(FI_LOG_CQ,
				   "ibv_req_notify_cq failed\n");
			goto err5;
		}
	}

	ret = ofi_bufpool_create(&cq->wce_pool, sizeof(struct vrb_wc_entry),
				 16, 0, VERBS_WCE_CNT, 0);
	if (ret) {
		VERBS_WARN(FI_LOG_CQ, "Failed to create wce_pool\n");
		goto err5;
	}

	cq->flags |= attr->flags;
	cq->wait_cond = attr->wait_cond;
	/* verbs uses its own ops for CQ */
	cq->util_cq.cq_fid.fid.ops = &vrb_cq_fi_ops;
	cq->util_cq.cq_fid.ops = &vrb_cq_ops;

	switch (attr->format) {
	case FI_CQ_FORMAT_UNSPEC:
	case FI_CQ_FORMAT_CONTEXT:
		cq->read_entry = vrb_cq_read_context_entry;
		cq->entry_size = sizeof(struct fi_cq_entry);
		break;
	case FI_CQ_FORMAT_MSG:
		cq->read_entry = vrb_cq_read_msg_entry;
		cq->entry_size = sizeof(struct fi_cq_msg_entry);
		break;
	case FI_CQ_FORMAT_DATA:
		cq->read_entry = vrb_cq_read_data_entry;
		cq->entry_size = sizeof(struct fi_cq_data_entry);
		break;
	case FI_CQ_FORMAT_TAGGED:
	default:
		ret = -FI_ENOSYS;
		goto err6;
	}

	ret = ofi_bufpool_create(&cq->ctx_pool, sizeof(struct fi_context),
				 16, 0, size, OFI_BUFPOOL_NO_TRACK);
	if (ret)
		goto err6;

	slist_init(&cq->saved_wc_list);
	dlist_init(&cq->xrc.srq_list);
	fastlock_init(&cq->xrc.srq_list_lock);

	ofi_atomic_initialize32(&cq->nevents, 0);

	cq->credits = size;

	*cq_fid = &cq->util_cq.cq_fid;
	return 0;
err6:
	ofi_bufpool_destroy(cq->wce_pool);
err5:
	ibv_destroy_cq(cq->cq);
err4:
	if (cq->signal_fd[0])
		ofi_close_socket(cq->signal_fd[0]);
	if (cq->signal_fd[1])
		ofi_close_socket(cq->signal_fd[1]);
err3:
	if (cq->channel)
		ibv_destroy_comp_channel(cq->channel);
err2:
	ofi_cq_cleanup(&cq->util_cq);
err1:
	free(cq);
	return ret;
}