1 /*
2 * Copyright (c) 2014-2017, Cisco Systems, Inc. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27 * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
28 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
29 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
30 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
31 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
32 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
33 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
#include "config.h"

#include <asm/types.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/eventfd.h>
#include <sys/queue.h>
#include <time.h>
#include <unistd.h>

#include <rdma/fabric.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_rma.h>
#include "ofi.h"
#include "ofi_enosys.h"

#include "usnic_direct.h"
#include "usd.h"
#include "usdf.h"
#include "usdf_wait.h"
#include "ofi_util.h"
66
67 static inline int
usdf_eq_empty(struct usdf_eq * eq)68 usdf_eq_empty(struct usdf_eq *eq)
69 {
70 return (ofi_atomic_get32(&eq->eq_num_events) == 0);
71 }
72
73 static inline int
usdf_eq_error(struct usdf_eq * eq)74 usdf_eq_error(struct usdf_eq *eq)
75 {
76 return ((eq->eq_ev_tail->ue_flags & USDF_EVENT_FLAG_ERROR) != 0);
77 }
78
/*
 * read an event from the ring. Caller must hold eq lock, and caller
 * needs to have checked for empty and error
 *
 * Copies the event at the ring tail into "buf".  With FI_PEEK the event
 * is left queued; otherwise the tail slot is consumed: the queued-event
 * count is decremented, any malloc()ed payload is freed, the tail
 * advances with wraparound, and one token is drained from the eventfd
 * when the EQ uses FI_WAIT_FD.
 *
 * Returns the number of bytes copied, -FI_ETOOSMALL when "len" cannot
 * hold the whole event (nothing is consumed in that case), or -errno if
 * the eventfd read fails.  NOTE(review): in that last case the event has
 * already been removed from the ring before the error is reported —
 * confirm callers tolerate losing the event.
 */
static inline ssize_t usdf_eq_read_event(struct usdf_eq *eq, uint32_t *event,
		void *buf, size_t len, uint64_t flags)
{
	struct usdf_event *ev;
	size_t copylen;
	ssize_t nbytes;
	uint64_t val;

	ev = eq->eq_ev_tail;

	copylen = MIN(ev->ue_len, len);

	/* Refuse partial copies: the caller's buffer must hold the
	 * entire queued event. */
	if (copylen < ev->ue_len) {
		USDF_WARN_SYS(EQ,
				"buffer too small, got: %zu needed %zu\n",
				copylen, ev->ue_len);
		return -FI_ETOOSMALL;
	}

	/* copy out the event */
	if (event)
		*event = ev->ue_event;

	memcpy(buf, ev->ue_buf, copylen);

	if (!(flags & FI_PEEK)) {
		/* update count */
		ofi_atomic_dec32(&eq->eq_num_events);

		/* Free the event buf if needed */
		if (ev->ue_flags & USDF_EVENT_FLAG_FREE_BUF)
			free(ev->ue_buf);

		/* new tail */
		eq->eq_ev_tail++;
		if (eq->eq_ev_tail >= eq->eq_ev_end)
			eq->eq_ev_tail = eq->eq_ev_ring;

		/* consume the event in eventfd */
		if (eq->eq_attr.wait_obj == FI_WAIT_FD) {
			nbytes = read(eq->eq_fd, &val, sizeof(val));
			if (nbytes != sizeof(val))
				return -errno;
		}
	}

	return copylen;
}
131
132 /*
133 * unconditionally write an event to the EQ. Caller is responsible for
134 * ensuring there is room. EQ must be locked.
135 */
136 static inline ssize_t
usdf_eq_write_event(struct usdf_eq * eq,uint32_t event,const void * buf,size_t len,uint64_t flags)137 usdf_eq_write_event(struct usdf_eq *eq, uint32_t event,
138 const void *buf, size_t len, uint64_t flags)
139 {
140 struct usdf_event *ev;
141 void *ev_buf;
142
143 ev = eq->eq_ev_head;
144 ev->ue_event = event;
145 ev->ue_len = len;
146 ev->ue_flags = flags;
147
148 /* save the event data if we can, else malloc() */
149 if (len <= sizeof(struct fi_eq_entry)) {
150 ev_buf = eq->eq_ev_buf + (ev - eq->eq_ev_ring);
151 } else {
152 ev_buf = malloc(len);
153 if (ev_buf == NULL) {
154 return -errno;
155 }
156 ev->ue_flags |= USDF_EVENT_FLAG_FREE_BUF;
157 }
158 memcpy(ev_buf, buf, len);
159 ev->ue_buf = ev_buf;
160
161 /* new head */
162 eq->eq_ev_head++;
163 if (eq->eq_ev_head >= eq->eq_ev_end) {
164 eq->eq_ev_head = eq->eq_ev_ring;
165 }
166
167 /* increment queued event count */
168 ofi_atomic_inc32(&eq->eq_num_events);
169
170 return len;
171 }
172
usdf_eq_clean_err(struct usdf_eq * eq,uint8_t destroy)173 static void usdf_eq_clean_err(struct usdf_eq *eq, uint8_t destroy)
174 {
175 struct usdf_err_data_entry *err_data_entry;
176 struct slist_entry *entry;
177
178 while (!slist_empty(&eq->eq_err_data)) {
179 entry = slist_remove_head(&eq->eq_err_data);
180 err_data_entry = container_of(entry, struct usdf_err_data_entry,
181 entry);
182 if (err_data_entry->seen || destroy) {
183 free(err_data_entry);
184 } else {
185 /* Oops, the rest hasn't been seen yet. Put this back
186 * and exit.
187 */
188 slist_insert_head(entry, &eq->eq_err_data);
189 break;
190 }
191 }
192 }
193
usdf_eq_readerr(struct fid_eq * feq,struct fi_eq_err_entry * given_buffer,uint64_t flags)194 static ssize_t usdf_eq_readerr(struct fid_eq *feq,
195 struct fi_eq_err_entry *given_buffer, uint64_t flags)
196 {
197 struct usdf_err_data_entry *err_data_entry;
198 struct fi_eq_err_entry entry;
199 struct usdf_eq *eq;
200 ssize_t ret, err_data_size;
201 uint32_t api_version;
202 void *err_data = NULL;
203
204 USDF_TRACE_SYS(EQ, "\n");
205
206 if (!feq) {
207 USDF_DBG_SYS(EQ, "invalid input\n");
208 return -FI_EINVAL;
209 }
210
211 eq = eq_ftou(feq);
212
213 pthread_spin_lock(&eq->eq_lock);
214
215 /* make sure there is an error on top */
216 if (usdf_eq_empty(eq) || !usdf_eq_error(eq)) {
217 pthread_spin_unlock(&eq->eq_lock);
218 ret = -FI_EAGAIN;
219 goto done;
220 }
221
222 ret = usdf_eq_read_event(eq, NULL, &entry, sizeof(entry), flags);
223
224 pthread_spin_unlock(&eq->eq_lock);
225
226 /* read the user's setting for err_data. */
227 err_data = given_buffer->err_data;
228 err_data_size = given_buffer->err_data_size;
229
230 /* Copy the entry. */
231 *given_buffer = entry;
232
233 /* Mark as seen so it can be cleaned on the next iteration of read. */
234 if (entry.err_data_size) {
235 err_data_entry = container_of(entry.err_data,
236 struct usdf_err_data_entry, err_data);
237 err_data_entry->seen = 1;
238 }
239
240
241 /* For release > 1.5, we will copy the err_data directly
242 * to the user's buffer.
243 */
244 api_version = eq->eq_fabric->fab_attr.fabric->api_version;
245 if (FI_VERSION_GE(api_version, FI_VERSION(1, 5))) {
246 given_buffer->err_data = err_data;
247 given_buffer->err_data_size =
248 MIN(err_data_size, entry.err_data_size);
249 memcpy(given_buffer->err_data, entry.err_data,
250 given_buffer->err_data_size);
251
252 if (err_data_size < entry.err_data_size) {
253 USDF_DBG_SYS(EQ, "err_data truncated by %zd bytes.\n",
254 entry.err_data_size - err_data_size);
255 }
256
257 usdf_eq_clean_err(eq, 0);
258 }
259
260 done:
261 return ret;
262 }
263
_usdf_eq_read(struct usdf_eq * eq,uint32_t * event,void * buf,size_t len,uint64_t flags)264 static ssize_t _usdf_eq_read(struct usdf_eq *eq, uint32_t *event, void *buf,
265 size_t len, uint64_t flags)
266 {
267 ssize_t ret;
268
269 pthread_spin_lock(&eq->eq_lock);
270
271 if (usdf_eq_empty(eq)) {
272 ret = -FI_EAGAIN;
273 goto done;
274 }
275
276 if (usdf_eq_error(eq)) {
277 ret = -FI_EAVAIL;
278 goto done;
279 }
280
281 if (!slist_empty(&eq->eq_err_data))
282 usdf_eq_clean_err(eq, 0);
283
284 ret = usdf_eq_read_event(eq, event, buf, len, flags);
285
286 done:
287 pthread_spin_unlock(&eq->eq_lock);
288 return ret;
289 }
290
usdf_eq_read(struct fid_eq * feq,uint32_t * event,void * buf,size_t len,uint64_t flags)291 static ssize_t usdf_eq_read(struct fid_eq *feq, uint32_t *event, void *buf,
292 size_t len, uint64_t flags)
293 {
294 struct usdf_eq *eq;
295
296 USDF_DBG_SYS(EQ, "\n");
297
298 eq = eq_ftou(feq);
299
300 /* Don't bother acquiring the lock if there is nothing to read. */
301 if (usdf_eq_empty(eq))
302 return -FI_EAGAIN;
303
304 return _usdf_eq_read(eq, event, buf, len, flags);
305 }
306
307 /* TODO: The timeout handling seems off on this one. */
usdf_eq_sread_fd(struct fid_eq * feq,uint32_t * event,void * buf,size_t len,int timeout,uint64_t flags)308 static ssize_t usdf_eq_sread_fd(struct fid_eq *feq, uint32_t *event, void *buf,
309 size_t len, int timeout, uint64_t flags)
310 {
311 struct usdf_eq *eq;
312 struct pollfd pfd;
313 int ret;
314
315 USDF_DBG_SYS(EQ, "\n");
316
317 eq = eq_ftou(feq);
318
319 /* Setup poll context to block until the FD becomes readable. */
320 pfd.fd = eq->eq_fd;
321 pfd.events = POLLIN;
322
323 retry:
324 ret = poll(&pfd, 1, timeout);
325 if (ret < 0)
326 return -errno;
327 else if (ret == 0)
328 return -FI_EAGAIN;
329
330 ret = _usdf_eq_read(eq, event, buf, len, flags);
331 if (ret == -FI_EAGAIN)
332 goto retry;
333
334 return ret;
335 }
336
usdf_eq_write_internal(struct usdf_eq * eq,uint32_t event,const void * buf,size_t len,uint64_t flags)337 ssize_t usdf_eq_write_internal(struct usdf_eq *eq, uint32_t event,
338 const void *buf, size_t len, uint64_t flags)
339 {
340 uint64_t val = 1;
341 int ret;
342 int n;
343
344 USDF_DBG_SYS(EQ, "event=%#" PRIx32 " flags=%#" PRIx64 "\n", event,
345 flags);
346
347 pthread_spin_lock(&eq->eq_lock);
348
349 /* Return -FI_EAGAIN if the EQ is full.
350 * TODO: Disable the EQ.
351 */
352 if (ofi_atomic_get32(&eq->eq_num_events) == eq->eq_ev_ring_size) {
353 ret = -FI_EAGAIN;
354 goto done;
355 }
356
357 ret = usdf_eq_write_event(eq, event, buf, len, flags);
358
359 /* If successful, post to eventfd */
360 if (ret >= 0 && eq->eq_attr.wait_obj == FI_WAIT_FD) {
361 n = write(eq->eq_fd, &val, sizeof(val));
362
363 /* TODO: If the write call fails, then roll back the EQ entry.
364 */
365 if (n != sizeof(val))
366 ret = -FI_EIO;
367 }
368
369 done:
370 pthread_spin_unlock(&eq->eq_lock);
371 return ret;
372 }
373
usdf_eq_write(struct fid_eq * feq,uint32_t event,const void * buf,size_t len,uint64_t flags)374 static ssize_t usdf_eq_write(struct fid_eq *feq, uint32_t event,
375 const void *buf, size_t len, uint64_t flags)
376 {
377 struct usdf_eq *eq;
378
379 USDF_DBG_SYS(EQ, "\n");
380
381 if (!feq) {
382 USDF_DBG_SYS(EQ, "invalid input\n");
383 return -FI_EINVAL;
384 }
385
386 eq = eq_ftou(feq);
387
388 return usdf_eq_write_internal(eq, event, buf, len, flags);
389 }
390
/* fi_eq_strerror() stub: the usNIC provider defines no
 * provider-specific error strings, so always return NULL. */
static const char *
usdf_eq_strerror(struct fid_eq *feq, int prov_errno, const void *err_data,
		char *buf, size_t len)
{
	return NULL;
}
397
usdf_eq_get_wait(struct usdf_eq * eq,void * arg)398 static int usdf_eq_get_wait(struct usdf_eq *eq, void *arg)
399 {
400 USDF_TRACE_SYS(EQ, "\n");
401
402 switch (eq->eq_attr.wait_obj) {
403 case FI_WAIT_FD:
404 *(int *) arg = eq->eq_fd;
405 break;
406 default:
407 USDF_WARN_SYS(EQ, "unsupported wait type\n");
408 return -FI_EINVAL;
409 }
410
411 return FI_SUCCESS;
412 }
413
414 static int
usdf_eq_control(fid_t fid,int command,void * arg)415 usdf_eq_control(fid_t fid, int command, void *arg)
416 {
417 struct usdf_eq *eq;
418
419 USDF_TRACE_SYS(EQ, "\n");
420
421 eq = eq_fidtou(fid);
422
423 switch (command) {
424 case FI_GETWAIT:
425 break;
426 default:
427 return -FI_EINVAL;
428 }
429
430 return usdf_eq_get_wait(eq, arg);
431 }
432
usdf_eq_bind_wait(struct usdf_eq * eq)433 static int usdf_eq_bind_wait(struct usdf_eq *eq)
434 {
435 int ret;
436 struct usdf_wait *wait_priv;
437
438 if (!eq->eq_attr.wait_set) {
439 USDF_DBG_SYS(EQ, "can't bind to non-existent wait set\n");
440 return -FI_EINVAL;
441 }
442
443 wait_priv = wait_ftou(eq->eq_attr.wait_set);
444
445 ret = fid_list_insert(&wait_priv->list, &wait_priv->lock,
446 &eq->eq_fid.fid);
447 if (ret) {
448 USDF_WARN_SYS(EQ,
449 "failed to associate eq with wait fid list\n");
450 return ret;
451 }
452
453 ret = ofi_epoll_add(wait_priv->object.epfd, eq->eq_fd, OFI_EPOLL_IN, eq);
454 if (ret) {
455 USDF_WARN_SYS(EQ, "failed to associate FD with wait set\n");
456 goto err;
457 }
458
459 USDF_DBG_SYS(EQ, "associated EQ FD %d with epoll FD %d using fid %p\n",
460 eq->eq_fd, wait_priv->object.epfd, &eq->eq_fid.fid);
461
462 return ret;
463
464 err:
465 fid_list_remove(&wait_priv->list, &wait_priv->lock, &eq->eq_fid.fid);
466 return ret;
467 }
468
usdf_eq_unbind_wait(struct usdf_eq * eq)469 static int usdf_eq_unbind_wait(struct usdf_eq *eq)
470 {
471 int ret;
472 struct usdf_wait *wait_priv;
473
474 if (!eq->eq_attr.wait_set) {
475 USDF_DBG_SYS(EQ, "can't unbind from non-existent wait set\n");
476 return -FI_EINVAL;
477 }
478
479 wait_priv = wait_ftou(eq->eq_attr.wait_set);
480
481 ret = ofi_epoll_del(wait_priv->object.epfd, eq->eq_fd);
482 if (ret) {
483 USDF_WARN_SYS(EQ,
484 "failed to remove FD from wait set\n");
485 return ret;
486 }
487
488 fid_list_remove(&wait_priv->list, &wait_priv->lock, &eq->eq_fid.fid);
489
490 ofi_atomic_dec32(&wait_priv->wait_refcnt);
491
492 USDF_DBG_SYS(EQ,
493 "dissasociated EQ FD %d from epoll FD %d using FID: %p\n",
494 eq->eq_fd, wait_priv->object.epfd, &eq->eq_fid.fid);
495
496 return FI_SUCCESS;
497 }
498
499 static int
usdf_eq_close(fid_t fid)500 usdf_eq_close(fid_t fid)
501 {
502 struct usdf_eq *eq;
503 int ret = FI_SUCCESS;
504
505 USDF_TRACE_SYS(EQ, "\n");
506
507 eq = eq_fidtou(fid);
508
509 if (ofi_atomic_get32(&eq->eq_refcnt) > 0) {
510 return -FI_EBUSY;
511 }
512 ofi_atomic_dec32(&eq->eq_fabric->fab_refcnt);
513
514 /* release wait obj */
515 switch (eq->eq_attr.wait_obj) {
516 case FI_WAIT_SET:
517 ret = usdf_eq_unbind_wait(eq);
518 /* FALLTHROUGH */
519 /* Need to close the FD used for wait set. */
520 case FI_WAIT_FD:
521 close(eq->eq_fd);
522 break;
523 default:
524 break;
525 }
526
527 /* Set destroy flag to clear everything out */
528 usdf_eq_clean_err(eq, 1);
529
530 free(eq->eq_ev_ring);
531 free(eq->eq_ev_buf);
532 free(eq);
533
534 return ret;
535 }
536
/* Baseline EQ data-path operations.  usdf_eq_open() copies this table
 * per EQ and then patches it: sread is filled in for FD-backed wait
 * objects, and write is replaced with fi_no_eq_write unless the
 * application requested FI_WRITE. */
static struct fi_ops_eq usdf_eq_ops = {
	.size = sizeof(struct fi_ops_eq),
	.read = usdf_eq_read,
	.readerr = usdf_eq_readerr,
	.write = usdf_eq_write,
	.sread = fi_no_eq_sread,
	.strerror = usdf_eq_strerror,
};
545
/* Generic fid operations for the EQ: close and FI_GETWAIT control are
 * supported; bind and ops_open are not. */
static struct fi_ops usdf_eq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = usdf_eq_close,
	.bind = fi_no_bind,
	.control = usdf_eq_control,
	.ops_open = fi_no_ops_open,
};
553
554 int
usdf_eq_open(struct fid_fabric * fabric,struct fi_eq_attr * attr,struct fid_eq ** feq,void * context)555 usdf_eq_open(struct fid_fabric *fabric, struct fi_eq_attr *attr,
556 struct fid_eq **feq, void *context)
557 {
558 struct usdf_eq *eq;
559 struct usdf_fabric *fab;
560 int ret;
561
562 USDF_TRACE_SYS(EQ, "\n");
563
564 fab = fab_ftou(fabric);
565
566 eq = calloc(1, sizeof(*eq));
567 if (eq == NULL) {
568 ret = -errno;
569 goto fail;
570 }
571
572 /* fill in the EQ struct */
573 eq->eq_fid.fid.fclass = FI_CLASS_EQ;
574 eq->eq_fid.fid.context = context;
575 eq->eq_fid.fid.ops = &usdf_eq_fi_ops;
576 eq->eq_fid.ops = &eq->eq_ops_data;
577
578 eq->eq_fabric = fab;
579 ofi_atomic_initialize32(&eq->eq_refcnt, 0);
580 ret = pthread_spin_init(&eq->eq_lock, PTHREAD_PROCESS_PRIVATE);
581 if (ret != 0) {
582 ret = -ret;
583 goto fail;
584 }
585
586 slist_init(&eq->eq_err_data);
587
588 /* get baseline routines */
589 eq->eq_ops_data = usdf_eq_ops;
590
591 /* fill in sread based on wait type */
592 switch (attr->wait_obj) {
593 case FI_WAIT_NONE:
594 break;
595 case FI_WAIT_UNSPEC:
596 /* default to FD */
597 attr->wait_obj = FI_WAIT_FD;
598 /* FALLTHROUGH */
599 case FI_WAIT_FD:
600 eq->eq_ops_data.sread = usdf_eq_sread_fd;
601 /* FALLTHROUGH */
602 /* Don't set sread for wait set. */
603 case FI_WAIT_SET:
604 eq->eq_fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
605 if (eq->eq_fd == -1) {
606 ret = -errno;
607 goto fail;
608 }
609
610 if (attr->wait_obj == FI_WAIT_SET) {
611 ret = usdf_eq_bind_wait(eq);
612 if (ret)
613 goto fail;
614 }
615 break;
616 default:
617 ret = -FI_ENOSYS;
618 goto fail;
619 }
620
621 /*
622 * Dis-allow write if requested
623 */
624 if ((attr->flags & FI_WRITE) == 0) {
625 eq->eq_ops_data.write = fi_no_eq_write;
626 }
627
628 /*
629 * Allocate and initialize event ring
630 */
631 if (attr->size == 0) {
632 attr->size = 1024; // XXX
633 }
634 eq->eq_ev_ring = calloc(attr->size, sizeof(*eq->eq_ev_ring));
635 eq->eq_ev_buf = calloc(attr->size, sizeof(*eq->eq_ev_buf));
636 if (eq->eq_ev_ring == NULL || eq->eq_ev_buf == NULL) {
637 ret = -errno;
638 goto fail;
639 }
640 eq->eq_ev_head = eq->eq_ev_ring;
641 eq->eq_ev_tail = eq->eq_ev_ring;
642 eq->eq_ev_ring_size = attr->size;
643 eq->eq_ev_end = eq->eq_ev_ring + eq->eq_ev_ring_size;
644 ofi_atomic_initialize32(&eq->eq_num_events, 0);
645
646 ofi_atomic_inc32(&eq->eq_fabric->fab_refcnt);
647
648 eq->eq_attr = *attr;
649 *feq = eq_utof(eq);
650
651 return 0;
652
653 fail:
654 if (eq != NULL) {
655 free(eq->eq_ev_ring);
656 free(eq->eq_ev_buf);
657 free(eq);
658 }
659 return ret;
660 }
661