/*
 * Copyright (c) 2013-2016 Intel Corporation. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials
 *   provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stdlib.h>
#include <string.h>

#include <ofi_enosys.h>
#include <ofi_util.h>

#define UTIL_DEF_CQ_SIZE (1024)

/* Caller must hold cq->cq_lock. */
int ofi_cq_write_overflow(struct util_cq *cq, void *context, uint64_t flags,
			  size_t len, void *buf, uint64_t data, uint64_t tag,
			  fi_addr_t src)
{
	struct util_cq_oflow_err_entry *entry;

	assert(ofi_cirque_isfull(cq->cirq));

	if (!(entry = calloc(1, sizeof(*entry))))
		return -FI_ENOMEM;

	entry->parent_comp = ofi_cirque_tail(cq->cirq);
	entry->parent_comp->flags |= UTIL_FLAG_OVERFLOW;

	entry->comp.op_context = context;
	entry->comp.flags = flags;
	entry->comp.len = len;
	entry->comp.buf = buf;
	entry->comp.data = data;
	entry->comp.tag = tag;

	entry->src = src;
	slist_insert_tail(&entry->list_entry, &cq->oflow_err_list);

	return 0;
}

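/*
 * Provider-side sketch (illustrative only; ctx and the local completion
 * fields below are hypothetical): completions are written under cq_lock,
 * falling back to the overflow path above when the circular queue is full.
 *
 *	cq->cq_fastlock_acquire(&cq->cq_lock);
 *	if (OFI_UNLIKELY(ofi_cirque_isfull(cq->cirq))) {
 *		ret = ofi_cq_write_overflow(cq, ctx, flags, len, buf,
 *					    data, tag, FI_ADDR_NOTAVAIL);
 *	} else {
 *		struct fi_cq_tagged_entry *comp = ofi_cirque_tail(cq->cirq);
 *		comp->op_context = ctx;
 *		comp->flags = flags;
 *		comp->len = len;
 *		comp->buf = buf;
 *		comp->data = data;
 *		comp->tag = tag;
 *		ofi_cirque_commit(cq->cirq);
 *	}
 *	cq->cq_fastlock_release(&cq->cq_lock);
 */
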
int ofi_cq_write_error(struct util_cq *cq,
		       const struct fi_cq_err_entry *err_entry)
{
	struct util_cq_oflow_err_entry *entry;
	struct fi_cq_tagged_entry *comp;

	assert(err_entry->err);

	if (!(entry = calloc(1, sizeof(*entry))))
		return -FI_ENOMEM;

	entry->comp = *err_entry;
	cq->cq_fastlock_acquire(&cq->cq_lock);
	slist_insert_tail(&entry->list_entry, &cq->oflow_err_list);

	if (OFI_UNLIKELY(ofi_cirque_isfull(cq->cirq))) {
		comp = ofi_cirque_tail(cq->cirq);
		comp->flags |= (UTIL_FLAG_ERROR | UTIL_FLAG_OVERFLOW);
		entry->parent_comp = comp;
	} else {
		comp = ofi_cirque_tail(cq->cirq);
		comp->flags = UTIL_FLAG_ERROR;
		ofi_cirque_commit(cq->cirq);
	}
	cq->cq_fastlock_release(&cq->cq_lock);
	if (cq->wait)
		cq->wait->signal(cq->wait);
	return 0;
}

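/*
 * Usage note (a sketch, not provider-specific behavior): an error queued by
 * ofi_cq_write_error() surfaces to the application as -FI_EAVAIL from
 * fi_cq_read()/fi_cq_readfrom() and is then retrieved with fi_cq_readerr():
 *
 *	struct fi_cq_tagged_entry comp;
 *	struct fi_cq_err_entry err = { 0 };
 *	ssize_t ret = fi_cq_read(cq_fid, &comp, 1);
 *	if (ret == -FI_EAVAIL)
 *		ret = fi_cq_readerr(cq_fid, &err, 0);
 */
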
int ofi_cq_write_error_peek(struct util_cq *cq, uint64_t tag, void *context)
{
	struct fi_cq_err_entry err_entry = {
		.op_context = context,
		.flags = FI_TAGGED | FI_RECV,
		.tag = tag,
		.err = FI_ENOMSG,
		.prov_errno = -FI_ENOMSG,
	};
	return ofi_cq_write_error(cq, &err_entry);
}

int ofi_cq_write_error_trunc(struct util_cq *cq, void *context, uint64_t flags,
			     size_t len, void *buf, uint64_t data, uint64_t tag,
			     size_t olen)
{
	struct fi_cq_err_entry err_entry = {
		.op_context = context,
		.flags = flags,
		.len = len,
		.buf = buf,
		.data = data,
		.tag = tag,
		.olen = olen,
		.err = FI_ETRUNC,
		.prov_errno = -FI_ETRUNC,
	};
	return ofi_cq_write_error(cq, &err_entry);
}

int ofi_check_cq_attr(const struct fi_provider *prov,
		      const struct fi_cq_attr *attr)
{
	switch (attr->format) {
	case FI_CQ_FORMAT_UNSPEC:
	case FI_CQ_FORMAT_CONTEXT:
	case FI_CQ_FORMAT_MSG:
	case FI_CQ_FORMAT_DATA:
	case FI_CQ_FORMAT_TAGGED:
		break;
	default:
		FI_WARN(prov, FI_LOG_CQ, "unsupported format\n");
		return -FI_EINVAL;
	}

	switch (attr->wait_obj) {
	case FI_WAIT_NONE:
	case FI_WAIT_YIELD:
		break;
	case FI_WAIT_SET:
		if (!attr->wait_set) {
			FI_WARN(prov, FI_LOG_CQ, "invalid wait set\n");
			return -FI_EINVAL;
		}
		/* fall through */
	case FI_WAIT_UNSPEC:
	case FI_WAIT_FD:
	case FI_WAIT_POLLFD:
		switch (attr->wait_cond) {
		case FI_CQ_COND_NONE:
		case FI_CQ_COND_THRESHOLD:
			break;
		default:
			FI_WARN(prov, FI_LOG_CQ, "unsupported wait cond\n");
			return -FI_EINVAL;
		}
		break;
	default:
		FI_WARN(prov, FI_LOG_CQ, "unsupported wait object\n");
		return -FI_EINVAL;
	}

	if (attr->flags & ~(FI_AFFINITY)) {
		FI_WARN(prov, FI_LOG_CQ, "invalid flags\n");
		return -FI_EINVAL;
	}

	if (attr->flags & FI_AFFINITY)
		FI_WARN(prov, FI_LOG_CQ, "signaling vector ignored\n");

	return 0;
}

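/*
 * Example attributes that pass the checks above (values are illustrative;
 * a size of 0 is later expanded to UTIL_DEF_CQ_SIZE by ofi_cq_init()):
 *
 *	struct fi_cq_attr attr = {
 *		.size = 0,
 *		.format = FI_CQ_FORMAT_TAGGED,
 *		.wait_obj = FI_WAIT_UNSPEC,
 *		.wait_cond = FI_CQ_COND_NONE,
 *	};
 */
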
static void util_cq_read_ctx(void **dst, void *src)
{
	*(struct fi_cq_entry *) *dst = *(struct fi_cq_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_entry);
}

static void util_cq_read_msg(void **dst, void *src)
{
	*(struct fi_cq_msg_entry *) *dst = *(struct fi_cq_msg_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_msg_entry);
}

static void util_cq_read_data(void **dst, void *src)
{
	*(struct fi_cq_data_entry *) *dst = *(struct fi_cq_data_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_data_entry);
}

static void util_cq_read_tagged(void **dst, void *src)
{
	*(struct fi_cq_tagged_entry *) *dst = *(struct fi_cq_tagged_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_tagged_entry);
}

static inline
void util_cq_read_oflow_entry(struct util_cq *cq,
			      struct util_cq_oflow_err_entry *oflow_entry,
			      struct fi_cq_tagged_entry *cirq_entry,
			      void **buf, fi_addr_t *src_addr, ssize_t i)
{
	if (src_addr && cq->src) {
		src_addr[i] = cq->src[ofi_cirque_rindex(cq->cirq)];
		cq->src[ofi_cirque_rindex(cq->cirq)] = oflow_entry->src;
	}
	cq->read_entry(buf, cirq_entry);
	cirq_entry->op_context = oflow_entry->comp.op_context;
	cirq_entry->flags = oflow_entry->comp.flags;
	cirq_entry->len = oflow_entry->comp.len;
	cirq_entry->buf = oflow_entry->comp.buf;
	cirq_entry->data = oflow_entry->comp.data;
	cirq_entry->tag = oflow_entry->comp.tag;
}

static inline
void util_cq_read_entry(struct util_cq *cq, struct fi_cq_tagged_entry *entry,
			void **buf, fi_addr_t *src_addr, ssize_t i)
{
	if (src_addr && cq->src)
		src_addr[i] = cq->src[ofi_cirque_rindex(cq->cirq)];
	cq->read_entry(buf, entry);
	ofi_cirque_discard(cq->cirq);
}

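/*
 * Note on the read helpers above: struct fi_cq_entry, fi_cq_msg_entry,
 * fi_cq_data_entry, and fi_cq_tagged_entry share a common leading layout
 * (op_context, then flags, len, buf, data, tag), so copying the internal
 * fi_cq_tagged_entry through a narrower type truncates it to the format the
 * application requested. Each helper also advances *dst, letting the read
 * path pack consecutive entries into the caller's buffer:
 *
 *	void *pos = user_buf;
 *	cq->read_entry(&pos, ofi_cirque_head(cq->cirq));
 */
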
ssize_t ofi_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count,
			fi_addr_t *src_addr)
{
	struct util_cq *cq;
	struct fi_cq_tagged_entry *entry;
	ssize_t i;

	cq = container_of(cq_fid, struct util_cq, cq_fid);

	cq->cq_fastlock_acquire(&cq->cq_lock);
	if (ofi_cirque_isempty(cq->cirq) || !count) {
		cq->cq_fastlock_release(&cq->cq_lock);
		cq->progress(cq);
		cq->cq_fastlock_acquire(&cq->cq_lock);
		if (ofi_cirque_isempty(cq->cirq)) {
			i = -FI_EAGAIN;
			goto out;
		}
	}

	if (count > ofi_cirque_usedcnt(cq->cirq))
		count = ofi_cirque_usedcnt(cq->cirq);

	for (i = 0; i < (ssize_t) count; i++) {
		entry = ofi_cirque_head(cq->cirq);
		if (OFI_UNLIKELY(entry->flags & (UTIL_FLAG_ERROR |
						 UTIL_FLAG_OVERFLOW))) {
			if (entry->flags & UTIL_FLAG_ERROR) {
				struct util_cq_oflow_err_entry *oflow_err_entry =
					container_of(cq->oflow_err_list.head,
						     struct util_cq_oflow_err_entry,
						     list_entry);
				if (oflow_err_entry->comp.err) {
					/* The head of oflow_err_list is an
					 * error entry.
					 *
					 * NOTE: if it were not an error entry,
					 * the overflow entries would have to be
					 * handled before the error entries to
					 * preserve ordering. */
					if (!i)
						i = -FI_EAVAIL;
					break;
				}
			}
			if (entry->flags & UTIL_FLAG_OVERFLOW) {
				assert(!slist_empty(&cq->oflow_err_list));
				struct util_cq_oflow_err_entry *oflow_entry =
					container_of(cq->oflow_err_list.head,
						     struct util_cq_oflow_err_entry,
						     list_entry);
				if (oflow_entry->parent_comp != entry) {
					/* All overflow/error CQ entries tied to
					 * this CIRQ entry have already been
					 * consumed. */
					entry->flags &= ~(UTIL_FLAG_OVERFLOW |
							  UTIL_FLAG_ERROR);
				} else {
					uint64_t service_flags =
						(entry->flags & (UTIL_FLAG_OVERFLOW |
								 UTIL_FLAG_ERROR));
					slist_remove_head(&cq->oflow_err_list);

					entry->flags &= ~(service_flags);
					util_cq_read_oflow_entry(cq, oflow_entry, entry,
								 &buf, src_addr, i);
					/* Keep the flags set so any remaining
					 * overflow entries are checked again. */
					if (!slist_empty(&cq->oflow_err_list))
						entry->flags |= service_flags;
					free(oflow_entry);
					continue;
				}
			}
		}
		util_cq_read_entry(cq, entry, &buf, src_addr, i);
	}
out:
	cq->cq_fastlock_release(&cq->cq_lock);
	return i;
}

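/*
 * Caller-side sketch (variables hypothetical): drain completions and, when
 * the domain was opened with FI_SOURCE, recover the source address of each:
 *
 *	struct fi_cq_tagged_entry comps[8];
 *	fi_addr_t srcs[8];
 *	ssize_t n = fi_cq_readfrom(cq_fid, comps, 8, srcs);
 *	if (n > 0) {
 *		// comps[0..n-1] and srcs[0..n-1] are valid
 *	} else if (n == -FI_EAVAIL) {
 *		// an error completion is pending; see ofi_cq_readerr() below
 *	}
 */
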
ssize_t ofi_cq_read(struct fid_cq *cq_fid, void *buf, size_t count)
{
	return ofi_cq_readfrom(cq_fid, buf, count, NULL);
}

ssize_t ofi_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *buf,
		       uint64_t flags)
{
	struct util_cq *cq;
	struct util_cq_oflow_err_entry *err;
	struct slist_entry *entry;
	struct fi_cq_tagged_entry *cirq_entry;
	char *err_buf_save;
	size_t err_data_size;
	uint32_t api_version;
	ssize_t ret;

	cq = container_of(cq_fid, struct util_cq, cq_fid);
	api_version = cq->domain->fabric->fabric_fid.api_version;

	cq->cq_fastlock_acquire(&cq->cq_lock);
	if (ofi_cirque_isempty(cq->cirq) ||
	    !(ofi_cirque_head(cq->cirq)->flags & UTIL_FLAG_ERROR)) {
		ret = -FI_EAGAIN;
		goto unlock;
	}

	entry = slist_remove_head(&cq->oflow_err_list);
	err = container_of(entry, struct util_cq_oflow_err_entry, list_entry);
	if (FI_VERSION_GE(api_version, FI_VERSION(1, 5)) && buf->err_data_size) {
		err_data_size = MIN(buf->err_data_size, err->comp.err_data_size);
		memcpy(buf->err_data, err->comp.err_data, err_data_size);
		err_buf_save = buf->err_data;
		*buf = err->comp;
		buf->err_data = err_buf_save;
		buf->err_data_size = err_data_size;
	} else {
		memcpy(buf, &err->comp, sizeof(struct fi_cq_err_entry_1_0));
	}

	cirq_entry = ofi_cirque_head(cq->cirq);
	if (!(cirq_entry->flags & UTIL_FLAG_OVERFLOW)) {
		ofi_cirque_discard(cq->cirq);
	} else if (!slist_empty(&cq->oflow_err_list)) {
		struct util_cq_oflow_err_entry *oflow_entry =
			container_of(cq->oflow_err_list.head,
				     struct util_cq_oflow_err_entry,
				     list_entry);
		if (oflow_entry->parent_comp != cirq_entry) {
			/* A normal CQ entry was used to report the error
			 * because the circular queue was out of space, so
			 * unset UTIL_FLAG_ERROR and UTIL_FLAG_OVERFLOW. */
			cirq_entry->flags &= ~(UTIL_FLAG_ERROR | UTIL_FLAG_OVERFLOW);
		}
		/* If the next entry in oflow_err_list uses the same CIRQ entry
		 * to report an error/overflow, leave UTIL_FLAG_ERROR and
		 * UTIL_FLAG_OVERFLOW set so the next round of overflow/error
		 * handling still runs. */
	} else {
		cirq_entry->flags &= ~(UTIL_FLAG_ERROR | UTIL_FLAG_OVERFLOW);
	}

	ret = 1;
	free(err);
unlock:
	cq->cq_fastlock_release(&cq->cq_lock);
	return ret;
}

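/*
 * Compatibility note: before API version 1.5, struct fi_cq_err_entry had no
 * err_data_size field, which is why only the fi_cq_err_entry_1_0 layout is
 * copied above. Callers on 1.5 or later who want provider error data supply
 * their own buffer, e.g.:
 *
 *	char err_data[64];
 *	struct fi_cq_err_entry err = {
 *		.err_data = err_data,
 *		.err_data_size = sizeof(err_data),
 *	};
 *	ssize_t ret = fi_cq_readerr(cq_fid, &err, 0);
 */
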
ssize_t ofi_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count,
			 fi_addr_t *src_addr, const void *cond, int timeout)
{
	struct util_cq *cq;
	uint64_t endtime;
	int ret;

	cq = container_of(cq_fid, struct util_cq, cq_fid);
	assert(cq->wait && cq->internal_wait);
	endtime = ofi_timeout_time(timeout);

	do {
		ret = ofi_cq_readfrom(cq_fid, buf, count, src_addr);
		if (ret != -FI_EAGAIN)
			break;

		if (ofi_adjust_timeout(endtime, &timeout))
			return -FI_EAGAIN;

		if (ofi_atomic_get32(&cq->signaled)) {
			ofi_atomic_set32(&cq->signaled, 0);
			return -FI_EAGAIN;
		}

		ret = fi_wait(&cq->wait->wait_fid, timeout);
	} while (!ret);

	return ret == -FI_ETIMEDOUT ? -FI_EAGAIN : ret;
}

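/*
 * Behavior sketch: ofi_cq_sreadfrom() alternates between polling the CQ and
 * blocking on its wait object until a completion arrives, fi_cq_signal()
 * wakes it, or the timeout lapses (reported as -FI_EAGAIN). For example:
 *
 *	// block for up to 100 ms; no wait condition is used
 *	ssize_t n = fi_cq_sread(cq_fid, comps, 8, NULL, 100);
 */
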
ssize_t ofi_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
		     const void *cond, int timeout)
{
	return ofi_cq_sreadfrom(cq_fid, buf, count, NULL, cond, timeout);
}

int ofi_cq_signal(struct fid_cq *cq_fid)
{
	struct util_cq *cq = container_of(cq_fid, struct util_cq, cq_fid);

	ofi_atomic_set32(&cq->signaled, 1);
	util_cq_signal(cq);
	return 0;
}

static const char *util_cq_strerror(struct fid_cq *cq, int prov_errno,
				    const void *err_data, char *buf, size_t len)
{
	return fi_strerror(prov_errno);
}

static struct fi_ops_cq util_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = ofi_cq_read,
	.readfrom = ofi_cq_readfrom,
	.readerr = ofi_cq_readerr,
	.sread = ofi_cq_sread,
	.sreadfrom = ofi_cq_sreadfrom,
	.signal = ofi_cq_signal,
	.strerror = util_cq_strerror,
};

int ofi_cq_cleanup(struct util_cq *cq)
{
	struct util_cq_oflow_err_entry *err;
	struct slist_entry *entry;

	if (ofi_atomic_get32(&cq->ref))
		return -FI_EBUSY;

	while (!slist_empty(&cq->oflow_err_list)) {
		entry = slist_remove_head(&cq->oflow_err_list);
		err = container_of(entry, struct util_cq_oflow_err_entry,
				   list_entry);
		free(err);
	}

	if (cq->wait) {
		fi_poll_del(&cq->wait->pollset->poll_fid,
			    &cq->cq_fid.fid, 0);
		if (cq->internal_wait)
			fi_close(&cq->wait->wait_fid.fid);
	}

	ofi_atomic_dec32(&cq->domain->ref);
	util_comp_cirq_free(cq->cirq);
	fastlock_destroy(&cq->cq_lock);
	fastlock_destroy(&cq->ep_list_lock);
	free(cq->src);
	return 0;
}

int ofi_cq_control(struct fid *fid, int command, void *arg)
{
	struct util_cq *cq = container_of(fid, struct util_cq, cq_fid.fid);

	switch (command) {
	case FI_GETWAIT:
	case FI_GETWAITOBJ:
		if (!cq->wait)
			return -FI_ENODATA;
		return fi_control(&cq->wait->wait_fid.fid, command, arg);
	default:
		/* cq->wait may be NULL here, so log through the domain */
		FI_INFO(cq->domain->prov, FI_LOG_CQ, "Unsupported command\n");
		return -FI_ENOSYS;
	}
}

static int util_cq_close(struct fid *fid)
{
	struct util_cq *cq;
	int ret;

	cq = container_of(fid, struct util_cq, cq_fid.fid);
	ret = ofi_cq_cleanup(cq);
	if (ret)
		return ret;

	free(cq);
	return 0;
}

static struct fi_ops util_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = util_cq_close,
	.bind = fi_no_bind,
	.control = ofi_cq_control,
	.ops_open = fi_no_ops_open,
};

static int fi_cq_init(struct fid_domain *domain, struct fi_cq_attr *attr,
		      fi_cq_read_func read_entry, struct util_cq *cq,
		      void *context)
{
	struct fi_wait_attr wait_attr;
	struct fid_wait *wait;
	int ret;

	cq->domain = container_of(domain, struct util_domain, domain_fid);
	ofi_atomic_initialize32(&cq->ref, 0);
	ofi_atomic_initialize32(&cq->signaled, 0);
	dlist_init(&cq->ep_list);
	fastlock_init(&cq->ep_list_lock);
	fastlock_init(&cq->cq_lock);
	if (cq->domain->threading == FI_THREAD_COMPLETION ||
	    cq->domain->threading == FI_THREAD_DOMAIN) {
		cq->cq_fastlock_acquire = ofi_fastlock_acquire_noop;
		cq->cq_fastlock_release = ofi_fastlock_release_noop;
	} else {
		cq->cq_fastlock_acquire = ofi_fastlock_acquire;
		cq->cq_fastlock_release = ofi_fastlock_release;
	}
	slist_init(&cq->oflow_err_list);
	cq->read_entry = read_entry;

	cq->cq_fid.fid.fclass = FI_CLASS_CQ;
	cq->cq_fid.fid.context = context;

	switch (attr->wait_obj) {
	case FI_WAIT_NONE:
		wait = NULL;
		break;
	case FI_WAIT_UNSPEC:
	case FI_WAIT_FD:
	case FI_WAIT_POLLFD:
	case FI_WAIT_MUTEX_COND:
	case FI_WAIT_YIELD:
		memset(&wait_attr, 0, sizeof wait_attr);
		wait_attr.wait_obj = attr->wait_obj;
		cq->internal_wait = 1;
		ret = fi_wait_open(&cq->domain->fabric->fabric_fid,
				   &wait_attr, &wait);
		if (ret)
			return ret;
		break;
	case FI_WAIT_SET:
		wait = attr->wait_set;
		break;
	default:
		assert(0);
		return -FI_EINVAL;
	}

	if (wait)
		cq->wait = container_of(wait, struct util_wait, wait_fid);

	ofi_atomic_inc32(&cq->domain->ref);
	return 0;
}

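/*
 * Design note: under FI_THREAD_COMPLETION and FI_THREAD_DOMAIN the
 * application serializes CQ access itself, so fi_cq_init() installs no-op
 * lock routines; all other threading models get real fastlock acquire and
 * release functions.
 */
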
int ofi_check_bind_cq_flags(struct util_ep *ep, struct util_cq *cq,
			    uint64_t flags)
{
	const struct fi_provider *prov = ep->domain->fabric->prov;

	if (flags & ~(FI_TRANSMIT | FI_RECV | FI_SELECTIVE_COMPLETION)) {
		FI_WARN(prov, FI_LOG_EP_CTRL, "Unsupported flags\n");
		return -FI_EBADFLAGS;
	}

	if (((flags & FI_TRANSMIT) && ep->tx_cq) ||
	    ((flags & FI_RECV) && ep->rx_cq)) {
		FI_WARN(prov, FI_LOG_EP_CTRL, "Duplicate CQ binding\n");
		return -FI_EINVAL;
	}

	return FI_SUCCESS;
}

void ofi_cq_progress(struct util_cq *cq)
{
	struct util_ep *ep;
	struct fid_list_entry *fid_entry;
	struct dlist_entry *item;

	cq->cq_fastlock_acquire(&cq->ep_list_lock);
	dlist_foreach(&cq->ep_list, item) {
		fid_entry = container_of(item, struct fid_list_entry, entry);
		ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
		ep->progress(ep);
	}
	cq->cq_fastlock_release(&cq->ep_list_lock);
}

int ofi_cq_init(const struct fi_provider *prov, struct fid_domain *domain,
		struct fi_cq_attr *attr, struct util_cq *cq,
		ofi_cq_progress_func progress, void *context)
{
	fi_cq_read_func read_func;
	int ret;

	assert(progress);
	ret = ofi_check_cq_attr(prov, attr);
	if (ret)
		return ret;

	cq->cq_fid.fid.ops = &util_cq_fi_ops;
	cq->cq_fid.ops = &util_cq_ops;
	cq->progress = progress;

	switch (attr->format) {
	case FI_CQ_FORMAT_UNSPEC:
	case FI_CQ_FORMAT_CONTEXT:
		read_func = util_cq_read_ctx;
		break;
	case FI_CQ_FORMAT_MSG:
		read_func = util_cq_read_msg;
		break;
	case FI_CQ_FORMAT_DATA:
		read_func = util_cq_read_data;
		break;
	case FI_CQ_FORMAT_TAGGED:
		read_func = util_cq_read_tagged;
		break;
	default:
		assert(0);
		return -FI_EINVAL;
	}

	ret = fi_cq_init(domain, attr, read_func, cq, context);
	if (ret)
		return ret;

	/* CQ must be fully operational before adding to wait set */
	if (cq->wait) {
		ret = fi_poll_add(&cq->wait->pollset->poll_fid,
				  &cq->cq_fid.fid, 0);
		if (ret) {
			ofi_cq_cleanup(cq);
			return ret;
		}
	}

	cq->cirq = util_comp_cirq_create(attr->size == 0 ?
					 UTIL_DEF_CQ_SIZE : attr->size);
	if (!cq->cirq) {
		ret = -FI_ENOMEM;
		goto err1;
	}

	if (cq->domain->info_domain_caps & FI_SOURCE) {
		cq->src = calloc(cq->cirq->size, sizeof *cq->src);
		if (!cq->src) {
			ret = -FI_ENOMEM;
			goto err2;
		}
	}
	return 0;

err2:
	util_comp_cirq_free(cq->cirq);
err1:
	ofi_cq_cleanup(cq);
	return ret;
}

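/*
 * Provider-side sketch (names hypothetical): a provider typically embeds a
 * util_cq in its own CQ object and calls ofi_cq_init() from its cq_open
 * handler, passing a progress function such as ofi_cq_progress() above.
 *
 *	struct my_cq {
 *		struct util_cq util_cq;
 *		// provider-private state
 *	};
 *
 *	ret = ofi_cq_init(&my_prov, domain_fid, attr, &my_cq->util_cq,
 *			  &ofi_cq_progress, context);
 */
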
uint64_t ofi_rx_flags[] = {
	[ofi_op_msg] = FI_RECV,
	[ofi_op_tagged] = FI_RECV | FI_TAGGED,
	[ofi_op_read_req] = FI_RMA | FI_REMOTE_READ,
	[ofi_op_read_rsp] = FI_RMA | FI_REMOTE_READ,
	[ofi_op_write] = FI_RMA | FI_REMOTE_WRITE,
	[ofi_op_write_async] = FI_RMA | FI_REMOTE_WRITE,
	[ofi_op_atomic] = FI_ATOMIC | FI_REMOTE_WRITE,
	[ofi_op_atomic_fetch] = FI_ATOMIC | FI_REMOTE_READ,
	[ofi_op_atomic_compare] = FI_ATOMIC | FI_REMOTE_READ,
	[ofi_op_read_async] = FI_RMA | FI_READ,
};

uint64_t ofi_tx_flags[] = {
	[ofi_op_msg] = FI_SEND,
	[ofi_op_tagged] = FI_SEND | FI_TAGGED,
	[ofi_op_read_req] = FI_RMA | FI_READ,
	[ofi_op_read_rsp] = FI_RMA | FI_READ,
	[ofi_op_write] = FI_RMA | FI_WRITE,
	[ofi_op_write_async] = FI_RMA | FI_WRITE,
	[ofi_op_atomic] = FI_ATOMIC | FI_WRITE,
	[ofi_op_atomic_fetch] = FI_ATOMIC | FI_READ,
	[ofi_op_atomic_compare] = FI_ATOMIC | FI_READ,
	[ofi_op_read_async] = FI_RMA | FI_READ,
};
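
/*
 * Usage note: these tables translate internal ofi_op_* opcodes into the
 * completion flags reported to applications. For example, a provider
 * completing a tagged receive would record ofi_rx_flags[ofi_op_tagged],
 * i.e. FI_RECV | FI_TAGGED, in the CQ entry's flags field.
 */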