/*
 * Copyright (c) 2013-2016 Intel Corporation. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stdlib.h>
#include <string.h>

#include <ofi_enosys.h>
#include <ofi_util.h>

#define UTIL_DEF_CQ_SIZE (1024)

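/*
 * Overflow handling: when the completion circular queue (cirq) is full,
 * the new completion is stashed on cq->oflow_err_list instead of being
 * dropped.  The cirq entry returned by ofi_cirque_tail() is tagged with
 * UTIL_FLAG_OVERFLOW and remembered as parent_comp, so the read path
 * (ofi_cq_readfrom) can replay the stashed completions at the right
 * point and preserve completion ordering.
 */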
/* Caller must hold cq->cq_lock. */
int ofi_cq_write_overflow(struct util_cq *cq, void *context, uint64_t flags,
			  size_t len, void *buf, uint64_t data, uint64_t tag,
			  fi_addr_t src)
{
	struct util_cq_oflow_err_entry *entry;

	assert(ofi_cirque_isfull(cq->cirq));

	if (!(entry = calloc(1, sizeof(*entry))))
		return -FI_ENOMEM;

	entry->parent_comp = ofi_cirque_tail(cq->cirq);
	entry->parent_comp->flags |= UTIL_FLAG_OVERFLOW;

	entry->comp.op_context = context;
	entry->comp.flags = flags;
	entry->comp.len = len;
	entry->comp.buf = buf;
	entry->comp.data = data;
	entry->comp.tag = tag;

	entry->src = src;
	slist_insert_tail(&entry->list_entry, &cq->oflow_err_list);

	return 0;
}

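/*
 * Insert an error completion.  The full error information is queued on
 * cq->oflow_err_list; a cirq entry is tagged with UTIL_FLAG_ERROR (and
 * UTIL_FLAG_OVERFLOW when the cirq is already full) so that readers see
 * -FI_EAVAIL and retrieve the details through ofi_cq_readerr().  Any
 * waiter attached to the CQ is signaled.
 */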
int ofi_cq_write_error(struct util_cq *cq,
		       const struct fi_cq_err_entry *err_entry)
{
	struct util_cq_oflow_err_entry *entry;
	struct fi_cq_tagged_entry *comp;

	assert(err_entry->err);

	if (!(entry = calloc(1, sizeof(*entry))))
		return -FI_ENOMEM;

	entry->comp = *err_entry;
	cq->cq_fastlock_acquire(&cq->cq_lock);
	slist_insert_tail(&entry->list_entry, &cq->oflow_err_list);

	if (OFI_UNLIKELY(ofi_cirque_isfull(cq->cirq))) {
		comp = ofi_cirque_tail(cq->cirq);
		comp->flags |= (UTIL_FLAG_ERROR | UTIL_FLAG_OVERFLOW);
		entry->parent_comp = ofi_cirque_tail(cq->cirq);
	} else {
		comp = ofi_cirque_tail(cq->cirq);
		comp->flags = UTIL_FLAG_ERROR;
		ofi_cirque_commit(cq->cirq);
	}
	cq->cq_fastlock_release(&cq->cq_lock);
	if (cq->wait)
		cq->wait->signal(cq->wait);
	return 0;
}

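/*
 * Report an unmatched peek (e.g. FI_PEEK finding no message) as an
 * FI_ENOMSG error completion for the given tag and context.
 */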
int ofi_cq_write_error_peek(struct util_cq *cq, uint64_t tag, void *context)
{
	struct fi_cq_err_entry err_entry = {
		.op_context	= context,
		.flags		= FI_TAGGED | FI_RECV,
		.tag		= tag,
		.err		= FI_ENOMSG,
		.prov_errno	= -FI_ENOMSG,
	};
	return ofi_cq_write_error(cq, &err_entry);
}

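/*
 * Report a completion that was truncated: 'len' bytes were delivered into
 * 'buf' and 'olen' overflow bytes were discarded.  The entry is written as
 * an FI_ETRUNC error completion.
 *
 * Sketch of a provider-side call (names are illustrative only):
 *
 *	if (recv_size < msg_size)
 *		ofi_cq_write_error_trunc(rx_cq, rx_entry->context,
 *					 FI_MSG | FI_RECV, recv_size,
 *					 rx_entry->buf, 0, 0,
 *					 msg_size - recv_size);
 */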
int ofi_cq_write_error_trunc(struct util_cq *cq, void *context, uint64_t flags,
			     size_t len, void *buf, uint64_t data, uint64_t tag,
			     size_t olen)
{
	struct fi_cq_err_entry err_entry = {
		.op_context	= context,
		.flags		= flags,
		.len		= len,
		.buf		= buf,
		.data		= data,
		.tag		= tag,
		.olen		= olen,
		.err		= FI_ETRUNC,
		.prov_errno	= -FI_ETRUNC,
	};
	return ofi_cq_write_error(cq, &err_entry);
}

int ofi_check_cq_attr(const struct fi_provider *prov,
		      const struct fi_cq_attr *attr)
{
	switch (attr->format) {
	case FI_CQ_FORMAT_UNSPEC:
	case FI_CQ_FORMAT_CONTEXT:
	case FI_CQ_FORMAT_MSG:
	case FI_CQ_FORMAT_DATA:
	case FI_CQ_FORMAT_TAGGED:
		break;
	default:
		FI_WARN(prov, FI_LOG_CQ, "unsupported format\n");
		return -FI_EINVAL;
	}

	switch (attr->wait_obj) {
	case FI_WAIT_NONE:
	case FI_WAIT_YIELD:
		break;
	case FI_WAIT_SET:
		if (!attr->wait_set) {
			FI_WARN(prov, FI_LOG_CQ, "invalid wait set\n");
			return -FI_EINVAL;
		}
		/* fall through */
	case FI_WAIT_UNSPEC:
	case FI_WAIT_FD:
	case FI_WAIT_POLLFD:
		switch (attr->wait_cond) {
		case FI_CQ_COND_NONE:
		case FI_CQ_COND_THRESHOLD:
			break;
		default:
			FI_WARN(prov, FI_LOG_CQ, "unsupported wait cond\n");
			return -FI_EINVAL;
		}
		break;
	default:
		FI_WARN(prov, FI_LOG_CQ, "unsupported wait object\n");
		return -FI_EINVAL;
	}

	if (attr->flags & ~(FI_AFFINITY)) {
		FI_WARN(prov, FI_LOG_CQ, "invalid flags\n");
		return -FI_EINVAL;
	}

	if (attr->flags & FI_AFFINITY)
		FI_WARN(prov, FI_LOG_CQ, "signaling vector ignored\n");

	return 0;
}

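/*
 * Format-specific copy helpers.  Completions are stored internally as
 * struct fi_cq_tagged_entry (the largest format); each helper copies the
 * leading fields that correspond to the CQ's configured format into the
 * caller's buffer and advances the destination pointer by one entry.
 */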
static void util_cq_read_ctx(void **dst, void *src)
{
	*(struct fi_cq_entry *) *dst = *(struct fi_cq_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_entry);
}

static void util_cq_read_msg(void **dst, void *src)
{
	*(struct fi_cq_msg_entry *) *dst = *(struct fi_cq_msg_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_msg_entry);
}

static void util_cq_read_data(void **dst, void *src)
{
	*(struct fi_cq_data_entry *) *dst = *(struct fi_cq_data_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_data_entry);
}

static void util_cq_read_tagged(void **dst, void *src)
{
	*(struct fi_cq_tagged_entry *) *dst = *(struct fi_cq_tagged_entry *) src;
	*(char **) dst += sizeof(struct fi_cq_tagged_entry);
}

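/*
 * Replay a stashed overflow completion: copy the current cirq slot to the
 * caller's buffer, then overwrite that slot (and its source address, when
 * FI_SOURCE is enabled) with the stashed completion so it is returned on
 * a subsequent pass.  The cirq slot is intentionally not discarded here.
 */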
static inline
void util_cq_read_oflow_entry(struct util_cq *cq,
			      struct util_cq_oflow_err_entry *oflow_entry,
			      struct fi_cq_tagged_entry *cirq_entry,
			      void **buf, fi_addr_t *src_addr, ssize_t i)
{
	if (src_addr && cq->src) {
		src_addr[i] = cq->src[ofi_cirque_rindex(cq->cirq)];
		cq->src[ofi_cirque_rindex(cq->cirq)] = oflow_entry->src;
	}
	cq->read_entry(buf, cirq_entry);
	cirq_entry->op_context = oflow_entry->comp.op_context;
	cirq_entry->flags = oflow_entry->comp.flags;
	cirq_entry->len = oflow_entry->comp.len;
	cirq_entry->buf = oflow_entry->comp.buf;
	cirq_entry->data = oflow_entry->comp.data;
	cirq_entry->tag = oflow_entry->comp.tag;
}

static inline
void util_cq_read_entry(struct util_cq *cq, struct fi_cq_tagged_entry *entry,
			void **buf, fi_addr_t *src_addr, ssize_t i)
{
	if (src_addr && cq->src)
		src_addr[i] = cq->src[ofi_cirque_rindex(cq->cirq)];
	cq->read_entry(buf, entry);
	ofi_cirque_discard(cq->cirq);
}

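/*
 * Main read path.  Progresses the CQ if it appears empty, then copies up
 * to 'count' completions into 'buf' in the CQ's configured format.  Cirq
 * entries tagged with UTIL_FLAG_ERROR/UTIL_FLAG_OVERFLOW divert through
 * cq->oflow_err_list: stashed overflow completions are replayed in order,
 * and a pending error entry stops the read with -FI_EAVAIL (or returns
 * the completions read so far).  Returns the number of entries read, or
 * -FI_EAGAIN when none are available.
 *
 * Typical application-level usage (sketch; names are illustrative):
 *
 *	struct fi_cq_tagged_entry comp[8];
 *	fi_addr_t src[8];
 *	ssize_t n = fi_cq_readfrom(cq_fid, comp, 8, src);
 *	if (n == -FI_EAVAIL)
 *		handle_cq_error(cq_fid);	// see fi_cq_readerr()
 */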
ssize_t ofi_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count,
			fi_addr_t *src_addr)
{
	struct util_cq *cq;
	struct fi_cq_tagged_entry *entry;
	ssize_t i;

	cq = container_of(cq_fid, struct util_cq, cq_fid);

	cq->cq_fastlock_acquire(&cq->cq_lock);
	if (ofi_cirque_isempty(cq->cirq) || !count) {
		cq->cq_fastlock_release(&cq->cq_lock);
		cq->progress(cq);
		cq->cq_fastlock_acquire(&cq->cq_lock);
		if (ofi_cirque_isempty(cq->cirq)) {
			i = -FI_EAGAIN;
			goto out;
		}
	}

	if (count > ofi_cirque_usedcnt(cq->cirq))
		count = ofi_cirque_usedcnt(cq->cirq);

	for (i = 0; i < (ssize_t) count; i++) {
		entry = ofi_cirque_head(cq->cirq);
		if (OFI_UNLIKELY(entry->flags & (UTIL_FLAG_ERROR |
						 UTIL_FLAG_OVERFLOW))) {
			if (entry->flags & UTIL_FLAG_ERROR) {
				struct util_cq_oflow_err_entry *oflow_err_entry =
						container_of(cq->oflow_err_list.head,
							     struct util_cq_oflow_err_entry,
							     list_entry);
				if (oflow_err_entry->comp.err) {
					/* The head of oflow_err_list is an error
					 * entry: stop and report it to the caller.
					 *
					 * NOTE: if the head is not an error entry,
					 * the overflow entries must be replayed
					 * first and the error entries afterwards
					 * to preserve completion ordering. */
					if (!i)
						i = -FI_EAVAIL;
					break;
				}
			}
			if (entry->flags & UTIL_FLAG_OVERFLOW) {
				assert(!slist_empty(&cq->oflow_err_list));
				struct util_cq_oflow_err_entry *oflow_entry =
					container_of(cq->oflow_err_list.head,
						     struct util_cq_oflow_err_entry,
						     list_entry);
				if (oflow_entry->parent_comp != entry) {
					/* All overflow/error entries associated with
					 * this cirq entry have already been read. */
					entry->flags &= ~(UTIL_FLAG_OVERFLOW | UTIL_FLAG_ERROR);
				} else {
					uint64_t service_flags =
						(entry->flags & (UTIL_FLAG_OVERFLOW | UTIL_FLAG_ERROR));
					slist_remove_head(&cq->oflow_err_list);

					entry->flags &= ~(service_flags);
					util_cq_read_oflow_entry(cq, oflow_entry, entry,
								 &buf, src_addr, i);
					/* Re-arm the flags so the remaining overflow
					 * entries are checked on the next pass. */
					if (!slist_empty(&cq->oflow_err_list))
						entry->flags |= service_flags;
					free(oflow_entry);
					continue;
				}
			}
		}
		util_cq_read_entry(cq, entry, &buf, src_addr, i);
	}
out:
	cq->cq_fastlock_release(&cq->cq_lock);
	return i;
}

ssize_t ofi_cq_read(struct fid_cq *cq_fid, void *buf, size_t count)
{
	return ofi_cq_readfrom(cq_fid, buf, count, NULL);
}

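/*
 * Retrieve the error completion at the head of the CQ.  Only valid when
 * the head cirq entry is tagged with UTIL_FLAG_ERROR; otherwise returns
 * -FI_EAGAIN.  For API versions >= 1.5 with a non-zero err_data_size the
 * caller's err_data buffer is filled; otherwise only the
 * fi_cq_err_entry_1_0 subset is copied.  Returns 1 on success.
 */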
ssize_t ofi_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *buf,
		       uint64_t flags)
{
	struct util_cq *cq;
	struct util_cq_oflow_err_entry *err;
	struct slist_entry *entry;
	struct fi_cq_tagged_entry *cirq_entry;
	char *err_buf_save;
	size_t err_data_size;
	uint32_t api_version;
	ssize_t ret;

	cq = container_of(cq_fid, struct util_cq, cq_fid);
	api_version = cq->domain->fabric->fabric_fid.api_version;

	cq->cq_fastlock_acquire(&cq->cq_lock);
	if (ofi_cirque_isempty(cq->cirq) ||
	    !(ofi_cirque_head(cq->cirq)->flags & UTIL_FLAG_ERROR)) {
		ret = -FI_EAGAIN;
		goto unlock;
	}

	entry = slist_remove_head(&cq->oflow_err_list);
	err = container_of(entry, struct util_cq_oflow_err_entry, list_entry);
	if ((FI_VERSION_GE(api_version, FI_VERSION(1, 5))) && buf->err_data_size) {
		err_data_size = MIN(buf->err_data_size, err->comp.err_data_size);
		memcpy(buf->err_data, err->comp.err_data, err_data_size);
		err_buf_save = buf->err_data;
		*buf = err->comp;
		buf->err_data = err_buf_save;
		buf->err_data_size = err_data_size;
	} else {
		memcpy(buf, &err->comp, sizeof(struct fi_cq_err_entry_1_0));
	}

	cirq_entry = ofi_cirque_head(cq->cirq);
	if (!(cirq_entry->flags & UTIL_FLAG_OVERFLOW)) {
		ofi_cirque_discard(cq->cirq);
	} else if (!slist_empty(&cq->oflow_err_list)) {
		struct util_cq_oflow_err_entry *oflow_entry =
			container_of(cq->oflow_err_list.head,
				     struct util_cq_oflow_err_entry,
				     list_entry);
		if (oflow_entry->parent_comp != cirq_entry) {
			/* A regular cirq entry was used to report this error
			 * because the circular queue was out of space, so
			 * clear UTIL_FLAG_ERROR and UTIL_FLAG_OVERFLOW. */
			cirq_entry->flags &= ~(UTIL_FLAG_ERROR | UTIL_FLAG_OVERFLOW);
		}
		/* If the next entry in oflow_err_list uses the same cirq entry
		 * to report an error/overflow, leave UTIL_FLAG_ERROR and
		 * UTIL_FLAG_OVERFLOW set so the next round of overflow/error
		 * handling still runs. */
	} else {
		cirq_entry->flags &= ~(UTIL_FLAG_ERROR | UTIL_FLAG_OVERFLOW);
	}

	ret = 1;
	free(err);
unlock:
	cq->cq_fastlock_release(&cq->cq_lock);
	return ret;
}

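/*
 * Blocking read.  Requires a wait object owned by this CQ (internal wait);
 * loops between ofi_cq_readfrom() and fi_wait() until completions arrive,
 * the timeout expires, or ofi_cq_signal() wakes the caller.  A timeout or
 * signal is reported as -FI_EAGAIN.
 */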
ssize_t ofi_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count,
			 fi_addr_t *src_addr, const void *cond, int timeout)
{
	struct util_cq *cq;
	uint64_t endtime;
	int ret;

	cq = container_of(cq_fid, struct util_cq, cq_fid);
	assert(cq->wait && cq->internal_wait);
	endtime = ofi_timeout_time(timeout);

	do {
		ret = ofi_cq_readfrom(cq_fid, buf, count, src_addr);
		if (ret != -FI_EAGAIN)
			break;

		if (ofi_adjust_timeout(endtime, &timeout))
			return -FI_EAGAIN;

		if (ofi_atomic_get32(&cq->signaled)) {
			ofi_atomic_set32(&cq->signaled, 0);
			return -FI_EAGAIN;
		}

		ret = fi_wait(&cq->wait->wait_fid, timeout);
	} while (!ret);

	return ret == -FI_ETIMEDOUT ? -FI_EAGAIN : ret;
}

ssize_t ofi_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
		     const void *cond, int timeout)
{
	return ofi_cq_sreadfrom(cq_fid, buf, count, NULL, cond, timeout);
}

int ofi_cq_signal(struct fid_cq *cq_fid)
{
	struct util_cq *cq = container_of(cq_fid, struct util_cq, cq_fid);

	ofi_atomic_set32(&cq->signaled, 1);
	util_cq_signal(cq);
	return 0;
}

static const char *util_cq_strerror(struct fid_cq *cq, int prov_errno,
				    const void *err_data, char *buf, size_t len)
{
	return fi_strerror(prov_errno);
}

static struct fi_ops_cq util_cq_ops = {
	.size = sizeof(struct fi_ops_cq),
	.read = ofi_cq_read,
	.readfrom = ofi_cq_readfrom,
	.readerr = ofi_cq_readerr,
	.sread = ofi_cq_sread,
	.sreadfrom = ofi_cq_sreadfrom,
	.signal = ofi_cq_signal,
	.strerror = util_cq_strerror,
};

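/*
 * Tear down CQ resources: fails with -FI_EBUSY while endpoints still
 * reference the CQ, otherwise frees any queued overflow/error entries,
 * detaches from (and possibly closes) the wait object, and releases the
 * circular queue, locks, and source-address array.
 */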
int ofi_cq_cleanup(struct util_cq *cq)
{
	struct util_cq_oflow_err_entry *err;
	struct slist_entry *entry;

	if (ofi_atomic_get32(&cq->ref))
		return -FI_EBUSY;

	while (!slist_empty(&cq->oflow_err_list)) {
		entry = slist_remove_head(&cq->oflow_err_list);
		err = container_of(entry, struct util_cq_oflow_err_entry, list_entry);
		free(err);
	}

	if (cq->wait) {
		fi_poll_del(&cq->wait->pollset->poll_fid,
			    &cq->cq_fid.fid, 0);
		if (cq->internal_wait)
			fi_close(&cq->wait->wait_fid.fid);
	}

	ofi_atomic_dec32(&cq->domain->ref);
	util_comp_cirq_free(cq->cirq);
	fastlock_destroy(&cq->cq_lock);
	fastlock_destroy(&cq->ep_list_lock);
	free(cq->src);
	return 0;
}

int ofi_cq_control(struct fid *fid, int command, void *arg)
{
	struct util_cq *cq = container_of(fid, struct util_cq, cq_fid.fid);

	switch (command) {
	case FI_GETWAIT:
	case FI_GETWAITOBJ:
		if (!cq->wait)
			return -FI_ENODATA;
		return fi_control(&cq->wait->wait_fid.fid, command, arg);
	default:
		/* Log through the domain's provider: cq->wait may be NULL. */
		FI_INFO(cq->domain->prov, FI_LOG_CQ, "Unsupported command\n");
		return -FI_ENOSYS;
	}
}

static int util_cq_close(struct fid *fid)
{
	struct util_cq *cq;
	int ret;

	cq = container_of(fid, struct util_cq, cq_fid.fid);
	ret = ofi_cq_cleanup(cq);
	if (ret)
		return ret;

	free(cq);
	return 0;
}

static struct fi_ops util_cq_fi_ops = {
	.size = sizeof(struct fi_ops),
	.close = util_cq_close,
	.bind = fi_no_bind,
	.control = ofi_cq_control,
	.ops_open = fi_no_ops_open,
};

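/*
 * Common initialization: sets up locking (no-op locks when the domain's
 * threading model serializes CQ access), the overflow list, the
 * format-specific read callback, and the wait object selected by
 * attr->wait_obj (opening an internal one when needed).
 */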
static int fi_cq_init(struct fid_domain *domain, struct fi_cq_attr *attr,
		      fi_cq_read_func read_entry, struct util_cq *cq,
		      void *context)
{
	struct fi_wait_attr wait_attr;
	struct fid_wait *wait;
	int ret;

	cq->domain = container_of(domain, struct util_domain, domain_fid);
	ofi_atomic_initialize32(&cq->ref, 0);
	ofi_atomic_initialize32(&cq->signaled, 0);
	dlist_init(&cq->ep_list);
	fastlock_init(&cq->ep_list_lock);
	fastlock_init(&cq->cq_lock);
	if (cq->domain->threading == FI_THREAD_COMPLETION ||
	    cq->domain->threading == FI_THREAD_DOMAIN) {
		cq->cq_fastlock_acquire = ofi_fastlock_acquire_noop;
		cq->cq_fastlock_release = ofi_fastlock_release_noop;
	} else {
		cq->cq_fastlock_acquire = ofi_fastlock_acquire;
		cq->cq_fastlock_release = ofi_fastlock_release;
	}
	slist_init(&cq->oflow_err_list);
	cq->read_entry = read_entry;

	cq->cq_fid.fid.fclass = FI_CLASS_CQ;
	cq->cq_fid.fid.context = context;

	switch (attr->wait_obj) {
	case FI_WAIT_NONE:
		wait = NULL;
		break;
	case FI_WAIT_UNSPEC:
	case FI_WAIT_FD:
	case FI_WAIT_POLLFD:
	case FI_WAIT_MUTEX_COND:
	case FI_WAIT_YIELD:
		memset(&wait_attr, 0, sizeof wait_attr);
		wait_attr.wait_obj = attr->wait_obj;
		cq->internal_wait = 1;
		ret = fi_wait_open(&cq->domain->fabric->fabric_fid,
				   &wait_attr, &wait);
		if (ret)
			return ret;
		break;
	case FI_WAIT_SET:
		wait = attr->wait_set;
		break;
	default:
		assert(0);
		return -FI_EINVAL;
	}

	if (wait)
		cq->wait = container_of(wait, struct util_wait, wait_fid);

	ofi_atomic_inc32(&cq->domain->ref);
	return 0;
}

int ofi_check_bind_cq_flags(struct util_ep *ep, struct util_cq *cq,
			    uint64_t flags)
{
	const struct fi_provider *prov = ep->domain->fabric->prov;

	if (flags & ~(FI_TRANSMIT | FI_RECV | FI_SELECTIVE_COMPLETION)) {
		FI_WARN(prov, FI_LOG_EP_CTRL, "Unsupported flags\n");
		return -FI_EBADFLAGS;
	}

	if (((flags & FI_TRANSMIT) && ep->tx_cq) ||
	    ((flags & FI_RECV) && ep->rx_cq)) {
		FI_WARN(prov, FI_LOG_EP_CTRL, "Duplicate CQ binding\n");
		return -FI_EINVAL;
	}

	return FI_SUCCESS;
}

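/*
 * Default progress function: drives progress on every endpoint bound to
 * this CQ.
 */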
void ofi_cq_progress(struct util_cq *cq)
{
	struct util_ep *ep;
	struct fid_list_entry *fid_entry;
	struct dlist_entry *item;

	cq->cq_fastlock_acquire(&cq->ep_list_lock);
	dlist_foreach(&cq->ep_list, item) {
		fid_entry = container_of(item, struct fid_list_entry, entry);
		ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
		ep->progress(ep);
	}
	cq->cq_fastlock_release(&cq->ep_list_lock);
}

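/*
 * Provider-facing constructor: validates the attributes, installs the
 * util CQ ops, selects the copy helper that matches attr->format, and
 * allocates the circular queue (UTIL_DEF_CQ_SIZE entries when attr->size
 * is 0) plus the source-address array when the domain supports FI_SOURCE.
 *
 * Sketch of a provider cq_open handler built on this helper (the "my_"
 * names are illustrative only):
 *
 *	static int my_cq_open(struct fid_domain *domain,
 *			      struct fi_cq_attr *attr,
 *			      struct fid_cq **cq_fid, void *context)
 *	{
 *		struct util_cq *cq;
 *		int ret;
 *
 *		cq = calloc(1, sizeof(*cq));
 *		if (!cq)
 *			return -FI_ENOMEM;
 *
 *		ret = ofi_cq_init(&my_prov, domain, attr, cq,
 *				  ofi_cq_progress, context);
 *		if (ret) {
 *			free(cq);
 *			return ret;
 *		}
 *
 *		*cq_fid = &cq->cq_fid;
 *		return 0;
 *	}
 */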
int ofi_cq_init(const struct fi_provider *prov, struct fid_domain *domain,
		struct fi_cq_attr *attr, struct util_cq *cq,
		ofi_cq_progress_func progress, void *context)
{
	fi_cq_read_func read_func;
	int ret;

	assert(progress);
	ret = ofi_check_cq_attr(prov, attr);
	if (ret)
		return ret;

	cq->cq_fid.fid.ops = &util_cq_fi_ops;
	cq->cq_fid.ops = &util_cq_ops;
	cq->progress = progress;

	switch (attr->format) {
	case FI_CQ_FORMAT_UNSPEC:
	case FI_CQ_FORMAT_CONTEXT:
		read_func = util_cq_read_ctx;
		break;
	case FI_CQ_FORMAT_MSG:
		read_func = util_cq_read_msg;
		break;
	case FI_CQ_FORMAT_DATA:
		read_func = util_cq_read_data;
		break;
	case FI_CQ_FORMAT_TAGGED:
		read_func = util_cq_read_tagged;
		break;
	default:
		assert(0);
		return -FI_EINVAL;
	}

	ret = fi_cq_init(domain, attr, read_func, cq, context);
	if (ret)
		return ret;

	/* CQ must be fully operational before adding to wait set */
	if (cq->wait) {
		ret = fi_poll_add(&cq->wait->pollset->poll_fid,
				  &cq->cq_fid.fid, 0);
		if (ret) {
			ofi_cq_cleanup(cq);
			return ret;
		}
	}

	cq->cirq = util_comp_cirq_create(attr->size == 0 ?
					 UTIL_DEF_CQ_SIZE : attr->size);
	if (!cq->cirq) {
		ret = -FI_ENOMEM;
		goto err1;
	}

	if (cq->domain->info_domain_caps & FI_SOURCE) {
		cq->src = calloc(cq->cirq->size, sizeof *cq->src);
		if (!cq->src) {
			ret = -FI_ENOMEM;
			goto err2;
		}
	}
	return 0;

err2:
	util_comp_cirq_free(cq->cirq);
err1:
	ofi_cq_cleanup(cq);
	return ret;
}

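/*
 * Completion-flag lookup tables: map internal ofi_op_* opcodes to the
 * libfabric completion flags reported for the receive (rx) and transmit
 * (tx) side of each operation.
 */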
uint64_t ofi_rx_flags[] = {
	[ofi_op_msg] = FI_RECV,
	[ofi_op_tagged] = FI_RECV | FI_TAGGED,
	[ofi_op_read_req] = FI_RMA | FI_REMOTE_READ,
	[ofi_op_read_rsp] = FI_RMA | FI_REMOTE_READ,
	[ofi_op_write] = FI_RMA | FI_REMOTE_WRITE,
	[ofi_op_write_async] = FI_RMA | FI_REMOTE_WRITE,
	[ofi_op_atomic] = FI_ATOMIC | FI_REMOTE_WRITE,
	[ofi_op_atomic_fetch] = FI_ATOMIC | FI_REMOTE_READ,
	[ofi_op_atomic_compare] = FI_ATOMIC | FI_REMOTE_READ,
	[ofi_op_read_async] = FI_RMA | FI_READ,
};

uint64_t ofi_tx_flags[] = {
	[ofi_op_msg] = FI_SEND,
	[ofi_op_tagged] = FI_SEND | FI_TAGGED,
	[ofi_op_read_req] = FI_RMA | FI_READ,
	[ofi_op_read_rsp] = FI_RMA | FI_READ,
	[ofi_op_write] = FI_RMA | FI_WRITE,
	[ofi_op_write_async] = FI_RMA | FI_WRITE,
	[ofi_op_atomic] = FI_ATOMIC | FI_WRITE,
	[ofi_op_atomic_fetch] = FI_ATOMIC | FI_READ,
	[ofi_op_atomic_compare] = FI_ATOMIC | FI_READ,
	[ofi_op_read_async] = FI_RMA | FI_READ,
};