1 /*
2  * Copyright (c) 2018 Cray Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 
33 #include "config.h"
34 #include "fi_verbs.h"
35 #include <sys/stat.h>
36 
37 
/* Domain XRC INI QP RBTree key: shared INI connections are looked up by
 * destination address plus the TX CQ in use (see vrb_set_ini_conn_key
 * and vrb_ini_conn_compare). */
struct vrb_ini_conn_key {
	struct sockaddr		*addr;	/* peer destination address (EP info dest_addr) */
	struct vrb_cq	*tx_cq;	/* TX CQ bound to the endpoint */
};

static int vrb_process_ini_conn(struct vrb_xrc_ep *ep,int reciprocal,
				   void *param, size_t paramlen);
46 
47 /*
48  * This routine is a work around that creates a QP for the only purpose of
49  * reserving the QP number. The QP is not transitioned out of the RESET state.
50  */
vrb_reserve_qpn(struct vrb_xrc_ep * ep,struct ibv_qp ** qp)51 int vrb_reserve_qpn(struct vrb_xrc_ep *ep, struct ibv_qp **qp)
52 {
53 	struct vrb_domain *domain = vrb_ep_to_domain(&ep->base_ep);
54 	struct vrb_cq *cq = container_of(ep->base_ep.util_ep.tx_cq,
55 					    struct vrb_cq, util_cq);
56 	struct ibv_qp_init_attr attr = { 0 };
57 	int ret;
58 
59 	/* Limit library allocated resources and do not INIT QP */
60 	attr.cap.max_send_wr = 1;
61 	attr.cap.max_send_sge = 1;
62 	attr.cap.max_recv_wr = 0;
63 	attr.cap.max_recv_sge = 0;
64 	attr.cap.max_inline_data = 0;
65 	attr.send_cq = cq->cq;
66 	attr.recv_cq = cq->cq;
67 	attr.qp_type = IBV_QPT_RC;
68 
69 	*qp = ibv_create_qp(domain->pd, &attr);
70 	if (OFI_UNLIKELY(!*qp)) {
71 		ret = -errno;
72 		VERBS_WARN(FI_LOG_EP_CTRL,
73 			   "Reservation QP create failed %d\n", -ret);
74 		return ret;
75 	}
76 	return FI_SUCCESS;
77 }
78 
/* Create the physical XRC INI (send-side) QP on the endpoint's CM ID.
 * Base QP attributes are copied from the endpoint settings, then the
 * type is overridden to XRC send and the extended fields filled in.
 * Returns FI_SUCCESS, a negative errno value on rdma_create_qp_ex
 * failure, or -FI_ENOSYS when built without XRC support. */
static int vrb_create_ini_qp(struct vrb_xrc_ep *ep)
{
#if VERBS_HAVE_XRC
	struct ibv_qp_init_attr_ex attr_ex;
	struct vrb_domain *domain = vrb_ep_to_domain(&ep->base_ep);
	int ret;

	/* NOTE(review): attr_ex is not zeroed here; the base portion is
	 * filled via the ibv_qp_init_attr cast and only the extended
	 * fields named in comp_mask are set below — confirm the helper
	 * initializes everything the base struct needs. */
	vrb_msg_ep_get_qp_attr(&ep->base_ep,
			(struct ibv_qp_init_attr *)&attr_ex);
	attr_ex.qp_type = IBV_QPT_XRC_SEND;
	attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD;
	attr_ex.pd = domain->pd;
	attr_ex.qp_context = domain;

	ret = rdma_create_qp_ex(ep->base_ep.id, &attr_ex);
	if (ret) {
		ret = -errno;
		VERBS_WARN(FI_LOG_EP_CTRL,
			   "XRC INI QP rdma_create_qp_ex failed %d\n", -ret);
		return ret;
	}
	return FI_SUCCESS;
#else /* VERBS_HAVE_XRC */
	return -FI_ENOSYS;
#endif /* !VERBS_HAVE_XRC */
}
105 
vrb_set_ini_conn_key(struct vrb_xrc_ep * ep,struct vrb_ini_conn_key * key)106 static inline void vrb_set_ini_conn_key(struct vrb_xrc_ep *ep,
107 					   struct vrb_ini_conn_key *key)
108 {
109 	key->addr = ep->base_ep.info->dest_addr;
110 	key->tx_cq = container_of(ep->base_ep.util_ep.tx_cq,
111 				  struct vrb_cq, util_cq);
112 }
113 
/* Look up (or create) the shared INI connection keyed by destination
 * address and TX CQ.  On a hit the connection's reference count is
 * bumped; on a miss a new entry is allocated and inserted into the
 * domain RBTree with an initial reference of 1.
 * Returns FI_SUCCESS with *ini_conn set, or -FI_ENOMEM / a negative
 * RBTree insert error (in which case *ini_conn is NULL).
 * Caller must hold domain:eq:lock */
int vrb_get_shared_ini_conn(struct vrb_xrc_ep *ep,
			       struct vrb_ini_shared_conn **ini_conn) {
	struct vrb_domain *domain = vrb_ep_to_domain(&ep->base_ep);
	struct vrb_ini_conn_key key;
	struct vrb_ini_shared_conn *conn;
	struct ofi_rbnode *node;
	int ret;

	vrb_set_ini_conn_key(ep, &key);
	node = ofi_rbmap_find(domain->xrc.ini_conn_rbmap, &key);
	if (node) {
		/* Existing shared connection: take another reference */
		*ini_conn = node->data;
		ofi_atomic_inc32(&(*ini_conn)->ref_cnt);
		return FI_SUCCESS;
	}

	*ini_conn = NULL;
	conn = calloc(1, sizeof(*conn));
	if (!conn) {
		VERBS_WARN(FI_LOG_EP_CTRL,
			   "Unable to allocate INI connection memory\n");
		return -FI_ENOMEM;
	}

	/* No target QP number is known for the peer yet */
	conn->tgt_qpn = VRB_NO_INI_TGT_QPNUM;
	conn->peer_addr = mem_dup(key.addr, ofi_sizeofaddr(key.addr));
	if (!conn->peer_addr) {
		VERBS_WARN(FI_LOG_EP_CTRL,
			   "mem_dup of peer address failed\n");
		free(conn);
		return -FI_ENOMEM;
	}
	conn->tx_cq = container_of(ep->base_ep.util_ep.tx_cq,
				   struct vrb_cq, util_cq);
	dlist_init(&conn->pending_list);
	dlist_init(&conn->active_list);
	ofi_atomic_initialize32(&conn->ref_cnt, 1);

	/* The find above missed under the held lock, so the insert must
	 * not report a duplicate */
	ret = ofi_rbmap_insert(domain->xrc.ini_conn_rbmap,
			       (void *) &key, (void *) conn, NULL);
	assert(ret != -FI_EALREADY);
	if (ret) {
		VERBS_WARN(FI_LOG_EP_CTRL, "INI QP RBTree insert failed %d\n",
			   ret);
		goto insert_err;
	}
	*ini_conn = conn;
	return FI_SUCCESS;

insert_err:
	free(conn->peer_addr);
	free(conn);
	return ret;
}
169 
/* Release the endpoint's reference on its shared INI connection,
 * detaching the shared QP from the endpoint.  The last reference tears
 * down the physical INI QP and removes the entry from the RBTree;
 * otherwise any pending shared connects are rescheduled.
 * Caller must hold domain:eq:lock */
void vrb_put_shared_ini_conn(struct vrb_xrc_ep *ep)
{
	struct vrb_domain *domain = vrb_ep_to_domain(&ep->base_ep);
	struct vrb_ini_shared_conn *ini_conn;
	struct vrb_ini_conn_key key;

	if (!ep->ini_conn)
		return;

	/* remove from pending or active connection list */
	dlist_remove(&ep->ini_conn_entry);
	ep->conn_state = VRB_XRC_UNCONNECTED;
	ini_conn = ep->ini_conn;
	ep->ini_conn = NULL;
	/* Detach the shared QP from this endpoint and its CM ID */
	ep->base_ep.ibv_qp = NULL;
	if (ep->base_ep.id)
		ep->base_ep.id->qp = NULL;

	/* If XRC physical QP connection was not completed, make sure
	 * any pending connection to that destination will get scheduled. */
	if (ep->base_ep.id && ep->base_ep.id == ini_conn->phys_conn_id) {
		if (ini_conn->state == VRB_INI_QP_CONNECTING)
			ini_conn->state = VRB_INI_QP_UNCONNECTED;

		ini_conn->phys_conn_id = NULL;
	}

	/* Tear down physical INI/TGT when no longer being used */
	if (!ofi_atomic_dec32(&ini_conn->ref_cnt)) {
		if (ini_conn->ini_qp && ibv_destroy_qp(ini_conn->ini_qp))
			VERBS_WARN(FI_LOG_EP_CTRL,
				   "Destroy of XRC physical INI QP failed %d\n",
				   errno);

		assert(dlist_empty(&ini_conn->pending_list));
		/* Last reference: remove from the RBTree and free */
		vrb_set_ini_conn_key(ep, &key);
		ofi_rbmap_find_delete(domain->xrc.ini_conn_rbmap, &key);
		free(ini_conn->peer_addr);
		free(ini_conn);
	} else {
		/* Other users remain: kick any pending shared connects */
		vrb_sched_ini_conn(ini_conn);
	}
}
214 
215 /* Caller must hold domain:eq:lock */
vrb_add_pending_ini_conn(struct vrb_xrc_ep * ep,int reciprocal,void * conn_param,size_t conn_paramlen)216 void vrb_add_pending_ini_conn(struct vrb_xrc_ep *ep, int reciprocal,
217 				 void *conn_param, size_t conn_paramlen)
218 {
219 	ep->conn_setup->pending_recip = reciprocal;
220 	ep->conn_setup->pending_paramlen = MIN(conn_paramlen,
221 				sizeof(ep->conn_setup->pending_param));
222 	memcpy(ep->conn_setup->pending_param, conn_param,
223 	       ep->conn_setup->pending_paramlen);
224 	dlist_insert_tail(&ep->ini_conn_entry, &ep->ini_conn->pending_list);
225 }
226 
227 /* Caller must hold domain:eq:lock */
vrb_create_shutdown_event(struct vrb_xrc_ep * ep)228 static void vrb_create_shutdown_event(struct vrb_xrc_ep *ep)
229 {
230 	struct fi_eq_cm_entry entry = {
231 		.fid = &ep->base_ep.util_ep.ep_fid.fid,
232 	};
233 	struct vrb_eq_entry *eq_entry;
234 
235 	eq_entry = vrb_eq_alloc_entry(FI_SHUTDOWN, &entry, sizeof(entry));
236 	if (eq_entry)
237 		dlistfd_insert_tail(&eq_entry->item, &ep->base_ep.eq->list_head);
238 }
239 
/* Drain the shared INI connection's pending list, starting a CM
 * connection for each queued request until the list is empty or the
 * physical QP enters the CONNECTING state.
 * Caller must hold domain:eq:lock */
void vrb_sched_ini_conn(struct vrb_ini_shared_conn *ini_conn)
{
	struct vrb_xrc_ep *ep;
	enum vrb_ini_qp_state last_state;
	struct sockaddr *addr;
	int ret;

	/* Continue to schedule shared connections if the physical connection
	 * has completed and there are connection requests pending. We could
	 * implement a throttle here if it is determined that it is better to
	 * limit the number of outstanding connections. */
	while (1) {
		if (dlist_empty(&ini_conn->pending_list) ||
				ini_conn->state == VRB_INI_QP_CONNECTING)
			return;

		/* Move the next request from the pending to the active list */
		dlist_pop_front(&ini_conn->pending_list,
				struct vrb_xrc_ep, ep, ini_conn_entry);

		dlist_insert_tail(&ep->ini_conn_entry,
				  &ep->ini_conn->active_list);
		last_state = ep->ini_conn->state;

		/* NOTE(review): the first (physical) connection uses
		 * RDMA_PS_TCP; subsequent shared connects use RDMA_PS_UDP,
		 * presumably so no second RC-style QP connection is
		 * established — confirm against vrb_create_ep. */
		ret = vrb_create_ep(ep->base_ep.info,
				       last_state == VRB_INI_QP_UNCONNECTED ?
				       RDMA_PS_TCP : RDMA_PS_UDP,
				       &ep->base_ep.id);
		if (ret) {
			VERBS_WARN(FI_LOG_EP_CTRL,
				   "Failed to create active CM ID %d\n",
				   ret);
			goto err;
		}

		if (last_state == VRB_INI_QP_UNCONNECTED) {
			/* First physical connection attempt for this
			 * destination/CQ pair */
			assert(!ep->ini_conn->phys_conn_id && ep->base_ep.id);

			/* Dispose of any stale INI QP left from a prior,
			 * incomplete physical connection attempt */
			if (ep->ini_conn->ini_qp &&
			    ibv_destroy_qp(ep->ini_conn->ini_qp)) {
				VERBS_WARN(FI_LOG_EP_CTRL, "Failed to destroy "
					   "physical INI QP %d\n", errno);
			}
			ret = vrb_create_ini_qp(ep);
			if (ret) {
				VERBS_WARN(FI_LOG_EP_CTRL, "Failed to create "
					   "physical INI QP %d\n", ret);
				goto err;
			}
			ep->ini_conn->ini_qp = ep->base_ep.id->qp;
			ep->ini_conn->state = VRB_INI_QP_CONNECTING;
			ep->ini_conn->phys_conn_id = ep->base_ep.id;
		} else {
			/* Physical QP already exists: share its QP number */
			assert(!ep->base_ep.id->qp);
			VERBS_DBG(FI_LOG_EP_CTRL, "Sharing XRC INI QPN %d\n",
				  ep->ini_conn->ini_qp->qp_num);
		}

		assert(ep->ini_conn->ini_qp);
		ep->base_ep.id->context = &ep->base_ep.util_ep.ep_fid.fid;
		/* Route CM events for this ID through the endpoint's EQ */
		ret = rdma_migrate_id(ep->base_ep.id,
				      ep->base_ep.eq->channel);
		if (ret) {
			VERBS_WARN(FI_LOG_EP_CTRL,
				   "Failed to migrate active CM ID %d\n", ret);
			goto err;
		}

		addr = rdma_get_local_addr(ep->base_ep.id);
		if (addr)
			ofi_straddr_dbg(&vrb_prov, FI_LOG_EP_CTRL,
					"XRC connect src_addr", addr);
		addr = rdma_get_peer_addr(ep->base_ep.id);
		if (addr)
			ofi_straddr_dbg(&vrb_prov, FI_LOG_EP_CTRL,
					"XRC connect dest_addr", addr);

		ep->base_ep.ibv_qp = ep->ini_conn->ini_qp;
		ret = vrb_process_ini_conn(ep, ep->conn_setup->pending_recip,
					      ep->conn_setup->pending_param,
					      ep->conn_setup->pending_paramlen);
		/* The success path also falls through here; the label only
		 * marks where failure handling begins */
err:
		if (ret) {
			ep->ini_conn->state = last_state;
			vrb_put_shared_ini_conn(ep);

			/* We need to let the application know that the
			 * connect request has failed. */
			vrb_create_shutdown_event(ep);
			break;
		}
	}
}
333 
/* Start the CM connection for a scheduled shared INI request: fill in
 * the XRC private CM data and conn_param fields, advance the endpoint
 * connection state, and begin route resolution.
 * Returns 0 on success or a negative errno value (state is rolled back
 * on failure).
 * Caller must hold domain:xrc:eq:lock */
int vrb_process_ini_conn(struct vrb_xrc_ep *ep,int reciprocal,
			    void *param, size_t paramlen)
{
	struct vrb_xrc_cm_data *cm_data = param;
	int ret;

	assert(ep->base_ep.ibv_qp);

	/* Reciprocal (reverse-direction) connections echo the remote
	 * connection tag; initial connections send the local tag */
	vrb_set_xrc_cm_data(cm_data, reciprocal, reciprocal ?
			       ep->conn_setup->remote_conn_tag :
			       ep->conn_setup->conn_tag,
			       ep->base_ep.eq->xrc.pep_port,
			       ep->ini_conn->tgt_qpn, ep->srqn);

	ep->base_ep.conn_param.private_data = cm_data;
	ep->base_ep.conn_param.private_data_len = paramlen;
	ep->base_ep.conn_param.responder_resources = RDMA_MAX_RESP_RES;
	ep->base_ep.conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
	ep->base_ep.conn_param.flow_control = 1;
	ep->base_ep.conn_param.retry_count = 15;
	ep->base_ep.conn_param.rnr_retry_count = 7;
	ep->base_ep.conn_param.srq = 1;

	/* When the CM ID owns no QP (shared physical QP case), advertise
	 * the shared INI QP number explicitly */
	if (!ep->base_ep.id->qp)
		ep->base_ep.conn_param.qp_num =
				ep->ini_conn->ini_qp->qp_num;

	assert(ep->conn_state == VRB_XRC_UNCONNECTED ||
	       ep->conn_state == VRB_XRC_ORIG_CONNECTED);
	vrb_next_xrc_conn_state(ep);

	ret = rdma_resolve_route(ep->base_ep.id, VERBS_RESOLVE_TIMEOUT);
	if (ret) {
		ret = -errno;
		VERBS_WARN(FI_LOG_EP_CTRL,
			   "rdma_resolve_route failed %s (%d)\n",
			   strerror(-ret), -ret);
		/* Undo the connection state advance above */
		vrb_prev_xrc_conn_state(ep);
	}

	return ret;
}
377 
/* Create or open the XRC target (TGT) side QP on the endpoint's target
 * CM ID.
 *
 * If tgt_qpn is non-zero, the existing TGT QP with that number is
 * opened for sharing via ibv_open_qp; otherwise a new physical
 * XRC_RECV QP is created with rdma_create_qp_ex.
 * Returns FI_SUCCESS, a negative errno value on failure, or
 * -FI_ENOSYS when built without XRC support. */
int vrb_ep_create_tgt_qp(struct vrb_xrc_ep *ep, uint32_t tgt_qpn)
{
#if VERBS_HAVE_XRC
	struct ibv_qp_open_attr open_attr;
	struct ibv_qp_init_attr_ex attr_ex;
	struct vrb_domain *domain = vrb_ep_to_domain(&ep->base_ep);
	int ret;

	assert(ep->tgt_id && !ep->tgt_id->qp);

	/* If a target QP number was specified then open that existing
	 * QP for sharing. */
	if (tgt_qpn) {
		memset(&open_attr, 0, sizeof(open_attr));
		open_attr.qp_num = tgt_qpn;
		open_attr.comp_mask = IBV_QP_OPEN_ATTR_NUM |
			IBV_QP_OPEN_ATTR_XRCD | IBV_QP_OPEN_ATTR_TYPE |
			IBV_QP_OPEN_ATTR_CONTEXT;
		open_attr.xrcd = domain->xrc.xrcd;
		open_attr.qp_type = IBV_QPT_XRC_RECV;
		open_attr.qp_context = ep;

		ep->tgt_ibv_qp = ibv_open_qp(domain->verbs, &open_attr);
		if (!ep->tgt_ibv_qp) {
			ret = -errno;
			VERBS_WARN(FI_LOG_EP_CTRL,
				   "XRC TGT QP ibv_open_qp failed %d\n", -ret);
			return ret;
		}
		return FI_SUCCESS;
	}

	/* An existing XRC target was not specified, create XRC TGT
	 * side of new physical connection. */
	vrb_msg_ep_get_qp_attr(&ep->base_ep,
			(struct ibv_qp_init_attr *)&attr_ex);
	attr_ex.qp_type = IBV_QPT_XRC_RECV;
	attr_ex.qp_context = ep;
	attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD | IBV_QP_INIT_ATTR_XRCD;
	attr_ex.pd = domain->pd;
	attr_ex.xrcd = domain->xrc.xrcd;
	if (rdma_create_qp_ex(ep->tgt_id, &attr_ex)) {
		ret = -errno;
		VERBS_WARN(FI_LOG_EP_CTRL,
			   "Physical XRC TGT QP rdma_create_qp_ex failed %d\n",
			   -ret);
		return ret;
	}
	ep->tgt_ibv_qp = ep->tgt_id->qp;

	return FI_SUCCESS;
#else /* VERBS_HAVE_XRC */
	return -FI_ENOSYS;
#endif /* !VERBS_HAVE_XRC */
}
433 
vrb_put_tgt_qp(struct vrb_xrc_ep * ep)434 static int vrb_put_tgt_qp(struct vrb_xrc_ep *ep)
435 {
436 	int ret;
437 
438 	if (!ep->tgt_ibv_qp)
439 		return FI_SUCCESS;
440 
441 	/* The kernel will not destroy the detached TGT QP until all
442 	 * shared opens have called ibv_destroy_qp. */
443 	ret = ibv_destroy_qp(ep->tgt_ibv_qp);
444 	if (ret) {
445 		ret = -errno;
446 		VERBS_WARN(FI_LOG_EP_CTRL,
447 			   "Close XRC TGT QP ibv_destroy_qp failed %d\n",
448 			   -ret);
449 		return ret;
450 	}
451 	ep->tgt_ibv_qp = NULL;
452 	if (ep->tgt_id)
453 		ep->tgt_id->qp = NULL;
454 
455 	return FI_SUCCESS;
456 }
457 
/* Tear down all XRC QP state for the endpoint: release the shared INI
 * connection, destroy the active-side CM ID, release the target QP
 * open, then destroy the target-side CM ID.  Always returns 0.
 * Caller must hold eq:lock */
int vrb_ep_destroy_xrc_qp(struct vrb_xrc_ep *ep)
{
	vrb_put_shared_ini_conn(ep);

	if (ep->base_ep.id) {
		rdma_destroy_id(ep->base_ep.id);
		ep->base_ep.id = NULL;
	}
	if (ep->tgt_ibv_qp)
		vrb_put_tgt_qp(ep);

	if (ep->tgt_id) {
		rdma_destroy_id(ep->tgt_id);
		ep->tgt_id = NULL;
	}
	return 0;
}
476 
477 FI_VERBS_XRC_ONLY
vrb_ini_conn_compare(struct ofi_rbmap * map,void * key,void * data)478 static int vrb_ini_conn_compare(struct ofi_rbmap *map, void *key, void *data)
479 {
480 	struct vrb_ini_shared_conn *ini_conn = data;
481 	struct vrb_ini_conn_key *_key = key;
482 	int ret;
483 
484 	assert(_key->addr->sa_family == ini_conn->peer_addr->sa_family);
485 
486 	/* Only interested in the interface address and TX CQ */
487 	switch (_key->addr->sa_family) {
488 	case AF_INET:
489 		ret = memcmp(&ofi_sin_addr(_key->addr),
490 			     &ofi_sin_addr(ini_conn->peer_addr),
491 			     sizeof(ofi_sin_addr(_key->addr)));
492 		break;
493 	case AF_INET6:
494 		ret = memcmp(&ofi_sin6_addr(_key->addr),
495 			     &ofi_sin6_addr(ini_conn->peer_addr),
496 			     sizeof(ofi_sin6_addr(_key->addr)));
497 		break;
498 	default:
499 		VERBS_WARN(FI_LOG_FABRIC, "Unsupported address format\n");
500 		assert(0);
501 		return -FI_EINVAL;
502 	}
503 	if (ret)
504 		return ret;
505 
506 	return _key->tx_cq < ini_conn->tx_cq ?
507 			-1 : _key->tx_cq > ini_conn->tx_cq;
508 }
509 
510 FI_VERBS_XRC_ONLY
vrb_domain_xrc_validate_hw(struct vrb_domain * domain)511 static int vrb_domain_xrc_validate_hw(struct vrb_domain *domain)
512 {
513 	struct ibv_device_attr attr;
514 	int ret;
515 
516 	ret = ibv_query_device(domain->verbs, &attr);
517 	if (ret || !(attr.device_cap_flags & IBV_DEVICE_XRC)) {
518 		VERBS_INFO(FI_LOG_DOMAIN, "XRC is not supported");
519 		return -FI_EINVAL;
520 	}
521 	return FI_SUCCESS;
522 }
523 
/* Initialize domain XRC support: validate HW capability, open the XRC
 * domain (optionally backed by a file so it can be shared across
 * processes), and create the INI connection RBTree.
 * Returns FI_SUCCESS, a negative errno/-ENOMEM value on failure, or
 * -FI_ENOSYS when built without XRC support. */
int vrb_domain_xrc_init(struct vrb_domain *domain)
{
#if VERBS_HAVE_XRC
	struct ibv_xrcd_init_attr attr;
	int ret;

	ret = vrb_domain_xrc_validate_hw(domain);
	if (ret)
		return ret;

	domain->xrc.xrcd_fd = -1;
	if (vrb_gl_data.msg.xrcd_filename) {
		/* NOTE(review): open() is called without an access-mode
		 * flag (O_RDONLY implied) — confirm this suffices for
		 * the fd handed to ibv_open_xrcd below. */
		domain->xrc.xrcd_fd = open(vrb_gl_data.msg.xrcd_filename,
				       O_CREAT, S_IWUSR | S_IRUSR);
		if (domain->xrc.xrcd_fd < 0) {
			VERBS_WARN(FI_LOG_DOMAIN,
				   "XRCD file open failed %d\n", errno);
			return -errno;
		}
	}

	/* attr.fd == -1 requests a process-private XRCD */
	attr.comp_mask = IBV_XRCD_INIT_ATTR_FD | IBV_XRCD_INIT_ATTR_OFLAGS;
	attr.fd = domain->xrc.xrcd_fd;
	attr.oflags = O_CREAT;
	domain->xrc.xrcd = ibv_open_xrcd(domain->verbs, &attr);
	if (!domain->xrc.xrcd) {
		ret = -errno;
		VERBS_INFO_ERRNO(FI_LOG_DOMAIN, "ibv_open_xrcd", errno);
		goto xrcd_err;
	}

	domain->xrc.ini_conn_rbmap = ofi_rbmap_create(vrb_ini_conn_compare);
	if (!domain->xrc.ini_conn_rbmap) {
		ret = -ENOMEM;
		VERBS_INFO_ERRNO(FI_LOG_DOMAIN, "XRC INI QP RB Tree", -ret);
		goto rbmap_err;
	}

	domain->flags |= VRB_USE_XRC;
	return FI_SUCCESS;

rbmap_err:
	(void)ibv_close_xrcd(domain->xrc.xrcd);
xrcd_err:
	if (domain->xrc.xrcd_fd >= 0) {
		close(domain->xrc.xrcd_fd);
		domain->xrc.xrcd_fd = -1;
	}
	return ret;
#else /* VERBS_HAVE_XRC */
	return -FI_ENOSYS;
#endif /* !VERBS_HAVE_XRC */
}
577 
/* Release domain XRC resources: close the XRC domain (and its backing
 * file descriptor, if any) and destroy the INI connection RBTree.
 * Returns -FI_EBUSY while INI connections remain, the negated
 * ibv_close_xrcd error on failure, or 0 on success (and when built
 * without XRC support). */
int vrb_domain_xrc_cleanup(struct vrb_domain *domain)
{
#if VERBS_HAVE_XRC
	int ret;

	assert(domain->xrc.xrcd);

	/* All endpoint and hence XRC INI QP should be closed */
	if (!ofi_rbmap_empty(domain->xrc.ini_conn_rbmap)) {
		VERBS_WARN(FI_LOG_DOMAIN, "XRC domain busy\n");
		return -FI_EBUSY;
	}

	ret = ibv_close_xrcd(domain->xrc.xrcd);
	if (ret) {
		VERBS_WARN(FI_LOG_DOMAIN, "ibv_close_xrcd failed %d\n", ret);
		return -ret;
	}
	if (domain->xrc.xrcd_fd >= 0) {
		close(domain->xrc.xrcd_fd);
		domain->xrc.xrcd_fd = -1;
	}

	ofi_rbmap_destroy(domain->xrc.ini_conn_rbmap);
#endif /* VERBS_HAVE_XRC */
	return 0;
}
605