1 /*
2 * Copyright (c) 2016 Intel Corporation, Inc. All rights reserved.
3 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses. You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * BSD license below:
10 *
11 * Redistribution and use in source and binary forms, with or
12 * without modification, are permitted provided that the following
13 * conditions are met:
14 *
15 * - Redistributions of source code must retain the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials
22 * provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33
34 #include <stdlib.h>
35 #include <string.h>
36 #include <poll.h>
37
38 #include <ofi.h>
39 #include <ofi_util.h>
40 #include "rxm.h"
41
42 static struct rxm_cmap_handle *rxm_conn_alloc(struct rxm_cmap *cmap);
43 static int rxm_conn_connect(struct rxm_ep *ep,
44 struct rxm_cmap_handle *handle, const void *addr);
45 static int rxm_conn_signal(struct rxm_ep *ep, void *context,
46 enum rxm_cmap_signal signal);
47 static void rxm_conn_av_updated_handler(struct rxm_cmap_handle *handle);
48 static void *rxm_conn_progress(void *arg);
49 static void *rxm_conn_atomic_progress(void *arg);
50 static int rxm_conn_handle_event(struct rxm_ep *rxm_ep,
51 struct rxm_msg_eq_entry *entry);
52
53
54 /*
55 * Connection map
56 */
57
/* Human-readable names for each rxm_cmap_state value, indexed by state;
 * generated from the RXM_CM_STATES X-macro so it stays in sync with the enum. */
char *rxm_cm_state_str[] = {
	RXM_CM_STATES(OFI_STR)
};
61
/* Drain one error entry from the MSG EQ.
 * Returns -FI_ECONNREFUSED for connection-refused events (the caller treats
 * those as CM events), the negated provider error for other errors, or the
 * negative read status / -FI_EINVAL when the entry could not be retrieved. */
static inline ssize_t rxm_eq_readerr(struct rxm_ep *rxm_ep,
				     struct rxm_msg_eq_entry *entry)
{
	ssize_t rd;

	/* discard error data left over from a previous read */
	entry->err_entry.err_data_size = 0;

	rd = fi_eq_readerr(rxm_ep->msg_eq, &entry->err_entry, 0);
	if (rd != sizeof(entry->err_entry)) {
		if (rd != -FI_EAGAIN)
			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
				"unable to fi_eq_readerr: %zd\n", rd);
		if (rd < 0)
			return rd;
		return -FI_EINVAL;
	}

	if (entry->err_entry.err == ECONNREFUSED) {
		entry->context = entry->err_entry.fid->context;
		return -FI_ECONNREFUSED;
	}

	OFI_EQ_STRERROR(&rxm_prov, FI_LOG_WARN, FI_LOG_EP_CTRL,
			rxm_ep->msg_eq, &entry->err_entry);
	return -entry->err_entry.err;
}
87
rxm_eq_read(struct rxm_ep * ep,size_t len,struct rxm_msg_eq_entry * entry)88 static ssize_t rxm_eq_read(struct rxm_ep *ep, size_t len,
89 struct rxm_msg_eq_entry *entry)
90 {
91 ssize_t ret;
92
93 ret = fi_eq_read(ep->msg_eq, &entry->event, &entry->cm_entry, len, 0);
94 if (ret == -FI_EAVAIL)
95 ret = rxm_eq_readerr(ep, entry);
96
97 return ret;
98 }
99
rxm_cmap_set_key(struct rxm_cmap_handle * handle)100 static void rxm_cmap_set_key(struct rxm_cmap_handle *handle)
101 {
102 handle->key = ofi_idx2key(&handle->cmap->key_idx,
103 ofi_idx_insert(&handle->cmap->handles_idx, handle));
104 }
105
rxm_cmap_clear_key(struct rxm_cmap_handle * handle)106 static void rxm_cmap_clear_key(struct rxm_cmap_handle *handle)
107 {
108 int index = ofi_key2idx(&handle->cmap->key_idx, handle->key);
109
110 if (!ofi_idx_is_valid(&handle->cmap->handles_idx, index))
111 FI_WARN(handle->cmap->av->prov, FI_LOG_AV, "Invalid key!\n");
112 else
113 ofi_idx_remove(&handle->cmap->handles_idx, index);
114 }
115
/* Look up a connection handle by key. Returns NULL (with a warning) when
 * the key is unknown or does not match the stored handle's key. */
struct rxm_cmap_handle *rxm_cmap_key2handle(struct rxm_cmap *cmap, uint64_t key)
{
	struct rxm_cmap_handle *handle;

	handle = ofi_idx_lookup(&cmap->handles_idx,
				ofi_key2idx(&cmap->key_idx, key));
	if (!handle) {
		FI_WARN(cmap->av->prov, FI_LOG_AV, "Invalid key!\n");
		return NULL;
	}

	if (handle->key != key) {
		FI_WARN(cmap->av->prov, FI_LOG_AV,
			"handle->key not matching given key\n");
		return NULL;
	}

	return handle;
}
132
/* Populate a freshly allocated handle and insert it into the key index. */
static void rxm_cmap_init_handle(struct rxm_cmap_handle *handle,
				 struct rxm_cmap *cmap,
				 enum rxm_cmap_state state,
				 fi_addr_t fi_addr,
				 struct rxm_cmap_peer *peer)
{
	handle->cmap = cmap;
	handle->fi_addr = fi_addr;
	handle->peer = peer;
	RXM_CM_UPDATE_STATE(handle, state);
	rxm_cmap_set_key(handle);
}
145
rxm_cmap_match_peer(struct dlist_entry * entry,const void * addr)146 static int rxm_cmap_match_peer(struct dlist_entry *entry, const void *addr)
147 {
148 struct rxm_cmap_peer *peer;
149
150 peer = container_of(entry, struct rxm_cmap_peer, entry);
151 return !memcmp(peer->addr, addr, peer->handle->cmap->av->addrlen);
152 }
153
rxm_cmap_del_handle(struct rxm_cmap_handle * handle)154 static int rxm_cmap_del_handle(struct rxm_cmap_handle *handle)
155 {
156 struct rxm_cmap *cmap = handle->cmap;
157 int ret;
158
159 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
160 "marking connection handle: %p for deletion\n", handle);
161 rxm_cmap_clear_key(handle);
162
163 RXM_CM_UPDATE_STATE(handle, RXM_CMAP_SHUTDOWN);
164
165 /* Signal CM thread to delete the handle. This is required
166 * so that the CM thread handles any pending events for this
167 * ep correctly. Handle would be freed finally after processing the
168 * events */
169 ret = rxm_conn_signal(cmap->ep, handle, RXM_CMAP_FREE);
170 if (ret) {
171 FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
172 "Unable to signal CM thread\n");
173 return ret;
174 }
175 return 0;
176 }
177
178 static inline int
rxm_cmap_check_and_realloc_handles_table(struct rxm_cmap * cmap,fi_addr_t fi_addr)179 rxm_cmap_check_and_realloc_handles_table(struct rxm_cmap *cmap,
180 fi_addr_t fi_addr)
181 {
182 void *new_handles;
183 size_t grow_size;
184
185 if (OFI_LIKELY(fi_addr < cmap->num_allocated))
186 return 0;
187
188 grow_size = MAX(cmap->av->count, fi_addr - cmap->num_allocated + 1);
189
190 new_handles = realloc(cmap->handles_av,
191 (grow_size + cmap->num_allocated) *
192 sizeof(*cmap->handles_av));
193 if (OFI_LIKELY(!new_handles))
194 return -FI_ENOMEM;
195
196 cmap->handles_av = new_handles;
197 memset(&cmap->handles_av[cmap->num_allocated], 0,
198 sizeof(*cmap->handles_av) * grow_size);
199 cmap->num_allocated += grow_size;
200 return 0;
201 }
202
203 static struct rxm_pkt *
rxm_conn_inject_pkt_alloc(struct rxm_ep * rxm_ep,struct rxm_conn * rxm_conn,uint8_t op,uint64_t flags)204 rxm_conn_inject_pkt_alloc(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn,
205 uint8_t op, uint64_t flags)
206 {
207 struct rxm_pkt *inject_pkt;
208 int ret = ofi_memalign((void **) &inject_pkt, 16,
209 rxm_ep->inject_limit + sizeof(*inject_pkt));
210
211 if (ret)
212 return NULL;
213
214 memset(inject_pkt, 0, rxm_ep->inject_limit + sizeof(*inject_pkt));
215 inject_pkt->ctrl_hdr.version = RXM_CTRL_VERSION;
216 inject_pkt->ctrl_hdr.type = rxm_ctrl_eager;
217 inject_pkt->hdr.version = OFI_OP_VERSION;
218 inject_pkt->hdr.op = op;
219 inject_pkt->hdr.flags = flags;
220
221 return inject_pkt;
222 }
223
rxm_conn_res_free(struct rxm_conn * rxm_conn)224 static void rxm_conn_res_free(struct rxm_conn *rxm_conn)
225 {
226 ofi_freealign(rxm_conn->inject_pkt);
227 rxm_conn->inject_pkt = NULL;
228 ofi_freealign(rxm_conn->inject_data_pkt);
229 rxm_conn->inject_data_pkt = NULL;
230 ofi_freealign(rxm_conn->tinject_pkt);
231 rxm_conn->tinject_pkt = NULL;
232 ofi_freealign(rxm_conn->tinject_data_pkt);
233 rxm_conn->tinject_data_pkt = NULL;
234 }
235
rxm_conn_res_alloc(struct rxm_ep * rxm_ep,struct rxm_conn * rxm_conn)236 static int rxm_conn_res_alloc(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn)
237 {
238 dlist_init(&rxm_conn->deferred_conn_entry);
239 dlist_init(&rxm_conn->deferred_tx_queue);
240 dlist_init(&rxm_conn->sar_rx_msg_list);
241 dlist_init(&rxm_conn->sar_deferred_rx_msg_list);
242
243 if (rxm_ep->util_ep.domain->threading != FI_THREAD_SAFE) {
244 rxm_conn->inject_pkt =
245 rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
246 ofi_op_msg, 0);
247 rxm_conn->inject_data_pkt =
248 rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
249 ofi_op_msg, FI_REMOTE_CQ_DATA);
250 rxm_conn->tinject_pkt =
251 rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
252 ofi_op_tagged, 0);
253 rxm_conn->tinject_data_pkt =
254 rxm_conn_inject_pkt_alloc(rxm_ep, rxm_conn,
255 ofi_op_tagged, FI_REMOTE_CQ_DATA);
256
257 if (!rxm_conn->inject_pkt || !rxm_conn->inject_data_pkt ||
258 !rxm_conn->tinject_pkt || !rxm_conn->tinject_data_pkt) {
259 rxm_conn_res_free(rxm_conn);
260 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to allocate "
261 "inject pkt for connection\n");
262 return -FI_ENOMEM;
263 }
264 }
265 return 0;
266 }
267
rxm_conn_close(struct rxm_cmap_handle * handle)268 static void rxm_conn_close(struct rxm_cmap_handle *handle)
269 {
270 struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);
271
272 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "closing msg ep\n");
273 if (!rxm_conn->msg_ep)
274 return;
275
276 if (fi_close(&rxm_conn->msg_ep->fid))
277 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to close msg_ep\n");
278
279 rxm_conn->msg_ep = NULL;
280 }
281
rxm_conn_free(struct rxm_cmap_handle * handle)282 static void rxm_conn_free(struct rxm_cmap_handle *handle)
283 {
284 struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);
285
286 rxm_conn_close(handle);
287 rxm_conn_res_free(rxm_conn);
288 free(rxm_conn);
289 }
290
/* Allocate a connection handle for a known AV entry and record it in the
 * handles_av table, growing the table if required.
 * Returns 0 on success or a negative fi_errno. */
static int rxm_cmap_alloc_handle(struct rxm_cmap *cmap, fi_addr_t fi_addr,
				 enum rxm_cmap_state state,
				 struct rxm_cmap_handle **handle)
{
	struct rxm_cmap_handle *h;
	int ret;

	h = rxm_conn_alloc(cmap);
	if (OFI_UNLIKELY(!h))
		return -FI_ENOMEM;

	FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
	       "Allocated handle: %p for fi_addr: %" PRIu64 "\n",
	       h, fi_addr);

	ret = rxm_cmap_check_and_realloc_handles_table(cmap, fi_addr);
	if (OFI_UNLIKELY(ret)) {
		rxm_conn_free(h);
		return ret;
	}

	rxm_cmap_init_handle(h, cmap, state, fi_addr, NULL);
	cmap->handles_av[fi_addr] = h;
	*handle = h;
	return 0;
}
312
rxm_cmap_alloc_handle_peer(struct rxm_cmap * cmap,void * addr,enum rxm_cmap_state state,struct rxm_cmap_handle ** handle)313 static int rxm_cmap_alloc_handle_peer(struct rxm_cmap *cmap, void *addr,
314 enum rxm_cmap_state state,
315 struct rxm_cmap_handle **handle)
316 {
317 struct rxm_cmap_peer *peer;
318
319 peer = calloc(1, sizeof(*peer) + cmap->av->addrlen);
320 if (!peer)
321 return -FI_ENOMEM;
322 *handle = rxm_conn_alloc(cmap);
323 if (!*handle) {
324 free(peer);
325 return -FI_ENOMEM;
326 }
327 ofi_straddr_dbg(cmap->av->prov, FI_LOG_AV, "Allocated handle for addr",
328 addr);
329 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "handle: %p\n", *handle);
330 rxm_cmap_init_handle(*handle, cmap, state, FI_ADDR_NOTAVAIL, peer);
331 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Adding handle to peer list\n");
332 peer->handle = *handle;
333 memcpy(peer->addr, addr, cmap->av->addrlen);
334 dlist_insert_tail(&peer->entry, &cmap->peer_list);
335 return 0;
336 }
337
338 static struct rxm_cmap_handle *
rxm_cmap_get_handle_peer(struct rxm_cmap * cmap,const void * addr)339 rxm_cmap_get_handle_peer(struct rxm_cmap *cmap, const void *addr)
340 {
341 struct rxm_cmap_peer *peer;
342 struct dlist_entry *entry;
343
344 entry = dlist_find_first_match(&cmap->peer_list, rxm_cmap_match_peer,
345 addr);
346 if (!entry)
347 return NULL;
348 ofi_straddr_dbg(cmap->av->prov, FI_LOG_AV,
349 "handle found in peer list for addr", addr);
350 peer = container_of(entry, struct rxm_cmap_peer, entry);
351 return peer->handle;
352 }
353
rxm_cmap_remove(struct rxm_cmap * cmap,int index)354 int rxm_cmap_remove(struct rxm_cmap *cmap, int index)
355 {
356 struct rxm_cmap_handle *handle;
357 int ret = -FI_ENOENT;
358
359 handle = cmap->handles_av[index];
360 if (!handle) {
361 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cmap entry not found\n");
362 return ret;
363 }
364
365 handle->peer = calloc(1, sizeof(*handle->peer) + cmap->av->addrlen);
366 if (!handle->peer) {
367 ret = -FI_ENOMEM;
368 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to allocate memory "
369 "for moving handle to peer list, deleting it instead\n");
370 rxm_cmap_del_handle(handle);
371 return ret;
372 }
373 handle->fi_addr = FI_ADDR_NOTAVAIL;
374 cmap->handles_av[index] = NULL;
375 handle->peer->handle = handle;
376 memcpy(handle->peer->addr, ofi_av_get_addr(cmap->av, index),
377 cmap->av->addrlen);
378 dlist_insert_tail(&handle->peer->entry, &cmap->peer_list);
379 return 0;
380 }
381
/* Promote a peer-list handle to an AV-table handle under fi_addr. */
static int rxm_cmap_move_handle(struct rxm_cmap_handle *handle,
				fi_addr_t fi_addr)
{
	int ret;

	dlist_remove(&handle->peer->entry);
	free(handle->peer);
	handle->peer = NULL;
	handle->fi_addr = fi_addr;

	ret = rxm_cmap_check_and_realloc_handles_table(handle->cmap, fi_addr);
	if (!ret)
		handle->cmap->handles_av[fi_addr] = handle;
	return ret;
}
397
/* Ensure a connection handle exists for an AV entry: reuse the existing
 * one, promote a matching peer-list handle, or allocate a fresh idle one. */
int rxm_cmap_update(struct rxm_cmap *cmap, const void *addr, fi_addr_t fi_addr)
{
	struct rxm_cmap_handle *handle;
	int ret;

	/* A handle may already exist for this fi_addr: equal addresses are
	 * mapped to the same fi_addr by ofi_ip_av_insert/ofi_av_insert_addr. */
	if (fi_addr < cmap->num_allocated &&
	    rxm_cmap_acquire_handle(cmap, fi_addr))
		return 0;

	handle = rxm_cmap_get_handle_peer(cmap, addr);
	if (!handle)
		return rxm_cmap_alloc_handle(cmap, fi_addr, RXM_CMAP_IDLE,
					     &handle);

	ret = rxm_cmap_move_handle(handle, fi_addr);
	if (ret)
		return ret;

	rxm_conn_av_updated_handler(handle);
	return 0;
}
425
rxm_cmap_process_shutdown(struct rxm_cmap * cmap,struct rxm_cmap_handle * handle)426 void rxm_cmap_process_shutdown(struct rxm_cmap *cmap,
427 struct rxm_cmap_handle *handle)
428 {
429 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
430 "Processing shutdown for handle: %p\n", handle);
431 if (handle->state > RXM_CMAP_SHUTDOWN) {
432 FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
433 "Invalid handle on shutdown event\n");
434 } else if (handle->state != RXM_CMAP_SHUTDOWN) {
435 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Got remote shutdown\n");
436 rxm_cmap_del_handle(handle);
437 } else {
438 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL, "Got local shutdown\n");
439 }
440 }
441
rxm_cmap_process_connect(struct rxm_cmap * cmap,struct rxm_cmap_handle * handle,union rxm_cm_data * cm_data)442 void rxm_cmap_process_connect(struct rxm_cmap *cmap,
443 struct rxm_cmap_handle *handle,
444 union rxm_cm_data *cm_data)
445 {
446 struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);
447
448 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
449 "processing FI_CONNECTED event for handle: %p\n", handle);
450 if (cm_data) {
451 assert(handle->state == RXM_CMAP_CONNREQ_SENT);
452 handle->remote_key = cm_data->accept.server_conn_id;
453 rxm_conn->rndv_tx_credits = cm_data->accept.rx_size;
454 } else {
455 assert(handle->state == RXM_CMAP_CONNREQ_RECV);
456 }
457 RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNECTED);
458
459 /* Set the remote key to the inject packets */
460 if (cmap->ep->util_ep.domain->threading != FI_THREAD_SAFE) {
461 rxm_conn->inject_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
462 rxm_conn->inject_data_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
463 rxm_conn->tinject_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
464 rxm_conn->tinject_data_pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
465 }
466 }
467
rxm_cmap_process_reject(struct rxm_cmap * cmap,struct rxm_cmap_handle * handle,enum rxm_cmap_reject_reason reject_reason)468 void rxm_cmap_process_reject(struct rxm_cmap *cmap,
469 struct rxm_cmap_handle *handle,
470 enum rxm_cmap_reject_reason reject_reason)
471 {
472 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
473 "Processing reject for handle: %p\n", handle);
474 switch (handle->state) {
475 case RXM_CMAP_CONNREQ_RECV:
476 case RXM_CMAP_CONNECTED:
477 /* Handle is being re-used for incoming connection request */
478 break;
479 case RXM_CMAP_CONNREQ_SENT:
480 if (reject_reason == RXM_CMAP_REJECT_GENUINE) {
481 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
482 "Deleting connection handle\n");
483 rxm_cmap_del_handle(handle);
484 } else {
485 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
486 "Connection handle is being re-used. Close the connection\n");
487 rxm_conn_close(handle);
488 }
489 break;
490 case RXM_CMAP_SHUTDOWN:
491 FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
492 "Connection handle already being deleted\n");
493 break;
494 default:
495 FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, "Invalid cmap state: "
496 "%d when receiving connection reject\n", handle->state);
497 assert(0);
498 }
499 }
500
/*
 * Handle an incoming connection request from a remote passive endpoint.
 *
 * Resolves (or allocates) the connection handle for `addr`, then decides
 * based on the handle's state whether to accept the request, reject it,
 * or resolve a simultaneous-connection race by comparing addresses.
 *
 * On acceptance, *handle_ret receives the handle to use for the accept.
 * On rejection, *reject_reason is set so the peer can distinguish a
 * genuine reject from a simultaneous-connection reject.
 * Returns 0 to accept, a negative fi_errno to reject.
 */
int rxm_cmap_process_connreq(struct rxm_cmap *cmap, void *addr,
			     struct rxm_cmap_handle **handle_ret,
			     uint8_t *reject_reason)
{
	struct rxm_cmap_handle *handle;
	int ret = 0, cmp;
	fi_addr_t fi_addr = ofi_ip_av_get_fi_addr(cmap->av, addr);

	ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL,
			"Processing connreq from remote pep", addr);

	/* Unknown AV address -> look on the peer list; otherwise use the
	 * AV-indexed handles table. */
	if (fi_addr == FI_ADDR_NOTAVAIL)
		handle = rxm_cmap_get_handle_peer(cmap, addr);
	else
		handle = rxm_cmap_acquire_handle(cmap, fi_addr);

	if (!handle) {
		if (fi_addr == FI_ADDR_NOTAVAIL)
			ret = rxm_cmap_alloc_handle_peer(cmap, addr,
							 RXM_CMAP_CONNREQ_RECV,
							 &handle);
		else
			ret = rxm_cmap_alloc_handle(cmap, fi_addr,
						    RXM_CMAP_CONNREQ_RECV,
						    &handle);
		if (ret)
			goto unlock;
	}

	switch (handle->state) {
	case RXM_CMAP_CONNECTED:
		FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
		       "Connection already present.\n");
		ret = -FI_EALREADY;
		break;
	case RXM_CMAP_CONNREQ_SENT:
		/* Simultaneous connection attempt from both sides: break the
		 * tie by comparing endpoint addresses. */
		ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL, "local_name",
				cmap->attr.name);
		ofi_straddr_dbg(cmap->av->prov, FI_LOG_EP_CTRL, "remote_name",
				addr);

		cmp = ofi_addr_cmp(cmap->av->prov, addr, cmap->attr.name);

		if (cmp < 0) {
			/* Lower remote name: keep our outgoing connect and
			 * reject the peer's request as SIMULT_CONN. */
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
			       "Remote name lower than local name.\n");
			*reject_reason = RXM_CMAP_REJECT_SIMULT_CONN;
			ret = -FI_EALREADY;
			break;
		} else if (cmp > 0) {
			/* Higher remote name wins: drop our outgoing connect
			 * and accept the incoming one on this handle. */
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
			       "Re-using handle: %p to accept remote "
			       "connection\n", handle);
			*reject_reason = RXM_CMAP_REJECT_GENUINE;
			rxm_conn_close(handle);
		} else {
			/* Self-connect: allocate a second (peer-list) handle
			 * for the receiving side of the loopback. */
			FI_DBG(cmap->av->prov, FI_LOG_EP_CTRL,
			       "Endpoint connects to itself\n");
			ret = rxm_cmap_alloc_handle_peer(cmap, addr,
							 RXM_CMAP_CONNREQ_RECV,
							 &handle);
			if (ret)
				goto unlock;

			assert(fi_addr != FI_ADDR_NOTAVAIL);
			handle->fi_addr = fi_addr;
		}
		/* Fall through */
	case RXM_CMAP_IDLE:
		RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNREQ_RECV);
		/* Fall through */
	case RXM_CMAP_CONNREQ_RECV:
		*handle_ret = handle;
		break;
	case RXM_CMAP_SHUTDOWN:
		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL, "handle :%p marked for "
			"deletion / shutdown, reject connection\n", handle);
		*reject_reason = RXM_CMAP_REJECT_GENUINE;
		ret = -FI_EOPBADSTATE;
		break;
	default:
		FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
			"invalid handle state: %d\n", handle->state);
		assert(0);
		ret = -FI_EOPBADSTATE;
	}
unlock:
	return ret;
}
590
/*
 * Drain the MSG EQ, dispatching each CM event until a read error or an
 * event-handling error occurs. Returns the last (negative) status;
 * -FI_EAGAIN simply means the EQ is empty.
 *
 * Fix: removed the dead NULL check on alloca() — alloca extends the stack
 * frame and cannot report failure by returning NULL, so the -FI_ENOMEM
 * path was unreachable and misleading.
 */
int rxm_msg_eq_progress(struct rxm_ep *rxm_ep)
{
	struct rxm_msg_eq_entry *entry;
	int ret;

	entry = alloca(RXM_MSG_EQ_ENTRY_SZ);

	while (1) {
		entry->rd = rxm_eq_read(rxm_ep, RXM_MSG_EQ_ENTRY_SZ, entry);
		/* -FI_ECONNREFUSED is still a CM event the handler consumes */
		if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED) {
			ret = (int) entry->rd;
			break;
		}
		ret = rxm_conn_handle_event(rxm_ep, entry);
		if (ret) {
			FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
			       "invalid connection handle event: %d\n", ret);
			break;
		}
	}
	return ret;
}
618
/* Drive a handle toward the connected state: start a connect from IDLE,
 * otherwise report -FI_EAGAIN while a transition is in flight. Progresses
 * the MSG EQ whenever the caller must retry. */
int rxm_cmap_connect(struct rxm_ep *rxm_ep, fi_addr_t fi_addr,
		     struct rxm_cmap_handle *handle)
{
	int ret = FI_SUCCESS;

	switch (handle->state) {
	case RXM_CMAP_IDLE:
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "initiating MSG_EP connect "
		       "for fi_addr: %" PRIu64 "\n", fi_addr);
		ret = rxm_conn_connect(rxm_ep, handle,
				       ofi_av_get_addr(rxm_ep->cmap->av,
						       fi_addr));
		if (ret) {
			rxm_cmap_del_handle(handle);
		} else {
			RXM_CM_UPDATE_STATE(handle, RXM_CMAP_CONNREQ_SENT);
			ret = -FI_EAGAIN;
		}
		break;
	case RXM_CMAP_CONNREQ_SENT:
	case RXM_CMAP_CONNREQ_RECV:
	case RXM_CMAP_SHUTDOWN:
		ret = -FI_EAGAIN;
		break;
	default:
		FI_WARN(rxm_ep->cmap->av->prov, FI_LOG_EP_CTRL,
			"Invalid cmap handle state\n");
		assert(0);
		ret = -FI_EOPBADSTATE;
	}

	if (ret == -FI_EAGAIN)
		rxm_msg_eq_progress(rxm_ep);

	return ret;
}
653
rxm_cmap_cm_thread_close(struct rxm_cmap * cmap)654 static int rxm_cmap_cm_thread_close(struct rxm_cmap *cmap)
655 {
656 int ret;
657
658 FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "stopping CM thread\n");
659 if (!cmap->cm_thread)
660 return 0;
661
662 cmap->ep->do_progress = false;
663 ret = rxm_conn_signal(cmap->ep, NULL, RXM_CMAP_EXIT);
664 if (ret) {
665 FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
666 "Unable to signal CM thread\n");
667 return ret;
668 }
669 ret = pthread_join(cmap->cm_thread, NULL);
670 if (ret) {
671 FI_WARN(cmap->av->prov, FI_LOG_EP_CTRL,
672 "Unable to join CM thread\n");
673 return ret;
674 }
675 return 0;
676 }
677
rxm_cmap_free(struct rxm_cmap * cmap)678 void rxm_cmap_free(struct rxm_cmap *cmap)
679 {
680 struct rxm_cmap_peer *peer;
681 struct dlist_entry *entry;
682 size_t i;
683
684 FI_INFO(cmap->av->prov, FI_LOG_EP_CTRL, "Closing cmap\n");
685 rxm_cmap_cm_thread_close(cmap);
686
687 for (i = 0; i < cmap->num_allocated; i++) {
688 if (cmap->handles_av[i]) {
689 rxm_cmap_clear_key(cmap->handles_av[i]);
690 rxm_conn_free(cmap->handles_av[i]);
691 cmap->handles_av[i] = 0;
692 }
693 }
694
695 while(!dlist_empty(&cmap->peer_list)) {
696 entry = cmap->peer_list.next;
697 peer = container_of(entry, struct rxm_cmap_peer, entry);
698 dlist_remove(&peer->entry);
699 rxm_cmap_clear_key(peer->handle);
700 rxm_conn_free(peer->handle);
701 free(peer);
702 }
703
704 free(cmap->handles_av);
705 free(cmap->attr.name);
706 ofi_idx_reset(&cmap->handles_idx);
707 free(cmap);
708 }
709
710 static int
rxm_cmap_update_addr(struct util_av * av,void * addr,fi_addr_t fi_addr,void * arg)711 rxm_cmap_update_addr(struct util_av *av, void *addr,
712 fi_addr_t fi_addr, void *arg)
713 {
714 return rxm_cmap_update((struct rxm_cmap *)arg, addr, fi_addr);
715 }
716
rxm_cmap_bind_to_av(struct rxm_cmap * cmap,struct util_av * av)717 int rxm_cmap_bind_to_av(struct rxm_cmap *cmap, struct util_av *av)
718 {
719 cmap->av = av;
720 return ofi_av_elements_iter(av, rxm_cmap_update_addr, (void *)cmap);
721 }
722
/*
 * Allocate and initialize the endpoint's connection map.
 *
 * Sets up the handles table (sized to the AV), copies the local endpoint
 * name from attr, initializes the key index and peer list, optionally
 * starts the CM progress thread (auto progress, or forced), and finally
 * binds the cmap to the endpoint's AV.
 * Returns FI_SUCCESS or a negative fi_errno; on failure rxm_ep->cmap is
 * left NULL and all partial allocations are released.
 */
int rxm_cmap_alloc(struct rxm_ep *rxm_ep, struct rxm_cmap_attr *attr)
{
	struct rxm_cmap *cmap;
	struct util_ep *ep = &rxm_ep->util_ep;
	int ret;

	cmap = calloc(1, sizeof *cmap);
	if (!cmap)
		return -FI_ENOMEM;

	cmap->ep = rxm_ep;
	cmap->av = ep->av;

	/* one handle slot per current AV entry; grown on demand later */
	cmap->handles_av = calloc(cmap->av->count, sizeof(*cmap->handles_av));
	if (!cmap->handles_av) {
		ret = -FI_ENOMEM;
		goto err1;
	}
	cmap->num_allocated = ep->av->count;

	cmap->attr = *attr;
	cmap->attr.name = mem_dup(attr->name, ep->av->addrlen);
	if (!cmap->attr.name) {
		ret = -FI_ENOMEM;
		goto err2;
	}

	memset(&cmap->handles_idx, 0, sizeof(cmap->handles_idx));
	ofi_key_idx_init(&cmap->key_idx, RXM_CMAP_IDX_BITS);

	dlist_init(&cmap->peer_list);

	/* publish before starting the CM thread, which dereferences it */
	rxm_ep->cmap = cmap;

	if (ep->domain->data_progress == FI_PROGRESS_AUTO || force_auto_progress) {

		assert(ep->domain->threading == FI_THREAD_SAFE);
		rxm_ep->do_progress = true;
		/* atomic-capable endpoints need the progress variant that
		 * also polls the MSG CQ */
		if (pthread_create(&cmap->cm_thread, 0,
				   rxm_ep->rxm_info->caps & FI_ATOMIC ?
				   rxm_conn_atomic_progress :
				   rxm_conn_progress, ep)) {
			FI_WARN(ep->av->prov, FI_LOG_EP_CTRL,
				"unable to create cmap thread\n");
			ret = -ofi_syserr();
			goto err3;
		}
	}

	assert(ep->av);
	ret = rxm_cmap_bind_to_av(cmap, ep->av);
	if (ret)
		goto err4;

	return FI_SUCCESS;
err4:
	rxm_cmap_cm_thread_close(cmap);
err3:
	rxm_ep->cmap = NULL;
	free(cmap->attr.name);
err2:
	free(cmap->handles_av);
err1:
	free(cmap);
	return ret;
}
789
rxm_msg_ep_open(struct rxm_ep * rxm_ep,struct fi_info * msg_info,struct rxm_conn * rxm_conn,void * context)790 static int rxm_msg_ep_open(struct rxm_ep *rxm_ep, struct fi_info *msg_info,
791 struct rxm_conn *rxm_conn, void *context)
792 {
793 struct rxm_domain *rxm_domain;
794 struct fid_ep *msg_ep;
795 int ret;
796
797 rxm_domain = container_of(rxm_ep->util_ep.domain, struct rxm_domain,
798 util_domain);
799 ret = fi_endpoint(rxm_domain->msg_domain, msg_info, &msg_ep, context);
800 if (ret) {
801 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
802 "unable to create msg_ep: %d\n", ret);
803 return ret;
804 }
805
806 ret = fi_ep_bind(msg_ep, &rxm_ep->msg_eq->fid, 0);
807 if (ret) {
808 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
809 "unable to bind msg EP to EQ: %d\n", ret);
810 goto err;
811 }
812
813 if (rxm_ep->srx_ctx) {
814 ret = fi_ep_bind(msg_ep, &rxm_ep->srx_ctx->fid, 0);
815 if (ret) {
816 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to bind msg "
817 "EP to shared RX ctx: %d\n", ret);
818 goto err;
819 }
820 }
821
822 // TODO add other completion flags
823 ret = fi_ep_bind(msg_ep, &rxm_ep->msg_cq->fid, FI_TRANSMIT | FI_RECV);
824 if (ret) {
825 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
826 "unable to bind msg_ep to msg_cq: %d\n", ret);
827 goto err;
828 }
829
830 ret = fi_enable(msg_ep);
831 if (ret) {
832 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
833 "unable to enable msg_ep: %d\n", ret);
834 goto err;
835 }
836
837 if (!rxm_ep->srx_ctx) {
838 ret = rxm_msg_ep_prepost_recv(rxm_ep, msg_ep);
839 if (ret)
840 goto err;
841 }
842
843 rxm_conn->msg_ep = msg_ep;
844 return 0;
845 err:
846 fi_close(&msg_ep->fid);
847 return ret;
848 }
849
rxm_conn_reprocess_directed_recvs(struct rxm_recv_queue * recv_queue)850 static int rxm_conn_reprocess_directed_recvs(struct rxm_recv_queue *recv_queue)
851 {
852 struct rxm_rx_buf *rx_buf;
853 struct dlist_entry *entry, *tmp_entry;
854 struct rxm_recv_match_attr match_attr;
855 struct fi_cq_err_entry err_entry = {0};
856 int ret, count = 0;
857
858 dlist_foreach_container_safe(&recv_queue->unexp_msg_list,
859 struct rxm_rx_buf, rx_buf,
860 unexp_msg.entry, tmp_entry) {
861 if (rx_buf->unexp_msg.addr == rx_buf->conn->handle.fi_addr)
862 continue;
863
864 assert(rx_buf->unexp_msg.addr == FI_ADDR_NOTAVAIL);
865
866 rx_buf->unexp_msg.addr = rx_buf->conn->handle.fi_addr;
867 match_attr.addr = rx_buf->unexp_msg.addr;
868 match_attr.tag = rx_buf->unexp_msg.tag;
869
870 entry = dlist_remove_first_match(&recv_queue->recv_list,
871 recv_queue->match_recv,
872 &match_attr);
873 if (!entry)
874 continue;
875
876 dlist_remove(&rx_buf->unexp_msg.entry);
877 rx_buf->recv_entry = container_of(entry, struct rxm_recv_entry,
878 entry);
879
880 ret = rxm_cq_handle_rx_buf(rx_buf);
881 if (ret) {
882 err_entry.op_context = rx_buf;
883 err_entry.flags = rx_buf->recv_entry->comp_flags;
884 err_entry.len = rx_buf->pkt.hdr.size;
885 err_entry.data = rx_buf->pkt.hdr.data;
886 err_entry.tag = rx_buf->pkt.hdr.tag;
887 err_entry.err = ret;
888 err_entry.prov_errno = ret;
889 ofi_cq_write_error(recv_queue->rxm_ep->util_ep.rx_cq,
890 &err_entry);
891 if (rx_buf->ep->util_ep.flags & OFI_CNTR_ENABLED)
892 rxm_cntr_incerr(rx_buf->ep->util_ep.rx_cntr);
893
894 rxm_rx_buf_free(rx_buf);
895
896 if (!(rx_buf->recv_entry->flags & FI_MULTI_RECV))
897 rxm_recv_entry_release(recv_queue,
898 rx_buf->recv_entry);
899 }
900 count++;
901 }
902 return count;
903 }
904
905 static void
rxm_conn_av_updated_handler(struct rxm_cmap_handle * handle)906 rxm_conn_av_updated_handler(struct rxm_cmap_handle *handle)
907 {
908 struct rxm_ep *ep = handle->cmap->ep;
909 int count = 0;
910
911 if (ep->rxm_info->caps & FI_DIRECTED_RECV) {
912 count += rxm_conn_reprocess_directed_recvs(&ep->recv_queue);
913 count += rxm_conn_reprocess_directed_recvs(&ep->trecv_queue);
914
915 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
916 "Reprocessed directed recvs - %d\n", count);
917 }
918 }
919
rxm_conn_alloc(struct rxm_cmap * cmap)920 static struct rxm_cmap_handle *rxm_conn_alloc(struct rxm_cmap *cmap)
921 {
922 struct rxm_conn *rxm_conn = calloc(1, sizeof(*rxm_conn));
923
924 if (OFI_UNLIKELY(!rxm_conn))
925 return NULL;
926
927 if (rxm_conn_res_alloc(cmap->ep, rxm_conn)) {
928 free(rxm_conn);
929 return NULL;
930 }
931
932 return &rxm_conn->handle;
933 }
934
935 static inline int
rxm_conn_verify_cm_data(union rxm_cm_data * remote_cm_data,union rxm_cm_data * local_cm_data)936 rxm_conn_verify_cm_data(union rxm_cm_data *remote_cm_data,
937 union rxm_cm_data *local_cm_data)
938 {
939 /* This should stay at top as it helps to avoid endian conversion
940 * for other fields in rxm_cm_data */
941 if (remote_cm_data->connect.version != local_cm_data->connect.version) {
942 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data version mismatch "
943 "(local: %" PRIu8 ", remote: %" PRIu8 ")\n",
944 local_cm_data->connect.version,
945 remote_cm_data->connect.version);
946 goto err;
947 }
948 if (remote_cm_data->connect.endianness != local_cm_data->connect.endianness) {
949 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data endianness mismatch "
950 "(local: %" PRIu8 ", remote: %" PRIu8 ")\n",
951 local_cm_data->connect.endianness,
952 remote_cm_data->connect.endianness);
953 goto err;
954 }
955 if (remote_cm_data->connect.ctrl_version != local_cm_data->connect.ctrl_version) {
956 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data ctrl_version mismatch "
957 "(local: %" PRIu8 ", remote: %" PRIu8 ")\n",
958 local_cm_data->connect.ctrl_version,
959 remote_cm_data->connect.ctrl_version);
960 goto err;
961 }
962 if (remote_cm_data->connect.op_version != local_cm_data->connect.op_version) {
963 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data op_version mismatch "
964 "(local: %" PRIu8 ", remote: %" PRIu8 ")\n",
965 local_cm_data->connect.op_version,
966 remote_cm_data->connect.op_version);
967 goto err;
968 }
969 if (remote_cm_data->connect.eager_size != local_cm_data->connect.eager_size) {
970 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "cm data eager_size mismatch "
971 "(local: %" PRIu32 ", remote: %" PRIu32 ")\n",
972 local_cm_data->connect.eager_size,
973 remote_cm_data->connect.eager_size);
974 goto err;
975 }
976 return FI_SUCCESS;
977 err:
978 return -FI_EINVAL;
979 }
980
rxm_conn_get_rx_size(struct rxm_ep * rxm_ep,struct fi_info * msg_info)981 static size_t rxm_conn_get_rx_size(struct rxm_ep *rxm_ep,
982 struct fi_info *msg_info)
983 {
984 if (msg_info->ep_attr->rx_ctx_cnt == FI_SHARED_CONTEXT)
985 return MAX(MIN(16, msg_info->rx_attr->size),
986 (msg_info->rx_attr->size /
987 rxm_ep->util_ep.av->count));
988 else
989 return msg_info->rx_attr->size;
990 }
991
992 static int
rxm_msg_process_connreq(struct rxm_ep * rxm_ep,struct fi_info * msg_info,union rxm_cm_data * remote_cm_data)993 rxm_msg_process_connreq(struct rxm_ep *rxm_ep, struct fi_info *msg_info,
994 union rxm_cm_data *remote_cm_data)
995 {
996 struct rxm_conn *rxm_conn;
997 union rxm_cm_data cm_data = {
998 .connect = {
999 .version = RXM_CM_DATA_VERSION,
1000 .endianness = ofi_detect_endianness(),
1001 .ctrl_version = RXM_CTRL_VERSION,
1002 .op_version = RXM_OP_VERSION,
1003 .eager_size = rxm_ep->rxm_info->tx_attr->inject_size,
1004 },
1005 };
1006 union rxm_cm_data reject_cm_data = {
1007 .reject = {
1008 .version = RXM_CM_DATA_VERSION,
1009 .reason = RXM_CMAP_REJECT_GENUINE,
1010 }
1011 };
1012 struct rxm_cmap_handle *handle;
1013 struct sockaddr_storage remote_pep_addr;
1014 int ret;
1015
1016 assert(sizeof(uint32_t) == sizeof(cm_data.accept.rx_size));
1017 assert(msg_info->rx_attr->size <= (uint32_t)-1);
1018
1019 if (rxm_conn_verify_cm_data(remote_cm_data, &cm_data)) {
1020 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1021 "CM data mismatch was detected\n");
1022 ret = -FI_EINVAL;
1023 goto err1;
1024 }
1025
1026 memcpy(&remote_pep_addr, msg_info->dest_addr, msg_info->dest_addrlen);
1027 ofi_addr_set_port((struct sockaddr *)&remote_pep_addr,
1028 remote_cm_data->connect.port);
1029
1030 ret = rxm_cmap_process_connreq(rxm_ep->cmap, &remote_pep_addr,
1031 &handle, &reject_cm_data.reject.reason);
1032 if (ret)
1033 goto err1;
1034
1035 rxm_conn = container_of(handle, struct rxm_conn, handle);
1036
1037 rxm_conn->handle.remote_key = remote_cm_data->connect.client_conn_id;
1038 rxm_conn->rndv_tx_credits = remote_cm_data->connect.rx_size;
1039 assert(rxm_conn->rndv_tx_credits);
1040
1041 ret = rxm_msg_ep_open(rxm_ep, msg_info, rxm_conn, handle);
1042 if (ret)
1043 goto err2;
1044
1045 cm_data.accept.server_conn_id = rxm_conn->handle.key;
1046 cm_data.accept.rx_size = rxm_conn_get_rx_size(rxm_ep, msg_info);
1047
1048 ret = fi_accept(rxm_conn->msg_ep, &cm_data.accept.server_conn_id,
1049 sizeof(cm_data.accept));
1050 if (ret) {
1051 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1052 "Unable to accept incoming connection\n");
1053 goto err2;
1054 }
1055
1056 return ret;
1057 err2:
1058 rxm_cmap_del_handle(&rxm_conn->handle);
1059 err1:
1060 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
1061 "rejecting incoming connection request (reject reason: %d)\n",
1062 (enum rxm_cmap_reject_reason)reject_cm_data.reject.reason);
1063 fi_reject(rxm_ep->msg_pep, msg_info->handle,
1064 &reject_cm_data.reject, sizeof(reject_cm_data.reject));
1065 return ret;
1066 }
1067
rxm_flush_msg_cq(struct rxm_ep * rxm_ep)1068 static void rxm_flush_msg_cq(struct rxm_ep *rxm_ep)
1069 {
1070 struct fi_cq_data_entry comp;
1071 int ret;
1072 do {
1073 ret = fi_cq_read(rxm_ep->msg_cq, &comp, 1);
1074 if (ret > 0) {
1075 ret = rxm_cq_handle_comp(rxm_ep, &comp);
1076 if (OFI_UNLIKELY(ret)) {
1077 rxm_cq_write_error_all(rxm_ep, ret);
1078 } else {
1079 ret = 1;
1080 }
1081 } else if (ret == -FI_EAVAIL) {
1082 rxm_cq_read_write_error(rxm_ep);
1083 ret = 1;
1084 } else if (ret < 0 && ret != -FI_EAGAIN) {
1085 rxm_cq_write_error_all(rxm_ep, ret);
1086 }
1087 } while (ret > 0);
1088 }
1089
rxm_conn_handle_notify(struct fi_eq_entry * eq_entry)1090 static int rxm_conn_handle_notify(struct fi_eq_entry *eq_entry)
1091 {
1092 struct rxm_cmap *cmap;
1093 struct rxm_cmap_handle *handle;
1094
1095 FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "notify event %" PRIu64 "\n",
1096 eq_entry->data);
1097
1098 if ((enum rxm_cmap_signal) eq_entry->data != RXM_CMAP_FREE)
1099 return -FI_EOTHER;
1100
1101 handle = eq_entry->context;
1102 assert(handle->state == RXM_CMAP_SHUTDOWN);
1103 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "freeing handle: %p\n", handle);
1104 cmap = handle->cmap;
1105
1106 rxm_conn_close(handle);
1107
1108 // after closing the connection, we need to flush any dangling references to the
1109 // handle from msg_cq entries that have not been cleaned up yet, otherwise we
1110 // could run into problems during CQ cleanup. these entries will be errored so
1111 // keep reading through EAVAIL.
1112 rxm_flush_msg_cq(cmap->ep);
1113
1114 if (handle->peer) {
1115 dlist_remove(&handle->peer->entry);
1116 free(handle->peer);
1117 handle->peer = NULL;
1118 } else {
1119 cmap->handles_av[handle->fi_addr] = 0;
1120 }
1121 rxm_conn_free(handle);
1122 return 0;
1123 }
1124
rxm_conn_wake_up_wait_obj(struct rxm_ep * rxm_ep)1125 static void rxm_conn_wake_up_wait_obj(struct rxm_ep *rxm_ep)
1126 {
1127 if (rxm_ep->util_ep.tx_cq && rxm_ep->util_ep.tx_cq->wait)
1128 util_cq_signal(rxm_ep->util_ep.tx_cq);
1129 if (rxm_ep->util_ep.tx_cntr && rxm_ep->util_ep.tx_cntr->wait)
1130 util_cntr_signal(rxm_ep->util_ep.tx_cntr);
1131 }
1132
1133 static int
rxm_conn_handle_reject(struct rxm_ep * rxm_ep,struct rxm_msg_eq_entry * entry)1134 rxm_conn_handle_reject(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry)
1135 {
1136 union rxm_cm_data *cm_data = entry->err_entry.err_data;
1137
1138 if (!cm_data || entry->err_entry.err_data_size != sizeof(cm_data->reject)) {
1139 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1140 "no reject error data (cm_data) was found "
1141 "(data length expected: %zu found: %zu)\n",
1142 sizeof(cm_data->reject),
1143 entry->err_entry.err_data_size);
1144 return -FI_EOTHER;
1145 }
1146
1147 if (cm_data->reject.version != RXM_CM_DATA_VERSION) {
1148 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1149 "cm data version mismatch (local: %" PRIu8
1150 ", remote: %" PRIu8 ")\n",
1151 (uint8_t) RXM_CM_DATA_VERSION,
1152 cm_data->reject.version);
1153 return -FI_EOTHER;
1154 }
1155
1156 if (cm_data->reject.reason == RXM_CMAP_REJECT_GENUINE) {
1157 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1158 "remote peer didn't accept the connection\n");
1159 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1160 "(reason: RXM_CMAP_REJECT_GENUINE)\n");
1161 OFI_EQ_STRERROR(&rxm_prov, FI_LOG_WARN, FI_LOG_EP_CTRL,
1162 rxm_ep->msg_eq, &entry->err_entry);
1163 } else if (cm_data->reject.reason == RXM_CMAP_REJECT_SIMULT_CONN) {
1164 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1165 "(reason: RXM_CMAP_REJECT_SIMULT_CONN)\n");
1166 } else {
1167 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "connection reject: "
1168 "received unknown reject reason: %d\n",
1169 cm_data->reject.reason);
1170 }
1171 rxm_cmap_process_reject(rxm_ep->cmap, entry->context,
1172 cm_data->reject.reason);
1173 return 0;
1174 }
1175
/* Dispatch a single MSG EQ event (or a reject error entry) into the CM
 * state machine.  Returns 0 when the event was consumed, a negative
 * fi_errno value for malformed or unknown events. */
static int
rxm_conn_handle_event(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry)
{
	if (entry->rd == -FI_ECONNREFUSED)
		return rxm_conn_handle_reject(rxm_ep, entry);

	switch (entry->event) {
	case FI_NOTIFY:
		return rxm_conn_handle_notify((struct fi_eq_entry *)
					      &entry->cm_entry);
	case FI_CONNREQ:
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "Got new connection\n");
		if ((size_t)entry->rd != RXM_CM_ENTRY_SZ) {
			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
				"Received a connection request with no CM data. "
				"Is sender running FI_PROTO_RXM?\n");
			FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Received CM entry "
				"size (%zd) not matching expected (%zu)\n",
				entry->rd, RXM_CM_ENTRY_SZ);
			return -FI_EOTHER;
		}
		rxm_msg_process_connreq(rxm_ep, entry->cm_entry.info,
					(union rxm_cm_data *) entry->cm_entry.data);
		fi_freeinfo(entry->cm_entry.info);
		break;
	case FI_CONNECTED:
		assert(entry->cm_entry.fid->context);
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
		       "connection successful\n");
		/* CM data is optional on FI_CONNECTED.  Compare sizes with an
		 * explicit cast: the previous expression
		 * "entry->rd - sizeof(entry->cm_entry) > 0" promoted rd to
		 * size_t, so a short read (rd < sizeof) wrapped around and
		 * was wrongly treated as carrying CM data. */
		rxm_cmap_process_connect(rxm_ep->cmap,
					 entry->cm_entry.fid->context,
					 (size_t) entry->rd > sizeof(entry->cm_entry) ?
					 (union rxm_cm_data *) entry->cm_entry.data : NULL);
		rxm_conn_wake_up_wait_obj(rxm_ep);
		break;
	case FI_SHUTDOWN:
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL,
		       "Received connection shutdown\n");
		rxm_cmap_process_shutdown(rxm_ep->cmap,
					  entry->cm_entry.fid->context);
		break;
	default:
		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
			"Unknown event: %u\n", entry->event);
		return -FI_EOTHER;
	}
	return 0;
}
1224
rxm_eq_sread(struct rxm_ep * rxm_ep,size_t len,struct rxm_msg_eq_entry * entry)1225 static ssize_t rxm_eq_sread(struct rxm_ep *rxm_ep, size_t len,
1226 struct rxm_msg_eq_entry *entry)
1227 {
1228 ssize_t rd;
1229 int once = 1;
1230
1231 do {
1232 /* TODO convert this to poll + fi_eq_read so that we can grab
1233 * rxm_ep lock before reading the EQ. This is needed to avoid
1234 * processing events / error entries from closed MSG EPs. This
1235 * can be done only for non-Windows OSes as Windows doesn't
1236 * have poll for a generic file descriptor. */
1237 rd = fi_eq_sread(rxm_ep->msg_eq, &entry->event, &entry->cm_entry,
1238 len, -1, 0);
1239 if (rd >= 0)
1240 return rd;
1241 if (rd == -FI_EINTR && once) {
1242 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "Ignoring EINTR\n");
1243 once = 0;
1244 }
1245 } while (rd == -FI_EINTR);
1246
1247 if (rd != -FI_EAVAIL) {
1248 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1249 "unable to fi_eq_sread: %s (%zd)\n",
1250 fi_strerror(-rd), -rd);
1251 return rd;
1252 }
1253
1254 ofi_ep_lock_acquire(&rxm_ep->util_ep);
1255 rd = rxm_eq_readerr(rxm_ep, entry);
1256 ofi_ep_lock_release(&rxm_ep->util_ep);
1257 return rd;
1258 }
1259
rxm_conn_eq_event(struct rxm_ep * rxm_ep,struct rxm_msg_eq_entry * entry)1260 static inline int rxm_conn_eq_event(struct rxm_ep *rxm_ep,
1261 struct rxm_msg_eq_entry *entry)
1262 {
1263 int ret;
1264
1265 ofi_ep_lock_acquire(&rxm_ep->util_ep);
1266 ret = rxm_conn_handle_event(rxm_ep, entry) ? -1 : 0;
1267 ofi_ep_lock_release(&rxm_ep->util_ep);
1268
1269 return ret;
1270 }
1271
/* CM auto-progress thread body: blocks on the MSG EQ and feeds each
 * event into the connection state machine until the EP clears
 * do_progress.  "arg" is the rxm_ep's embedded util_ep. */
static void *rxm_conn_progress(void *arg)
{
	struct rxm_ep *ep = container_of(arg, struct rxm_ep, util_ep);
	struct rxm_msg_eq_entry *entry;

	/* alloca() cannot fail by returning NULL (stack exhaustion is
	 * undefined behavior, not a NULL return), so the previous NULL
	 * check was dead code and has been removed.  CERT MEM05-C. */
	entry = alloca(RXM_MSG_EQ_ENTRY_SZ);

	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Starting auto-progress thread\n");

	while (ep->do_progress) {
		memset(entry, 0, RXM_MSG_EQ_ENTRY_SZ);
		entry->rd = rxm_eq_sread(ep, RXM_CM_ENTRY_SZ, entry);
		/* -FI_ECONNREFUSED still carries reject data to process */
		if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED)
			continue;

		rxm_conn_eq_event(ep, entry);
	}

	FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Stopping auto-progress thread\n");
	return NULL;
}
1295
/* Non-blocking EQ poll used by the atomic-progress thread: read at most
 * one event under the EP lock and dispatch it.  Returns FI_SUCCESS when
 * nothing was pending, the read error for fatal failures, or the
 * dispatch result otherwise. */
static inline int
rxm_conn_auto_progress_eq(struct rxm_ep *rxm_ep, struct rxm_msg_eq_entry *entry)
{
	memset(entry, 0, RXM_MSG_EQ_ENTRY_SZ);

	ofi_ep_lock_acquire(&rxm_ep->util_ep);
	entry->rd = rxm_eq_read(rxm_ep, RXM_CM_ENTRY_SZ, entry);
	ofi_ep_lock_release(&rxm_ep->util_ep);

	/* nothing pending */
	if (!entry->rd || entry->rd == -FI_EAGAIN)
		return FI_SUCCESS;
	/* rejects (-FI_ECONNREFUSED) are still dispatched as events */
	if (entry->rd < 0 && entry->rd != -FI_ECONNREFUSED)
		return entry->rd;

	return rxm_conn_eq_event(rxm_ep, entry);
}
1312
rxm_conn_atomic_progress(void * arg)1313 static void *rxm_conn_atomic_progress(void *arg)
1314 {
1315 struct rxm_ep *ep = container_of(arg, struct rxm_ep, util_ep);
1316 struct rxm_msg_eq_entry *entry;
1317 struct rxm_fabric *fabric;
1318 struct fid *fids[2] = {
1319 &ep->msg_eq->fid,
1320 &ep->msg_cq->fid,
1321 };
1322 struct pollfd fds[2] = {
1323 {.events = POLLIN},
1324 {.events = POLLIN},
1325 };
1326 int ret;
1327
1328 entry = alloca(RXM_MSG_EQ_ENTRY_SZ);
1329 if (!entry)
1330 return NULL;
1331
1332 fabric = container_of(ep->util_ep.domain->fabric,
1333 struct rxm_fabric, util_fabric);
1334
1335 ret = fi_control(&ep->msg_eq->fid, FI_GETWAIT, &fds[0].fd);
1336 if (ret) {
1337 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1338 "unable to get msg EQ fd: %s\n", fi_strerror(ret));
1339 return NULL;
1340 }
1341
1342 ret = fi_control(&ep->msg_cq->fid, FI_GETWAIT, &fds[1].fd);
1343 if (ret) {
1344 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1345 "unable to get msg CQ fd: %s\n", fi_strerror(ret));
1346 return NULL;
1347 }
1348
1349 FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Starting auto-progress thread\n");
1350 while (ep->do_progress) {
1351 ret = fi_trywait(fabric->msg_fabric, fids, 2);
1352
1353 if (!ret) {
1354 fds[0].revents = 0;
1355 fds[1].revents = 0;
1356
1357 ret = poll(fds, 2, -1);
1358 if (ret == -1 && errno != EINTR) {
1359 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1360 "Select error %s, closing CM thread\n",
1361 strerror(errno));
1362 break;
1363 }
1364 }
1365 rxm_conn_auto_progress_eq(ep, entry);
1366 ep->util_ep.progress(&ep->util_ep);
1367 }
1368
1369 FI_INFO(&rxm_prov, FI_LOG_EP_CTRL, "Stopping auto progress thread\n");
1370 return NULL;
1371 }
1372
rxm_prepare_cm_data(struct fid_pep * pep,struct rxm_cmap_handle * handle,union rxm_cm_data * cm_data)1373 static int rxm_prepare_cm_data(struct fid_pep *pep, struct rxm_cmap_handle *handle,
1374 union rxm_cm_data *cm_data)
1375 {
1376 struct sockaddr_storage name;
1377 size_t cm_data_size = 0;
1378 size_t name_size = sizeof(name);
1379 size_t opt_size = sizeof(cm_data_size);
1380 int ret;
1381
1382 ret = fi_getopt(&pep->fid, FI_OPT_ENDPOINT, FI_OPT_CM_DATA_SIZE,
1383 &cm_data_size, &opt_size);
1384 if (ret) {
1385 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "fi_getopt failed\n");
1386 return ret;
1387 }
1388
1389 if (cm_data_size < sizeof(*cm_data)) {
1390 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "MSG EP CM data size too small\n");
1391 return -FI_EOTHER;
1392 }
1393
1394 ret = fi_getname(&pep->fid, &name, &name_size);
1395 if (ret) {
1396 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to get msg pep name\n");
1397 return ret;
1398 }
1399
1400 cm_data->connect.port = ofi_addr_get_port((struct sockaddr *)&name);
1401 cm_data->connect.client_conn_id = handle->key;
1402 return 0;
1403 }
1404
/* Initiate a client-side connection to "addr": replace msg_info's
 * destination address with a copy of the peer address, open a MSG
 * endpoint bound to our cmap handle, and fi_connect() with our CM
 * parameters (version, endianness, eager size, rx credits, and the
 * passive endpoint's port) as connect data.
 * Returns 0 on success or a negative fi_errno; on failure after the
 * MSG EP was opened, the EP is closed and cleared. */
1405 static int
rxm_conn_connect(struct rxm_ep * ep,struct rxm_cmap_handle * handle,const void * addr)1406 rxm_conn_connect(struct rxm_ep *ep, struct rxm_cmap_handle *handle,
1407 		 const void *addr)
1408 {
1409 	int ret;
1410 	struct rxm_conn *rxm_conn = container_of(handle, struct rxm_conn, handle);
1411 	union rxm_cm_data cm_data = {
1412 		.connect = {
1413 			.version = RXM_CM_DATA_VERSION,
1414 			.ctrl_version = RXM_CTRL_VERSION,
1415 			.op_version = RXM_OP_VERSION,
1416 			.endianness = ofi_detect_endianness(),
1417 			.eager_size = ep->rxm_info->tx_attr->inject_size,
1418 		},
1419 	};
1420 
/* The wire format carries these fields as 32-bit values; guard against
 * silent truncation of the provider-reported sizes. */
1421 	assert(sizeof(uint32_t) == sizeof(cm_data.connect.eager_size));
1422 	assert(sizeof(uint32_t) == sizeof(cm_data.connect.rx_size));
1423 	assert(ep->rxm_info->tx_attr->inject_size <= (uint32_t) -1);
1424 	assert(ep->msg_info->rx_attr->size <= (uint32_t) -1);
1425 
/* Drop any previously-duplicated destination address before installing
 * the new peer address; src and dest addresses share the same length. */
1426 	free(ep->msg_info->dest_addr);
1427 	ep->msg_info->dest_addrlen = ep->msg_info->src_addrlen;
1428 
1429 	ep->msg_info->dest_addr = mem_dup(addr, ep->msg_info->dest_addrlen);
1430 	if (!ep->msg_info->dest_addr) {
1431 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "mem_dup failed, len %zu\n",
1432 			ep->msg_info->dest_addrlen);
1433 		return -FI_ENOMEM;
1434 	}
1435 
1436 	ret = rxm_msg_ep_open(ep, ep->msg_info, rxm_conn, &rxm_conn->handle);
1437 	if (ret)
1438 		return ret;
1439 
1440 	/* We have to send passive endpoint's address to the server since the
1441 	 * address from which connection request would be sent would have a
1442 	 * different port. */
1443 	ret = rxm_prepare_cm_data(ep->msg_pep, &rxm_conn->handle, &cm_data);
1444 	if (ret)
1445 		goto err;
1446 
1447 	cm_data.connect.rx_size = rxm_conn_get_rx_size(ep, ep->msg_info);
1448 
1449 	ret = fi_connect(rxm_conn->msg_ep, ep->msg_info->dest_addr,
1450 			 &cm_data, sizeof(cm_data));
1451 	if (ret) {
1452 		FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "unable to connect msg_ep\n");
1453 		goto err;
1454 	}
1455 	return 0;
1456 
/* Tear down the MSG EP opened above so the handle can be retried. */
1457 err:
1458 	fi_close(&rxm_conn->msg_ep->fid);
1459 	rxm_conn->msg_ep = NULL;
1460 	return ret;
1461 }
1462
rxm_conn_signal(struct rxm_ep * ep,void * context,enum rxm_cmap_signal signal)1463 static int rxm_conn_signal(struct rxm_ep *ep, void *context,
1464 enum rxm_cmap_signal signal)
1465 {
1466 struct fi_eq_entry entry = {0};
1467 ssize_t rd;
1468
1469 entry.context = context;
1470 entry.data = (uint64_t) signal;
1471
1472 rd = fi_eq_write(ep->msg_eq, FI_NOTIFY, &entry, sizeof(entry), 0);
1473 if (rd != sizeof(entry)) {
1474 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, "Unable to signal\n");
1475 return (int)rd;
1476 }
1477 return 0;
1478 }
1479
rxm_conn_cmap_alloc(struct rxm_ep * rxm_ep)1480 int rxm_conn_cmap_alloc(struct rxm_ep *rxm_ep)
1481 {
1482 struct rxm_cmap_attr attr;
1483 int ret;
1484 size_t len = rxm_ep->util_ep.av->addrlen;
1485 void *name = calloc(1, len);
1486 if (!name) {
1487 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1488 "Unable to allocate memory for EP name\n");
1489 return -FI_ENOMEM;
1490 }
1491
1492 /* Passive endpoint should already have fi_setname or fi_listen
1493 * called on it for this to work */
1494 ret = fi_getname(&rxm_ep->msg_pep->fid, name, &len);
1495 if (ret) {
1496 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1497 "Unable to fi_getname on msg_ep\n");
1498 goto fn;
1499 }
1500 ofi_straddr_dbg(&rxm_prov, FI_LOG_EP_CTRL, "local_name", name);
1501
1502 attr.name = name;
1503
1504 ret = rxm_cmap_alloc(rxm_ep, &attr);
1505 if (ret)
1506 FI_WARN(&rxm_prov, FI_LOG_EP_CTRL,
1507 "Unable to allocate CMAP\n");
1508 fn:
1509 free(name);
1510 return ret;
1511 }
1512