1 /*
2  * Copyright (c) 2016 Intel Corporation, Inc.  All rights reserved.
3  * Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  */
33 
34 #include <stdlib.h>
35 #include <string.h>
36 #include <poll.h>
37 
38 #include <ofi.h>
39 #include <ofi_util.h>
40 #include "rxm.h"
41 
42 static struct rxm_cmap_handle *rxm_conn_alloc(struct rxm_cmap *cmap);
43 static int rxm_conn_connect(struct rxm_ep *ep,
44 			    struct rxm_cmap_handle *handle, const void *addr);
45 static int rxm_conn_signal(struct rxm_ep *ep, void *context,
46 			   enum rxm_cmap_signal signal);
47 static void rxm_conn_av_updated_handler(struct rxm_cmap_handle *handle);
48 static void *rxm_conn_progress(void *arg);
49 static void *rxm_conn_atomic_progress(void *arg);
50 static int rxm_conn_handle_event(struct rxm_ep *rxm_ep,
51 				 struct rxm_msg_eq_entry *entry);
52 
53 
54 /*
55  * Connection map
56  */
57 
/* Human-readable name for each rxm_cm_state value, expanded from the
 * RXM_CM_STATES X-macro via OFI_STR so the table stays in sync with the
 * enum. Presumably consumed by RXM_CM_UPDATE_STATE debug logging —
 * TODO(review): confirm against rxm.h. */
char *rxm_cm_state_str[] = {
	RXM_CM_STATES(OFI_STR)
};
61 
/* Drain one error entry from the MSG EQ.
 *
 * Returns -FI_ECONNREFUSED for connection-refused errors (the fid
 * context is propagated through entry->context so the caller can find
 * the affected handle), otherwise the negated provider error code. */
static inline ssize_t rxm_eq_readerr(struct rxm_ep *rxm_ep,
				     struct rxm_msg_eq_entry *entry)
{
	ssize_t rd;

	/* reset previous err data info */
	entry->err_entry.err_data_size = 0;

	rd = fi_eq_readerr(rxm_ep->msg_eq, &entry->err_entry, 0);
	if (rd != sizeof(entry->err_entry)) {
		if (rd != -FI_EAGAIN)
			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
				"unable to fi_eq_readerr: %zd\n", rd);
		return (rd < 0) ? rd : -FI_EINVAL;
	}

	if (entry->err_entry.err != ECONNREFUSED) {
		OFI_EQ_STRERROR(&rxm_prov, FI_LOG_WARN, FI_LOG_EP_CTRL,
				rxm_ep->msg_eq, &entry->err_entry);
		return -entry->err_entry.err;
	}

	entry->context = entry->err_entry.fid->context;
	return -FI_ECONNREFUSED;
}
87 
rxm_eq_read(struct rxm_ep * ep,size_t len,struct rxm_msg_eq_entry * entry)88 static ssize_t rxm_eq_read(struct rxm_ep *ep, size_t len,
89 			   struct rxm_msg_eq_entry *entry)
90 {
91 	ssize_t ret;
92 
93 	ret = fi_eq_read(ep->msg_eq, &entry->event, &entry->cm_entry, len, 0);
94 	if (ret == -FI_EAVAIL)
95 		ret = rxm_eq_readerr(ep, entry);
96 
97 	return ret;
98 }
99 
rxm_cmap_set_key(struct rxm_cmap_handle * handle)100 static void rxm_cmap_set_key(struct rxm_cmap_handle *handle)
101 {
102 	handle->key = ofi_idx2key(&handle->cmap->key_idx,
103 		ofi_idx_insert(&handle->cmap->handles_idx, handle));
104 }
105 
rxm_cmap_clear_key(struct rxm_cmap_handle * handle)106 static void rxm_cmap_clear_key(struct rxm_cmap_handle *handle)
107 {
108 	int index = ofi_key2idx(&handle->cmap->key_idx, handle->key);
109 
110 	if (!ofi_idx_is_valid(&handle->cmap->handles_idx, index))
111 		FI_WARN(handle->cmap->av->prov, FI_LOG_AV, "Invalid key!\n");
112 	else
113 		ofi_idx_remove(&handle->cmap->handles_idx, index);
114 }
115 
/* Look up a connection handle by its key.
 *
 * Returns NULL (with a warning) when the key does not map to a live
 * handle or the stored key does not match — e.g. a stale key from a
 * connection that has since been torn down. */
struct rxm_cmap_handle *rxm_cmap_key2handle(struct rxm_cmap *cmap, uint64_t key)
{
	struct rxm_cmap_handle *handle;

	handle = ofi_idx_lookup(&cmap->handles_idx,
				ofi_key2idx(&cmap->key_idx, key));
	if (!handle) {
		FI_WARN(cmap->av->prov, FI_LOG_AV, "Invalid key!\n");
		return NULL;
	}

	if (handle->key != key) {
		FI_WARN(cmap->av->prov, FI_LOG_AV,
			"handle->key not matching given key\n");
		return NULL;
	}

	return handle;
}
132 
/* Fill in a freshly allocated handle: link it to the cmap, record the
 * peer association, set its initial state and assign its lookup key. */
static void rxm_cmap_init_handle(struct rxm_cmap_handle *handle,
				  struct rxm_cmap *cmap,
				  enum rxm_cmap_state state,
				  fi_addr_t fi_addr,
				  struct rxm_cmap_peer *peer)
{
	handle->cmap = cmap;
	handle->fi_addr = fi_addr;
	handle->peer = peer;
	RXM_CM_UPDATE_STATE(handle, state);
	rxm_cmap_set_key(handle);
}
145 
rxm_cmap_match_peer(struct dlist_entry * entry,const void * addr)146 static int rxm_cmap_match_peer(struct dlist_entry *entry, const void *addr)
147 {
148 	struct rxm_cmap_peer *peer;
149 
150 	peer = container_of(entry, struct rxm_cmap_peer, entry);
151 	return !memcmp(peer->addr, addr, peer->handle->cmap->av->addrlen);
152 }
153 
rxm_cmap_del_handle(struct rxm_cmap_handle * handle)154 static int rxm_cmap_del_handle(struct rxm_cmap_handle *handle)
155 {
156 	struct rxm_cmap *cmap = handle->cmap;
157 	int ret;
158 
159 	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
160 	       "marking connection handle: %p for deletion\n", handle);
161 	rxm_cmap_clear_key(handle);
162 
163 	RXM_CM_UPDATE_STATE(handle, RXM_CMAP_SHUTDOWN);
164 
165 	/* Signal CM thread to delete the handle. This is required
166 	 * so that the CM thread handles any pending events for this
167 	 * ep correctly. Handle would be freed finally after processing the
168 	 * events */
169 	ret = rxm_conn_signal(cmap->ep, handle, RXM_CMAP_FREE);
170 	if (ret) {
171 		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
172 			"Unable to signal CM thread\n");
173 		return ret;
174 	}
175 	return 0;
176 }
177 
178 static inline int
rxm_cmap_check_and_realloc_handles_table(struct rxm_cmap * cmap,fi_addr_t fi_addr)179 rxm_cmap_check_and_realloc_handles_table(struct rxm_cmap *cmap,
180 					 fi_addr_t fi_addr)
181 {
182 	void *new_handles;
183 	size_t grow_size;
184 
185 	if (OFI_LIKELY(fi_addr < cmap->num_allocated))
186 		return 0;
187 
188 	grow_size = MAX(cmap->av->count, fi_addr - cmap->num_allocated + 1);
189 
190 	new_handles = realloc(cmap->handles_av,
191 			      (grow_size + cmap->num_allocated) *
192 			      sizeof(*cmap->handles_av));
193 	if (OFI_LIKELY(!new_handles))
194 		return -FI_ENOMEM;
195 
196 	cmap->handles_av = new_handles;
197 	memset(&cmap->handles_av[cmap->num_allocated], 0,
198 	       sizeof(*cmap->handles_av) * grow_size);
199 	cmap->num_allocated += grow_size;
200 	return 0;
201 }
202 
203 static struct rxm_pkt *
rxm_conn_inject_pkt_alloc(struct rxm_ep * rxm_ep,struct rxm_conn * rxm_conn,uint8_t op,uint64_t flags)204 rxm_conn_inject_pkt_alloc(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn,
205 			  uint8_t op, uint64_t flags)
206 {
207 	struct rxm_pkt *inject_pkt;
208 	int ret = ofi_memalign((void **) &inject_pkt, 16,
209 			       rxm_ep->inject_limit + sizeof(*inject_pkt));
210 
211 	if (ret)
212 		return NULL;
213 
214 	memset(inject_pkt, 0, rxm_ep->inject_limit + sizeof(*inject_pkt));
215 	inject_pkt->ctrl_hdr.version = RXM_CTRL_VERSION;
216 	inject_pkt->ctrl_hdr.type = rxm_ctrl_eager;
217 	inject_pkt->hdr.version = OFI_OP_VERSION;
218 	inject_pkt->hdr.op = op;
219 	inject_pkt->hdr.flags = flags;
220 
221 	return inject_pkt;
222 }
223 
rxm_conn_res_free(struct rxm_conn * rxm_conn)224 static void rxm_conn_res_free(struct rxm_conn *rxm_conn)
225 {
226 	ofi_freealign(rxm_conn->inject_pkt);
227 	rxm_conn->inject_pkt = NULL;
228 	ofi_freealign(rxm_conn->inject_data_pkt);
229 	rxm_conn->inject_data_pkt = NULL;
230 	ofi_freealign(rxm_conn->tinject_pkt);
231 	rxm_conn->tinject_pkt = NULL;
232 	ofi_freealign(rxm_conn->tinject_data_pkt);
233 	rxm_conn->tinject_data_pkt = NULL;
234 }
235 
rxm_conn_res_alloc(struct rxm_ep * rxm_ep,struct rxm_conn * rxm_conn)236 static int rxm_conn_res_alloc(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn)
237 {
238 	dlist_init(&rxm_conn->deferred_conn_entry);
239 	dlist_init(&rxm_conn->deferred_tx_queue);
240 	dlist_init(&rxm_conn->sar_rx_msg_list);
241 	dlist_init(&rxm_conn->sar_deferred_rx_msg_list);
242 
243 	if (rxm_ep->util_ep.domain->threading != FI_THREAD_SAFE) {
244 		rxm_conn->inject_pkt =
245 			rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
246 						  ofi_op_msg, 0);
247 		rxm_conn->inject_data_pkt =
248 			rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
249 						  ofi_op_msg, FI_REMOTE_CQ_DATA);
250 		rxm_conn->tinject_pkt =
251 			rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
252 						  ofi_op_tagged, 0);
253 		rxm_conn->tinject_data_pkt =
254 			rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
255 						  ofi_op_tagged, FI_REMOTE_CQ_DATA);
256 
257 		if (!rxm_conn->inject_pkt || !rxm_conn->inject_data_pkt ||
258 		    !rxm_conn->tinject_pkt || !rxm_conn->tinject_data_pkt) {
259 			rxm_conn_res_free(rxm_conn);
260 			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to allocate "
261 				"inject pkt for connection\n");
262 			return -FI_ENOMEM;
263 		}
264 	}
265 	return 0;
266 }
267 
rxm_conn_close(struct rxm_cmap_handle * handle)268 static void rxm_conn_close(struct rxm_cmap_handle *handle)
269 {
270 	struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);
271 
272 	FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "closing msg ep\n");
273 	if (!rxm_conn->msg_ep)
274 		return;
275 
276 	if (fi_close(&rxm_conn->msg_ep->fid))
277 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to close msg_ep\n");
278 
279 	rxm_conn->msg_ep = NULL;
280 }
281 
rxm_conn_free(struct rxm_cmap_handle * handle)282 static void rxm_conn_free(struct rxm_cmap_handle *handle)
283 {
284 	struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);
285 
286 	rxm_conn_close(handle);
287 	rxm_conn_res_free(rxm_conn);
288 	free(rxm_conn);
289 }
290 
/* Allocate a connection handle for fi_addr and publish it in the
 * AV-indexed handle table. Returns 0 or a negative fi_errno; on any
 * failure the handle is freed before returning. */
static int rxm_cmap_alloc_handle(struct rxm_cmap *cmap, fi_addr_t fi_addr,
				 enum rxm_cmap_state state,
				 struct rxm_cmap_handle **handle)
{
	int ret;

	*handle = rxm_conn_alloc(cmap);
	if (OFI_UNLIKELY(!*handle))
		return -FI_ENOMEM;

	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
	       "Allocated handle: %p for fi_addr: %" PRIu64 "\n",
	       *handle, fi_addr);

	/* Make sure handles_av can be indexed by fi_addr before storing */
	ret = rxm_cmap_check_and_realloc_handles_table(cmap, fi_addr);
	if (OFI_UNLIKELY(ret)) {
		rxm_conn_free(*handle);
		return ret;
	}

	rxm_cmap_init_handle(*handle, cmap, state, fi_addr, NULL);
	cmap->handles_av[fi_addr] = *handle;
	return 0;
}
312 
rxm_cmap_alloc_handle_peer(struct rxm_cmap * cmap,void * addr,enum rxm_cmap_state state,struct rxm_cmap_handle ** handle)313 static int rxm_cmap_alloc_handle_peer(struct rxm_cmap *cmap, void *addr,
314 				       enum rxm_cmap_state state,
315 				       struct rxm_cmap_handle **handle)
316 {
317 	struct rxm_cmap_peer *peer;
318 
319 	peer = calloc(1, sizeof(*peer) + cmap->av->addrlen);
320 	if (!peer)
321 		return -FI_ENOMEM;
322 	*handle = rxm_conn_alloc(cmap);
323 	if (!*handle) {
324 		free(peer);
325 		return -FI_ENOMEM;
326 	}
327 	ofi_straddr_dbg(cmap->av->prov, FI_LOG_AV, "Allocated handle for addr",
328 			addr);
329 	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "handle: %p\n", *handle);
330 	rxm_cmap_init_handle(*handle, cmap, state, FI_ADDR_NOTAVAIL, peer);
331 	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Adding handle to peer list\n");
332 	peer->handle = *handle;
333 	memcpy(peer->addr, addr, cmap->av->addrlen);
334 	dlist_insert_tail(&peer->entry, &cmap->peer_list);
335 	return 0;
336 }
337 
338 static struct rxm_cmap_handle *
rxm_cmap_get_handle_peer(struct rxm_cmap * cmap,const void * addr)339 rxm_cmap_get_handle_peer(struct rxm_cmap *cmap, const void *addr)
340 {
341 	struct rxm_cmap_peer *peer;
342 	struct dlist_entry *entry;
343 
344 	entry = dlist_find_first_match(&cmap->peer_list, rxm_cmap_match_peer,
345 				       addr);
346 	if (!entry)
347 		return NULL;
348 	ofi_straddr_dbg(cmap->av->prov, FI_LOG_AV,
349 			"handle found in peer list for addr", addr);
350 	peer = container_of(entry, struct rxm_cmap_peer, entry);
351 	return peer->handle;
352 }
353 
rxm_cmap_remove(struct rxm_cmap * cmap,int index)354 int rxm_cmap_remove(struct rxm_cmap *cmap, int index)
355 {
356 	struct rxm_cmap_handle *handle;
357 	int ret = -FI_ENOENT;
358 
359 	handle = cmap->handles_av[index];
360 	if (!handle) {
361 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cmap entry not found\n");
362 		return ret;
363 	}
364 
365 	handle->peer = calloc(1, sizeof(*handle->peer) + cmap->av->addrlen);
366 	if (!handle->peer) {
367 		ret = -FI_ENOMEM;
368 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to allocate memory "
369 			"for moving handle to peer list, deleting it instead\n");
370 		rxm_cmap_del_handle(handle);
371 		return ret;
372 	}
373 	handle->fi_addr = FI_ADDR_NOTAVAIL;
374 	cmap->handles_av[index] = NULL;
375 	handle->peer->handle = handle;
376 	memcpy(handle->peer->addr, ofi_av_get_addr(cmap->av, index),
377 	       cmap->av->addrlen);
378 	dlist_insert_tail(&handle->peer->entry, &cmap->peer_list);
379 	return 0;
380 }
381 
/* Move a handle parked on the peer list into the AV-indexed table once
 * its address has been inserted into the AV.
 *
 * Fix: grow the AV-indexed table *before* detaching the handle from the
 * peer list. Previously the peer entry was unlinked and freed first, so
 * a failed table grow left the handle unreachable (not on the peer
 * list, not in handles_av) — leaking the handle and its connection. */
static int rxm_cmap_move_handle(struct rxm_cmap_handle *handle,
				fi_addr_t fi_addr)
{
	int ret;

	ret = rxm_cmap_check_and_realloc_handles_table(handle->cmap, fi_addr);
	if (OFI_UNLIKELY(ret))
		return ret;

	dlist_remove(&handle->peer->entry);
	free(handle->peer);
	handle->peer = NULL;
	handle->fi_addr = fi_addr;
	handle->cmap->handles_av[fi_addr] = handle;
	return 0;
}
397 
/* Associate the AV entry (addr, fi_addr) with a connection handle.
 *
 * Reuses an existing handle when one is already present for fi_addr, or
 * parked on the peer list for this address; otherwise allocates a fresh
 * idle handle. Returns 0 on success or a negative fi_errno. */
int rxm_cmap_update(struct rxm_cmap *cmap, const void *addr, fi_addr_t fi_addr)
{
	struct rxm_cmap_handle *handle;
	int ret;

	/* Check whether we have already allocated a handle for this `fi_addr`. */
	/* We rely on the fact that `ofi_ip_av_insert`/`ofi_av_insert_addr` returns
	 * the same `fi_addr` for the equal addresses */
	if (fi_addr < cmap->num_allocated) {
		handle = rxm_cmap_acquire_handle(cmap, fi_addr);
		if (handle)
			return 0;
	}

	/* The peer may have connected to us before its address was inserted
	 * into our AV; such handles wait on the peer list. */
	handle = rxm_cmap_get_handle_peer(cmap, addr);
	if (!handle) {
		ret = rxm_cmap_alloc_handle(cmap, fi_addr,
					    RXM_CMAP_IDLE, &handle);
		return ret;
	}
	/* Move the parked handle into the AV-indexed table... */
	ret = rxm_cmap_move_handle(handle, fi_addr);
	if (ret)
		return ret;

	/* ...and retry directed recvs that were waiting on this address. */
	rxm_conn_av_updated_handler(handle);
	return 0;
}
425 
rxm_cmap_process_shutdown(struct rxm_cmap * cmap,struct rxm_cmap_handle * handle)426 void rxm_cmap_process_shutdown(struct rxm_cmap *cmap,
427 			       struct rxm_cmap_handle *handle)
428 {
429 	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
430 		"Processing shutdown for handle: %p\n", handle);
431 	if (handle->state > RXM_CMAP_SHUTDOWN) {
432 		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
433 			"Invalid handle on shutdown event\n");
434 	} else if (handle->state != RXM_CMAP_SHUTDOWN) {
435 		FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Got remote shutdown\n");
436 		rxm_cmap_del_handle(handle);
437 	} else {
438 		FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Got local shutdown\n");
439 	}
440 }
441 
/* Handle FI_CONNECTED for a handle.
 *
 * When cm_data is non-NULL this is the active (connecting) side: the
 * server's conn id and rndv rx credits are taken from the accept
 * payload. A NULL cm_data means the passive side; only the state
 * transition is needed. */
void rxm_cmap_process_connect(struct rxm_cmap *cmap,
			      struct rxm_cmap_handle *handle,
			      union rxm_cm_data *cm_data)
{
	struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);

	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
	       "processing FI_CONNECTED event for handle: %p\n", handle);
	if (cm_data) {
		assert(handle->state == RXM_CMAP_CONNREQ_SENT);
		handle->remote_key = cm_data->accept.server_conn_id;
		rxm_conn->rndv_tx_credits = cm_data->accept.rx_size;
	} else {
		assert(handle->state == RXM_CMAP_CONNREQ_RECV);
	}
	RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNECTED);

	/* Set the remote key to the inject packets */
	/* (these packets only exist for non-thread-safe domains; see
	 * rxm_conn_res_alloc) */
	if (cmap->ep->util_ep.domain->threading != FI_THREAD_SAFE) {
		rxm_conn->inject_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
		rxm_conn->inject_data_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
		rxm_conn->tinject_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
		rxm_conn->tinject_data_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
	}
}
467 
/* Handle a connection-reject event for a handle.
 *
 * A genuine reject deletes the handle; a simultaneous-connection reject
 * (the peer won the tie-break) only closes the msg ep so the handle can
 * be reused for the peer's inbound connection. */
void rxm_cmap_process_reject(struct rxm_cmap *cmap,
			     struct rxm_cmap_handle *handle,
			     enum rxm_cmap_reject_reason reject_reason)
{
	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
		"Processing reject for handle: %p\n", handle);
	switch (handle->state) {
	case RXM_CMAP_CONNREQ_RECV:
	case RXM_CMAP_CONNECTED:
		/* Handle is being re-used for incoming connection request */
		break;
	case RXM_CMAP_CONNREQ_SENT:
		if (reject_reason == RXM_CMAP_REJECT_GENUINE) {
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
			       "Deleting connection handle\n");
			rxm_cmap_del_handle(handle);
		} else {
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
			       "Connection handle is being re-used. Close the connection\n");
			rxm_conn_close(handle);
		}
		break;
	case RXM_CMAP_SHUTDOWN:
		FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
			"Connection handle already being deleted\n");
		break;
	default:
		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, "Invalid cmap state: "
			"%d when receiving connection reject\n", handle->state);
		assert(0);
	}
}
500 
/* Process an incoming connection request from a remote passive ep.
 *
 * Resolves (or allocates) the handle for the requesting address and
 * decides whether to accept. Simultaneous connects are broken by
 * comparing endpoint names: the side with the lower name keeps its
 * outbound connect and rejects the inbound one; the other side drops
 * its connect and accepts. On acceptance *handle_ret is set and 0 is
 * returned; otherwise a negative fi_errno is returned and
 * *reject_reason may be set for the CM reply. */
int rxm_cmap_process_connreq(struct rxm_cmap *cmap, void *addr,
			     struct rxm_cmap_handle **handle_ret,
			     uint8_t *reject_reason)
{
	struct rxm_cmap_handle *handle;
	int ret = 0, cmp;
	fi_addr_t fi_addr = ofi_ip_av_get_fi_addr(cmap->av, addr);

	ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL,
			"Processing connreq from remote pep", addr);

	/* Peer not in our AV yet -> look on the peer list instead */
	if (fi_addr == FI_ADDR_NOTAVAIL)
		handle = rxm_cmap_get_handle_peer(cmap, addr);
	else
		handle = rxm_cmap_acquire_handle(cmap, fi_addr);

	if (!handle) {
		if (fi_addr == FI_ADDR_NOTAVAIL)
			ret = rxm_cmap_alloc_handle_peer(cmap, addr,
							 RXM_CMAP_CONNREQ_RECV,
							 &handle);
		else
			ret = rxm_cmap_alloc_handle(cmap, fi_addr,
						    RXM_CMAP_CONNREQ_RECV,
						    &handle);
		if (ret)
			goto unlock;
	}

	switch (handle->state) {
	case RXM_CMAP_CONNECTED:
		FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
			"Connection already present.\n");
		ret = -FI_EALREADY;
		break;
	case RXM_CMAP_CONNREQ_SENT:
		/* Simultaneous connect: both sides sent a connreq */
		ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL, "local_name",
				cmap->attr.name);
		ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL, "remote_name",
				addr);

		cmp = ofi_addr_cmp(cmap->av->prov, addr, cmap->attr.name);

		if (cmp < 0) {
			/* Our outbound connect wins; reject the inbound one */
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
				"Remote name lower than local name.\n");
			*reject_reason = RXM_CMAP_REJECT_SIMULT_CONN;
			ret = -FI_EALREADY;
			break;
		} else if (cmp > 0) {
			/* Remote wins; drop our connect, accept theirs */
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
				"Re-using handle: %p to accept remote "
				"connection\n", handle);
			*reject_reason = RXM_CMAP_REJECT_GENUINE;
			rxm_conn_close(handle);
		} else {
			/* Loopback: allocate a second handle for the
			 * accepting side of the self-connection */
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
				"Endpoint connects to itself\n");
			ret = rxm_cmap_alloc_handle_peer(cmap, addr,
							  RXM_CMAP_CONNREQ_RECV,
							  &handle);
			if (ret)
				goto unlock;

			assert(fi_addr != FI_ADDR_NOTAVAIL);
			handle->fi_addr = fi_addr;
		}
		/* Fall through */
	case RXM_CMAP_IDLE:
		RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNREQ_RECV);
		/* Fall through */
	case RXM_CMAP_CONNREQ_RECV:
		*handle_ret = handle;
		break;
	case RXM_CMAP_SHUTDOWN:
		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, "handle :%p marked for "
			"deletion / shutdown, reject connection\n", handle);
		*reject_reason = RXM_CMAP_REJECT_GENUINE;
		ret = -FI_EOPBADSTATE;
		break;
	default:
		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
		       "invalid handle state: %d\n", handle->state);
		assert(0);
		ret = -FI_EOPBADSTATE;
	}
unlock:
	return ret;
}
590 
/* Drain and process pending events from the MSG EQ.
 *
 * Loops until a read fails with something other than -FI_ECONNREFUSED
 * (which is still handed to the event handler so the rejected handle
 * can be cleaned up) or the handler reports an error. Returns the last
 * status (negative fi_errno on failure). */
int rxm_msg_eq_progress(struct rxm_ep *rxm_ep)
{
	struct rxm_msg_eq_entry *entry;
	int ret;

	/* NOTE(review): alloca() has no failure return — stack exhaustion
	 * is undefined behavior, not NULL — so this check looks like dead
	 * code; confirm before removing. */
	entry = alloca(RXM_MSG_EQ_ENTRY_SZ);
	if (!entry) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"unable to allocate memory!\n");
		return -FI_ENOMEM;
	}

	while (1) {
		entry->rd = rxm_eq_read(rxm_ep, RXM_MSG_EQ_ENTRY_SZ, entry);
		if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED) {
			ret = (int) entry->rd;
			break;
		}
		ret = rxm_conn_handle_event(rxm_ep, entry);
		if (ret) {
			FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
			       "invalid connection handle event: %d\n", ret);
			break;
		}
	}
	return ret;
}
618 
/* Drive a handle toward the connected state for fi_addr.
 *
 * An idle handle initiates a MSG-endpoint connect. While establishment
 * is in flight (-FI_EAGAIN) the MSG EQ is progressed before returning
 * so CM events are not starved when callers spin on retry. */
int rxm_cmap_connect(struct rxm_ep *rxm_ep, fi_addr_t fi_addr,
		     struct rxm_cmap_handle *handle)
{
	int ret = FI_SUCCESS;

	switch (handle->state) {
	case RXM_CMAP_IDLE:
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "initiating MSG_EP connect "
		       "for fi_addr: %" PRIu64 "\n", fi_addr);
		ret = rxm_conn_connect(rxm_ep, handle,
				       ofi_av_get_addr(rxm_ep->cmap->av, fi_addr));
		if (ret) {
			/* connect failed outright: discard the handle */
			rxm_cmap_del_handle(handle);
		} else {
			RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNREQ_SENT);
			ret = -FI_EAGAIN;
		}
		break;
	case RXM_CMAP_CONNREQ_SENT:
	case RXM_CMAP_CONNREQ_RECV:
	case RXM_CMAP_SHUTDOWN:
		/* connection attempt already in flight (or tearing down) */
		ret = -FI_EAGAIN;
		break;
	default:
		FI_WARN(rxm_ep->cmap->av->prov, FI_LOG_EP_CTRL,
			"Invalid cmap handle state\n");
		assert(0);
		ret = -FI_EOPBADSTATE;
	}
	if (ret == -FI_EAGAIN)
		rxm_msg_eq_progress(rxm_ep);

	return ret;
}
653 
rxm_cmap_cm_thread_close(struct rxm_cmap * cmap)654 static int rxm_cmap_cm_thread_close(struct rxm_cmap *cmap)
655 {
656 	int ret;
657 
658 	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "stopping CM thread\n");
659 	if (!cmap->cm_thread)
660 		return 0;
661 
662 	cmap->ep->do_progress = false;
663 	ret = rxm_conn_signal(cmap->ep, NULL, RXM_CMAP_EXIT);
664 	if (ret) {
665 		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
666 			"Unable to signal CM thread\n");
667 		return ret;
668 	}
669 	ret = pthread_join(cmap->cm_thread, NULL);
670 	if (ret) {
671 		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
672 			"Unable to join CM thread\n");
673 		return ret;
674 	}
675 	return 0;
676 }
677 
rxm_cmap_free(struct rxm_cmap * cmap)678 void rxm_cmap_free(struct rxm_cmap *cmap)
679 {
680 	struct rxm_cmap_peer *peer;
681 	struct dlist_entry *entry;
682 	size_t i;
683 
684 	FI_INFO(cmap->av->prov, FI_LOG_EP_CTRL, "Closing cmap\n");
685 	rxm_cmap_cm_thread_close(cmap);
686 
687 	for (i = 0; i < cmap->num_allocated; i++) {
688 		if (cmap->handles_av[i]) {
689 			rxm_cmap_clear_key(cmap->handles_av[i]);
690 			rxm_conn_free(cmap->handles_av[i]);
691 			cmap->handles_av[i] = 0;
692 		}
693 	}
694 
695 	while(!dlist_empty(&cmap->peer_list)) {
696 		entry = cmap->peer_list.next;
697 		peer = container_of(entry, struct rxm_cmap_peer, entry);
698 		dlist_remove(&peer->entry);
699 		rxm_cmap_clear_key(peer->handle);
700 		rxm_conn_free(peer->handle);
701 		free(peer);
702 	}
703 
704 	free(cmap->handles_av);
705 	free(cmap->attr.name);
706 	ofi_idx_reset(&cmap->handles_idx);
707 	free(cmap);
708 }
709 
710 static int
rxm_cmap_update_addr(struct util_av * av,void * addr,fi_addr_t fi_addr,void * arg)711 rxm_cmap_update_addr(struct util_av *av, void *addr,
712 		     fi_addr_t fi_addr, void *arg)
713 {
714 	return rxm_cmap_update((struct rxm_cmap *)arg, addr, fi_addr);
715 }
716 
/* Attach the cmap to an AV and create/refresh handles for every
 * address already inserted into it. */
int rxm_cmap_bind_to_av(struct rxm_cmap *cmap, struct util_av *av)
{
	cmap->av = av;
	return ofi_av_elements_iter(av, rxm_cmap_update_addr, (void *)cmap);
}
722 
/* Allocate and initialize the connection map for an rxm endpoint.
 *
 * Sizes the handle table from the AV, copies the cmap attributes
 * (including a private copy of the local endpoint name used for
 * simultaneous-connect tie-breaks), optionally spawns the CM progress
 * thread (auto progress), and creates handles for addresses already in
 * the AV. Returns FI_SUCCESS or a negative fi_errno; on failure all
 * partially acquired resources are released via the err labels. */
int rxm_cmap_alloc(struct rxm_ep *rxm_ep, struct rxm_cmap_attr *attr)
{
	struct rxm_cmap *cmap;
	struct util_ep *ep = &rxm_ep->util_ep;
	int ret;

	cmap = calloc(1, sizeof *cmap);
	if (!cmap)
		return -FI_ENOMEM;

	cmap->ep = rxm_ep;
	cmap->av = ep->av;

	cmap->handles_av = calloc(cmap->av->count, sizeof(*cmap->handles_av));
	if (!cmap->handles_av) {
		ret = -FI_ENOMEM;
		goto err1;
	}
	cmap->num_allocated = ep->av->count;

	cmap->attr = *attr;
	cmap->attr.name = mem_dup(attr->name, ep->av->addrlen);
	if (!cmap->attr.name) {
		ret = -FI_ENOMEM;
		goto err2;
	}

	memset(&cmap->handles_idx, 0, sizeof(cmap->handles_idx));
	ofi_key_idx_init(&cmap->key_idx, RXM_CMAP_IDX_BITS);

	dlist_init(&cmap->peer_list);

	/* Publish the cmap before starting the thread / binding the AV so
	 * callbacks can reach it through rxm_ep. */
	rxm_ep->cmap = cmap;

	if (ep->domain->data_progress == FI_PROGRESS_AUTO || force_auto_progress) {

		assert(ep->domain->threading == FI_THREAD_SAFE);
		rxm_ep->do_progress = true;
		/* atomic-capable endpoints use the dedicated progress loop */
		if (pthread_create(&cmap->cm_thread, 0,
				   rxm_ep->rxm_info->caps & FI_ATOMIC ?
				   rxm_conn_atomic_progress :
				   rxm_conn_progress, ep)) {
			FI_WARN(ep->av->prov, FI_LOG_EP_CTRL,
				"unable to create cmap thread\n");
			ret = -ofi_syserr();
			goto err3;
		}
	}

	assert(ep->av);
	ret = rxm_cmap_bind_to_av(cmap, ep->av);
	if (ret)
		goto err4;

	return FI_SUCCESS;
err4:
	rxm_cmap_cm_thread_close(cmap);
err3:
	rxm_ep->cmap = NULL;
	free(cmap->attr.name);
err2:
	free(cmap->handles_av);
err1:
	free(cmap);
	return ret;
}
789 
/* Create and enable a MSG endpoint for one connection.
 *
 * Binds the new ep to the endpoint's EQ, to the shared RX context when
 * present, and to the MSG CQ; enables it; and — without a shared RX
 * context — preposts per-connection receive buffers. On success the ep
 * is stored in rxm_conn->msg_ep; on failure it is closed and a negative
 * fi_errno is returned. */
static int rxm_msg_ep_open(struct rxm_ep *rxm_ep, struct fi_info *msg_info,
			   struct rxm_conn *rxm_conn, void *context)
{
	struct rxm_domain *rxm_domain;
	struct fid_ep *msg_ep;
	int ret;

	rxm_domain = container_of(rxm_ep->util_ep.domain, struct rxm_domain,
			util_domain);
	ret = fi_endpoint(rxm_domain->msg_domain, msg_info, &msg_ep, context);
	if (ret) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"unable to create msg_ep: %d\n", ret);
		return ret;
	}

	ret = fi_ep_bind(msg_ep, &rxm_ep->msg_eq->fid, 0);
	if (ret) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"unable to bind msg EP to EQ: %d\n", ret);
		goto err;
	}

	if (rxm_ep->srx_ctx) {
		ret = fi_ep_bind(msg_ep, &rxm_ep->srx_ctx->fid, 0);
		if (ret) {
			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to bind msg "
				"EP to shared RX ctx: %d\n", ret);
			goto err;
		}
	}

	// TODO add other completion flags
	ret = fi_ep_bind(msg_ep, &rxm_ep->msg_cq->fid, FI_TRANSMIT | FI_RECV);
	if (ret) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
				"unable to bind msg_ep to msg_cq: %d\n", ret);
		goto err;
	}

	ret = fi_enable(msg_ep);
	if (ret) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"unable to enable msg_ep: %d\n", ret);
		goto err;
	}

	/* no shared RX context: this connection needs its own preposted
	 * receive buffers */
	if (!rxm_ep->srx_ctx) {
		ret = rxm_msg_ep_prepost_recv(rxm_ep, msg_ep);
		if (ret)
			goto err;
	}

	rxm_conn->msg_ep = msg_ep;
	return 0;
err:
	fi_close(&msg_ep->fid);
	return ret;
}
849 
rxm_conn_reprocess_directed_recvs(struct rxm_recv_queue * recv_queue)850 static int rxm_conn_reprocess_directed_recvs(struct rxm_recv_queue *recv_queue)
851 {
852 	struct rxm_rx_buf *rx_buf;
853 	struct dlist_entry *entry, *tmp_entry;
854 	struct rxm_recv_match_attr match_attr;
855 	struct fi_cq_err_entry err_entry = {0};
856 	int ret, count = 0;
857 
858 	dlist_foreach_container_safe(&recv_queue->unexp_msg_list,
859 				     struct rxm_rx_buf, rx_buf,
860 				     unexp_msg.entry, tmp_entry) {
861 		if (rx_buf->unexp_msg.addr == rx_buf->conn->handle.fi_addr)
862 			continue;
863 
864 		assert(rx_buf->unexp_msg.addr == FI_ADDR_NOTAVAIL);
865 
866 		rx_buf->unexp_msg.addr = rx_buf->conn->handle.fi_addr;
867 		match_attr.addr = rx_buf->unexp_msg.addr;
868 		match_attr.tag = rx_buf->unexp_msg.tag;
869 
870 		entry = dlist_remove_first_match(&recv_queue->recv_list,
871 						 recv_queue->match_recv,
872 						 &match_attr);
873 		if (!entry)
874 			continue;
875 
876 		dlist_remove(&rx_buf->unexp_msg.entry);
877 		rx_buf->recv_entry = container_of(entry, struct rxm_recv_entry,
878 						  entry);
879 
880 		ret = rxm_cq_handle_rx_buf(rx_buf);
881 		if (ret) {
882 			err_entry.op_context = rx_buf;
883 			err_entry.flags = rx_buf->recv_entry->comp_flags;
884 			err_entry.len = rx_buf->pkt.hdr.size;
885 			err_entry.data = rx_buf->pkt.hdr.data;
886 			err_entry.tag = rx_buf->pkt.hdr.tag;
887 			err_entry.err = ret;
888 			err_entry.prov_errno = ret;
889 			ofi_cq_write_error(recv_queue->rxm_ep->util_ep.rx_cq,
890 					   &err_entry);
891 			if (rx_buf->ep->util_ep.flags & OFI_CNTR_ENABLED)
892 				rxm_cntr_incerr(rx_buf->ep->util_ep.rx_cntr);
893 
894 			rxm_rx_buf_free(rx_buf);
895 
896 			if (!(rx_buf->recv_entry->flags & FI_MULTI_RECV))
897 				rxm_recv_entry_release(recv_queue,
898 						       rx_buf->recv_entry);
899 		}
900 		count++;
901 	}
902 	return count;
903 }
904 
905 static void
rxm_conn_av_updated_handler(struct rxm_cmap_handle * handle)906 rxm_conn_av_updated_handler(struct rxm_cmap_handle *handle)
907 {
908 	struct rxm_ep *ep = handle->cmap->ep;
909 	int count = 0;
910 
911 	if (ep->rxm_info->caps & FI_DIRECTED_RECV) {
912 		count += rxm_conn_reprocess_directed_recvs(&ep->recv_queue);
913 		count += rxm_conn_reprocess_directed_recvs(&ep->trecv_queue);
914 
915 		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
916 		       "Reprocessed directed recvs - %d\n", count);
917 	}
918 }
919 
rxm_conn_alloc(struct rxm_cmap * cmap)920 static struct rxm_cmap_handle *rxm_conn_alloc(struct rxm_cmap *cmap)
921 {
922 	struct rxm_conn *rxm_conn = calloc(1, sizeof(*rxm_conn));
923 
924 	if (OFI_UNLIKELY(!rxm_conn))
925 		return NULL;
926 
927 	if (rxm_conn_res_alloc(cmap->ep, rxm_conn)) {
928 		free(rxm_conn);
929 		return NULL;
930 	}
931 
932 	return &rxm_conn->handle;
933 }
934 
/* Validate the peer's connect-request cm data against our own.
 *
 * Any mismatch — protocol version, endianness, ctrl/op versions, or
 * eager size — makes the connection unusable; a warning is logged and
 * -FI_EINVAL returned. Returns FI_SUCCESS when all fields agree. */
static inline int
rxm_conn_verify_cm_data(union rxm_cm_data *remote_cm_data,
			union rxm_cm_data *local_cm_data)
{
	/* This should stay at top as it helps to avoid endian conversion
	 * for other fields in rxm_cm_data */
	if (remote_cm_data->connect.version != local_cm_data->connect.version) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data version mismatch "
			"(local: %" PRIu8 ", remote:  %" PRIu8 ")\n",
			local_cm_data->connect.version,
			remote_cm_data->connect.version);
		goto err;
	}
	if (remote_cm_data->connect.endianness != local_cm_data->connect.endianness) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data endianness mismatch "
			"(local: %" PRIu8 ", remote:  %" PRIu8 ")\n",
			local_cm_data->connect.endianness,
			remote_cm_data->connect.endianness);
		goto err;
	}
	if (remote_cm_data->connect.ctrl_version != local_cm_data->connect.ctrl_version) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data ctrl_version mismatch "
			"(local: %" PRIu8 ", remote:  %" PRIu8 ")\n",
			local_cm_data->connect.ctrl_version,
			remote_cm_data->connect.ctrl_version);
		goto err;
	}
	if (remote_cm_data->connect.op_version != local_cm_data->connect.op_version) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data op_version mismatch "
			"(local: %" PRIu8 ", remote:  %" PRIu8 ")\n",
			local_cm_data->connect.op_version,
			remote_cm_data->connect.op_version);
		goto err;
	}
	if (remote_cm_data->connect.eager_size != local_cm_data->connect.eager_size) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data eager_size mismatch "
			"(local: %" PRIu32 ", remote:  %" PRIu32 ")\n",
			local_cm_data->connect.eager_size,
			remote_cm_data->connect.eager_size);
		goto err;
	}
	return FI_SUCCESS;
err:
	return -FI_EINVAL;
}
980 
rxm_conn_get_rx_size(struct rxm_ep * rxm_ep,struct fi_info * msg_info)981 static size_t rxm_conn_get_rx_size(struct rxm_ep *rxm_ep,
982 				   struct fi_info *msg_info)
983 {
984 	if (msg_info->ep_attr->rx_ctx_cnt == FI_SHARED_CONTEXT)
985 		return MAX(MIN(16, msg_info->rx_attr->size),
986 			   (msg_info->rx_attr->size /
987 			    rxm_ep->util_ep.av->count));
988 	else
989 		return msg_info->rx_attr->size;
990 }
991 
/*
 * Handle an incoming FI_CONNREQ on the MSG passive endpoint.
 *
 * Validates the remote peer's CM data, registers (or resolves) a cmap
 * handle for the peer, opens a MSG endpoint for the connection, and
 * accepts it with our own CM data.  On any failure the request is
 * rejected via fi_reject() with a reason code; reason defaults to
 * RXM_CMAP_REJECT_GENUINE and may be overridden by
 * rxm_cmap_process_connreq() (e.g. for simultaneous connections).
 *
 * Returns 0 on success or a negative fi_errno value.
 */
static int
rxm_msg_process_connreq(struct rxm_ep *rxm_ep, struct fi_info *msg_info,
			union rxm_cm_data *remote_cm_data)
{
	struct rxm_conn *rxm_conn;
	/* Our side of the CM handshake; compared field-by-field against
	 * the remote data by rxm_conn_verify_cm_data() below. */
	union rxm_cm_data cm_data = {
		.connect = {
			.version = RXM_CM_DATA_VERSION,
			.endianness = ofi_detect_endianness(),
			.ctrl_version = RXM_CTRL_VERSION,
			.op_version = RXM_OP_VERSION,
			.eager_size = rxm_ep->rxm_info->tx_attr->inject_size,
		},
	};
	union rxm_cm_data reject_cm_data = {
		.reject = {
			.version = RXM_CM_DATA_VERSION,
			.reason = RXM_CMAP_REJECT_GENUINE,
		}
	};
	struct rxm_cmap_handle *handle;
	struct sockaddr_storage remote_pep_addr;
	int ret;

	/* rx_size is carried over the wire as a 32-bit field; make sure the
	 * local queue size actually fits. */
	assert(sizeof(uint32_t) == sizeof(cm_data.accept.rx_size));
	assert(msg_info->rx_attr->size <= (uint32_t)-1);

	if (rxm_conn_verify_cm_data(remote_cm_data, &cm_data)) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"CM data mismatch was detected\n");
		ret = -FI_EINVAL;
		goto err1;
	}

	/* The connect request arrives from an ephemeral port; rebuild the
	 * peer's listening address using the port it sent in its CM data. */
	memcpy(&remote_pep_addr, msg_info->dest_addr, msg_info->dest_addrlen);
	ofi_addr_set_port((struct sockaddr *)&remote_pep_addr,
			  remote_cm_data->connect.port);

	/* May update reject reason (e.g. RXM_CMAP_REJECT_SIMULT_CONN). */
	ret = rxm_cmap_process_connreq(rxm_ep->cmap, &remote_pep_addr,
				       &handle, &reject_cm_data.reject.reason);
	if (ret)
		goto err1;

	rxm_conn = container_of(handle, struct rxm_conn, handle);

	rxm_conn->handle.remote_key = remote_cm_data->connect.client_conn_id;
	rxm_conn->rndv_tx_credits = remote_cm_data->connect.rx_size;
	assert(rxm_conn->rndv_tx_credits);

	ret = rxm_msg_ep_open(rxm_ep, msg_info, rxm_conn, handle);
	if (ret)
		goto err2;

	cm_data.accept.server_conn_id = rxm_conn->handle.key;
	cm_data.accept.rx_size = rxm_conn_get_rx_size(rxm_ep, msg_info);

	/* NOTE(review): passes &cm_data.accept.server_conn_id but the size
	 * of the whole accept struct — relies on server_conn_id being the
	 * first member of cm_data.accept. */
	ret = fi_accept(rxm_conn->msg_ep, &cm_data.accept.server_conn_id,
			sizeof(cm_data.accept));
	if (ret) {
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"Unable to accept incoming connection\n");
		goto err2;
	}

	return ret;
err2:
	rxm_cmap_del_handle(&rxm_conn->handle);
err1:
	FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
	       "rejecting incoming connection request (reject reason: %d)\n",
	       (enum rxm_cmap_reject_reason)reject_cm_data.reject.reason);
	fi_reject(rxm_ep->msg_pep, msg_info->handle,
		  &reject_cm_data.reject, sizeof(reject_cm_data.reject));
	return ret;
}
1067 
rxm_flush_msg_cq(struct rxm_ep * rxm_ep)1068 static void rxm_flush_msg_cq(struct rxm_ep *rxm_ep)
1069 {
1070 	struct fi_cq_data_entry comp;
1071 	int ret;
1072 	do {
1073 		ret = fi_cq_read(rxm_ep->msg_cq, &comp, 1);
1074 		if (ret > 0) {
1075 			ret = rxm_cq_handle_comp(rxm_ep, &comp);
1076 			if (OFI_UNLIKELY(ret)) {
1077 				rxm_cq_write_error_all(rxm_ep, ret);
1078 			} else {
1079 				ret = 1;
1080 			}
1081 		} else if (ret == -FI_EAVAIL) {
1082 			rxm_cq_read_write_error(rxm_ep);
1083 			ret = 1;
1084 		} else if (ret < 0 && ret != -FI_EAGAIN) {
1085 			rxm_cq_write_error_all(rxm_ep, ret);
1086 		}
1087 	} while (ret > 0);
1088 }
1089 
/*
 * Handle an FI_NOTIFY event posted on the MSG EQ via rxm_conn_signal().
 * Only the RXM_CMAP_FREE signal is expected; it tears down the connection
 * referenced by the event context: close the MSG EP, flush the CQ, drop
 * the peer/AV bookkeeping, then free the handle.
 *
 * Returns 0 on success or -FI_EOTHER for an unrecognized signal.
 */
static int rxm_conn_handle_notify(struct fi_eq_entry *eq_entry)
{
	struct rxm_cmap *cmap;
	struct rxm_cmap_handle *handle;

	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "notify event %" PRIu64 "\n",
		eq_entry->data);

	if ((enum rxm_cmap_signal) eq_entry->data != RXM_CMAP_FREE)
		return -FI_EOTHER;

	handle = eq_entry->context;
	assert(handle->state == RXM_CMAP_SHUTDOWN);
	FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "freeing handle: %p\n", handle);
	/* Save cmap before teardown; we still need it after the close. */
	cmap = handle->cmap;

	rxm_conn_close(handle);

	/* After closing the connection, we need to flush any dangling
	 * references to the handle from msg_cq entries that have not been
	 * cleaned up yet, otherwise we could run into problems during CQ
	 * cleanup.  These entries will be errored, so keep reading through
	 * EAVAIL. */
	rxm_flush_msg_cq(cmap->ep);

	if (handle->peer) {
		/* Peer-addressed (no AV entry): unlink and free peer record. */
		dlist_remove(&handle->peer->entry);
		free(handle->peer);
		handle->peer = NULL;
	} else {
		/* AV-addressed: clear the fi_addr -> handle mapping. */
		cmap->handles_av[handle->fi_addr] = 0;
	}
	rxm_conn_free(handle);
	return 0;
}
1124 
rxm_conn_wake_up_wait_obj(struct rxm_ep * rxm_ep)1125 static void rxm_conn_wake_up_wait_obj(struct rxm_ep *rxm_ep)
1126 {
1127 	if (rxm_ep->util_ep.tx_cq && rxm_ep->util_ep.tx_cq->wait)
1128 		util_cq_signal(rxm_ep->util_ep.tx_cq);
1129 	if (rxm_ep->util_ep.tx_cntr && rxm_ep->util_ep.tx_cntr->wait)
1130 		util_cntr_signal(rxm_ep->util_ep.tx_cntr);
1131 }
1132 
1133 static int
rxm_conn_handle_reject(struct rxm_ep * rxm_ep,struct rxm_msg_eq_entry * entry)1134 rxm_conn_handle_reject(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry)
1135 {
1136 	union rxm_cm_data *cm_data = entry->err_entry.err_data;
1137 
1138 	if (!cm_data || entry->err_entry.err_data_size != sizeof(cm_data->reject)) {
1139 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1140 			"no reject error data (cm_data) was found "
1141 			"(data length expected: %zu found: %zu)\n",
1142 			sizeof(cm_data->reject),
1143 			entry->err_entry.err_data_size);
1144 		return -FI_EOTHER;
1145 	}
1146 
1147 	if (cm_data->reject.version != RXM_CM_DATA_VERSION) {
1148 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1149 			"cm data version mismatch (local: %" PRIu8
1150 			", remote:  %" PRIu8 ")\n",
1151 			(uint8_t) RXM_CM_DATA_VERSION,
1152 			cm_data->reject.version);
1153 		return -FI_EOTHER;
1154 	}
1155 
1156 	if (cm_data->reject.reason == RXM_CMAP_REJECT_GENUINE) {
1157 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1158 		       "remote peer didn't accept the connection\n");
1159 		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1160 		       "(reason: RXM_CMAP_REJECT_GENUINE)\n");
1161 		OFI_EQ_STRERROR(&rxm_prov, FI_LOG_WARN, FI_LOG_EP_CTRL,
1162 				rxm_ep->msg_eq, &entry->err_entry);
1163 	} else if (cm_data->reject.reason == RXM_CMAP_REJECT_SIMULT_CONN) {
1164 		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1165 		       "(reason: RXM_CMAP_REJECT_SIMULT_CONN)\n");
1166 	} else {
1167 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1168 		        "received unknown reject reason: %d\n",
1169 			cm_data->reject.reason);
1170 	}
1171 	rxm_cmap_process_reject(rxm_ep->cmap, entry->context,
1172 				cm_data->reject.reason);
1173 	return 0;
1174 }
1175 
/*
 * Dispatch a single MSG EQ entry: rejects (read as -FI_ECONNREFUSED),
 * internal FI_NOTIFY signals, incoming connection requests, successful
 * connections, and shutdowns.
 *
 * Returns 0 on success or a negative fi_errno value for unknown events
 * or malformed CM entries.
 */
static int
rxm_conn_handle_event(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry)
{
	if (entry->rd == -FI_ECONNREFUSED)
		return rxm_conn_handle_reject(rxm_ep, entry);

	switch (entry->event) {
	case FI_NOTIFY:
		return rxm_conn_handle_notify((struct fi_eq_entry *)
					      &entry->cm_entry);
	case FI_CONNREQ:
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "Got new connection\n");
		if ((size_t)entry->rd != RXM_CM_ENTRY_SZ) {
			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
				"Received a connection request with no CM data. "
				"Is sender running FI_PROTO_RXM?\n");
			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Received CM entry "
				"size (%zd) not matching expected (%zu)\n",
				entry->rd, RXM_CM_ENTRY_SZ);
			return -FI_EOTHER;
		}
		rxm_msg_process_connreq(rxm_ep, entry->cm_entry.info,
					(union rxm_cm_data *) entry->cm_entry.data);
		fi_freeinfo(entry->cm_entry.info);
		break;
	case FI_CONNECTED:
		assert(entry->cm_entry.fid->context);
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
		       "connection successful\n");
		/* Pass the accept CM data only if the event actually carried
		 * payload beyond the fixed cm_entry header.  The previous
		 * expression (entry->rd - sizeof(entry->cm_entry) > 0)
		 * promoted the signed rd to size_t, so rd values smaller
		 * than the header wrapped around and looked positive,
		 * passing a bogus data pointer.  Compare directly instead. */
		rxm_cmap_process_connect(rxm_ep->cmap,
			entry->cm_entry.fid->context,
			(size_t) entry->rd > sizeof(entry->cm_entry) ?
			(union rxm_cm_data *) entry->cm_entry.data : NULL);
		/* Unblock application threads waiting on TX CQ/counter. */
		rxm_conn_wake_up_wait_obj(rxm_ep);
		break;
	case FI_SHUTDOWN:
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
		       "Received connection shutdown\n");
		rxm_cmap_process_shutdown(rxm_ep->cmap,
					  entry->cm_entry.fid->context);
		break;
	default:
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"Unknown event: %u\n", entry->event);
		return -FI_EOTHER;
	}
	return 0;
}
1224 
rxm_eq_sread(struct rxm_ep * rxm_ep,size_t len,struct rxm_msg_eq_entry * entry)1225 static ssize_t rxm_eq_sread(struct rxm_ep *rxm_ep, size_t len,
1226 			    struct rxm_msg_eq_entry *entry)
1227 {
1228 	ssize_t rd;
1229 	int once = 1;
1230 
1231 	do {
1232 		/* TODO convert this to poll + fi_eq_read so that we can grab
1233 		 * rxm_ep lock before reading the EQ. This is needed to avoid
1234 		 * processing events / error entries from closed MSG EPs. This
1235 		 * can be done only for non-Windows OSes as Windows doesn't
1236 		 * have poll for a generic file descriptor. */
1237 		rd = fi_eq_sread(rxm_ep->msg_eq, &entry->event, &entry->cm_entry,
1238 				 len, -1, 0);
1239 		if (rd >= 0)
1240 			return rd;
1241 		if (rd == -FI_EINTR && once) {
1242 			FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "Ignoring EINTR\n");
1243 			once = 0;
1244 		}
1245 	} while (rd == -FI_EINTR);
1246 
1247 	if (rd != -FI_EAVAIL) {
1248 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1249 			"unable to fi_eq_sread: %s (%zd)\n",
1250 			fi_strerror(-rd), -rd);
1251 		return rd;
1252 	}
1253 
1254 	ofi_ep_lock_acquire(&rxm_ep->util_ep);
1255 	rd = rxm_eq_readerr(rxm_ep, entry);
1256 	ofi_ep_lock_release(&rxm_ep->util_ep);
1257 	return rd;
1258 }
1259 
rxm_conn_eq_event(struct rxm_ep * rxm_ep,struct rxm_msg_eq_entry * entry)1260 static inline int rxm_conn_eq_event(struct rxm_ep *rxm_ep,
1261 				    struct rxm_msg_eq_entry *entry)
1262 {
1263 	int ret;
1264 
1265 	ofi_ep_lock_acquire(&rxm_ep->util_ep);
1266 	ret = rxm_conn_handle_event(rxm_ep, entry) ? -1 : 0;
1267 	ofi_ep_lock_release(&rxm_ep->util_ep);
1268 
1269 	return ret;
1270 }
1271 
/*
 * Auto-progress thread body (FI_WAIT-based mode): blocks on the MSG EQ
 * and processes CM events until ep->do_progress is cleared.
 *
 * @arg: pointer to the embedded util_ep of the owning rxm_ep.
 * Returns NULL (thread exit value, unused).
 */
static void *rxm_conn_progress(void *arg)
{
	struct rxm_ep *ep = container_of(arg, struct rxm_ep, util_ep);
	struct rxm_msg_eq_entry *entry;

	/* alloca() never returns NULL -- on stack exhaustion behavior is
	 * undefined rather than reported -- so the former NULL check was
	 * dead code and has been removed. */
	entry = alloca(RXM_MSG_EQ_ENTRY_SZ);

	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Starting auto-progress thread\n");

	while (ep->do_progress) {
		memset(entry, 0, RXM_MSG_EQ_ENTRY_SZ);
		entry->rd = rxm_eq_sread(ep, RXM_CM_ENTRY_SZ, entry);
		/* -FI_ECONNREFUSED carries reject CM data and must still be
		 * dispatched; every other error is retried. */
		if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED)
			continue;

		rxm_conn_eq_event(ep, entry);
	}

	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Stopping auto-progress thread\n");
	return NULL;
}
1295 
/* Non-blocking EQ poll used by the atomic-progress thread: read one
 * entry under the EP lock and dispatch it.  Empty queue (-FI_EAGAIN or 0)
 * is success; errors other than -FI_ECONNREFUSED are returned directly. */
static inline int
rxm_conn_auto_progress_eq(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry)
{
	memset(entry, 0, RXM_MSG_EQ_ENTRY_SZ);

	ofi_ep_lock_acquire(&rxm_ep->util_ep);
	entry->rd = rxm_eq_read(rxm_ep, RXM_CM_ENTRY_SZ, entry);
	ofi_ep_lock_release(&rxm_ep->util_ep);

	if (entry->rd == 0 || entry->rd == -FI_EAGAIN)
		return FI_SUCCESS;
	if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED)
		return entry->rd;

	return rxm_conn_eq_event(rxm_ep, entry);
}
1312 
rxm_conn_atomic_progress(void * arg)1313 static void *rxm_conn_atomic_progress(void *arg)
1314 {
1315 	struct rxm_ep *ep = container_of(arg, struct rxm_ep, util_ep);
1316 	struct rxm_msg_eq_entry *entry;
1317 	struct rxm_fabric *fabric;
1318 	struct fid *fids[2] = {
1319 		&ep->msg_eq->fid,
1320 		&ep->msg_cq->fid,
1321 	};
1322 	struct pollfd fds[2] = {
1323 		{.events = POLLIN},
1324 		{.events = POLLIN},
1325 	};
1326 	int ret;
1327 
1328 	entry = alloca(RXM_MSG_EQ_ENTRY_SZ);
1329 	if (!entry)
1330 		return NULL;
1331 
1332 	fabric = container_of(ep->util_ep.domain->fabric,
1333 			      struct rxm_fabric, util_fabric);
1334 
1335 	ret = fi_control(&ep->msg_eq->fid, FI_GETWAIT, &fds[0].fd);
1336 	if (ret) {
1337 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1338 			"unable to get msg EQ fd: %s\n", fi_strerror(ret));
1339 		return NULL;
1340 	}
1341 
1342 	ret = fi_control(&ep->msg_cq->fid, FI_GETWAIT, &fds[1].fd);
1343 	if (ret) {
1344 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1345 			"unable to get msg CQ fd: %s\n", fi_strerror(ret));
1346 		return NULL;
1347 	}
1348 
1349 	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Starting auto-progress thread\n");
1350 	while (ep->do_progress) {
1351 		ret = fi_trywait(fabric->msg_fabric, fids, 2);
1352 
1353 		if (!ret) {
1354 			fds[0].revents = 0;
1355 			fds[1].revents = 0;
1356 
1357 			ret = poll(fds, 2, -1);
1358 			if (ret == -1 && errno != EINTR) {
1359 				FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1360 					"Select error %s, closing CM thread\n",
1361 					strerror(errno));
1362 				break;
1363 			}
1364 		}
1365 		rxm_conn_auto_progress_eq(ep, entry);
1366 		ep->util_ep.progress(&ep->util_ep);
1367 	}
1368 
1369 	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Stopping auto progress thread\n");
1370 	return NULL;
1371 }
1372 
rxm_prepare_cm_data(struct fid_pep * pep,struct rxm_cmap_handle * handle,union rxm_cm_data * cm_data)1373 static int rxm_prepare_cm_data(struct fid_pep *pep, struct rxm_cmap_handle *handle,
1374 		union rxm_cm_data *cm_data)
1375 {
1376 	struct sockaddr_storage name;
1377 	size_t cm_data_size = 0;
1378 	size_t name_size = sizeof(name);
1379 	size_t opt_size = sizeof(cm_data_size);
1380 	int ret;
1381 
1382 	ret = fi_getopt(&pep->fid, FI_OPT_ENDPOINT, FI_OPT_CM_DATA_SIZE,
1383 			&cm_data_size, &opt_size);
1384 	if (ret) {
1385 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "fi_getopt failed\n");
1386 		return ret;
1387 	}
1388 
1389 	if (cm_data_size < sizeof(*cm_data)) {
1390 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "MSG EP CM data size too small\n");
1391 		return -FI_EOTHER;
1392 	}
1393 
1394 	ret = fi_getname(&pep->fid, &name, &name_size);
1395 	if (ret) {
1396 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to get msg pep name\n");
1397 		return ret;
1398 	}
1399 
1400 	cm_data->connect.port = ofi_addr_get_port((struct sockaddr *)&name);
1401 	cm_data->connect.client_conn_id = handle->key;
1402 	return 0;
1403 }
1404 
1405 static int
rxm_conn_connect(struct rxm_ep * ep,struct rxm_cmap_handle * handle,const void * addr)1406 rxm_conn_connect(struct rxm_ep *ep, struct rxm_cmap_handle *handle,
1407 		 const void *addr)
1408 {
1409 	int ret;
1410 	struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);
1411 	union rxm_cm_data cm_data = {
1412 		.connect = {
1413 			.version = RXM_CM_DATA_VERSION,
1414 			.ctrl_version = RXM_CTRL_VERSION,
1415 			.op_version = RXM_OP_VERSION,
1416 			.endianness = ofi_detect_endianness(),
1417 			.eager_size = ep->rxm_info->tx_attr->inject_size,
1418 		},
1419 	};
1420 
1421 	assert(sizeof(uint32_t) == sizeof(cm_data.connect.eager_size));
1422 	assert(sizeof(uint32_t) == sizeof(cm_data.connect.rx_size));
1423 	assert(ep->rxm_info->tx_attr->inject_size <= (uint32_t) -1);
1424 	assert(ep->msg_info->rx_attr->size <= (uint32_t) -1);
1425 
1426 	free(ep->msg_info->dest_addr);
1427 	ep->msg_info->dest_addrlen = ep->msg_info->src_addrlen;
1428 
1429 	ep->msg_info->dest_addr = mem_dup(addr, ep->msg_info->dest_addrlen);
1430 	if (!ep->msg_info->dest_addr) {
1431 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "mem_dup failed, len %zu\n",
1432 			ep->msg_info->dest_addrlen);
1433 		return -FI_ENOMEM;
1434 	}
1435 
1436 	ret = rxm_msg_ep_open(ep, ep->msg_info, rxm_conn, &rxm_conn->handle);
1437 	if (ret)
1438 		return ret;
1439 
1440 	/* We have to send passive endpoint's address to the server since the
1441 	 * address from which connection request would be sent would have a
1442 	 * different port. */
1443 	ret = rxm_prepare_cm_data(ep->msg_pep, &rxm_conn->handle, &cm_data);
1444 	if (ret)
1445 		goto err;
1446 
1447 	cm_data.connect.rx_size = rxm_conn_get_rx_size(ep, ep->msg_info);
1448 
1449 	ret = fi_connect(rxm_conn->msg_ep, ep->msg_info->dest_addr,
1450 			 &cm_data, sizeof(cm_data));
1451 	if (ret) {
1452 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to connect msg_ep\n");
1453 		goto err;
1454 	}
1455 	return 0;
1456 
1457 err:
1458 	fi_close(&rxm_conn->msg_ep->fid);
1459 	rxm_conn->msg_ep = NULL;
1460 	return ret;
1461 }
1462 
rxm_conn_signal(struct rxm_ep * ep,void * context,enum rxm_cmap_signal signal)1463 static int rxm_conn_signal(struct rxm_ep *ep, void *context,
1464 			   enum rxm_cmap_signal signal)
1465 {
1466 	struct fi_eq_entry entry = {0};
1467 	ssize_t rd;
1468 
1469 	entry.context = context;
1470 	entry.data = (uint64_t) signal;
1471 
1472 	rd = fi_eq_write(ep->msg_eq, FI_NOTIFY, &entry, sizeof(entry), 0);
1473 	if (rd != sizeof(entry)) {
1474 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to signal\n");
1475 		return (int)rd;
1476 	}
1477 	return 0;
1478 }
1479 
rxm_conn_cmap_alloc(struct rxm_ep * rxm_ep)1480 int rxm_conn_cmap_alloc(struct rxm_ep *rxm_ep)
1481 {
1482 	struct rxm_cmap_attr attr;
1483 	int ret;
1484 	size_t len = rxm_ep->util_ep.av->addrlen;
1485 	void *name = calloc(1, len);
1486 	if (!name) {
1487 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1488 			"Unable to allocate memory for EP name\n");
1489 		return -FI_ENOMEM;
1490 	}
1491 
1492 	/* Passive endpoint should already have fi_setname or fi_listen
1493 	 * called on it for this to work */
1494 	ret = fi_getname(&rxm_ep->msg_pep->fid, name, &len);
1495 	if (ret) {
1496 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1497 			"Unable to fi_getname on msg_ep\n");
1498 		goto fn;
1499 	}
1500 	ofi_straddr_dbg(&rxm_prov, FI_LOG_EP_CTRL, "local_name", name);
1501 
1502 	attr.name		= name;
1503 
1504 	ret = rxm_cmap_alloc(rxm_ep, &attr);
1505 	if (ret)
1506 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1507 			"Unable to allocate CMAP\n");
1508 fn:
1509 	free(name);
1510 	return ret;
1511 }
1512