1 /*
2 * Copyright (c) 2014-2016 Intel Corporation, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 */
32
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/time.h>
36
37 #include <ofi_enosys.h>
38 #include <ofi_util.h>
39 #include <ofi_epoll.h>
40
41
ofi_poll_to_epoll(uint32_t events)42 static uint32_t ofi_poll_to_epoll(uint32_t events)
43 {
44 uint32_t epoll_events = 0;
45
46 if (events & POLLIN)
47 epoll_events |= OFI_EPOLL_IN;
48 if (events & POLLOUT)
49 epoll_events |= OFI_EPOLL_OUT;
50 return epoll_events;
51 }
52
ofi_trywait(struct fid_fabric * fabric,struct fid ** fids,int count)53 int ofi_trywait(struct fid_fabric *fabric, struct fid **fids, int count)
54 {
55 struct util_cq *cq;
56 struct util_eq *eq;
57 struct util_cntr *cntr;
58 struct util_wait *wait;
59 int i, ret;
60
61 for (i = 0; i < count; i++) {
62 switch (fids[i]->fclass) {
63 case FI_CLASS_CQ:
64 cq = container_of(fids[i], struct util_cq, cq_fid.fid);
65 wait = cq->wait;
66 break;
67 case FI_CLASS_EQ:
68 eq = container_of(fids[i], struct util_eq, eq_fid.fid);
69 wait = eq->wait;
70 break;
71 case FI_CLASS_CNTR:
72 cntr = container_of(fids[i], struct util_cntr, cntr_fid.fid);
73 wait = cntr->wait;
74 break;
75 case FI_CLASS_WAIT:
76 wait = container_of(fids[i], struct util_wait, wait_fid.fid);
77 break;
78 default:
79 return -FI_EINVAL;
80 }
81
82 ret = wait->wait_try(wait);
83 if (ret)
84 return ret;
85 }
86 return 0;
87 }
88
ofi_check_wait_attr(const struct fi_provider * prov,const struct fi_wait_attr * attr)89 int ofi_check_wait_attr(const struct fi_provider *prov,
90 const struct fi_wait_attr *attr)
91 {
92 switch (attr->wait_obj) {
93 case FI_WAIT_UNSPEC:
94 case FI_WAIT_FD:
95 case FI_WAIT_POLLFD:
96 case FI_WAIT_MUTEX_COND:
97 case FI_WAIT_YIELD:
98 break;
99 default:
100 FI_WARN(prov, FI_LOG_FABRIC, "invalid wait object type\n");
101 return -FI_EINVAL;
102 }
103
104 if (attr->flags) {
105 FI_WARN(prov, FI_LOG_FABRIC, "invalid flags\n");
106 return -FI_EINVAL;
107 }
108
109 return 0;
110 }
111
fi_wait_cleanup(struct util_wait * wait)112 int fi_wait_cleanup(struct util_wait *wait)
113 {
114 struct ofi_wait_fid_entry *fid_entry;
115 int ret;
116
117 if (ofi_atomic_get32(&wait->ref))
118 return -FI_EBUSY;
119
120 ret = fi_close(&wait->pollset->poll_fid.fid);
121 if (ret)
122 return ret;
123
124 while (!dlist_empty(&wait->fid_list)) {
125 dlist_pop_front(&wait->fid_list, struct ofi_wait_fid_entry,
126 fid_entry, entry);
127 free(fid_entry->pollfds.fd);
128 free(fid_entry);
129 }
130
131 fastlock_destroy(&wait->lock);
132 ofi_atomic_dec32(&wait->fabric->ref);
133 return 0;
134 }
135
ofi_wait_init(struct util_fabric * fabric,struct fi_wait_attr * attr,struct util_wait * wait)136 int ofi_wait_init(struct util_fabric *fabric, struct fi_wait_attr *attr,
137 struct util_wait *wait)
138 {
139 struct fid_poll *poll_fid;
140 struct fi_poll_attr poll_attr;
141 int ret;
142
143 wait->prov = fabric->prov;
144 ofi_atomic_initialize32(&wait->ref, 0);
145 wait->wait_fid.fid.fclass = FI_CLASS_WAIT;
146
147 switch (attr->wait_obj) {
148 case FI_WAIT_UNSPEC:
149 wait->wait_obj = FI_WAIT_FD;
150 break;
151 case FI_WAIT_FD:
152 case FI_WAIT_POLLFD:
153 case FI_WAIT_MUTEX_COND:
154 case FI_WAIT_YIELD:
155 wait->wait_obj = attr->wait_obj;
156 break;
157 default:
158 assert(0);
159 return -FI_EINVAL;
160 }
161
162 memset(&poll_attr, 0, sizeof poll_attr);
163 ret = fi_poll_create_(fabric->prov, NULL, &poll_attr, &poll_fid);
164 if (ret)
165 return ret;
166
167 wait->pollset = container_of(poll_fid, struct util_poll, poll_fid);
168 fastlock_init(&wait->lock);
169 dlist_init(&wait->fid_list);
170 wait->fabric = fabric;
171 ofi_atomic_inc32(&fabric->ref);
172 return 0;
173 }
174
ofi_wait_match_fd(struct dlist_entry * item,const void * arg)175 static int ofi_wait_match_fd(struct dlist_entry *item, const void *arg)
176 {
177 struct ofi_wait_fd_entry *fd_entry;
178
179 fd_entry = container_of(item, struct ofi_wait_fd_entry, entry);
180 return fd_entry->fd == *(int *) arg;
181 }
182
ofi_wait_fdset_del(struct util_wait_fd * wait_fd,int fd)183 static int ofi_wait_fdset_del(struct util_wait_fd *wait_fd, int fd)
184 {
185 wait_fd->change_index++;
186
187 return (wait_fd->util_wait.wait_obj == FI_WAIT_FD) ?
188 ofi_epoll_del(wait_fd->epoll_fd, fd) :
189 ofi_pollfds_del(wait_fd->pollfds, fd);
190 }
191
ofi_wait_fdset_add(struct util_wait_fd * wait_fd,int fd,uint32_t events,void * context)192 static int ofi_wait_fdset_add(struct util_wait_fd *wait_fd, int fd,
193 uint32_t events, void *context)
194 {
195 int ret;
196
197 wait_fd->change_index++;
198 if (wait_fd->util_wait.wait_obj == FI_WAIT_FD) {
199 ret = ofi_epoll_add(wait_fd->epoll_fd, fd,
200 ofi_poll_to_epoll(events), context);
201 } else {
202 ret = ofi_pollfds_add(wait_fd->pollfds, fd, events, context);
203 }
204 return ret;
205 }
206
ofi_wait_del_fd(struct util_wait * wait,int fd)207 int ofi_wait_del_fd(struct util_wait *wait, int fd)
208 {
209 struct ofi_wait_fd_entry *fd_entry;
210 struct dlist_entry *entry;
211 struct util_wait_fd *wait_fd;
212 int ret = 0;
213
214 wait_fd = container_of(wait, struct util_wait_fd, util_wait);
215 fastlock_acquire(&wait->lock);
216 entry = dlist_find_first_match(&wait_fd->fd_list, ofi_wait_match_fd, &fd);
217 if (!entry) {
218 FI_INFO(wait->prov, FI_LOG_FABRIC,
219 "Given fd (%d) not found in wait list - %p\n",
220 fd, wait_fd);
221 ret = -FI_EINVAL;
222 goto out;
223 }
224
225 fd_entry = container_of(entry, struct ofi_wait_fd_entry, entry);
226 if (ofi_atomic_dec32(&fd_entry->ref))
227 goto out;
228
229 dlist_remove(&fd_entry->entry);
230 ofi_wait_fdset_del(wait_fd, fd_entry->fd);
231 free(fd_entry);
232 out:
233 fastlock_release(&wait->lock);
234 return ret;
235 }
236
/*
 * Register a raw @fd with an fd-based wait set.
 *
 * If the fd is already registered, only its reference count is bumped.
 * Otherwise the fd is added to the underlying fd set with @events and
 * @context.  A new entry is then created that records @wait_try and @arg;
 * util_wait_fd_try() calls wait_try(arg) before blocking.
 *
 * Returns 0, the error from adding the fd to the fd set, or -FI_ENOMEM.
 * On -FI_ENOMEM the fd is removed from the fd set again.
 */
int ofi_wait_add_fd(struct util_wait *wait, int fd, uint32_t events,
		    ofi_wait_try_func wait_try, void *arg, void *context)
{
	struct util_wait_fd *wait_fd =
		container_of(wait, struct util_wait_fd, util_wait);
	struct ofi_wait_fd_entry *fd_entry;
	struct dlist_entry *item;
	int ret;

	fastlock_acquire(&wait->lock);

	item = dlist_find_first_match(&wait_fd->fd_list, ofi_wait_match_fd, &fd);
	if (item) {
		/* Already tracked: just take another reference. */
		FI_DBG(wait->prov, FI_LOG_EP_CTRL,
		       "Given fd (%d) already added to wait list - %p \n",
		       fd, wait_fd);
		fd_entry = container_of(item, struct ofi_wait_fd_entry, entry);
		ofi_atomic_inc32(&fd_entry->ref);
		ret = 0;
		goto unlock;
	}

	ret = ofi_wait_fdset_add(wait_fd, fd, events, context);
	if (ret) {
		FI_WARN(wait->prov, FI_LOG_FABRIC,
			"Unable to add fd to epoll\n");
		goto unlock;
	}

	fd_entry = calloc(1, sizeof(*fd_entry));
	if (!fd_entry) {
		/* Undo the fd set registration so state stays consistent. */
		ret = -FI_ENOMEM;
		ofi_wait_fdset_del(wait_fd, fd);
		goto unlock;
	}

	fd_entry->fd = fd;
	fd_entry->wait_try = wait_try;
	fd_entry->arg = arg;
	ofi_atomic_initialize32(&fd_entry->ref, 1);
	dlist_insert_tail(&fd_entry->entry, &wait_fd->fd_list);

unlock:
	fastlock_release(&wait->lock);
	return ret;
}
281
util_wait_fd_signal(struct util_wait * util_wait)282 static void util_wait_fd_signal(struct util_wait *util_wait)
283 {
284 struct util_wait_fd *wait;
285 wait = container_of(util_wait, struct util_wait_fd, util_wait);
286 fd_signal_set(&wait->signal);
287 }
288
util_wait_update_pollfd(struct util_wait_fd * wait_fd,struct ofi_wait_fid_entry * fid_entry)289 static int util_wait_update_pollfd(struct util_wait_fd *wait_fd,
290 struct ofi_wait_fid_entry *fid_entry)
291 {
292 struct fi_wait_pollfd pollfds = { 0 };
293 struct pollfd *fds;
294 size_t i;
295 int ret;
296
297 ret = fi_control(fid_entry->fid, FI_GETWAIT, &pollfds);
298 if (ret != FI_ETOOSMALL)
299 return ret;
300
301 if (pollfds.change_index == fid_entry->pollfds.change_index)
302 return 0;
303
304 fds = fid_entry->pollfds.fd;
305 for (i = 0; i < fid_entry->pollfds.nfds; i++) {
306 ret = ofi_wait_fdset_del(wait_fd, fds->fd);
307 if (ret) {
308 FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
309 "epoll_del failed %s\n", fi_strerror(ret));
310 }
311 }
312
313 if (fid_entry->pollfds.nfds < pollfds.nfds) {
314 fds = calloc(pollfds.nfds, sizeof(*fds));
315 if (!fds)
316 return -FI_ENOMEM;
317
318 free(fid_entry->pollfds.fd);
319 fid_entry->pollfds.fd = fds;
320 fid_entry->pollfds.nfds = pollfds.nfds;
321 }
322
323 ret = fi_control(fid_entry->fid, FI_GETWAIT, &fid_entry->pollfds);
324 if (ret) {
325 FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
326 "unable to get wait pollfd %s\n", fi_strerror(ret));
327 return ret;
328 }
329
330 fds = fid_entry->pollfds.fd;
331 for (i = 0; i < fid_entry->pollfds.nfds; i++) {
332 ret = ofi_wait_fdset_add(wait_fd, fds[i].fd, fds[i].events,
333 fid_entry->fid->context);
334 if (ret) {
335 FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
336 "unable to add fd %s\n", fi_strerror(ret));
337 return ret;
338 }
339 }
340
341 return -FI_EAGAIN;
342 }
343
util_wait_fd_try(struct util_wait * wait)344 static int util_wait_fd_try(struct util_wait *wait)
345 {
346 struct ofi_wait_fid_entry *fid_entry;
347 struct ofi_wait_fd_entry *fd_entry;
348 struct util_wait_fd *wait_fd;
349 void *context;
350 int ret;
351
352 wait_fd = container_of(wait, struct util_wait_fd, util_wait);
353 fd_signal_reset(&wait_fd->signal);
354 fastlock_acquire(&wait->lock);
355 dlist_foreach_container(&wait_fd->fd_list, struct ofi_wait_fd_entry,
356 fd_entry, entry) {
357 ret = fd_entry->wait_try(fd_entry->arg);
358 if (ret != FI_SUCCESS)
359 goto release;
360 }
361
362 dlist_foreach_container(&wait->fid_list,
363 struct ofi_wait_fid_entry, fid_entry, entry) {
364 if (fid_entry->wait_obj == FI_WAIT_POLLFD) {
365 ret = util_wait_update_pollfd(wait_fd, fid_entry);
366 if (ret)
367 goto release;
368 }
369
370 ret = fid_entry->wait_try(fid_entry->fid);
371 if (ret != FI_SUCCESS)
372 goto release;
373 }
374
375 fastlock_release(&wait->lock);
376 ret = fi_poll(&wait->pollset->poll_fid, &context, 1);
377 return (ret > 0) ? -FI_EAGAIN : (ret == -FI_EAGAIN) ? FI_SUCCESS : ret;
378
379 release:
380 fastlock_release(&wait->lock);
381 return ret;
382 }
383
util_wait_fd_run(struct fid_wait * wait_fid,int timeout)384 static int util_wait_fd_run(struct fid_wait *wait_fid, int timeout)
385 {
386 struct util_wait_fd *wait;
387 uint64_t endtime;
388 void *ep_context[1];
389 int ret;
390
391 wait = container_of(wait_fid, struct util_wait_fd, util_wait.wait_fid);
392 endtime = ofi_timeout_time(timeout);
393
394 while (1) {
395 ret = wait->util_wait.wait_try(&wait->util_wait);
396 if (ret)
397 return ret == -FI_EAGAIN ? 0 : ret;
398
399 if (ofi_adjust_timeout(endtime, &timeout))
400 return -FI_ETIMEDOUT;
401
402 ret = (wait->util_wait.wait_obj == FI_WAIT_FD) ?
403 ofi_epoll_wait(wait->epoll_fd, ep_context, 1, timeout) :
404 ofi_pollfds_wait(wait->pollfds, ep_context, 1, timeout);
405 if (ret > 0)
406 return FI_SUCCESS;
407
408 if (ret < 0) {
409 FI_WARN(wait->util_wait.prov, FI_LOG_FABRIC,
410 "poll failed\n");
411 return ret;
412 }
413 }
414 }
415
util_wait_fd_control(struct fid * fid,int command,void * arg)416 static int util_wait_fd_control(struct fid *fid, int command, void *arg)
417 {
418 struct util_wait_fd *wait;
419 struct fi_wait_pollfd *pollfd;
420 int ret;
421
422 wait = container_of(fid, struct util_wait_fd, util_wait.wait_fid.fid);
423 switch (command) {
424 case FI_GETWAIT:
425 if (wait->util_wait.wait_obj == FI_WAIT_FD) {
426 #ifdef HAVE_EPOLL
427 *(int *) arg = wait->epoll_fd;
428 return 0;
429 #else
430 return -FI_ENODATA;
431 #endif
432 }
433
434 pollfd = arg;
435 fastlock_acquire(&wait->util_wait.lock);
436 if (pollfd->nfds >= wait->pollfds->nfds) {
437 memcpy(pollfd->fd, &wait->pollfds->fds[0],
438 wait->pollfds->nfds * sizeof(*wait->pollfds->fds));
439 ret = 0;
440 } else {
441 ret = -FI_ETOOSMALL;
442 }
443 pollfd->change_index = wait->change_index;
444 pollfd->nfds = wait->pollfds->nfds;
445 fastlock_release(&wait->util_wait.lock);
446 break;
447 case FI_GETWAITOBJ:
448 *(enum fi_wait_obj *) arg = wait->util_wait.wait_obj;
449 ret = 0;
450 break;
451 default:
452 FI_INFO(wait->util_wait.prov, FI_LOG_FABRIC,
453 "unsupported command\n");
454 ret = -FI_ENOSYS;
455 break;
456 }
457 return ret;
458 }
459
util_wait_fd_close(struct fid * fid)460 static int util_wait_fd_close(struct fid *fid)
461 {
462 struct util_wait_fd *wait;
463 struct ofi_wait_fd_entry *fd_entry;
464 int ret;
465
466 wait = container_of(fid, struct util_wait_fd, util_wait.wait_fid.fid);
467
468 fastlock_acquire(&wait->util_wait.lock);
469 while (!dlist_empty(&wait->fd_list)) {
470 dlist_pop_front(&wait->fd_list, struct ofi_wait_fd_entry,
471 fd_entry, entry);
472 ofi_wait_fdset_del(wait, fd_entry->fd);
473 free(fd_entry);
474 }
475 fastlock_release(&wait->util_wait.lock);
476
477 ret = fi_wait_cleanup(&wait->util_wait);
478 if (ret)
479 return ret;
480
481 ofi_wait_fdset_del(wait, wait->signal.fd[FI_READ_FD]);
482 fd_signal_free(&wait->signal);
483
484 if (wait->util_wait.wait_obj == FI_WAIT_FD)
485 ofi_epoll_close(wait->epoll_fd);
486 else
487 ofi_epoll_close(wait->epoll_fd);
488 free(wait);
489 return 0;
490 }
491
492 static struct fi_ops_wait util_wait_fd_ops = {
493 .size = sizeof(struct fi_ops_wait),
494 .wait = util_wait_fd_run,
495 };
496
497 static struct fi_ops util_wait_fd_fi_ops = {
498 .size = sizeof(struct fi_ops),
499 .close = util_wait_fd_close,
500 .bind = fi_no_bind,
501 .control = util_wait_fd_control,
502 .ops_open = fi_no_ops_open,
503 };
504
util_verify_wait_fd_attr(const struct fi_provider * prov,const struct fi_wait_attr * attr)505 static int util_verify_wait_fd_attr(const struct fi_provider *prov,
506 const struct fi_wait_attr *attr)
507 {
508 int ret;
509
510 ret = ofi_check_wait_attr(prov, attr);
511 if (ret)
512 return ret;
513
514 switch (attr->wait_obj) {
515 case FI_WAIT_UNSPEC:
516 case FI_WAIT_FD:
517 case FI_WAIT_POLLFD:
518 break;
519 default:
520 FI_WARN(prov, FI_LOG_FABRIC, "unsupported wait object\n");
521 return -FI_EINVAL;
522 }
523
524 return 0;
525 }
526
ofi_wait_fd_open(struct fid_fabric * fabric_fid,struct fi_wait_attr * attr,struct fid_wait ** waitset)527 int ofi_wait_fd_open(struct fid_fabric *fabric_fid, struct fi_wait_attr *attr,
528 struct fid_wait **waitset)
529 {
530 struct util_fabric *fabric;
531 struct util_wait_fd *wait;
532 int ret;
533
534 fabric = container_of(fabric_fid, struct util_fabric, fabric_fid);
535 ret = util_verify_wait_fd_attr(fabric->prov, attr);
536 if (ret)
537 return ret;
538
539 wait = calloc(1, sizeof(*wait));
540 if (!wait)
541 return -FI_ENOMEM;
542
543 ret = ofi_wait_init(fabric, attr, &wait->util_wait);
544 if (ret)
545 goto err1;
546
547 wait->util_wait.signal = util_wait_fd_signal;
548 wait->util_wait.wait_try = util_wait_fd_try;
549 ret = fd_signal_init(&wait->signal);
550 if (ret)
551 goto err2;
552
553 ret = (wait->util_wait.wait_obj == FI_WAIT_FD) ?
554 ofi_epoll_create(&wait->epoll_fd) :
555 ofi_pollfds_create(&wait->pollfds);
556 if (ret)
557 goto err3;
558
559 ret = ofi_wait_fdset_add(wait, wait->signal.fd[FI_READ_FD],
560 POLLIN, &wait->util_wait.wait_fid.fid);
561 if (ret)
562 goto err4;
563
564 wait->util_wait.wait_fid.fid.ops = &util_wait_fd_fi_ops;
565 wait->util_wait.wait_fid.ops = &util_wait_fd_ops;
566
567 dlist_init(&wait->fd_list);
568
569 *waitset = &wait->util_wait.wait_fid;
570 return 0;
571
572 err4:
573 if (wait->util_wait.wait_obj == FI_WAIT_FD)
574 ofi_epoll_close(wait->epoll_fd);
575 else
576 ofi_pollfds_close(wait->pollfds);
577 err3:
578 fd_signal_free(&wait->signal);
579 err2:
580 fi_wait_cleanup(&wait->util_wait);
581 err1:
582 free(wait);
583 return ret;
584 }
585
util_wait_yield_signal(struct util_wait * util_wait)586 static void util_wait_yield_signal(struct util_wait *util_wait)
587 {
588 struct util_wait_yield *wait_yield;
589
590 wait_yield = container_of(util_wait, struct util_wait_yield, util_wait);
591
592 fastlock_acquire(&wait_yield->signal_lock);
593 wait_yield->signal = 1;
594 fastlock_release(&wait_yield->signal_lock);
595 }
596
util_wait_yield_run(struct fid_wait * wait_fid,int timeout)597 static int util_wait_yield_run(struct fid_wait *wait_fid, int timeout)
598 {
599 struct util_wait_yield *wait;
600 struct ofi_wait_fid_entry *fid_entry;
601 int ret = 0;
602
603 wait = container_of(wait_fid, struct util_wait_yield, util_wait.wait_fid);
604 while (!wait->signal) {
605 fastlock_acquire(&wait->util_wait.lock);
606 dlist_foreach_container(&wait->util_wait.fid_list,
607 struct ofi_wait_fid_entry,
608 fid_entry, entry) {
609 ret = fid_entry->wait_try(fid_entry->fid);
610 if (ret) {
611 fastlock_release(&wait->util_wait.lock);
612 return ret;
613 }
614 }
615 fastlock_release(&wait->util_wait.lock);
616 pthread_yield();
617 }
618
619 fastlock_acquire(&wait->signal_lock);
620 wait->signal = 0;
621 fastlock_release(&wait->signal_lock);
622
623 return FI_SUCCESS;
624 }
625
util_wait_yield_close(struct fid * fid)626 static int util_wait_yield_close(struct fid *fid)
627 {
628 struct util_wait_yield *wait;
629 int ret;
630
631 wait = container_of(fid, struct util_wait_yield, util_wait.wait_fid.fid);
632 ret = fi_wait_cleanup(&wait->util_wait);
633 if (ret)
634 return ret;
635
636 fastlock_destroy(&wait->signal_lock);
637 free(wait);
638 return 0;
639 }
640
641 static struct fi_ops_wait util_wait_yield_ops = {
642 .size = sizeof(struct fi_ops_wait),
643 .wait = util_wait_yield_run,
644 };
645
646 static struct fi_ops util_wait_yield_fi_ops = {
647 .size = sizeof(struct fi_ops),
648 .close = util_wait_yield_close,
649 .bind = fi_no_bind,
650 .control = fi_no_control,
651 .ops_open = fi_no_ops_open,
652 };
653
util_verify_wait_yield_attr(const struct fi_provider * prov,const struct fi_wait_attr * attr)654 static int util_verify_wait_yield_attr(const struct fi_provider *prov,
655 const struct fi_wait_attr *attr)
656 {
657 int ret;
658
659 ret = ofi_check_wait_attr(prov, attr);
660 if (ret)
661 return ret;
662
663 switch (attr->wait_obj) {
664 case FI_WAIT_UNSPEC:
665 case FI_WAIT_YIELD:
666 break;
667 default:
668 FI_WARN(prov, FI_LOG_FABRIC, "unsupported wait object\n");
669 return -FI_EINVAL;
670 }
671
672 return 0;
673 }
674
ofi_wait_yield_open(struct fid_fabric * fabric_fid,struct fi_wait_attr * attr,struct fid_wait ** waitset)675 int ofi_wait_yield_open(struct fid_fabric *fabric_fid, struct fi_wait_attr *attr,
676 struct fid_wait **waitset)
677 {
678 struct util_fabric *fabric;
679 struct util_wait_yield *wait;
680 int ret;
681
682 fabric = container_of(fabric_fid, struct util_fabric, fabric_fid);
683 ret = util_verify_wait_yield_attr(fabric->prov, attr);
684 if (ret)
685 return ret;
686
687 attr->wait_obj = FI_WAIT_YIELD;
688 wait = calloc(1, sizeof(*wait));
689 if (!wait)
690 return -FI_ENOMEM;
691
692 ret = ofi_wait_init(fabric, attr, &wait->util_wait);
693 if (ret) {
694 free(wait);
695 return ret;
696 }
697
698 wait->util_wait.signal = util_wait_yield_signal;
699 wait->signal = 0;
700
701 wait->util_wait.wait_fid.fid.ops = &util_wait_yield_fi_ops;
702 wait->util_wait.wait_fid.ops = &util_wait_yield_ops;
703
704 fastlock_init(&wait->signal_lock);
705
706 *waitset = &wait->util_wait.wait_fid;
707
708 return 0;
709 }
710
ofi_wait_match_fid(struct dlist_entry * item,const void * arg)711 static int ofi_wait_match_fid(struct dlist_entry *item, const void *arg)
712 {
713 struct ofi_wait_fid_entry *fid_entry;
714
715 fid_entry = container_of(item, struct ofi_wait_fid_entry, entry);
716 return fid_entry->fid == arg;
717 }
718
/*
 * Drop one reference to @fid previously attached with ofi_wait_add_fid().
 *
 * When the last reference is dropped, every cached fd of the entry is
 * removed from the underlying fd set; failures are logged, and the last
 * failure code is returned.  The entry is then unlinked and freed along
 * with its cached pollfd array.
 *
 * Returns 0, -FI_EINVAL if @fid is not attached, or an fd-removal error.
 */
int ofi_wait_del_fid(struct util_wait *wait, fid_t fid)
{
	struct ofi_wait_fid_entry *fid_entry;
	struct util_wait_fd *wait_fd;
	struct dlist_entry *entry;
	struct pollfd *fds;
	size_t i;
	int ret = 0;

	fastlock_acquire(&wait->lock);
	entry = dlist_find_first_match(&wait->fid_list,
				       ofi_wait_match_fid, fid);
	if (!entry) {
		FI_INFO(wait->prov, FI_LOG_EP_CTRL,
			"Given fid (%p) not found in wait list - %p\n",
			fid, wait);
		ret = -FI_EINVAL;
		goto out;
	}

	fid_entry = container_of(entry, struct ofi_wait_fid_entry, entry);
	if (ofi_atomic_dec32(&fid_entry->ref))
		goto out;

	/*
	 * Only fd-based wait sets populate pollfds; for other wait sets
	 * nfds is 0 and wait_fd is never dereferenced.
	 */
	wait_fd = container_of(wait, struct util_wait_fd, util_wait);
	fds = fid_entry->pollfds.fd;
	for (i = 0; i < fid_entry->pollfds.nfds; i++) {
		assert(fds);
		ret = ofi_wait_fdset_del(wait_fd, fds[i].fd);
		if (ret) {
			FI_WARN(wait->prov, FI_LOG_EP_CTRL,
				"epoll_del failed %s\n", fi_strerror(ret));
		}
	}

	dlist_remove(&fid_entry->entry);
	free(fid_entry->pollfds.fd);
	free(fid_entry);
out:
	fastlock_release(&wait->lock);
	return ret;
}
761
ofi_wait_get_fd(struct util_wait_fd * wait_fd,struct ofi_wait_fid_entry * fid_entry)762 static int ofi_wait_get_fd(struct util_wait_fd *wait_fd,
763 struct ofi_wait_fid_entry *fid_entry)
764 {
765 struct pollfd *fds;
766 int ret;
767
768 fds = calloc(1, sizeof(*fds));
769 if (!fds)
770 return -FI_ENOMEM;
771
772 ret = fi_control(fid_entry->fid, FI_GETWAIT, &fds->fd);
773 if (ret) {
774 FI_WARN(wait_fd->util_wait.prov, FI_LOG_EP_CTRL,
775 "unable to get wait fd %s\n", fi_strerror(ret));
776 goto free;
777 }
778
779 fds->events = fid_entry->events;
780 fid_entry->pollfds.fd = fds;
781 fid_entry->pollfds.nfds = 1;
782 return 0;
783
784 free:
785 free(fds);
786 return ret;
787 }
788
ofi_wait_get_fid_fds(struct util_wait * wait,struct ofi_wait_fid_entry * fid_entry)789 static int ofi_wait_get_fid_fds(struct util_wait *wait,
790 struct ofi_wait_fid_entry *fid_entry)
791 {
792 struct util_wait_fd *wait_fd;
793 struct pollfd *fds;
794 size_t i;
795 int ret;
796
797 ret = fi_control(fid_entry->fid, FI_GETWAITOBJ,
798 &fid_entry->wait_obj);
799 if ((fid_entry->wait_obj != FI_WAIT_FD) &&
800 (fid_entry->wait_obj != FI_WAIT_POLLFD)) {
801 FI_WARN(wait->prov, FI_LOG_EP_CTRL,
802 "unsupported wait object %d (ret: %s)\n",
803 fid_entry->wait_obj, fi_strerror(ret));
804 return ret;
805 }
806
807 /* pollfd is updated during trywait */
808 if (fid_entry->wait_obj == FI_WAIT_POLLFD)
809 return 0;
810
811 wait_fd = container_of(wait, struct util_wait_fd, util_wait);
812 ret = ofi_wait_get_fd(wait_fd, fid_entry);
813 if (ret)
814 return ret;
815
816 fds = fid_entry->pollfds.fd;
817 for (i = 0; i < fid_entry->pollfds.nfds; i++) {
818 ret = ofi_wait_fdset_add(wait_fd, fds[i].fd, fds[i].events,
819 fid_entry->fid->context);
820 if (ret) {
821 FI_WARN(wait->prov, FI_LOG_EP_CTRL,
822 "unable to add fd %s\n", fi_strerror(ret));
823 return ret;
824 }
825 }
826
827 return 0;
828 }
829
/*
 * Attach @fid to the wait set so that @wait_try is called on it before
 * blocking.
 *
 * If the fid is already attached, only its reference count is bumped.
 * Otherwise a new entry is created.  For FI_WAIT_FD and FI_WAIT_POLLFD wait
 * sets, ofi_wait_get_fid_fds() is used to learn and register the fid's
 * fds, with @events.
 *
 * Returns 0, -FI_ENOMEM, or the error from ofi_wait_get_fid_fds().  On
 * error nothing is attached and no memory is leaked.
 */
int ofi_wait_add_fid(struct util_wait *wait, fid_t fid, uint32_t events,
		     ofi_wait_try_func wait_try)
{
	struct ofi_wait_fid_entry *fid_entry;
	struct dlist_entry *entry;
	int ret = 0;

	fastlock_acquire(&wait->lock);
	entry = dlist_find_first_match(&wait->fid_list,
				       ofi_wait_match_fid, fid);
	if (entry) {
		FI_DBG(wait->prov, FI_LOG_EP_CTRL,
		       "Given fid (%p) already added to wait list - %p \n",
		       fid, wait);
		fid_entry = container_of(entry, struct ofi_wait_fid_entry, entry);
		ofi_atomic_inc32(&fid_entry->ref);
		goto out;
	}

	fid_entry = calloc(1, sizeof *fid_entry);
	if (!fid_entry) {
		ret = -FI_ENOMEM;
		goto out;
	}

	fid_entry->fid = fid;
	fid_entry->wait_try = wait_try;
	fid_entry->events = events;
	ofi_atomic_initialize32(&fid_entry->ref, 1);

	if (wait->wait_obj == FI_WAIT_FD || wait->wait_obj == FI_WAIT_POLLFD) {
		ret = ofi_wait_get_fid_fds(wait, fid_entry);
		if (ret) {
			/*
			 * ofi_wait_get_fid_fds() may already have cached an
			 * fd array (e.g. when adding the fd to the set
			 * failed); free it along with the entry.
			 */
			free(fid_entry->pollfds.fd);
			free(fid_entry);
			goto out;
		}
	}
	dlist_insert_tail(&fid_entry->entry, &wait->fid_list);
out:
	fastlock_release(&wait->lock);
	return ret;
}
872