1 /*
2  * Copyright (c) 2013-2018 Intel Corporation. All rights reserved
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/uio.h>
36 
37 #include "ofi_iov.h"
38 #include "smr.h"
39 
40 extern struct fi_ops_msg smr_msg_ops;
41 extern struct fi_ops_tagged smr_tagged_ops;
42 extern struct fi_ops_rma smr_rma_ops;
43 extern struct fi_ops_atomic smr_atomic_ops;
44 
/*
 * fi_setname handler: store a private copy of the endpoint's shm name.
 * addr is treated as a NUL-terminated string; addrlen is not used to
 * bound the copy (callers pass a NUL-terminated name — see smr_endpoint).
 * Returns 0 on success or -FI_ENOMEM if the copy fails.
 */
int smr_setname(fid_t fid, void *addr, size_t addrlen)
{
	struct smr_ep *ep;
	char *name;

	ep = container_of(fid, struct smr_ep, util_ep.ep_fid.fid);
	name = strdup(addr);
	if (!name)
		return -FI_ENOMEM;

	/* Replace any previously set name; free(NULL) is a no-op, so no
	 * guard is needed on the first assignment. */
	free((void *) ep->name);
	ep->name = name;
	return 0;
}
60 
/*
 * fi_getname handler: copy the endpoint's shm name into addr.
 * On return *addrlen always holds the full size needed for the name
 * (strlen + terminator), so a caller that got -FI_ETOOSMALL can retry
 * with a sufficiently large buffer.
 */
int smr_getname(fid_t fid, void *addr, size_t *addrlen)
{
	struct smr_ep *ep;
	int ret = 0;

	ep = container_of(fid, struct smr_ep, util_ep.ep_fid.fid);
	if (!ep->name)
		return -FI_EADDRNOTAVAIL;

	/* snprintf returns the length it would have written; a result
	 * >= *addrlen means the name was truncated. */
	if (!addr || *addrlen == 0 ||
	    snprintf(addr, *addrlen, "%s", ep->name) >= *addrlen)
		ret = -FI_ETOOSMALL;

	/* Report the required size regardless of success. */
	*addrlen = strlen(ep->name) + 1;

	/* Only touch addr when the copy above fit. */
	if (!ret)
		((char *) addr)[*addrlen - 1] = '\0';

	return ret;
}
81 
/* Connection management ops: shm endpoints are connectionless, so only
 * setname/getname are implemented; all other entries reject the call. */
static struct fi_ops_cm smr_cm_ops = {
	.size = sizeof(struct fi_ops_cm),
	.setname = smr_setname,
	.getname = smr_getname,
	.getpeer = fi_no_getpeer,
	.connect = fi_no_connect,
	.listen = fi_no_listen,
	.accept = fi_no_accept,
	.reject = fi_no_reject,
	.shutdown = fi_no_shutdown,
};
93 
/*
 * fi_getopt handler.  The only supported option is FI_OPT_MIN_MULTI_RECV,
 * the minimum buffer space that keeps an FI_MULTI_RECV posting active.
 * Returns -FI_ENOPROTOOPT for any other level/option, -FI_EINVAL if the
 * output buffer is missing or too small.
 */
int smr_getopt(fid_t fid, int level, int optname,
	       void *optval, size_t *optlen)
{
	struct smr_ep *smr_ep =
		container_of(fid, struct smr_ep, util_ep.ep_fid);

	if ((level != FI_OPT_ENDPOINT) || (optname != FI_OPT_MIN_MULTI_RECV))
		return -FI_ENOPROTOOPT;

	/* Validate the output buffer before writing through it. */
	if (!optval || !optlen || *optlen < sizeof(size_t))
		return -FI_EINVAL;

	*(size_t *)optval = smr_ep->min_multi_recv_size;
	*optlen = sizeof(size_t);

	return FI_SUCCESS;
}
108 
/*
 * fi_setopt handler.  The only supported option is FI_OPT_MIN_MULTI_RECV.
 * Returns -FI_ENOPROTOOPT for any other level/option, -FI_EINVAL if the
 * input buffer is missing or too small to hold a size_t.
 */
int smr_setopt(fid_t fid, int level, int optname,
	       const void *optval, size_t optlen)
{
	struct smr_ep *smr_ep =
		container_of(fid, struct smr_ep, util_ep.ep_fid);

	if ((level != FI_OPT_ENDPOINT) || (optname != FI_OPT_MIN_MULTI_RECV))
		return -FI_ENOPROTOOPT;

	/* Validate the input buffer before reading through it. */
	if (!optval || optlen < sizeof(size_t))
		return -FI_EINVAL;

	smr_ep->min_multi_recv_size = *(size_t *)optval;

	return FI_SUCCESS;
}
122 
smr_match_recv_ctx(struct dlist_entry * item,const void * args)123 static int smr_match_recv_ctx(struct dlist_entry *item, const void *args)
124 {
125 	struct smr_rx_entry *pending_recv;
126 
127 	pending_recv = container_of(item, struct smr_rx_entry, entry);
128 	return pending_recv->context == args;
129 }
130 
/*
 * Remove a posted receive matching context from queue and report an
 * FI_ECANCELED completion for it.  The rx CQ lock is held across the
 * search so the progress path cannot concurrently match the entry.
 *
 * Returns 1 if an entry was found and canceled, 0 if nothing matched,
 * or a negative error code if writing the completion failed.
 */
static int smr_ep_cancel_recv(struct smr_ep *ep, struct smr_queue *queue,
			      void *context)
{
	struct smr_rx_entry *recv_entry;
	struct dlist_entry *entry;
	int ret = 0;

	fastlock_acquire(&ep->util_ep.rx_cq->cq_lock);
	entry = dlist_remove_first_match(&queue->list, smr_match_recv_ctx,
					 context);
	if (entry) {
		recv_entry = container_of(entry, struct smr_rx_entry, entry);
		/* Deliver the cancellation completion to the application. */
		ret = smr_complete_rx(ep, (void *) recv_entry->context, ofi_op_msg,
				  recv_entry->flags, 0,
				  NULL, recv_entry->addr,
				  recv_entry->tag, 0, FI_ECANCELED);
		/* Return the rx entry to its free stack. */
		freestack_push(ep->recv_fs, recv_entry);
		/* Map "completed ok" to 1 so callers can tell a cancel
		 * happened; keep negative errors as-is. */
		ret = ret ? ret : 1;
	}

	fastlock_release(&ep->util_ep.rx_cq->cq_lock);
	return ret;
}
154 
smr_ep_cancel(fid_t ep_fid,void * context)155 static ssize_t smr_ep_cancel(fid_t ep_fid, void *context)
156 {
157 	struct smr_ep *ep;
158 	int ret;
159 
160 	ep = container_of(ep_fid, struct smr_ep, util_ep.ep_fid);
161 
162 	ret = smr_ep_cancel_recv(ep, &ep->trecv_queue, context);
163 	if (ret)
164 		return (ret < 0) ? ret : 0;
165 
166 	ret = smr_ep_cancel_recv(ep, &ep->recv_queue, context);
167 	return (ret < 0) ? ret : 0;
168 }
169 
/* Base endpoint ops: cancel and option handling are supported; shared
 * tx/rx contexts and size queries are not implemented for shm. */
static struct fi_ops_ep smr_ep_ops = {
	.size = sizeof(struct fi_ops_ep),
	.cancel = smr_ep_cancel,
	.getopt = smr_getopt,
	.setopt = smr_setopt,
	.tx_ctx = fi_no_tx_ctx,
	.rx_ctx = fi_no_rx_ctx,
	.rx_size_left = fi_no_rx_size_left,
	.tx_size_left = fi_no_tx_size_left,
};
180 
smr_verify_peer(struct smr_ep * ep,int peer_id)181 int smr_verify_peer(struct smr_ep *ep, int peer_id)
182 {
183 	int ret;
184 
185 	if (ep->region->map->peers[peer_id].peer.addr != FI_ADDR_UNSPEC)
186 		return 0;
187 
188 	ret = smr_map_to_region(&smr_prov, &ep->region->map->peers[peer_id]);
189 
190 	return (ret == -ENOENT) ? -FI_EAGAIN : ret;
191 }
192 
smr_match_msg(struct dlist_entry * item,const void * args)193 static int smr_match_msg(struct dlist_entry *item, const void *args)
194 {
195 	struct smr_match_attr *attr = (struct smr_match_attr *)args;
196 	struct smr_rx_entry *recv_entry;
197 
198 	recv_entry = container_of(item, struct smr_rx_entry, entry);
199 	return smr_match_addr(recv_entry->addr, attr->addr);
200 }
201 
smr_match_tagged(struct dlist_entry * item,const void * args)202 static int smr_match_tagged(struct dlist_entry *item, const void *args)
203 {
204 	struct smr_match_attr *attr = (struct smr_match_attr *)args;
205 	struct smr_rx_entry *recv_entry;
206 
207 	recv_entry = container_of(item, struct smr_rx_entry, entry);
208 	return smr_match_addr(recv_entry->addr, attr->addr) &&
209 	       smr_match_tag(recv_entry->tag, recv_entry->ignore, attr->tag);
210 }
211 
smr_match_unexp_msg(struct dlist_entry * item,const void * args)212 static int smr_match_unexp_msg(struct dlist_entry *item, const void *args)
213 {
214 	struct smr_match_attr *attr = (struct smr_match_attr *)args;
215 	struct smr_unexp_msg *unexp_msg;
216 
217 	unexp_msg = container_of(item, struct smr_unexp_msg, entry);
218 	assert(unexp_msg->cmd.msg.hdr.op == ofi_op_msg);
219 	return smr_match_addr(unexp_msg->cmd.msg.hdr.addr, attr->addr);
220 }
221 
smr_match_unexp_tagged(struct dlist_entry * item,const void * args)222 static int smr_match_unexp_tagged(struct dlist_entry *item, const void *args)
223 {
224 	struct smr_match_attr *attr = (struct smr_match_attr *)args;
225 	struct smr_unexp_msg *unexp_msg;
226 
227 	unexp_msg = container_of(item, struct smr_unexp_msg, entry);
228 	if (unexp_msg->cmd.msg.hdr.op == ofi_op_msg)
229 		return smr_match_addr(unexp_msg->cmd.msg.hdr.addr, attr->addr);
230 
231 	assert(unexp_msg->cmd.msg.hdr.op == ofi_op_tagged);
232 	return smr_match_addr(unexp_msg->cmd.msg.hdr.addr, attr->addr) &&
233 	       smr_match_tag(unexp_msg->cmd.msg.hdr.tag, attr->ignore,
234 			     attr->tag);
235 }
236 
/* Initialize a receive/unexpected queue and install its match callback. */
static void smr_init_queue(struct smr_queue *queue,
			   dlist_func_t *match_func)
{
	queue->match_func = match_func;
	dlist_init(&queue->list);
}
243 
/*
 * Record the state needed to finish a deferred (non-inline) transfer in
 * pend, and point resp back at it so the peer's response can later be
 * matched to this pending entry.
 */
void smr_format_pend_resp(struct smr_tx_entry *pend, struct smr_cmd *cmd,
			  void *context, const struct iovec *iov,
			  uint32_t iov_count, fi_addr_t id, struct smr_resp *resp)
{
	pend->cmd = *cmd;
	pend->context = context;
	memcpy(pend->iov, iov, sizeof(*iov) * iov_count);
	pend->iov_count = iov_count;
	pend->addr = id;
	/* SAR transfers may have copied bytes before this call (see
	 * smr_format_sar); preserve their running count. */
	if (cmd->msg.hdr.op_src != smr_src_sar)
		pend->bytes_done = 0;

	/* The response slot carries the pending entry's address as its id;
	 * status stays FI_EBUSY until the peer finishes the transfer. */
	resp->msg_id = (uint64_t) (uintptr_t) pend;
	resp->status = FI_EBUSY;
}
259 
/* Fill in the command header fields common to every transfer type,
 * translating libfabric op flags into their smr header equivalents. */
void smr_generic_format(struct smr_cmd *cmd, fi_addr_t peer_id, uint32_t op,
			uint64_t tag, uint64_t data, uint64_t op_flags)
{
	cmd->msg.hdr.op = op;
	cmd->msg.hdr.tag = tag;
	cmd->msg.hdr.addr = peer_id;
	cmd->msg.hdr.data = data;

	cmd->msg.hdr.op_flags = 0;
	if (op_flags & FI_REMOTE_CQ_DATA)
		cmd->msg.hdr.op_flags |= SMR_REMOTE_CQ_DATA;
	if (op_flags & FI_COMPLETION)
		cmd->msg.hdr.op_flags |= SMR_TX_COMPLETION;
}
274 
smr_format_inline(struct smr_cmd * cmd,const struct iovec * iov,size_t count)275 void smr_format_inline(struct smr_cmd *cmd, const struct iovec *iov,
276 		       size_t count)
277 {
278 	cmd->msg.hdr.op_src = smr_src_inline;
279 	cmd->msg.hdr.size = ofi_copy_from_iov(cmd->msg.data.msg,
280 					      SMR_MSG_DATA_LEN, iov, count, 0);
281 }
282 
smr_format_inject(struct smr_cmd * cmd,const struct iovec * iov,size_t count,struct smr_region * smr,struct smr_inject_buf * tx_buf)283 void smr_format_inject(struct smr_cmd *cmd, const struct iovec *iov,
284 		       size_t count, struct smr_region *smr,
285 		       struct smr_inject_buf *tx_buf)
286 {
287 	cmd->msg.hdr.op_src = smr_src_inject;
288 	cmd->msg.hdr.src_data = smr_get_offset(smr, tx_buf);
289 	cmd->msg.hdr.size = ofi_copy_from_iov(tx_buf->data, SMR_INJECT_SIZE,
290 					      iov, count, 0);
291 }
292 
smr_format_iov(struct smr_cmd * cmd,const struct iovec * iov,size_t count,size_t total_len,struct smr_region * smr,struct smr_resp * resp)293 void smr_format_iov(struct smr_cmd *cmd, const struct iovec *iov, size_t count,
294 		    size_t total_len, struct smr_region *smr,
295 		    struct smr_resp *resp)
296 {
297 	cmd->msg.hdr.op_src = smr_src_iov;
298 	cmd->msg.hdr.src_data = smr_get_offset(smr, resp);
299 	cmd->msg.data.iov_count = count;
300 	cmd->msg.hdr.size = total_len;
301 	memcpy(cmd->msg.data.iov, iov, sizeof(*iov) * count);
302 }
303 
/*
 * mmap-based transfer for payloads too large for inject/SAR: create a
 * uniquely named shm file sized to the payload and, for writes, copy
 * the source iov into it.  The peer opens the same file by name
 * (derived from ep->name and msg_id) to complete the transfer.  For
 * read requests the local mapping is kept in pend->map_ptr until the
 * peer has filled it.
 *
 * Returns 0 on success or a negative errno/fabric error code; on
 * failure the mapping, shm file, and name-list entry are all released.
 */
int smr_format_mmap(struct smr_ep *ep, struct smr_cmd *cmd,
		    const struct iovec *iov, size_t count, size_t total_len,
		    struct smr_tx_entry *pend, struct smr_resp *resp)
{
	void *mapped_ptr;
	int fd, ret, num;
	uint64_t msg_id;
	struct smr_ep_name *map_name;

	/* Per-endpoint sequence number makes the shm file name unique. */
	msg_id = ep->msg_id++;
	map_name = calloc(1, sizeof(*map_name));
	if (!map_name) {
		FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "calloc error\n");
		return -FI_ENOMEM;
	}

	/* Track the name globally; presumably so the shm file can be
	 * cleaned up at shutdown — NOTE(review): confirm against the
	 * ep_name_list consumer. */
	pthread_mutex_lock(&ep_list_lock);
	dlist_insert_tail(&map_name->entry, &ep_name_list);
	pthread_mutex_unlock(&ep_list_lock);
	num = smr_mmap_name(map_name->name, ep->name, msg_id);
	if (num < 0) {
		FI_WARN(&smr_prov, FI_LOG_AV, "generating shm file name failed\n");
		ret = -errno;
		goto remove_entry;
	}

	fd = shm_open(map_name->name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
	if (fd < 0) {
		FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "shm_open error\n");
		ret = -errno;
		goto remove_entry;
	}

	/* Size the file to hold the whole payload before mapping. */
	ret = ftruncate(fd, total_len);
	if (ret < 0) {
		FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "ftruncate error\n");
		goto unlink_close;
	}

	mapped_ptr = mmap(NULL, total_len, PROT_READ | PROT_WRITE,
			  MAP_SHARED, fd, 0);
	if (mapped_ptr == MAP_FAILED) {
		FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "mmap error\n");
		ret = -errno;
		goto unlink_close;
	}

	if (cmd->msg.hdr.op != ofi_op_read_req) {
		/* Write/send: stage the payload now, then drop our mapping;
		 * the data lives on in the shm file for the peer. */
		if (ofi_copy_from_iov(mapped_ptr, total_len, iov, count, 0)
		    != total_len) {
			FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "copy from iov error\n");
			ret = -FI_EIO;
			goto munmap;
		}
		munmap(mapped_ptr, total_len);
	} else {
		/* Read: keep the mapping so we can copy out the peer's data
		 * when the response arrives. */
		pend->map_ptr = mapped_ptr;
	}

	cmd->msg.hdr.op_src = smr_src_mmap;
	cmd->msg.hdr.msg_id = msg_id;
	cmd->msg.hdr.src_data = smr_get_offset(ep->region, resp);
	cmd->msg.hdr.size = total_len;
	pend->map_name = map_name;

	/* The fd is no longer needed once mapped (or staged). */
	close(fd);
	return 0;

munmap:
	munmap(mapped_ptr, total_len);
unlink_close:
	shm_unlink(map_name->name);
	close(fd);
remove_entry:
	dlist_remove(&map_name->entry);
	free(map_name);
	return ret;
}
382 
/*
 * Copy the next chunk(s) of a segmented (SAR) transfer from iov into
 * the two ping-pong buffers of sar_msg.  *next selects which buffer to
 * fill first (0 or 1) and *bytes_done is the running offset into the
 * source; both are advanced for the caller.
 *
 * resp may be NULL when the transfer is not a read request (see
 * smr_format_sar): it is only dereferenced under the ofi_op_read_req
 * check.
 *
 * Returns the number of bytes copied by this call.
 */
size_t smr_copy_to_sar(struct smr_sar_msg *sar_msg, struct smr_resp *resp,
		       struct smr_cmd *cmd, const struct iovec *iov, size_t count,
		       size_t *bytes_done, int *next)
{
	size_t start = *bytes_done;

	if (sar_msg->sar[0].status == SMR_SAR_FREE && !*next) {
		*bytes_done += ofi_copy_from_iov(sar_msg->sar[0].buf, SMR_SAR_SIZE,
						 iov, count, *bytes_done);
		sar_msg->sar[0].status = SMR_SAR_READY;
		if (cmd->msg.hdr.op == ofi_op_read_req)
			resp->status = FI_SUCCESS;
		*next = 1;
	}

	/* Fill the second buffer too if more data remains and it is free. */
	if (*bytes_done < cmd->msg.hdr.size &&
	    sar_msg->sar[1].status == SMR_SAR_FREE && *next) {
		*bytes_done += ofi_copy_from_iov(sar_msg->sar[1].buf, SMR_SAR_SIZE,
						 iov, count, *bytes_done);
		sar_msg->sar[1].status = SMR_SAR_READY;
		if (cmd->msg.hdr.op == ofi_op_read_req)
			resp->status = FI_SUCCESS;
		*next = 0;
	}
	return *bytes_done - start;
}
409 
/*
 * Drain the next chunk(s) of a segmented (SAR) transfer from the two
 * ping-pong buffers of sar_msg into iov.  *next selects which buffer
 * to drain first (0 or 1) and *bytes_done is the running offset into
 * the destination; both are advanced for the caller.
 *
 * resp is dereferenced for non-read operations to signal the sender, so
 * it must be valid in that case.
 *
 * Returns the number of bytes copied by this call.
 */
size_t smr_copy_from_sar(struct smr_sar_msg *sar_msg, struct smr_resp *resp,
			 struct smr_cmd *cmd, const struct iovec *iov, size_t count,
			 size_t *bytes_done, int *next)
{
	size_t start = *bytes_done;

	if (sar_msg->sar[0].status == SMR_SAR_READY && !*next) {
		*bytes_done += ofi_copy_to_iov(iov, count, *bytes_done,
					       sar_msg->sar[0].buf, SMR_SAR_SIZE);
		sar_msg->sar[0].status = SMR_SAR_FREE;
		if (cmd->msg.hdr.op != ofi_op_read_req)
			resp->status = FI_SUCCESS;
		*next = 1;
	}

	/* Drain the second buffer too if more data is expected and ready. */
	if (*bytes_done < cmd->msg.hdr.size &&
	    sar_msg->sar[1].status == SMR_SAR_READY && *next) {
		*bytes_done += ofi_copy_to_iov(iov, count, *bytes_done,
					       sar_msg->sar[1].buf, SMR_SAR_SIZE);
		sar_msg->sar[1].status = SMR_SAR_FREE;
		if (cmd->msg.hdr.op != ofi_op_read_req)
			resp->status = FI_SUCCESS;
		*next = 0;
	}
	return *bytes_done - start;
}
436 
/*
 * SAR (segment-and-reassemble) transfer: stage the payload through a
 * ping-pong buffer pair allocated in the peer's region.  The command
 * records the offsets of both the local response slot and the peer-side
 * sar_msg.  For non-read operations the first chunk(s) are copied in
 * immediately; passing a NULL resp to smr_copy_to_sar is safe here
 * because resp is only used on the ofi_op_read_req path, which this
 * branch excludes.
 */
void smr_format_sar(struct smr_cmd *cmd, const struct iovec *iov, size_t count,
		    size_t total_len, struct smr_region *smr,
		    struct smr_region *peer_smr, struct smr_sar_msg *sar_msg,
		    struct smr_tx_entry *pending, struct smr_resp *resp)
{
	cmd->msg.hdr.op_src = smr_src_sar;
	cmd->msg.hdr.src_data = smr_get_offset(smr, resp);
	cmd->msg.data.sar = smr_get_offset(peer_smr, sar_msg);
	cmd->msg.hdr.size = total_len;

	pending->bytes_done = 0;
	pending->next = 0;
	sar_msg->sar[0].status = SMR_SAR_FREE;
	sar_msg->sar[1].status = SMR_SAR_FREE;
	if (cmd->msg.hdr.op != ofi_op_read_req)
		smr_copy_to_sar(sar_msg, NULL, cmd, iov, count,
				&pending->bytes_done, &pending->next);
}
455 
smr_ep_close(struct fid * fid)456 static int smr_ep_close(struct fid *fid)
457 {
458 	struct smr_ep *ep;
459 
460 	ep = container_of(fid, struct smr_ep, util_ep.ep_fid.fid);
461 
462 	ofi_endpoint_close(&ep->util_ep);
463 
464 	if (ep->region)
465 		smr_free(ep->region);
466 
467 	smr_recv_fs_free(ep->recv_fs);
468 	smr_unexp_fs_free(ep->unexp_fs);
469 	smr_pend_fs_free(ep->pend_fs);
470 	free(ep);
471 	return 0;
472 }
473 
smr_ep_trywait(void * arg)474 static int smr_ep_trywait(void *arg)
475 {
476 	struct smr_ep *ep;
477 
478 	ep = container_of(arg, struct smr_ep, util_ep.ep_fid.fid);
479 
480 	smr_ep_progress(&ep->util_ep);
481 
482 	return FI_SUCCESS;
483 }
484 
/*
 * Bind a completion queue to the endpoint and pick the tx/rx completion
 * writer functions.  Variants differ on whether they must signal the
 * CQ's wait object (waitable CQ) and whether FI_SOURCE address data is
 * reported on rx completions.  Waitable CQs also get a trywait callback
 * so fi_trywait() can drive endpoint progress.
 */
static int smr_ep_bind_cq(struct smr_ep *ep, struct util_cq *cq, uint64_t flags)
{
	int ret;

	ret = ofi_ep_bind_cq(&ep->util_ep, cq, flags);
	if (ret)
		return ret;

	if (flags & FI_TRANSMIT) {
		/* Signal the wait object on tx completions only if one exists. */
		ep->tx_comp = cq->wait ? smr_tx_comp_signal : smr_tx_comp;
	}

	if (flags & FI_RECV) {
		/* Four rx variants: {signal, no-signal} x {source, no-source}. */
		if (cq->wait) {
			ep->rx_comp = (cq->domain->info_domain_caps & FI_SOURCE) ?
				      smr_rx_src_comp_signal :
				      smr_rx_comp_signal;
		} else {
			ep->rx_comp = (cq->domain->info_domain_caps & FI_SOURCE) ?
				      smr_rx_src_comp : smr_rx_comp;
		}
	}

	if (cq->wait) {
		ret = ofi_wait_add_fid(cq->wait, &ep->util_ep.ep_fid.fid, 0,
				       smr_ep_trywait);
		if (ret)
			return ret;
	}

	/* Register on the CQ's endpoint list so CQ progress reaches us. */
	ret = fid_list_insert(&cq->ep_list,
			      &cq->ep_list_lock,
			      &ep->util_ep.ep_fid.fid);

	return ret;
}
521 
smr_ep_bind_cntr(struct smr_ep * ep,struct util_cntr * cntr,uint64_t flags)522 static int smr_ep_bind_cntr(struct smr_ep *ep, struct util_cntr *cntr, uint64_t flags)
523 {
524 	int ret;
525 
526 	ret = ofi_ep_bind_cntr(&ep->util_ep, cntr, flags);
527 	if (ret)
528 		return ret;
529 
530 	if (cntr->wait) {
531 		ret = ofi_wait_add_fid(cntr->wait, &ep->util_ep.ep_fid.fid, 0,
532 				       smr_ep_trywait);
533 		if (ret)
534 			return ret;
535 	}
536 
537 	return FI_SUCCESS;
538 }
539 
/*
 * fi_ep_bind handler: attach an AV, CQ, counter, or (trivially) an EQ
 * to the endpoint.  Any other fid class is rejected with -FI_EINVAL.
 */
static int smr_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags)
{
	struct smr_ep *ep;
	struct util_av *av;
	int ret = 0;

	ep = container_of(ep_fid, struct smr_ep, util_ep.ep_fid.fid);
	switch (bfid->fclass) {
	case FI_CLASS_AV:
		av = container_of(bfid, struct util_av, av_fid.fid);
		ret = ofi_ep_bind_av(&ep->util_ep, av);
		if (ret) {
			FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
				"duplicate AV binding\n");
			return -FI_EINVAL;
		}
		break;
	case FI_CLASS_CQ:
		ret = smr_ep_bind_cq(ep, container_of(bfid, struct util_cq,
						      cq_fid.fid), flags);
		break;
	case FI_CLASS_EQ:
		/* EQ binding is accepted but requires no endpoint state. */
		break;
	case FI_CLASS_CNTR:
		ret = smr_ep_bind_cntr(ep, container_of(bfid,
				struct util_cntr, cntr_fid.fid), flags);
		break;
	default:
		FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
			"invalid fid class\n");
		ret = -FI_EINVAL;
		break;
	}
	return ret;
}
575 
/*
 * fi_control handler.  Only FI_ENABLE is supported: it validates that
 * CQs and an AV are bound, creates the endpoint's shared-memory region,
 * and exchanges mappings with already-known peers.
 */
static int smr_ep_ctrl(struct fid *fid, int command, void *arg)
{
	struct smr_attr attr;
	struct smr_ep *ep;
	struct smr_av *av;
	int ret;

	ep = container_of(fid, struct smr_ep, util_ep.ep_fid.fid);
	av = container_of(ep->util_ep.av, struct smr_av, util_av);

	switch (command) {
	case FI_ENABLE:
		/* Both CQs and the AV must be bound before enabling. */
		if (!ep->util_ep.rx_cq || !ep->util_ep.tx_cq)
			return -FI_ENOCQ;
		if (!ep->util_ep.av)
			return -FI_ENOAV;

		attr.name = ep->name;
		attr.rx_count = ep->rx_size;
		attr.tx_count = ep->tx_size;
		ret = smr_create(&smr_prov, av->smr_map, &attr, &ep->region);
		if (ret)
			return ret;
		/* Map any peers that were inserted before we were enabled. */
		smr_exchange_all_peers(ep->region);
		break;
	default:
		return -FI_ENOSYS;
	}
	return ret;
}
606 
/* Base fid ops for the endpoint: lifecycle (close), binding, and the
 * FI_ENABLE control path. */
static struct fi_ops smr_ep_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = smr_ep_close,
	.bind = smr_ep_bind,
	.control = smr_ep_ctrl,
	.ops_open = fi_no_ops_open,
};
614 
smr_endpoint_name(char * name,char * addr,size_t addrlen,int dom_idx,int ep_idx)615 static int smr_endpoint_name(char *name, char *addr, size_t addrlen,
616 			     int dom_idx, int ep_idx)
617 {
618 	const char *start;
619 	memset(name, 0, NAME_MAX);
620 	if (!addr || addrlen > NAME_MAX)
621 		return -FI_EINVAL;
622 
623 	start = smr_no_prefix((const char *) addr);
624 	if (strstr(addr, SMR_PREFIX) || dom_idx || ep_idx)
625 		snprintf(name, NAME_MAX, "%s:%d:%d:%d", start, getuid(), dom_idx,
626 			 ep_idx);
627 	else
628 		snprintf(name, NAME_MAX, "%s", start);
629 
630 	return 0;
631 }
632 
/*
 * Endpoint constructor (fi_endpoint).  Allocates and initializes the
 * smr endpoint object, its free stacks, and its queues, and installs
 * the op tables.  The shared-memory region itself is created later, at
 * FI_ENABLE time in smr_ep_ctrl.
 */
int smr_endpoint(struct fid_domain *domain, struct fi_info *info,
		  struct fid_ep **ep_fid, void *context)
{
	struct smr_ep *ep;
	struct smr_domain *smr_domain;
	int ret, ep_idx;
	char name[NAME_MAX];

	ep = calloc(1, sizeof(*ep));
	if (!ep)
		return -FI_ENOMEM;

	smr_domain = container_of(domain, struct smr_domain, util_domain.domain_fid);

	/* Reserve a unique endpoint index within the domain. */
	fastlock_acquire(&smr_domain->util_domain.lock);
	ep_idx = smr_domain->ep_idx++;
	fastlock_release(&smr_domain->util_domain.lock);
	ret = smr_endpoint_name(name, info->src_addr, info->src_addrlen,
			        smr_domain->dom_idx, ep_idx);
	if (ret)
		goto err2;

	/* Stores a strdup'd copy of name in ep->name. */
	ret = smr_setname(&ep->util_ep.ep_fid.fid, name, NAME_MAX);
	if (ret)
		goto err2;

	ep->rx_size = info->rx_attr->size;
	ep->tx_size = info->tx_attr->size;
	ret = ofi_endpoint_init(domain, &smr_util_prov, info, &ep->util_ep, context,
				smr_ep_progress);
	if (ret)
		goto err1;

	/* NOTE(review): these freestack create calls are not checked for
	 * allocation failure. */
	ep->recv_fs = smr_recv_fs_create(info->rx_attr->size, NULL, NULL);
	ep->unexp_fs = smr_unexp_fs_create(info->rx_attr->size, NULL, NULL);
	ep->pend_fs = smr_pend_fs_create(info->tx_attr->size, NULL, NULL);
	ep->sar_fs = smr_sar_fs_create(info->rx_attr->size, NULL, NULL);
	smr_init_queue(&ep->recv_queue, smr_match_msg);
	smr_init_queue(&ep->trecv_queue, smr_match_tagged);
	smr_init_queue(&ep->unexp_msg_queue, smr_match_unexp_msg);
	smr_init_queue(&ep->unexp_tagged_queue, smr_match_unexp_tagged);
	dlist_init(&ep->sar_list);

	ep->min_multi_recv_size = SMR_INJECT_SIZE;

	ep->util_ep.ep_fid.fid.ops = &smr_ep_fi_ops;
	ep->util_ep.ep_fid.ops = &smr_ep_ops;
	ep->util_ep.ep_fid.cm = &smr_cm_ops;
	ep->util_ep.ep_fid.msg = &smr_msg_ops;
	ep->util_ep.ep_fid.tagged = &smr_tagged_ops;
	ep->util_ep.ep_fid.rma = &smr_rma_ops;
	ep->util_ep.ep_fid.atomic = &smr_atomic_ops;

	*ep_fid = &ep->util_ep.ep_fid;
	return 0;

err1:
	/* Name was set; release the strdup'd copy before the ep. */
	free((void *)ep->name);
err2:
	free(ep);
	return ret;
}
695