1 /*
2  * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3  * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * OpenIB.org BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  */
33 #define _GNU_SOURCE
34 #include <infiniband/endian.h>
35 #include <getopt.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <stdio.h>
39 #include <errno.h>
40 #include <sys/types.h>
41 #include <sys/socket.h>
42 #include <netdb.h>
43 #include <semaphore.h>
44 #include <pthread.h>
45 #include <inttypes.h>
46 #include <libgen.h>
47 #include <rdma/rdma_cma.h>
48 
49 static int debug = 0;
50 #define DEBUG_LOG if (debug) printf
51 
52 /*
53  * rping "ping/pong" loop:
54  * 	client sends source rkey/addr/len
55  *	server receives source rkey/add/len
56  *	server rdma reads "ping" data from source
57  * 	server sends "go ahead" on rdma read completion
58  *	client sends sink rkey/addr/len
59  * 	server receives sink rkey/addr/len
60  * 	server rdma writes "pong" data to sink
61  * 	server sends "go ahead" on rdma write completion
62  * 	<repeat loop>
63  */
64 
65 /*
66  * These states are used to signal events between the completion handler
67  * and the main client or server thread.
68  *
69  * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV,
70  * and RDMA_WRITE_COMPLETE for each ping.
71  */
72 enum test_state {
73 	IDLE = 1,
74 	CONNECT_REQUEST,
75 	ADDR_RESOLVED,
76 	ROUTE_RESOLVED,
77 	CONNECTED,
78 	RDMA_READ_ADV,
79 	RDMA_READ_COMPLETE,
80 	RDMA_WRITE_ADV,
81 	RDMA_WRITE_COMPLETE,
82 	DISCONNECTED,
83 	ERROR
84 };
85 
86 struct rping_rdma_info {
87 	__be64 buf;
88 	__be32 rkey;
89 	__be32 size;
90 };
91 
92 /*
93  * Default max buffer size for IO...
94  */
95 #define RPING_BUFSIZE 64*1024
96 #define RPING_SQ_DEPTH 16
97 
98 /* Default string for print data and
99  * minimum buffer size
100  */
101 #define _stringify( _x ) # _x
102 #define stringify( _x ) _stringify( _x )
103 
104 #define RPING_MSG_FMT           "rdma-ping-%d: "
105 #define RPING_MIN_BUFSIZE       sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT)
106 
107 /*
108  * Control block struct.
109  */
110 struct rping_cb {
111 	int server;			/* 0 iff client */
112 	pthread_t cqthread;
113 	pthread_t persistent_server_thread;
114 	struct ibv_comp_channel *channel;
115 	struct ibv_cq *cq;
116 	struct ibv_pd *pd;
117 	struct ibv_qp *qp;
118 
119 	struct ibv_recv_wr rq_wr;	/* recv work request record */
120 	struct ibv_sge recv_sgl;	/* recv single SGE */
121 	struct rping_rdma_info recv_buf;/* malloc'd buffer */
122 	struct ibv_mr *recv_mr;		/* MR associated with this buffer */
123 
124 	struct ibv_send_wr sq_wr;	/* send work request record */
125 	struct ibv_sge send_sgl;
126 	struct rping_rdma_info send_buf;/* single send buf */
127 	struct ibv_mr *send_mr;
128 
129 	struct ibv_send_wr rdma_sq_wr;	/* rdma work request record */
130 	struct ibv_sge rdma_sgl;	/* rdma single SGE */
131 	char *rdma_buf;			/* used as rdma sink */
132 	struct ibv_mr *rdma_mr;
133 
134 	uint32_t remote_rkey;		/* remote guys RKEY */
135 	uint64_t remote_addr;		/* remote guys TO */
136 	uint32_t remote_len;		/* remote guys LEN */
137 
138 	char *start_buf;		/* rdma read src */
139 	struct ibv_mr *start_mr;
140 
141 	enum test_state state;		/* used for cond/signalling */
142 	sem_t sem;
143 
144 	struct sockaddr_storage sin;
145 	struct sockaddr_storage ssource;
146 	__be16 port;			/* dst port in NBO */
147 	int verbose;			/* verbose logging */
148 	int count;			/* ping count */
149 	int size;			/* ping data size */
150 	int validate;			/* validate ping data */
151 
152 	/* CM stuff */
153 	pthread_t cmthread;
154 	struct rdma_event_channel *cm_channel;
155 	struct rdma_cm_id *cm_id;	/* connection on client side,*/
156 					/* listener on service side. */
157 	struct rdma_cm_id *child_cm_id;	/* connection on server side */
158 };
159 
160 static int rping_cma_event_handler(struct rdma_cm_id *cma_id,
161 				    struct rdma_cm_event *event)
162 {
163 	int ret = 0;
164 	struct rping_cb *cb = cma_id->context;
165 
166 	DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
167 		  rdma_event_str(event->event), cma_id,
168 		  (cma_id == cb->cm_id) ? "parent" : "child");
169 
170 	switch (event->event) {
171 	case RDMA_CM_EVENT_ADDR_RESOLVED:
172 		cb->state = ADDR_RESOLVED;
173 		ret = rdma_resolve_route(cma_id, 2000);
174 		if (ret) {
175 			cb->state = ERROR;
176 			perror("rdma_resolve_route");
177 			sem_post(&cb->sem);
178 		}
179 		break;
180 
181 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
182 		cb->state = ROUTE_RESOLVED;
183 		sem_post(&cb->sem);
184 		break;
185 
186 	case RDMA_CM_EVENT_CONNECT_REQUEST:
187 		cb->state = CONNECT_REQUEST;
188 		cb->child_cm_id = cma_id;
189 		DEBUG_LOG("child cma %p\n", cb->child_cm_id);
190 		sem_post(&cb->sem);
191 		break;
192 
193 	case RDMA_CM_EVENT_ESTABLISHED:
194 		DEBUG_LOG("ESTABLISHED\n");
195 
196 		/*
197 		 * Server will wake up when first RECV completes.
198 		 */
199 		if (!cb->server) {
200 			cb->state = CONNECTED;
201 		}
202 		sem_post(&cb->sem);
203 		break;
204 
205 	case RDMA_CM_EVENT_ADDR_ERROR:
206 	case RDMA_CM_EVENT_ROUTE_ERROR:
207 	case RDMA_CM_EVENT_CONNECT_ERROR:
208 	case RDMA_CM_EVENT_UNREACHABLE:
209 	case RDMA_CM_EVENT_REJECTED:
210 		fprintf(stderr, "cma event %s, error %d\n",
211 			rdma_event_str(event->event), event->status);
212 		sem_post(&cb->sem);
213 		ret = -1;
214 		break;
215 
216 	case RDMA_CM_EVENT_DISCONNECTED:
217 		fprintf(stderr, "%s DISCONNECT EVENT...\n",
218 			cb->server ? "server" : "client");
219 		cb->state = DISCONNECTED;
220 		sem_post(&cb->sem);
221 		break;
222 
223 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
224 		fprintf(stderr, "cma detected device removal!!!!\n");
225 		cb->state = ERROR;
226 		sem_post(&cb->sem);
227 		ret = -1;
228 		break;
229 
230 	default:
231 		fprintf(stderr, "unhandled event: %s, ignoring\n",
232 			rdma_event_str(event->event));
233 		break;
234 	}
235 
236 	return ret;
237 }
238 
239 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
240 {
241 	if (wc->byte_len != sizeof(cb->recv_buf)) {
242 		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
243 		return -1;
244 	}
245 
246 	cb->remote_rkey = be32toh(cb->recv_buf.rkey);
247 	cb->remote_addr = be64toh(cb->recv_buf.buf);
248 	cb->remote_len  = be32toh(cb->recv_buf.size);
249 	DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
250 		  cb->remote_rkey, cb->remote_addr, cb->remote_len);
251 
252 	if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
253 		cb->state = RDMA_READ_ADV;
254 	else
255 		cb->state = RDMA_WRITE_ADV;
256 
257 	return 0;
258 }
259 
260 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
261 {
262 	if (wc->byte_len != sizeof(cb->recv_buf)) {
263 		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
264 		return -1;
265 	}
266 
267 	if (cb->state == RDMA_READ_ADV)
268 		cb->state = RDMA_WRITE_ADV;
269 	else
270 		cb->state = RDMA_WRITE_COMPLETE;
271 
272 	return 0;
273 }
274 
275 static int rping_cq_event_handler(struct rping_cb *cb)
276 {
277 	struct ibv_wc wc;
278 	struct ibv_recv_wr *bad_wr;
279 	int ret;
280 	int flushed = 0;
281 
282 	while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
283 		ret = 0;
284 
285 		if (wc.status) {
286 			if (wc.status == IBV_WC_WR_FLUSH_ERR) {
287 				flushed = 1;
288 				continue;
289 
290 			}
291 			fprintf(stderr,
292 				"cq completion failed status %d\n",
293 				wc.status);
294 			ret = -1;
295 			goto error;
296 		}
297 
298 		switch (wc.opcode) {
299 		case IBV_WC_SEND:
300 			DEBUG_LOG("send completion\n");
301 			break;
302 
303 		case IBV_WC_RDMA_WRITE:
304 			DEBUG_LOG("rdma write completion\n");
305 			cb->state = RDMA_WRITE_COMPLETE;
306 			sem_post(&cb->sem);
307 			break;
308 
309 		case IBV_WC_RDMA_READ:
310 			DEBUG_LOG("rdma read completion\n");
311 			cb->state = RDMA_READ_COMPLETE;
312 			sem_post(&cb->sem);
313 			break;
314 
315 		case IBV_WC_RECV:
316 			DEBUG_LOG("recv completion\n");
317 			ret = cb->server ? server_recv(cb, &wc) :
318 					   client_recv(cb, &wc);
319 			if (ret) {
320 				fprintf(stderr, "recv wc error: %d\n", ret);
321 				goto error;
322 			}
323 
324 			ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
325 			if (ret) {
326 				fprintf(stderr, "post recv error: %d\n", ret);
327 				goto error;
328 			}
329 			sem_post(&cb->sem);
330 			break;
331 
332 		default:
333 			DEBUG_LOG("unknown!!!!! completion\n");
334 			ret = -1;
335 			goto error;
336 		}
337 	}
338 	if (ret) {
339 		fprintf(stderr, "poll error %d\n", ret);
340 		goto error;
341 	}
342 	return flushed;
343 
344 error:
345 	cb->state = ERROR;
346 	sem_post(&cb->sem);
347 	return ret;
348 }
349 
350 static int rping_accept(struct rping_cb *cb)
351 {
352 	int ret;
353 
354 	DEBUG_LOG("accepting client connection request\n");
355 
356 	ret = rdma_accept(cb->child_cm_id, NULL);
357 	if (ret) {
358 		perror("rdma_accept");
359 		return ret;
360 	}
361 
362 	sem_wait(&cb->sem);
363 	if (cb->state == ERROR) {
364 		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
365 		return -1;
366 	}
367 	return 0;
368 }
369 
370 static void rping_setup_wr(struct rping_cb *cb)
371 {
372 	cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
373 	cb->recv_sgl.length = sizeof cb->recv_buf;
374 	cb->recv_sgl.lkey = cb->recv_mr->lkey;
375 	cb->rq_wr.sg_list = &cb->recv_sgl;
376 	cb->rq_wr.num_sge = 1;
377 
378 	cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
379 	cb->send_sgl.length = sizeof cb->send_buf;
380 	cb->send_sgl.lkey = cb->send_mr->lkey;
381 
382 	cb->sq_wr.opcode = IBV_WR_SEND;
383 	cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
384 	cb->sq_wr.sg_list = &cb->send_sgl;
385 	cb->sq_wr.num_sge = 1;
386 
387 	cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
388 	cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
389 	cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
390 	cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
391 	cb->rdma_sq_wr.num_sge = 1;
392 }
393 
394 static int rping_setup_buffers(struct rping_cb *cb)
395 {
396 	int ret;
397 
398 	DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
399 
400 	cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
401 				 IBV_ACCESS_LOCAL_WRITE);
402 	if (!cb->recv_mr) {
403 		fprintf(stderr, "recv_buf reg_mr failed\n");
404 		return errno;
405 	}
406 
407 	cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
408 	if (!cb->send_mr) {
409 		fprintf(stderr, "send_buf reg_mr failed\n");
410 		ret = errno;
411 		goto err1;
412 	}
413 
414 	cb->rdma_buf = malloc(cb->size);
415 	if (!cb->rdma_buf) {
416 		fprintf(stderr, "rdma_buf malloc failed\n");
417 		ret = -ENOMEM;
418 		goto err2;
419 	}
420 
421 	cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
422 				 IBV_ACCESS_LOCAL_WRITE |
423 				 IBV_ACCESS_REMOTE_READ |
424 				 IBV_ACCESS_REMOTE_WRITE);
425 	if (!cb->rdma_mr) {
426 		fprintf(stderr, "rdma_buf reg_mr failed\n");
427 		ret = errno;
428 		goto err3;
429 	}
430 
431 	if (!cb->server) {
432 		cb->start_buf = malloc(cb->size);
433 		if (!cb->start_buf) {
434 			fprintf(stderr, "start_buf malloc failed\n");
435 			ret = -ENOMEM;
436 			goto err4;
437 		}
438 
439 		cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
440 					  IBV_ACCESS_LOCAL_WRITE |
441 					  IBV_ACCESS_REMOTE_READ |
442 					  IBV_ACCESS_REMOTE_WRITE);
443 		if (!cb->start_mr) {
444 			fprintf(stderr, "start_buf reg_mr failed\n");
445 			ret = errno;
446 			goto err5;
447 		}
448 	}
449 
450 	rping_setup_wr(cb);
451 	DEBUG_LOG("allocated & registered buffers...\n");
452 	return 0;
453 
454 err5:
455 	free(cb->start_buf);
456 err4:
457 	ibv_dereg_mr(cb->rdma_mr);
458 err3:
459 	free(cb->rdma_buf);
460 err2:
461 	ibv_dereg_mr(cb->send_mr);
462 err1:
463 	ibv_dereg_mr(cb->recv_mr);
464 	return ret;
465 }
466 
467 static void rping_free_buffers(struct rping_cb *cb)
468 {
469 	DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
470 	ibv_dereg_mr(cb->recv_mr);
471 	ibv_dereg_mr(cb->send_mr);
472 	ibv_dereg_mr(cb->rdma_mr);
473 	free(cb->rdma_buf);
474 	if (!cb->server) {
475 		ibv_dereg_mr(cb->start_mr);
476 		free(cb->start_buf);
477 	}
478 }
479 
480 static int rping_create_qp(struct rping_cb *cb)
481 {
482 	struct ibv_qp_init_attr init_attr;
483 	int ret;
484 
485 	memset(&init_attr, 0, sizeof(init_attr));
486 	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
487 	init_attr.cap.max_recv_wr = 2;
488 	init_attr.cap.max_recv_sge = 1;
489 	init_attr.cap.max_send_sge = 1;
490 	init_attr.qp_type = IBV_QPT_RC;
491 	init_attr.send_cq = cb->cq;
492 	init_attr.recv_cq = cb->cq;
493 
494 	if (cb->server) {
495 		ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
496 		if (!ret)
497 			cb->qp = cb->child_cm_id->qp;
498 	} else {
499 		ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
500 		if (!ret)
501 			cb->qp = cb->cm_id->qp;
502 	}
503 
504 	return ret;
505 }
506 
507 static void rping_free_qp(struct rping_cb *cb)
508 {
509 	ibv_destroy_qp(cb->qp);
510 	ibv_destroy_cq(cb->cq);
511 	ibv_destroy_comp_channel(cb->channel);
512 	ibv_dealloc_pd(cb->pd);
513 }
514 
515 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
516 {
517 	int ret;
518 
519 	cb->pd = ibv_alloc_pd(cm_id->verbs);
520 	if (!cb->pd) {
521 		fprintf(stderr, "ibv_alloc_pd failed\n");
522 		return errno;
523 	}
524 	DEBUG_LOG("created pd %p\n", cb->pd);
525 
526 	cb->channel = ibv_create_comp_channel(cm_id->verbs);
527 	if (!cb->channel) {
528 		fprintf(stderr, "ibv_create_comp_channel failed\n");
529 		ret = errno;
530 		goto err1;
531 	}
532 	DEBUG_LOG("created channel %p\n", cb->channel);
533 
534 	cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
535 				cb->channel, 0);
536 	if (!cb->cq) {
537 		fprintf(stderr, "ibv_create_cq failed\n");
538 		ret = errno;
539 		goto err2;
540 	}
541 	DEBUG_LOG("created cq %p\n", cb->cq);
542 
543 	ret = ibv_req_notify_cq(cb->cq, 0);
544 	if (ret) {
545 		fprintf(stderr, "ibv_create_cq failed\n");
546 		ret = errno;
547 		goto err3;
548 	}
549 
550 	ret = rping_create_qp(cb);
551 	if (ret) {
552 		perror("rdma_create_qp");
553 		goto err3;
554 	}
555 	DEBUG_LOG("created qp %p\n", cb->qp);
556 	return 0;
557 
558 err3:
559 	ibv_destroy_cq(cb->cq);
560 err2:
561 	ibv_destroy_comp_channel(cb->channel);
562 err1:
563 	ibv_dealloc_pd(cb->pd);
564 	return ret;
565 }
566 
567 static void *cm_thread(void *arg)
568 {
569 	struct rping_cb *cb = arg;
570 	struct rdma_cm_event *event;
571 	int ret;
572 
573 	while (1) {
574 		ret = rdma_get_cm_event(cb->cm_channel, &event);
575 		if (ret) {
576 			perror("rdma_get_cm_event");
577 			exit(ret);
578 		}
579 		ret = rping_cma_event_handler(event->id, event);
580 		rdma_ack_cm_event(event);
581 		if (ret)
582 			exit(ret);
583 	}
584 }
585 
586 static void *cq_thread(void *arg)
587 {
588 	struct rping_cb *cb = arg;
589 	struct ibv_cq *ev_cq;
590 	void *ev_ctx;
591 	int ret;
592 
593 	DEBUG_LOG("cq_thread started.\n");
594 
595 	while (1) {
596 		pthread_testcancel();
597 
598 		ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
599 		if (ret) {
600 			fprintf(stderr, "Failed to get cq event!\n");
601 			pthread_exit(NULL);
602 		}
603 		if (ev_cq != cb->cq) {
604 			fprintf(stderr, "Unknown CQ!\n");
605 			pthread_exit(NULL);
606 		}
607 		ret = ibv_req_notify_cq(cb->cq, 0);
608 		if (ret) {
609 			fprintf(stderr, "Failed to set notify!\n");
610 			pthread_exit(NULL);
611 		}
612 		ret = rping_cq_event_handler(cb);
613 		ibv_ack_cq_events(cb->cq, 1);
614 		if (ret)
615 			pthread_exit(NULL);
616 	}
617 }
618 
619 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
620 {
621 	struct rping_rdma_info *info = &cb->send_buf;
622 
623 	info->buf = htobe64((uint64_t) (unsigned long) buf);
624 	info->rkey = htobe32(mr->rkey);
625 	info->size = htobe32(cb->size);
626 
627 	DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
628 		  be64toh(info->buf), be32toh(info->rkey), be32toh(info->size));
629 }
630 
631 static int rping_test_server(struct rping_cb *cb)
632 {
633 	struct ibv_send_wr *bad_wr;
634 	int ret;
635 
636 	while (1) {
637 		/* Wait for client's Start STAG/TO/Len */
638 		sem_wait(&cb->sem);
639 		if (cb->state != RDMA_READ_ADV) {
640 			fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
641 				cb->state);
642 			ret = -1;
643 			break;
644 		}
645 
646 		DEBUG_LOG("server received sink adv\n");
647 
648 		/* Issue RDMA Read. */
649 		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
650 		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
651 		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
652 		cb->rdma_sq_wr.sg_list->length = cb->remote_len;
653 
654 		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
655 		if (ret) {
656 			fprintf(stderr, "post send error %d\n", ret);
657 			break;
658 		}
659 		DEBUG_LOG("server posted rdma read req \n");
660 
661 		/* Wait for read completion */
662 		sem_wait(&cb->sem);
663 		if (cb->state != RDMA_READ_COMPLETE) {
664 			fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
665 				cb->state);
666 			ret = -1;
667 			break;
668 		}
669 		DEBUG_LOG("server received read complete\n");
670 
671 		/* Display data in recv buf */
672 		if (cb->verbose)
673 			printf("server ping data: %s\n", cb->rdma_buf);
674 
675 		/* Tell client to continue */
676 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
677 		if (ret) {
678 			fprintf(stderr, "post send error %d\n", ret);
679 			break;
680 		}
681 		DEBUG_LOG("server posted go ahead\n");
682 
683 		/* Wait for client's RDMA STAG/TO/Len */
684 		sem_wait(&cb->sem);
685 		if (cb->state != RDMA_WRITE_ADV) {
686 			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
687 				cb->state);
688 			ret = -1;
689 			break;
690 		}
691 		DEBUG_LOG("server received sink adv\n");
692 
693 		/* RDMA Write echo data */
694 		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
695 		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
696 		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
697 		cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
698 		DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
699 			  cb->rdma_sq_wr.sg_list->lkey,
700 			  cb->rdma_sq_wr.sg_list->addr,
701 			  cb->rdma_sq_wr.sg_list->length);
702 
703 		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
704 		if (ret) {
705 			fprintf(stderr, "post send error %d\n", ret);
706 			break;
707 		}
708 
709 		/* Wait for completion */
710 		ret = sem_wait(&cb->sem);
711 		if (cb->state != RDMA_WRITE_COMPLETE) {
712 			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
713 				cb->state);
714 			ret = -1;
715 			break;
716 		}
717 		DEBUG_LOG("server rdma write complete \n");
718 
719 		/* Tell client to begin again */
720 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
721 		if (ret) {
722 			fprintf(stderr, "post send error %d\n", ret);
723 			break;
724 		}
725 		DEBUG_LOG("server posted go ahead\n");
726 	}
727 
728 	return (cb->state == DISCONNECTED) ? 0 : ret;
729 }
730 
731 static int rping_bind_server(struct rping_cb *cb)
732 {
733 	int ret;
734 
735 	if (cb->sin.ss_family == AF_INET)
736 		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
737 	else
738 		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
739 
740 	ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
741 	if (ret) {
742 		perror("rdma_bind_addr");
743 		return ret;
744 	}
745 	DEBUG_LOG("rdma_bind_addr successful\n");
746 
747 	DEBUG_LOG("rdma_listen\n");
748 	ret = rdma_listen(cb->cm_id, 3);
749 	if (ret) {
750 		perror("rdma_listen");
751 		return ret;
752 	}
753 
754 	return 0;
755 }
756 
757 static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
758 {
759 	struct rping_cb *cb = malloc(sizeof *cb);
760 	if (!cb)
761 		return NULL;
762 	memset(cb, 0, sizeof *cb);
763 	*cb = *listening_cb;
764 	cb->child_cm_id->context = cb;
765 	return cb;
766 }
767 
768 static void free_cb(struct rping_cb *cb)
769 {
770 	free(cb);
771 }
772 
773 static void *rping_persistent_server_thread(void *arg)
774 {
775 	struct rping_cb *cb = arg;
776 	struct ibv_recv_wr *bad_wr;
777 	int ret;
778 
779 	ret = rping_setup_qp(cb, cb->child_cm_id);
780 	if (ret) {
781 		fprintf(stderr, "setup_qp failed: %d\n", ret);
782 		goto err0;
783 	}
784 
785 	ret = rping_setup_buffers(cb);
786 	if (ret) {
787 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
788 		goto err1;
789 	}
790 
791 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
792 	if (ret) {
793 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
794 		goto err2;
795 	}
796 
797 	ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb);
798 	if (ret) {
799 		perror("pthread_create");
800 		goto err2;
801 	}
802 
803 	ret = rping_accept(cb);
804 	if (ret) {
805 		fprintf(stderr, "connect error %d\n", ret);
806 		goto err3;
807 	}
808 
809 	rping_test_server(cb);
810 	rdma_disconnect(cb->child_cm_id);
811 	pthread_join(cb->cqthread, NULL);
812 	rping_free_buffers(cb);
813 	rping_free_qp(cb);
814 	rdma_destroy_id(cb->child_cm_id);
815 	free_cb(cb);
816 	return NULL;
817 err3:
818 	pthread_cancel(cb->cqthread);
819 	pthread_join(cb->cqthread, NULL);
820 err2:
821 	rping_free_buffers(cb);
822 err1:
823 	rping_free_qp(cb);
824 err0:
825 	free_cb(cb);
826 	return NULL;
827 }
828 
829 static int rping_run_persistent_server(struct rping_cb *listening_cb)
830 {
831 	int ret;
832 	struct rping_cb *cb;
833 	pthread_attr_t attr;
834 
835 	ret = rping_bind_server(listening_cb);
836 	if (ret)
837 		return ret;
838 
839 	/*
840 	 * Set persistent server threads to DEATCHED state so
841 	 * they release all their resources when they exit.
842 	 */
843 	ret = pthread_attr_init(&attr);
844 	if (ret) {
845 		perror("pthread_attr_init");
846 		return ret;
847 	}
848 	ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
849 	if (ret) {
850 		perror("pthread_attr_setdetachstate");
851 		return ret;
852 	}
853 
854 	while (1) {
855 		sem_wait(&listening_cb->sem);
856 		if (listening_cb->state != CONNECT_REQUEST) {
857 			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
858 				listening_cb->state);
859 			return -1;
860 		}
861 
862 		cb = clone_cb(listening_cb);
863 		if (!cb)
864 			return -1;
865 
866 		ret = pthread_create(&cb->persistent_server_thread, &attr, rping_persistent_server_thread, cb);
867 		if (ret) {
868 			perror("pthread_create");
869 			return ret;
870 		}
871 	}
872 	return 0;
873 }
874 
875 static int rping_run_server(struct rping_cb *cb)
876 {
877 	struct ibv_recv_wr *bad_wr;
878 	int ret;
879 
880 	ret = rping_bind_server(cb);
881 	if (ret)
882 		return ret;
883 
884 	sem_wait(&cb->sem);
885 	if (cb->state != CONNECT_REQUEST) {
886 		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
887 			cb->state);
888 		return -1;
889 	}
890 
891 	ret = rping_setup_qp(cb, cb->child_cm_id);
892 	if (ret) {
893 		fprintf(stderr, "setup_qp failed: %d\n", ret);
894 		return ret;
895 	}
896 
897 	ret = rping_setup_buffers(cb);
898 	if (ret) {
899 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
900 		goto err1;
901 	}
902 
903 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
904 	if (ret) {
905 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
906 		goto err2;
907 	}
908 
909 	ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb);
910 	if (ret) {
911 		perror("pthread_create");
912 		goto err2;
913 	}
914 
915 	ret = rping_accept(cb);
916 	if (ret) {
917 		fprintf(stderr, "connect error %d\n", ret);
918 		goto err2;
919 	}
920 
921 	ret = rping_test_server(cb);
922 	if (ret) {
923 		fprintf(stderr, "rping server failed: %d\n", ret);
924 		goto err3;
925 	}
926 
927 	ret = 0;
928 err3:
929 	rdma_disconnect(cb->child_cm_id);
930 	pthread_join(cb->cqthread, NULL);
931 	rdma_destroy_id(cb->child_cm_id);
932 err2:
933 	rping_free_buffers(cb);
934 err1:
935 	rping_free_qp(cb);
936 
937 	return ret;
938 }
939 
940 static int rping_test_client(struct rping_cb *cb)
941 {
942 	int ping, start, cc, i, ret = 0;
943 	struct ibv_send_wr *bad_wr;
944 	unsigned char c;
945 
946 	start = 65;
947 	for (ping = 0; !cb->count || ping < cb->count; ping++) {
948 		cb->state = RDMA_READ_ADV;
949 
950 		/* Put some ascii text in the buffer. */
951 		cc = snprintf(cb->start_buf, cb->size, RPING_MSG_FMT, ping);
952 		for (i = cc, c = start; i < cb->size; i++) {
953 			cb->start_buf[i] = c;
954 			c++;
955 			if (c > 122)
956 				c = 65;
957 		}
958 		start++;
959 		if (start > 122)
960 			start = 65;
961 		cb->start_buf[cb->size - 1] = 0;
962 
963 		rping_format_send(cb, cb->start_buf, cb->start_mr);
964 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
965 		if (ret) {
966 			fprintf(stderr, "post send error %d\n", ret);
967 			break;
968 		}
969 
970 		/* Wait for server to ACK */
971 		sem_wait(&cb->sem);
972 		if (cb->state != RDMA_WRITE_ADV) {
973 			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
974 				cb->state);
975 			ret = -1;
976 			break;
977 		}
978 
979 		rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
980 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
981 		if (ret) {
982 			fprintf(stderr, "post send error %d\n", ret);
983 			break;
984 		}
985 
986 		/* Wait for the server to say the RDMA Write is complete. */
987 		sem_wait(&cb->sem);
988 		if (cb->state != RDMA_WRITE_COMPLETE) {
989 			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
990 				cb->state);
991 			ret = -1;
992 			break;
993 		}
994 
995 		if (cb->validate)
996 			if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
997 				fprintf(stderr, "data mismatch!\n");
998 				ret = -1;
999 				break;
1000 			}
1001 
1002 		if (cb->verbose)
1003 			printf("ping data: %s\n", cb->rdma_buf);
1004 	}
1005 
1006 	return (cb->state == DISCONNECTED) ? 0 : ret;
1007 }
1008 
1009 static int rping_connect_client(struct rping_cb *cb)
1010 {
1011 	struct rdma_conn_param conn_param;
1012 	int ret;
1013 
1014 	memset(&conn_param, 0, sizeof conn_param);
1015 	conn_param.responder_resources = 1;
1016 	conn_param.initiator_depth = 1;
1017 	conn_param.retry_count = 7;
1018 
1019 	ret = rdma_connect(cb->cm_id, &conn_param);
1020 	if (ret) {
1021 		perror("rdma_connect");
1022 		return ret;
1023 	}
1024 
1025 	sem_wait(&cb->sem);
1026 	if (cb->state != CONNECTED) {
1027 		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
1028 		return -1;
1029 	}
1030 
1031 	DEBUG_LOG("rmda_connect successful\n");
1032 	return 0;
1033 }
1034 
1035 static int rping_bind_client(struct rping_cb *cb)
1036 {
1037 	int ret;
1038 
1039 	if (cb->sin.ss_family == AF_INET)
1040 		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1041 	else
1042 		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1043 
1044 	if (cb->ssource.ss_family)
1045 		ret = rdma_resolve_addr(cb->cm_id, (struct sockaddr *) &cb->ssource,
1046 					(struct sockaddr *) &cb->sin, 2000);
1047 	else
1048 		ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1049 
1050 	if (ret) {
1051 		perror("rdma_resolve_addr");
1052 		return ret;
1053 	}
1054 
1055 	sem_wait(&cb->sem);
1056 	if (cb->state != ROUTE_RESOLVED) {
1057 		fprintf(stderr, "waiting for addr/route resolution state %d\n",
1058 			cb->state);
1059 		return -1;
1060 	}
1061 
1062 	DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1063 	return 0;
1064 }
1065 
1066 static int rping_run_client(struct rping_cb *cb)
1067 {
1068 	struct ibv_recv_wr *bad_wr;
1069 	int ret;
1070 
1071 	ret = rping_bind_client(cb);
1072 	if (ret)
1073 		return ret;
1074 
1075 	ret = rping_setup_qp(cb, cb->cm_id);
1076 	if (ret) {
1077 		fprintf(stderr, "setup_qp failed: %d\n", ret);
1078 		return ret;
1079 	}
1080 
1081 	ret = rping_setup_buffers(cb);
1082 	if (ret) {
1083 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1084 		goto err1;
1085 	}
1086 
1087 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1088 	if (ret) {
1089 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1090 		goto err2;
1091 	}
1092 
1093 	ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1094 	if (ret) {
1095 		perror("pthread_create");
1096 		goto err2;
1097 	}
1098 
1099 	ret = rping_connect_client(cb);
1100 	if (ret) {
1101 		fprintf(stderr, "connect error %d\n", ret);
1102 		goto err3;
1103 	}
1104 
1105 	ret = rping_test_client(cb);
1106 	if (ret) {
1107 		fprintf(stderr, "rping client failed: %d\n", ret);
1108 		goto err4;
1109 	}
1110 
1111 	ret = 0;
1112 err4:
1113 	rdma_disconnect(cb->cm_id);
1114 err3:
1115 	pthread_join(cb->cqthread, NULL);
1116 err2:
1117 	rping_free_buffers(cb);
1118 err1:
1119 	rping_free_qp(cb);
1120 
1121 	return ret;
1122 }
1123 
1124 static int get_addr(char *dst, struct sockaddr *addr)
1125 {
1126 	struct addrinfo *res;
1127 	int ret;
1128 
1129 	ret = getaddrinfo(dst, NULL, NULL, &res);
1130 	if (ret) {
1131 		printf("getaddrinfo failed (%s) - invalid hostname or IP address\n", gai_strerror(ret));
1132 		return ret;
1133 	}
1134 
1135 	if (res->ai_family == PF_INET)
1136 		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in));
1137 	else if (res->ai_family == PF_INET6)
1138 		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in6));
1139 	else
1140 		ret = -1;
1141 
1142 	freeaddrinfo(res);
1143 	return ret;
1144 }
1145 
1146 static void usage(const char *name)
1147 {
1148 	printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n",
1149 	       basename(name));
1150 	printf("%s -c [-vVd] [-S size] [-C count] [-I addr] -a addr [-p port]\n",
1151 	       basename(name));
1152 	printf("\t-c\t\tclient side\n");
1153 	printf("\t-I\t\tSource address to bind to for client.\n");
1154 	printf("\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n");
1155 	printf("\t-v\t\tdisplay ping data to stdout\n");
1156 	printf("\t-V\t\tvalidate ping data\n");
1157 	printf("\t-d\t\tdebug printfs\n");
1158 	printf("\t-S size \tping data size\n");
1159 	printf("\t-C count\tping count times\n");
1160 	printf("\t-a addr\t\taddress\n");
1161 	printf("\t-p port\t\tport\n");
1162 	printf("\t-P\t\tpersistent server mode allowing multiple connections\n");
1163 }
1164 
1165 int main(int argc, char *argv[])
1166 {
1167 	struct rping_cb *cb;
1168 	int op;
1169 	int ret = 0;
1170 	int persistent_server = 0;
1171 
1172 	cb = malloc(sizeof(*cb));
1173 	if (!cb)
1174 		return -ENOMEM;
1175 
1176 	memset(cb, 0, sizeof(*cb));
1177 	cb->server = -1;
1178 	cb->state = IDLE;
1179 	cb->size = 64;
1180 	cb->sin.ss_family = PF_INET;
1181 	cb->port = htobe16(7174);
1182 	sem_init(&cb->sem, 0, 0);
1183 
1184 	opterr = 0;
1185 	while ((op=getopt(argc, argv, "a:I:Pp:C:S:t:scvVd")) != -1) {
1186 		switch (op) {
1187 		case 'a':
1188 			ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1189 			break;
1190 		case 'I':
1191 			ret = get_addr(optarg, (struct sockaddr *) &cb->ssource);
1192 			break;
1193 		case 'P':
1194 			persistent_server = 1;
1195 			break;
1196 		case 'p':
1197 			cb->port = htobe16(atoi(optarg));
1198 			DEBUG_LOG("port %d\n", (int) atoi(optarg));
1199 			break;
1200 		case 's':
1201 			cb->server = 1;
1202 			DEBUG_LOG("server\n");
1203 			break;
1204 		case 'c':
1205 			cb->server = 0;
1206 			DEBUG_LOG("client\n");
1207 			break;
1208 		case 'S':
1209 			cb->size = atoi(optarg);
1210 			if ((cb->size < RPING_MIN_BUFSIZE) ||
1211 			    (cb->size > (RPING_BUFSIZE - 1))) {
1212 				fprintf(stderr, "Invalid size %d "
1213 				       "(valid range is %zd to %d)\n",
1214 				       cb->size, RPING_MIN_BUFSIZE, RPING_BUFSIZE);
1215 				ret = EINVAL;
1216 			} else
1217 				DEBUG_LOG("size %d\n", (int) atoi(optarg));
1218 			break;
1219 		case 'C':
1220 			cb->count = atoi(optarg);
1221 			if (cb->count < 0) {
1222 				fprintf(stderr, "Invalid count %d\n",
1223 					cb->count);
1224 				ret = EINVAL;
1225 			} else
1226 				DEBUG_LOG("count %d\n", (int) cb->count);
1227 			break;
1228 		case 'v':
1229 			cb->verbose++;
1230 			DEBUG_LOG("verbose\n");
1231 			break;
1232 		case 'V':
1233 			cb->validate++;
1234 			DEBUG_LOG("validate data\n");
1235 			break;
1236 		case 'd':
1237 			debug++;
1238 			break;
1239 		default:
1240 			usage("rping");
1241 			ret = EINVAL;
1242 			goto out;
1243 		}
1244 	}
1245 	if (ret)
1246 		goto out;
1247 
1248 	if (cb->server == -1) {
1249 		usage("rping");
1250 		ret = EINVAL;
1251 		goto out;
1252 	}
1253 
1254 	cb->cm_channel = rdma_create_event_channel();
1255 	if (!cb->cm_channel) {
1256 		perror("rdma_create_event_channel");
1257 		ret = errno;
1258 		goto out;
1259 	}
1260 
1261 	ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1262 	if (ret) {
1263 		perror("rdma_create_id");
1264 		goto out2;
1265 	}
1266 	DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1267 
1268 	ret = pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1269 	if (ret) {
1270 		perror("pthread_create");
1271 		goto out2;
1272 	}
1273 
1274 	if (cb->server) {
1275 		if (persistent_server)
1276 			ret = rping_run_persistent_server(cb);
1277 		else
1278 			ret = rping_run_server(cb);
1279 	} else {
1280 		ret = rping_run_client(cb);
1281 	}
1282 
1283 	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1284 	rdma_destroy_id(cb->cm_id);
1285 out2:
1286 	rdma_destroy_event_channel(cb->cm_channel);
1287 out:
1288 	free(cb);
1289 	return ret;
1290 }
1291