1 /*
2 * Copyright (c) 2013-2017 Intel Corporation. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 */
32
33 #include "psmx.h"
34
psmx_am_enqueue_send(struct psmx_fid_domain * domain,struct psmx_am_request * req)35 static inline void psmx_am_enqueue_send(struct psmx_fid_domain *domain,
36 struct psmx_am_request *req)
37 {
38 fastlock_acquire(&domain->send_queue.lock);
39 slist_insert_tail(&req->list_entry, &domain->send_queue.list);
40 fastlock_release(&domain->send_queue.lock);
41 }
42
psmx_am_enqueue_recv(struct psmx_fid_domain * domain,struct psmx_am_request * req)43 static inline void psmx_am_enqueue_recv(struct psmx_fid_domain *domain,
44 struct psmx_am_request *req)
45 {
46 fastlock_acquire(&domain->recv_queue.lock);
47 slist_insert_tail(&req->list_entry, &domain->recv_queue.list);
48 fastlock_release(&domain->recv_queue.lock);
49 }
50
match_recv(struct slist_entry * item,const void * src_addr)51 static int match_recv(struct slist_entry *item, const void *src_addr)
52 {
53 struct psmx_am_request *req;
54
55 req = container_of(item, struct psmx_am_request, list_entry);
56 if (!req->recv.src_addr || req->recv.src_addr == src_addr)
57 return 1;
58
59 return 0;
60 }
61
psmx_am_search_and_dequeue_recv(struct psmx_fid_domain * domain,const void * src_addr)62 static struct psmx_am_request *psmx_am_search_and_dequeue_recv(
63 struct psmx_fid_domain *domain,
64 const void *src_addr)
65 {
66 struct slist_entry *item;
67
68 fastlock_acquire(&domain->recv_queue.lock);
69 item = slist_remove_first_match(&domain->recv_queue.list,
70 match_recv, src_addr);
71 fastlock_release(&domain->recv_queue.lock);
72
73 if (!item)
74 return NULL;
75
76 return container_of(item, struct psmx_am_request, list_entry);
77 }
78
psmx_am_enqueue_unexp(struct psmx_fid_domain * domain,struct psmx_unexp * unexp)79 static inline void psmx_am_enqueue_unexp(struct psmx_fid_domain *domain,
80 struct psmx_unexp *unexp)
81 {
82 fastlock_acquire(&domain->unexp_queue.lock);
83 slist_insert_tail(&unexp->list_entry, &domain->unexp_queue.list);
84 fastlock_release(&domain->unexp_queue.lock);
85 }
86
match_unexp(struct slist_entry * item,const void * src_addr)87 static int match_unexp(struct slist_entry *item, const void *src_addr)
88 {
89 struct psmx_unexp *unexp;
90
91 unexp = container_of(item, struct psmx_unexp, list_entry);
92
93 if (!src_addr || src_addr == unexp->sender_addr)
94 return 1;
95
96 return 0;
97 }
98
psmx_am_search_and_dequeue_unexp(struct psmx_fid_domain * domain,const void * src_addr)99 static struct psmx_unexp *psmx_am_search_and_dequeue_unexp(
100 struct psmx_fid_domain *domain,
101 const void *src_addr)
102 {
103 struct slist_entry *item;
104
105 fastlock_acquire(&domain->unexp_queue.lock);
106 item = slist_remove_first_match(&domain->unexp_queue.list,
107 match_unexp, src_addr);
108 fastlock_release(&domain->unexp_queue.lock);
109
110 if (!item)
111 return NULL;
112
113 return container_of(item, struct psmx_unexp, list_entry);
114 }
115
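/* Message protocol:
 *
 * Send REQ (one per packet; PSMX_AM_EOM is set on the last packet):
 *	args[0].u32w0	cmd (PSMX_AM_REQ_SEND), flag (PSMX_AM_EOM)
 *	args[0].u32w1	payload length of this packet
 *	args[1].u64	sender's req
 *	args[2].u64	receiver's recv_req (0 in the first packet)
 *	args[3].u64	offset of this packet's payload within the message
 *
 * Send REP:
 *	args[0].u32w0	cmd (PSMX_AM_REP_SEND)
 *	args[0].u32w1	error
 *	args[1].u64	sender's req
 *	args[2].u64	receiver's recv_req for the remaining packets;
 *			0 means the send is complete
 */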
131
/*
 * PSM active-message handler for the two-sided message protocol.
 *
 * PSMX_AM_REQ_SEND (receiver side):
 *   - First packet (offset 0): match it against a posted receive from this
 *     epaddr. If none is posted, buffer it as an unexpected message. An
 *     unexpected, non-final packet gets no reply yet; the reply is sent when
 *     a matching receive is posted.
 *   - Later packets carry the receiver's recv_req in args[2].
 *   - Data beyond the posted buffer length is dropped (truncated receive).
 *   - On the final packet (PSMX_AM_EOM), the receive is completed: CQ event
 *     and counter, then the request is freed.
 *   - A REP_SEND reply goes out after a matched first packet that is not
 *     final (carrying recv_req), and after the final packet or any error
 *     (recv_req == 0).
 *
 * PSMX_AM_REP_SEND (sender side):
 *   - A non-zero recv_req means more data remains. The request is queued for
 *     psmx_am_process_send(), because psm_am_request_short() cannot be called
 *     from inside this handler.
 *   - Otherwise the send is completed and the request freed.
 *
 * Returns 0, -FI_EIO if epaddr has no context, -FI_ENOMEM if a CQ event
 * cannot be allocated, -FI_EINVAL for an unknown command, or the result of
 * psm_am_reply_short().
 */
int psmx_am_msg_handler(psm_am_token_t token, psm_epaddr_t epaddr,
			psm_amarg_t *args, int nargs, void *src, uint32_t len)
{
	psm_amarg_t rep_args[8];
	struct psmx_am_request *req;
	struct psmx_cq_event *event;
	struct psmx_epaddr_context *epaddr_context;
	struct psmx_fid_domain *domain;
	int copy_len;
	uint64_t offset;
	int cmd, eom;
	int err = 0;
	int op_error = 0;
	struct psmx_unexp *unexp;

	epaddr_context = psm_epaddr_getctxt(epaddr);
	if (!epaddr_context) {
		FI_WARN(&psmx_prov, FI_LOG_EP_DATA,
			"NULL context for epaddr %p\n", epaddr);
		return -FI_EIO;
	}

	domain = epaddr_context->domain;

	cmd = args[0].u32w0 & PSMX_AM_OP_MASK;
	eom = args[0].u32w0 & PSMX_AM_EOM;

	switch (cmd) {
	case PSMX_AM_REQ_SEND:
		assert(len == args[0].u32w1);
		offset = args[3].u64;
		if (offset == 0) {
			/* this is the first packet */
			req = psmx_am_search_and_dequeue_recv(domain, (const void *)epaddr);
			if (req) {
				/* Never copy more than the posted buffer holds;
				 * the excess is truncated. */
				copy_len = MIN(len, req->recv.len);
				if (copy_len)
					memcpy(req->recv.buf, src, copy_len);
				req->recv.len_received += copy_len;
			} else {
				unexp = malloc(sizeof(*unexp) + len);
				if (!unexp) {
					op_error = -FI_ENOSPC;
				} else {
					memcpy(unexp->buf, src, len);
					unexp->sender_addr = epaddr;
					unexp->sender_context = args[1].u64;
					unexp->len_received = len;
					unexp->done = !!eom;
					unexp->list_entry.next = NULL;
					psmx_am_enqueue_unexp(domain, unexp);

					if (!eom) {
						/* stop here. will reply when recv is posted */
						break;
					}
				}
			}

			if (!op_error && !eom) {
				/* reply w/ recv req to be used for following packets */
				rep_args[0].u32w0 = PSMX_AM_REP_SEND;
				rep_args[0].u32w1 = 0;
				rep_args[1].u64 = args[1].u64;
				rep_args[2].u64 = (uint64_t)(uintptr_t)req;
				err = psm_am_reply_short(token, PSMX_AM_MSG_HANDLER,
						rep_args, 3, NULL, 0, 0,
						NULL, NULL );
			}
		} else {
			req = (struct psmx_am_request *)(uintptr_t)args[2].u64;
			if (req) {
				/* Copy only what still fits after 'offset'; a packet
				 * that starts at or past the end of the buffer
				 * contributes nothing (truncated receive). */
				if (offset < req->recv.len)
					copy_len = MIN(len, req->recv.len - offset);
				else
					copy_len = 0;
				if (copy_len)
					memcpy((char *)req->recv.buf + offset, src, copy_len);
				req->recv.len_received += copy_len;
			} else {
				FI_WARN(&psmx_prov, FI_LOG_EP_DATA,
					"NULL recv_req in follow-up packets.\n");
				op_error = -FI_ENOMSG;
			}
		}

		if (eom && req) {
			if (req->ep->recv_cq && !req->no_event) {
				/* NOTE(review): olen here is the unused buffer space,
				 * not the number of truncated bytes; confirm intent. */
				event = psmx_cq_create_event(
						req->ep->recv_cq,
						req->recv.context,
						req->recv.buf,
						req->cq_flags,
						req->recv.len_received,
						0, /* data */
						0, /* tag */
						req->recv.len - req->recv.len_received,
						0 /* err */);
				if (event)
					psmx_cq_enqueue_event(req->ep->recv_cq, event);
				else
					err = -FI_ENOMEM;
			}

			if (req->ep->recv_cntr)
				psmx_cntr_inc(req->ep->recv_cntr);

			free(req);
		}

		if (eom || op_error) {
			rep_args[0].u32w0 = PSMX_AM_REP_SEND;
			rep_args[0].u32w1 = op_error;
			rep_args[1].u64 = args[1].u64;
			rep_args[2].u64 = 0; /* done */
			err = psm_am_reply_short(token, PSMX_AM_MSG_HANDLER,
					rep_args, 3, NULL, 0, 0,
					NULL, NULL );
		}
		break;

	case PSMX_AM_REP_SEND:
		req = (struct psmx_am_request *)(uintptr_t)args[1].u64;
		op_error = (int)args[0].u32w1;
		assert(req->op == PSMX_AM_REQ_SEND);

		if (args[2].u64) { /* more to send */
			req->send.peer_context = (void *)(uintptr_t)args[2].u64;

			/* psm_am_request_short() can't be called inside the handler.
			 * put the request into a queue and process it later.
			 */
			psmx_am_enqueue_send(req->ep->domain, req);
		} else { /* done */
			if (req->ep->send_cq && !req->no_event) {
				event = psmx_cq_create_event(
						req->ep->send_cq,
						req->send.context,
						req->send.buf,
						req->cq_flags,
						req->send.len,
						0, /* data */
						0, /* tag */
						0, /* olen */
						op_error);
				if (event)
					psmx_cq_enqueue_event(req->ep->send_cq, event);
				else
					err = -FI_ENOMEM;
			}

			if (req->ep->send_cntr)
				psmx_cntr_inc(req->ep->send_cntr);

			free(req);
		}
		break;

	default:
		err = -FI_EINVAL;
	}

	return err;
}
291
psmx_am_process_send(struct psmx_fid_domain * domain,struct psmx_am_request * req)292 int psmx_am_process_send(struct psmx_fid_domain *domain, struct psmx_am_request *req)
293 {
294 psm_amarg_t args[8];
295 int am_flags = PSM_AM_FLAG_ASYNC;
296 int chunk_size;
297 size_t len;
298 uint64_t offset;
299 int err;
300
301 offset = req->send.len_sent;
302 len = req->send.len - offset;
303
304 chunk_size = MIN(PSMX_AM_CHUNK_SIZE, psmx_am_param.max_request_short);
305
306 while (len > chunk_size) {
307 args[0].u32w0 = PSMX_AM_REQ_SEND;
308 args[0].u32w1 = chunk_size;
309 args[1].u64 = (uint64_t)(uintptr_t)req;
310 args[2].u64 = (uint64_t)(uintptr_t)req->send.peer_context;
311 args[3].u64 = offset;
312
313 err = psm_am_request_short((psm_epaddr_t) req->send.dest_addr,
314 PSMX_AM_MSG_HANDLER, args, 4,
315 req->send.buf+offset, chunk_size,
316 am_flags, NULL, NULL);
317 if (err)
318 return psmx_errno(err);
319
320 len -= chunk_size;
321 offset += chunk_size;
322 }
323
324 args[0].u32w0 = PSMX_AM_REQ_SEND | PSMX_AM_EOM;
325 args[0].u32w1 = len;
326 args[1].u64 = (uint64_t)(uintptr_t)req;
327 args[2].u64 = (uint64_t)(uintptr_t)req->send.peer_context;
328 args[3].u64 = offset;
329
330 req->send.len_sent = offset + len;
331 err = psm_am_request_short((psm_epaddr_t) req->send.dest_addr,
332 PSMX_AM_MSG_HANDLER, args, 4,
333 req->send.buf+offset, len,
334 am_flags, NULL, NULL);
335
336 return psmx_errno(err);
337 }
338
_psmx_recv2(struct fid_ep * ep,void * buf,size_t len,void * desc,fi_addr_t src_addr,void * context,uint64_t flags)339 static ssize_t _psmx_recv2(struct fid_ep *ep, void *buf, size_t len,
340 void *desc, fi_addr_t src_addr,
341 void *context, uint64_t flags)
342 {
343 psm_amarg_t args[8];
344 struct psmx_fid_ep *ep_priv;
345 struct psmx_fid_av *av;
346 struct psmx_am_request *req;
347 struct psmx_unexp *unexp;
348 struct psmx_cq_event *event;
349 int recv_done;
350 int err = 0;
351 size_t idx;
352
353 ep_priv = container_of(ep, struct psmx_fid_ep, ep);
354
355 if ((ep_priv->caps & FI_DIRECTED_RECV) && src_addr != FI_ADDR_UNSPEC) {
356 av = ep_priv->av;
357 if (av && av->type == FI_AV_TABLE) {
358 idx = (size_t)src_addr;
359 if (idx >= av->last)
360 return -FI_EINVAL;
361
362 src_addr = (fi_addr_t)av->psm_epaddrs[idx];
363 }
364 } else {
365 src_addr = 0;
366 }
367
368 req = calloc(1, sizeof(*req));
369 if (!req)
370 return -FI_ENOMEM;
371
372 req->op = PSMX_AM_REQ_SEND;
373 req->recv.buf = (void *)buf;
374 req->recv.len = len;
375 req->recv.context = context;
376 req->recv.src_addr = (void *)src_addr;
377 req->ep = ep_priv;
378 req->cq_flags = FI_RECV | FI_MSG;
379
380 if (ep_priv->recv_selective_completion && !(flags & FI_COMPLETION))
381 req->no_event = 1;
382
383 unexp = psmx_am_search_and_dequeue_unexp(ep_priv->domain,
384 (const void *)src_addr);
385 if (!unexp) {
386 psmx_am_enqueue_recv(ep_priv->domain, req);
387 return 0;
388 }
389
390 req->recv.len_received = MIN(req->recv.len, unexp->len_received);
391 memcpy(req->recv.buf, unexp->buf, req->recv.len_received);
392
393 recv_done = (req->recv.len_received >= req->recv.len);
394
395 if (unexp->done) {
396 recv_done = 1;
397 } else {
398 args[0].u32w0 = PSMX_AM_REP_SEND;
399 args[0].u32w1 = 0;
400 args[1].u64 = unexp->sender_context;
401 args[2].u64 = recv_done ? 0 : (uint64_t)(uintptr_t)req;
402 err = psm_am_request_short(unexp->sender_addr,
403 PSMX_AM_MSG_HANDLER,
404 args, 3, NULL, 0, 0,
405 NULL, NULL );
406 }
407
408 free(unexp);
409
410 if (recv_done) {
411 if (req->ep->recv_cq && !req->no_event) {
412 event = psmx_cq_create_event(
413 req->ep->recv_cq,
414 req->recv.context,
415 req->recv.buf,
416 req->cq_flags,
417 req->recv.len_received,
418 0, /* data */
419 0, /* tag */
420 req->recv.len - req->recv.len_received,
421 0 /* err */);
422 if (event)
423 psmx_cq_enqueue_event(req->ep->recv_cq, event);
424 else
425 err = -FI_ENOMEM;
426 }
427
428 if (req->ep->recv_cntr)
429 psmx_cntr_inc(req->ep->recv_cntr);
430
431 free(req);
432 }
433
434 return err;
435 }
436
psmx_recv2(struct fid_ep * ep,void * buf,size_t len,void * desc,fi_addr_t src_addr,void * context)437 static ssize_t psmx_recv2(struct fid_ep *ep, void *buf, size_t len,
438 void *desc, fi_addr_t src_addr, void *context)
439 {
440 struct psmx_fid_ep *ep_priv;
441
442 ep_priv = container_of(ep, struct psmx_fid_ep, ep);
443 return _psmx_recv2(ep, buf, len, desc, src_addr, context, ep_priv->rx_flags);
444 }
445
psmx_recvmsg2(struct fid_ep * ep,const struct fi_msg * msg,uint64_t flags)446 static ssize_t psmx_recvmsg2(struct fid_ep *ep, const struct fi_msg *msg,
447 uint64_t flags)
448 {
449 void *buf;
450 size_t len;
451
452 if (!msg || (msg->iov_count && !msg->msg_iov))
453 return -FI_EINVAL;
454
455 if (msg->iov_count > 1) {
456 return -FI_ENOSYS;
457 } else if (msg->iov_count) {
458 buf = msg->msg_iov[0].iov_base;
459 len = msg->msg_iov[0].iov_len;
460 } else {
461 buf = NULL;
462 len = 0;
463 }
464
465 return _psmx_recv2(ep, buf, len,
466 msg->desc, msg->addr, msg->context, flags);
467 }
468
/*
 * fi_recvv() entry point. Supports at most one iov entry; count == 0 posts a
 * zero-length receive. Returns -FI_EINVAL when count is non-zero but iov is
 * NULL, and -FI_ENOSYS for more than one entry.
 */
static ssize_t psmx_recvv2(struct fid_ep *ep, const struct iovec *iov,
			   void **desc, size_t count, fi_addr_t src_addr,
			   void *context)
{
	void *first_desc = desc ? desc[0] : NULL;

	if (!count)
		return psmx_recv2(ep, NULL, 0, first_desc, src_addr, context);

	if (!iov)
		return -FI_EINVAL;

	if (count > 1)
		return -FI_ENOSYS;

	return psmx_recv2(ep, iov[0].iov_base, iov[0].iov_len,
			  first_desc, src_addr, context);
}
491
_psmx_send2(struct fid_ep * ep,const void * buf,size_t len,void * desc,fi_addr_t dest_addr,void * context,uint64_t flags)492 static ssize_t _psmx_send2(struct fid_ep *ep, const void *buf, size_t len,
493 void *desc, fi_addr_t dest_addr,
494 void *context, uint64_t flags)
495 {
496 struct psmx_fid_ep *ep_priv;
497 struct psmx_fid_av *av;
498 struct psmx_am_request *req;
499 psm_amarg_t args[8];
500 int am_flags = PSM_AM_FLAG_ASYNC;
501 int err;
502 int chunk_size, msg_size;
503 size_t idx;
504
505 ep_priv = container_of(ep, struct psmx_fid_ep, ep);
506
507 if (!buf)
508 return -FI_EINVAL;
509
510 av = ep_priv->av;
511 if (av && av->type == FI_AV_TABLE) {
512 idx = dest_addr;
513 if (idx >= av->last)
514 return -FI_EINVAL;
515
516 dest_addr = (fi_addr_t) av->psm_epaddrs[idx];
517 } else if (!dest_addr) {
518 return -FI_EINVAL;
519 }
520
521 chunk_size = MIN(PSMX_AM_CHUNK_SIZE, psmx_am_param.max_request_short);
522 msg_size = MIN(len, chunk_size);
523
524 req = calloc(1, sizeof(*req));
525 if (!req)
526 return -FI_ENOMEM;
527
528 req->op = PSMX_AM_REQ_SEND;
529 req->send.buf = (void *)buf;
530 req->send.len = len;
531 req->send.context = context;
532 req->send.len_sent = msg_size;
533 req->send.dest_addr = (void *)dest_addr;
534 req->ep = ep_priv;
535 req->cq_flags = FI_SEND | FI_MSG;
536
537 if ((flags & PSMX_NO_COMPLETION) ||
538 (ep_priv->send_selective_completion && !(flags & FI_COMPLETION)))
539 req->no_event = 1;
540
541 args[0].u32w0 = PSMX_AM_REQ_SEND | (msg_size == len ? PSMX_AM_EOM : 0);
542 args[0].u32w1 = msg_size;
543 args[1].u64 = (uint64_t)(uintptr_t)req;
544 args[2].u64 = 0;
545 args[3].u64 = 0;
546
547 err = psm_am_request_short((psm_epaddr_t) dest_addr,
548 PSMX_AM_MSG_HANDLER, args, 4,
549 (void *)buf, msg_size, am_flags, NULL, NULL);
550
551 return psmx_errno(err);
552
553 }
554
psmx_send2(struct fid_ep * ep,const void * buf,size_t len,void * desc,fi_addr_t dest_addr,void * context)555 static ssize_t psmx_send2(struct fid_ep *ep, const void *buf,
556 size_t len, void *desc,
557 fi_addr_t dest_addr, void *context)
558 {
559 struct psmx_fid_ep *ep_priv;
560
561 ep_priv = container_of(ep, struct psmx_fid_ep, ep);
562 return _psmx_send2(ep, buf, len, desc, dest_addr, context, ep_priv->tx_flags);
563 }
564
psmx_sendmsg2(struct fid_ep * ep,const struct fi_msg * msg,uint64_t flags)565 static ssize_t psmx_sendmsg2(struct fid_ep *ep, const struct fi_msg *msg,
566 uint64_t flags)
567 {
568 void *buf;
569 size_t len;
570
571 if (!msg || (msg->iov_count && !msg->msg_iov))
572 return -FI_EINVAL;
573
574 if (msg->iov_count > 1) {
575 return -FI_ENOSYS;
576 } else if (msg->iov_count) {
577 buf = msg->msg_iov[0].iov_base;
578 len = msg->msg_iov[0].iov_len;
579 } else {
580 buf = NULL;
581 len = 0;
582 }
583
584 return _psmx_send2(ep, buf, len,
585 msg->desc, msg->addr, msg->context, flags);
586 }
587
/*
 * fi_sendv() entry point. Supports at most one iov entry; count == 0 is
 * passed on as a NULL/zero-length buffer. Returns -FI_EINVAL when count is
 * non-zero but iov is NULL, and -FI_ENOSYS for more than one entry.
 */
static ssize_t psmx_sendv2(struct fid_ep *ep, const struct iovec *iov,
			   void **desc, size_t count, fi_addr_t dest_addr,
			   void *context)
{
	void *first_desc = desc ? desc[0] : NULL;

	if (!count)
		return psmx_send2(ep, NULL, 0, first_desc, dest_addr, context);

	if (!iov)
		return -FI_EINVAL;

	if (count > 1)
		return -FI_ENOSYS;

	return psmx_send2(ep, iov[0].iov_base, iov[0].iov_len,
			  first_desc, dest_addr, context);
}
610
psmx_inject2(struct fid_ep * ep,const void * buf,size_t len,fi_addr_t dest_addr)611 static ssize_t psmx_inject2(struct fid_ep *ep, const void *buf, size_t len,
612 fi_addr_t dest_addr)
613 {
614 struct psmx_fid_ep *ep_priv;
615
616 ep_priv = container_of(ep, struct psmx_fid_ep, ep);
617
618 /* TODO: optimize it & guarantee buffered */
619 return _psmx_send2(ep, buf, len, NULL, dest_addr, NULL,
620 ep_priv->tx_flags | FI_INJECT | PSMX_NO_COMPLETION);
621 }
622
623 struct fi_ops_msg psmx_msg2_ops = {
624 .size = sizeof(struct fi_ops_msg),
625 .recv = psmx_recv2,
626 .recvv = psmx_recvv2,
627 .recvmsg = psmx_recvmsg2,
628 .send = psmx_send2,
629 .sendv = psmx_sendv2,
630 .sendmsg = psmx_sendmsg2,
631 .inject = psmx_inject2,
632 .senddata = fi_no_msg_senddata,
633 .injectdata = fi_no_msg_injectdata,
634 };
635
636