1 /*
2  * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3  * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * OpenIB.org BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  */
33 
#include <getopt.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <netdb.h>
#include <byteswap.h>
#include <semaphore.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <inttypes.h>

#include <rdma/rdma_cma.h>
#include <infiniband/arch.h>
51 
/*
 * Debug tracing.
 *
 * DEBUG_LOG takes printf-style arguments and prints them to stdout only
 * while 'debug' is non-zero.  The expansion is wrapped in
 * do { } while (0) so that it is exactly one statement: a bare
 * "if (debug) printf" would silently capture an 'else' that belongs to
 * an enclosing 'if'.
 */
static int debug = 0;
#define DEBUG_LOG(...) do { if (debug) printf(__VA_ARGS__); } while (0)
54 
55 /*
56  * rping "ping/pong" loop:
57  * 	client sends source rkey/addr/len
 *	server receives source rkey/addr/len
59  *	server rdma reads "ping" data from source
60  * 	server sends "go ahead" on rdma read completion
61  *	client sends sink rkey/addr/len
62  * 	server receives sink rkey/addr/len
63  * 	server rdma writes "pong" data to sink
64  * 	server sends "go ahead" on rdma write completion
65  * 	<repeat loop>
66  */
67 
/*
 * Progress markers shared between the event/completion handlers and the
 * main client or server thread.  A handler stores the new state in the
 * control block and posts its semaphore; the waiting thread wakes up and
 * checks the state it finds.
 *
 * The numeric order matters: server_recv() treats every state up to and
 * including CONNECTED as "no ping in progress yet".
 */
enum test_state {
	IDLE                = 1,
	CONNECT_REQUEST     = 2,
	ADDR_RESOLVED       = 3,
	ROUTE_RESOLVED      = 4,
	CONNECTED           = 5,
	RDMA_READ_ADV       = 6,
	RDMA_READ_COMPLETE  = 7,
	RDMA_WRITE_ADV      = 8,
	RDMA_WRITE_COMPLETE = 9,
	ERROR               = 10
};
87 
/*
 * Buffer advertisement carried in the SEND messages exchanged by the
 * peers.  rping_format_send() stores every field in network byte order;
 * server_recv() converts them back with ntohll()/ntohl().
 */
struct rping_rdma_info {
	uint64_t buf;		/* address of the advertised buffer */
	uint32_t rkey;		/* rkey of the MR covering that buffer */
	uint32_t size;		/* advertised length in bytes */
};
93 
/*
 * Default max buffer size for IO, and the send queue depth requested
 * for the QP.
 *
 * RPING_BUFSIZE is parenthesized so that it stays a single value when it
 * appears inside a larger expression (for example x / RPING_BUFSIZE).
 */
#define RPING_BUFSIZE (64 * 1024)
#define RPING_SQ_DEPTH 16
99 
/*
 * Default string for print data and minimum buffer size.
 *
 * stringify() macro-expands its argument and then turns the result into
 * a string literal; _stringify() is the raw '#' step.
 *
 * RPING_MIN_BUFSIZE leaves room for the RPING_MSG_FMT prefix plus the
 * text INT_MAX expands to.  <limits.h> has to be included for INT_MAX to
 * expand; without it the literal text "INT_MAX" is measured instead.
 * The value has type size_t and is parenthesized so that it can be used
 * inside larger expressions without precedence surprises.
 */
#define _stringify( _x ) # _x
#define stringify( _x ) _stringify( _x )

#define RPING_MSG_FMT           "rdma-ping-%d: "
#define RPING_MIN_BUFSIZE       (sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT))
108 
/*
 * Control block struct.
 *
 * One instance describes one rping endpoint: the verbs objects, the
 * registered buffers with their work requests, the test parameters taken
 * from the command line, and the state/semaphore pair through which the
 * CM and CQ threads wake the thread that runs the test.
 */
struct rping_cb {
	int server;			/* 1 server, 0 client (-1 until set by main) */
	pthread_t cqthread;		/* thread running cq_thread() */
	pthread_t persistent_server_thread; /* per-connection thread, -P mode */
	struct ibv_comp_channel *channel; /* completion channel of cq */
	struct ibv_cq *cq;		/* shared by send and recv queues */
	struct ibv_pd *pd;
	struct ibv_qp *qp;

	struct ibv_recv_wr rq_wr;	/* recv work request record */
	struct ibv_sge recv_sgl;	/* recv single SGE */
	struct rping_rdma_info recv_buf;/* embedded recv buffer (not malloc'd) */
	struct ibv_mr *recv_mr;		/* MR associated with this buffer */

	struct ibv_send_wr sq_wr;	/* send work request record */
	struct ibv_sge send_sgl;	/* send single SGE */
	struct rping_rdma_info send_buf;/* single send buf, embedded */
	struct ibv_mr *send_mr;		/* MR associated with send_buf */

	struct ibv_send_wr rdma_sq_wr;	/* rdma work request record */
	struct ibv_sge rdma_sgl;	/* rdma single SGE */
	char *rdma_buf;			/* malloc'd, 'size' bytes; used as rdma sink */
	struct ibv_mr *rdma_mr;		/* MR associated with rdma_buf */

	uint32_t remote_rkey;		/* remote guys RKEY */
	uint64_t remote_addr;		/* remote guys TO */
	uint32_t remote_len;		/* remote guys LEN */

	char *start_buf;		/* rdma read src; allocated on the client only */
	struct ibv_mr *start_mr;	/* MR associated with start_buf */

	enum test_state state;		/* used for cond/signalling */
	sem_t sem;			/* posted by the handlers after a state change */

	struct sockaddr_storage sin;	/* address to bind to / connect to */
	uint16_t port;			/* dst port in NBO */
	int verbose;			/* verbose logging */
	int count;			/* ping count; 0 means no limit */
	int size;			/* ping data size */
	int validate;			/* validate ping data */

	/* CM stuff */
	pthread_t cmthread;		/* thread running cm_thread() */
	struct rdma_event_channel *cm_channel;
	struct rdma_cm_id *cm_id;	/* connection on client side,*/
					/* listener on service side. */
	struct rdma_cm_id *child_cm_id;	/* connection on server side */
};
160 
161 static int rping_cma_event_handler(struct rdma_cm_id *cma_id,
162 				    struct rdma_cm_event *event)
163 {
164 	int ret = 0;
165 	struct rping_cb *cb = cma_id->context;
166 
167 	DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
168 		  rdma_event_str(event->event), cma_id,
169 		  (cma_id == cb->cm_id) ? "parent" : "child");
170 
171 	switch (event->event) {
172 	case RDMA_CM_EVENT_ADDR_RESOLVED:
173 		cb->state = ADDR_RESOLVED;
174 		ret = rdma_resolve_route(cma_id, 2000);
175 		if (ret) {
176 			cb->state = ERROR;
177 			perror("rdma_resolve_route");
178 			sem_post(&cb->sem);
179 		}
180 		break;
181 
182 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
183 		cb->state = ROUTE_RESOLVED;
184 		sem_post(&cb->sem);
185 		break;
186 
187 	case RDMA_CM_EVENT_CONNECT_REQUEST:
188 		cb->state = CONNECT_REQUEST;
189 		cb->child_cm_id = cma_id;
190 		DEBUG_LOG("child cma %p\n", cb->child_cm_id);
191 		sem_post(&cb->sem);
192 		break;
193 
194 	case RDMA_CM_EVENT_ESTABLISHED:
195 		DEBUG_LOG("ESTABLISHED\n");
196 
197 		/*
198 		 * Server will wake up when first RECV completes.
199 		 */
200 		if (!cb->server) {
201 			cb->state = CONNECTED;
202 		}
203 		sem_post(&cb->sem);
204 		break;
205 
206 	case RDMA_CM_EVENT_ADDR_ERROR:
207 	case RDMA_CM_EVENT_ROUTE_ERROR:
208 	case RDMA_CM_EVENT_CONNECT_ERROR:
209 	case RDMA_CM_EVENT_UNREACHABLE:
210 	case RDMA_CM_EVENT_REJECTED:
211 		fprintf(stderr, "cma event %s, error %d\n",
212 			rdma_event_str(event->event), event->status);
213 		sem_post(&cb->sem);
214 		ret = -1;
215 		break;
216 
217 	case RDMA_CM_EVENT_DISCONNECTED:
218 		fprintf(stderr, "%s DISCONNECT EVENT...\n",
219 			cb->server ? "server" : "client");
220 		sem_post(&cb->sem);
221 		break;
222 
223 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
224 		fprintf(stderr, "cma detected device removal!!!!\n");
225 		ret = -1;
226 		break;
227 
228 	default:
229 		fprintf(stderr, "unhandled event: %s, ignoring\n",
230 			rdma_event_str(event->event));
231 		break;
232 	}
233 
234 	return ret;
235 }
236 
237 static int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
238 {
239 	if (wc->byte_len != sizeof(cb->recv_buf)) {
240 		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
241 		return -1;
242 	}
243 
244 	cb->remote_rkey = ntohl(cb->recv_buf.rkey);
245 	cb->remote_addr = ntohll(cb->recv_buf.buf);
246 	cb->remote_len  = ntohl(cb->recv_buf.size);
247 	DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
248 		  cb->remote_rkey, cb->remote_addr, cb->remote_len);
249 
250 	if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
251 		cb->state = RDMA_READ_ADV;
252 	else
253 		cb->state = RDMA_WRITE_ADV;
254 
255 	return 0;
256 }
257 
258 static int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
259 {
260 	if (wc->byte_len != sizeof(cb->recv_buf)) {
261 		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
262 		return -1;
263 	}
264 
265 	if (cb->state == RDMA_READ_ADV)
266 		cb->state = RDMA_WRITE_ADV;
267 	else
268 		cb->state = RDMA_WRITE_COMPLETE;
269 
270 	return 0;
271 }
272 
273 static int rping_cq_event_handler(struct rping_cb *cb)
274 {
275 	struct ibv_wc wc;
276 	struct ibv_recv_wr *bad_wr;
277 	int ret;
278 
279 	while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
280 		ret = 0;
281 
282 		if (wc.status) {
283 			fprintf(stderr, "cq completion failed status %d\n",
284 				wc.status);
285 			if (wc.status != IBV_WC_WR_FLUSH_ERR)
286 				ret = -1;
287 			goto error;
288 		}
289 
290 		switch (wc.opcode) {
291 		case IBV_WC_SEND:
292 			DEBUG_LOG("send completion\n");
293 			break;
294 
295 		case IBV_WC_RDMA_WRITE:
296 			DEBUG_LOG("rdma write completion\n");
297 			cb->state = RDMA_WRITE_COMPLETE;
298 			sem_post(&cb->sem);
299 			break;
300 
301 		case IBV_WC_RDMA_READ:
302 			DEBUG_LOG("rdma read completion\n");
303 			cb->state = RDMA_READ_COMPLETE;
304 			sem_post(&cb->sem);
305 			break;
306 
307 		case IBV_WC_RECV:
308 			DEBUG_LOG("recv completion\n");
309 			ret = cb->server ? server_recv(cb, &wc) :
310 					   client_recv(cb, &wc);
311 			if (ret) {
312 				fprintf(stderr, "recv wc error: %d\n", ret);
313 				goto error;
314 			}
315 
316 			ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
317 			if (ret) {
318 				fprintf(stderr, "post recv error: %d\n", ret);
319 				goto error;
320 			}
321 			sem_post(&cb->sem);
322 			break;
323 
324 		default:
325 			DEBUG_LOG("unknown!!!!! completion\n");
326 			ret = -1;
327 			goto error;
328 		}
329 	}
330 	if (ret) {
331 		fprintf(stderr, "poll error %d\n", ret);
332 		goto error;
333 	}
334 	return 0;
335 
336 error:
337 	cb->state = ERROR;
338 	sem_post(&cb->sem);
339 	return ret;
340 }
341 
342 static int rping_accept(struct rping_cb *cb)
343 {
344 	struct rdma_conn_param conn_param;
345 	int ret;
346 
347 	DEBUG_LOG("accepting client connection request\n");
348 
349 	memset(&conn_param, 0, sizeof conn_param);
350 	conn_param.responder_resources = 1;
351 	conn_param.initiator_depth = 1;
352 
353 	ret = rdma_accept(cb->child_cm_id, &conn_param);
354 	if (ret) {
355 		perror("rdma_accept");
356 		return ret;
357 	}
358 
359 	sem_wait(&cb->sem);
360 	if (cb->state == ERROR) {
361 		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
362 		return -1;
363 	}
364 	return 0;
365 }
366 
367 static void rping_setup_wr(struct rping_cb *cb)
368 {
369 	cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
370 	cb->recv_sgl.length = sizeof cb->recv_buf;
371 	cb->recv_sgl.lkey = cb->recv_mr->lkey;
372 	cb->rq_wr.sg_list = &cb->recv_sgl;
373 	cb->rq_wr.num_sge = 1;
374 
375 	cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
376 	cb->send_sgl.length = sizeof cb->send_buf;
377 	cb->send_sgl.lkey = cb->send_mr->lkey;
378 
379 	cb->sq_wr.opcode = IBV_WR_SEND;
380 	cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
381 	cb->sq_wr.sg_list = &cb->send_sgl;
382 	cb->sq_wr.num_sge = 1;
383 
384 	cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
385 	cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
386 	cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
387 	cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
388 	cb->rdma_sq_wr.num_sge = 1;
389 }
390 
391 static int rping_setup_buffers(struct rping_cb *cb)
392 {
393 	int ret;
394 
395 	DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
396 
397 	cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
398 				 IBV_ACCESS_LOCAL_WRITE);
399 	if (!cb->recv_mr) {
400 		fprintf(stderr, "recv_buf reg_mr failed\n");
401 		return errno;
402 	}
403 
404 	cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
405 	if (!cb->send_mr) {
406 		fprintf(stderr, "send_buf reg_mr failed\n");
407 		ret = errno;
408 		goto err1;
409 	}
410 
411 	cb->rdma_buf = malloc(cb->size);
412 	if (!cb->rdma_buf) {
413 		fprintf(stderr, "rdma_buf malloc failed\n");
414 		ret = -ENOMEM;
415 		goto err2;
416 	}
417 
418 	cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
419 				 IBV_ACCESS_LOCAL_WRITE |
420 				 IBV_ACCESS_REMOTE_READ |
421 				 IBV_ACCESS_REMOTE_WRITE);
422 	if (!cb->rdma_mr) {
423 		fprintf(stderr, "rdma_buf reg_mr failed\n");
424 		ret = errno;
425 		goto err3;
426 	}
427 
428 	if (!cb->server) {
429 		cb->start_buf = malloc(cb->size);
430 		if (!cb->start_buf) {
431 			fprintf(stderr, "start_buf malloc failed\n");
432 			ret = -ENOMEM;
433 			goto err4;
434 		}
435 
436 		cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
437 					  IBV_ACCESS_LOCAL_WRITE |
438 					  IBV_ACCESS_REMOTE_READ |
439 					  IBV_ACCESS_REMOTE_WRITE);
440 		if (!cb->start_mr) {
441 			fprintf(stderr, "start_buf reg_mr failed\n");
442 			ret = errno;
443 			goto err5;
444 		}
445 	}
446 
447 	rping_setup_wr(cb);
448 	DEBUG_LOG("allocated & registered buffers...\n");
449 	return 0;
450 
451 err5:
452 	free(cb->start_buf);
453 err4:
454 	ibv_dereg_mr(cb->rdma_mr);
455 err3:
456 	free(cb->rdma_buf);
457 err2:
458 	ibv_dereg_mr(cb->send_mr);
459 err1:
460 	ibv_dereg_mr(cb->recv_mr);
461 	return ret;
462 }
463 
464 static void rping_free_buffers(struct rping_cb *cb)
465 {
466 	DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
467 	ibv_dereg_mr(cb->recv_mr);
468 	ibv_dereg_mr(cb->send_mr);
469 	ibv_dereg_mr(cb->rdma_mr);
470 	free(cb->rdma_buf);
471 	if (!cb->server) {
472 		ibv_dereg_mr(cb->start_mr);
473 		free(cb->start_buf);
474 	}
475 }
476 
477 static int rping_create_qp(struct rping_cb *cb)
478 {
479 	struct ibv_qp_init_attr init_attr;
480 	int ret;
481 
482 	memset(&init_attr, 0, sizeof(init_attr));
483 	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
484 	init_attr.cap.max_recv_wr = 2;
485 	init_attr.cap.max_recv_sge = 1;
486 	init_attr.cap.max_send_sge = 1;
487 	init_attr.qp_type = IBV_QPT_RC;
488 	init_attr.send_cq = cb->cq;
489 	init_attr.recv_cq = cb->cq;
490 
491 	if (cb->server) {
492 		ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
493 		if (!ret)
494 			cb->qp = cb->child_cm_id->qp;
495 	} else {
496 		ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
497 		if (!ret)
498 			cb->qp = cb->cm_id->qp;
499 	}
500 
501 	return ret;
502 }
503 
504 static void rping_free_qp(struct rping_cb *cb)
505 {
506 	ibv_destroy_qp(cb->qp);
507 	ibv_destroy_cq(cb->cq);
508 	ibv_destroy_comp_channel(cb->channel);
509 	ibv_dealloc_pd(cb->pd);
510 }
511 
512 static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
513 {
514 	int ret;
515 
516 	cb->pd = ibv_alloc_pd(cm_id->verbs);
517 	if (!cb->pd) {
518 		fprintf(stderr, "ibv_alloc_pd failed\n");
519 		return errno;
520 	}
521 	DEBUG_LOG("created pd %p\n", cb->pd);
522 
523 	cb->channel = ibv_create_comp_channel(cm_id->verbs);
524 	if (!cb->channel) {
525 		fprintf(stderr, "ibv_create_comp_channel failed\n");
526 		ret = errno;
527 		goto err1;
528 	}
529 	DEBUG_LOG("created channel %p\n", cb->channel);
530 
531 	cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
532 				cb->channel, 0);
533 	if (!cb->cq) {
534 		fprintf(stderr, "ibv_create_cq failed\n");
535 		ret = errno;
536 		goto err2;
537 	}
538 	DEBUG_LOG("created cq %p\n", cb->cq);
539 
540 	ret = ibv_req_notify_cq(cb->cq, 0);
541 	if (ret) {
542 		fprintf(stderr, "ibv_create_cq failed\n");
543 		ret = errno;
544 		goto err3;
545 	}
546 
547 	ret = rping_create_qp(cb);
548 	if (ret) {
549 		perror("rdma_create_qp");
550 		goto err3;
551 	}
552 	DEBUG_LOG("created qp %p\n", cb->qp);
553 	return 0;
554 
555 err3:
556 	ibv_destroy_cq(cb->cq);
557 err2:
558 	ibv_destroy_comp_channel(cb->channel);
559 err1:
560 	ibv_dealloc_pd(cb->pd);
561 	return ret;
562 }
563 
564 static void *cm_thread(void *arg)
565 {
566 	struct rping_cb *cb = arg;
567 	struct rdma_cm_event *event;
568 	int ret;
569 
570 	while (1) {
571 		ret = rdma_get_cm_event(cb->cm_channel, &event);
572 		if (ret) {
573 			perror("rdma_get_cm_event");
574 			exit(ret);
575 		}
576 		ret = rping_cma_event_handler(event->id, event);
577 		rdma_ack_cm_event(event);
578 		if (ret)
579 			exit(ret);
580 	}
581 }
582 
583 static void *cq_thread(void *arg)
584 {
585 	struct rping_cb *cb = arg;
586 	struct ibv_cq *ev_cq;
587 	void *ev_ctx;
588 	int ret;
589 
590 	DEBUG_LOG("cq_thread started.\n");
591 
592 	while (1) {
593 		pthread_testcancel();
594 
595 		ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
596 		if (ret) {
597 			fprintf(stderr, "Failed to get cq event!\n");
598 			pthread_exit(NULL);
599 		}
600 		if (ev_cq != cb->cq) {
601 			fprintf(stderr, "Unknown CQ!\n");
602 			pthread_exit(NULL);
603 		}
604 		ret = ibv_req_notify_cq(cb->cq, 0);
605 		if (ret) {
606 			fprintf(stderr, "Failed to set notify!\n");
607 			pthread_exit(NULL);
608 		}
609 		ret = rping_cq_event_handler(cb);
610 		ibv_ack_cq_events(cb->cq, 1);
611 		if (ret)
612 			pthread_exit(NULL);
613 	}
614 }
615 
616 static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
617 {
618 	struct rping_rdma_info *info = &cb->send_buf;
619 
620 	info->buf = htonll((uint64_t) (unsigned long) buf);
621 	info->rkey = htonl(mr->rkey);
622 	info->size = htonl(cb->size);
623 
624 	DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
625 		  ntohll(info->buf), ntohl(info->rkey), ntohl(info->size));
626 }
627 
628 static int rping_test_server(struct rping_cb *cb)
629 {
630 	struct ibv_send_wr *bad_wr;
631 	int ret;
632 
633 	while (1) {
634 		/* Wait for client's Start STAG/TO/Len */
635 		sem_wait(&cb->sem);
636 		if (cb->state != RDMA_READ_ADV) {
637 			fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
638 				cb->state);
639 			ret = -1;
640 			break;
641 		}
642 
643 		DEBUG_LOG("server received sink adv\n");
644 
645 		/* Issue RDMA Read. */
646 		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
647 		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
648 		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
649 		cb->rdma_sq_wr.sg_list->length = cb->remote_len;
650 
651 		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
652 		if (ret) {
653 			fprintf(stderr, "post send error %d\n", ret);
654 			break;
655 		}
656 		DEBUG_LOG("server posted rdma read req \n");
657 
658 		/* Wait for read completion */
659 		sem_wait(&cb->sem);
660 		if (cb->state != RDMA_READ_COMPLETE) {
661 			fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
662 				cb->state);
663 			ret = -1;
664 			break;
665 		}
666 		DEBUG_LOG("server received read complete\n");
667 
668 		/* Display data in recv buf */
669 		if (cb->verbose)
670 			printf("server ping data: %s\n", cb->rdma_buf);
671 
672 		/* Tell client to continue */
673 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
674 		if (ret) {
675 			fprintf(stderr, "post send error %d\n", ret);
676 			break;
677 		}
678 		DEBUG_LOG("server posted go ahead\n");
679 
680 		/* Wait for client's RDMA STAG/TO/Len */
681 		sem_wait(&cb->sem);
682 		if (cb->state != RDMA_WRITE_ADV) {
683 			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
684 				cb->state);
685 			ret = -1;
686 			break;
687 		}
688 		DEBUG_LOG("server received sink adv\n");
689 
690 		/* RDMA Write echo data */
691 		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
692 		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
693 		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
694 		cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
695 		DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
696 			  cb->rdma_sq_wr.sg_list->lkey,
697 			  cb->rdma_sq_wr.sg_list->addr,
698 			  cb->rdma_sq_wr.sg_list->length);
699 
700 		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
701 		if (ret) {
702 			fprintf(stderr, "post send error %d\n", ret);
703 			break;
704 		}
705 
706 		/* Wait for completion */
707 		ret = sem_wait(&cb->sem);
708 		if (cb->state != RDMA_WRITE_COMPLETE) {
709 			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
710 				cb->state);
711 			ret = -1;
712 			break;
713 		}
714 		DEBUG_LOG("server rdma write complete \n");
715 
716 		/* Tell client to begin again */
717 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
718 		if (ret) {
719 			fprintf(stderr, "post send error %d\n", ret);
720 			break;
721 		}
722 		DEBUG_LOG("server posted go ahead\n");
723 	}
724 
725 	return ret;
726 }
727 
728 static int rping_bind_server(struct rping_cb *cb)
729 {
730 	int ret;
731 
732 	if (cb->sin.ss_family == AF_INET)
733 		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
734 	else
735 		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
736 
737 	ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
738 	if (ret) {
739 		perror("rdma_bind_addr");
740 		return ret;
741 	}
742 	DEBUG_LOG("rdma_bind_addr successful\n");
743 
744 	DEBUG_LOG("rdma_listen\n");
745 	ret = rdma_listen(cb->cm_id, 3);
746 	if (ret) {
747 		perror("rdma_listen");
748 		return ret;
749 	}
750 
751 	return 0;
752 }
753 
754 static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
755 {
756 	struct rping_cb *cb = malloc(sizeof *cb);
757 	if (!cb)
758 		return NULL;
759 	*cb = *listening_cb;
760 	cb->child_cm_id->context = cb;
761 	return cb;
762 }
763 
/*
 * Release a control block that was allocated by clone_cb().  Only the
 * memory of the block itself is freed.
 */
static void free_cb(struct rping_cb *cb)
{
	free(cb);
}
768 
769 static void *rping_persistent_server_thread(void *arg)
770 {
771 	struct rping_cb *cb = arg;
772 	struct ibv_recv_wr *bad_wr;
773 	int ret;
774 
775 	ret = rping_setup_qp(cb, cb->child_cm_id);
776 	if (ret) {
777 		fprintf(stderr, "setup_qp failed: %d\n", ret);
778 		goto err0;
779 	}
780 
781 	ret = rping_setup_buffers(cb);
782 	if (ret) {
783 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
784 		goto err1;
785 	}
786 
787 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
788 	if (ret) {
789 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
790 		goto err2;
791 	}
792 
793 	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
794 
795 	ret = rping_accept(cb);
796 	if (ret) {
797 		fprintf(stderr, "connect error %d\n", ret);
798 		goto err3;
799 	}
800 
801 	rping_test_server(cb);
802 	rdma_disconnect(cb->child_cm_id);
803 	rping_free_buffers(cb);
804 	rping_free_qp(cb);
805 	pthread_cancel(cb->cqthread);
806 	pthread_join(cb->cqthread, NULL);
807 	rdma_destroy_id(cb->child_cm_id);
808 	free_cb(cb);
809 	return NULL;
810 err3:
811 	pthread_cancel(cb->cqthread);
812 	pthread_join(cb->cqthread, NULL);
813 err2:
814 	rping_free_buffers(cb);
815 err1:
816 	rping_free_qp(cb);
817 err0:
818 	free_cb(cb);
819 	return NULL;
820 }
821 
822 static int rping_run_persistent_server(struct rping_cb *listening_cb)
823 {
824 	int ret;
825 	struct rping_cb *cb;
826 
827 	ret = rping_bind_server(listening_cb);
828 	if (ret)
829 		return ret;
830 
831 	while (1) {
832 		sem_wait(&listening_cb->sem);
833 		if (listening_cb->state != CONNECT_REQUEST) {
834 			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
835 				listening_cb->state);
836 			return -1;
837 		}
838 
839 		cb = clone_cb(listening_cb);
840 		if (!cb)
841 			return -1;
842 		pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
843 	}
844 	return 0;
845 }
846 
847 static int rping_run_server(struct rping_cb *cb)
848 {
849 	struct ibv_recv_wr *bad_wr;
850 	int ret;
851 
852 	ret = rping_bind_server(cb);
853 	if (ret)
854 		return ret;
855 
856 	sem_wait(&cb->sem);
857 	if (cb->state != CONNECT_REQUEST) {
858 		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
859 			cb->state);
860 		return -1;
861 	}
862 
863 	ret = rping_setup_qp(cb, cb->child_cm_id);
864 	if (ret) {
865 		fprintf(stderr, "setup_qp failed: %d\n", ret);
866 		return ret;
867 	}
868 
869 	ret = rping_setup_buffers(cb);
870 	if (ret) {
871 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
872 		goto err1;
873 	}
874 
875 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
876 	if (ret) {
877 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
878 		goto err2;
879 	}
880 
881 	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
882 
883 	ret = rping_accept(cb);
884 	if (ret) {
885 		fprintf(stderr, "connect error %d\n", ret);
886 		goto err2;
887 	}
888 
889 	rping_test_server(cb);
890 	rdma_disconnect(cb->child_cm_id);
891 	rdma_destroy_id(cb->child_cm_id);
892 err2:
893 	rping_free_buffers(cb);
894 err1:
895 	rping_free_qp(cb);
896 
897 	return ret;
898 }
899 
900 static int rping_test_client(struct rping_cb *cb)
901 {
902 	int ping, start, cc, i, ret = 0;
903 	struct ibv_send_wr *bad_wr;
904 	unsigned char c;
905 
906 	start = 65;
907 	for (ping = 0; !cb->count || ping < cb->count; ping++) {
908 		cb->state = RDMA_READ_ADV;
909 
910 		/* Put some ascii text in the buffer. */
911 		cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping);
912 		for (i = cc, c = start; i < cb->size; i++) {
913 			cb->start_buf[i] = c;
914 			c++;
915 			if (c > 122)
916 				c = 65;
917 		}
918 		start++;
919 		if (start > 122)
920 			start = 65;
921 		cb->start_buf[cb->size - 1] = 0;
922 
923 		rping_format_send(cb, cb->start_buf, cb->start_mr);
924 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
925 		if (ret) {
926 			fprintf(stderr, "post send error %d\n", ret);
927 			break;
928 		}
929 
930 		/* Wait for server to ACK */
931 		sem_wait(&cb->sem);
932 		if (cb->state != RDMA_WRITE_ADV) {
933 			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
934 				cb->state);
935 			ret = -1;
936 			break;
937 		}
938 
939 		rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
940 		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
941 		if (ret) {
942 			fprintf(stderr, "post send error %d\n", ret);
943 			break;
944 		}
945 
946 		/* Wait for the server to say the RDMA Write is complete. */
947 		sem_wait(&cb->sem);
948 		if (cb->state != RDMA_WRITE_COMPLETE) {
949 			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
950 				cb->state);
951 			ret = -1;
952 			break;
953 		}
954 
955 		if (cb->validate)
956 			if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
957 				fprintf(stderr, "data mismatch!\n");
958 				ret = -1;
959 				break;
960 			}
961 
962 		if (cb->verbose)
963 			printf("ping data: %s\n", cb->rdma_buf);
964 	}
965 
966 	return ret;
967 }
968 
969 static int rping_connect_client(struct rping_cb *cb)
970 {
971 	struct rdma_conn_param conn_param;
972 	int ret;
973 
974 	memset(&conn_param, 0, sizeof conn_param);
975 	conn_param.responder_resources = 1;
976 	conn_param.initiator_depth = 1;
977 	conn_param.retry_count = 10;
978 
979 	ret = rdma_connect(cb->cm_id, &conn_param);
980 	if (ret) {
981 		perror("rdma_connect");
982 		return ret;
983 	}
984 
985 	sem_wait(&cb->sem);
986 	if (cb->state != CONNECTED) {
987 		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
988 		return -1;
989 	}
990 
991 	DEBUG_LOG("rmda_connect successful\n");
992 	return 0;
993 }
994 
995 static int rping_bind_client(struct rping_cb *cb)
996 {
997 	int ret;
998 
999 	if (cb->sin.ss_family == AF_INET)
1000 		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1001 	else
1002 		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1003 
1004 	ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1005 	if (ret) {
1006 		perror("rdma_resolve_addr");
1007 		return ret;
1008 	}
1009 
1010 	sem_wait(&cb->sem);
1011 	if (cb->state != ROUTE_RESOLVED) {
1012 		fprintf(stderr, "waiting for addr/route resolution state %d\n",
1013 			cb->state);
1014 		return -1;
1015 	}
1016 
1017 	DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1018 	return 0;
1019 }
1020 
1021 static int rping_run_client(struct rping_cb *cb)
1022 {
1023 	struct ibv_recv_wr *bad_wr;
1024 	int ret;
1025 
1026 	ret = rping_bind_client(cb);
1027 	if (ret)
1028 		return ret;
1029 
1030 	ret = rping_setup_qp(cb, cb->cm_id);
1031 	if (ret) {
1032 		fprintf(stderr, "setup_qp failed: %d\n", ret);
1033 		return ret;
1034 	}
1035 
1036 	ret = rping_setup_buffers(cb);
1037 	if (ret) {
1038 		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1039 		goto err1;
1040 	}
1041 
1042 	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1043 	if (ret) {
1044 		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1045 		goto err2;
1046 	}
1047 
1048 	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1049 
1050 	ret = rping_connect_client(cb);
1051 	if (ret) {
1052 		fprintf(stderr, "connect error %d\n", ret);
1053 		goto err2;
1054 	}
1055 
1056 	rping_test_client(cb);
1057 	rdma_disconnect(cb->cm_id);
1058 err2:
1059 	rping_free_buffers(cb);
1060 err1:
1061 	rping_free_qp(cb);
1062 
1063 	return ret;
1064 }
1065 
/*
 * Resolve 'dst' (host name or address string) with getaddrinfo() and
 * copy the first result into 'addr'.  Only IPv4 and IPv6 results are
 * accepted; 'addr' must have room for a struct sockaddr_in6.
 *
 * Returns 0 on success, the getaddrinfo() error code if resolution
 * fails, and -1 if the first result belongs to another address family.
 */
static int get_addr(char *dst, struct sockaddr *addr)
{
	struct addrinfo *info;
	size_t len;
	int rc;

	rc = getaddrinfo(dst, NULL, NULL, &info);
	if (rc != 0) {
		printf("getaddrinfo failed - invalid hostname or IP address\n");
		return rc;
	}

	switch (info->ai_family) {
	case PF_INET:
		len = sizeof(struct sockaddr_in);
		break;
	case PF_INET6:
		len = sizeof(struct sockaddr_in6);
		break;
	default:
		len = 0;	/* unsupported family */
		break;
	}

	if (len != 0)
		memcpy(addr, info->ai_addr, len);
	else
		rc = -1;

	freeaddrinfo(info);
	return rc;
}
1087 
/*
 * Print the command line synopsis for server and client mode, followed
 * by one line per option, to stdout.  'name' is the program name shown
 * in the synopsis.
 */
static void usage(char *name)
{
	static const char *const option_help[] = {
		"\t-c\t\tclient side\n",
		"\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n",
		"\t-v\t\tdisplay ping data to stdout\n",
		"\t-V\t\tvalidate ping data\n",
		"\t-d\t\tdebug printfs\n",
		"\t-S size \tping data size\n",
		"\t-C count\tping count times\n",
		"\t-a addr\t\taddress\n",
		"\t-p port\t\tport\n",
		"\t-P\t\tpersistent server mode allowing multiple connections\n",
	};
	size_t i;

	printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n",
	       name);
	printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n",
	       name);

	for (i = 0; i < sizeof option_help / sizeof option_help[0]; i++)
		fputs(option_help[i], stdout);
}
1105 
1106 int main(int argc, char *argv[])
1107 {
1108 	struct rping_cb *cb;
1109 	int op;
1110 	int ret = 0;
1111 	int persistent_server = 0;
1112 
1113 	cb = malloc(sizeof(*cb));
1114 	if (!cb)
1115 		return -ENOMEM;
1116 
1117 	memset(cb, 0, sizeof(*cb));
1118 	cb->server = -1;
1119 	cb->state = IDLE;
1120 	cb->size = 64;
1121 	cb->sin.ss_family = PF_INET;
1122 	cb->port = htons(7174);
1123 	sem_init(&cb->sem, 0, 0);
1124 
1125 	opterr = 0;
1126 	while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
1127 		switch (op) {
1128 		case 'a':
1129 			ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1130 			break;
1131 		case 'P':
1132 			persistent_server = 1;
1133 			break;
1134 		case 'p':
1135 			cb->port = htons(atoi(optarg));
1136 			DEBUG_LOG("port %d\n", (int) atoi(optarg));
1137 			break;
1138 		case 's':
1139 			cb->server = 1;
1140 			DEBUG_LOG("server\n");
1141 			break;
1142 		case 'c':
1143 			cb->server = 0;
1144 			DEBUG_LOG("client\n");
1145 			break;
1146 		case 'S':
1147 			cb->size = atoi(optarg);
1148 			if ((cb->size < RPING_MIN_BUFSIZE) ||
1149 			    (cb->size > (RPING_BUFSIZE - 1))) {
1150 				fprintf(stderr, "Invalid size %d "
1151 				       "(valid range is %Zd to %d)\n",
1152 				       cb->size, RPING_MIN_BUFSIZE, RPING_BUFSIZE);
1153 				ret = EINVAL;
1154 			} else
1155 				DEBUG_LOG("size %d\n", (int) atoi(optarg));
1156 			break;
1157 		case 'C':
1158 			cb->count = atoi(optarg);
1159 			if (cb->count < 0) {
1160 				fprintf(stderr, "Invalid count %d\n",
1161 					cb->count);
1162 				ret = EINVAL;
1163 			} else
1164 				DEBUG_LOG("count %d\n", (int) cb->count);
1165 			break;
1166 		case 'v':
1167 			cb->verbose++;
1168 			DEBUG_LOG("verbose\n");
1169 			break;
1170 		case 'V':
1171 			cb->validate++;
1172 			DEBUG_LOG("validate data\n");
1173 			break;
1174 		case 'd':
1175 			debug++;
1176 			break;
1177 		default:
1178 			usage("rping");
1179 			ret = EINVAL;
1180 			goto out;
1181 		}
1182 	}
1183 	if (ret)
1184 		goto out;
1185 
1186 	if (cb->server == -1) {
1187 		usage("rping");
1188 		ret = EINVAL;
1189 		goto out;
1190 	}
1191 
1192 	cb->cm_channel = rdma_create_event_channel();
1193 	if (!cb->cm_channel) {
1194 		perror("rdma_create_event_channel");
1195 		goto out;
1196 	}
1197 
1198 	ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1199 	if (ret) {
1200 		perror("rdma_create_id");
1201 		goto out2;
1202 	}
1203 	DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1204 
1205 	pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1206 
1207 	if (cb->server) {
1208 		if (persistent_server)
1209 			ret = rping_run_persistent_server(cb);
1210 		else
1211 			ret = rping_run_server(cb);
1212 	} else
1213 		ret = rping_run_client(cb);
1214 
1215 	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1216 	rdma_destroy_id(cb->cm_id);
1217 out2:
1218 	rdma_destroy_event_channel(cb->cm_channel);
1219 out:
1220 	free(cb);
1221 	return ret;
1222 }
1223