1 /*
2  * Copyright (c) 2014-2016 Intel Corporation, Inc.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/time.h>
36 
37 #include <ofi_enosys.h>
38 #include <ofi_util.h>
39 #include <ofi_epoll.h>
40 
41 
ofi_poll_to_epoll(uint32_t events)42 static uint32_t ofi_poll_to_epoll(uint32_t events)
43 {
44 	uint32_t epoll_events = 0;
45 
46 	if (events & POLLIN)
47 		epoll_events |= OFI_EPOLL_IN;
48 	if (events & POLLOUT)
49 		epoll_events |= OFI_EPOLL_OUT;
50 	return epoll_events;
51 }
52 
ofi_trywait(struct fid_fabric * fabric,struct fid ** fids,int count)53 int ofi_trywait(struct fid_fabric *fabric, struct fid **fids, int count)
54 {
55 	struct util_cq *cq;
56 	struct util_eq *eq;
57 	struct util_cntr *cntr;
58 	struct util_wait *wait;
59 	int i, ret;
60 
61 	for (i = 0; i < count; i++) {
62 		switch (fids[i]->fclass) {
63 		case FI_CLASS_CQ:
64 			cq = container_of(fids[i], struct util_cq, cq_fid.fid);
65 			wait = cq->wait;
66 			break;
67 		case FI_CLASS_EQ:
68 			eq = container_of(fids[i], struct util_eq, eq_fid.fid);
69 			wait = eq->wait;
70 			break;
71 		case FI_CLASS_CNTR:
72 			cntr = container_of(fids[i], struct util_cntr, cntr_fid.fid);
73 			wait = cntr->wait;
74 			break;
75 		case FI_CLASS_WAIT:
76 			wait = container_of(fids[i], struct util_wait, wait_fid.fid);
77 			break;
78 		default:
79 			return -FI_EINVAL;
80 		}
81 
82 		ret = wait->wait_try(wait);
83 		if (ret)
84 			return ret;
85 	}
86 	return 0;
87 }
88 
ofi_check_wait_attr(const struct fi_provider * prov,const struct fi_wait_attr * attr)89 int ofi_check_wait_attr(const struct fi_provider *prov,
90 		        const struct fi_wait_attr *attr)
91 {
92 	switch (attr->wait_obj) {
93 	case FI_WAIT_UNSPEC:
94 	case FI_WAIT_FD:
95 	case FI_WAIT_POLLFD:
96 	case FI_WAIT_MUTEX_COND:
97 	case FI_WAIT_YIELD:
98 		break;
99 	default:
100 		FI_WARN(prov, FI_LOG_FABRIC, "invalid wait object type\n");
101 		return -FI_EINVAL;
102 	}
103 
104 	if (attr->flags) {
105 		FI_WARN(prov, FI_LOG_FABRIC, "invalid flags\n");
106 		return -FI_EINVAL;
107 	}
108 
109 	return 0;
110 }
111 
fi_wait_cleanup(struct util_wait * wait)112 int fi_wait_cleanup(struct util_wait *wait)
113 {
114 	struct ofi_wait_fid_entry *fid_entry;
115 	int ret;
116 
117 	if (ofi_atomic_get32(&wait->ref))
118 		return -FI_EBUSY;
119 
120 	ret = fi_close(&wait->pollset->poll_fid.fid);
121 	if (ret)
122 		return ret;
123 
124 	while (!dlist_empty(&wait->fid_list)) {
125 		dlist_pop_front(&wait->fid_list, struct ofi_wait_fid_entry,
126 				fid_entry, entry);
127 		free(fid_entry->pollfds.fd);
128 		free(fid_entry);
129 	}
130 
131 	fastlock_destroy(&wait->lock);
132 	ofi_atomic_dec32(&wait->fabric->ref);
133 	return 0;
134 }
135 
ofi_wait_init(struct util_fabric * fabric,struct fi_wait_attr * attr,struct util_wait * wait)136 int ofi_wait_init(struct util_fabric *fabric, struct fi_wait_attr *attr,
137 		  struct util_wait *wait)
138 {
139 	struct fid_poll *poll_fid;
140 	struct fi_poll_attr poll_attr;
141 	int ret;
142 
143 	wait->prov = fabric->prov;
144 	ofi_atomic_initialize32(&wait->ref, 0);
145 	wait->wait_fid.fid.fclass = FI_CLASS_WAIT;
146 
147 	switch (attr->wait_obj) {
148 	case FI_WAIT_UNSPEC:
149 		wait->wait_obj = FI_WAIT_FD;
150 		break;
151 	case FI_WAIT_FD:
152 	case FI_WAIT_POLLFD:
153 	case FI_WAIT_MUTEX_COND:
154 	case FI_WAIT_YIELD:
155 		wait->wait_obj = attr->wait_obj;
156 		break;
157 	default:
158 		assert(0);
159 		return -FI_EINVAL;
160 	}
161 
162 	memset(&poll_attr, 0, sizeof poll_attr);
163 	ret = fi_poll_create_(fabric->prov, NULL, &poll_attr, &poll_fid);
164 	if (ret)
165 		return ret;
166 
167 	wait->pollset = container_of(poll_fid, struct util_poll, poll_fid);
168 	fastlock_init(&wait->lock);
169 	dlist_init(&wait->fid_list);
170 	wait->fabric = fabric;
171 	ofi_atomic_inc32(&fabric->ref);
172 	return 0;
173 }
174 
ofi_wait_match_fd(struct dlist_entry * item,const void * arg)175 static int ofi_wait_match_fd(struct dlist_entry *item, const void *arg)
176 {
177 	struct ofi_wait_fd_entry *fd_entry;
178 
179 	fd_entry = container_of(item, struct ofi_wait_fd_entry, entry);
180 	return fd_entry->fd == *(int *) arg;
181 }
182 
ofi_wait_fdset_del(struct util_wait_fd * wait_fd,int fd)183 static int ofi_wait_fdset_del(struct util_wait_fd *wait_fd, int fd)
184 {
185 	wait_fd->change_index++;
186 
187 	return (wait_fd->util_wait.wait_obj == FI_WAIT_FD) ?
188 		ofi_epoll_del(wait_fd->epoll_fd, fd) :
189 		ofi_pollfds_del(wait_fd->pollfds, fd);
190 }
191 
ofi_wait_fdset_add(struct util_wait_fd * wait_fd,int fd,uint32_t events,void * context)192 static int ofi_wait_fdset_add(struct util_wait_fd *wait_fd, int fd,
193 			       uint32_t events, void *context)
194 {
195 	int ret;
196 
197 	wait_fd->change_index++;
198 	if (wait_fd->util_wait.wait_obj == FI_WAIT_FD) {
199 		ret = ofi_epoll_add(wait_fd->epoll_fd, fd,
200 				    ofi_poll_to_epoll(events), context);
201 	} else {
202 		ret = ofi_pollfds_add(wait_fd->pollfds, fd, events, context);
203 	}
204 	return ret;
205 }
206 
ofi_wait_del_fd(struct util_wait * wait,int fd)207 int ofi_wait_del_fd(struct util_wait *wait, int fd)
208 {
209 	struct ofi_wait_fd_entry *fd_entry;
210 	struct dlist_entry *entry;
211 	struct util_wait_fd *wait_fd;
212 	int ret = 0;
213 
214 	wait_fd = container_of(wait, struct util_wait_fd, util_wait);
215 	fastlock_acquire(&wait->lock);
216 	entry = dlist_find_first_match(&wait_fd->fd_list, ofi_wait_match_fd, &fd);
217 	if (!entry) {
218 		FI_INFO(wait->prov, FI_LOG_FABRIC,
219 			"Given fd (%d) not found in wait list - %p\n",
220 			fd, wait_fd);
221 		ret = -FI_EINVAL;
222 		goto out;
223 	}
224 
225 	fd_entry = container_of(entry, struct ofi_wait_fd_entry, entry);
226 	if (ofi_atomic_dec32(&fd_entry->ref))
227 		goto out;
228 
229 	dlist_remove(&fd_entry->entry);
230 	ofi_wait_fdset_del(wait_fd, fd_entry->fd);
231 	free(fd_entry);
232 out:
233 	fastlock_release(&wait->lock);
234 	return ret;
235 }
236 
/*
 * Register a raw fd on an fd-based wait set, or take another reference if
 * it is already registered.
 *
 * @wait_try and @arg are stored for util_wait_fd_try(). Returns 0. On
 * failure it returns the fd-set add error or -FI_ENOMEM; on the -FI_ENOMEM
 * path the fd is unregistered again.
 */
int ofi_wait_add_fd(struct util_wait *wait, int fd, uint32_t events,
		    ofi_wait_try_func wait_try, void *arg, void *context)
{
	struct util_wait_fd *wait_fd;
	struct ofi_wait_fd_entry *fd_entry;
	struct dlist_entry *match;
	int ret = 0;

	wait_fd = container_of(wait, struct util_wait_fd, util_wait);

	fastlock_acquire(&wait->lock);
	match = dlist_find_first_match(&wait_fd->fd_list, ofi_wait_match_fd, &fd);
	if (match != NULL) {
		FI_DBG(wait->prov, FI_LOG_EP_CTRL,
		       "Given fd (%d) already added to wait list - %p \n",
		       fd, wait_fd);
		fd_entry = container_of(match, struct ofi_wait_fd_entry, entry);
		ofi_atomic_inc32(&fd_entry->ref);
		goto unlock;
	}

	ret = ofi_wait_fdset_add(wait_fd, fd, events, context);
	if (ret != 0) {
		FI_WARN(wait->prov, FI_LOG_FABRIC,
			"Unable to add fd to epoll\n");
		goto unlock;
	}

	fd_entry = calloc(1, sizeof(*fd_entry));
	if (fd_entry == NULL) {
		ofi_wait_fdset_del(wait_fd, fd);
		ret = -FI_ENOMEM;
		goto unlock;
	}

	fd_entry->fd = fd;
	fd_entry->wait_try = wait_try;
	fd_entry->arg = arg;
	ofi_atomic_initialize32(&fd_entry->ref, 1);
	dlist_insert_tail(&fd_entry->entry, &wait_fd->fd_list);

unlock:
	fastlock_release(&wait->lock);
	return ret;
}
281 
util_wait_fd_signal(struct util_wait * util_wait)282 static void util_wait_fd_signal(struct util_wait *util_wait)
283 {
284 	struct util_wait_fd *wait;
285 	wait = container_of(util_wait, struct util_wait_fd, util_wait);
286 	fd_signal_set(&wait->signal);
287 }
288 
util_wait_update_pollfd(struct util_wait_fd * wait_fd,struct ofi_wait_fid_entry * fid_entry)289 static int util_wait_update_pollfd(struct util_wait_fd *wait_fd,
290 				   struct ofi_wait_fid_entry *fid_entry)
291 {
292 	struct fi_wait_pollfd pollfds = { 0 };
293 	struct pollfd *fds;
294 	size_t i;
295 	int ret;
296 
297 	ret = fi_control(fid_entry->fid, FI_GETWAIT, &pollfds);
298 	if (ret != FI_ETOOSMALL)
299 		return ret;
300 
301 	if (pollfds.change_index == fid_entry->pollfds.change_index)
302 		return 0;
303 
304 	fds = fid_entry->pollfds.fd;
305 	for (i = 0; i < fid_entry->pollfds.nfds; i++) {
306 		ret = ofi_wait_fdset_del(wait_fd, fds->fd);
307 		if (ret) {
308 			FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
309 				"epoll_del failed %s\n", fi_strerror(ret));
310 		}
311 	}
312 
313 	if (fid_entry->pollfds.nfds < pollfds.nfds) {
314 		fds = calloc(pollfds.nfds, sizeof(*fds));
315 		if (!fds)
316 			return -FI_ENOMEM;
317 
318 		free(fid_entry->pollfds.fd);
319 		fid_entry->pollfds.fd = fds;
320 		fid_entry->pollfds.nfds = pollfds.nfds;
321 	}
322 
323 	ret = fi_control(fid_entry->fid, FI_GETWAIT, &fid_entry->pollfds);
324 	if (ret) {
325 		FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
326 			"unable to get wait pollfd %s\n", fi_strerror(ret));
327 		return ret;
328 	}
329 
330 	fds = fid_entry->pollfds.fd;
331 	for (i = 0; i < fid_entry->pollfds.nfds; i++) {
332 		ret = ofi_wait_fdset_add(wait_fd, fds[i].fd, fds[i].events,
333 					 fid_entry->fid->context);
334 		if (ret) {
335 			FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
336 				"unable to add fd %s\n", fi_strerror(ret));
337 			return ret;
338 		}
339 	}
340 
341 	return -FI_EAGAIN;
342 }
343 
util_wait_fd_try(struct util_wait * wait)344 static int util_wait_fd_try(struct util_wait *wait)
345 {
346 	struct ofi_wait_fid_entry *fid_entry;
347 	struct ofi_wait_fd_entry *fd_entry;
348 	struct util_wait_fd *wait_fd;
349 	void *context;
350 	int ret;
351 
352 	wait_fd = container_of(wait, struct util_wait_fd, util_wait);
353 	fd_signal_reset(&wait_fd->signal);
354 	fastlock_acquire(&wait->lock);
355 	dlist_foreach_container(&wait_fd->fd_list, struct ofi_wait_fd_entry,
356 				fd_entry, entry) {
357 		ret = fd_entry->wait_try(fd_entry->arg);
358 		if (ret != FI_SUCCESS)
359 			goto release;
360 	}
361 
362 	dlist_foreach_container(&wait->fid_list,
363 				struct ofi_wait_fid_entry, fid_entry, entry) {
364 		if (fid_entry->wait_obj == FI_WAIT_POLLFD) {
365 			ret = util_wait_update_pollfd(wait_fd, fid_entry);
366 			if (ret)
367 				goto release;
368 		}
369 
370 		ret = fid_entry->wait_try(fid_entry->fid);
371 		if (ret != FI_SUCCESS)
372 			goto release;
373 	}
374 
375 	fastlock_release(&wait->lock);
376 	ret = fi_poll(&wait->pollset->poll_fid, &context, 1);
377 	return (ret > 0) ? -FI_EAGAIN : (ret == -FI_EAGAIN) ? FI_SUCCESS : ret;
378 
379 release:
380 	fastlock_release(&wait->lock);
381 	return ret;
382 }
383 
util_wait_fd_run(struct fid_wait * wait_fid,int timeout)384 static int util_wait_fd_run(struct fid_wait *wait_fid, int timeout)
385 {
386 	struct util_wait_fd *wait;
387 	uint64_t endtime;
388 	void *ep_context[1];
389 	int ret;
390 
391 	wait = container_of(wait_fid, struct util_wait_fd, util_wait.wait_fid);
392 	endtime = ofi_timeout_time(timeout);
393 
394 	while (1) {
395 		ret = wait->util_wait.wait_try(&wait->util_wait);
396 		if (ret)
397 			return ret == -FI_EAGAIN ? 0 : ret;
398 
399 		if (ofi_adjust_timeout(endtime, &timeout))
400 			return -FI_ETIMEDOUT;
401 
402 		ret = (wait->util_wait.wait_obj == FI_WAIT_FD) ?
403 		      ofi_epoll_wait(wait->epoll_fd, ep_context, 1, timeout) :
404 		      ofi_pollfds_wait(wait->pollfds, ep_context, 1, timeout);
405 		if (ret > 0)
406 			return FI_SUCCESS;
407 
408 		if (ret < 0) {
409 			FI_WARN(wait->util_wait.prov, FI_LOG_FABRIC,
410 				"poll failed\n");
411 			return ret;
412 		}
413 	}
414 }
415 
util_wait_fd_control(struct fid * fid,int command,void * arg)416 static int util_wait_fd_control(struct fid *fid, int command, void *arg)
417 {
418 	struct util_wait_fd *wait;
419 	struct fi_wait_pollfd *pollfd;
420 	int ret;
421 
422 	wait = container_of(fid, struct util_wait_fd, util_wait.wait_fid.fid);
423 	switch (command) {
424 	case FI_GETWAIT:
425 		if (wait->util_wait.wait_obj == FI_WAIT_FD) {
426 #ifdef HAVE_EPOLL
427 			*(int *) arg = wait->epoll_fd;
428 			return 0;
429 #else
430 			return -FI_ENODATA;
431 #endif
432 		}
433 
434 		pollfd = arg;
435 		fastlock_acquire(&wait->util_wait.lock);
436 		if (pollfd->nfds >= wait->pollfds->nfds) {
437 			memcpy(pollfd->fd, &wait->pollfds->fds[0],
438 			       wait->pollfds->nfds * sizeof(*wait->pollfds->fds));
439 			ret = 0;
440 		} else {
441 			ret = -FI_ETOOSMALL;
442 		}
443 		pollfd->change_index = wait->change_index;
444 		pollfd->nfds = wait->pollfds->nfds;
445 		fastlock_release(&wait->util_wait.lock);
446 		break;
447 	case FI_GETWAITOBJ:
448 		*(enum fi_wait_obj *) arg = wait->util_wait.wait_obj;
449 		ret = 0;
450 		break;
451 	default:
452 		FI_INFO(wait->util_wait.prov, FI_LOG_FABRIC,
453 			"unsupported command\n");
454 		ret = -FI_ENOSYS;
455 		break;
456 	}
457 	return ret;
458 }
459 
util_wait_fd_close(struct fid * fid)460 static int util_wait_fd_close(struct fid *fid)
461 {
462 	struct util_wait_fd *wait;
463 	struct ofi_wait_fd_entry *fd_entry;
464 	int ret;
465 
466 	wait = container_of(fid, struct util_wait_fd, util_wait.wait_fid.fid);
467 
468 	fastlock_acquire(&wait->util_wait.lock);
469 	while (!dlist_empty(&wait->fd_list)) {
470 		dlist_pop_front(&wait->fd_list, struct ofi_wait_fd_entry,
471 				fd_entry, entry);
472 		ofi_wait_fdset_del(wait, fd_entry->fd);
473 		free(fd_entry);
474 	}
475 	fastlock_release(&wait->util_wait.lock);
476 
477 	ret = fi_wait_cleanup(&wait->util_wait);
478 	if (ret)
479 		return ret;
480 
481 	ofi_wait_fdset_del(wait, wait->signal.fd[FI_READ_FD]);
482 	fd_signal_free(&wait->signal);
483 
484 	if (wait->util_wait.wait_obj == FI_WAIT_FD)
485 		ofi_epoll_close(wait->epoll_fd);
486 	else
487 		ofi_epoll_close(wait->epoll_fd);
488 	free(wait);
489 	return 0;
490 }
491 
492 static struct fi_ops_wait util_wait_fd_ops = {
493 	.size = sizeof(struct fi_ops_wait),
494 	.wait = util_wait_fd_run,
495 };
496 
497 static struct fi_ops util_wait_fd_fi_ops = {
498 	.size = sizeof(struct fi_ops),
499 	.close = util_wait_fd_close,
500 	.bind = fi_no_bind,
501 	.control = util_wait_fd_control,
502 	.ops_open = fi_no_ops_open,
503 };
504 
util_verify_wait_fd_attr(const struct fi_provider * prov,const struct fi_wait_attr * attr)505 static int util_verify_wait_fd_attr(const struct fi_provider *prov,
506 				    const struct fi_wait_attr *attr)
507 {
508 	int ret;
509 
510 	ret = ofi_check_wait_attr(prov, attr);
511 	if (ret)
512 		return ret;
513 
514 	switch (attr->wait_obj) {
515 	case FI_WAIT_UNSPEC:
516 	case FI_WAIT_FD:
517 	case FI_WAIT_POLLFD:
518 		break;
519 	default:
520 		FI_WARN(prov, FI_LOG_FABRIC, "unsupported wait object\n");
521 		return -FI_EINVAL;
522 	}
523 
524 	return 0;
525 }
526 
ofi_wait_fd_open(struct fid_fabric * fabric_fid,struct fi_wait_attr * attr,struct fid_wait ** waitset)527 int ofi_wait_fd_open(struct fid_fabric *fabric_fid, struct fi_wait_attr *attr,
528 		    struct fid_wait **waitset)
529 {
530 	struct util_fabric *fabric;
531 	struct util_wait_fd *wait;
532 	int ret;
533 
534 	fabric = container_of(fabric_fid, struct util_fabric, fabric_fid);
535 	ret = util_verify_wait_fd_attr(fabric->prov, attr);
536 	if (ret)
537 		return ret;
538 
539 	wait = calloc(1, sizeof(*wait));
540 	if (!wait)
541 		return -FI_ENOMEM;
542 
543 	ret = ofi_wait_init(fabric, attr, &wait->util_wait);
544 	if (ret)
545 		goto err1;
546 
547 	wait->util_wait.signal = util_wait_fd_signal;
548 	wait->util_wait.wait_try = util_wait_fd_try;
549 	ret = fd_signal_init(&wait->signal);
550 	if (ret)
551 		goto err2;
552 
553 	ret = (wait->util_wait.wait_obj == FI_WAIT_FD) ?
554 	      ofi_epoll_create(&wait->epoll_fd) :
555 	      ofi_pollfds_create(&wait->pollfds);
556 	if (ret)
557 		goto err3;
558 
559 	ret = ofi_wait_fdset_add(wait, wait->signal.fd[FI_READ_FD],
560 				 POLLIN, &wait->util_wait.wait_fid.fid);
561 	if (ret)
562 		goto err4;
563 
564 	wait->util_wait.wait_fid.fid.ops = &util_wait_fd_fi_ops;
565 	wait->util_wait.wait_fid.ops = &util_wait_fd_ops;
566 
567 	dlist_init(&wait->fd_list);
568 
569 	*waitset = &wait->util_wait.wait_fid;
570 	return 0;
571 
572 err4:
573 	if (wait->util_wait.wait_obj == FI_WAIT_FD)
574 		ofi_epoll_close(wait->epoll_fd);
575 	else
576 		ofi_pollfds_close(wait->pollfds);
577 err3:
578 	fd_signal_free(&wait->signal);
579 err2:
580 	fi_wait_cleanup(&wait->util_wait);
581 err1:
582 	free(wait);
583 	return ret;
584 }
585 
util_wait_yield_signal(struct util_wait * util_wait)586 static void util_wait_yield_signal(struct util_wait *util_wait)
587 {
588 	struct util_wait_yield *wait_yield;
589 
590 	wait_yield = container_of(util_wait, struct util_wait_yield, util_wait);
591 
592 	fastlock_acquire(&wait_yield->signal_lock);
593 	wait_yield->signal = 1;
594 	fastlock_release(&wait_yield->signal_lock);
595 }
596 
util_wait_yield_run(struct fid_wait * wait_fid,int timeout)597 static int util_wait_yield_run(struct fid_wait *wait_fid, int timeout)
598 {
599 	struct util_wait_yield *wait;
600 	struct ofi_wait_fid_entry *fid_entry;
601 	int ret = 0;
602 
603 	wait = container_of(wait_fid, struct util_wait_yield, util_wait.wait_fid);
604 	while (!wait->signal) {
605 		fastlock_acquire(&wait->util_wait.lock);
606 		dlist_foreach_container(&wait->util_wait.fid_list,
607 					struct ofi_wait_fid_entry,
608 					fid_entry, entry) {
609 			ret = fid_entry->wait_try(fid_entry->fid);
610 			if (ret) {
611 				fastlock_release(&wait->util_wait.lock);
612 				return ret;
613 			}
614 		}
615 		fastlock_release(&wait->util_wait.lock);
616 		pthread_yield();
617 	}
618 
619 	fastlock_acquire(&wait->signal_lock);
620 	wait->signal = 0;
621 	fastlock_release(&wait->signal_lock);
622 
623 	return FI_SUCCESS;
624 }
625 
util_wait_yield_close(struct fid * fid)626 static int util_wait_yield_close(struct fid *fid)
627 {
628 	struct util_wait_yield *wait;
629 	int ret;
630 
631 	wait = container_of(fid, struct util_wait_yield, util_wait.wait_fid.fid);
632 	ret = fi_wait_cleanup(&wait->util_wait);
633 	if (ret)
634 		return ret;
635 
636 	fastlock_destroy(&wait->signal_lock);
637 	free(wait);
638 	return 0;
639 }
640 
641 static struct fi_ops_wait util_wait_yield_ops = {
642 	.size = sizeof(struct fi_ops_wait),
643 	.wait = util_wait_yield_run,
644 };
645 
646 static struct fi_ops util_wait_yield_fi_ops = {
647 	.size = sizeof(struct fi_ops),
648 	.close = util_wait_yield_close,
649 	.bind = fi_no_bind,
650 	.control = fi_no_control,
651 	.ops_open = fi_no_ops_open,
652 };
653 
util_verify_wait_yield_attr(const struct fi_provider * prov,const struct fi_wait_attr * attr)654 static int util_verify_wait_yield_attr(const struct fi_provider *prov,
655 				       const struct fi_wait_attr *attr)
656 {
657 	int ret;
658 
659 	ret = ofi_check_wait_attr(prov, attr);
660 	if (ret)
661 		return ret;
662 
663 	switch (attr->wait_obj) {
664 	case FI_WAIT_UNSPEC:
665 	case FI_WAIT_YIELD:
666 		break;
667 	default:
668 		FI_WARN(prov, FI_LOG_FABRIC, "unsupported wait object\n");
669 		return -FI_EINVAL;
670 	}
671 
672 	return 0;
673 }
674 
ofi_wait_yield_open(struct fid_fabric * fabric_fid,struct fi_wait_attr * attr,struct fid_wait ** waitset)675 int ofi_wait_yield_open(struct fid_fabric *fabric_fid, struct fi_wait_attr *attr,
676 			struct fid_wait **waitset)
677 {
678 	struct util_fabric *fabric;
679 	struct util_wait_yield *wait;
680 	int ret;
681 
682 	fabric = container_of(fabric_fid, struct util_fabric, fabric_fid);
683 	ret = util_verify_wait_yield_attr(fabric->prov, attr);
684 	if (ret)
685 		return ret;
686 
687 	attr->wait_obj = FI_WAIT_YIELD;
688 	wait = calloc(1, sizeof(*wait));
689 	if (!wait)
690 		return -FI_ENOMEM;
691 
692 	ret = ofi_wait_init(fabric, attr, &wait->util_wait);
693 	if (ret) {
694 		free(wait);
695 		return ret;
696 	}
697 
698 	wait->util_wait.signal = util_wait_yield_signal;
699 	wait->signal = 0;
700 
701 	wait->util_wait.wait_fid.fid.ops = &util_wait_yield_fi_ops;
702 	wait->util_wait.wait_fid.ops = &util_wait_yield_ops;
703 
704 	fastlock_init(&wait->signal_lock);
705 
706 	*waitset = &wait->util_wait.wait_fid;
707 
708 	return 0;
709 }
710 
ofi_wait_match_fid(struct dlist_entry * item,const void * arg)711 static int ofi_wait_match_fid(struct dlist_entry *item, const void *arg)
712 {
713 	struct ofi_wait_fid_entry *fid_entry;
714 
715 	fid_entry = container_of(item, struct ofi_wait_fid_entry, entry);
716 	return fid_entry->fid == arg;
717 }
718 
/*
 * Drop one reference to @fid on @wait.
 *
 * When the last reference goes away, every fd cached for the entry is
 * removed from the wait set's fd set, and the entry is unlinked and freed.
 * Returns 0 on success, including when references remain, or -FI_EINVAL if
 * @fid was never added. Failures to remove individual fds are logged but
 * not returned: the entry is released regardless, so an error would not
 * be actionable.
 */
int ofi_wait_del_fid(struct util_wait *wait, fid_t fid)
{
	struct ofi_wait_fid_entry *fid_entry;
	struct util_wait_fd *wait_fd;
	struct dlist_entry *entry;
	struct pollfd *fds;
	size_t i;
	int ret = 0;
	int del_ret;

	fastlock_acquire(&wait->lock);
	entry = dlist_find_first_match(&wait->fid_list,
				       ofi_wait_match_fid, fid);
	if (!entry) {
		FI_INFO(wait->prov, FI_LOG_EP_CTRL,
			"Given fid (%p) not found in wait list - %p\n",
			fid, wait);
		ret = -FI_EINVAL;
		goto out;
	}

	fid_entry = container_of(entry, struct ofi_wait_fid_entry, entry);
	if (ofi_atomic_dec32(&fid_entry->ref))
		goto out;

	/* Only FI_WAIT_FD/FI_WAIT_POLLFD wait sets cache fds for a fid; for
	 * other wait objects pollfds.nfds stays 0 and wait_fd is unused. */
	wait_fd = container_of(wait, struct util_wait_fd, util_wait);
	fds = fid_entry->pollfds.fd;
	for (i = 0; i < fid_entry->pollfds.nfds; i++) {
		assert(fds);
		/* Remove every cached fd, not only the first one. */
		del_ret = ofi_wait_fdset_del(wait_fd, fds[i].fd);
		if (del_ret) {
			FI_WARN(wait->prov, FI_LOG_EP_CTRL,
				"epoll_del failed %s\n", fi_strerror(-del_ret));
		}
	}

	dlist_remove(&fid_entry->entry);
	free(fid_entry->pollfds.fd);
	free(fid_entry);
out:
	fastlock_release(&wait->lock);
	return ret;
}
761 
ofi_wait_get_fd(struct util_wait_fd * wait_fd,struct ofi_wait_fid_entry * fid_entry)762 static int ofi_wait_get_fd(struct util_wait_fd *wait_fd,
763 			   struct ofi_wait_fid_entry *fid_entry)
764 {
765 	struct pollfd *fds;
766 	int ret;
767 
768 	fds = calloc(1, sizeof(*fds));
769 	if (!fds)
770 		return -FI_ENOMEM;
771 
772 	ret = fi_control(fid_entry->fid, FI_GETWAIT, &fds->fd);
773 	if (ret) {
774 		FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
775 			"unable to get wait fd %s\n", fi_strerror(ret));
776 		goto free;
777 	}
778 
779 	fds->events = fid_entry->events;
780 	fid_entry->pollfds.fd = fds;
781 	fid_entry->pollfds.nfds = 1;
782 	return 0;
783 
784 free:
785 	free(fds);
786 	return ret;
787 }
788 
ofi_wait_get_fid_fds(struct util_wait * wait,struct ofi_wait_fid_entry * fid_entry)789 static int ofi_wait_get_fid_fds(struct util_wait *wait,
790 				struct ofi_wait_fid_entry *fid_entry)
791 {
792 	struct util_wait_fd *wait_fd;
793 	struct pollfd *fds;
794 	size_t i;
795 	int ret;
796 
797 	ret = fi_control(fid_entry->fid, FI_GETWAITOBJ,
798 			 &fid_entry->wait_obj);
799 	if ((fid_entry->wait_obj != FI_WAIT_FD) &&
800 	    (fid_entry->wait_obj != FI_WAIT_POLLFD)) {
801 		FI_WARN(wait->prov, FI_LOG_EP_CTRL,
802 			"unsupported wait object %d (ret: %s)\n",
803 			fid_entry->wait_obj, fi_strerror(ret));
804 		return ret;
805 	}
806 
807 	/* pollfd is updated during trywait */
808 	if (fid_entry->wait_obj == FI_WAIT_POLLFD)
809 		return 0;
810 
811 	wait_fd = container_of(wait, struct util_wait_fd, util_wait);
812 	ret = ofi_wait_get_fd(wait_fd, fid_entry);
813 	if (ret)
814 		return ret;
815 
816 	fds = fid_entry->pollfds.fd;
817 	for (i = 0; i < fid_entry->pollfds.nfds; i++) {
818 		ret = ofi_wait_fdset_add(wait_fd, fds[i].fd, fds[i].events,
819 					 fid_entry->fid->context);
820 		if (ret) {
821 			FI_WARN(wait->prov, FI_LOG_EP_CTRL,
822 				"unable to add fd %s\n", fi_strerror(ret));
823 			return ret;
824 		}
825 	}
826 
827 	return 0;
828 }
829 
/*
 * Attach @fid to @wait, or take another reference if it is already
 * attached.
 *
 * @wait_try is stored for the trywait paths and @events for fd
 * registration. For FI_WAIT_FD and FI_WAIT_POLLFD wait sets, the fid's own
 * wait object is queried and, where applicable, its fd registered. Returns
 * 0, -FI_ENOMEM, or the error from that step.
 */
int ofi_wait_add_fid(struct util_wait *wait, fid_t fid, uint32_t events,
		     ofi_wait_try_func wait_try)
{
	struct ofi_wait_fid_entry *fid_entry;
	struct dlist_entry *entry;
	int ret = 0;

	fastlock_acquire(&wait->lock);
	entry = dlist_find_first_match(&wait->fid_list,
				       ofi_wait_match_fid, fid);
	if (entry) {
		FI_DBG(wait->prov, FI_LOG_EP_CTRL,
		       "Given fid (%p) already added to wait list - %p \n",
		       fid, wait);
		fid_entry = container_of(entry, struct ofi_wait_fid_entry, entry);
		ofi_atomic_inc32(&fid_entry->ref);
		goto out;
	}

	fid_entry = calloc(1, sizeof *fid_entry);
	if (!fid_entry) {
		ret = -FI_ENOMEM;
		goto out;
	}

	fid_entry->fid = fid;
	fid_entry->wait_try = wait_try;
	fid_entry->events = events;
	ofi_atomic_initialize32(&fid_entry->ref, 1);

	if (wait->wait_obj == FI_WAIT_FD || wait->wait_obj == FI_WAIT_POLLFD) {
		ret = ofi_wait_get_fid_fds(wait, fid_entry);
		if (ret) {
			/* The helper may already have cached a pollfd array
			 * before failing; free(NULL) is harmless otherwise. */
			free(fid_entry->pollfds.fd);
			free(fid_entry);
			goto out;
		}
	}
	dlist_insert_tail(&fid_entry->entry, &wait->fid_list);
out:
	fastlock_release(&wait->lock);
	return ret;
}
872