1 /*
2  * Copyright (c) 2013-2017 Intel Corporation. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 
33 #include "psmx.h"
34 
/* Append a pending send request to the domain's send queue (thread-safe).
 * Used for requests that must be issued outside the AM handler context,
 * since psm_am_request_short() cannot be called from inside a handler. */
static inline void psmx_am_enqueue_send(struct psmx_fid_domain *domain,
					struct psmx_am_request *req)
{
	fastlock_acquire(&domain->send_queue.lock);
	slist_insert_tail(&req->list_entry, &domain->send_queue.list);
	fastlock_release(&domain->send_queue.lock);
}
42 
/* Append a posted receive request to the domain's recv queue (thread-safe).
 * The request stays queued until a matching sender packet arrives. */
static inline void psmx_am_enqueue_recv(struct psmx_fid_domain *domain,
					struct psmx_am_request *req)
{
	fastlock_acquire(&domain->recv_queue.lock);
	slist_insert_tail(&req->list_entry, &domain->recv_queue.list);
	fastlock_release(&domain->recv_queue.lock);
}
50 
match_recv(struct slist_entry * item,const void * src_addr)51 static int match_recv(struct slist_entry *item, const void *src_addr)
52 {
53 	struct psmx_am_request *req;
54 
55 	req = container_of(item, struct psmx_am_request, list_entry);
56 	if (!req->recv.src_addr || req->recv.src_addr == src_addr)
57 		return 1;
58 
59 	return 0;
60 }
61 
psmx_am_search_and_dequeue_recv(struct psmx_fid_domain * domain,const void * src_addr)62 static struct psmx_am_request *psmx_am_search_and_dequeue_recv(
63 				struct psmx_fid_domain *domain,
64 				const void *src_addr)
65 {
66 	struct slist_entry *item;
67 
68 	fastlock_acquire(&domain->recv_queue.lock);
69 	item = slist_remove_first_match(&domain->recv_queue.list,
70 					match_recv, src_addr);
71 	fastlock_release(&domain->recv_queue.lock);
72 
73 	if (!item)
74 		return NULL;
75 
76 	return container_of(item, struct psmx_am_request, list_entry);
77 }
78 
/* Append an unexpected (no matching recv posted yet) message to the
 * domain's unexpected queue (thread-safe).  It is claimed later by
 * _psmx_recv2 when a matching receive is posted. */
static inline void psmx_am_enqueue_unexp(struct psmx_fid_domain *domain,
					 struct psmx_unexp *unexp)
{
	fastlock_acquire(&domain->unexp_queue.lock);
	slist_insert_tail(&unexp->list_entry, &domain->unexp_queue.list);
	fastlock_release(&domain->unexp_queue.lock);
}
86 
match_unexp(struct slist_entry * item,const void * src_addr)87 static int match_unexp(struct slist_entry *item, const void *src_addr)
88 {
89 	struct psmx_unexp *unexp;
90 
91 	unexp = container_of(item, struct psmx_unexp, list_entry);
92 
93 	if (!src_addr || src_addr == unexp->sender_addr)
94 		return 1;
95 
96 	return 0;
97 }
98 
psmx_am_search_and_dequeue_unexp(struct psmx_fid_domain * domain,const void * src_addr)99 static struct psmx_unexp *psmx_am_search_and_dequeue_unexp(
100 				struct psmx_fid_domain *domain,
101 				const void *src_addr)
102 {
103 	struct slist_entry *item;
104 
105 	fastlock_acquire(&domain->unexp_queue.lock);
106 	item = slist_remove_first_match(&domain->unexp_queue.list,
107 					match_unexp, src_addr);
108 	fastlock_release(&domain->unexp_queue.lock);
109 
110 	if (!item)
111 		return NULL;
112 
113 	return container_of(item, struct psmx_unexp, list_entry);
114 }
115 
116 /* Message protocol:
117  *
118  * Send REQ:
119  *	args[0].u32w0	cmd, flag
120  *	args[0].u32w1	len
121  *	args[1].u64	req
122  *	args[2].u64	recv_req
123  *	args[3].u64	offset
124  *
125  * Send REP:
126  *	args[0].u32w0	cmd
127  *	args[0].u32w1	error
128  *	args[1].u64	req
129  *	args[2].u64	recv_req
130  */
131 
/* Active-message handler implementing the send/recv protocol described
 * above.  Runs in PSM AM callback context, so only psm_am_reply_short()
 * may be issued from here; new requests are deferred via the send queue.
 *
 * Returns 0 on success or a negative fabric error code.
 */
int psmx_am_msg_handler(psm_am_token_t token, psm_epaddr_t epaddr,
			psm_amarg_t *args, int nargs, void *src, uint32_t len)
{
	psm_amarg_t rep_args[8];
	struct psmx_am_request *req;
	struct psmx_cq_event *event;
	struct psmx_epaddr_context *epaddr_context;
	struct psmx_fid_domain *domain;
	int copy_len;
	uint64_t offset;
	int cmd, eom;
	int err = 0;
	int op_error = 0;
	struct psmx_unexp *unexp;

	epaddr_context = psm_epaddr_getctxt(epaddr);
	if (!epaddr_context) {
		FI_WARN(&psmx_prov, FI_LOG_EP_DATA,
			"NULL context for epaddr %p\n", epaddr);
		return -FI_EIO;
	}

	domain = epaddr_context->domain;

	cmd = args[0].u32w0 & PSMX_AM_OP_MASK;
	eom = args[0].u32w0 & PSMX_AM_EOM;

	switch (cmd) {
	case PSMX_AM_REQ_SEND:
		assert(len == args[0].u32w1);
		offset = args[3].u64;
		if (offset == 0) {
			/* this is the first packet */
			req = psmx_am_search_and_dequeue_recv(domain, (const void *)epaddr);
			if (req) {
				copy_len = MIN(len, req->recv.len);
				/* FIX: copy only copy_len bytes; copying the
				 * full 'len' overruns the posted receive
				 * buffer when the message is larger than it.
				 * Excess bytes are reported as overflow via
				 * the CQ event's olen below. */
				memcpy(req->recv.buf, src, copy_len);
				req->recv.len_received += copy_len;
			} else {
				/* no matching recv posted yet: stash the data */
				unexp = malloc(sizeof(*unexp) + len);
				if (!unexp) {
					op_error = -FI_ENOSPC;
				} else {
					memcpy(unexp->buf, src, len);
					unexp->sender_addr = epaddr;
					unexp->sender_context = args[1].u64;
					unexp->len_received = len;
					unexp->done = !!eom;
					unexp->list_entry.next = NULL;
					psmx_am_enqueue_unexp(domain, unexp);

					if (!eom) {
						/* stop here. will reply when recv is posted */
						break;
					}
				}
			}

			if (!op_error && !eom) {
				/* reply w/ recv req to be used for following packets */
				rep_args[0].u32w0 = PSMX_AM_REP_SEND;
				rep_args[0].u32w1 = 0;
				rep_args[1].u64 = args[1].u64;
				rep_args[2].u64 = (uint64_t)(uintptr_t)req;
				err = psm_am_reply_short(token, PSMX_AM_MSG_HANDLER,
						rep_args, 3, NULL, 0, 0,
						NULL, NULL );
			}
		} else {
			/* follow-up packet: args[2] carries the recv req we
			 * handed back in the first-packet reply */
			req = (struct psmx_am_request *)(uintptr_t)args[2].u64;
			if (req) {
				/* FIX: the space left in the buffer is
				 * recv.len - offset, not recv.len + offset;
				 * clamp to zero once the sender's offset has
				 * passed the end of the posted buffer. */
				if (offset < req->recv.len)
					copy_len = MIN(req->recv.len - offset, len);
				else
					copy_len = 0;
				memcpy(req->recv.buf + offset, src, copy_len);
				req->recv.len_received += copy_len;
			} else {
				FI_WARN(&psmx_prov, FI_LOG_EP_DATA,
					"NULL recv_req in follow-up packets.\n");
				op_error = -FI_ENOMSG;
			}
		}

		if (eom && req) {
			/* message complete: generate completion and release req */
			if (req->ep->recv_cq && !req->no_event) {
				event = psmx_cq_create_event(
						req->ep->recv_cq,
						req->recv.context,
						req->recv.buf,
						req->cq_flags,
						req->recv.len_received,
						0, /* data */
						0, /* tag */
						req->recv.len - req->recv.len_received,
						0 /* err */);
				if (event)
					psmx_cq_enqueue_event(req->ep->recv_cq, event);
				else
					err = -FI_ENOMEM;
			}

			if (req->ep->recv_cntr)
				psmx_cntr_inc(req->ep->recv_cntr);

			free(req);
		}

		if (eom || op_error) {
			/* final reply: rep_args[2] == 0 tells sender we're done */
			rep_args[0].u32w0 = PSMX_AM_REP_SEND;
			rep_args[0].u32w1 = op_error;
			rep_args[1].u64 = args[1].u64;
			rep_args[2].u64 = 0; /* done */
			err = psm_am_reply_short(token, PSMX_AM_MSG_HANDLER,
					rep_args, 3, NULL, 0, 0,
					NULL, NULL );
		}
		break;

	case PSMX_AM_REP_SEND:
		req = (struct psmx_am_request *)(uintptr_t)args[1].u64;
		op_error = (int)args[0].u32w1;
		assert(req->op == PSMX_AM_REQ_SEND);

		if (args[2].u64) { /* more to send */
			req->send.peer_context = (void *)(uintptr_t)args[2].u64;

			/* psm_am_request_short() can't be called inside the handler.
			 * put the request into a queue and process it later.
			 */
			psmx_am_enqueue_send(req->ep->domain, req);
		} else { /* done */
			if (req->ep->send_cq && !req->no_event) {
				event = psmx_cq_create_event(
						req->ep->send_cq,
						req->send.context,
						req->send.buf,
						req->cq_flags,
						req->send.len,
						0, /* data */
						0, /* tag */
						0, /* olen */
						op_error);
				if (event)
					psmx_cq_enqueue_event(req->ep->send_cq, event);
				else
					err = -FI_ENOMEM;
			}

			if (req->ep->send_cntr)
				psmx_cntr_inc(req->ep->send_cntr);

			free(req);
		}
		break;

	default:
		err = -FI_EINVAL;
	}

	return err;
}
291 
psmx_am_process_send(struct psmx_fid_domain * domain,struct psmx_am_request * req)292 int psmx_am_process_send(struct psmx_fid_domain *domain, struct psmx_am_request *req)
293 {
294 	psm_amarg_t args[8];
295 	int am_flags = PSM_AM_FLAG_ASYNC;
296 	int chunk_size;
297 	size_t len;
298 	uint64_t offset;
299 	int err;
300 
301 	offset = req->send.len_sent;
302 	len = req->send.len - offset;
303 
304 	chunk_size = MIN(PSMX_AM_CHUNK_SIZE, psmx_am_param.max_request_short);
305 
306 	while (len > chunk_size) {
307 		args[0].u32w0 = PSMX_AM_REQ_SEND;
308 		args[0].u32w1 = chunk_size;
309 		args[1].u64 = (uint64_t)(uintptr_t)req;
310 		args[2].u64 = (uint64_t)(uintptr_t)req->send.peer_context;
311 		args[3].u64 = offset;
312 
313 		err = psm_am_request_short((psm_epaddr_t) req->send.dest_addr,
314 					PSMX_AM_MSG_HANDLER, args, 4,
315 					req->send.buf+offset, chunk_size,
316 					am_flags, NULL, NULL);
317 		if (err)
318 			return psmx_errno(err);
319 
320 		len -= chunk_size;
321 		offset += chunk_size;
322 	}
323 
324 	args[0].u32w0 = PSMX_AM_REQ_SEND | PSMX_AM_EOM;
325 	args[0].u32w1 = len;
326 	args[1].u64 = (uint64_t)(uintptr_t)req;
327 	args[2].u64 = (uint64_t)(uintptr_t)req->send.peer_context;
328 	args[3].u64 = offset;
329 
330 	req->send.len_sent = offset + len;
331 	err = psm_am_request_short((psm_epaddr_t) req->send.dest_addr,
332 				PSMX_AM_MSG_HANDLER, args, 4,
333 				req->send.buf+offset, len,
334 				am_flags, NULL, NULL);
335 
336 	return psmx_errno(err);
337 }
338 
/* Common receive path for the AM ("msg2") protocol.
 *
 * Resolves the source address filter, then either claims a queued
 * unexpected message or posts the request on the recv queue to wait for
 * one.  If the claimed unexpected message is only the first fragment,
 * replies to the sender so it can stream the rest (or stop, when the
 * buffer is already full).  Returns 0 on success or a negative fabric
 * error code.
 */
static ssize_t _psmx_recv2(struct fid_ep *ep, void *buf, size_t len,
			   void *desc, fi_addr_t src_addr,
			   void *context, uint64_t flags)
{
	psm_amarg_t args[8];
	struct psmx_fid_ep *ep_priv;
	struct psmx_fid_av *av;
	struct psmx_am_request *req;
	struct psmx_unexp *unexp;
	struct psmx_cq_event *event;
	int recv_done;
	int err = 0;
	size_t idx;

	ep_priv = container_of(ep, struct psmx_fid_ep, ep);

	/* Only honor the source filter when directed receive is enabled;
	 * otherwise treat the receive as a wildcard (src_addr = 0). */
	if ((ep_priv->caps & FI_DIRECTED_RECV) && src_addr != FI_ADDR_UNSPEC) {
		av = ep_priv->av;
		if (av && av->type == FI_AV_TABLE) {
			/* translate AV table index into a psm_epaddr_t */
			idx = (size_t)src_addr;
			if (idx >= av->last)
				return -FI_EINVAL;

			src_addr = (fi_addr_t)av->psm_epaddrs[idx];
		}
	} else {
		src_addr = 0;
	}

	req = calloc(1, sizeof(*req));
	if (!req)
		return -FI_ENOMEM;

	req->op = PSMX_AM_REQ_SEND;
	req->recv.buf = (void *)buf;
	req->recv.len = len;
	req->recv.context = context;
	req->recv.src_addr = (void *)src_addr;
	req->ep = ep_priv;
	req->cq_flags = FI_RECV | FI_MSG;

	/* suppress the CQ entry when selective completion is on and the
	 * caller did not ask for one */
	if (ep_priv->recv_selective_completion && !(flags & FI_COMPLETION))
		req->no_event = 1;

	unexp = psmx_am_search_and_dequeue_unexp(ep_priv->domain,
						 (const void *)src_addr);
	if (!unexp) {
		/* nothing waiting: park the request until data arrives.
		 * Ownership of req passes to the AM handler. */
		psmx_am_enqueue_recv(ep_priv->domain, req);
		return 0;
	}

	/* copy what has already arrived, truncating to the posted buffer */
	req->recv.len_received = MIN(req->recv.len, unexp->len_received);
	memcpy(req->recv.buf, unexp->buf, req->recv.len_received);

	recv_done = (req->recv.len_received >= req->recv.len);

	if (unexp->done) {
		recv_done = 1;
	} else {
		/* sender is waiting for our reply before streaming more;
		 * pass back req (or 0 if the buffer is already full) */
		args[0].u32w0 = PSMX_AM_REP_SEND;
		args[0].u32w1 = 0;
		args[1].u64 = unexp->sender_context;
		args[2].u64 = recv_done ? 0 : (uint64_t)(uintptr_t)req;
		err = psm_am_request_short(unexp->sender_addr,
					   PSMX_AM_MSG_HANDLER,
					   args, 3, NULL, 0, 0,
					   NULL, NULL );
	}

	free(unexp);

	if (recv_done) {
		/* complete immediately: CQ event, counter, release req */
		if (req->ep->recv_cq && !req->no_event) {
			event = psmx_cq_create_event(
					req->ep->recv_cq,
					req->recv.context,
					req->recv.buf,
					req->cq_flags,
					req->recv.len_received,
					0, /* data */
					0, /* tag */
					req->recv.len - req->recv.len_received,
					0 /* err */);
			if (event)
				psmx_cq_enqueue_event(req->ep->recv_cq, event);
			else
				err = -FI_ENOMEM;
		}

		if (req->ep->recv_cntr)
			psmx_cntr_inc(req->ep->recv_cntr);

		free(req);
	}

	return err;
}
436 
psmx_recv2(struct fid_ep * ep,void * buf,size_t len,void * desc,fi_addr_t src_addr,void * context)437 static ssize_t psmx_recv2(struct fid_ep *ep, void *buf, size_t len,
438 			  void *desc, fi_addr_t src_addr, void *context)
439 {
440 	struct psmx_fid_ep *ep_priv;
441 
442 	ep_priv = container_of(ep, struct psmx_fid_ep, ep);
443 	return _psmx_recv2(ep, buf, len, desc, src_addr, context, ep_priv->rx_flags);
444 }
445 
psmx_recvmsg2(struct fid_ep * ep,const struct fi_msg * msg,uint64_t flags)446 static ssize_t psmx_recvmsg2(struct fid_ep *ep, const struct fi_msg *msg,
447 			     uint64_t flags)
448 {
449 	void *buf;
450 	size_t len;
451 
452 	if (!msg || (msg->iov_count && !msg->msg_iov))
453 		return -FI_EINVAL;
454 
455 	if (msg->iov_count > 1) {
456 		return -FI_ENOSYS;
457 	} else if (msg->iov_count) {
458 		buf = msg->msg_iov[0].iov_base;
459 		len = msg->msg_iov[0].iov_len;
460 	} else {
461 		buf = NULL;
462 		len = 0;
463 	}
464 
465 	return _psmx_recv2(ep, buf, len,
466 			   msg->desc, msg->addr, msg->context, flags);
467 }
468 
psmx_recvv2(struct fid_ep * ep,const struct iovec * iov,void ** desc,size_t count,fi_addr_t src_addr,void * context)469 static ssize_t psmx_recvv2(struct fid_ep *ep, const struct iovec *iov,
470 			   void **desc, size_t count, fi_addr_t src_addr,
471 			   void *context)
472 {
473 	void *buf;
474 	size_t len;
475 
476 	if (count && !iov)
477 		return -FI_EINVAL;
478 
479 	if (count > 1) {
480 		return -FI_ENOSYS;
481 	} else if (count) {
482 		buf = iov[0].iov_base;
483 		len = iov[0].iov_len;
484 	} else {
485 		buf = NULL;
486 		len = 0;
487 	}
488 
489 	return psmx_recv2(ep, buf, len, desc ? desc[0] : NULL, src_addr, context);
490 }
491 
_psmx_send2(struct fid_ep * ep,const void * buf,size_t len,void * desc,fi_addr_t dest_addr,void * context,uint64_t flags)492 static ssize_t _psmx_send2(struct fid_ep *ep, const void *buf, size_t len,
493 			   void *desc, fi_addr_t dest_addr,
494 			   void *context, uint64_t flags)
495 {
496 	struct psmx_fid_ep *ep_priv;
497 	struct psmx_fid_av *av;
498 	struct psmx_am_request *req;
499 	psm_amarg_t args[8];
500 	int am_flags = PSM_AM_FLAG_ASYNC;
501 	int err;
502 	int chunk_size, msg_size;
503 	size_t idx;
504 
505 	ep_priv = container_of(ep, struct psmx_fid_ep, ep);
506 
507 	if (!buf)
508 		return -FI_EINVAL;
509 
510 	av = ep_priv->av;
511 	if (av && av->type == FI_AV_TABLE) {
512 		idx = dest_addr;
513 		if (idx >= av->last)
514 			return -FI_EINVAL;
515 
516 		dest_addr = (fi_addr_t) av->psm_epaddrs[idx];
517 	} else if (!dest_addr) {
518 		return -FI_EINVAL;
519 	}
520 
521 	chunk_size = MIN(PSMX_AM_CHUNK_SIZE, psmx_am_param.max_request_short);
522 	msg_size = MIN(len, chunk_size);
523 
524 	req = calloc(1, sizeof(*req));
525 	if (!req)
526 		return -FI_ENOMEM;
527 
528 	req->op = PSMX_AM_REQ_SEND;
529 	req->send.buf = (void *)buf;
530 	req->send.len = len;
531 	req->send.context = context;
532 	req->send.len_sent = msg_size;
533 	req->send.dest_addr = (void *)dest_addr;
534 	req->ep = ep_priv;
535 	req->cq_flags = FI_SEND | FI_MSG;
536 
537 	if ((flags & PSMX_NO_COMPLETION) ||
538 	    (ep_priv->send_selective_completion && !(flags & FI_COMPLETION)))
539 		req->no_event = 1;
540 
541 	args[0].u32w0 = PSMX_AM_REQ_SEND | (msg_size == len ? PSMX_AM_EOM : 0);
542 	args[0].u32w1 = msg_size;
543 	args[1].u64 = (uint64_t)(uintptr_t)req;
544 	args[2].u64 = 0;
545 	args[3].u64 = 0;
546 
547 	err = psm_am_request_short((psm_epaddr_t) dest_addr,
548 				PSMX_AM_MSG_HANDLER, args, 4,
549 				(void *)buf, msg_size, am_flags, NULL, NULL);
550 
551 	return psmx_errno(err);
552 
553 }
554 
psmx_send2(struct fid_ep * ep,const void * buf,size_t len,void * desc,fi_addr_t dest_addr,void * context)555 static ssize_t psmx_send2(struct fid_ep *ep, const void *buf,
556 			  size_t len, void *desc,
557 			  fi_addr_t dest_addr, void *context)
558 {
559 	struct psmx_fid_ep *ep_priv;
560 
561 	ep_priv = container_of(ep, struct psmx_fid_ep, ep);
562 	return _psmx_send2(ep, buf, len, desc, dest_addr, context, ep_priv->tx_flags);
563 }
564 
psmx_sendmsg2(struct fid_ep * ep,const struct fi_msg * msg,uint64_t flags)565 static ssize_t psmx_sendmsg2(struct fid_ep *ep, const struct fi_msg *msg,
566 			     uint64_t flags)
567 {
568 	void *buf;
569 	size_t len;
570 
571 	if (!msg || (msg->iov_count && !msg->msg_iov))
572 		return -FI_EINVAL;
573 
574 	if (msg->iov_count > 1) {
575 		return -FI_ENOSYS;
576 	} else if (msg->iov_count) {
577 		buf = msg->msg_iov[0].iov_base;
578 		len = msg->msg_iov[0].iov_len;
579 	} else {
580 		buf = NULL;
581 		len = 0;
582 	}
583 
584 	return _psmx_send2(ep, buf, len,
585 			   msg->desc, msg->addr, msg->context, flags);
586 }
587 
psmx_sendv2(struct fid_ep * ep,const struct iovec * iov,void ** desc,size_t count,fi_addr_t dest_addr,void * context)588 static ssize_t psmx_sendv2(struct fid_ep *ep, const struct iovec *iov,
589 			   void **desc, size_t count, fi_addr_t dest_addr,
590 			   void *context)
591 {
592 	void *buf;
593 	size_t len;
594 
595 	if (count && !iov)
596 		return -FI_EINVAL;
597 
598 	if (count > 1) {
599 		return -FI_ENOSYS;
600 	} else if (count) {
601 		buf = iov[0].iov_base;
602 		len = iov[0].iov_len;
603 	} else {
604 		buf = NULL;
605 		len = 0;
606 	}
607 
608 	return psmx_send2(ep, buf, len, desc ? desc[0] : NULL, dest_addr, context);
609 }
610 
psmx_inject2(struct fid_ep * ep,const void * buf,size_t len,fi_addr_t dest_addr)611 static ssize_t psmx_inject2(struct fid_ep *ep, const void *buf, size_t len,
612 			    fi_addr_t dest_addr)
613 {
614 	struct psmx_fid_ep *ep_priv;
615 
616 	ep_priv = container_of(ep, struct psmx_fid_ep, ep);
617 
618 	/* TODO: optimize it & guarantee buffered */
619 	return _psmx_send2(ep, buf, len, NULL, dest_addr, NULL,
620 			   ep_priv->tx_flags | FI_INJECT | PSMX_NO_COMPLETION);
621 }
622 
/* fi_ops_msg dispatch table for the AM-based ("msg2") message path.
 * Unsupported operations are stubbed with the fi_no_msg_* handlers. */
struct fi_ops_msg psmx_msg2_ops = {
	.size = sizeof(struct fi_ops_msg),
	.recv = psmx_recv2,
	.recvv = psmx_recvv2,
	.recvmsg = psmx_recvmsg2,
	.send = psmx_send2,
	.sendv = psmx_sendv2,
	.sendmsg = psmx_sendmsg2,
	.inject = psmx_inject2,
	.senddata = fi_no_msg_senddata,
	.injectdata = fi_no_msg_injectdata,
};
635 
636