1 /*
2  * Copyright (c) 2012 Intel Corporation.  All rights reserved.
3  *
4  * This software is available to you under the OpenIB.org BSD license
5  * below:
6  *
7  *     Redistribution and use in source and binary forms, with or
8  *     without modification, are permitted provided that the following
9  *     conditions are met:
10  *
11  *      - Redistributions of source code must retain the above
12  *        copyright notice, this list of conditions and the following
13  *        disclaimer.
14  *
15  *      - Redistributions in binary form must reproduce the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer in the documentation and/or other materials
18  *        provided with the distribution.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AWV
23  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
24  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
26  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE.
28  */
29 
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <strings.h>
34 #include <errno.h>
35 #include <getopt.h>
36 #include <sys/types.h>
37 #include <sys/socket.h>
38 #include <sys/time.h>
39 #include <sys/wait.h>
40 #include <netdb.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <netinet/tcp.h>
44 #include <arpa/inet.h>
45 
46 #include <rdma/rdma_cma.h>
47 #include <rdma/rsocket.h>
48 #include <util/compiler.h>
49 #include "common.h"
50 
51 static int test_size[] = {
52 	(1 <<  6),
53 	(1 <<  7), ((1 <<  7) + (1 << 6)),
54 	(1 <<  8), ((1 <<  8) + (1 << 7)),
55 	(1 <<  9), ((1 <<  9) + (1 << 8)),
56 	(1 << 10), ((1 << 10) + (1 << 9)),
57 };
58 #define TEST_CNT (sizeof test_size / sizeof test_size[0])
59 
60 enum {
61 	msg_op_login,
62 	msg_op_start,
63 	msg_op_data,
64 	msg_op_echo,
65 	msg_op_end
66 };
67 
68 struct message {
69 	uint8_t op;
70 	uint8_t id;
71 	uint8_t seqno;
72 	uint8_t reserved;
73 	__be32 data;
74 	uint8_t  buf[2048];
75 };
76 
77 #define CTRL_MSG_SIZE 16
78 
79 struct client {
80 	uint64_t recvcnt;
81 };
82 
83 static struct client clients[256];
84 static uint8_t id;
85 
86 static int rs;
87 static int use_async;
88 static int flags = MSG_DONTWAIT;
89 static int poll_timeout;
90 static int custom;
91 static int echo;
92 static int transfer_size = 1000;
93 static int transfer_count = 1000;
94 static int buffer_size;
95 static char test_name[10] = "custom";
96 static const char *port = "7174";
97 static char *dst_addr;
98 static char *src_addr;
99 static union socket_addr g_addr;
100 static socklen_t g_addrlen;
101 static struct timeval start, end;
102 static struct message g_msg;
103 
104 static void show_perf(void)
105 {
106 	char str[32];
107 	float usec;
108 	long long bytes;
109 	int transfers;
110 
111 	usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
112 	transfers = echo ? transfer_count * 2 : be32toh(g_msg.data);
113 	bytes = (long long) transfers * transfer_size;
114 
115 	/* name size transfers bytes seconds Gb/sec usec/xfer */
116 	printf("%-10s", test_name);
117 	size_str(str, sizeof str, transfer_size);
118 	printf("%-8s", str);
119 	cnt_str(str, sizeof str, transfers);
120 	printf("%-8s", str);
121 	size_str(str, sizeof str, bytes);
122 	printf("%-8s", str);
123 	printf("%8.2fs%10.2f%11.2f\n",
124 		usec / 1000000., (bytes * 8) / (1000. * usec),
125 		(usec / transfers));
126 }
127 
128 static void init_latency_test(int size)
129 {
130 	char sstr[5];
131 
132 	size_str(sstr, sizeof sstr, size);
133 	snprintf(test_name, sizeof test_name, "%s_lat", sstr);
134 	transfer_size = size;
135 	transfer_count = size_to_count(transfer_size) / 10;
136 	echo = 1;
137 }
138 
139 static void init_bandwidth_test(int size)
140 {
141 	char sstr[5];
142 
143 	size_str(sstr, sizeof sstr, size);
144 	snprintf(test_name, sizeof test_name, "%s_bw", sstr);
145 	transfer_size = size;
146 	transfer_count = size_to_count(transfer_size);
147 	echo = 0;
148 }
149 
150 static void set_options(int fd)
151 {
152 	int val;
153 
154 	if (buffer_size) {
155 		rs_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &buffer_size,
156 			      sizeof buffer_size);
157 		rs_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &buffer_size,
158 			      sizeof buffer_size);
159 	} else {
160 		val = 1 << 19;
161 		rs_setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &val, sizeof val);
162 		rs_setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &val, sizeof val);
163 	}
164 
165 	if (flags & MSG_DONTWAIT)
166 		rs_fcntl(fd, F_SETFL, O_NONBLOCK);
167 }
168 
169 static ssize_t svr_send(struct message *msg, size_t size,
170 			union socket_addr *addr, socklen_t addrlen)
171 {
172 	struct pollfd fds;
173 	ssize_t ret;
174 
175 	if (use_async) {
176 		fds.fd = rs;
177 		fds.events = POLLOUT;
178 	}
179 
180 	do {
181 		if (use_async) {
182 			ret = do_poll(&fds, poll_timeout);
183 			if (ret)
184 				return ret;
185 		}
186 
187 		ret = rs_sendto(rs, msg, size, flags, &addr->sa, addrlen);
188 	} while (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN));
189 
190 	if (ret < 0)
191 		perror("rsend");
192 
193 	return ret;
194 }
195 
196 static ssize_t svr_recv(struct message *msg, size_t size,
197 			union socket_addr *addr, socklen_t *addrlen)
198 {
199 	struct pollfd fds;
200 	ssize_t ret;
201 
202 	if (use_async) {
203 		fds.fd = rs;
204 		fds.events = POLLIN;
205 	}
206 
207 	do {
208 		if (use_async) {
209 			ret = do_poll(&fds, poll_timeout);
210 			if (ret)
211 				return ret;
212 		}
213 
214 		ret = rs_recvfrom(rs, msg, size, flags, &addr->sa, addrlen);
215 	} while (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN));
216 
217 	if (ret < 0)
218 		perror("rrecv");
219 
220 	return ret;
221 }
222 
223 static int svr_process(struct message *msg, size_t size,
224 		       union socket_addr *addr, socklen_t addrlen)
225 {
226 	char str[64];
227 	ssize_t ret;
228 
229 	switch (msg->op) {
230 	case msg_op_login:
231 		if (addr->sa.sa_family == AF_INET) {
232 			printf("client login from %s\n",
233 			       inet_ntop(AF_INET, &addr->sin.sin_addr.s_addr,
234 					 str, sizeof str));
235 		} else {
236 			printf("client login from %s\n",
237 			       inet_ntop(AF_INET6, &addr->sin6.sin6_addr.s6_addr,
238 					 str, sizeof str));
239 		}
240 		msg->id = id++;
241 		/* fall through */
242 	case msg_op_start:
243 		memset(&clients[msg->id], 0, sizeof clients[msg->id]);
244 		break;
245 	case msg_op_echo:
246 		clients[msg->id].recvcnt++;
247 		break;
248 	case msg_op_end:
249 		msg->data = htobe32(clients[msg->id].recvcnt);
250 		break;
251 	default:
252 		clients[msg->id].recvcnt++;
253 		return 0;
254 	}
255 
256 	ret = svr_send(msg, size, addr, addrlen);
257 	return (ret == size) ? 0 : (int) ret;
258 }
259 
260 static int svr_bind(void)
261 {
262 	struct addrinfo hints, *res;
263 	int ret;
264 
265 	memset(&hints, 0, sizeof hints);
266 	hints.ai_socktype = SOCK_DGRAM;
267  	ret = getaddrinfo(src_addr, port, &hints, &res);
268 	if (ret) {
269 		printf("getaddrinfo: %s\n", gai_strerror(ret));
270 		return ret;
271 	}
272 
273 	rs = rs_socket(res->ai_family, res->ai_socktype, res->ai_protocol);
274 	if (rs < 0) {
275 		perror("rsocket");
276 		ret = rs;
277 		goto out;
278 	}
279 
280 	set_options(rs);
281 	ret = rs_bind(rs, res->ai_addr, res->ai_addrlen);
282 	if (ret) {
283 		perror("rbind");
284 		rs_close(rs);
285 	}
286 
287 out:
288 	free(res);
289 	return ret;
290 }
291 
292 static int svr_run(void)
293 {
294 	ssize_t len;
295 	int ret;
296 
297 	ret = svr_bind();
298 	while (!ret) {
299 		g_addrlen = sizeof g_addr;
300 		len = svr_recv(&g_msg, sizeof g_msg, &g_addr, &g_addrlen);
301 		if (len < 0)
302 			return len;
303 
304 		ret = svr_process(&g_msg, len, &g_addr, g_addrlen);
305 	}
306 	return ret;
307 }
308 
309 static ssize_t client_send(struct message *msg, size_t size)
310 {
311 	struct pollfd fds;
312 	int ret;
313 
314 	if (use_async) {
315 		fds.fd = rs;
316 		fds.events = POLLOUT;
317 	}
318 
319 	do {
320 		if (use_async) {
321 			ret = do_poll(&fds, poll_timeout);
322 			if (ret)
323 				return ret;
324 		}
325 
326 		ret = rs_send(rs, msg, size, flags);
327 	} while (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN));
328 
329 	if (ret < 0)
330 		perror("rsend");
331 
332 	return ret;
333 }
334 
335 static ssize_t client_recv(struct message *msg, size_t size, int timeout)
336 {
337 	struct pollfd fds;
338 	int ret;
339 
340 	if (timeout) {
341 		fds.fd = rs;
342 		fds.events = POLLIN;
343 
344 		ret = rs_poll(&fds, 1, timeout);
345 		if (ret <= 0)
346 			return ret;
347 	}
348 
349 	ret = rs_recv(rs, msg, size, flags | MSG_DONTWAIT);
350 	if (ret < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
351 		perror("rrecv");
352 
353 	return ret;
354 }
355 
356 static int client_send_recv(struct message *msg, size_t size, int timeout)
357 {
358 	static uint8_t seqno;
359 	int ret;
360 
361 	msg->seqno = seqno;
362 	do {
363 		ret = client_send(msg, size);
364 		if (ret != size)
365 			return ret;
366 
367 		ret = client_recv(msg, size, timeout);
368 	} while (ret <= 0 || msg->seqno != seqno);
369 
370 	seqno++;
371 	return ret;
372 }
373 
374 static int run_test(void)
375 {
376 	int ret, i;
377 
378 	g_msg.op = msg_op_start;
379 	ret = client_send_recv(&g_msg, CTRL_MSG_SIZE, 1000);
380 	if (ret != CTRL_MSG_SIZE)
381 		goto out;
382 
383 	g_msg.op = echo ? msg_op_echo : msg_op_data;
384 	gettimeofday(&start, NULL);
385 	for (i = 0; i < transfer_count; i++) {
386 		ret = echo ? client_send_recv(&g_msg, transfer_size, 1) :
387 			     client_send(&g_msg, transfer_size);
388 		if (ret != transfer_size)
389 			goto out;
390 	}
391 
392 	g_msg.op = msg_op_end;
393 	ret = client_send_recv(&g_msg, CTRL_MSG_SIZE, 1);
394 	if (ret != CTRL_MSG_SIZE)
395 		goto out;
396 
397 	gettimeofday(&end, NULL);
398 	show_perf();
399 	ret = 0;
400 
401 out:
402 	return ret;
403 }
404 
405 static int client_connect(void)
406 {
407 	struct addrinfo hints, *res;
408 	int ret;
409 
410 	memset(&hints, 0, sizeof hints);
411 	hints.ai_socktype = SOCK_DGRAM;
412  	ret = getaddrinfo(dst_addr, port, &hints, &res);
413 	if (ret) {
414 		printf("getaddrinfo: %s\n", gai_strerror(ret));
415 		return ret;
416 	}
417 
418 	rs = rs_socket(res->ai_family, res->ai_socktype, res->ai_protocol);
419 	if (rs < 0) {
420 		perror("rsocket");
421 		ret = rs;
422 		goto out;
423 	}
424 
425 	set_options(rs);
426 	ret = rs_connect(rs, res->ai_addr, res->ai_addrlen);
427 	if (ret) {
428 		perror("rconnect");
429 		rs_close(rs);
430 		goto out;
431 	}
432 
433 	g_msg.op = msg_op_login;
434 	ret = client_send_recv(&g_msg, CTRL_MSG_SIZE, 1000);
435 	if (ret == CTRL_MSG_SIZE)
436 		ret = 0;
437 
438 out:
439 	freeaddrinfo(res);
440 	return ret;
441 }
442 
443 static int client_run(void)
444 {
445 	int i, ret;
446 
447 	printf("%-10s%-8s%-8s%-8s%8s %10s%13s\n",
448 	       "name", "bytes", "xfers", "total", "time", "Gb/sec", "usec/xfer");
449 
450 	ret = client_connect();
451 	if (ret)
452 		return ret;
453 
454 	if (!custom) {
455 		for (i = 0; i < TEST_CNT; i++) {
456 			init_latency_test(test_size[i]);
457 			run_test();
458 		}
459 		for (i = 0; i < TEST_CNT; i++) {
460 			init_bandwidth_test(test_size[i]);
461 			run_test();
462 		}
463 	} else {
464 		run_test();
465 	}
466 	rs_close(rs);
467 
468 	return ret;
469 }
470 
471 static int set_test_opt(const char *arg)
472 {
473 	if (strlen(arg) == 1) {
474 		switch (arg[0]) {
475 		case 's':
476 			use_rs = 0;
477 			break;
478 		case 'a':
479 			use_async = 1;
480 			break;
481 		case 'b':
482 			flags = 0;
483 			break;
484 		case 'n':
485 			flags = MSG_DONTWAIT;
486 			break;
487 		case 'e':
488 			echo = 1;
489 			break;
490 		default:
491 			return -1;
492 		}
493 	} else {
494 		if (!strncasecmp("socket", arg, 6)) {
495 			use_rs = 0;
496 		} else if (!strncasecmp("async", arg, 5)) {
497 			use_async = 1;
498 		} else if (!strncasecmp("block", arg, 5)) {
499 			flags = 0;
500 		} else if (!strncasecmp("nonblock", arg, 8)) {
501 			flags = MSG_DONTWAIT;
502 		} else if (!strncasecmp("echo", arg, 4)) {
503 			echo = 1;
504 		} else {
505 			return -1;
506 		}
507 	}
508 	return 0;
509 }
510 
511 int main(int argc, char **argv)
512 {
513 	int op, ret;
514 
515 	while ((op = getopt(argc, argv, "s:b:B:C:S:p:T:")) != -1) {
516 		switch (op) {
517 		case 's':
518 			dst_addr = optarg;
519 			break;
520 		case 'b':
521 			src_addr = optarg;
522 			break;
523 		case 'B':
524 			buffer_size = atoi(optarg);
525 			break;
526 		case 'C':
527 			custom = 1;
528 			transfer_count = atoi(optarg);
529 			break;
530 		case 'S':
531 			custom = 1;
532 			transfer_size = atoi(optarg);
533 			if (transfer_size < CTRL_MSG_SIZE) {
534 				printf("size must be at least %d bytes\n",
535 				       CTRL_MSG_SIZE);
536 				exit(1);
537 			}
538 			break;
539 		case 'p':
540 			port = optarg;
541 			break;
542 		case 'T':
543 			if (!set_test_opt(optarg))
544 				break;
545 			/* invalid option - fall through */
546 			SWITCH_FALLTHROUGH;
547 		default:
548 			printf("usage: %s\n", argv[0]);
549 			printf("\t[-s server_address]\n");
550 			printf("\t[-b bind_address]\n");
551 			printf("\t[-B buffer_size]\n");
552 			printf("\t[-C transfer_count]\n");
553 			printf("\t[-S transfer_size]\n");
554 			printf("\t[-p port_number]\n");
555 			printf("\t[-T test_option]\n");
556 			printf("\t    s|sockets - use standard tcp/ip sockets\n");
557 			printf("\t    a|async - asynchronous operation (use poll)\n");
558 			printf("\t    b|blocking - use blocking calls\n");
559 			printf("\t    n|nonblocking - use nonblocking calls\n");
560 			printf("\t    e|echo - server echoes all messages\n");
561 			exit(1);
562 		}
563 	}
564 
565 	if (flags)
566 		poll_timeout = -1;
567 
568 	ret = dst_addr ? client_run() : svr_run();
569 	return ret;
570 }
571